Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ability to cancel jetstream.MessagesContext with context.Context #1772

Open
cedws opened this issue Jan 13, 2025 · 3 comments
Open

Ability to cancel jetstream.MessagesContext with context.Context #1772

cedws opened this issue Jan 13, 2025 · 3 comments
Labels
proposal Enhancement idea or proposal

Comments

@cedws
Copy link

cedws commented Jan 13, 2025

Proposed change

MessagesContext provides an interface to retrieve the next message in a queue. This can be stopped/cancelled by calling Stop() on it.

The Next() func on the MesssagesContext blocks indefinitely. This can be a problem if you're reading messages in a loop and need to be able to cancel this work with a context.Context. I can't see any way to pass a context.Context to Next() nor when instantiating the MessagesContext.

I see that MessagesContext can be instantiated with the PullExpiry opt, but I don't think this actually solves my problem.

I'm using this over Fetch because of what it says in the docs, though it's worth noting the Fetch() API does not support context.Context either:

Warning: Both Fetch() and FetchNoWait() have worse performance when used to continuously retrieve messages in comparison to Messages() or Consume() methods, as they do not perform any optimizations (pre-buffering) and new subscription is created for each execution.

Use case

As above.

Contribution

No response

@cedws cedws added the proposal Enhancement idea or proposal label Jan 13, 2025
@cedws
Copy link
Author

cedws commented Jan 13, 2025

This is the workaround I'm using, but it's not particularly nice:

var (
	msgCh    = make(chan jetstream.Msg, 1)
	msgErrCh = make(chan error)
)

go func() {
	for {
		msg, err := msgCtx.Next()
		if err != nil {
			msgErrCh <- err
			return
		}

		msgCh <- msg
	}
}()

for {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case msg := <-msgCh:
	        // ...
	case err := <-msgErrCh:
	        return err
	}
}

@zhulik
Copy link

zhulik commented Jan 26, 2025

This is a common problem as far as I can see, many methods that should be cancellable do not accept a context, jetstream.Consumer.Next() for instance.

It should be possible to implement it without breaking the public apis, something like

ctx, cancel := context.WithCancel(context.Background())
go msgCtx.Next(jetstream.PullContext(ctx))
cancel()

would just work.

@piotrpio
Copy link
Collaborator

Hello @cedws, @zhulik. Thanks for creating the issue - we're working on enabling passing context to those methods, I'll update you in this issue when we have a PR.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
proposal Enhancement idea or proposal
Projects
None yet
Development

No branches or pull requests

3 participants