Skip to content

Commit

Permalink
Add multi exchange support
Browse files Browse the repository at this point in the history
  • Loading branch information
ghokun committed Aug 14, 2023
1 parent d4035c4 commit bd00228
Showing 1 changed file with 113 additions and 25 deletions.
138 changes: 113 additions & 25 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"net/url"
"os"
"os/signal"
"strings"

"github.com/fatih/color"
"github.com/google/uuid"
Expand All @@ -20,6 +21,60 @@ import (

var Version = "development"

const (
usage = `coyote [global options]
Examples:
coyote --url amqps://user@myurl --exchange myexchange --store events.sqlite
coyote --url amqps://user:password@myurl --noprompt --exchange myexchange --store events.sqlite
coyote --url amqps://user:password@myurl --noprompt --insecure --exchange myexchange
Exchange binding formats:
--exchange myexchange # All messages in single exchange
--exchange myexchange1=mykey1 # Messages with routing key in a single exchange
--exchange myexchange1=mykey1,myexchange1=mykey2 # Messages with routing keys in a single exchange
--exchange myexchange1,myexchange2 # All messages in multiple exchanges
--exchange myexchange1=mykey1,myexchange2=mykey2 # Messages with routing keys in multiple exchanges
--exchange myexchange1,myexchange2=mykey2 # Messages with or without routing keys in multiple exchanges`
)

type listen struct {
c []combination
}

type combination struct {
exchange string
routingKey string
}

func (l *listen) Set(value string) (err error) {
for _, comb := range strings.Split(value, ",") {
pair := strings.Split(comb, "=")
length := len(pair)
if length == 1 {
if len(pair[0]) < 1 {
return fmt.Errorf("exchange name can not be empty")
}
l.c = append(l.c, combination{exchange: pair[0], routingKey: "#"})
} else if length == 2 {
if len(pair[0]) < 1 {
return fmt.Errorf("exchange name can not be empty")
}
if len(pair[1]) < 1 {
return fmt.Errorf("routing key can not be empty when '=' is provided")
}
l.c = append(l.c, combination{exchange: pair[0], routingKey: pair[1]})
} else {
return fmt.Errorf("valid values are ['a=x' 'a,b' 'a=x,b=y' 'a,b=y'] where a and b are exchanges, x and y are routing keys")
}
}
return nil
}

func (l *listen) String() string {
return ""
}

func main() {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
Expand All @@ -41,30 +96,27 @@ func main() {
}()

app := &cli.App{
Name: "coyote",
Usage: "Coyote is a RabbitMQ message sink.",
Version: Version,
Name: "coyote",
Usage: "Coyote is a RabbitMQ message sink.",
Version: Version,
UsageText: usage,
Flags: []cli.Flag{
&cli.StringFlag{
Name: "url",
Required: true,
Usage: "RabbitMQ url, must start with amqps:// or amqp://.",
},
&cli.StringFlag{
&cli.GenericFlag{
Name: "exchange",
Required: true,
Usage: "Exchange name to listen messages.",
Value: &listen{},
Usage: "Exchange & routing key combinations to listen messages.",
},
&cli.StringFlag{
Name: "queue",
Value: "interceptor",
Usage: "Interceptor queue name.",
},
&cli.StringFlag{
Name: "bind",
Value: "#",
Usage: "Routing key to bind.",
},
&cli.StringFlag{
Name: "store",
Usage: "SQLite filename to store events.",
Expand Down Expand Up @@ -108,22 +160,55 @@ func main() {
}
defer ch.Close()

err = ch.ExchangeDeclarePassive(ctx.String("exchange"), "topic", false, true, false, false, nil)
if err != nil {
return fmt.Errorf("%s %w", color.RedString("failed to connect to exchange:"), err)
}

q, err := ch.QueueDeclare(fmt.Sprintf("%s.%s", ctx.String("queue"), uuid.NewString()), false, true, false, false, nil)
q, err := ch.QueueDeclare(
fmt.Sprintf("%s.%s", ctx.String("queue"), uuid.NewString()), // queue name
false, // is durable
true, // is auto delete
true, // is exclusive
false, // is no wait
nil, // args
)
if err != nil {
return fmt.Errorf("%s %w", color.RedString("failed to declare a queue:"), err)
}

err = ch.QueueBind(q.Name, ctx.String("bind"), ctx.String("exchange"), false, nil)
if err != nil {
return fmt.Errorf("%s %w", color.RedString("failed to bind to queue:"), err)
for _, c := range ctx.Generic("exchange").(*listen).c {
err = ch.ExchangeDeclarePassive(
c.exchange, // exchange name
"topic", // exchange kind
true, // is durable
false, // is auto delete
false, // is internal
false, // is no wait
nil, // args
)
if err != nil {
return fmt.Errorf("%s %w", color.RedString("failed to connect to exchange:"), err)
}

err = ch.QueueBind(
q.Name, // interceptor queue name
c.routingKey, // routing key to bind
c.exchange, // exchange to listen
false, // is no wait
nil, // args
)
if err != nil {
return fmt.Errorf("%s %w", color.RedString("failed to bind to queue:"), err)
} else {
log.Printf("👂 Listening from exchange %s with routing key %s", color.YellowString(c.exchange), color.YellowString(c.routingKey))
}
}

deliveries, err := ch.Consume(q.Name, "", true, false, false, false, nil)
deliveries, err := ch.Consume(
q.Name, // queue name to consume from
"", // consumer tag
true, // is auto ack
false, // is exclusive
false, // is no local
false, // is no wait
nil, // args
)
if err != nil {
return fmt.Errorf("%s %w", color.RedString("failed to register a consumer:"), err)
}
Expand All @@ -147,6 +232,7 @@ func main() {
(
"id" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
"timestamp" TIMESTAMP DEFAULT (DATETIME(CURRENT_TIMESTAMP, 'localtime')),
"exchange" TEXT,
"routing_key" TEXT,
"correlation_id" TEXT,
"reply_to" TEXT,
Expand All @@ -159,15 +245,17 @@ func main() {
if _, err := statement.Exec(); err != nil {
log.Fatal(err)
}
insert, err = db.Prepare(`INSERT INTO event(routing_key, correlation_id, reply_to, headers, body)
VALUES (?, ?, ?, ?, ?)`)
insert, err = db.Prepare(`INSERT INTO event(exchange, routing_key, correlation_id, reply_to, headers, body)
VALUES (?, ?, ?, ?, ?, ?)`)
if err != nil {
log.Fatal(err)
}
}
for d := range deliveries {
log.Printf("📧 %s\n%s%s\n%s%s\n%s%s\n%s%s\n%s%s",
log.Printf("📧 %s\n%s%s\n%s%s\n%s%s\n%s%s\n%s%s\n%s%s",
color.YellowString("Received a message"),
color.GreenString("# Exchange : "),
d.Exchange,
color.GreenString("# Routing-key : "),
d.RoutingKey,
color.GreenString("# Correlation-id : "),
Expand All @@ -179,14 +267,14 @@ func main() {
color.GreenString("# Body : "),
d.Body)
if insert != nil {
if _, err := insert.Exec(d.RoutingKey, d.CorrelationId, d.ReplyTo, fmt.Sprint(d.Headers), string(d.Body)); err != nil {
if _, err := insert.Exec(d.Exchange, d.RoutingKey, d.CorrelationId, d.ReplyTo, fmt.Sprint(d.Headers), string(d.Body)); err != nil {
log.Fatal(err)
}
}
}
}()

log.Printf("⏳ Waiting for messages. To exit press CTRL+C")
log.Printf("⏳ Waiting for messages. To exit press %s", color.YellowString("CTRL+C"))
<-ctx.Done()
return nil
},
Expand Down

0 comments on commit bd00228

Please sign in to comment.