
Producer automatic topic creation is disabled when there are multiple topics #3058

Open
sonalys opened this issue Jan 7, 2025 · 6 comments

Comments


sonalys commented Jan 7, 2025

Description

When using the producer with allowAutoTopicCreation = true, the func tryRefreshMetadata in client.go behaves strangely:

allowAutoTopicCreation := client.conf.Metadata.AllowAutoTopicCreation
if len(topics) > 0 {
	DebugLogger.Printf("client/metadata fetching metadata for %v from broker %s\n", topics, broker.addr)
} else {
	allowAutoTopicCreation = false
	DebugLogger.Printf("client/metadata fetching metadata for all topics from broker %s\n", broker.addr)
}

req := NewMetadataRequest(client.conf.Version, topics)
req.AllowAutoTopicCreation = allowAutoTopicCreation

This behavior reproduces with both sync and async producers, so if you are asynchronously sending messages to different topics, they won't respect the allowAutoTopicCreation configuration.
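
A minimal sketch of the async setup I mean (broker address and topic names are placeholders, error handling trimmed):

cfg := sarama.NewConfig()
cfg.Metadata.AllowAutoTopicCreation = true // explicitly enabled

producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, cfg)
if err != nil {
	panic(err)
}

// Two messages for two different, not-yet-existing topics.
for _, topic := range []string{"topic-a", "topic-b"} {
	producer.Input() <- &sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.StringEncoder("hello"),
	}
}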

I don't know if this is a technical limitation or not, but this behavior is totally hidden from the developer.

Versions

github.com/IBM/sarama v1.44.0

dnwe (Collaborator) commented Jan 7, 2025

@sonalys I'm not sure I understand your issue; can you elaborate a little more? The producer (and consumer) will call RefreshMetadata with the specific topic(s) you're producing to or consuming from, going down the len(topics) > 0 path, and will allow automatic creation if conf.Metadata.AllowAutoTopicCreation is set. The allowAutoTopicCreation = false is simply for the "full metadata" path, when you send a MetadataRequest to Kafka and don't specify any topics.
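
In other words, the two paths look roughly like this from the client side (a sketch assuming an existing sarama.Client):

// Topic-specific refresh: takes the len(topics) > 0 path, so the request
// carries AllowAutoTopicCreation from the client config.
err := client.RefreshMetadata("my-topic")

// Full-metadata refresh: no topics given, so auto-creation is forced off
// for this one request regardless of the config.
err = client.RefreshMetadata()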

sonalys (Author) commented Jan 7, 2025

I'm sorry if I interpreted it wrong, but I looked through all the references to the allowAutoTopicCreation configuration, and this was the only path where it was used.

I don't know your codebase at all, so I presumed this is the place where topics get created automatically.

The behavior I described remains the same: when using async producers, or a sync producer with multiple topics, the topics are not automatically created.

dnwe (Collaborator) commented Jan 7, 2025

I'm not sure what you mean by "multiple topics" when it comes to the producer, as you specify the topic in each individual message you want to send and they're batched up appropriately.

For example, if I do something simple like this with the sync producer and an empty Kafka cluster with auto.create.topics.enable=true in the Kafka server.properties, then the topics foobar-1, foobar-2 and foobar-3 are successfully created as the producer attempts to send messages:

topic := "foobar"
for i := 1; i < 4; i++ {
	producer.Input() <- &sarama.ProducerMessage{
		Topic:     topic + "-" + strconv.Itoa(i),
		Value:     sarama.StringEncoder(time.Now().String()),
		Headers:   []sarama.RecordHeader{},
		Timestamp: time.Now(),
	}
	message := <-producer.Successes()
	log.Printf("Message produced: value = %s, timestamp = %v, topic = %s-%d",
		message.Value, message.Timestamp, message.Topic, message.Partition)
}

Consumers do have the concept of being given a list of topics to subscribe to, but I see the same behaviour there. If I bring up an empty Kafka cluster and point a consumer group at it with three topics, I see all three being autocreated successfully:

topic := "barfoo"
group.Consume(ctx, []string{topic + "-1", topic + "-2", topic + "-3"}, &consumer)
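
For reference, &consumer above is just a sarama.ConsumerGroupHandler; a minimal illustrative implementation (not the exact one used here) looks like:

type minimalHandler struct{}

func (minimalHandler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (minimalHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

func (minimalHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	// Drain messages from this claim and mark them consumed.
	for msg := range claim.Messages() {
		log.Printf("consumed topic=%s partition=%d offset=%d", msg.Topic, msg.Partition, msg.Offset)
		sess.MarkMessage(msg, "")
	}
	return nil
}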

sonalys (Author) commented Jan 7, 2025

Yes, that's what I meant, sorry for my English: by "multiple topics" I mean a batch of messages with different topics.

I'm not experiencing the same behavior as you are. In my case I'm getting the ErrUnknownTopicOrPartition error.
When messages are produced individually, everything works as intended.

I'm running it with the confluentinc/confluent-local:7.5.0 image and default configuration.

dnwe (Collaborator) commented Jan 7, 2025

Seems to work fine for me.

Started up "confluent-local" exposing port 9092:

docker run --rm -it -p 9092:9092 confluentinc/confluent-local:7.5.0

Ran my code sample from above using cfg.Version = sarama.V3_5_0_0 (as per Confluent's version table), and I saw my topics auto-created as expected and could round-trip messages fine.
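
For reference, the relevant config lines for that run would look like this (only the Version line is stated above; the other two are assumptions based on the code sample):

cfg := sarama.NewConfig()
cfg.Version = sarama.V3_5_0_0              // stated above, per Confluent's version table
cfg.Metadata.AllowAutoTopicCreation = true // assumption: the setting under discussion, left enabled
cfg.Producer.Return.Successes = true       // assumption: required to read from producer.Successes()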

From the confluent-local logs we can see it forwarding the auto-create request to the active controller:

[2025-01-07 15:10:30,565] INFO Sent auto-creation request for Set(foobar-1, foobar-2, foobar-3) to the active controller. (kafka.server.DefaultAutoTopicCreationManager)
[2025-01-07 15:10:30,570] INFO [QuorumController id=1] CreateTopics result(s): CreatableTopic(name='foobar-1', numPartitions=1, replicationFactor=1, assignments=[], configs=[]): SUCCESS, CreatableTopic(name='foobar-2', numPartitions=1, replicationFactor=1, assignments=[], configs=[]): SUCCESS, CreatableTopic(name='foobar-3', numPartitions=1, replicationFactor=1, assignments=[], configs=[]): SUCCESS (org.apache.kafka.controller.ReplicationControlManager)

sonalys (Author) commented Jan 8, 2025

# Producer Code

producer, err := sarama.NewAsyncProducer(config.Config.BrokersAddr, saramaConfig)
if err != nil {
  return nil, err
}

go func() {
  for err := range producer.Errors() {
    log.Error(context.Background(), "Failed to produce message", zap.Error(err))
  }
}()

msgs := make([]*sarama.ProducerMessage, 0, len(evts))
input := p.producer.Input()

for _, evt := range evts {
  msg := evt.toMessage()
  otel.GetTextMapPropagator().Inject(ctx, otelsarama.NewProducerMessageCarrier(msg))
  msgs = append(msgs, msg)
  input <- msg
}

# Consumer Code

consumer, err := sarama.NewConsumerGroup(config.Config.BrokersAddr, config.Group, saramaConfig)
if err != nil {
  return nil, fmt.Errorf("failed to create consumer group: %w", err)
}

done := make(chan struct{})
go consume(ctx, consumer, producer, config, done)

for {
  if err := consumer.Consume(ctx, topics, handler); err != nil {
    if !errors.Is(err, sarama.ErrClosedConsumerGroup) {
      log.Error(ctx, "error from consumer group", zap.Error(err))
    }
    return
  }

  if ctx.Err() != nil {
    return
  }
}

# Test Code

t.Run("should produce and consume multiple topics", func(t *testing.T) {
  evt1 := newTestEvent()
  evt2 := newTestEvent()
  
  group := uuid.NewString()
  
  wg := sync.WaitGroup{}
  wg.Add(2)
  
  handlers := map[string]EventHandler{
	  evt1.GetTopic(): func(ctx context.Context, gotEvt EventReader) ([]Event, error) {
		  defer wg.Done()
		  assert.Equal(t, evt1, gotEvt)
		  return nil, nil
	  },
	  evt2.GetTopic(): func(ctx context.Context, gotEvt EventReader) ([]Event, error) {
		  defer wg.Done()
		  assert.Equal(t, evt2, gotEvt)
		  return nil, nil
	  },
  }
  
  cfg := ConsumerConfig{
	  Group: group,
	  Config: Config{
		  BrokersAddr: brokers,
	  },
	  Handlers: handlers,
  }
  
  _, err := StartNewConsumerGroup(ctx, cfg)
  require.NoError(t, err)
  
  producer, err := NewProducer(ProducerConfig{
	  Config: Config{
		  BrokersAddr: brokers,
	  },
  })
  require.NoError(t, err)
  defer producer.Shutdown()
  
  producer.Produce(ctx, evt1, evt2)
  wg.Wait()
})

I get the following from the test:

7:11AM	ERROR	Failed to produce message	{"error": "kafka: Failed to produce message to topic ndjdv: kafka server: Request was for a topic or partition that does not exist on this broker",

If I change the producer code to be synchronous:

producer, err := sarama.NewSyncProducer(config.Config.BrokersAddr, saramaConfig)
if err != nil {
	return nil, err
}

func (p *Producer) Produce(ctx context.Context, evts ...Event) error {
	var errs []error

	for _, evt := range evts {
		msg := evt.toMessage()
		otel.GetTextMapPropagator().Inject(ctx, otelsarama.NewProducerMessageCarrier(msg))

		if _, _, err := p.producer.SendMessage(msg); err != nil {
			errs = append(errs, err)
		}
	}

	if len(errs) > 0 {
		return errors.Join(errs...)
	}

	return nil
}

It works normally.

I'm using this configuration:

config := sarama.NewConfig()

if fromConfig.DialTimeout > 0 {
	config.Net.DialTimeout = fromConfig.DialTimeout
}

if fromConfig.SASL != nil {
	fromConfig.SASL.ApplySASLSarama(config)
}

if fromConfig.TLS != nil {
	fromConfig.TLS.ApplyTLSSarama(config)
}

config.Consumer.Offsets.Initial = sarama.OffsetOldest
