From bd46770de2c6176d16411f36212a73f347f89510 Mon Sep 17 00:00:00 2001 From: Adam Curtis Date: Tue, 27 Nov 2018 11:20:25 +0000 Subject: [PATCH] Allow multiple routing keys for sources. (#25) * Allow multiple routing keys for sources. Existing routingKey config item gets merged into routingKeys * Fixed panic with consumer and forwarder mapping * Rename some variables and structures to make their purpose clearer. --- README.md | 2 +- config/config.go | 13 +++++++------ examples/rabbit_to_lambda.json | 2 +- examples/rabbit_to_sns.json | 2 +- examples/rabbit_to_sqs.json | 2 +- mapping/mapping.go | 21 ++++++++++++++------- mapping/mapping_test.go | 8 ++++---- rabbitmq/consumer.go | 19 ++++++++++++++----- server.go | 4 ++-- supervisor/supervisor.go | 24 ++++++++++++------------ supervisor/supervisor_test.go | 12 ++++++------ 11 files changed, 63 insertions(+), 46 deletions(-) diff --git a/README.md b/README.md index c2af9a5..ca8ff14 100644 --- a/README.md +++ b/README.md @@ -34,7 +34,7 @@ Sample of RabbitMQ -> SNS mapping file. All fields are required. Samples are loc "connection" : "amqp://guest:guest@localhost:5672/", "topic" : "amq.topic", "queue" : "test-queue", - "routing" : "#" + "routingKeys" : ["#"] }, "destination" : { "type" : "SNS", diff --git a/config/config.go b/config/config.go index 18cbb82..73d2db9 100644 --- a/config/config.go +++ b/config/config.go @@ -7,12 +7,13 @@ const ( // RabbitEntry RabbitMQ mapping entry type RabbitEntry struct { - Type string `json:"type"` - Name string `json:"name"` - ConnectionURL string `json:"connection"` - ExchangeName string `json:"topic"` - QueueName string `json:"queue"` - RoutingKey string `json:"routing"` + Type string `json:"type"` + Name string `json:"name"` + ConnectionURL string `json:"connection"` + ExchangeName string `json:"topic"` + QueueName string `json:"queue"` + RoutingKey string `json:"routing"` + RoutingKeys []string `json:"routingKeys"` } // AmazonEntry SQS/SNS mapping entry diff --git a/examples/rabbit_to_lambda.json b/examples/rabbit_to_lambda.json index 0fcd325..6d83b6d 100644 --- a/examples/rabbit_to_lambda.json +++ b/examples/rabbit_to_lambda.json @@ -6,7 +6,7 @@ "connection" : "amqp://guest:guest@localhost:5672/", "topic" : "amq.topic", "queue" : "test-queue", - "routing" : "#" + "routingKeys" : ["#"] }, "destination" : { "type" : "Lambda", diff --git a/examples/rabbit_to_sns.json b/examples/rabbit_to_sns.json index dd3fb12..ae538d1 100644 --- a/examples/rabbit_to_sns.json +++ b/examples/rabbit_to_sns.json @@ -6,7 +6,7 @@ "connection" : "amqp://guest:guest@localhost:5672/", "topic" : "amq.topic", "queue" : "test-queue", - "routing" : "#" + "routingKeys" : ["#"] }, "destination" : { "type" : "SNS", diff --git a/examples/rabbit_to_sqs.json b/examples/rabbit_to_sqs.json index 8021402..41db54b 100644 --- a/examples/rabbit_to_sqs.json +++ b/examples/rabbit_to_sqs.json @@ -6,7 +6,7 @@ "connection" : "amqp://guest:guest@localhost:5672/", "topic" : "amq.topic", "queue" : "test-queue", - "routing" : "#" + "routingKeys" : ["#"] }, "destination" : { "type" : "SQS", diff --git a/mapping/mapping.go b/mapping/mapping.go index 307c554..2ce7e7d 100644 --- a/mapping/mapping.go +++ b/mapping/mapping.go @@ -2,10 +2,11 @@ package mapping import ( "encoding/json" - log "github.com/sirupsen/logrus" "io/ioutil" "os" + log "github.com/sirupsen/logrus" + "github.com/AirHelp/rabbit-amazon-forwarder/config" "github.com/AirHelp/rabbit-amazon-forwarder/consumer" "github.com/AirHelp/rabbit-amazon-forwarder/forwarder" @@ -33,6 +34,12 @@ type Helper interface { createForwarder(entry config.AmazonEntry) forwarder.Client } +// ConsumerForwarderMapping mapping for consumers and forwarders +type ConsumerForwarderMapping struct { + Consumer consumer.Client + Forwarder forwarder.Client +} + type helperImpl struct{} // New creates new mapping client @@ -46,23 +53,23 @@ func New(helpers ...Helper) Client { } // Load loads mappings -func (c Client) Load() (map[consumer.Client]forwarder.Client, error) { - consumerForwarderMap := make(map[consumer.Client]forwarder.Client) +func (c Client) Load() ([]ConsumerForwarderMapping, error) { + var consumerForwarderMapping []ConsumerForwarderMapping data, err := c.loadFile() if err != nil { - return consumerForwarderMap, err + return consumerForwarderMapping, err } var pairsList pairs if err = json.Unmarshal(data, &pairsList); err != nil { - return consumerForwarderMap, err + return consumerForwarderMapping, err } log.Info("Loading consumer - forwarder pairs") for _, pair := range pairsList { consumer := c.helper.createConsumer(pair.Source) forwarder := c.helper.createForwarder(pair.Destination) - consumerForwarderMap[consumer] = forwarder + consumerForwarderMapping = append(consumerForwarderMapping, ConsumerForwarderMapping{consumer, forwarder}) } - return consumerForwarderMap, nil + return consumerForwarderMapping, nil } func (c Client) loadFile() ([]byte, error) { diff --git a/mapping/mapping_test.go b/mapping/mapping_test.go index 9e75c1d..dc36b92 100644 --- a/mapping/mapping_test.go +++ b/mapping/mapping_test.go @@ -22,13 +22,13 @@ const ( func TestLoad(t *testing.T) { os.Setenv(config.MappingFile, "../tests/rabbit_to_sns.json") client := New(MockMappingHelper{}) - var consumerForwarderMap map[consumer.Client]forwarder.Client + var consumerForwarderMapping []ConsumerForwarderMapping var err error - if consumerForwarderMap, err = client.Load(); err != nil { + if consumerForwarderMapping, err = client.Load(); err != nil { t.Errorf("could not load mapping and start mocked rabbit->sns pair: %s", err.Error()) } - if len(consumerForwarderMap) != 1 { - t.Errorf("wrong consumerForwarderMap size, expected 1, got %d", len(consumerForwarderMap)) + if len(consumerForwarderMapping) != 1 { + t.Errorf("wrong consumerForwarderMapping size, expected 1, got %d", len(consumerForwarderMapping)) } } diff --git a/rabbitmq/consumer.go b/rabbitmq/consumer.go index a54070c..2befba7 100644 --- a/rabbitmq/consumer.go +++ b/rabbitmq/consumer.go @@ -3,9 +3,10 @@ package rabbitmq import ( "errors" "fmt" - log "github.com/sirupsen/logrus" "time" + log "github.com/sirupsen/logrus" + "github.com/AirHelp/rabbit-amazon-forwarder/config" "github.com/AirHelp/rabbit-amazon-forwarder/consumer" "github.com/AirHelp/rabbit-amazon-forwarder/forwarder" @@ -27,7 +28,7 @@ type Consumer struct { ConnectionURL string ExchangeName string QueueName string - RoutingKey string + RoutingKeys []string } // parameters for starting consumer @@ -42,7 +43,12 @@ type workerParams struct { // CreateConsumer creates consumer from string map func CreateConsumer(entry config.RabbitEntry) consumer.Client { - return Consumer{entry.Name, entry.ConnectionURL, entry.ExchangeName, entry.QueueName, entry.RoutingKey} + // merge RoutingKey with RoutingKeys + if entry.RoutingKey != "" { + entry.RoutingKeys = append(entry.RoutingKeys, entry.RoutingKey) + } + + return Consumer{entry.Name, entry.ConnectionURL, entry.ExchangeName, entry.QueueName, entry.RoutingKeys} } // Name consumer name @@ -132,8 +138,11 @@ func (c Consumer) setupExchangesAndQueues(conn *amqp.Connection, ch *amqp.Channe }); err != nil { return failOnError(err, "Failed to declare a queue:"+c.QueueName) } - if err = ch.QueueBind(c.QueueName, c.RoutingKey, c.ExchangeName, false, nil); err != nil { - return failOnError(err, "Failed to bind a queue:"+c.QueueName) + // bind all of the routing keys + for _, routingKey := range c.RoutingKeys { + if err = ch.QueueBind(c.QueueName, routingKey, c.ExchangeName, false, nil); err != nil { + return failOnError(err, "Failed to bind a queue:"+c.QueueName) + } } msgs, err := ch.Consume(c.QueueName, c.Name(), false, false, false, false, nil) diff --git a/server.go b/server.go index 0cb3863..92dc55a 100644 --- a/server.go +++ b/server.go @@ -15,11 +15,11 @@ const ( func main() { createLogger() - consumerForwarderMap, err := mapping.New().Load() + consumerForwarderMapping, err := mapping.New().Load() if err != nil { log.WithField("error", err.Error()).Fatalf("Could not load consumer - forwarder pairs") } - supervisor := supervisor.New(consumerForwarderMap) + supervisor := supervisor.New(consumerForwarderMapping) if err := supervisor.Start(); err != nil { log.WithField("error", err.Error()).Fatal("Could not start supervisor") } diff --git a/supervisor/supervisor.go b/supervisor/supervisor.go index e8b270b..cd5540c 100644 --- a/supervisor/supervisor.go +++ b/supervisor/supervisor.go @@ -3,13 +3,13 @@ package supervisor import ( "encoding/json" "fmt" - log "github.com/sirupsen/logrus" "net/http" "strings" "time" - "github.com/AirHelp/rabbit-amazon-forwarder/consumer" - "github.com/AirHelp/rabbit-amazon-forwarder/forwarder" + log "github.com/sirupsen/logrus" + + "github.com/AirHelp/rabbit-amazon-forwarder/mapping" ) const ( @@ -34,25 +34,25 @@ type consumerChannel struct { // Client supervisor client type Client struct { - mappings map[consumer.Client]forwarder.Client + mappings []mapping.ConsumerForwarderMapping consumers map[string]*consumerChannel } // New client for supervisor -func New(consumerForwarderMap map[consumer.Client]forwarder.Client) Client { - return Client{mappings: consumerForwarderMap} +func New(consumerForwarderMapping []mapping.ConsumerForwarderMapping) Client { + return Client{mappings: consumerForwarderMapping} } // Start starts supervisor func (c *Client) Start() error { c.consumers = make(map[string]*consumerChannel) - for consumer, forwarder := range c.mappings { - channel := makeConsumerChannel(forwarder.Name()) - c.consumers[forwarder.Name()] = channel - go consumer.Start(forwarder, channel.check, channel.stop) + for _, mappingEntry := range c.mappings { + channel := makeConsumerChannel(mappingEntry.Forwarder.Name()) + c.consumers[mappingEntry.Forwarder.Name()] = channel + go mappingEntry.Consumer.Start(mappingEntry.Forwarder, channel.check, channel.stop) log.WithFields(log.Fields{ - "consumerName": consumer.Name(), - "forwarderName": forwarder.Name()}).Info("Started consumer with forwarder") + "consumerName": mappingEntry.Consumer.Name(), + "forwarderName": mappingEntry.Forwarder.Name()}).Info("Started consumer with forwarder") } return nil } diff --git a/supervisor/supervisor_test.go b/supervisor/supervisor_test.go index 7f30bf5..ac80bed 100644 --- a/supervisor/supervisor_test.go +++ b/supervisor/supervisor_test.go @@ -7,8 +7,8 @@ import ( "net/http/httptest" "testing" - "github.com/AirHelp/rabbit-amazon-forwarder/consumer" "github.com/AirHelp/rabbit-amazon-forwarder/forwarder" + "github.com/AirHelp/rabbit-amazon-forwarder/mapping" ) func TestStart(t *testing.T) { @@ -97,11 +97,11 @@ func TestCheck(t *testing.T) { } } -func prepareConsumers() map[consumer.Client]forwarder.Client { - consumers := make(map[consumer.Client]forwarder.Client) - consumers[MockRabbitConsumer{"rabbit1"}] = MockSNSForwarder{"sns"} - consumers[MockRabbitConsumer{"rabbit2"}] = MockSQSForwarder{"sqs"} - consumers[MockRabbitConsumer{"rabbit3"}] = MockLambdaForwarder{"lambda"} +func prepareConsumers() []mapping.ConsumerForwarderMapping { + var consumers []mapping.ConsumerForwarderMapping + consumers = append(consumers, mapping.ConsumerForwarderMapping{Consumer: MockRabbitConsumer{"rabbit"}, Forwarder: MockSNSForwarder{"sns"}}) + consumers = append(consumers, mapping.ConsumerForwarderMapping{Consumer: MockRabbitConsumer{"rabbit"}, Forwarder: MockSQSForwarder{"sqs"}}) + consumers = append(consumers, mapping.ConsumerForwarderMapping{Consumer: MockRabbitConsumer{"rabbit"}, Forwarder: MockLambdaForwarder{"lambda"}}) return consumers }