Skip to content
This repository has been archived by the owner on Feb 10, 2025. It is now read-only.

Commit

Permalink
Allow multiple routing keys for sources. (#25)
Browse files Browse the repository at this point in the history
* Allow multiple routing keys for sources.

Existing routingKey config item gets merged into routingKeys

* Fixed panic with consumer and forwarder mapping

* Rename some variables and structures to make their purpose clearer.
  • Loading branch information
adamlc authored and filiphaftek committed Nov 27, 2018
1 parent 942f6a3 commit bd46770
Show file tree
Hide file tree
Showing 11 changed files with 63 additions and 46 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ Sample of RabbitMQ -> SNS mapping file. All fields are required. Samples are loc
"connection" : "amqp://guest:guest@localhost:5672/",
"topic" : "amq.topic",
"queue" : "test-queue",
"routing" : "#"
"routingKeys" : ["#"]
},
"destination" : {
"type" : "SNS",
Expand Down
13 changes: 7 additions & 6 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ const (

// RabbitEntry RabbitMQ mapping entry
type RabbitEntry struct {
Type string `json:"type"`
Name string `json:"name"`
ConnectionURL string `json:"connection"`
ExchangeName string `json:"topic"`
QueueName string `json:"queue"`
RoutingKey string `json:"routing"`
Type string `json:"type"`
Name string `json:"name"`
ConnectionURL string `json:"connection"`
ExchangeName string `json:"topic"`
QueueName string `json:"queue"`
RoutingKey string `json:"routing"`
RoutingKeys []string `json:"routingKeys"`
}

// AmazonEntry SQS/SNS mapping entry
Expand Down
2 changes: 1 addition & 1 deletion examples/rabbit_to_lambda.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"connection" : "amqp://guest:guest@localhost:5672/",
"topic" : "amq.topic",
"queue" : "test-queue",
"routing" : "#"
"routingKeys" : ["#"]
},
"destination" : {
"type" : "Lambda",
Expand Down
2 changes: 1 addition & 1 deletion examples/rabbit_to_sns.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"connection" : "amqp://guest:guest@localhost:5672/",
"topic" : "amq.topic",
"queue" : "test-queue",
"routing" : "#"
"routingKeys" : ["#"]
},
"destination" : {
"type" : "SNS",
Expand Down
2 changes: 1 addition & 1 deletion examples/rabbit_to_sqs.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"connection" : "amqp://guest:guest@localhost:5672/",
"topic" : "amq.topic",
"queue" : "test-queue",
"routing" : "#"
"routingKeys" : ["#"]
},
"destination" : {
"type" : "SQS",
Expand Down
21 changes: 14 additions & 7 deletions mapping/mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package mapping

import (
"encoding/json"
log "github.com/sirupsen/logrus"
"io/ioutil"
"os"

log "github.com/sirupsen/logrus"

"github.com/AirHelp/rabbit-amazon-forwarder/config"
"github.com/AirHelp/rabbit-amazon-forwarder/consumer"
"github.com/AirHelp/rabbit-amazon-forwarder/forwarder"
Expand Down Expand Up @@ -33,6 +34,12 @@ type Helper interface {
createForwarder(entry config.AmazonEntry) forwarder.Client
}

// ConsumerForwarderMapping mapping for consumers and forwarders
type ConsumerForwarderMapping struct {
Consumer consumer.Client
Forwarder forwarder.Client
}

type helperImpl struct{}

// New creates new mapping client
Expand All @@ -46,23 +53,23 @@ func New(helpers ...Helper) Client {
}

// Load loads mappings
func (c Client) Load() (map[consumer.Client]forwarder.Client, error) {
consumerForwarderMap := make(map[consumer.Client]forwarder.Client)
func (c Client) Load() ([]ConsumerForwarderMapping, error) {
var consumerForwarderMapping []ConsumerForwarderMapping
data, err := c.loadFile()
if err != nil {
return consumerForwarderMap, err
return consumerForwarderMapping, err
}
var pairsList pairs
if err = json.Unmarshal(data, &pairsList); err != nil {
return consumerForwarderMap, err
return consumerForwarderMapping, err
}
log.Info("Loading consumer - forwarder pairs")
for _, pair := range pairsList {
consumer := c.helper.createConsumer(pair.Source)
forwarder := c.helper.createForwarder(pair.Destination)
consumerForwarderMap[consumer] = forwarder
consumerForwarderMapping = append(consumerForwarderMapping, ConsumerForwarderMapping{consumer, forwarder})
}
return consumerForwarderMap, nil
return consumerForwarderMapping, nil
}

func (c Client) loadFile() ([]byte, error) {
Expand Down
8 changes: 4 additions & 4 deletions mapping/mapping_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ const (
func TestLoad(t *testing.T) {
os.Setenv(config.MappingFile, "../tests/rabbit_to_sns.json")
client := New(MockMappingHelper{})
var consumerForwarderMap map[consumer.Client]forwarder.Client
var consumerForwarderMapping []ConsumerForwarderMapping
var err error
if consumerForwarderMap, err = client.Load(); err != nil {
if consumerForwarderMapping, err = client.Load(); err != nil {
t.Errorf("could not load mapping and start mocked rabbit->sns pair: %s", err.Error())
}
if len(consumerForwarderMap) != 1 {
t.Errorf("wrong consumerForwarderMap size, expected 1, got %d", len(consumerForwarderMap))
if len(consumerForwarderMapping) != 1 {
t.Errorf("wrong consumerForwarderMapping size, expected 1, got %d", len(consumerForwarderMapping))
}
}

Expand Down
19 changes: 14 additions & 5 deletions rabbitmq/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ package rabbitmq
import (
"errors"
"fmt"
log "github.com/sirupsen/logrus"
"time"

log "github.com/sirupsen/logrus"

"github.com/AirHelp/rabbit-amazon-forwarder/config"
"github.com/AirHelp/rabbit-amazon-forwarder/consumer"
"github.com/AirHelp/rabbit-amazon-forwarder/forwarder"
Expand All @@ -27,7 +28,7 @@ type Consumer struct {
ConnectionURL string
ExchangeName string
QueueName string
RoutingKey string
RoutingKeys []string
}

// parameters for starting consumer
Expand All @@ -42,7 +43,12 @@ type workerParams struct {

// CreateConsumer creates consumer from string map
func CreateConsumer(entry config.RabbitEntry) consumer.Client {
return Consumer{entry.Name, entry.ConnectionURL, entry.ExchangeName, entry.QueueName, entry.RoutingKey}
// merge RoutingKey with RoutingKeys
if entry.RoutingKey != "" {
entry.RoutingKeys = append(entry.RoutingKeys, entry.RoutingKey)
}

return Consumer{entry.Name, entry.ConnectionURL, entry.ExchangeName, entry.QueueName, entry.RoutingKeys}
}

// Name consumer name
Expand Down Expand Up @@ -132,8 +138,11 @@ func (c Consumer) setupExchangesAndQueues(conn *amqp.Connection, ch *amqp.Channe
}); err != nil {
return failOnError(err, "Failed to declare a queue:"+c.QueueName)
}
if err = ch.QueueBind(c.QueueName, c.RoutingKey, c.ExchangeName, false, nil); err != nil {
return failOnError(err, "Failed to bind a queue:"+c.QueueName)
// bind all of the routing keys
for _, routingKey := range c.RoutingKeys {
if err = ch.QueueBind(c.QueueName, routingKey, c.ExchangeName, false, nil); err != nil {
return failOnError(err, "Failed to bind a queue:"+c.QueueName)
}
}

msgs, err := ch.Consume(c.QueueName, c.Name(), false, false, false, false, nil)
Expand Down
4 changes: 2 additions & 2 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ const (
func main() {
createLogger()

consumerForwarderMap, err := mapping.New().Load()
consumerForwarderMapping, err := mapping.New().Load()
if err != nil {
log.WithField("error", err.Error()).Fatalf("Could not load consumer - forwarder pairs")
}
supervisor := supervisor.New(consumerForwarderMap)
supervisor := supervisor.New(consumerForwarderMapping)
if err := supervisor.Start(); err != nil {
log.WithField("error", err.Error()).Fatal("Could not start supervisor")
}
Expand Down
24 changes: 12 additions & 12 deletions supervisor/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ package supervisor
import (
"encoding/json"
"fmt"
log "github.com/sirupsen/logrus"
"net/http"
"strings"
"time"

"github.com/AirHelp/rabbit-amazon-forwarder/consumer"
"github.com/AirHelp/rabbit-amazon-forwarder/forwarder"
log "github.com/sirupsen/logrus"

"github.com/AirHelp/rabbit-amazon-forwarder/mapping"
)

const (
Expand All @@ -34,25 +34,25 @@ type consumerChannel struct {

// Client supervisor client
type Client struct {
mappings map[consumer.Client]forwarder.Client
mappings []mapping.ConsumerForwarderMapping
consumers map[string]*consumerChannel
}

// New client for supervisor
func New(consumerForwarderMap map[consumer.Client]forwarder.Client) Client {
return Client{mappings: consumerForwarderMap}
func New(consumerForwarderMapping []mapping.ConsumerForwarderMapping) Client {
return Client{mappings: consumerForwarderMapping}
}

// Start starts supervisor
func (c *Client) Start() error {
c.consumers = make(map[string]*consumerChannel)
for consumer, forwarder := range c.mappings {
channel := makeConsumerChannel(forwarder.Name())
c.consumers[forwarder.Name()] = channel
go consumer.Start(forwarder, channel.check, channel.stop)
for _, mappingEntry := range c.mappings {
channel := makeConsumerChannel(mappingEntry.Forwarder.Name())
c.consumers[mappingEntry.Forwarder.Name()] = channel
go mappingEntry.Consumer.Start(mappingEntry.Forwarder, channel.check, channel.stop)
log.WithFields(log.Fields{
"consumerName": consumer.Name(),
"forwarderName": forwarder.Name()}).Info("Started consumer with forwarder")
"consumerName": mappingEntry.Consumer.Name(),
"forwarderName": mappingEntry.Forwarder.Name()}).Info("Started consumer with forwarder")
}
return nil
}
Expand Down
12 changes: 6 additions & 6 deletions supervisor/supervisor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import (
"net/http/httptest"
"testing"

"github.com/AirHelp/rabbit-amazon-forwarder/consumer"
"github.com/AirHelp/rabbit-amazon-forwarder/forwarder"
"github.com/AirHelp/rabbit-amazon-forwarder/mapping"
)

func TestStart(t *testing.T) {
Expand Down Expand Up @@ -97,11 +97,11 @@ func TestCheck(t *testing.T) {
}
}

func prepareConsumers() map[consumer.Client]forwarder.Client {
consumers := make(map[consumer.Client]forwarder.Client)
consumers[MockRabbitConsumer{"rabbit1"}] = MockSNSForwarder{"sns"}
consumers[MockRabbitConsumer{"rabbit2"}] = MockSQSForwarder{"sqs"}
consumers[MockRabbitConsumer{"rabbit3"}] = MockLambdaForwarder{"lambda"}
func prepareConsumers() []mapping.ConsumerForwarderMapping {
var consumers []mapping.ConsumerForwarderMapping
consumers = append(consumers, mapping.ConsumerForwarderMapping{Consumer: MockRabbitConsumer{"rabbit"}, Forwarder: MockSNSForwarder{"sns"}})
consumers = append(consumers, mapping.ConsumerForwarderMapping{Consumer: MockRabbitConsumer{"rabbit"}, Forwarder: MockSQSForwarder{"sqs"}})
consumers = append(consumers, mapping.ConsumerForwarderMapping{Consumer: MockRabbitConsumer{"rabbit"}, Forwarder: MockLambdaForwarder{"lambda"}})
return consumers
}

Expand Down

0 comments on commit bd46770

Please sign in to comment.