[Bug]: Triggers with session window not working #31658

Open
2 of 16 tasks
boolangery opened this issue Jun 20, 2024 · 1 comment

Comments

@boolangery

What happened?

Unable to trigger early firings with a session window.

Tried triggers:

// Attempt 1: two separate beam.Trigger options
windowedUpdates := beam.WindowInto(s,
	window.NewSessions(30*time.Second), propUpdates,
	beam.Trigger(trigger.Repeat(trigger.AfterCount(500))),
	beam.Trigger(trigger.Repeat(trigger.AfterProcessingTime().PlusDelay(60*time.Second))),
	beam.PanesDiscard(),
)

// Attempt 2: a single AfterEndOfWindow trigger with a processing-time early firing
windowedUpdates := beam.WindowInto(s,
	window.NewSessions(30*time.Second), propUpdates,
	beam.Trigger(trigger.AfterEndOfWindow().EarlyFiring(trigger.AfterProcessingTime().PlusDelay(60*time.Second))),
	beam.PanesDiscard(),
)

Context:

  • Streaming pipeline
  • PubSub input
  • Printing output after a groupBy
  • Sending one message every second

Result: The window extends indefinitely and no panes are fired.

Issue Priority

Priority: 3 (minor)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@lostluck
Contributor

lostluck commented Feb 3, 2025

The first one is incorrect, and should be a failure at construction time since two triggers are being specified.

I would expect the second one to work, though triggers with merging windows are pretty complicated. I agree that the behavior doesn't match my intuition, which is that the merge should take the earlier of the two "first element times" rather than the later of them.

That does appear to be what the implementation does, but it's possible that the timer being set isn't mapping back to the newly merged window (which doesn't seem right). Hard to say offhand unless we validate the equivalent behavior outside of Go. There could be some other issue.

Possibly related to #31153, but this feels like a runner side issue.
