Skip to content

Commit

Permalink
add condition to result transaction to ensure ordering
Browse files Browse the repository at this point in the history
Co-authored-by: Chris Ainsworth-Patrick <[email protected]>
  • Loading branch information
Paul Norton and chrispatrick committed Jan 8, 2024
1 parent 8244497 commit 52a2b35
Show file tree
Hide file tree
Showing 10 changed files with 81 additions and 48 deletions.
19 changes: 10 additions & 9 deletions policy/data/gql_async.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
b64 "encoding/base64"
"encoding/json"
"fmt"
"github.com/atomist-skills/go-skill/policy/goals"
"io"
"net/http"
"strings"
Expand Down Expand Up @@ -36,33 +37,33 @@ type (
}

AsyncResultMetadata struct {
SubscriptionResults [][]edn.RawMessage `edn:"subscription"`
AsyncQueryResults map[string]AsyncQueryResponse `edn:"results"`
InFlightQueryName string `edn:"query-name"`
EvaluationMetadata goals.EvaluationMetadata `edn:"evalMeta"`
AsyncQueryResults map[string]AsyncQueryResponse `edn:"results"`
InFlightQueryName string `edn:"query-name"`
}

AsyncDataSource struct {
multipleQuerySupport bool
log skill.Logger
url string
token string
subscriptionResults [][]edn.RawMessage
evaluationMetadata goals.EvaluationMetadata
asyncResults map[string]AsyncQueryResponse
}
)

func NewAsyncDataSource(
multipleQuerySupport bool,
req skill.RequestContext,
subscriptionResults [][]edn.RawMessage,
evaluationMetadata goals.EvaluationMetadata,
asyncResults map[string]AsyncQueryResponse,
) AsyncDataSource {
return AsyncDataSource{
multipleQuerySupport: multipleQuerySupport,
log: req.Log,
url: fmt.Sprintf("%s:enqueue", req.Event.Urls.Graphql),
token: req.Event.Token,
subscriptionResults: subscriptionResults,
evaluationMetadata: evaluationMetadata,
asyncResults: asyncResults,
}
}
Expand All @@ -86,9 +87,9 @@ func (ds AsyncDataSource) Query(ctx context.Context, queryName string, query str
}

metadata := AsyncResultMetadata{
SubscriptionResults: ds.subscriptionResults,
AsyncQueryResults: ds.asyncResults,
InFlightQueryName: queryName,
EvaluationMetadata: ds.evaluationMetadata,
AsyncQueryResults: ds.asyncResults,
InFlightQueryName: queryName,
}
metadataEdn, err := edn.Marshal(metadata)
if err != nil {
Expand Down
15 changes: 10 additions & 5 deletions policy/goals/differ.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,17 @@ import (
// It returns the storage id for the current query results.
func GoalResultsDiffer(log skill.Logger, queryResults []GoalEvaluationQueryResult, digest string, goal Goal, previousStorageId string) (bool, string, error) {
log.Infof("Generating storage id for goal %s, image %s", goal.Definition, digest)
hash, err := hashstructure.Hash(queryResults, hashstructure.FormatV2, nil)
if err != nil {
return false, "", fmt.Errorf("failed to generate storage id for goal %s, image %s: %s", goal.Definition, digest, err)
}

storageId := fmt.Sprint(hash)
storageId := "no-data"

if queryResults != nil {
hash, err := hashstructure.Hash(queryResults, hashstructure.FormatV2, nil)
if err != nil {
return false, "", fmt.Errorf("failed to generate storage id for goal %s, image %s: %s", goal.Definition, digest, err)
}

storageId = fmt.Sprint(hash)
}

differ := storageId != previousStorageId

Expand Down
7 changes: 6 additions & 1 deletion policy/goals/entities.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package goals

import "time"

func CreateEntitiesFromResults(results []GoalEvaluationQueryResult, goalDefinition string, goalConfiguration string, image string, storageId string, configHash string, evaluationTs time.Time) GoalEvaluationResultEntity {
func CreateEntitiesFromResults(results []GoalEvaluationQueryResult, goalDefinition string, goalConfiguration string, image string, storageId string, configHash string, evaluationTs time.Time, tx int64) GoalEvaluationResultEntity {
return GoalEvaluationResultEntity{
Definition: goalDefinition,
Configuration: goalConfiguration,
Expand All @@ -27,5 +27,10 @@ func CreateEntitiesFromResults(results []GoalEvaluationQueryResult, goalDefiniti
StorageId: storageId,
ConfigHash: configHash,
CreatedAt: evaluationTs,
TransactionCondition: TransactionConditionEntity{
Args: map[string]interface{}{"tx-arg": tx},
Where: []byte(`[[?entity :goal.result/created-at _ ?tx true]
[(< ?tx ?tx-arg)]]`),
},
}
}
2 changes: 1 addition & 1 deletion policy/goals/entities_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestCreateEntitiesFromResult(t *testing.T) {

evaluationTs := time.Date(2023, 7, 10, 20, 1, 41, 0, time.UTC)

entity := CreateEntitiesFromResults(resultModel, "test-definition", "test-configuration", "test-image", "storage-id", "config-hash", evaluationTs)
entity := CreateEntitiesFromResults(resultModel, "test-definition", "test-configuration", "test-image", "storage-id", "config-hash", evaluationTs, 123)

if entity.Definition != "test-definition" || entity.Configuration != "test-configuration" || entity.StorageId != "storage-id" || entity.CreatedAt.Format("2006-01-02T15:04:05.000Z") != "2023-07-10T20:01:41.000Z" {
t.Errorf("metadata not set correctly")
Expand Down
27 changes: 19 additions & 8 deletions policy/goals/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,20 @@ type (
}

GoalEvaluationResultEntity struct {
skill.Entity `entity-type:"goal/result"`
Definition string `edn:"goal.definition/name"`
Configuration string `edn:"goal.configuration/name"`
Subject DockerImageEntity `edn:"goal.result/subject"`
DeviationCount int `edn:"goal.result/deviation-count"`
StorageId string `edn:"goal.result/storage-id"`
ConfigHash string `edn:"goal.result/config-hash"`
CreatedAt time.Time `edn:"goal.result/created-at"`
skill.Entity `entity-type:"goal/result"`
Definition string `edn:"goal.definition/name"`
Configuration string `edn:"goal.configuration/name"`
Subject DockerImageEntity `edn:"goal.result/subject"`
DeviationCount int `edn:"goal.result/deviation-count"`
StorageId string `edn:"goal.result/storage-id"`
ConfigHash string `edn:"goal.result/config-hash"`
CreatedAt time.Time `edn:"goal.result/created-at"`
TransactionCondition TransactionConditionEntity `edn:"atomist/tx-iff"`
}

TransactionConditionEntity struct {
Args map[string]interface{} `edn:"args"`
Where edn.RawMessage `edn:"where"`
}

ImagePlatform struct {
Expand All @@ -61,6 +67,11 @@ type (
ImagePlatforms []ImagePlatform `edn:"docker.image/platform" json:"platforms"`
}

EvaluationMetadata struct {
SubscriptionResult [][]edn.RawMessage `edn:"subscription-result"`
SubscriptionTx int64 `edn:"subscription-tx"`
}

GoalEvaluator interface {
EvaluateGoal(ctx context.Context, req skill.RequestContext, commonData CommonSubscriptionQueryResult, subscriptionResults [][]edn.RawMessage) ([]GoalEvaluationQueryResult, error)
}
Expand Down
11 changes: 6 additions & 5 deletions policy/policy_handler/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
b64 "encoding/base64"
"fmt"
"github.com/atomist-skills/go-skill/policy/goals"

"github.com/atomist-skills/go-skill"
"github.com/atomist-skills/go-skill/policy/data"
Expand Down Expand Up @@ -39,7 +40,7 @@ func withAsync() Opt {
}
}

func getAsyncSubscriptionData(ctx context.Context, req skill.RequestContext) ([][]edn.RawMessage, skill.Configuration, error) {
func getAsyncSubscriptionData(ctx context.Context, req skill.RequestContext) (*goals.EvaluationMetadata, skill.Configuration, error) {
if req.Event.Context.AsyncQueryResult.Name != eventNameAsyncQuery {
return nil, skill.Configuration{}, nil
}
Expand All @@ -55,18 +56,18 @@ func getAsyncSubscriptionData(ctx context.Context, req skill.RequestContext) ([]
return nil, skill.Configuration{}, fmt.Errorf("failed to unmarshal async metadata: %w", err)
}

return metadata.SubscriptionResults, req.Event.Context.AsyncQueryResult.Configuration, nil
return &metadata.EvaluationMetadata, req.Event.Context.AsyncQueryResult.Configuration, nil
}

func buildAsyncDataSources(multipleQuerySupport bool) dataSourceProvider {
return func(ctx context.Context, req skill.RequestContext) ([]data.DataSource, error) {
return func(ctx context.Context, req skill.RequestContext, evalMeta goals.EvaluationMetadata) ([]data.DataSource, error) {
if req.Event.Context.SyncRequest.Name == eventNameLocalEval {
return []data.DataSource{}, nil
}

if req.Event.Context.AsyncQueryResult.Name != eventNameAsyncQuery {
return []data.DataSource{
data.NewAsyncDataSource(multipleQuerySupport, req, req.Event.Context.Subscription.Result, map[string]data.AsyncQueryResponse{}),
data.NewAsyncDataSource(multipleQuerySupport, req, evalMeta, map[string]data.AsyncQueryResponse{}),
}, nil
}

Expand All @@ -86,7 +87,7 @@ func buildAsyncDataSources(multipleQuerySupport bool) dataSourceProvider {
metadata.AsyncQueryResults[metadata.InFlightQueryName] = queryResponse

return []data.DataSource{
data.NewAsyncDataSource(multipleQuerySupport, req, metadata.SubscriptionResults, metadata.AsyncQueryResults),
data.NewAsyncDataSource(multipleQuerySupport, req, metadata.EvaluationMetadata, metadata.AsyncQueryResults),
}, nil
}
}
24 changes: 14 additions & 10 deletions policy/policy_handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ type (
Start()
}

subscriptionProvider func(ctx context.Context, req skill.RequestContext) ([][]edn.RawMessage, skill.Configuration, error)
dataSourceProvider func(ctx context.Context, req skill.RequestContext) ([]data.DataSource, error)
subscriptionProvider func(ctx context.Context, req skill.RequestContext) (*goals.EvaluationMetadata, skill.Configuration, error)
dataSourceProvider func(ctx context.Context, req skill.RequestContext, evalMeta goals.EvaluationMetadata) ([]data.DataSource, error)
transactionFilter func(ctx context.Context, req skill.RequestContext) bool

EventHandler struct {
Expand Down Expand Up @@ -72,27 +72,27 @@ func (h EventHandler) Start() {

func (h EventHandler) handle(ctx context.Context, req skill.RequestContext) skill.Status {
var (
subscriptionResult [][]edn.RawMessage
evaluationMetadata *goals.EvaluationMetadata
configuration skill.Configuration
err error
)
for _, provider := range h.subscriptionDataProviders {
subscriptionResult, configuration, err = provider(ctx, req)
evaluationMetadata, configuration, err = provider(ctx, req)
if err != nil {
return skill.NewFailedStatus(fmt.Sprintf("failed to retrieve subscription result [%s]", err.Error()))
}
if subscriptionResult != nil {
if evaluationMetadata != nil {
break
}
}

if subscriptionResult == nil {
if evaluationMetadata == nil {
return skill.NewFailedStatus(fmt.Sprintf("subscription result was not found"))
}

sources := []data.DataSource{}
for _, provider := range h.dataSourceProviders {
ds, err := provider(ctx, req)
ds, err := provider(ctx, req, *evaluationMetadata)
if err != nil {
if err.Error() == "An unexpected error has occurred" {
return skill.NewRetryableStatus(fmt.Sprintf("Failed to create data source [%s]", err.Error()))
Expand All @@ -104,11 +104,13 @@ func (h EventHandler) handle(ctx context.Context, req skill.RequestContext) skil

dataSource := data.NewChainDataSource(sources...)

return h.evaluate(ctx, req, dataSource, subscriptionResult, configuration)
return h.evaluate(ctx, req, dataSource, *evaluationMetadata, configuration)
}

func (h EventHandler) evaluate(ctx context.Context, req skill.RequestContext, dataSource data.DataSource, subscriptionResult [][]edn.RawMessage, configuration skill.Configuration) skill.Status {
func (h EventHandler) evaluate(ctx context.Context, req skill.RequestContext, dataSource data.DataSource, evaluationMetadata goals.EvaluationMetadata, configuration skill.Configuration) skill.Status {
goalName := req.Event.Skill.Name
tx := evaluationMetadata.SubscriptionTx
subscriptionResult := evaluationMetadata.SubscriptionResult

cfg := configuration.Name
params := configuration.Parameters
Expand Down Expand Up @@ -171,6 +173,7 @@ func (h EventHandler) evaluate(ctx context.Context, req skill.RequestContext, da
subscriptionResult,
evaluationTs,
goalResults,
tx,
)
}

Expand All @@ -184,6 +187,7 @@ func transact(
subscriptionResult [][]edn.RawMessage,
evaluationTs time.Time,
goalResults []goals.GoalEvaluationQueryResult,
tx int64,
) skill.Status {
storageTuple := util.Decode[[]string](subscriptionResult[0][1])
storageId := storageTuple[0]
Expand Down Expand Up @@ -221,7 +225,7 @@ func transact(

var entities []interface{}
if differ || configDiffer {
entity := goals.CreateEntitiesFromResults(goalResults, goal.Definition, goal.Configuration, digest, storageId, configHash, evaluationTs)
entity := goals.CreateEntitiesFromResults(goalResults, goal.Definition, goal.Configuration, digest, storageId, configHash, evaluationTs, tx)
entities = append(entities, entity)
}

Expand Down
9 changes: 4 additions & 5 deletions policy/policy_handler/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@ package policy_handler
import (
"context"
"encoding/json"

"github.com/atomist-skills/go-skill"
"github.com/atomist-skills/go-skill/policy/data"
"olympos.io/encoding/edn"
"github.com/atomist-skills/go-skill/policy/goals"
)

const eventNameLocalEval = "evaluate_goals_locally"
Expand All @@ -20,15 +19,15 @@ func WithLocal() Opt {
}
}

func getLocalSubscriptionData(ctx context.Context, req skill.RequestContext) ([][]edn.RawMessage, skill.Configuration, error) {
func getLocalSubscriptionData(ctx context.Context, req skill.RequestContext) (*goals.EvaluationMetadata, skill.Configuration, error) {
if req.Event.Context.SyncRequest.Name != eventNameLocalEval {
return nil, skill.Configuration{}, nil
}

return [][]edn.RawMessage{}, req.Event.Context.Subscription.Configuration, nil
return &goals.EvaluationMetadata{}, req.Event.Context.Subscription.Configuration, nil
}

func buildLocalDataSources(ctx context.Context, req skill.RequestContext) ([]data.DataSource, error) {
func buildLocalDataSources(ctx context.Context, req skill.RequestContext, evalMeta goals.EvaluationMetadata) ([]data.DataSource, error) {
if req.Event.Context.SyncRequest.Name != eventNameLocalEval {
return []data.DataSource{}, nil
}
Expand Down
11 changes: 8 additions & 3 deletions policy/policy_handler/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ package policy_handler

import (
"context"
"github.com/atomist-skills/go-skill/policy/goals"

"github.com/atomist-skills/go-skill"
"olympos.io/encoding/edn"
)

func withSubscription() Opt {
Expand All @@ -12,10 +13,14 @@ func withSubscription() Opt {
}
}

func getSubscriptionData(ctx context.Context, req skill.RequestContext) ([][]edn.RawMessage, skill.Configuration, error) {
func getSubscriptionData(ctx context.Context, req skill.RequestContext) (*goals.EvaluationMetadata, skill.Configuration, error) {
if req.Event.Context.Subscription.Name == "" {
return nil, skill.Configuration{}, nil
}

return req.Event.Context.Subscription.Result, req.Event.Context.Subscription.Configuration, nil
evalMeta := &goals.EvaluationMetadata{
SubscriptionResult: req.Event.Context.Subscription.Result,
SubscriptionTx: req.Event.Context.Subscription.Metadata.Tx,
}
return evalMeta, req.Event.Context.Subscription.Configuration, nil
}
4 changes: 3 additions & 1 deletion policy/policy_handler/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package policy_handler
import (
"context"
"fmt"
"github.com/atomist-skills/go-skill/policy/goals"

"github.com/atomist-skills/go-skill"
"github.com/atomist-skills/go-skill/policy/data"
)
Expand All @@ -13,7 +15,7 @@ func WithSyncQuery() Opt {
}
}

func getSyncDataSources(ctx context.Context, req skill.RequestContext) ([]data.DataSource, error) {
func getSyncDataSources(ctx context.Context, req skill.RequestContext, evalMeta goals.EvaluationMetadata) ([]data.DataSource, error) {
gqlDs, err := data.NewSyncGraphqlDataSource(ctx, req)
if err != nil {
return nil, fmt.Errorf("unable to create data source: %w", err)
Expand Down

0 comments on commit 52a2b35

Please sign in to comment.