Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pass an error message to the failure node #6181

Merged
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ package executors
import (
"context"

"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
)

type FailureNodeLookup struct {
NodeLookup
FailureNode v1alpha1.ExecutableNode
FailureNodeStatus v1alpha1.ExecutableNodeStatus
OriginalError *core.ExecutionError
pingsutw marked this conversation as resolved.
Show resolved Hide resolved
}

func (f FailureNodeLookup) GetNode(nodeID v1alpha1.NodeID) (v1alpha1.ExecutableNode, bool) {
Expand All @@ -35,10 +37,15 @@ func (f FailureNodeLookup) FromNode(id v1alpha1.NodeID) ([]v1alpha1.NodeID, erro
return nil, nil
}

func NewFailureNodeLookup(nodeLookup NodeLookup, failureNode v1alpha1.ExecutableNode, failureNodeStatus v1alpha1.ExecutableNodeStatus) NodeLookup {
// GetOriginalError returns the execution error held in the lookup's
// OriginalError field. The error result is always nil.
func (f FailureNodeLookup) GetOriginalError() (*core.ExecutionError, error) {
	originalErr := f.OriginalError
	return originalErr, nil
}

// NewFailureNodeLookup assembles a FailureNodeLookup from the wrapped node
// lookup, the failure node, that node's status and the original execution
// error, and hands it back as a NodeLookup.
func NewFailureNodeLookup(nodeLookup NodeLookup, failureNode v1alpha1.ExecutableNode, failureNodeStatus v1alpha1.ExecutableNodeStatus, originalError *core.ExecutionError) NodeLookup {
	var lookup FailureNodeLookup
	lookup.NodeLookup = nodeLookup
	lookup.FailureNode = failureNode
	lookup.FailureNodeStatus = failureNodeStatus
	lookup.OriginalError = originalError
	return lookup
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/stretchr/testify/assert"

"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
"github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks"
)
Expand All @@ -26,7 +27,10 @@ func TestNewFailureNodeLookup(t *testing.T) {
nl := nl{}
en := en{}
ns := ns{}
nodeLoopUp := NewFailureNodeLookup(nl, en, ns)
execErr := &core.ExecutionError{
Message: "node failure",
}
nodeLoopUp := NewFailureNodeLookup(nl, en, ns, execErr)
Comment on lines +30 to +33
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing test assertion for error field

Consider adding assertions to verify that the execErr is correctly set in the FailureNodeLookup struct. Currently, the test doesn't validate the OriginalError field.

Code suggestion
Check the AI-generated fix before applying
 @@ -35,4 +35,5 @@
 	typed := nodeLoopUp.(FailureNodeLookup)
 	assert.Equal(t, nl, typed.NodeLookup)
 	assert.Equal(t, en, typed.FailureNode)
 	assert.Equal(t, ns, typed.FailureNodeStatus)
 +	assert.Equal(t, execErr, typed.OriginalError)

Code Review Run #4749b8


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we add the assert below?

assert.Equal(t, execErr, typed.OriginalError)

assert.NotNil(t, nl)
typed := nodeLoopUp.(FailureNodeLookup)
assert.Equal(t, nl, typed.NodeLookup)
Expand All @@ -38,14 +42,17 @@ func TestNewTestFailureNodeLookup(t *testing.T) {
n := &mocks.ExecutableNode{}
ns := &mocks.ExecutableNodeStatus{}
failureNodeID := "fn1"
originalErr := &core.ExecutionError{
Message: "node failure",
}
nl := NewTestNodeLookup(
map[string]v1alpha1.ExecutableNode{v1alpha1.StartNodeID: n, failureNodeID: n},
map[string]v1alpha1.ExecutableNodeStatus{v1alpha1.StartNodeID: ns, failureNodeID: ns},
)

assert.NotNil(t, nl)

failureNodeLookup := NewFailureNodeLookup(nl, n, ns)
failureNodeLookup := NewFailureNodeLookup(nl, n, ns, originalErr).(FailureNodeLookup)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider checking type assertion result

Consider adding a test case to verify behavior when originalErr is nil. The type assertion to FailureNodeLookup assumes the cast will always succeed.

Code suggestion
Check the AI-generated fix before applying
Suggested change
failureNodeLookup := NewFailureNodeLookup(nl, n, ns, originalErr).(FailureNodeLookup)
failureNodeLookup, ok := NewFailureNodeLookup(nl, n, ns, originalErr).(FailureNodeLookup)
assert.True(t, ok)

Code Review Run #f10373


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

r, ok := failureNodeLookup.GetNode(v1alpha1.StartNodeID)
assert.True(t, ok)
assert.Equal(t, n, r)
Expand All @@ -64,4 +71,9 @@ func TestNewTestFailureNodeLookup(t *testing.T) {
nodeIDs, err = failureNodeLookup.FromNode(failureNodeID)
assert.Nil(t, nodeIDs)
assert.Nil(t, err)

oe, err := failureNodeLookup.GetOriginalError()
assert.NotNil(t, oe)
assert.Equal(t, originalErr, oe)
assert.Nil(t, err)
}
10 changes: 10 additions & 0 deletions flytepropeller/pkg/controller/nodes/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,16 @@
}

if nodeInputs != nil {
// Resolve error input if current node is an on failure node
failureNodeLookup, ok := nCtx.ContextualNodeLookup().(executors.FailureNodeLookup)
if ok {
originalErr, err := failureNodeLookup.GetOriginalError()
if err != nil {
return handler.PhaseInfoFailure(core.ExecutionError_SYSTEM, "FailureNodeError", err.Error(), nil), nil
} else if originalErr != nil {
ResolveOnFailureNodeInput(ctx, nodeInputs, node.GetID(), originalErr)
}

Check warning on line 776 in flytepropeller/pkg/controller/nodes/executor.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/executor.go#L771-L776

Added lines #L771 - L776 were not covered by tests
}
p := common.CheckOffloadingCompat(ctx, nCtx, nodeInputs.GetLiterals(), node, c.literalOffloadingConfig)
if p != nil {
return *p, nil
Expand Down
36 changes: 36 additions & 0 deletions flytepropeller/pkg/controller/nodes/resolve.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,39 @@ func Resolve(ctx context.Context, outputResolver OutputResolver, nl executors.No
Literals: literalMap,
}, nil
}

func ResolveOnFailureNodeInput(ctx context.Context, nodeInputs *core.LiteralMap, nodeID v1alpha1.NodeID, execErr *core.ExecutionError) {
literals := nodeInputs.GetLiterals()
if literal, exists := literals["err"]; exists {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider nil check for execErr parameter

Consider adding error handling for the case when execErr is nil. Currently, the function assumes execErr is non-nil when accessing GetMessage().

Code suggestion
Check the AI-generated fix before applying
Suggested change
literals := nodeInputs.GetLiterals()
if literal, exists := literals["err"]; exists {
literals := nodeInputs.GetLiterals()
if literal, exists := literals["err"]; exists {
if execErr == nil {
return
}

Code Review Run #4749b8


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

// make new Scalar for literal map
errorUnion := &core.Scalar_Union{
Union: &core.Union{
Value: &core.Literal{
Value: &core.Literal_Scalar{
Scalar: &core.Scalar{
Value: &core.Scalar_Error{
Error: &core.Error{
Message: execErr.GetMessage(),
FailedNodeId: nodeID,
},
},
},
},
},
Type: &core.LiteralType{
Type: &core.LiteralType_Simple{
Simple: core.SimpleType_ERROR,
},
Structure: &core.TypeStructure{
Tag: "FlyteError",
},
},
},
}
literal.Value = &core.Literal_Scalar{
Scalar: &core.Scalar{
Value: errorUnion,
},
}
}
}
63 changes: 63 additions & 0 deletions flytepropeller/pkg/controller/nodes/resolve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,3 +467,66 @@ func TestResolve(t *testing.T) {
})

}

func TestResolveOnFailureNodeInput(t *testing.T) {
ctx := context.Background()
t.Run("ResolveErrorInputs", func(t *testing.T) {
noneLiteral, _ := coreutils.MakeLiteral(nil)
inputLiterals := make(map[string]*core.Literal, 1)
inputLiterals["err"] = &core.Literal{
Value: &core.Literal_Scalar{
Scalar: &core.Scalar{
Value: &core.Scalar_Union{
Union: &core.Union{
Value: noneLiteral,
Type: &core.LiteralType{
Type: &core.LiteralType_Simple{
Simple: core.SimpleType_NONE,
},
Structure: &core.TypeStructure{
Tag: "none",
},
},
},
},
},
},
}
inputLiteralMap := &core.LiteralMap{
Literals: inputLiterals,
}
nID := "fn"
execErr := &core.ExecutionError{
Message: "node failure",
}
expectedLiterals := make(map[string]*core.Literal, 1)
errorLiteral, err := coreutils.MakeLiteral(&core.Error{Message: execErr.GetMessage(), FailedNodeId: nID})
assert.NoError(t, err)
expectedLiterals["err"] = &core.Literal{
Value: &core.Literal_Scalar{
Scalar: &core.Scalar{
Value: &core.Scalar_Union{
Union: &core.Union{
Value: errorLiteral,
Type: &core.LiteralType{
Type: &core.LiteralType_Simple{
Simple: core.SimpleType_ERROR,
},
Structure: &core.TypeStructure{
Tag: "FlyteError",
},
},
},
},
},
},
}
expectedLiteralMap := &core.LiteralMap{
Literals: expectedLiterals,
}
// Execute resolve
ResolveOnFailureNodeInput(ctx, inputLiteralMap, nID, execErr)
flyteassert.EqualLiteralMap(t, expectedLiteralMap, inputLiteralMap)
})

}
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@
status := nCtx.NodeStatus()
subworkflowNodeLookup := executors.NewNodeLookup(subworkflow, status, subworkflow)
failureNodeStatus := status.GetNodeExecutionStatus(ctx, failureNode.GetID())
failureNodeLookup := executors.NewFailureNodeLookup(subworkflowNodeLookup, failureNode, failureNodeStatus)
failureNodeLookup := executors.NewFailureNodeLookup(subworkflowNodeLookup, failureNode, failureNodeStatus, originalError)

Check warning on line 172 in flytepropeller/pkg/controller/nodes/subworkflow/subworkflow.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/subworkflow/subworkflow.go#L172

Added line #L172 was not covered by tests

state, err := s.nodeExecutor.RecursiveNodeHandler(ctx, execContext, failureNodeLookup, failureNodeLookup, failureNode)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion flytepropeller/pkg/controller/workflow/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func (c *workflowExecutor) handleFailureNode(ctx context.Context, w *v1alpha1.Fl
execcontext := executors.NewExecutionContext(w, w, w, nil, executors.InitializeControlFlow())

failureNodeStatus := w.GetExecutionStatus().GetNodeExecutionStatus(ctx, failureNode.GetID())
failureNodeLookup := executors.NewFailureNodeLookup(w, failureNode, failureNodeStatus)
failureNodeLookup := executors.NewFailureNodeLookup(w, failureNode, failureNodeStatus, execErr)
state, handlerErr := c.nodeExecutor.RecursiveNodeHandler(ctx, execcontext, failureNodeLookup, failureNodeLookup, failureNode)
c.updateExecutionStats(ctx, execcontext)

Expand Down
39 changes: 39 additions & 0 deletions flytepropeller/pkg/utils/assert/literals.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,24 @@
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
)

func EqualLiteralType(t *testing.T, lt1 *core.LiteralType, lt2 *core.LiteralType) {
if !assert.Equal(t, lt1 == nil, lt2 == nil) {
assert.FailNow(t, "One of the values is nil")
}

Check warning on line 15 in flytepropeller/pkg/utils/assert/literals.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/utils/assert/literals.go#L14-L15

Added lines #L14 - L15 were not covered by tests
assert.Equal(t, reflect.TypeOf(lt1.GetType()), reflect.TypeOf(lt2.GetType()))
switch lt1.GetType().(type) {
case *core.LiteralType_Simple:
assert.Equal(t, lt1.GetType().(*core.LiteralType_Simple).Simple, lt2.GetType().(*core.LiteralType_Simple).Simple)
default:
assert.FailNow(t, "Not yet implemented for types %v", reflect.TypeOf(lt1.GetType()))

Check warning on line 21 in flytepropeller/pkg/utils/assert/literals.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/utils/assert/literals.go#L20-L21

Added lines #L20 - L21 were not covered by tests
}
structure1 := lt1.GetStructure()
structure2 := lt2.GetStructure()
Comment on lines +23 to +24
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If one of the structures is nil, we should also raise an error.

if structure1 != nil && structure2 != nil {
assert.Equal(t, structure1.GetTag(), structure2.GetTag())
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider additional nil check edge cases

Consider adding a test case to verify behavior when one structure is nil and the other is not. Currently, the code only checks when both are nil or both are non-nil.

Code suggestion
Check the AI-generated fix before applying
Suggested change
structure1 := lt1.GetStructure()
structure2 := lt2.GetStructure()
if structure1 != nil && structure2 != nil {
assert.Equal(t, structure1.GetTag(), structure2.GetTag())
}
structure1 := lt1.GetStructure()
structure2 := lt2.GetStructure()
assert.Equal(t, structure1 == nil, structure2 == nil, "Both structures should be either nil or non-nil")
if structure1 != nil && structure2 != nil {
assert.Equal(t, structure1.GetTag(), structure2.GetTag())
}

Code Review Run #f0fe6a


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

}

func EqualPrimitive(t *testing.T, p1 *core.Primitive, p2 *core.Primitive) {
if !assert.Equal(t, p1 == nil, p2 == nil) {
assert.FailNow(t, "One of the values is nil")
Expand All @@ -27,6 +45,23 @@
}
}

func EqualError(t *testing.T, e1 *core.Error, e2 *core.Error) {
if !assert.Equal(t, e1 == nil, e2 == nil) {
assert.FailNow(t, "One of the values is nil")
}

Check warning on line 51 in flytepropeller/pkg/utils/assert/literals.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/utils/assert/literals.go#L50-L51

Added lines #L50 - L51 were not covered by tests
assert.Equal(t, e1.GetMessage(), e2.GetMessage())
assert.Equal(t, e1.GetFailedNodeId(), e2.GetFailedNodeId())
}

func EqualUnion(t *testing.T, u1 *core.Union, u2 *core.Union) {
if !assert.Equal(t, u1 == nil, u2 == nil) {
assert.FailNow(t, "One of the values is nil")
}

Check warning on line 59 in flytepropeller/pkg/utils/assert/literals.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/utils/assert/literals.go#L58-L59

Added lines #L58 - L59 were not covered by tests
assert.Equal(t, reflect.TypeOf(u1.GetValue()), reflect.TypeOf(u2.GetValue()))
EqualLiterals(t, u1.GetValue(), u2.GetValue())
EqualLiteralType(t, u1.GetType(), u2.GetType())
}

func EqualScalar(t *testing.T, p1 *core.Scalar, p2 *core.Scalar) {
if !assert.Equal(t, p1 == nil, p2 == nil) {
assert.FailNow(t, "One of the values is nil")
Expand All @@ -38,6 +73,10 @@
switch p1.GetValue().(type) {
case *core.Scalar_Primitive:
EqualPrimitive(t, p1.GetPrimitive(), p2.GetPrimitive())
case *core.Scalar_Error:
EqualError(t, p1.GetError(), p2.GetError())
case *core.Scalar_Union:
EqualUnion(t, p1.GetUnion(), p2.GetUnion())
default:
assert.FailNow(t, "Not yet implemented for types %v", reflect.TypeOf(p1.GetValue()))
}
Expand Down
Loading