From 20d29be00aab5f89813cca3755e12e7159d6722a Mon Sep 17 00:00:00 2001 From: Joakim Bygdell Date: Thu, 9 Feb 2023 07:48:55 +0100 Subject: [PATCH 01/13] Switch to correct rabbitMQ library --- go.mod | 2 +- go.sum | 13 +++++++++++-- messenger.go | 10 +++++++--- 3 files changed, 19 insertions(+), 6 deletions(-) diff --git a/go.mod b/go.mod index 0fe0116..9b7b363 100644 --- a/go.mod +++ b/go.mod @@ -12,9 +12,9 @@ require ( github.com/minio/minio-go/v6 v6.0.43 github.com/neicnordic/sda-common v0.0.3-0.20221122104056-37b54aea80a7 github.com/pkg/errors v0.9.1 + github.com/rabbitmq/amqp091-go v1.5.0 github.com/sirupsen/logrus v1.9.0 github.com/spf13/viper v1.15.0 - github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271 github.com/stretchr/testify v1.8.1 ) diff --git a/go.sum b/go.sum index aadd7bc..783109f 100644 --- a/go.sum +++ b/go.sum @@ -247,6 +247,8 @@ github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1 github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/prometheus/procfs v0.8.0 h1:ODq8ZFEaYeCaZOJlZZdJA2AbQR98dSHSM1KW/You5mo= github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0uaxHdg830/4= +github.com/rabbitmq/amqp091-go v1.5.0 h1:VouyHPBu1CrKyJVfteGknGOGCzmOz0zcv/tONLkb7rg= +github.com/rabbitmq/amqp091-go v1.5.0/go.mod h1:JsV0ofX5f1nwOGafb8L5rBItt9GyhfQfcJj+oyz0dGg= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k= github.com/ryszard/goskiplist v0.0.0-20150312221310-2dfbae5fcf46 h1:GHRpF1pTW19a8tTFrMLUcfWwyC0pnifVo2ClaLq+hP8= @@ -271,8 +273,6 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/spf13/viper v1.15.0 h1:js3yy885G8xwJa6iOISGFwd+qlUo5AvyXb7CiihdtiU= github.com/spf13/viper v1.15.0/go.mod h1:fFcTBJxvhhzSJiZy8n+PeW6t8l+KeT/uTARa0jHOQLA= -github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271 h1:WhxRHzgeVGETMlmVfqhRn8RIeeNoPr2Czh33I4Zdccw= -github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= @@ -292,6 +292,7 @@ github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= @@ -300,6 +301,8 @@ go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= +go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA= +go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= @@ -343,6 +346,7 @@ golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -379,6 +383,7 @@ golang.org/x/net v0.0.0-20201031054903-ff519b6c9102/go.mod h1:sp8m0HH+o8qH0wwXwY golang.org/x/net v0.0.0-20201209123823-ac852fbbde11/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= @@ -406,6 +411,7 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -446,8 +452,10 @@ golang.org/x/sys v0.0.0-20210104204734-6f8348627aad/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210225134936-a50acf3fe073/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -525,6 +533,7 @@ golang.org/x/tools v0.0.0-20201208233053-a543418bbed2/go.mod h1:emZCQorbCU4vsT4f golang.org/x/tools v0.0.0-20210105154028-b0ab187a4818/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20210108195828-e2f9c7f1fc8e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= +golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.12 h1:VveCTK38A2rkS8ZqFY25HIDFscX5X9OoEhJd3quQmXU= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/messenger.go b/messenger.go index 802b50f..2a6ec21 100644 --- a/messenger.go +++ b/messenger.go @@ -1,13 +1,15 @@ package main import ( + "context" "crypto/tls" "encoding/json" "fmt" + "time" "github.com/google/uuid" + amqp "github.com/rabbitmq/amqp091-go" log "github.com/sirupsen/logrus" - "github.com/streadway/amqp" ) // Checksum used in the message @@ -96,10 +98,12 @@ func (m *AMQPMessenger) SendMessage(message Event) error { if e != nil { log.Fatalf("%s", e) } - corrID, _ := uuid.NewRandom() - err := m.channel.Publish( + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + err := m.channel.PublishWithContext( + ctx, m.exchange, m.routingKey, false, // mandatory From 633f0a74dd4c7e8036ee1942b805166a8c390acc Mon Sep 17 00:00:00 2001 From: Joakim Bygdell Date: Thu, 9 Feb 2023 07:57:48 +0100 Subject: [PATCH 02/13] Make Config public --- main.go | 37 +++++++++++++++++++++---------------- 1 file changed, 21 insertions(+), 16 deletions(-) diff --git a/main.go b/main.go index d674442..03e1039 100644 --- a/main.go +++ b/main.go @@ -8,6 +8,9 @@ import ( log "github.com/sirupsen/logrus" ) +// Export Conf so we can access it in the other modules +var Conf *Config + func main() { // Create a function to handle panic and exit gracefully defer func() { @@ -16,20 +19,22 @@ func main() { } }() - config, err := NewConfig() + c, err := NewConfig() if err != nil { log.Panic(err) } - tlsBroker, err := TLSConfigBroker(config) + Conf = c + + tlsBroker, err := TLSConfigBroker(Conf) if err != nil { log.Panic(err) } - tlsProxy, err := TLSConfigProxy(config) + tlsProxy, err := TLSConfigProxy(Conf) if err != nil { log.Panic(err) } - sdaDB, err := common.NewSDAdb(config.DB) + sdaDB, err := common.NewSDAdb(Conf.DB) if err != nil { log.Panic(err) } @@ -38,35 +43,35 @@ func main() { log.Debugf("Connected to sda-db (v%v)", sdaDB.Version) - err = checkS3Bucket(config.S3) + err = checkS3Bucket(Conf.S3) if err != nil { log.Panic(err) } - messenger := NewAMQPMessenger(config.Broker, tlsBroker) + messenger := NewAMQPMessenger(Conf.Broker, tlsBroker) log.Debug("messenger acquired ", messenger) var pubkeys map[string][]byte auth := NewValidateFromToken(pubkeys) auth.pubkeys = make(map[string][]byte) // Load keys for JWT verification - if config.Server.jwtpubkeyurl != "" { - if err := auth.getjwtpubkey(config.Server.jwtpubkeyurl); err != nil { - log.Panicf("Error while getting key %s: %v", config.Server.jwtpubkeyurl, err) + if Conf.Server.jwtpubkeyurl != "" { + if err := auth.getjwtpubkey(Conf.Server.jwtpubkeyurl); err != nil { + log.Panicf("Error while getting key %s: %v", Conf.Server.jwtpubkeyurl, err) } } - if config.Server.jwtpubkeypath != "" { - if err := auth.getjwtkey(config.Server.jwtpubkeypath); err != nil { - log.Panicf("Error while getting key %s: %v", config.Server.jwtpubkeypath, err) + if Conf.Server.jwtpubkeypath != "" { + if err := auth.getjwtkey(Conf.Server.jwtpubkeypath); err != nil { + log.Panicf("Error while getting key %s: %v", Conf.Server.jwtpubkeypath, err) } } - proxy := NewProxy(config.S3, auth, messenger, sdaDB, tlsProxy) + proxy := NewProxy(Conf.S3, auth, messenger, sdaDB, tlsProxy) log.Debug("got the proxy ", proxy) http.Handle("/", proxy) - hc := NewHealthCheck(8001, config.S3, config.Broker, tlsProxy) + hc := NewHealthCheck(8001, Conf.S3, Conf.Broker, tlsProxy) go hc.RunHealthChecks() server := &http.Server{ @@ -77,8 +82,8 @@ func main() { ReadHeaderTimeout: 30 * time.Second, } - if config.Server.cert != "" && config.Server.key != "" { - if err := server.ListenAndServeTLS(config.Server.cert, config.Server.key); err != nil { + if Conf.Server.cert != "" && Conf.Server.key != "" { + if err := server.ListenAndServeTLS(Conf.Server.cert, Conf.Server.key); err != nil { panic(err) } } else { From 916ca54ba961c17c1b3840efcd835b3d8f40a6a7 Mon Sep 17 00:00:00 2001 From: Joakim Bygdell Date: Thu, 9 Feb 2023 07:59:01 +0100 Subject: [PATCH 03/13] NewAMQPMessenger should return error --- main.go | 7 ++++++- messenger.go | 11 +++++------ messenger_test.go | 9 +++++---- 3 files changed, 16 insertions(+), 11 deletions(-) diff --git a/main.go b/main.go index 03e1039..13ba0ee 100644 --- a/main.go +++ b/main.go @@ -48,7 +48,12 @@ func main() { log.Panic(err) } - messenger := NewAMQPMessenger(Conf.Broker, tlsBroker) + messenger, err := NewAMQPMessenger(Conf.Broker, tlsBroker) + if err != nil { + log.Panic(err) + } + defer messenger.channel.Close() + defer messenger.connection.Close() log.Debug("messenger acquired ", messenger) var pubkeys map[string][]byte diff --git a/messenger.go b/messenger.go index 2a6ec21..c790d7f 100644 --- a/messenger.go +++ b/messenger.go @@ -41,9 +41,8 @@ type AMQPMessenger struct { confirmsChan <-chan amqp.Confirmation } -// NewAMQPMessenger creates a new messenger that can communicate with a backend -// amqp server. -func NewAMQPMessenger(c BrokerConfig, tlsConfig *tls.Config) *AMQPMessenger { +// NewAMQPMessenger creates a new messenger that can communicate with a backend amqp server. +func NewAMQPMessenger(c BrokerConfig, tlsConfig *tls.Config) (*AMQPMessenger, error) { brokerURI := buildMqURI(c.host, c.port, c.user, c.password, c.vhost, c.ssl) var connection *amqp.Connection @@ -57,12 +56,12 @@ func NewAMQPMessenger(c BrokerConfig, tlsConfig *tls.Config) *AMQPMessenger { connection, err = amqp.Dial(brokerURI) } if err != nil { - log.Panicf("brokerErrMsg 1: %s", err) + return nil, fmt.Errorf("brokerErrMsg 1: %s", err) } channel, err = connection.Channel() if err != nil { - log.Panicf("brokerErrMsg 2: %s", err) + return nil, fmt.Errorf("brokerErrMsg 2: %s", err) } log.Debug("enabling publishing confirms.") @@ -88,7 +87,7 @@ func NewAMQPMessenger(c BrokerConfig, tlsConfig *tls.Config) *AMQPMessenger { log.Fatalf("Channel could not be put into confirm mode: %s\n", err) } - return &AMQPMessenger{connection, channel, c.exchange, c.routingKey, channel.NotifyPublish(confirmsChan)} + return &AMQPMessenger{connection, channel, c.exchange, c.routingKey, channel.NotifyPublish(confirmsChan)}, err } // SendMessage sends message to RabbitMQ if the upload is finished diff --git a/messenger_test.go b/messenger_test.go index ae7ccf9..ee5bf3c 100644 --- a/messenger_test.go +++ b/messenger_test.go @@ -28,8 +28,9 @@ func TestNewAMQPMessenger(t *testing.T) { } assert.NotNil(t, tlsConfig) assert.NoError(t, err) - - assert.NotPanics(t, func() { NewAMQPMessenger(config.Broker, tlsConfig) }) + m, err := NewAMQPMessenger(config.Broker, tlsConfig) + assert.NoError(t, err) + assert.NotNil(t, m) } func TestSendMessage(t *testing.T) { @@ -47,8 +48,8 @@ func TestSendMessage(t *testing.T) { assert.NotNil(t, tlsConfig) assert.NoError(t, err) - messenger := NewAMQPMessenger(config.Broker, tlsConfig) - + messenger, err := NewAMQPMessenger(config.Broker, tlsConfig) + assert.NoError(t, err) event := Event{} checksum := Checksum{} event.Username = "Dummy" From a8409b25ddb1c9a9f57d0203e704c27c1e333f2b Mon Sep 17 00:00:00 2001 From: Joakim Bygdell Date: Thu, 9 Feb 2023 08:16:55 +0100 Subject: [PATCH 04/13] Send file ID as Correlation ID --- messenger.go | 21 ++++++--------------- messenger_test.go | 9 ++++++++- proxy.go | 19 ++++++++++--------- proxy_test.go | 10 ++++++++-- 4 files changed, 32 insertions(+), 27 deletions(-) diff --git a/messenger.go b/messenger.go index c790d7f..99cca56 100644 --- a/messenger.go +++ b/messenger.go @@ -3,11 +3,9 @@ package main import ( "context" "crypto/tls" - "encoding/json" "fmt" "time" - "github.com/google/uuid" amqp "github.com/rabbitmq/amqp091-go" log "github.com/sirupsen/logrus" ) @@ -29,7 +27,7 @@ type Event struct { // Messenger is an interface for sending messages for different file events type Messenger interface { - SendMessage(message Event) error + SendMessage(string, []byte) error } // AMQPMessenger is a Messenger that sends messages to a local AMQP broker @@ -91,14 +89,7 @@ func NewAMQPMessenger(c BrokerConfig, tlsConfig *tls.Config) (*AMQPMessenger, er } // SendMessage sends message to RabbitMQ if the upload is finished -func (m *AMQPMessenger) SendMessage(message Event) error { - - body, e := json.Marshal(message) - if e != nil { - log.Fatalf("%s", e) - } - corrID, _ := uuid.NewRandom() - +func (m *AMQPMessenger) SendMessage(corrID string, body []byte) error { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() err := m.channel.PublishWithContext( @@ -112,20 +103,20 @@ func (m *AMQPMessenger) SendMessage(message Event) error { ContentEncoding: "UTF-8", ContentType: "application/json", DeliveryMode: amqp.Persistent, - CorrelationId: corrID.String(), + CorrelationId: corrID, Priority: 0, // 0-9 - Body: []byte(body), + Body: body, }, ) if err != nil { - return err + return fmt.Errorf("failed to send message because: %v", err) } confirmed := <-m.confirmsChan if !confirmed.Ack { return fmt.Errorf("failed delivery of delivery tag: %d", confirmed.DeliveryTag) } - log.Debugf("Delivered message: %v, with correlation-ID: %v", string(body), corrID.String()) + log.Debugf("Delivered message: %v, with correlation-ID: %v", string(body), corrID) return nil diff --git a/messenger_test.go b/messenger_test.go index ee5bf3c..0ca35bd 100644 --- a/messenger_test.go +++ b/messenger_test.go @@ -1,8 +1,10 @@ package main import ( + "encoding/json" "testing" + "github.com/google/uuid" "github.com/spf13/viper" "github.com/stretchr/testify/assert" ) @@ -52,11 +54,16 @@ func TestSendMessage(t *testing.T) { assert.NoError(t, err) event := Event{} checksum := Checksum{} + event.Operation = "TestSendMessage" event.Username = "Dummy" checksum.Type = "md5" checksum.Value = "123456789" event.Checksum = []interface{}{checksum} - err = messenger.SendMessage(event) + jsonMessage, err := json.Marshal(event) + assert.NoError(t, err) + uuid, _ := uuid.NewRandom() + t.Log("uuid: ", uuid) + err = messenger.SendMessage(uuid.String(), jsonMessage) assert.NoError(t, err) } diff --git a/proxy.go b/proxy.go index 83ef34d..d5f0022 100644 --- a/proxy.go +++ b/proxy.go @@ -132,24 +132,25 @@ func (p *Proxy) allowedResponse(w http.ResponseWriter, r *http.Request) { if p.uploadFinishedSuccessfully(r, s3response) { log.Debug("create message") message, _ := p.CreateMessageFromRequest(r, claims) - if err = p.messenger.SendMessage(message); err != nil { - log.Debug("error when sending message") - log.Error(err) - } - jsonMessage, err := json.Marshal(message) if err != nil { log.Errorf("failed to marshal rabbitmq message to json: %v", err) return } - fileID := p.fileIds[r.URL.Path] - delete(p.fileIds, r.URL.Path) - log.Debugf("marking file %v as 'uploaded' in database", fileID) - err = p.database.MarkFileAsUploaded(fileID, username, string(jsonMessage)) + + if err = p.messenger.SendMessage(p.fileIds[r.URL.Path], jsonMessage); err != nil { + log.Debug("error when sending message") + log.Error(err) + } + + log.Debugf("marking file %v as 'uploaded' in database", p.fileIds[r.URL.Path]) + err = p.database.MarkFileAsUploaded(p.fileIds[r.URL.Path], username, string(jsonMessage)) if err != nil { log.Error(err) } + + delete(p.fileIds, r.URL.Path) } // Writing non-200 to the response before the headers propagate the error diff --git a/proxy_test.go b/proxy_test.go index 728021f..e033158 100644 --- a/proxy_test.go +++ b/proxy_test.go @@ -118,12 +118,18 @@ type MockMessenger struct { lastEvent *Event } +func (m *MockMessenger) IsConnClosed() bool { + return true +} + func NewMockMessenger() *MockMessenger { return &MockMessenger{nil} } -func (m *MockMessenger) SendMessage(event Event) error { - m.lastEvent = &event +func (m *MockMessenger) SendMessage(uuid string, body []byte) error { + if uuid == "" || body == nil { + return fmt.Errorf("bad message") + } return nil } From 0f8cb6a84bd09844012e8d52cc2122d7bd6c1c5f Mon Sep 17 00:00:00 2001 From: Joakim Bygdell Date: Thu, 9 Feb 2023 08:24:49 +0100 Subject: [PATCH 05/13] Detect closed channel --- messenger.go | 24 ++++++++++++++++++++++++ messenger_test.go | 34 ++++++++++++++++++++++++++++++++++ 2 files changed, 58 insertions(+) diff --git a/messenger.go b/messenger.go index 99cca56..3ef21d3 100644 --- a/messenger.go +++ b/messenger.go @@ -90,6 +90,13 @@ func NewAMQPMessenger(c BrokerConfig, tlsConfig *tls.Config) (*AMQPMessenger, er // SendMessage sends message to RabbitMQ if the upload is finished func (m *AMQPMessenger) SendMessage(corrID string, body []byte) error { + if m.channel.IsClosed() { + log.Debugln("channel closed, reconnecting") + if err := m.createNewChannel(); err != nil { + return fmt.Errorf("failed to recreate channel: %v", err) + } + } + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() err := m.channel.PublishWithContext( @@ -133,3 +140,20 @@ func buildMqURI(mqHost, mqPort, mqUser, mqPassword, mqVhost string, ssl bool) st return brokerURI } + +func (m *AMQPMessenger) createNewChannel() error { + c, err := m.connection.Channel() + if err != nil { + return err + } + confirmsChan := make(chan amqp.Confirmation, 1) + if err := c.Confirm(false); err != nil { + close(confirmsChan) + log.Fatalf("Channel could not be put into confirm mode: %s\n", err) + } + log.Debugln("recconected to new channel") + m.channel = c + m.confirmsChan = c.NotifyPublish(confirmsChan) + + return nil +} diff --git a/messenger_test.go b/messenger_test.go index 0ca35bd..3199224 100644 --- a/messenger_test.go +++ b/messenger_test.go @@ -67,3 +67,37 @@ func TestSendMessage(t *testing.T) { err = messenger.SendMessage(uuid.String(), jsonMessage) assert.NoError(t, err) } + +func TestCreateNewChannel(t *testing.T) { + viper.Reset() + viper.Set("server.confFile", "dev_utils/config.yaml") + + config, err := NewConfig() + assert.NotNil(t, config) + assert.NoError(t, err) + tlsConfig, err := TLSConfigBroker(config) + if err != nil { + t.Log(err) + t.Skip("skip test since certificates are not present") + } + assert.NotNil(t, tlsConfig) + assert.NoError(t, err) + + messenger, err := NewAMQPMessenger(config.Broker, tlsConfig) + messenger.channel.Close() + assert.NoError(t, err) + event := Event{} + checksum := Checksum{} + event.Operation = "TestRecreateChannel" + event.Username = "Dummy" + checksum.Type = "md5" + checksum.Value = "123456789" + event.Checksum = []interface{}{checksum} + + jsonMessage, err := json.Marshal(event) + assert.NoError(t, err) + uuid, _ := uuid.NewRandom() + t.Log("uuid: ", uuid) + err = messenger.SendMessage(uuid.String(), jsonMessage) + assert.NoError(t, err) +} From ef183ce3fb9d7864606fbf23eafc97355a47ff84 Mon Sep 17 00:00:00 2001 From: Joakim Bygdell Date: Thu, 9 Feb 2023 08:53:16 +0100 Subject: [PATCH 06/13] Reconnect on closed connection --- messenger.go | 5 +++++ proxy.go | 29 +++++++++++++++++++++-------- 2 files changed, 26 insertions(+), 8 deletions(-) diff --git a/messenger.go b/messenger.go index 3ef21d3..28a8126 100644 --- a/messenger.go +++ b/messenger.go @@ -28,6 +28,7 @@ type Event struct { // Messenger is an interface for sending messages for different file events type Messenger interface { SendMessage(string, []byte) error + IsConnClosed() bool } // AMQPMessenger is a Messenger that sends messages to a local AMQP broker @@ -157,3 +158,7 @@ func (m *AMQPMessenger) createNewChannel() error { return nil } + +func (m *AMQPMessenger) IsConnClosed() bool { + return m.connection.IsClosed() +} diff --git a/proxy.go b/proxy.go index d5f0022..60a879b 100644 --- a/proxy.go +++ b/proxy.go @@ -139,15 +139,28 @@ func (p *Proxy) allowedResponse(w http.ResponseWriter, r *http.Request) { return } - if err = p.messenger.SendMessage(p.fileIds[r.URL.Path], jsonMessage); err != nil { - log.Debug("error when sending message") - log.Error(err) - } + switch p.messenger.IsConnClosed() { + case true: + log.Errorln("connection is closed") + w.WriteHeader(http.StatusServiceUnavailable) + + tlsBroker, _ := TLSConfigBroker(Conf) + m, err := NewAMQPMessenger(Conf.Broker, tlsBroker) + if err == nil { + p.messenger = m + } - log.Debugf("marking file %v as 'uploaded' in database", p.fileIds[r.URL.Path]) - err = p.database.MarkFileAsUploaded(p.fileIds[r.URL.Path], username, string(jsonMessage)) - if err != nil { - log.Error(err) + case false: + if err = p.messenger.SendMessage(p.fileIds[r.URL.Path], jsonMessage); err != nil { + log.Debug("error when sending message") + log.Error(err) + } + + log.Debugf("marking file %v as 'uploaded' in database", p.fileIds[r.URL.Path]) + err = p.database.MarkFileAsUploaded(p.fileIds[r.URL.Path], username, string(jsonMessage)) + if err != nil { + log.Error(err) + } } delete(p.fileIds, r.URL.Path) From 0976a6f9510301d5b5648fcae43cfabc48a79a38 Mon Sep 17 00:00:00 2001 From: Joakim Bygdell Date: Thu, 9 Feb 2023 09:22:31 +0100 Subject: [PATCH 07/13] Detect system signals and shutdown cleanly --- main.go | 42 ++++++++++++++++++++++++++++++++---------- 1 file changed, 32 insertions(+), 10 deletions(-) diff --git a/main.go b/main.go index 13ba0ee..e32d383 100644 --- a/main.go +++ b/main.go @@ -2,6 +2,9 @@ package main import ( "net/http" + "os" + "os/signal" + "syscall" "time" common "github.com/neicnordic/sda-common/database" @@ -12,6 +15,9 @@ import ( var Conf *Config func main() { + sigc := make(chan os.Signal, 5) + signal.Notify(sigc, os.Interrupt, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) + // Create a function to handle panic and exit gracefully defer func() { if err := recover(); err != nil { @@ -21,41 +27,57 @@ func main() { c, err := NewConfig() if err != nil { - log.Panic(err) + log.Error(err) + sigc <- syscall.SIGINT + panic(err) } Conf = c tlsBroker, err := TLSConfigBroker(Conf) if err != nil { - log.Panic(err) + log.Error(err) + sigc <- syscall.SIGINT + panic(err) } tlsProxy, err := TLSConfigProxy(Conf) if err != nil { - log.Panic(err) + log.Error(err) + sigc <- syscall.SIGINT + panic(err) } sdaDB, err := common.NewSDAdb(Conf.DB) if err != nil { - log.Panic(err) + log.Error(err) + sigc <- syscall.SIGINT + panic(err) } - defer sdaDB.Close() - log.Debugf("Connected to sda-db (v%v)", sdaDB.Version) err = checkS3Bucket(Conf.S3) if err != nil { - log.Panic(err) + log.Error(err) + sigc <- syscall.SIGINT + panic(err) } messenger, err := NewAMQPMessenger(Conf.Broker, tlsBroker) if err != nil { - log.Panic(err) + log.Error(err) + sigc <- syscall.SIGINT + panic(err) } - defer messenger.channel.Close() - defer messenger.connection.Close() + log.Debug("messenger acquired ", messenger) + go func() { + <-sigc + sdaDB.Close() + messenger.channel.Close() + messenger.connection.Close() + os.Exit(1) + }() var pubkeys map[string][]byte auth := NewValidateFromToken(pubkeys) auth.pubkeys = make(map[string][]byte) From d9fb37dd23b73e36e90f01f848704f27cd4cf3f1 Mon Sep 17 00:00:00 2001 From: Joakim Bygdell Date: Thu, 9 Feb 2023 10:03:05 +0100 Subject: [PATCH 08/13] Use latest database --- dev_utils/docker-compose.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev_utils/docker-compose.yml b/dev_utils/docker-compose.yml index 38ddaf5..b83350c 100644 --- a/dev_utils/docker-compose.yml +++ b/dev_utils/docker-compose.yml @@ -131,7 +131,7 @@ services: database: container_name: db - image: neicnordic/sda-db:v2.0.7 + image: ghcr.io/neicnordic/sda-db:v2.0.11 depends_on: certfixer: condition: service_completed_successfully From e31ca31262ef0977bb160eb9f8a8d3a47fb2ad2f Mon Sep 17 00:00:00 2001 From: Joakim Bygdell Date: Thu, 9 Feb 2023 10:03:27 +0100 Subject: [PATCH 09/13] Require V4 of the database --- main.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/main.go b/main.go index e32d383..23d4541 100644 --- a/main.go +++ b/main.go @@ -52,6 +52,11 @@ func main() { sigc <- syscall.SIGINT panic(err) } + if sdaDB.Version < 4 { + log.Error("database schema v4 is required") + sigc <- syscall.SIGINT + panic(err) + } log.Debugf("Connected to sda-db (v%v)", sdaDB.Version) From a1beab1e5cd2860b15377e21085f809e249028a7 Mon Sep 17 00:00:00 2001 From: Joakim Bygdell Date: Thu, 9 Feb 2023 10:49:21 +0100 Subject: [PATCH 10/13] Fix proxy_test --- proxy_test.go | 37 ++++--------------------------------- 1 file changed, 4 insertions(+), 33 deletions(-) diff --git a/proxy_test.go b/proxy_test.go index e033158..2348c79 100644 --- a/proxy_test.go +++ b/proxy_test.go @@ -52,7 +52,7 @@ func (suite *ProxyTests) SetupTest() { // Create a database configuration for the fake database suite.DBConf = common.DBConf{ Host: "localhost", - Port: 5432, + Port: 2345, User: "lega_in", Password: "lega_in", Database: "lega", @@ -67,6 +67,7 @@ func (suite *ProxyTests) SetupTest() { _, err = os.Stat("/.dockerenv") if err == nil { suite.DBConf.Host = "db" + suite.DBConf.Port = 5432 } suite.database = &common.SDAdb{} @@ -119,7 +120,7 @@ type MockMessenger struct { } func (m *MockMessenger) IsConnClosed() bool { - return true + return false } func NewMockMessenger() *MockMessenger { @@ -134,16 +135,6 @@ func (m *MockMessenger) SendMessage(uuid string, body []byte) error { return nil } -func (m *MockMessenger) CheckAndRestore() bool { - if m.lastEvent == nil { - - return false - } - m.lastEvent = nil - - return true -} - // AlwaysAllow is an Authenticator that always authenticates type AlwaysDeny struct{} @@ -167,7 +158,6 @@ func (suite *ProxyTests) TestServeHTTP_disallowed() { proxy.ServeHTTP(w, r) assert.Equal(suite.T(), 403, w.Result().StatusCode) assert.Equal(suite.T(), false, suite.fakeServer.PingedAndRestore()) - assert.Equal(suite.T(), false, suite.messenger.CheckAndRestore()) // Deletion of files are disallowed r.Method = "DELETE" @@ -175,7 +165,6 @@ func (suite *ProxyTests) TestServeHTTP_disallowed() { proxy.ServeHTTP(w, r) assert.Equal(suite.T(), 403, w.Result().StatusCode) assert.Equal(suite.T(), false, suite.fakeServer.PingedAndRestore()) - assert.Equal(suite.T(), false, suite.messenger.CheckAndRestore()) // Policy methods are not allowed w = httptest.NewRecorder() @@ -184,7 +173,6 @@ func (suite *ProxyTests) TestServeHTTP_disallowed() { proxy.ServeHTTP(w, r) assert.Equal(suite.T(), 403, w.Result().StatusCode) assert.Equal(suite.T(), false, suite.fakeServer.PingedAndRestore()) - assert.Equal(suite.T(), false, suite.messenger.CheckAndRestore()) // Normal get is dissallowed w = httptest.NewRecorder() @@ -193,7 +181,6 @@ func (suite *ProxyTests) TestServeHTTP_disallowed() { proxy.ServeHTTP(w, r) assert.Equal(suite.T(), 403, w.Result().StatusCode) assert.Equal(suite.T(), false, suite.fakeServer.PingedAndRestore()) - assert.Equal(suite.T(), false, suite.messenger.CheckAndRestore()) // Put policy is disallowed w = httptest.NewRecorder() @@ -202,7 +189,6 @@ func (suite *ProxyTests) TestServeHTTP_disallowed() { proxy.ServeHTTP(w, r) assert.Equal(suite.T(), 403, w.Result().StatusCode) assert.Equal(suite.T(), false, suite.fakeServer.PingedAndRestore()) - assert.Equal(suite.T(), false, suite.messenger.CheckAndRestore()) // Create bucket disallowed w = httptest.NewRecorder() @@ -211,7 +197,6 @@ func (suite *ProxyTests) TestServeHTTP_disallowed() { proxy.ServeHTTP(w, r) assert.Equal(suite.T(), 403, w.Result().StatusCode) assert.Equal(suite.T(), false, suite.fakeServer.PingedAndRestore()) - assert.Equal(suite.T(), false, suite.messenger.CheckAndRestore()) // Not authorized user get 401 response w = httptest.NewRecorder() @@ -220,7 +205,6 @@ func (suite *ProxyTests) TestServeHTTP_disallowed() { proxy.ServeHTTP(w, r) assert.Equal(suite.T(), 401, w.Result().StatusCode) assert.Equal(suite.T(), false, suite.fakeServer.PingedAndRestore()) - assert.Equal(suite.T(), false, suite.messenger.CheckAndRestore()) } func (suite *ProxyTests) TestServeHTTPS3Unresponsive() { @@ -241,7 +225,6 @@ func (suite *ProxyTests) TestServeHTTPS3Unresponsive() { r.URL, _ = url.Parse("/asdf/asdf") proxy.ServeHTTP(w, r) assert.Equal(suite.T(), 500, w.Result().StatusCode) // nolint:bodyclose - assert.Equal(suite.T(), false, suite.messenger.CheckAndRestore()) } // nolint:bodyclose @@ -261,7 +244,6 @@ func (suite *ProxyTests) TestServeHTTP_allowed() { assert.Equal(suite.T(), 200, w.Result().StatusCode) assert.Equal(suite.T(), true, suite.fakeServer.PingedAndRestore()) assert.Equal(suite.T(), false, suite.fakeServer.PingedAndRestore()) // Testing the pinged interface - assert.Equal(suite.T(), false, suite.messenger.CheckAndRestore()) // Put file works w = httptest.NewRecorder() @@ -270,8 +252,6 @@ func (suite *ProxyTests) TestServeHTTP_allowed() { proxy.ServeHTTP(w, r) assert.Equal(suite.T(), 200, w.Result().StatusCode) assert.Equal(suite.T(), true, suite.fakeServer.PingedAndRestore()) - assert.Equal(suite.T(), true, suite.messenger.CheckAndRestore()) - assert.Equal(suite.T(), false, suite.messenger.CheckAndRestore()) // Put with partnumber sends no message w = httptest.NewRecorder() @@ -280,7 +260,6 @@ func (suite *ProxyTests) TestServeHTTP_allowed() { proxy.ServeHTTP(w, r) assert.Equal(suite.T(), 200, w.Result().StatusCode) assert.Equal(suite.T(), true, suite.fakeServer.PingedAndRestore()) - assert.Equal(suite.T(), false, suite.messenger.CheckAndRestore()) // Post with uploadId sends message r.Method = "POST" @@ -289,7 +268,6 @@ func (suite *ProxyTests) TestServeHTTP_allowed() { proxy.ServeHTTP(w, r) assert.Equal(suite.T(), 200, w.Result().StatusCode) assert.Equal(suite.T(), true, suite.fakeServer.PingedAndRestore()) - assert.Equal(suite.T(), true, suite.messenger.CheckAndRestore()) // Post without uploadId sends no message r.Method = "POST" @@ -298,7 +276,6 @@ func (suite *ProxyTests) TestServeHTTP_allowed() { proxy.ServeHTTP(w, r) assert.Equal(suite.T(), 200, w.Result().StatusCode) assert.Equal(suite.T(), true, suite.fakeServer.PingedAndRestore()) - assert.Equal(suite.T(), false, suite.messenger.CheckAndRestore()) // Abort multipart works r.Method = "DELETE" @@ -307,7 +284,6 @@ func (suite *ProxyTests) TestServeHTTP_allowed() { proxy.ServeHTTP(w, r) assert.Equal(suite.T(), 200, w.Result().StatusCode) assert.Equal(suite.T(), true, suite.fakeServer.PingedAndRestore()) - assert.Equal(suite.T(), false, suite.messenger.CheckAndRestore()) // Going through the different extra stuff that can be in the get request // that trigger different code paths in the code. @@ -318,7 +294,6 @@ func (suite *ProxyTests) TestServeHTTP_allowed() { proxy.ServeHTTP(w, r) assert.Equal(suite.T(), 200, w.Result().StatusCode) assert.Equal(suite.T(), true, suite.fakeServer.PingedAndRestore()) - assert.Equal(suite.T(), false, suite.messenger.CheckAndRestore()) // Delimiter alone together with prefix r.Method = "GET" @@ -327,7 +302,6 @@ func (suite *ProxyTests) TestServeHTTP_allowed() { proxy.ServeHTTP(w, r) assert.Equal(suite.T(), 200, w.Result().StatusCode) assert.Equal(suite.T(), true, suite.fakeServer.PingedAndRestore()) - assert.Equal(suite.T(), false, suite.messenger.CheckAndRestore()) // Location parameter r.Method = "GET" @@ -336,7 +310,6 @@ func (suite *ProxyTests) TestServeHTTP_allowed() { proxy.ServeHTTP(w, r) assert.Equal(suite.T(), 200, w.Result().StatusCode) assert.Equal(suite.T(), true, suite.fakeServer.PingedAndRestore()) - assert.Equal(suite.T(), false, suite.messenger.CheckAndRestore()) } func (suite *ProxyTests) TestMessageFormatting() { @@ -377,7 +350,7 @@ func (suite *ProxyTests) TestMessageFormatting() { func (suite *ProxyTests) TestDatabaseConnection() { database, err := common.NewSDAdb(suite.DBConf) if err != nil { - suite.T().Skip("skip TestShutdown since broker not present") + suite.T().Skip("skip TestShutdown since DB not present: ", err) } // Start proxy that allows everything @@ -393,8 +366,6 @@ func (suite *ProxyTests) TestDatabaseConnection() { defer res.Body.Close() assert.Equal(suite.T(), 200, res.StatusCode) assert.Equal(suite.T(), true, suite.fakeServer.PingedAndRestore()) - assert.Equal(suite.T(), true, suite.messenger.CheckAndRestore()) - assert.Equal(suite.T(), false, suite.messenger.CheckAndRestore()) // Check that the file is registered and uploaded in the database // connect to the database From 9e8f794fed7ebe6de3bbd740c1f7255afc1ab635 Mon Sep 17 00:00:00 2001 From: Joakim Bygdell Date: Thu, 9 Feb 2023 11:19:18 +0100 Subject: [PATCH 11/13] Go install is broken --- dev_utils/docker-compose.yml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/dev_utils/docker-compose.yml b/dev_utils/docker-compose.yml index b83350c..efdbbcd 100644 --- a/dev_utils/docker-compose.yml +++ b/dev_utils/docker-compose.yml @@ -159,8 +159,7 @@ services: - "/bin/sh" - "-c" - "cd /app; echo 'Running go ${GOLANG_VERSION:-1.19} tests'; - go install 2>/dev/null - && go test ./... -v -coverprofile=coverage.txt -covermode=atomic" + go test ./... -v -coverprofile=coverage.txt -covermode=atomic" depends_on: mq: condition: service_healthy From 9c246a492b10a8e9cb852f2148dafc654da9aa17 Mon Sep 17 00:00:00 2001 From: Joakim Bygdell Date: Thu, 9 Feb 2023 11:21:29 +0100 Subject: [PATCH 12/13] test against GO 1.20 --- .github/workflows/functionality.yml | 2 +- .github/workflows/golint.yml | 2 +- .github/workflows/gotest.yml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/functionality.yml b/.github/workflows/functionality.yml index ec35582..71b75e1 100644 --- a/.github/workflows/functionality.yml +++ b/.github/workflows/functionality.yml @@ -7,7 +7,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - go-version: [1.18, 1.19] + go-version: ['1.20', '1.19'] steps: - name: Set up Go ${{ matrix.go-version }} uses: actions/setup-go@v3 diff --git a/.github/workflows/golint.yml b/.github/workflows/golint.yml index 467e93e..9f49725 100644 --- a/.github/workflows/golint.yml +++ b/.github/workflows/golint.yml @@ -8,7 +8,7 @@ jobs: strategy: fail-fast: false matrix: - go-version: [1.18, 1.19] + go-version: ['1.20', '1.19'] steps: - name: Set up Go ${{ matrix.go-version }} uses: actions/setup-go@v3 diff --git a/.github/workflows/gotest.yml b/.github/workflows/gotest.yml index a99ea34..6c4245c 100644 --- a/.github/workflows/gotest.yml +++ b/.github/workflows/gotest.yml @@ -7,7 +7,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - go-version: [1.18, 1.19] + go-version: ['1.20', '1.19'] steps: - name: Set up Go ${{ matrix.go-version }} uses: actions/setup-go@v3 From b9b6a994e159c57be56aed5d3778f5843c91276a Mon Sep 17 00:00:00 2001 From: Joakim Bygdell Date: Thu, 9 Feb 2023 14:45:56 +0100 Subject: [PATCH 13/13] return error instead of log.Fatal --- messenger.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/messenger.go b/messenger.go index 28a8126..376037d 100644 --- a/messenger.go +++ b/messenger.go @@ -150,9 +150,10 @@ func (m *AMQPMessenger) createNewChannel() error { confirmsChan := make(chan amqp.Confirmation, 1) if err := c.Confirm(false); err != nil { close(confirmsChan) - log.Fatalf("Channel could not be put into confirm mode: %s\n", err) + + return fmt.Errorf("channel could not be put into confirm mode: %v", err) } - log.Debugln("recconected to new channel") + log.Debugln("reconnected to new channel") m.channel = c m.confirmsChan = c.NotifyPublish(confirmsChan)