Skip to content
This repository has been archived by the owner on Jun 26, 2023. It is now read-only.

Commit

Permalink
Detect closed channel
Browse files Browse the repository at this point in the history
  • Loading branch information
jbygdell committed Feb 9, 2023
1 parent a8409b2 commit 0f8cb6a
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 0 deletions.
24 changes: 24 additions & 0 deletions messenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,13 @@ func NewAMQPMessenger(c BrokerConfig, tlsConfig *tls.Config) (*AMQPMessenger, er

// SendMessage sends message to RabbitMQ if the upload is finished
func (m *AMQPMessenger) SendMessage(corrID string, body []byte) error {
if m.channel.IsClosed() {
log.Debugln("channel closed, reconnecting")
if err := m.createNewChannel(); err != nil {
return fmt.Errorf("failed to recreate channel: %v", err)
}
}

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
err := m.channel.PublishWithContext(
Expand Down Expand Up @@ -133,3 +140,20 @@ func buildMqURI(mqHost, mqPort, mqUser, mqPassword, mqVhost string, ssl bool) st

return brokerURI
}

func (m *AMQPMessenger) createNewChannel() error {
c, err := m.connection.Channel()
if err != nil {
return err
}
confirmsChan := make(chan amqp.Confirmation, 1)
if err := c.Confirm(false); err != nil {
close(confirmsChan)
log.Fatalf("Channel could not be put into confirm mode: %s\n", err)
}
log.Debugln("recconected to new channel")
m.channel = c
m.confirmsChan = c.NotifyPublish(confirmsChan)

return nil
}
34 changes: 34 additions & 0 deletions messenger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,37 @@ func TestSendMessage(t *testing.T) {
err = messenger.SendMessage(uuid.String(), jsonMessage)
assert.NoError(t, err)
}

func TestCreateNewChannel(t *testing.T) {
viper.Reset()
viper.Set("server.confFile", "dev_utils/config.yaml")

config, err := NewConfig()
assert.NotNil(t, config)
assert.NoError(t, err)
tlsConfig, err := TLSConfigBroker(config)
if err != nil {
t.Log(err)
t.Skip("skip test since certificates are not present")
}
assert.NotNil(t, tlsConfig)
assert.NoError(t, err)

messenger, err := NewAMQPMessenger(config.Broker, tlsConfig)
messenger.channel.Close()
assert.NoError(t, err)
event := Event{}
checksum := Checksum{}
event.Operation = "TestRecreateChannel"
event.Username = "Dummy"
checksum.Type = "md5"
checksum.Value = "123456789"
event.Checksum = []interface{}{checksum}

jsonMessage, err := json.Marshal(event)
assert.NoError(t, err)
uuid, _ := uuid.NewRandom()
t.Log("uuid: ", uuid)
err = messenger.SendMessage(uuid.String(), jsonMessage)
assert.NoError(t, err)
}

0 comments on commit 0f8cb6a

Please sign in to comment.