Skip to content

Commit

Permalink
Merge pull request #34 from atomist-skills/goal-evaluation-skill-catc…
Browse files Browse the repository at this point in the history
…h-up

goal-evaluation-skill catch up
  • Loading branch information
rnorton5432 authored Jan 8, 2024
2 parents 9cb1585 + a963f0f commit 96ce518
Show file tree
Hide file tree
Showing 10 changed files with 128 additions and 62 deletions.
29 changes: 19 additions & 10 deletions policy/data/gql_async.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"bytes"
"context"
b64 "encoding/base64"
"encoding/json"
"fmt"
"github.com/atomist-skills/go-skill/policy/goals"
"io"
"net/http"
"strings"
Expand Down Expand Up @@ -35,33 +37,33 @@ type (
}

AsyncResultMetadata struct {
SubscriptionResults [][]edn.RawMessage `edn:"subscription"`
AsyncQueryResults map[string]AsyncQueryResponse `edn:"results"`
InFlightQueryName string `edn:"query-name"`
EvaluationMetadata goals.EvaluationMetadata `edn:"evalMeta"`
AsyncQueryResults map[string]AsyncQueryResponse `edn:"results"`
InFlightQueryName string `edn:"query-name"`
}

AsyncDataSource struct {
multipleQuerySupport bool
log skill.Logger
url string
token string
subscriptionResults [][]edn.RawMessage
evaluationMetadata goals.EvaluationMetadata
asyncResults map[string]AsyncQueryResponse
}
)

func NewAsyncDataSource(
multipleQuerySupport bool,
req skill.RequestContext,
subscriptionResults [][]edn.RawMessage,
evaluationMetadata goals.EvaluationMetadata,
asyncResults map[string]AsyncQueryResponse,
) AsyncDataSource {
return AsyncDataSource{
multipleQuerySupport: multipleQuerySupport,
log: req.Log,
url: fmt.Sprintf("%s:enqueue", req.Event.Urls.Graphql),
token: req.Event.Token,
subscriptionResults: subscriptionResults,
evaluationMetadata: evaluationMetadata,
asyncResults: asyncResults,
}
}
Expand All @@ -85,9 +87,9 @@ func (ds AsyncDataSource) Query(ctx context.Context, queryName string, query str
}

metadata := AsyncResultMetadata{
SubscriptionResults: ds.subscriptionResults,
AsyncQueryResults: ds.asyncResults,
InFlightQueryName: queryName,
EvaluationMetadata: ds.evaluationMetadata,
AsyncQueryResults: ds.asyncResults,
InFlightQueryName: queryName,
}
metadataEdn, err := edn.Marshal(metadata)
if err != nil {
Expand Down Expand Up @@ -129,7 +131,14 @@ func (ds AsyncDataSource) Query(ctx context.Context, queryName string, query str
_, _ = io.Copy(buf, r.Body)
body := buf.String()

return nil, fmt.Errorf("async request returned unexpected status %s: %s", r.Status, body)
headers := ""
if responseHeaderBytes, err := json.Marshal(req.Header); err != nil {
headers = "Unable to read headers"
} else {
headers = string(responseHeaderBytes)
}

return nil, fmt.Errorf("async request returned unexpected status %s - HEADERS: %s BODY: %s", r.Status, headers, body)
}

return &QueryResponse{AsyncRequestMade: true}, nil
Expand Down
15 changes: 10 additions & 5 deletions policy/goals/differ.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,17 @@ import (
// It returns the storage id for the current query results.
func GoalResultsDiffer(log skill.Logger, queryResults []GoalEvaluationQueryResult, digest string, goal Goal, previousStorageId string) (bool, string, error) {
log.Infof("Generating storage id for goal %s, image %s", goal.Definition, digest)
hash, err := hashstructure.Hash(queryResults, hashstructure.FormatV2, nil)
if err != nil {
return false, "", fmt.Errorf("failed to generate storage id for goal %s, image %s: %s", goal.Definition, digest, err)
}

storageId := fmt.Sprint(hash)
storageId := "no-data"

if queryResults != nil {
hash, err := hashstructure.Hash(queryResults, hashstructure.FormatV2, nil)
if err != nil {
return false, "", fmt.Errorf("failed to generate storage id for goal %s, image %s: %s", goal.Definition, digest, err)
}

storageId = fmt.Sprint(hash)
}

differ := storageId != previousStorageId

Expand Down
30 changes: 21 additions & 9 deletions policy/goals/entities.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,26 @@ package goals

import "time"

func CreateEntitiesFromResults(results []GoalEvaluationQueryResult, goalDefinition string, goalConfiguration string, image string, storageId string, configHash string, evaluationTs time.Time) GoalEvaluationResultEntity {
return GoalEvaluationResultEntity{
Definition: goalDefinition,
Configuration: goalConfiguration,
Subject: DockerImageEntity{Digest: image},
DeviationCount: len(results),
StorageId: storageId,
ConfigHash: configHash,
CreatedAt: evaluationTs,
func CreateEntitiesFromResults(results []GoalEvaluationQueryResult, goalDefinition string, goalConfiguration string, image string, storageId string, configHash string, evaluationTs time.Time, tx int64) GoalEvaluationResultEntity {
entity := GoalEvaluationResultEntity{
Definition: goalDefinition,
Configuration: goalConfiguration,
Subject: DockerImageEntity{Digest: image},
ConfigHash: configHash,
CreatedAt: evaluationTs,
TransactionCondition: TransactionConditionEntity{
Args: map[string]interface{}{"tx-arg": tx},
Where: []byte(`[[?entity :goal.result/created-at _ ?tx true]
[(< ?tx ?tx-arg)]]`),
},
}

if storageId != "no-data" {
deviationCount := len(results)

entity.DeviationCount = &deviationCount
entity.StorageId = &storageId
}

return entity
}
22 changes: 19 additions & 3 deletions policy/goals/entities_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,29 @@ func TestCreateEntitiesFromResult(t *testing.T) {

evaluationTs := time.Date(2023, 7, 10, 20, 1, 41, 0, time.UTC)

entity := CreateEntitiesFromResults(resultModel, "test-definition", "test-configuration", "test-image", "storage-id", "config-hash", evaluationTs)
entity := CreateEntitiesFromResults(resultModel, "test-definition", "test-configuration", "test-image", "storage-id", "config-hash", evaluationTs, 123)

if entity.Definition != "test-definition" || entity.Configuration != "test-configuration" || entity.StorageId != "storage-id" || entity.CreatedAt.Format("2006-01-02T15:04:05.000Z") != "2023-07-10T20:01:41.000Z" {
if entity.Definition != "test-definition" || entity.Configuration != "test-configuration" || *entity.StorageId != "storage-id" || entity.CreatedAt.Format("2006-01-02T15:04:05.000Z") != "2023-07-10T20:01:41.000Z" {
t.Errorf("metadata not set correctly")
}

if entity.DeviationCount != 1 {
if *entity.DeviationCount != 1 {
t.Errorf("incorrect number of deviations, expected %d, got %d", 1, entity.DeviationCount)
}
}

func TestNoDataDoesntSetFields(t *testing.T) {
result := `[{:name "CVE-2023-2650", :details {:purl "pkg:alpine/[email protected]?os_name=alpine&os_version=3.18", :cve "CVE-2023-2650", :severity "HIGH", :fixed-by "3.1.1-r0"} }]`

resultModel := []GoalEvaluationQueryResult{}

edn.Unmarshal([]byte(result), &resultModel)

evaluationTs := time.Date(2023, 7, 10, 20, 1, 41, 0, time.UTC)

entity := CreateEntitiesFromResults(resultModel, "test-definition", "test-configuration", "test-image", "no-data", "config-hash", evaluationTs, 123)

if entity.StorageId != nil || entity.DeviationCount != nil {
t.Errorf("metadata not set correctly")
}
}
27 changes: 19 additions & 8 deletions policy/goals/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,20 @@ type (
}

GoalEvaluationResultEntity struct {
skill.Entity `entity-type:"goal/result"`
Definition string `edn:"goal.definition/name"`
Configuration string `edn:"goal.configuration/name"`
Subject DockerImageEntity `edn:"goal.result/subject"`
DeviationCount int `edn:"goal.result/deviation-count"`
StorageId string `edn:"goal.result/storage-id"`
ConfigHash string `edn:"goal.result/config-hash"`
CreatedAt time.Time `edn:"goal.result/created-at"`
skill.Entity `entity-type:"goal/result"`
Definition string `edn:"goal.definition/name"`
Configuration string `edn:"goal.configuration/name"`
Subject DockerImageEntity `edn:"goal.result/subject"`
DeviationCount *int `edn:"goal.result/deviation-count"`
StorageId *string `edn:"goal.result/storage-id"`
ConfigHash string `edn:"goal.result/config-hash"`
CreatedAt time.Time `edn:"goal.result/created-at"`
TransactionCondition TransactionConditionEntity `edn:"atomist/tx-iff"`
}

TransactionConditionEntity struct {
Args map[string]interface{} `edn:"args"`
Where edn.RawMessage `edn:"where"`
}

ImagePlatform struct {
Expand All @@ -61,6 +67,11 @@ type (
ImagePlatforms []ImagePlatform `edn:"docker.image/platform" json:"platforms"`
}

EvaluationMetadata struct {
SubscriptionResult [][]edn.RawMessage `edn:"subscription-result"`
SubscriptionTx int64 `edn:"subscription-tx"`
}

GoalEvaluator interface {
EvaluateGoal(ctx context.Context, req skill.RequestContext, commonData CommonSubscriptionQueryResult, subscriptionResults [][]edn.RawMessage) ([]GoalEvaluationQueryResult, error)
}
Expand Down
11 changes: 6 additions & 5 deletions policy/policy_handler/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
b64 "encoding/base64"
"fmt"
"github.com/atomist-skills/go-skill/policy/goals"

"github.com/atomist-skills/go-skill"
"github.com/atomist-skills/go-skill/policy/data"
Expand Down Expand Up @@ -39,7 +40,7 @@ func withAsync() Opt {
}
}

func getAsyncSubscriptionData(ctx context.Context, req skill.RequestContext) ([][]edn.RawMessage, skill.Configuration, error) {
func getAsyncSubscriptionData(ctx context.Context, req skill.RequestContext) (*goals.EvaluationMetadata, skill.Configuration, error) {
if req.Event.Context.AsyncQueryResult.Name != eventNameAsyncQuery {
return nil, skill.Configuration{}, nil
}
Expand All @@ -55,18 +56,18 @@ func getAsyncSubscriptionData(ctx context.Context, req skill.RequestContext) ([]
return nil, skill.Configuration{}, fmt.Errorf("failed to unmarshal async metadata: %w", err)
}

return metadata.SubscriptionResults, req.Event.Context.AsyncQueryResult.Configuration, nil
return &metadata.EvaluationMetadata, req.Event.Context.AsyncQueryResult.Configuration, nil
}

func buildAsyncDataSources(multipleQuerySupport bool) dataSourceProvider {
return func(ctx context.Context, req skill.RequestContext) ([]data.DataSource, error) {
return func(ctx context.Context, req skill.RequestContext, evalMeta goals.EvaluationMetadata) ([]data.DataSource, error) {
if req.Event.Context.SyncRequest.Name == eventNameLocalEval {
return []data.DataSource{}, nil
}

if req.Event.Context.AsyncQueryResult.Name != eventNameAsyncQuery {
return []data.DataSource{
data.NewAsyncDataSource(multipleQuerySupport, req, req.Event.Context.Subscription.Result, map[string]data.AsyncQueryResponse{}),
data.NewAsyncDataSource(multipleQuerySupport, req, evalMeta, map[string]data.AsyncQueryResponse{}),
}, nil
}

Expand All @@ -86,7 +87,7 @@ func buildAsyncDataSources(multipleQuerySupport bool) dataSourceProvider {
metadata.AsyncQueryResults[metadata.InFlightQueryName] = queryResponse

return []data.DataSource{
data.NewAsyncDataSource(multipleQuerySupport, req, metadata.SubscriptionResults, metadata.AsyncQueryResults),
data.NewAsyncDataSource(multipleQuerySupport, req, metadata.EvaluationMetadata, metadata.AsyncQueryResults),
}, nil
}
}
32 changes: 19 additions & 13 deletions policy/policy_handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ type (
Start()
}

subscriptionProvider func(ctx context.Context, req skill.RequestContext) ([][]edn.RawMessage, skill.Configuration, error)
dataSourceProvider func(ctx context.Context, req skill.RequestContext) ([]data.DataSource, error)
subscriptionProvider func(ctx context.Context, req skill.RequestContext) (*goals.EvaluationMetadata, skill.Configuration, error)
dataSourceProvider func(ctx context.Context, req skill.RequestContext, evalMeta goals.EvaluationMetadata) ([]data.DataSource, error)
transactionFilter func(ctx context.Context, req skill.RequestContext) bool

EventHandler struct {
Expand Down Expand Up @@ -72,40 +72,45 @@ func (h EventHandler) Start() {

func (h EventHandler) handle(ctx context.Context, req skill.RequestContext) skill.Status {
var (
subscriptionResult [][]edn.RawMessage
evaluationMetadata *goals.EvaluationMetadata
configuration skill.Configuration
err error
)
for _, provider := range h.subscriptionDataProviders {
subscriptionResult, configuration, err = provider(ctx, req)
evaluationMetadata, configuration, err = provider(ctx, req)
if err != nil {
return skill.NewFailedStatus(fmt.Sprintf("failed to retrieve subscription result [%s]", err.Error()))
}
if subscriptionResult != nil {
if evaluationMetadata != nil {
break
}
}

if subscriptionResult == nil {
if evaluationMetadata == nil {
return skill.NewFailedStatus(fmt.Sprintf("subscription result was not found"))
}

sources := []data.DataSource{}
for _, provider := range h.dataSourceProviders {
ds, err := provider(ctx, req)
ds, err := provider(ctx, req, *evaluationMetadata)
if err != nil {
if err.Error() == "An unexpected error has occurred" {
return skill.NewRetryableStatus(fmt.Sprintf("Failed to create data source [%s]", err.Error()))
}
return skill.NewFailedStatus(fmt.Sprintf("failed to create data source [%s]", err.Error()))
}
sources = append(sources, ds...)
}

dataSource := data.NewChainDataSource(sources...)

return h.evaluate(ctx, req, dataSource, subscriptionResult, configuration)
return h.evaluate(ctx, req, dataSource, *evaluationMetadata, configuration)
}

func (h EventHandler) evaluate(ctx context.Context, req skill.RequestContext, dataSource data.DataSource, subscriptionResult [][]edn.RawMessage, configuration skill.Configuration) skill.Status {
func (h EventHandler) evaluate(ctx context.Context, req skill.RequestContext, dataSource data.DataSource, evaluationMetadata goals.EvaluationMetadata, configuration skill.Configuration) skill.Status {
goalName := req.Event.Skill.Name
tx := evaluationMetadata.SubscriptionTx
subscriptionResult := evaluationMetadata.SubscriptionResult

cfg := configuration.Name
params := configuration.Parameters
Expand Down Expand Up @@ -168,6 +173,7 @@ func (h EventHandler) evaluate(ctx context.Context, req skill.RequestContext, da
subscriptionResult,
evaluationTs,
goalResults,
tx,
)
}

Expand All @@ -181,14 +187,14 @@ func transact(
subscriptionResult [][]edn.RawMessage,
evaluationTs time.Time,
goalResults []goals.GoalEvaluationQueryResult,
tx int64,
) skill.Status {
storageTuple := util.Decode[[]string](subscriptionResult[0][1])
storageId := storageTuple[0]
configHash := storageTuple[1]

if goalResults == nil {
req.Log.Infof("goal %s returned no data for digest %s, skipping storing results", goal.Definition, digest)
return skill.NewCompletedStatus(fmt.Sprintf("Goal %s evaluated - no data found", goalName))
req.Log.Infof("goal %s returned no data for digest %s", goal.Definition, digest)
}

es, err := storage.NewEvaluationStorage(ctx)
Expand All @@ -210,15 +216,15 @@ func transact(
differ = true
}

if differ {
if differ && goalResults != nil {
if err := es.Store(ctx, goalResults, storageId, req.Event.Environment, req.Log); err != nil {
return skill.NewFailedStatus(fmt.Sprintf("Failed to store evaluation results for digest %s: %s", digest, err.Error()))
}
}

var entities []interface{}
if differ || configDiffer {
entity := goals.CreateEntitiesFromResults(goalResults, goal.Definition, goal.Configuration, digest, storageId, configHash, evaluationTs)
entity := goals.CreateEntitiesFromResults(goalResults, goal.Definition, goal.Configuration, digest, storageId, configHash, evaluationTs, tx)
entities = append(entities, entity)
}

Expand Down
9 changes: 4 additions & 5 deletions policy/policy_handler/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@ package policy_handler
import (
"context"
"encoding/json"

"github.com/atomist-skills/go-skill"
"github.com/atomist-skills/go-skill/policy/data"
"olympos.io/encoding/edn"
"github.com/atomist-skills/go-skill/policy/goals"
)

const eventNameLocalEval = "evaluate_goals_locally"
Expand All @@ -20,15 +19,15 @@ func WithLocal() Opt {
}
}

func getLocalSubscriptionData(ctx context.Context, req skill.RequestContext) ([][]edn.RawMessage, skill.Configuration, error) {
func getLocalSubscriptionData(ctx context.Context, req skill.RequestContext) (*goals.EvaluationMetadata, skill.Configuration, error) {
if req.Event.Context.SyncRequest.Name != eventNameLocalEval {
return nil, skill.Configuration{}, nil
}

return [][]edn.RawMessage{}, req.Event.Context.Subscription.Configuration, nil
return &goals.EvaluationMetadata{}, req.Event.Context.Subscription.Configuration, nil
}

func buildLocalDataSources(ctx context.Context, req skill.RequestContext) ([]data.DataSource, error) {
func buildLocalDataSources(ctx context.Context, req skill.RequestContext, evalMeta goals.EvaluationMetadata) ([]data.DataSource, error) {
if req.Event.Context.SyncRequest.Name != eventNameLocalEval {
return []data.DataSource{}, nil
}
Expand Down
Loading

0 comments on commit 96ce518

Please sign in to comment.