From 7b6882aff69d70eefda394acd56b329c50b85334 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Mon, 6 Jan 2025 15:04:07 -0800 Subject: [PATCH 1/7] Support RequiresTimeSorted annotation. --- runners/prism/java/build.gradle | 8 +++----- .../runners/prism/internal/engine/elementmanager.go | 12 ++++++++++++ .../pkg/beam/runners/prism/internal/handlepardo.go | 7 +++++-- .../beam/runners/prism/internal/jobservices/job.go | 1 + 4 files changed, 21 insertions(+), 7 deletions(-) diff --git a/runners/prism/java/build.gradle b/runners/prism/java/build.gradle index a48973f65674..6f927f81aaea 100644 --- a/runners/prism/java/build.gradle +++ b/runners/prism/java/build.gradle @@ -98,6 +98,7 @@ def sickbayTests = [ 'org.apache.beam.sdk.transforms.ViewTest.testTriggeredLatestSingleton', // Requires Allowed Lateness, among others. 'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testEventTimeTimerSetWithinAllowedLateness', + 'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInputWithLateDataAndAllowedLateness', 'org.apache.beam.sdk.testing.TestStreamTest.testFirstElementLate', 'org.apache.beam.sdk.testing.TestStreamTest.testDiscardingMode', 'org.apache.beam.sdk.testing.TestStreamTest.testEarlyPanesOfWindow', @@ -178,11 +179,8 @@ def sickbayTests = [ 'org.apache.beam.sdk.transforms.ViewTest.testSideInputWithNestedIterables', // Requires Time Sorted Input - 'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInput', - 'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInputWithTestStream', - 'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInputWithLateDataAndAllowedLateness', - 'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testTwoRequiresTimeSortedInputWithLateData', - 'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInputWithLateData', + // 
'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInput', + // 'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInputWithTestStream', // Missing output due to processing time timer skew. 'org.apache.beam.sdk.transforms.ParDoTest$TimestampTests.testProcessElementSkew', diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index 3cfcf9ef8c0e..4a434847f997 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -1179,6 +1179,18 @@ func makeStageState(ID string, inputIDs, outputIDs []string, sides []LinkID) *st func (ss *stageState) AddPending(newPending []element) int { ss.mu.Lock() defer ss.mu.Unlock() + // TODO(https://github.com/apache/beam/issues/31438): + // Adjust with AllowedLateness + // Data that arrives after the *output* watermark is late. + threshold := ss.output + origPending := make([]element, 0, ss.pending.Len()) + for _, e := range newPending { + if e.timestamp < threshold { + continue + } + origPending = append(origPending, e) + } + newPending = origPending if ss.stateful { if ss.pendingByKeys == nil { ss.pendingByKeys = map[string]*dataAndTimers{} diff --git a/sdks/go/pkg/beam/runners/prism/internal/handlepardo.go b/sdks/go/pkg/beam/runners/prism/internal/handlepardo.go index 13e9b6f1b79d..7ac472251f61 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/handlepardo.go +++ b/sdks/go/pkg/beam/runners/prism/internal/handlepardo.go @@ -84,9 +84,12 @@ func (h *pardo) PrepareTransform(tid string, t *pipepb.PTransform, comps *pipepb // At their simplest, we don't need to do anything special at pre-processing time, and simply pass through as normal. - // StatefulDoFns need to be marked as being roots. + // ForceRoots cause fusion breaks in the optimized graph. 
+ // StatefulDoFns need to be marked as being roots, for correct per-key state handling. + // Prism already sorts input elements for a stage by EventTime, so a fusion break enables the sorted behavior. var forcedRoots []string - if len(pdo.StateSpecs)+len(pdo.TimerFamilySpecs) > 0 { + if len(pdo.GetStateSpecs())+len(pdo.GetTimerFamilySpecs()) > 0 || + pdo.GetRequiresTimeSortedInput() { forcedRoots = append(forcedRoots, tid) } diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go index 4be64e5a9c80..f186b11fd1d8 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go @@ -47,6 +47,7 @@ var supportedRequirements = map[string]struct{}{ urns.RequirementStatefulProcessing: {}, urns.RequirementBundleFinalization: {}, urns.RequirementOnWindowExpiration: {}, + urns.RequirementTimeSortedInput: {}, } // TODO, move back to main package, and key off of executor handlers? From 2e8d6fe4a0034b05b8bd55eacaa68f361b840b8b Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Mon, 6 Jan 2025 16:49:36 -0800 Subject: [PATCH 2/7] Drop data based on element's window vs receiving stage's output watermark. 
--- .../go/pkg/beam/runners/prism/internal/engine/elementmanager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index 4a434847f997..bb3c8ceceeb8 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -1185,7 +1185,7 @@ func (ss *stageState) AddPending(newPending []element) int { threshold := ss.output origPending := make([]element, 0, ss.pending.Len()) for _, e := range newPending { - if e.timestamp < threshold { + if e.window.MaxTimestamp() < threshold { continue } origPending = append(origPending, e) From 51b05e741b93790b52c8602509db860462abab21 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Mon, 6 Jan 2025 16:49:49 -0800 Subject: [PATCH 3/7] Fix late data tests to actually have late data. --- .../org/apache/beam/sdk/transforms/ParDoTest.java | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index 742547b9b6c3..2bdb0494959c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -3731,7 +3731,9 @@ public void testRequiresTimeSortedInputWithLateDataAndAllowedLateness() { if (stamp == 100) { // advance watermark when we have 100 remaining elements // all the rest are going to be late elements - input = input.advanceWatermarkTo(Instant.ofEpochMilli(stamp)); + input = + input.advanceWatermarkTo( + GlobalWindow.INSTANCE.maxTimestamp().plus(Duration.standardSeconds(1))); } } testTimeSortedInput( @@ -3764,7 +3766,9 @@ public void testRequiresTimeSortedInputWithLateData() { if (stamp == 
100) { // advance watermark when we have 100 remaining elements // all the rest are going to be late elements - input = input.advanceWatermarkTo(Instant.ofEpochMilli(stamp)); + input = + input.advanceWatermarkTo( + GlobalWindow.INSTANCE.maxTimestamp().plus(Duration.standardSeconds(1))); } } testTimeSortedInput( @@ -3796,7 +3800,9 @@ public void testTwoRequiresTimeSortedInputWithLateData() { if (stamp == 100) { // advance watermark when we have 100 remaining elements // all the rest are going to be late elements - input = input.advanceWatermarkTo(Instant.ofEpochMilli(stamp)); + input = + input.advanceWatermarkTo( + GlobalWindow.INSTANCE.maxTimestamp().plus(Duration.standardSeconds(1))); } } // apply the sorted function for the first time From 1f5fc71d9bc55e90cd2b4cc1eb9059c64df74c94 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Mon, 6 Jan 2025 16:50:26 -0800 Subject: [PATCH 4/7] Disable reshuffle tests that fail due to precise late data. --- runners/prism/java/build.gradle | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/runners/prism/java/build.gradle b/runners/prism/java/build.gradle index 6f927f81aaea..a322830709ad 100644 --- a/runners/prism/java/build.gradle +++ b/runners/prism/java/build.gradle @@ -161,6 +161,14 @@ def sickbayTests = [ // TODO(https://github.com/apache/beam/issues/31231) 'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributePreservesMetadata', + + // These tests fail now that late data is precisely dropped. + // They set a single element to be late data, and expect it (correctly) to be preserved. + // Since these transforms are presently treated as no-ops, the fix is to disable the + // dropping behavior when a stage's input is a Reshuffle/Redistribute transform. 
+ 'org.apache.beam.sdk.transforms.ReshuffleTest.testReshuffleWithTimestampsStreaming', + 'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributeWithTimestampsStreaming', + // Prism isn't handling Java's side input views properly. // https://github.com/apache/beam/issues/32932 // java.lang.IllegalArgumentException: PCollection with more than one element accessed as a singleton view. From aab4748e8d7d90ded345835834e313f4fada289f Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Mon, 6 Jan 2025 17:09:42 -0800 Subject: [PATCH 5/7] Remove commented out tests. --- runners/prism/java/build.gradle | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/runners/prism/java/build.gradle b/runners/prism/java/build.gradle index a322830709ad..c4a69fef6507 100644 --- a/runners/prism/java/build.gradle +++ b/runners/prism/java/build.gradle @@ -184,11 +184,7 @@ def sickbayTests = [ 'org.apache.beam.sdk.transforms.ViewTest.testEmptySingletonSideInput', // Prism side encoding error. // java.lang.IllegalStateException: java.io.EOFException - 'org.apache.beam.sdk.transforms.ViewTest.testSideInputWithNestedIterables', - - // Requires Time Sorted Input - // 'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInput', - // 'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInputWithTestStream', + 'org.apache.beam.sdk.transforms.ViewTest.testSideInputWithNestedIterables', // Missing output due to processing time timer skew. 
'org.apache.beam.sdk.transforms.ParDoTest$TimestampTests.testProcessElementSkew', From 92f888b1b9b1e5c11cc2792e70ece622222e2f65 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Tue, 7 Jan 2025 09:21:30 -0800 Subject: [PATCH 6/7] ws lint --- runners/prism/java/build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runners/prism/java/build.gradle b/runners/prism/java/build.gradle index c4a69fef6507..791952c625f4 100644 --- a/runners/prism/java/build.gradle +++ b/runners/prism/java/build.gradle @@ -184,7 +184,7 @@ def sickbayTests = [ 'org.apache.beam.sdk.transforms.ViewTest.testEmptySingletonSideInput', // Prism side encoding error. // java.lang.IllegalStateException: java.io.EOFException - 'org.apache.beam.sdk.transforms.ViewTest.testSideInputWithNestedIterables', + 'org.apache.beam.sdk.transforms.ViewTest.testSideInputWithNestedIterables', // Missing output due to processing time timer skew. 'org.apache.beam.sdk.transforms.ParDoTest$TimestampTests.testProcessElementSkew', From 1c2d033b14b8d4322d5763a7663cffc33c3f44f0 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Tue, 7 Jan 2025 09:22:25 -0800 Subject: [PATCH 7/7] Revert change to allowed lateness test, which seems to break for the direct runner. Prism can't run it yet anyway. 
--- .../test/java/org/apache/beam/sdk/transforms/ParDoTest.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index 2bdb0494959c..8409133772eb 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -3731,9 +3731,7 @@ public void testRequiresTimeSortedInputWithLateDataAndAllowedLateness() { if (stamp == 100) { // advance watermark when we have 100 remaining elements // all the rest are going to be late elements - input = - input.advanceWatermarkTo( - GlobalWindow.INSTANCE.maxTimestamp().plus(Duration.standardSeconds(1))); + input = input.advanceWatermarkTo(Instant.ofEpochMilli(stamp)); } } testTimeSortedInput(