Skip to content
This repository has been archived by the owner on Feb 10, 2025. It is now read-only.

Commit

Permalink
Added logging in json
Browse files Browse the repository at this point in the history
  • Loading branch information
Filip committed Sep 29, 2017
1 parent a6194a1 commit 3833909
Show file tree
Hide file tree
Showing 273 changed files with 149,454 additions and 48 deletions.
17 changes: 11 additions & 6 deletions lambda/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,13 @@ package lambda

import (
"errors"
"log"

"github.com/AirHelp/rabbit-amazon-forwarder/config"
"github.com/AirHelp/rabbit-amazon-forwarder/forwarder"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/lambda"
"github.com/aws/aws-sdk-go/service/lambda/lambdaiface"
log "github.com/sirupsen/logrus"
)

const (
Expand All @@ -33,7 +32,7 @@ func CreateForwarder(entry config.AmazonEntry, lambdaClient ...lambdaiface.Lambd
client = lambda.New(session.Must(session.NewSession()))
}
forwarder := Forwarder{entry.Name, client, entry.Target}
log.Print("Created forwarder: ", forwarder.Name())
log.WithFields(log.Fields{"forwarderName": forwarder.Name()}).Info("Created forwarder")
return forwarder
}

Expand All @@ -53,13 +52,19 @@ func (f Forwarder) Push(message string) error {
}
resp, err := f.lambdaClient.Invoke(params)
if err != nil {
log.Printf("[%s] Could not forward message. Error: %s", f.Name(), err.Error())
log.WithFields(log.Fields{
"forwarderName": f.Name(),
"error": err.Error()}).Error("Could not forward message")
return err
}
if resp.FunctionError != nil {
log.Printf("[%s] Could not forward message. Function error: %s", f.Name(), *resp.FunctionError)
log.WithFields(log.Fields{
"forwarderName": f.Name(),
"functionError": *resp.FunctionError}).Errorf("Could not forward message")
return errors.New(*resp.FunctionError)
}
log.Printf("[%s] Forward succeeded. Code:%d, body:%s", f.Name(), resp.StatusCode, string(resp.Payload))
log.WithFields(log.Fields{
"forwarderName": f.Name(),
"statusCode": resp.StatusCode}).Info("Forward succeeded")
return nil
}
14 changes: 9 additions & 5 deletions mapping/mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package mapping

import (
"encoding/json"
log "github.com/sirupsen/logrus"
"io/ioutil"
"log"
"os"

"github.com/AirHelp/rabbit-amazon-forwarder/config"
Expand Down Expand Up @@ -56,7 +56,7 @@ func (c Client) Load() (map[consumer.Client]forwarder.Client, error) {
if err = json.Unmarshal(data, &pairsList); err != nil {
return consumerForwarderMap, err
}
log.Print("Loading consumer->forwader pairs")
log.Info("Loading consumer - forwarder pairs")
for _, pair := range pairsList {
consumer := c.helper.createConsumer(pair.Source)
forwarder := c.helper.createForwarder(pair.Destination)
Expand All @@ -67,12 +67,14 @@ func (c Client) Load() (map[consumer.Client]forwarder.Client, error) {

func (c Client) loadFile() ([]byte, error) {
filePath := os.Getenv(config.MappingFile)
log.Print("Loading mapping file: ", filePath)
log.WithFields(log.Fields{"mappingFile": filePath}).Info("Loading mapping file")
return ioutil.ReadFile(filePath)
}

func (h helperImpl) createConsumer(entry config.RabbitEntry) consumer.Client {
log.Printf("Creating consumer: [%s, %s]", entry.Type, entry.Name)
log.WithFields(log.Fields{
"consumerType": entry.Type,
"consumerName": entry.Name}).Info("Creating consumer")
switch entry.Type {
case rabbitmq.Type:
return rabbitmq.CreateConsumer(entry)
Expand All @@ -81,7 +83,9 @@ func (h helperImpl) createConsumer(entry config.RabbitEntry) consumer.Client {
}

func (h helperImpl) createForwarder(entry config.AmazonEntry) forwarder.Client {
log.Printf("Creating forwarder: [%s, %s]", entry.Type, entry.Name)
log.WithFields(log.Fields{
"forwarderType": entry.Type,
"forwarderName": entry.Name}).Info("Creating forwarder")
switch entry.Type {
case sns.Type:
return sns.CreateForwarder(entry)
Expand Down
45 changes: 31 additions & 14 deletions rabbitmq/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package rabbitmq
import (
"errors"
"fmt"
"log"
log "github.com/sirupsen/logrus"
"time"

"github.com/AirHelp/rabbit-amazon-forwarder/config"
Expand Down Expand Up @@ -40,7 +40,7 @@ type workerParams struct {
ch *amqp.Channel
}

// CreateConsumer creates conusmer from string map
// CreateConsumer creates consumer from string map
func CreateConsumer(entry config.RabbitEntry) consumer.Client {
return Consumer{entry.Name, entry.ConnectionURL, entry.ExchangeName, entry.QueueName, entry.RoutingKey}
}
Expand All @@ -52,11 +52,13 @@ func (c Consumer) Name() string {

// Start start consuming messages from Rabbit queue
func (c Consumer) Start(forwarder forwarder.Client, check chan bool, stop chan bool) error {
log.Print("Starting consumer with params: ", c)
log.WithFields(log.Fields{
"exchangeName": c.ExchangeName,
"queueName": c.QueueName}).Info("Starting connecting consumer")
for {
delivery, conn, ch, err := c.initRabbitMQ()
if err != nil {
log.Print(err)
log.Error(err)
closeRabbitMQ(conn, ch)
time.Sleep(ReconnectRabbitMQInterval * time.Second)
continue
Expand All @@ -70,15 +72,17 @@ func (c Consumer) Start(forwarder forwarder.Client, check chan bool, stop chan b
}

func closeRabbitMQ(conn *amqp.Connection, ch *amqp.Channel) {
log.Print("Closing RabbitMQ connection and channel")
log.Info("Closing RabbitMQ connection and channel")
if ch != nil {
if err := ch.Close(); err != nil {
log.Print("Could not close channel. Error: ", err)
log.WithFields(log.Fields{
"error": err.Error()}).Error("Could not close channel")
}
}
if conn != nil {
if err := conn.Close(); err != nil {
log.Print("Could not close connection. Error: ", err)
log.WithFields(log.Fields{
"error": err.Error()}).Error("Could not close connection")
}
}
}
Expand Down Expand Up @@ -143,31 +147,44 @@ func (c Consumer) setupExchangesAndQueues(conn *amqp.Connection, ch *amqp.Channe

func (c Consumer) startForwarding(params *workerParams) error {
forwarderName := params.forwarder.Name()
log.Printf("[%s] Started forwarding messages to %s", c.Name(), forwarderName)
log.WithFields(log.Fields{
"consumerName": c.Name(),
"forwarderName": forwarderName}).Info("Started forwarding messages")
for {
select {
case d, ok := <-params.msgs:
if !ok { // channel already closed
closeRabbitMQ(params.conn, params.ch)
return errors.New(channelClosedMessage)
}
log.Printf("[%s] Message to forward: %v", c.Name(), d.MessageId)
log.WithFields(log.Fields{
"consumerName": c.Name(),
"messageID": d.MessageId}).Info("Message to forward")
err := params.forwarder.Push(string(d.Body))
if err != nil {
log.Printf("[%s] Could not forward message. Error: %v", forwarderName, err)
log.WithFields(log.Fields{
"forwarderName": forwarderName,
"error": err.Error()}).Error("Could not forward message")
if err = d.Reject(false); err != nil {
log.Printf("[%s] Could not reject message. Error: %v", forwarderName, err)
log.WithFields(log.Fields{
"forwarderName": forwarderName,
"error": err.Error()}).Error("Could not reject message")
}

} else {
if err := d.Ack(true); err != nil {
log.Println("Could not ack message with id:", d.MessageId)
log.WithFields(log.Fields{
"forwarderName": forwarderName,
"error": err.Error(),
"messageID": d.MessageId}).Error("Could not ack message")
}
}
case <-params.check:
log.Printf("[%s] Checking", forwarderName)
log.WithFields(log.Fields{
"forwarderName": forwarderName}).Info("Checking")
case <-params.stop:
log.Printf("[%s] Closing", forwarderName)
log.WithFields(log.Fields{
"forwarderName": forwarderName}).Info("Closing")
closeRabbitMQ(params.conn, params.ch)
return errors.New(closedBySupervisorMessage)
}
Expand Down
17 changes: 11 additions & 6 deletions server.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,29 @@
package main

import (
"log"
"net/http"

"github.com/AirHelp/rabbit-amazon-forwarder/mapping"
"github.com/AirHelp/rabbit-amazon-forwarder/supervisor"
log "github.com/sirupsen/logrus"
"net/http"
"os"
)

func main() {
log.SetFormatter(&log.TextFormatter{})
log.SetOutput(os.Stdout)

consumerForwarderMap, err := mapping.New().Load()
if err != nil {
log.Fatalf("Could not load consumer->forwader pairs. Error: " + err.Error())
log.WithFields(log.Fields{
"error": err.Error()}).Fatalf("Could not load consumer - forwarder pairs. Error: " + err.Error())
}
supervisor := supervisor.New(consumerForwarderMap)
if err := supervisor.Start(); err != nil {
log.Fatal("Could not start supervisor. Error: ", err.Error())
log.WithFields(log.Fields{
"error": err.Error()}).Fatal("Could not start supervisor")
}
http.HandleFunc("/restart", supervisor.Restart)
http.HandleFunc("/health", supervisor.Check)
log.Print("Starting http server")
log.Info("Starting http server")
log.Fatal(http.ListenAndServe(":8080", nil))
}
12 changes: 8 additions & 4 deletions sns/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package sns

import (
"errors"
"log"
log "github.com/sirupsen/logrus"

"github.com/AirHelp/rabbit-amazon-forwarder/config"
"github.com/AirHelp/rabbit-amazon-forwarder/forwarder"
Expand Down Expand Up @@ -33,7 +33,7 @@ func CreateForwarder(entry config.AmazonEntry, snsClient ...snsiface.SNSAPI) for
client = sns.New(session.Must(session.NewSession()))
}
forwarder := Forwarder{entry.Name, client, entry.Target}
log.Print("Created forwarder: ", forwarder.Name())
log.WithFields(log.Fields{"forwarderName": forwarder.Name()}).Info("Created forwarder")
return forwarder
}

Expand All @@ -54,9 +54,13 @@ func (f Forwarder) Push(message string) error {

resp, err := f.snsClient.Publish(params)
if err != nil {
log.Printf("[%s] Could not forward message. Error: %s", f.Name(), err.Error())
log.WithFields(log.Fields{
"forwarderName": f.Name(),
"error": err.Error()}).Error("Could not forward message")
return err
}
log.Printf("[%s] Forward succeeded. Response: %v", f.Name(), resp)
log.WithFields(log.Fields{
"forwarderName": f.Name(),
"responseID": resp.MessageId}).Info("Forward succeeded")
return nil
}
12 changes: 8 additions & 4 deletions sqs/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package sqs

import (
"errors"
"log"
log "github.com/sirupsen/logrus"

"github.com/AirHelp/rabbit-amazon-forwarder/config"
"github.com/AirHelp/rabbit-amazon-forwarder/forwarder"
Expand Down Expand Up @@ -33,7 +33,7 @@ func CreateForwarder(entry config.AmazonEntry, sqsClient ...sqsiface.SQSAPI) for
client = sqs.New(session.Must(session.NewSession()))
}
forwarder := Forwarder{entry.Name, client, entry.Target}
log.Print("Created forwarder: ", forwarder.Name())
log.WithFields(log.Fields{"forwarderName": forwarder.Name()}).Info("Created forwarder")
return forwarder
}

Expand All @@ -55,9 +55,13 @@ func (f Forwarder) Push(message string) error {
resp, err := f.sqsClient.SendMessage(params)

if err != nil {
log.Printf("[%s] Could not forward message. Error: %s", f.Name(), err.Error())
log.WithFields(log.Fields{
"forwarderName": f.Name(),
"error": err.Error()}).Error("Could not forward message")
return err
}
log.Printf("[%s] Forward succeeded. Response: %s", f.Name(), resp)
log.WithFields(log.Fields{
"forwarderName": f.Name(),
"responseID": resp.MessageId}).Info("Forward succeeded")
return nil
}
20 changes: 11 additions & 9 deletions supervisor/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package supervisor
import (
"encoding/json"
"fmt"
"log"
log "github.com/sirupsen/logrus"
"net/http"
"strings"
"time"
Expand All @@ -18,7 +18,6 @@ const (
notSupported = "not supported response format"
acceptHeader = "Accept"
contentType = "Content-Type"
errorRestart = "could not restart workers"
acceptAll = "*/*"
)

Expand Down Expand Up @@ -51,7 +50,9 @@ func (c *Client) Start() error {
channel := makeConsumerChannel(forwarder.Name())
c.consumers[forwarder.Name()] = channel
go consumer.Start(forwarder, channel.check, channel.stop)
log.Printf("Started consumer:%s with forwader:%s", consumer.Name(), forwarder.Name())
log.WithFields(log.Fields{
"consumerName": consumer.Name(),
"forwarderName": forwarder.Name()}).Info("Started consumer with forwarder")
}
return nil
}
Expand All @@ -61,8 +62,9 @@ func (c *Client) Check(w http.ResponseWriter, r *http.Request) {
if accept := r.Header.Get(acceptHeader); accept != "" &&
!strings.Contains(accept, jsonType) &&
!strings.Contains(accept, acceptAll) {
log.Print("Wrong Accept header: ", accept)
notAccpetableResponse(w)
log.WithFields(log.Fields{
"acceptHeader": accept}).Warn("Wrong Accept header")
notAcceptableResponse(w)
return
}
stopped := 0
Expand All @@ -89,7 +91,7 @@ func (c *Client) Check(w http.ResponseWriter, r *http.Request) {
func (c *Client) Restart(w http.ResponseWriter, r *http.Request) {
c.stop()
if err := c.Start(); err != nil {
log.Print(err)
log.Error(err)
errorResponse(w, "")
return
}
Expand All @@ -114,12 +116,12 @@ func errorResponse(w http.ResponseWriter, message string) {
w.Write([]byte(message))
}

func notAccpetableResponse(w http.ResponseWriter) {
func notAcceptableResponse(w http.ResponseWriter) {
w.Header().Set(contentType, jsonType)
w.WriteHeader(406)
bytes, err := json.Marshal(response{Healthy: false, Message: notSupported})
if err != nil {
log.Print(err)
log.Error(err)
w.WriteHeader(500)
return
}
Expand All @@ -131,7 +133,7 @@ func successResponse(w http.ResponseWriter) {
w.WriteHeader(200)
bytes, err := json.Marshal(response{Healthy: true, Message: success})
if err != nil {
log.Print(err)
log.Error(err)
w.WriteHeader(200)
return
}
Expand Down
Loading

0 comments on commit 3833909

Please sign in to comment.