[Bug][Go SDK]: GrowableTracker TrySplit() with fraction math.NaN() #29210

Closed
1 of 16 tasks
johannaojeling opened this issue Oct 31, 2023 · 3 comments · Fixed by #29307

johannaojeling commented Oct 31, 2023

What happened?

Problem

In SDFs, ProcessSizedElementsAndRestrictions.GetProgress() can return math.NaN() if done and remaining are both 0:

	d, r := n.rt.GetProgress() // d, r := 0, 0
	frac := d / (d + r) // frac := 0 / (0 + 0)

This causes downstream problems when an offsetrange.GrowableTracker is used with an unbounded offsetrange.Restriction, because math.NaN() is then passed as the fraction to TrySplit(). The function returns a primary restriction whose End is smaller than its Start. This in turn makes Size() return a negative value, which causes the pipeline to fail.

Example use case

Reading messages from a stream fails at checkpointing if no new messages have been published and read since the last checkpoint.

Given

  • 5 messages have been published on the stream
  • no new messages have been published since the pipeline started executing
  • the restriction is based on the sequence number of a message
  • the read is configured to start from seq no 1
  • the estimated end (exclusive) is the seq no of the last published message + 1
  • the SDF initiates a self-checkpoint with process continuation resume when there are no new messages to process

Initial state

Rest: {Start: 1, End: math.MaxInt64}
Tracker: {attempted: -1, claimed: 0}
Estimated end: 6

At checkpoint 1

Rest: {Start: 1, End: math.MaxInt64}
Tracker: {attempted: 5, claimed: 5}
Estimated end: 6

→ fraction: 0.5
→ splitPt: 6
→ primary: {Start: 1, End: 6}
→ residual: {Start: 6, End: math.MaxInt64}

At checkpoint 2

Rest: {Start: 6, End: math.MaxInt64}
Tracker: {attempted: -1, claimed: 5}
Estimated end: 6

→ fraction: math.NaN()
→ splitPt: 5
→ primary: {Start: 6, End: 5}
→ residual: {Start: 5, End: math.MaxInt64}

→ primary.Size() < 0 → error
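
The last step can be illustrated with a small stand-in type. This is a sketch only: `restriction`, `size`, and `validate` are hypothetical local names, and the size is assumed to be End minus Start, which is consistent with the -1 reported in the logs for the primary {Start: 6, End: 5}.

```go
package main

import "fmt"

// restriction is a local stand-in for an offset range restriction.
// It is not the Beam offsetrange.Restriction type.
type restriction struct {
	Start, End int64
}

// size assumes the size of a range is End - Start.
func (r restriction) size() float64 {
	return float64(r.End - r.Start)
}

// validate rejects inverted ranges, which is the shape of the
// primary restriction produced at checkpoint 2.
func (r restriction) validate() error {
	if r.End < r.Start {
		return fmt.Errorf("invalid restriction: end %d is before start %d", r.End, r.Start)
	}
	return nil
}

func main() {
	primary := restriction{Start: 6, End: 5}

	fmt.Println(primary.size())     // -1
	fmt.Println(primary.validate()) // invalid restriction: end 5 is before start 6
}
```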

Logs
2023/10/31 07:48:33 WARN SDK Error from split, aborting splits bundle.ID=inst004 bundle.stage=stage-003 bundle.watermark=1969-12-31T23:59:59.999Z error="split[inst004] error from SDK: unable to split inst004: \tAttempting split in ProcessSizedElementsAndRestrictions\n\tSDF.ProcessSizedElementsAndRestrictions[natsio.readFn] UID:4 Out:[2]\nsize returned expected to be non-negative but received -1."
2023/10/31 07:48:33 ERROR unable to split inst004:      Attempting split in ProcessSizedElementsAndRestrictions
        SDF.ProcessSizedElementsAndRestrictions[natsio.readFn] UID:4 Out:[2]

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@johannaojeling

@lostluck I'm happy to fix this but want to align with you on the implementation:

@johannaojeling

.take-issue

lostluck commented Oct 31, 2023

OK, I think the best answer for now is "if 0 work has been done, fraction is 0". This avoids the NaN condition. With a growable tracker, there's the potential for more work to appear later, so this avoids issues with the fraction of progress going backwards as work appears (e.g. 1.0 back to 0). (This can still happen if the amount of work goes up faster than we're processing it, but that's a different issue for runners to deal with.)

I think it's a safe enough behavior to add to
ProcessSizedElementsAndRestrictions.GetProgress

@johannaojeling johannaojeling changed the title [Bug][Go SDK]: GrowableTracker TrySplit() with fraction math.NA() [Bug][Go SDK]: GrowableTracker TrySplit() with fraction math.NaN() Nov 5, 2023
@github-actions github-actions bot added this to the 2.53.0 Release milestone Nov 6, 2023