From 51196c3a1184bc4ae317285aa4c02318837762db Mon Sep 17 00:00:00 2001 From: "darcy.rayner" Date: Wed, 22 May 2019 17:17:35 -0400 Subject: [PATCH 01/26] Make WrapHandlerWithListeners a variadic function --- ddlambda.go | 2 +- internal/trace/wrap_handler.go | 12 ++++++++---- internal/trace/wrap_handler_test.go | 4 ++-- 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/ddlambda.go b/ddlambda.go index 0cf94c76..100ebeda 100644 --- a/ddlambda.go +++ b/ddlambda.go @@ -11,7 +11,7 @@ import ( // It returns a modified handler that can be passed directly to the lambda.Start function. func WrapHandler(handler interface{}) interface{} { hl := trace.Listener{} - return trace.WrapHandlerWithListener(handler, &hl) + return trace.WrapHandlerWithListeners(handler, &hl) } // GetTraceHeaders reads a map containing the DataDog trace headers from a context object. diff --git a/internal/trace/wrap_handler.go b/internal/trace/wrap_handler.go index a616c47a..294aaa53 100644 --- a/internal/trace/wrap_handler.go +++ b/internal/trace/wrap_handler.go @@ -15,8 +15,8 @@ type ( } ) -// WrapHandlerWithListener wraps a lambda handler to capture context and adds the DataDog tracing context. -func WrapHandlerWithListener(handler interface{}, hl HandlerListener) interface{} { +// WrapHandlerWithListeners wraps a lambda handler to capture context and adds the DataDog tracing context. +func WrapHandlerWithListeners(handler interface{}, listeners ...HandlerListener) interface{} { err := validateHandler(handler) if err != nil { @@ -25,9 +25,13 @@ func WrapHandlerWithListener(handler interface{}, hl HandlerListener) interface{ } return func(ctx context.Context, msg json.RawMessage) (interface{}, error) { - ctx = hl.HandlerStarted(ctx, msg) + for _, listener := range listeners { + ctx = listener.HandlerStarted(ctx, msg) + } result, err := callHandler(ctx, msg, handler) - hl.HandlerFinished(ctx) + for _, listener := range listeners { + listener.HandlerFinished(ctx) + } return result, err } } diff --git a/internal/trace/wrap_handler_test.go b/internal/trace/wrap_handler_test.go index 43e566ac..df49f9f4 100644 --- a/internal/trace/wrap_handler_test.go +++ b/internal/trace/wrap_handler_test.go @@ -41,7 +41,7 @@ func runHandlerWithJSON(t *testing.T, filename string, handler interface{}) (*mo mhl := mockHandlerListener{} - wrappedHandler := WrapHandlerWithListener(handler, &mhl).(func(context.Context, json.RawMessage) (interface{}, error)) + wrappedHandler := WrapHandlerWithListeners(handler, &mhl).(func(context.Context, json.RawMessage) (interface{}, error)) response, err := wrappedHandler(ctx, *payload) return &mhl, response, err @@ -222,7 +222,7 @@ func TestWrapHandlerReturnsOriginalHandlerIfInvalid(t *testing.T) { } mhl := mockHandlerListener{} - wrappedHandler := WrapHandlerWithListener(handler, &mhl) + wrappedHandler := WrapHandlerWithListeners(handler, &mhl) assert.Equal(t, reflect.ValueOf(handler).Pointer(), reflect.ValueOf(wrappedHandler).Pointer()) From ab5aed30537b3e83aefcaf8baadfecb2cabfffc3 Mon Sep 17 00:00:00 2001 From: "darcy.rayner" Date: Wed, 22 May 2019 17:21:28 -0400 Subject: [PATCH 02/26] Move wrapper into it's own package --- ddlambda.go | 3 ++- .../testdata/apig-event-metadata.json | 0 .../testdata/apig-event-no-metadata.json | 0 internal/{trace => }/testdata/invalid.json | 0 .../testdata/non-proxy-metadata.json | 0 .../testdata/non-proxy-no-metadata.json | 0 internal/trace/context_test.go | 26 ++++++++++++++----- internal/{trace => wrapper}/wrap_handler.go | 2 +- .../{trace => 
wrapper}/wrap_handler_test.go | 14 +++++----- 9 files changed, 29 insertions(+), 16 deletions(-) rename internal/{trace => }/testdata/apig-event-metadata.json (100%) rename internal/{trace => }/testdata/apig-event-no-metadata.json (100%) rename internal/{trace => }/testdata/invalid.json (100%) rename internal/{trace => }/testdata/non-proxy-metadata.json (100%) rename internal/{trace => }/testdata/non-proxy-no-metadata.json (100%) rename internal/{trace => wrapper}/wrap_handler.go (99%) rename internal/{trace => wrapper}/wrap_handler_test.go (89%) diff --git a/ddlambda.go b/ddlambda.go index 100ebeda..c020655a 100644 --- a/ddlambda.go +++ b/ddlambda.go @@ -5,13 +5,14 @@ import ( "net/http" "github.com/DataDog/dd-lambda-go/internal/trace" + "github.com/DataDog/dd-lambda-go/internal/wrapper" ) // WrapHandler is used to instrument your lambda functions, reading in context from API Gateway. // It returns a modified handler that can be passed directly to the lambda.Start function. func WrapHandler(handler interface{}) interface{} { hl := trace.Listener{} - return trace.WrapHandlerWithListeners(handler, &hl) + return wrapper.WrapHandlerWithListeners(handler, &hl) } // GetTraceHeaders reads a map containing the DataDog trace headers from a context object. diff --git a/internal/trace/testdata/apig-event-metadata.json b/internal/testdata/apig-event-metadata.json similarity index 100% rename from internal/trace/testdata/apig-event-metadata.json rename to internal/testdata/apig-event-metadata.json diff --git a/internal/trace/testdata/apig-event-no-metadata.json b/internal/testdata/apig-event-no-metadata.json similarity index 100% rename from internal/trace/testdata/apig-event-no-metadata.json rename to internal/testdata/apig-event-no-metadata.json diff --git a/internal/trace/testdata/invalid.json b/internal/testdata/invalid.json similarity index 100% rename from internal/trace/testdata/invalid.json rename to internal/testdata/invalid.json diff --git a/internal/trace/testdata/non-proxy-metadata.json b/internal/testdata/non-proxy-metadata.json similarity index 100% rename from internal/trace/testdata/non-proxy-metadata.json rename to internal/testdata/non-proxy-metadata.json diff --git a/internal/trace/testdata/non-proxy-no-metadata.json b/internal/testdata/non-proxy-no-metadata.json similarity index 100% rename from internal/trace/testdata/non-proxy-no-metadata.json rename to internal/testdata/non-proxy-no-metadata.json diff --git a/internal/trace/context_test.go b/internal/trace/context_test.go index e7baafa7..ed1e3c5f 100644 --- a/internal/trace/context_test.go +++ b/internal/trace/context_test.go @@ -2,6 +2,8 @@ package trace import ( "context" + "encoding/json" + "io/ioutil" "testing" "github.com/aws/aws-xray-sdk-go/header" @@ -27,8 +29,18 @@ func mockLambdaTraceContext(ctx context.Context, traceID, parentID string, sampl return context.WithValue(ctx, xray.LambdaTraceHeaderKey, headerString) } +func loadRawJSON(t *testing.T, filename string) *json.RawMessage { + bytes, err := ioutil.ReadFile(filename) + if err != nil { + assert.Fail(t, "Couldn't find JSON file") + return nil + } + msg := json.RawMessage{} + msg.UnmarshalJSON(bytes) + return &msg +} func TestUnmarshalEventForTraceMetadataNonProxyEvent(t *testing.T) { - ev := loadRawJSON(t, "testdata/apig-event-metadata.json") + ev := loadRawJSON(t, "../testdata/apig-event-metadata.json") headers, ok := unmarshalEventForTraceContext(*ev) assert.True(t, ok) @@ -42,14 +54,14 @@ func TestUnmarshalEventForTraceMetadataNonProxyEvent(t *testing.T) { } func 
TestUnmarshalEventForInvalidData(t *testing.T) { - ev := loadRawJSON(t, "testdata/invalid.json") + ev := loadRawJSON(t, "../testdata/invalid.json") _, ok := unmarshalEventForTraceContext(*ev) assert.False(t, ok) } func TestUnmarshalEventForMissingData(t *testing.T) { - ev := loadRawJSON(t, "testdata/non-proxy-no-metadata.json") + ev := loadRawJSON(t, "../testdata/non-proxy-no-metadata.json") _, ok := unmarshalEventForTraceContext(*ev) assert.False(t, ok) @@ -108,7 +120,7 @@ func TestXrayTraceContextWithSegment(t *testing.T) { } func TestExtractTraceContextFromContext(t *testing.T) { - ev := loadRawJSON(t, "testdata/apig-event-no-metadata.json") + ev := loadRawJSON(t, "../testdata/apig-event-no-metadata.json") ctx := mockLambdaTraceContext(context.Background(), "1-5ce31dc2-2c779014b90ce44db5e03875", "779014b90ce44db5e03875", true) newCTX, err := ExtractTraceContext(ctx, *ev) @@ -120,7 +132,7 @@ func TestExtractTraceContextFromContext(t *testing.T) { assert.NotNil(t, headers[parentIDHeader]) } func TestExtractTraceContextFromEvent(t *testing.T) { - ev := loadRawJSON(t, "testdata/apig-event-metadata.json") + ev := loadRawJSON(t, "../testdata/apig-event-metadata.json") ctx := mockLambdaTraceContext(context.Background(), "1-5ce31dc2-2c779014b90ce44db5e03875", "779014b90ce44db5e03875", true) newCTX, err := ExtractTraceContext(ctx, *ev) @@ -136,7 +148,7 @@ func TestExtractTraceContextFromEvent(t *testing.T) { } func TestExtractTraceContextFail(t *testing.T) { - ev := loadRawJSON(t, "testdata/apig-event-no-metadata.json") + ev := loadRawJSON(t, "../testdata/apig-event-no-metadata.json") ctx := context.Background() _, err := ExtractTraceContext(ctx, *ev) @@ -144,7 +156,7 @@ func TestExtractTraceContextFail(t *testing.T) { } func TestGetTraceHeadersWithUpdatedParent(t *testing.T) { - ev := loadRawJSON(t, "testdata/apig-event-metadata.json") + ev := loadRawJSON(t, "../testdata/apig-event-metadata.json") ctx := mockLambdaTraceContext(context.Background(), "1-5ce31dc2-2c779014b90ce44db5e03875", "779014b90ce44db5e03874", true) ctx, _ = ExtractTraceContext(ctx, *ev) diff --git a/internal/trace/wrap_handler.go b/internal/wrapper/wrap_handler.go similarity index 99% rename from internal/trace/wrap_handler.go rename to internal/wrapper/wrap_handler.go index 294aaa53..db543761 100644 --- a/internal/trace/wrap_handler.go +++ b/internal/wrapper/wrap_handler.go @@ -1,4 +1,4 @@ -package trace +package wrapper import ( "context" diff --git a/internal/trace/wrap_handler_test.go b/internal/wrapper/wrap_handler_test.go similarity index 89% rename from internal/trace/wrap_handler_test.go rename to internal/wrapper/wrap_handler_test.go index df49f9f4..6a6644fa 100644 --- a/internal/trace/wrap_handler_test.go +++ b/internal/wrapper/wrap_handler_test.go @@ -1,4 +1,4 @@ -package trace +package wrapper import ( "context" @@ -130,7 +130,7 @@ func TestWrapHandlerAPIGEvent(t *testing.T) { return 5, nil } - _, response, err := runHandlerWithJSON(t, "testdata/apig-event-no-metadata.json", handler) + _, response, err := runHandlerWithJSON(t, "../testdata/apig-event-no-metadata.json", handler) assert.True(t, called) assert.NoError(t, err) @@ -146,7 +146,7 @@ func TestWrapHandlerNonProxyEvent(t *testing.T) { return 5, nil } - _, response, err := runHandlerWithJSON(t, "testdata/non-proxy-no-metadata.json", handler) + _, response, err := runHandlerWithJSON(t, "../testdata/non-proxy-no-metadata.json", handler) assert.True(t, called) assert.NoError(t, err) @@ -162,7 +162,7 @@ func TestWrapHandlerEventArgumentOnly(t *testing.T) { 
return 5, nil } - _, response, err := runHandlerWithJSON(t, "testdata/non-proxy-no-metadata.json", handler) + _, response, err := runHandlerWithJSON(t, "../testdata/non-proxy-no-metadata.json", handler) assert.True(t, called) assert.NoError(t, err) @@ -177,7 +177,7 @@ func TestWrapHandlerNoArguments(t *testing.T) { return 5, nil } - _, response, err := runHandlerWithJSON(t, "testdata/non-proxy-no-metadata.json", handler) + _, response, err := runHandlerWithJSON(t, "../testdata/non-proxy-no-metadata.json", handler) assert.True(t, called) assert.NoError(t, err) @@ -192,7 +192,7 @@ func TestWrapHandlerInvalidData(t *testing.T) { return 5, nil } - _, response, err := runHandlerWithJSON(t, "testdata/invalid.json", handler) + _, response, err := runHandlerWithJSON(t, "../testdata/invalid.json", handler) assert.False(t, called) assert.Error(t, err) @@ -208,7 +208,7 @@ func TestWrapHandlerReturnsError(t *testing.T) { return 5, defaultErr } - _, response, err := runHandlerWithJSON(t, "testdata/non-proxy-no-metadata.json", handler) + _, response, err := runHandlerWithJSON(t, "../testdata/non-proxy-no-metadata.json", handler) assert.True(t, called) assert.Equal(t, defaultErr, err) From 459a11e0a8280ad53a9010e51b7b51fed478b716 Mon Sep 17 00:00:00 2001 From: "darcy.rayner" Date: Wed, 22 May 2019 17:43:24 -0400 Subject: [PATCH 03/26] Add empty listener to metrics package --- ddlambda.go | 5 +++-- internal/metrics/listener.go | 20 ++++++++++++++++++++ 2 files changed, 23 insertions(+), 2 deletions(-) create mode 100644 internal/metrics/listener.go diff --git a/ddlambda.go b/ddlambda.go index c020655a..d3710ec4 100644 --- a/ddlambda.go +++ b/ddlambda.go @@ -11,8 +11,9 @@ import ( // WrapHandler is used to instrument your lambda functions, reading in context from API Gateway. // It returns a modified handler that can be passed directly to the lambda.Start function. func WrapHandler(handler interface{}) interface{} { - hl := trace.Listener{} - return wrapper.WrapHandlerWithListeners(handler, &hl) + tl := trace.Listener{} + ml := trace.Listener{} + return wrapper.WrapHandlerWithListeners(handler, &tl, &ml) } // GetTraceHeaders reads a map containing the DataDog trace headers from a context object. diff --git a/internal/metrics/listener.go b/internal/metrics/listener.go new file mode 100644 index 00000000..c06b0bda --- /dev/null +++ b/internal/metrics/listener.go @@ -0,0 +1,20 @@ +package metrics + +import ( + "context" + "encoding/json" +) + +type ( + // Listener implements wrapper.HandlerListener, injecting metrics into the context + Listener struct{} +) + +// HandlerStarted adds metrics service to the context +func (l *Listener) HandlerStarted(ctx context.Context, msg json.RawMessage) context.Context { + return ctx +} + +// HandlerFinished implemented as part of the wrapper.HandlerListener interface +func (l *Listener) HandlerFinished(ctx context.Context) { +} From 7ba41a3f494bb0d4ae970e01091a24211ffe7779 Mon Sep 17 00:00:00 2001 From: "darcy.rayner" Date: Thu, 23 May 2019 14:18:38 -0400 Subject: [PATCH 04/26] Add API client with method for prewarming connections. 
--- ddlambda.go | 3 +- internal/metrics/api.go | 52 +++++++++++++++++++++++++++++++++++ internal/metrics/api_test.go | 37 +++++++++++++++++++++++++ internal/metrics/constants.go | 7 +++++ internal/metrics/listener.go | 16 ++++++++++- 5 files changed, 113 insertions(+), 2 deletions(-) create mode 100644 internal/metrics/api.go create mode 100644 internal/metrics/api_test.go create mode 100644 internal/metrics/constants.go diff --git a/ddlambda.go b/ddlambda.go index d3710ec4..52a4aca4 100644 --- a/ddlambda.go +++ b/ddlambda.go @@ -4,6 +4,7 @@ import ( "context" "net/http" + "github.com/DataDog/dd-lambda-go/internal/metrics" "github.com/DataDog/dd-lambda-go/internal/trace" "github.com/DataDog/dd-lambda-go/internal/wrapper" ) @@ -12,7 +13,7 @@ import ( // It returns a modified handler that can be passed directly to the lambda.Start function. func WrapHandler(handler interface{}) interface{} { tl := trace.Listener{} - ml := trace.Listener{} + ml := metrics.MakeListener() return wrapper.WrapHandlerWithListeners(handler, &tl, &ml) } diff --git a/internal/metrics/api.go b/internal/metrics/api.go new file mode 100644 index 00000000..2ca624ca --- /dev/null +++ b/internal/metrics/api.go @@ -0,0 +1,52 @@ +package metrics + +import ( + "fmt" + "net/http" +) + +type ( + // APIClient sends metrics to Datadog + APIClient struct { + apiKey string + appKey string + baseAPIURL string + httpClient *http.Client + } +) + +// MakeAPIClient creates a new API client with the given api and app keys +func MakeAPIClient(baseAPIURL, apiKey, appKey string) *APIClient { + httpClient := &http.Client{} + return &APIClient{ + apiKey, + appKey, + baseAPIURL, + httpClient, + } +} + +// PrewarmConnection sends a redundant GET request to the Datadog API to prewarm the TSL connection +func (cl *APIClient) PrewarmConnection() error { + req, err := http.NewRequest("GET", cl.makeRoute("validate"), nil) + if err != nil { + return fmt.Errorf("Couldn't create prewarming request: %v", err) + } + cl.addAPICredentials(req) + _, err = cl.httpClient.Do(req) + if err != nil { + return fmt.Errorf("Couldn't contact server for prewarm request %v", err) + } + return nil +} + +func (cl *APIClient) addAPICredentials(req *http.Request) { + query := req.URL.Query() + query.Add(apiKeyParam, cl.apiKey) + query.Add(appKeyParam, cl.appKey) + req.URL.RawQuery = query.Encode() +} + +func (cl *APIClient) makeRoute(route string) string { + return fmt.Sprintf("%s/%s", cl.baseAPIURL, route) +} diff --git a/internal/metrics/api_test.go b/internal/metrics/api_test.go new file mode 100644 index 00000000..d3f6155e --- /dev/null +++ b/internal/metrics/api_test.go @@ -0,0 +1,37 @@ +package metrics + +import ( + "net/http" + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/assert" +) + +const ( + mockAPIKey = "12345" + mockAppKey = "678910" +) + +func TestAddAPICredentials(t *testing.T) { + cl := MakeAPIClient("", mockAPIKey, mockAppKey) + req, _ := http.NewRequest("GET", "http://some-api.com/endpoint", nil) + cl.addAPICredentials(req) + assert.Equal(t, "http://some-api.com/endpoint?api_key=12345&application_key=678910", req.URL.String()) +} + +func TestPrewarmConnection(t *testing.T) { + + called := false + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + called = true + assert.Equal(t, "/validate?api_key=12345&application_key=678910", r.URL.String()) + })) + defer server.Close() + + cl := MakeAPIClient(server.URL, mockAPIKey, mockAppKey) + err := cl.PrewarmConnection() + + assert.NoError(t, err) + 
assert.True(t, called) +} diff --git a/internal/metrics/constants.go b/internal/metrics/constants.go new file mode 100644 index 00000000..3accdedc --- /dev/null +++ b/internal/metrics/constants.go @@ -0,0 +1,7 @@ +package metrics + +const ( + baseAPIURL = "https://api.datadoghq.com/api/v1" + apiKeyParam = "api_key" + appKeyParam = "application_key" +) diff --git a/internal/metrics/listener.go b/internal/metrics/listener.go index c06b0bda..af576e73 100644 --- a/internal/metrics/listener.go +++ b/internal/metrics/listener.go @@ -7,9 +7,23 @@ import ( type ( // Listener implements wrapper.HandlerListener, injecting metrics into the context - Listener struct{} + Listener struct { + apiClient *APIClient + } ) +// MakeListener initializes a new metrics lambda listener +func MakeListener() Listener { + apiClient := MakeAPIClient(baseAPIURL, "", "") + + // Do this in the background, doesn't matter if it returns + go apiClient.PrewarmConnection() + + return Listener{ + apiClient, + } +} + // HandlerStarted adds metrics service to the context func (l *Listener) HandlerStarted(ctx context.Context, msg json.RawMessage) context.Context { return ctx From b252dbb6030fbaa37c92abdd62b7dfb7c12b68c1 Mon Sep 17 00:00:00 2001 From: "darcy.rayner" Date: Thu, 23 May 2019 21:16:44 -0400 Subject: [PATCH 05/26] Add batcher for rolling up metrics --- internal/metrics/api.go | 4 ++ internal/metrics/batcher.go | 71 +++++++++++++++++++ internal/metrics/batcher_test.go | 113 +++++++++++++++++++++++++++++++ internal/metrics/constants.go | 6 ++ internal/metrics/model.go | 62 +++++++++++++++++ 5 files changed, 256 insertions(+) create mode 100644 internal/metrics/batcher.go create mode 100644 internal/metrics/batcher_test.go create mode 100644 internal/metrics/model.go diff --git a/internal/metrics/api.go b/internal/metrics/api.go index 2ca624ca..c1c98e81 100644 --- a/internal/metrics/api.go +++ b/internal/metrics/api.go @@ -40,6 +40,10 @@ func (cl *APIClient) PrewarmConnection() error { return nil } +/*func (cl *APIClient) SendRequest() error { + +}*/ + func (cl *APIClient) addAPICredentials(req *http.Request) { query := req.URL.Query() query.Add(apiKeyParam, cl.apiKey) diff --git a/internal/metrics/batcher.go b/internal/metrics/batcher.go new file mode 100644 index 00000000..35c816d1 --- /dev/null +++ b/internal/metrics/batcher.go @@ -0,0 +1,71 @@ +package metrics + +import ( + "fmt" + "math" + "sort" + "strings" + "time" +) + +type ( + // Batcher batches + Batcher struct { + metrics map[string]Metric + batchInterval float64 + } + // BatchKey identifies a batch of metrics + BatchKey struct { + timestamp time.Time + metricType MetricType + name string + tags []string + host *string + } +) + +// MakeBatcher creates a new batcher object +func MakeBatcher(batchInterval float64) *Batcher { + return &Batcher{ + batchInterval: batchInterval, + metrics: map[string]Metric{}, + } +} + +// GetMetric gets an existing metric +func (b *Batcher) GetMetric(bk BatchKey) Metric { + sk := b.getStringKey(bk) + return b.metrics[sk] +} + +// AddMetric adds a point to a given metric +func (b *Batcher) AddMetric(bk BatchKey, metric Metric) { + sk := b.getStringKey(bk) + b.metrics[sk] = metric +} + +// Flush converts the current batch of metrics into API metrics +func (b *Batcher) Flush(timestamp time.Time) []APIMetric { + return []APIMetric{} +} + +func (b *Batcher) getInterval(timestamp time.Time) float64 { + return float64(timestamp.Unix()) - math.Mod(float64(timestamp.Unix()), b.batchInterval) +} + +func (b *Batcher) getStringKey(bk 
BatchKey) string { + interval := b.getInterval(bk.timestamp) + tagKey := getTagKey(bk.tags) + + if bk.host != nil { + return fmt.Sprintf("(%g)-(%s)-(%s)-(%s)-(%s)", interval, bk.metricType, bk.name, tagKey, *bk.host) + } + return fmt.Sprintf("(%g)-(%s)-(%s)-(%s)", interval, bk.metricType, bk.name, tagKey) +} + +func getTagKey(tags []string) string { + sortedTags := make([]string, len(tags)) + copy(tags, sortedTags) + sort.Strings(sortedTags) + return strings.Join(sortedTags, ":") +} diff --git a/internal/metrics/batcher_test.go b/internal/metrics/batcher_test.go new file mode 100644 index 00000000..facedaff --- /dev/null +++ b/internal/metrics/batcher_test.go @@ -0,0 +1,113 @@ +package metrics + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestGetMetricDifferentTagOrder(t *testing.T) { + + tm := time.Now() + key1 := BatchKey{ + timestamp: tm, + metricType: DistributionType, + name: "metric-1", + tags: []string{"a", "b", "c"}, + } + key2 := BatchKey{ + timestamp: tm, + metricType: DistributionType, + name: "metric-1", + tags: []string{"c", "b", "a"}, + } + batcher := MakeBatcher(10) + dm := Distribution{ + Name: "metric-1", + } + + batcher.AddMetric(key1, &dm) + result := batcher.GetMetric(key2) + assert.Equal(t, &dm, result) +} + +func TestGetMetricFailDifferentName(t *testing.T) { + + tm := time.Now() + key1 := BatchKey{ + timestamp: tm, + metricType: DistributionType, + name: "metric-1", + tags: []string{"a", "b", "c"}, + } + key2 := BatchKey{ + timestamp: tm, + metricType: DistributionType, + name: "metric-2", + tags: []string{"a", "b", "c"}, + } + batcher := MakeBatcher(10) + dm := Distribution{ + Name: "metric-1", + } + + batcher.AddMetric(key1, &dm) + result := batcher.GetMetric(key2) + assert.Nil(t, result) +} + +func TestGetMetricFailDifferentHost(t *testing.T) { + + tm := time.Now() + hostname := "host-1" + key1 := BatchKey{ + timestamp: tm, + metricType: DistributionType, + name: "metric-1", + tags: []string{"a", "b", "c"}, + host: &hostname, + } + key2 := BatchKey{ + timestamp: tm, + metricType: DistributionType, + name: "metric-2", + tags: []string{"a", "b", "c"}, + } + batcher := MakeBatcher(10) + dm := Distribution{ + Name: "metric-1", + } + + batcher.AddMetric(key1, &dm) + result := batcher.GetMetric(key2) + assert.Nil(t, result) +} + +func TestGetMetricSameHost(t *testing.T) { + + tm := time.Now() + hostname := "host-1" + key1 := BatchKey{ + timestamp: tm, + metricType: DistributionType, + name: "metric-1", + tags: []string{"a", "b", "c"}, + host: &hostname, + } + key2 := BatchKey{ + timestamp: tm, + metricType: DistributionType, + name: "metric-2", + tags: []string{"a", "b", "c"}, + host: &hostname, + } + batcher := MakeBatcher(10) + dm := Distribution{ + Name: "metric-1", + } + + batcher.AddMetric(key1, &dm) + result := batcher.GetMetric(key2) + assert.Equal(t, &dm, result) +} diff --git a/internal/metrics/constants.go b/internal/metrics/constants.go index 3accdedc..a128376e 100644 --- a/internal/metrics/constants.go +++ b/internal/metrics/constants.go @@ -5,3 +5,9 @@ const ( apiKeyParam = "api_key" appKeyParam = "application_key" ) + +type MetricType string + +const ( + DistributionType MetricType = "distribution" +) diff --git a/internal/metrics/model.go b/internal/metrics/model.go new file mode 100644 index 00000000..9a36e2b8 --- /dev/null +++ b/internal/metrics/model.go @@ -0,0 +1,62 @@ +package metrics + +import ( + "time" +) + +type ( + // Metric represents a metric that can have any kind of + Metric interface { + AddPoint(value 
float64) + ToAPIMetric(timestamp time.Time, interval time.Duration) []APIMetric + } + + // APIMetric is a metric that can be marshalled to send to the metrics API + APIMetric struct { + Name string `json:"metric"` + Host *string `json:"host,omitempty"` + Tags []string `json:"tags,omitempty"` + MetricType MetricType `json:"type"` + Interval *float64 `json:"interval,omitempty"` + Points [][]float64 `json:"points"` + } + + // Distribution is a type of metric that is aggregated over multiple hosts + Distribution struct { + Name string + Tags []string + Host *string + Values []float64 + } +) + +// AddPoint adds a point to the distribution metric +func (d *Distribution) AddPoint(value float64) { + d.Values = append(d.Values, value) +} + +// ToAPIMetric converts a distribution into an API ready format. +func (d *Distribution) ToAPIMetric(timestamp time.Time, interval time.Duration) []APIMetric { + + intervalSeconds := new(float64) + *intervalSeconds = interval.Seconds() + + points := make([][]float64, len(d.Values)) + + currentTime := float64(timestamp.Unix()) + + for i, val := range d.Values { + points[i] = []float64{currentTime, val} + } + + return []APIMetric{ + APIMetric{ + Name: d.Name, + Host: d.Host, + Tags: d.Tags, + MetricType: DistributionType, + Points: points, + Interval: intervalSeconds, + }, + } +} From 48592ae5eeff63fa3a21aad74a9ce6d4407bf047 Mon Sep 17 00:00:00 2001 From: "darcy.rayner" Date: Fri, 24 May 2019 10:28:42 -0400 Subject: [PATCH 06/26] Implement flush method on batcher --- internal/metrics/batcher.go | 16 ++++++++++-- internal/metrics/batcher_test.go | 44 +++++++++++++++++++++++++++++++- internal/metrics/constants.go | 3 +++ internal/metrics/model.go | 6 +---- 4 files changed, 61 insertions(+), 8 deletions(-) diff --git a/internal/metrics/batcher.go b/internal/metrics/batcher.go index 35c816d1..3a98122c 100644 --- a/internal/metrics/batcher.go +++ b/internal/metrics/batcher.go @@ -46,7 +46,19 @@ func (b *Batcher) AddMetric(bk BatchKey, metric Metric) { // Flush converts the current batch of metrics into API metrics func (b *Batcher) Flush(timestamp time.Time) []APIMetric { - return []APIMetric{} + + ar := []APIMetric{} + interval := time.Duration(0) // TODO Get actual interval + + for _, metric := range b.metrics { + values := metric.ToAPIMetric(timestamp, interval) + for _, val := range values { + ar = append(ar, val) + } + } + b.metrics = map[string]Metric{} + + return ar } func (b *Batcher) getInterval(timestamp time.Time) float64 { @@ -65,7 +77,7 @@ func (b *Batcher) getStringKey(bk BatchKey) string { func getTagKey(tags []string) string { sortedTags := make([]string, len(tags)) - copy(tags, sortedTags) + copy(sortedTags, tags) sort.Strings(sortedTags) return strings.Join(sortedTags, ":") } diff --git a/internal/metrics/batcher_test.go b/internal/metrics/batcher_test.go index facedaff..9e1f7da4 100644 --- a/internal/metrics/batcher_test.go +++ b/internal/metrics/batcher_test.go @@ -98,7 +98,7 @@ func TestGetMetricSameHost(t *testing.T) { key2 := BatchKey{ timestamp: tm, metricType: DistributionType, - name: "metric-2", + name: "metric-1", tags: []string{"a", "b", "c"}, host: &hostname, } @@ -111,3 +111,45 @@ func TestGetMetricSameHost(t *testing.T) { result := batcher.GetMetric(key2) assert.Equal(t, &dm, result) } + +func TestFlushSameInterval(t *testing.T) { + tm := time.Now() + hostname := "host-1" + key1 := BatchKey{ + timestamp: tm, + metricType: DistributionType, + name: "metric-1", + tags: []string{"a", "b", "c"}, + host: &hostname, + } + batcher := 
MakeBatcher(10) + dm := Distribution{ + Name: "metric-1", + Tags: key1.tags, + Host: &hostname, + Values: []float64{}, + } + + dm.AddPoint(1) + dm.AddPoint(2) + dm.AddPoint(3) + + batcher.AddMetric(key1, &dm) + + floatTime := float64(tm.Unix()) + result := batcher.Flush(tm) + expected := []APIMetric{ + { + Name: "metric-1", + Host: &hostname, + Tags: []string{"a", "b", "c"}, + MetricType: DistributionType, + Interval: nil, + Points: [][]float64{ + {floatTime, 1}, {floatTime, 2}, {floatTime, 3}, + }, + }, + } + + assert.Equal(t, expected, result) +} diff --git a/internal/metrics/constants.go b/internal/metrics/constants.go index a128376e..4019b762 100644 --- a/internal/metrics/constants.go +++ b/internal/metrics/constants.go @@ -6,8 +6,11 @@ const ( appKeyParam = "application_key" ) +// MetricType enumerates all the available metric types type MetricType string const ( + + // DistributionType represents a distribution metric DistributionType MetricType = "distribution" ) diff --git a/internal/metrics/model.go b/internal/metrics/model.go index 9a36e2b8..97ac86d4 100644 --- a/internal/metrics/model.go +++ b/internal/metrics/model.go @@ -37,10 +37,6 @@ func (d *Distribution) AddPoint(value float64) { // ToAPIMetric converts a distribution into an API ready format. func (d *Distribution) ToAPIMetric(timestamp time.Time, interval time.Duration) []APIMetric { - - intervalSeconds := new(float64) - *intervalSeconds = interval.Seconds() - points := make([][]float64, len(d.Values)) currentTime := float64(timestamp.Unix()) @@ -56,7 +52,7 @@ func (d *Distribution) ToAPIMetric(timestamp time.Time, interval time.Duration) Tags: d.Tags, MetricType: DistributionType, Points: points, - Interval: intervalSeconds, + Interval: nil, }, } } From c4f7096047267dcd146aec423aef7f1156108166 Mon Sep 17 00:00:00 2001 From: "darcy.rayner" Date: Fri, 24 May 2019 11:19:58 -0400 Subject: [PATCH 07/26] Implement SendMetrics method --- internal/metrics/api.go | 43 +++++++++++++++-- internal/metrics/api_test.go | 93 ++++++++++++++++++++++++++++++++++++ 2 files changed, 133 insertions(+), 3 deletions(-) diff --git a/internal/metrics/api.go b/internal/metrics/api.go index c1c98e81..4fd1d63f 100644 --- a/internal/metrics/api.go +++ b/internal/metrics/api.go @@ -1,6 +1,8 @@ package metrics import ( + "bytes" + "encoding/json" "fmt" "net/http" ) @@ -13,6 +15,10 @@ type ( baseAPIURL string httpClient *http.Client } + + postMetricsModel struct { + Series []APIMetric `json:"series"` + } ) // MakeAPIClient creates a new API client with the given api and app keys @@ -35,14 +41,39 @@ func (cl *APIClient) PrewarmConnection() error { cl.addAPICredentials(req) _, err = cl.httpClient.Do(req) if err != nil { - return fmt.Errorf("Couldn't contact server for prewarm request %v", err) + return fmt.Errorf("Couldn't contact server for prewarm request") } return nil } -/*func (cl *APIClient) SendRequest() error { +// SendMetrics posts a batch metrics payload to the Datadog API +func (cl *APIClient) SendMetrics(metrics []APIMetric) error { + content, err := marshalAPIMetricsModel(metrics) + if err != nil { + return fmt.Errorf("Couldn't marshal metrics model: %v", err) + } + body := bytes.NewBuffer(content) + + req, err := http.NewRequest("POST", cl.makeRoute("series"), body) + if err != nil { + return fmt.Errorf("Couldn't create send metrics request:%v", err) + } + defer req.Body.Close() + + cl.addAPICredentials(req) + + resp, err := cl.httpClient.Do(req) -}*/ + if err != nil { + return fmt.Errorf("Failed to send metrics to API") + } + defer 
resp.Body.Close() + + if resp.StatusCode != http.StatusCreated { + return fmt.Errorf("Failed to send metrics to API. Status Code %d", resp.StatusCode) + } + return nil +} func (cl *APIClient) addAPICredentials(req *http.Request) { query := req.URL.Query() @@ -54,3 +85,9 @@ func (cl *APIClient) addAPICredentials(req *http.Request) { func (cl *APIClient) makeRoute(route string) string { return fmt.Sprintf("%s/%s", cl.baseAPIURL, route) } + +func marshalAPIMetricsModel(metrics []APIMetric) ([]byte, error) { + pm := postMetricsModel{} + pm.Series = metrics + return json.Marshal(pm) +} diff --git a/internal/metrics/api_test.go b/internal/metrics/api_test.go index d3f6155e..e482e146 100644 --- a/internal/metrics/api_test.go +++ b/internal/metrics/api_test.go @@ -1,6 +1,7 @@ package metrics import ( + "io/ioutil" "net/http" "net/http/httptest" "testing" @@ -35,3 +36,95 @@ func TestPrewarmConnection(t *testing.T) { assert.NoError(t, err) assert.True(t, called) } + +func TestSendMetricsSuccess(t *testing.T) { + called := false + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + called = true + w.WriteHeader(http.StatusCreated) + body, _ := ioutil.ReadAll(r.Body) + s := string(body) + + assert.Equal(t, "/series?api_key=12345&application_key=678910", r.URL.String()) + assert.Equal(t, "{\"series\":[{\"metric\":\"metric-1\",\"tags\":[\"a\",\"b\",\"c\"],\"type\":\"distribution\",\"points\":[[1,2],[3,4],[5,6]]}]}", s) + + })) + defer server.Close() + + am := []APIMetric{ + { + Name: "metric-1", + Host: nil, + Tags: []string{"a", "b", "c"}, + MetricType: DistributionType, + Points: [][]float64{ + {float64(1), float64(2)}, {float64(3), float64(4)}, {float64(5), float64(6)}, + }, + }, + } + + cl := MakeAPIClient(server.URL, mockAPIKey, mockAppKey) + err := cl.SendMetrics(am) + + assert.NoError(t, err) + assert.True(t, called) +} + +func TestSendMetricsBadRequest(t *testing.T) { + called := false + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + called = true + w.WriteHeader(http.StatusForbidden) + body, _ := ioutil.ReadAll(r.Body) + s := string(body) + + assert.Equal(t, "/series?api_key=12345&application_key=678910", r.URL.String()) + assert.Equal(t, "{\"series\":[{\"metric\":\"metric-1\",\"tags\":[\"a\",\"b\",\"c\"],\"type\":\"distribution\",\"points\":[[1,2],[3,4],[5,6]]}]}", s) + + })) + defer server.Close() + + am := []APIMetric{ + { + Name: "metric-1", + Host: nil, + Tags: []string{"a", "b", "c"}, + MetricType: DistributionType, + Points: [][]float64{ + {float64(1), float64(2)}, {float64(3), float64(4)}, {float64(5), float64(6)}, + }, + }, + } + + cl := MakeAPIClient(server.URL, mockAPIKey, mockAppKey) + err := cl.SendMetrics(am) + + assert.Error(t, err) + assert.True(t, called) +} + +func TestSendMetricsCantReachServer(t *testing.T) { + called := false + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + called = true + })) + defer server.Close() + + am := []APIMetric{ + { + Name: "metric-1", + Host: nil, + Tags: []string{"a", "b", "c"}, + MetricType: DistributionType, + Points: [][]float64{ + {float64(1), float64(2)}, {float64(3), float64(4)}, {float64(5), float64(6)}, + }, + }, + } + + cl := MakeAPIClient("httpa:///badly-formatted-url", mockAPIKey, mockAppKey) + err := cl.SendMetrics(am) + + assert.Error(t, err) + assert.False(t, called) +} From 8253e74e3229877bab6ec1c6eb587987289f12cf Mon Sep 17 00:00:00 2001 From: "darcy.rayner" Date: Tue, 28 May 2019 14:06:22 
-0400 Subject: [PATCH 08/26] Make batcher only accept metrics and a timestamp --- internal/metrics/batcher.go | 27 +++--- internal/metrics/batcher_test.go | 155 ++++++++++++++----------------- internal/metrics/model.go | 24 +++++ 3 files changed, 105 insertions(+), 101 deletions(-) diff --git a/internal/metrics/batcher.go b/internal/metrics/batcher.go index 3a98122c..1939907c 100644 --- a/internal/metrics/batcher.go +++ b/internal/metrics/batcher.go @@ -16,7 +16,6 @@ type ( } // BatchKey identifies a batch of metrics BatchKey struct { - timestamp time.Time metricType MetricType name string tags []string @@ -32,20 +31,18 @@ func MakeBatcher(batchInterval float64) *Batcher { } } -// GetMetric gets an existing metric -func (b *Batcher) GetMetric(bk BatchKey) Metric { - sk := b.getStringKey(bk) - return b.metrics[sk] -} - // AddMetric adds a point to a given metric -func (b *Batcher) AddMetric(bk BatchKey, metric Metric) { - sk := b.getStringKey(bk) - b.metrics[sk] = metric +func (b *Batcher) AddMetric(timestamp time.Time, metric Metric) { + sk := b.getStringKey(timestamp, metric.ToBatchKey()) + if existing, ok := b.metrics[sk]; ok { + existing.Join(metric) + } else { + b.metrics[sk] = metric + } } -// Flush converts the current batch of metrics into API metrics -func (b *Batcher) Flush(timestamp time.Time) []APIMetric { +// ToAPIMetrics converts the current batch of metrics into API metrics +func (b *Batcher) ToAPIMetrics(timestamp time.Time) []APIMetric { ar := []APIMetric{} interval := time.Duration(0) // TODO Get actual interval @@ -56,8 +53,6 @@ func (b *Batcher) Flush(timestamp time.Time) []APIMetric { ar = append(ar, val) } } - b.metrics = map[string]Metric{} - return ar } @@ -65,8 +60,8 @@ func (b *Batcher) getInterval(timestamp time.Time) float64 { return float64(timestamp.Unix()) - math.Mod(float64(timestamp.Unix()), b.batchInterval) } -func (b *Batcher) getStringKey(bk BatchKey) string { - interval := b.getInterval(bk.timestamp) +func (b *Batcher) getStringKey(timestamp time.Time, bk BatchKey) string { + interval := b.getInterval(timestamp) tagKey := getTagKey(bk.tags) if bk.host != nil { diff --git a/internal/metrics/batcher_test.go b/internal/metrics/batcher_test.go index 9e1f7da4..baeb2773 100644 --- a/internal/metrics/batcher_test.go +++ b/internal/metrics/batcher_test.go @@ -10,122 +10,107 @@ import ( func TestGetMetricDifferentTagOrder(t *testing.T) { tm := time.Now() - key1 := BatchKey{ - timestamp: tm, - metricType: DistributionType, - name: "metric-1", - tags: []string{"a", "b", "c"}, - } - key2 := BatchKey{ - timestamp: tm, - metricType: DistributionType, - name: "metric-1", - tags: []string{"c", "b", "a"}, - } batcher := MakeBatcher(10) - dm := Distribution{ - Name: "metric-1", + dm1 := Distribution{ + Name: "metric-1", + Values: []float64{1, 2}, + Tags: []string{"a", "b", "c"}, + } + dm2 := Distribution{ + Name: "metric-1", + Values: []float64{3, 4}, + Tags: []string{"c", "b", "a"}, } - batcher.AddMetric(key1, &dm) - result := batcher.GetMetric(key2) - assert.Equal(t, &dm, result) + batcher.AddMetric(tm, &dm1) + batcher.AddMetric(tm, &dm2) + + assert.Equal(t, []float64{1, 2, 3, 4}, dm1.Values) } func TestGetMetricFailDifferentName(t *testing.T) { tm := time.Now() - key1 := BatchKey{ - timestamp: tm, - metricType: DistributionType, - name: "metric-1", - tags: []string{"a", "b", "c"}, - } - key2 := BatchKey{ - timestamp: tm, - metricType: DistributionType, - name: "metric-2", - tags: []string{"a", "b", "c"}, - } batcher := MakeBatcher(10) - dm := Distribution{ - Name: 
"metric-1", + + dm1 := Distribution{ + Name: "metric-1", + Values: []float64{1, 2}, + Tags: []string{"a", "b", "c"}, + } + dm2 := Distribution{ + Name: "metric-2", + Values: []float64{3, 4}, + Tags: []string{"c", "b", "a"}, } - batcher.AddMetric(key1, &dm) - result := batcher.GetMetric(key2) - assert.Nil(t, result) + batcher.AddMetric(tm, &dm1) + batcher.AddMetric(tm, &dm2) + + assert.Equal(t, []float64{1, 2}, dm1.Values) + } func TestGetMetricFailDifferentHost(t *testing.T) { - tm := time.Now() - hostname := "host-1" - key1 := BatchKey{ - timestamp: tm, - metricType: DistributionType, - name: "metric-1", - tags: []string{"a", "b", "c"}, - host: &hostname, - } - key2 := BatchKey{ - timestamp: tm, - metricType: DistributionType, - name: "metric-2", - tags: []string{"a", "b", "c"}, - } batcher := MakeBatcher(10) - dm := Distribution{ - Name: "metric-1", + + host1 := "my-host-1" + host2 := "my-host-2" + + dm1 := Distribution{ + Name: "metric-1", + Values: []float64{1, 2}, + Tags: []string{"a", "b", "c"}, + Host: &host1, + } + dm2 := Distribution{ + Name: "metric-1", + Values: []float64{3, 4}, + Tags: []string{"a", "b", "c"}, + Host: &host2, } - batcher.AddMetric(key1, &dm) - result := batcher.GetMetric(key2) - assert.Nil(t, result) + batcher.AddMetric(tm, &dm1) + batcher.AddMetric(tm, &dm2) + + assert.Equal(t, []float64{1, 2}, dm1.Values) } func TestGetMetricSameHost(t *testing.T) { tm := time.Now() - hostname := "host-1" - key1 := BatchKey{ - timestamp: tm, - metricType: DistributionType, - name: "metric-1", - tags: []string{"a", "b", "c"}, - host: &hostname, - } - key2 := BatchKey{ - timestamp: tm, - metricType: DistributionType, - name: "metric-1", - tags: []string{"a", "b", "c"}, - host: &hostname, - } batcher := MakeBatcher(10) - dm := Distribution{ - Name: "metric-1", + + host := "my-host" + + dm1 := Distribution{ + Name: "metric-1", + Values: []float64{1, 2}, + Tags: []string{"a", "b", "c"}, + Host: &host, } + dm2 := Distribution{ + Name: "metric-1", + Values: []float64{3, 4}, + Tags: []string{"a", "b", "c"}, + Host: &host, + } + + batcher.AddMetric(tm, &dm1) + batcher.AddMetric(tm, &dm2) - batcher.AddMetric(key1, &dm) - result := batcher.GetMetric(key2) - assert.Equal(t, &dm, result) + assert.Equal(t, []float64{1, 2, 3, 4}, dm1.Values) } -func TestFlushSameInterval(t *testing.T) { +func TestToAPIMetricsSameInterval(t *testing.T) { tm := time.Now() hostname := "host-1" - key1 := BatchKey{ - timestamp: tm, - metricType: DistributionType, - name: "metric-1", - tags: []string{"a", "b", "c"}, - host: &hostname, - } + batcher := MakeBatcher(10) dm := Distribution{ Name: "metric-1", - Tags: key1.tags, + Tags: []string{"a", "b", "c"}, Host: &hostname, Values: []float64{}, } @@ -134,10 +119,10 @@ func TestFlushSameInterval(t *testing.T) { dm.AddPoint(2) dm.AddPoint(3) - batcher.AddMetric(key1, &dm) + batcher.AddMetric(tm, &dm) floatTime := float64(tm.Unix()) - result := batcher.Flush(tm) + result := batcher.ToAPIMetrics(tm) expected := []APIMetric{ { Name: "metric-1", diff --git a/internal/metrics/model.go b/internal/metrics/model.go index 97ac86d4..b8d94d35 100644 --- a/internal/metrics/model.go +++ b/internal/metrics/model.go @@ -9,6 +9,8 @@ type ( Metric interface { AddPoint(value float64) ToAPIMetric(timestamp time.Time, interval time.Duration) []APIMetric + ToBatchKey() BatchKey + Join(metric Metric) } // APIMetric is a metric that can be marshalled to send to the metrics API @@ -35,6 +37,28 @@ func (d *Distribution) AddPoint(value float64) { d.Values = append(d.Values, value) } +// 
ToBatchKey returns a key that can be used to batch the metric +func (d *Distribution) ToBatchKey() BatchKey { + return BatchKey{ + name: d.Name, + host: d.Host, + tags: d.Tags, + metricType: DistributionType, + } +} + +// Join creates a union between two metric sets +func (d *Distribution) Join(metric Metric) { + otherDist, ok := metric.(*Distribution) + if !ok { + return + } + for _, val := range otherDist.Values { + d.AddPoint(val) + } + +} + // ToAPIMetric converts a distribution into an API ready format. func (d *Distribution) ToAPIMetric(timestamp time.Time, interval time.Duration) []APIMetric { points := make([][]float64, len(d.Values)) From 6a2b5493c680017558e55f18902c9a259ef66c2c Mon Sep 17 00:00:00 2001 From: "darcy.rayner" Date: Tue, 28 May 2019 14:06:59 -0400 Subject: [PATCH 09/26] Add a time service, (so time related operations can be mocked) --- internal/metrics/time.go | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 internal/metrics/time.go diff --git a/internal/metrics/time.go b/internal/metrics/time.go new file mode 100644 index 00000000..c918c251 --- /dev/null +++ b/internal/metrics/time.go @@ -0,0 +1,27 @@ +package metrics + +import "time" + +type ( + //TimeService wraps common time related operations + TimeService interface { + NewTicker(duration time.Duration) *time.Ticker + Now() time.Time + } + + timeService struct { + } +) + +// MakeTimeService creates a new time service +func MakeTimeService() TimeService { + return &timeService{} +} + +func (ts *timeService) NewTicker(duration time.Duration) *time.Ticker { + return time.NewTicker(duration) +} + +func (ts *timeService) Now() time.Time { + return time.Now() +} From eb477c32e26d125763e5b2f0c77eeda643dd81c6 Mon Sep 17 00:00:00 2001 From: "darcy.rayner" Date: Tue, 28 May 2019 14:15:34 -0400 Subject: [PATCH 10/26] Add processor class for sending metrics --- internal/metrics/api.go | 7 +- internal/metrics/constants.go | 9 +- internal/metrics/processor.go | 120 ++++++++++++++++++ internal/metrics/processor_test.go | 197 +++++++++++++++++++++++++++++ 4 files changed, 329 insertions(+), 4 deletions(-) create mode 100644 internal/metrics/processor.go create mode 100644 internal/metrics/processor_test.go diff --git a/internal/metrics/api.go b/internal/metrics/api.go index 4fd1d63f..4e22fe88 100644 --- a/internal/metrics/api.go +++ b/internal/metrics/api.go @@ -8,7 +8,12 @@ import ( ) type ( - // APIClient sends metrics to Datadog + // Client sends metrics to Datadog + Client interface { + SendMetrics(metrics []APIMetric) error + } + + // APIClient send metrics to Datadog, via the Datadog API APIClient struct { apiKey string appKey string diff --git a/internal/metrics/constants.go b/internal/metrics/constants.go index 4019b762..efcc0c67 100644 --- a/internal/metrics/constants.go +++ b/internal/metrics/constants.go @@ -1,9 +1,12 @@ package metrics +import "time" + const ( - baseAPIURL = "https://api.datadoghq.com/api/v1" - apiKeyParam = "api_key" - appKeyParam = "application_key" + baseAPIURL = "https://api.datadoghq.com/api/v1" + apiKeyParam = "api_key" + appKeyParam = "application_key" + defaultRetryInterval = time.Millisecond * 250 ) // MetricType enumerates all the available metric types diff --git a/internal/metrics/processor.go b/internal/metrics/processor.go new file mode 100644 index 00000000..e2c81f30 --- /dev/null +++ b/internal/metrics/processor.go @@ -0,0 +1,120 @@ +package metrics + +import ( + "sync" + "time" + + "github.com/cenkalti/backoff" +) + +type ( + // Processor is used 
to batch metrics on a background thread, and send them on to a client. + Processor interface { + // AddMetric sends a metric to the agent + AddMetric(metric Metric) + // StartProcessing begins processing metrics asynchronously + StartProcessing() + // FinishProcessing shuts down the agent, and tries to flush any remaining metrics + FinishProcessing() + } + + processor struct { + metricsChan chan Metric + timeService TimeService + waitGroup sync.WaitGroup + batchInterval float64 + client Client + batcher *Batcher + shouldRetryOnFail bool + isProcessing bool + } +) + +// MakeProcessor creates a new metrics context +func MakeProcessor(client Client, timeService TimeService, batchInterval float64, shouldRetryOnFail bool) Processor { + batcher := MakeBatcher(batchInterval) + + return &processor{ + metricsChan: make(chan Metric, 2000), + batchInterval: batchInterval, + waitGroup: sync.WaitGroup{}, + client: client, + batcher: batcher, + shouldRetryOnFail: shouldRetryOnFail, + timeService: timeService, + isProcessing: false, + } +} + +func (p *processor) AddMetric(metric Metric) { + // We use a large buffer in the metrics channel, to make this operation non-blocking. + // However, if the channel does fill up, this will become a blocking operation. + p.metricsChan <- metric +} + +func (p *processor) StartProcessing() { + if !p.isProcessing { + p.isProcessing = true + p.waitGroup.Add(1) + go p.processMetrics() + } + +} + +func (p *processor) FinishProcessing() { + if !p.isProcessing { + p.StartProcessing() + } + // Closes the metrics channel, and waits for the last send to complete + close(p.metricsChan) + p.waitGroup.Wait() +} + +func (p *processor) processMetrics() { + + ticker := p.timeService.NewTicker(time.Duration(p.batchInterval) * time.Second) + + shouldExit := false + for !shouldExit { + shouldProcess := false + // Batches metrics until timeout is reached + select { + case m, ok := <-p.metricsChan: + if !ok { + // The channel has now been closed + shouldProcess = true + shouldExit = true + } else { + p.batcher.AddMetric(p.timeService.Now(), m) + } + case <-ticker.C: + shouldProcess = true + } + if shouldProcess { + if shouldExit && p.shouldRetryOnFail { + // If we are shutting down, and we just failed to send our last batch, do a retry + bo := backoff.WithMaxRetries(backoff.NewConstantBackOff(defaultRetryInterval), 2) + backoff.Retry(p.sendMetricsBatch, bo) + } else { + p.sendMetricsBatch() + } + } + } + ticker.Stop() + p.waitGroup.Done() +} + +func (p *processor) sendMetricsBatch() error { + mts := p.batcher.ToAPIMetrics(p.timeService.Now()) + if len(mts) > 0 { + err := p.client.SendMetrics(mts) + if err != nil { + return err + } + // All the metrics in the batcher were sent successfully, + // the batcher can now be cleared. If there was an error, + // the metrics will stay in the batcher and be sent in the next cycle. 
+ p.batcher = MakeBatcher(p.batchInterval) + } + return nil +} diff --git a/internal/metrics/processor_test.go b/internal/metrics/processor_test.go new file mode 100644 index 00000000..e31a8d2b --- /dev/null +++ b/internal/metrics/processor_test.go @@ -0,0 +1,197 @@ +package metrics + +import ( + "errors" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +type ( + mockClient struct { + batches chan []APIMetric + sendMetricsCalledCount int + err error + } + + mockTimeService struct { + now time.Time + tickerChan chan time.Time + } +) + +func makeMockClient() mockClient { + return mockClient{ + batches: make(chan []APIMetric, 10), + err: nil, + } +} + +func makeMockTimeService() mockTimeService { + return mockTimeService{ + now: time.Now(), + tickerChan: make(chan time.Time), + } +} + +func (mc *mockClient) SendMetrics(mts []APIMetric) error { + mc.sendMetricsCalledCount++ + mc.batches <- mts + return mc.err +} + +func (ts *mockTimeService) NewTicker(duration time.Duration) *time.Ticker { + return &time.Ticker{ + C: ts.tickerChan, + } +} + +func (ts *mockTimeService) Now() time.Time { + return ts.now +} + +func TestProcessorBatches(t *testing.T) { + mc := makeMockClient() + mts := makeMockTimeService() + + mts.now, _ = time.Parse(time.RFC3339, "2006-01-02T15:04:05Z") + nowUnix := float64(mts.now.Unix()) + + processor := MakeProcessor(&mc, &mts, 1000, false) + + d1 := Distribution{ + Name: "metric-1", + Tags: []string{"a", "b", "c"}, + Values: []float64{1, 2, 3}, + } + d2 := Distribution{ + Name: "metric-1", + Tags: []string{"a", "b", "c"}, + Values: []float64{4, 5, 6}, + } + + processor.AddMetric(&d1) + processor.AddMetric(&d2) + + processor.StartProcessing() + processor.FinishProcessing() + + firstBatch := <-mc.batches + + assert.Equal(t, []APIMetric{{ + Name: "metric-1", + Tags: []string{"a", "b", "c"}, + MetricType: DistributionType, + Points: [][]float64{ + []float64{nowUnix, 1}, + []float64{nowUnix, 2}, + []float64{nowUnix, 3}, + []float64{nowUnix, 4}, + []float64{nowUnix, 5}, + []float64{nowUnix, 6}, + }, + }}, firstBatch) +} + +func TestProcessorBatchesPerTick(t *testing.T) { + mc := makeMockClient() + mts := makeMockTimeService() + + firstTime, _ := time.Parse(time.RFC3339, "2006-01-02T15:04:05Z") + firstTimeUnix := float64(firstTime.Unix()) + secondTime, _ := time.Parse(time.RFC3339, "2007-01-02T15:04:05Z") + secondTimeUnix := float64(secondTime.Unix()) + mts.now = firstTime + + processor := MakeProcessor(&mc, &mts, 1000, false) + + d1 := Distribution{ + Name: "metric-1", + Tags: []string{"a", "b", "c"}, + Values: []float64{1, 2}, + } + d2 := Distribution{ + Name: "metric-1", + Tags: []string{"a", "b", "c"}, + Values: []float64{3}, + } + d3 := Distribution{ + Name: "metric-1", + Tags: []string{"a", "b", "c"}, + Values: []float64{4, 5}, + } + d4 := Distribution{ + Name: "metric-1", + Tags: []string{"a", "b", "c"}, + Values: []float64{6}, + } + + processor.StartProcessing() + + processor.AddMetric(&d1) + processor.AddMetric(&d2) + + // This wait is necessary to make sure both metrics have been added to the batch + <-time.Tick(time.Millisecond * 10) + // Sending time to the ticker channel will flush the batch. 
+ mts.tickerChan <- firstTime + firstBatch := <-mc.batches + mts.now = secondTime + + processor.AddMetric(&d3) + processor.AddMetric(&d4) + + processor.FinishProcessing() + secondBatch := <-mc.batches + batches := [][]APIMetric{firstBatch, secondBatch} + + assert.Equal(t, [][]APIMetric{ + []APIMetric{ + { + Name: "metric-1", + Tags: []string{"a", "b", "c"}, + MetricType: DistributionType, + Points: [][]float64{ + []float64{firstTimeUnix, 1}, + []float64{firstTimeUnix, 2}, + []float64{firstTimeUnix, 3}, + }, + }}, + []APIMetric{ + { + Name: "metric-1", + Tags: []string{"a", "b", "c"}, + MetricType: DistributionType, + Points: [][]float64{ + []float64{secondTimeUnix, 4}, + []float64{secondTimeUnix, 5}, + []float64{secondTimeUnix, 6}, + }, + }}, + }, batches) +} + +func TestProcessorPerformsRetry(t *testing.T) { + mc := makeMockClient() + mts := makeMockTimeService() + + mts.now, _ = time.Parse(time.RFC3339, "2006-01-02T15:04:05Z") + + shouldRetry := true + processor := MakeProcessor(&mc, &mts, 1000, shouldRetry) + + d1 := Distribution{ + Name: "metric-1", + Tags: []string{"a", "b", "c"}, + Values: []float64{1, 2, 3}, + } + + mc.err = errors.New("Some error") + + processor.AddMetric(&d1) + + processor.FinishProcessing() + + assert.Equal(t, 3, mc.sendMetricsCalledCount) +} From c3fe8301ab3e7844e7c72b3f806b5138665dec57 Mon Sep 17 00:00:00 2001 From: "darcy.rayner" Date: Tue, 28 May 2019 14:44:08 -0400 Subject: [PATCH 11/26] Expose metrics publically in package --- ddlambda.go | 15 +++++++++++++++ internal/metrics/constants.go | 1 + internal/metrics/context.go | 21 +++++++++++++++++++++ internal/metrics/context_test.go | 20 ++++++++++++++++++++ internal/metrics/listener.go | 11 +++++++++++ 5 files changed, 68 insertions(+) create mode 100644 internal/metrics/context.go create mode 100644 internal/metrics/context_test.go diff --git a/ddlambda.go b/ddlambda.go index 52a4aca4..d0c28601 100644 --- a/ddlambda.go +++ b/ddlambda.go @@ -30,3 +30,18 @@ func AddTraceHeaders(ctx context.Context, req *http.Request) { req.Header.Add(key, value) } } + +// DistributionMetric sends a distribution metric to DataDog +func DistributionMetric(ctx context.Context, metric string, value float64, tags ...string) { + pr := metrics.GetProcessor(ctx) + if pr == nil { + return + } + m := metrics.Distribution{ + Name: metric, + Tags: tags, + Values: []float64{}, + } + m.AddPoint(value) + pr.AddMetric(&m) +} diff --git a/internal/metrics/constants.go b/internal/metrics/constants.go index efcc0c67..4d107137 100644 --- a/internal/metrics/constants.go +++ b/internal/metrics/constants.go @@ -7,6 +7,7 @@ const ( apiKeyParam = "api_key" appKeyParam = "application_key" defaultRetryInterval = time.Millisecond * 250 + defaultBatchInterval = time.Second * 15 ) // MetricType enumerates all the available metric types diff --git a/internal/metrics/context.go b/internal/metrics/context.go new file mode 100644 index 00000000..c1cf936d --- /dev/null +++ b/internal/metrics/context.go @@ -0,0 +1,21 @@ +package metrics + +import "context" + +type contextKeytype int + +var traceContextKey = new(contextKeytype) + +// GetProcessor retrieves the processor from a context object. 
+func GetProcessor(ctx context.Context) Processor { + result := ctx.Value(traceContextKey) + if result == nil { + return nil + } + return result.(Processor) +} + +// AddProcessor adds a processor to a context object +func AddProcessor(ctx context.Context, processor Processor) context.Context { + return context.WithValue(ctx, traceContextKey, processor) +} diff --git a/internal/metrics/context_test.go b/internal/metrics/context_test.go new file mode 100644 index 00000000..1822798c --- /dev/null +++ b/internal/metrics/context_test.go @@ -0,0 +1,20 @@ +package metrics + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestGetProcessorEmptyContext(t *testing.T) { + ctx := context.Background() + result := GetProcessor(ctx) + assert.Nil(t, result) +} + +func TestGetProcessorSuccess(t *testing.T) { + ctx := AddProcessor(context.Background(), MakeProcessor(nil, nil, 0, false)) + result := GetProcessor(ctx) + assert.NotNil(t, result) +} diff --git a/internal/metrics/listener.go b/internal/metrics/listener.go index af576e73..3bb88a8f 100644 --- a/internal/metrics/listener.go +++ b/internal/metrics/listener.go @@ -26,9 +26,20 @@ func MakeListener() Listener { // HandlerStarted adds metrics service to the context func (l *Listener) HandlerStarted(ctx context.Context, msg json.RawMessage) context.Context { + + ts := MakeTimeService() + pr := MakeProcessor(l.apiClient, ts, float64(defaultBatchInterval), false) + + ctx = AddProcessor(ctx, pr) + pr.StartProcessing() + return ctx } // HandlerFinished implemented as part of the wrapper.HandlerListener interface func (l *Listener) HandlerFinished(ctx context.Context) { + pr := GetProcessor(ctx) + if pr != nil { + pr.FinishProcessing() + } } From 709f3e5df8198591d085e1b4f47fb1e11727d8c6 Mon Sep 17 00:00:00 2001 From: "darcy.rayner" Date: Tue, 28 May 2019 15:21:55 -0400 Subject: [PATCH 12/26] Expose configuration options for ddlambda --- ddlambda.go | 46 ++++++++++++++++++++++++++++++++++-- internal/metrics/listener.go | 20 +++++++++++++--- 2 files changed, 61 insertions(+), 5 deletions(-) diff --git a/ddlambda.go b/ddlambda.go index d0c28601..376c8eda 100644 --- a/ddlambda.go +++ b/ddlambda.go @@ -3,17 +3,36 @@ package ddlambda import ( "context" "net/http" + "os" + "time" "github.com/DataDog/dd-lambda-go/internal/metrics" "github.com/DataDog/dd-lambda-go/internal/trace" "github.com/DataDog/dd-lambda-go/internal/wrapper" ) +type ( + // Config gives options for how ddlambda should behave + Config struct { + APIKey string + AppKey string + ShouldRetryOnFailure bool + BatchInterval time.Duration + } +) + +const ( + // DatadogAPIKeyEnvVar is the environment variable that will be used as an API key by default + DatadogAPIKeyEnvVar = "DATADOG_API_KEY" + // DatadogAPPKeyEnvVar is the environment variable that will be used as an API key by default + DatadogAPPKeyEnvVar = "DATADOG_APP_KEY" +) + // WrapHandler is used to instrument your lambda functions, reading in context from API Gateway. // It returns a modified handler that can be passed directly to the lambda.Start function. 
-func WrapHandler(handler interface{}) interface{} {
+func WrapHandler(handler interface{}, cfg *Config) interface{} {
 	tl := trace.Listener{}
-	ml := metrics.MakeListener()
+	ml := metrics.MakeListener(cfg.toMetricsConfig())
 	return wrapper.WrapHandlerWithListeners(handler, &tl, &ml)
 }
 
@@ -45,3 +64,26 @@ func DistributionMetric(ctx context.Context, metric string, value float64, tags
 	m.AddPoint(value)
 	pr.AddMetric(&m)
 }
+
+func (cfg *Config) toMetricsConfig() metrics.Config {
+
+	mc := metrics.Config{
+		ShouldRetryOnFailure: false,
+	}
+
+	if cfg != nil {
+		mc.BatchInterval = cfg.BatchInterval
+		mc.ShouldRetryOnFailure = cfg.ShouldRetryOnFailure
+		mc.APIKey = cfg.APIKey
+		mc.AppKey = cfg.AppKey
+	}
+
+	if mc.APIKey == "" {
+		mc.APIKey = os.Getenv(DatadogAPIKeyEnvVar)
+
+	}
+	if mc.AppKey == "" {
+		mc.AppKey = os.Getenv(DatadogAPPKeyEnvVar)
+	}
+	return mc
+}
diff --git a/internal/metrics/listener.go b/internal/metrics/listener.go
index 3bb88a8f..ca627242 100644
--- a/internal/metrics/listener.go
+++ b/internal/metrics/listener.go
@@ -3,24 +3,38 @@ package metrics
 import (
 	"context"
 	"encoding/json"
+	"time"
 )
 
 type (
 	// Listener implements wrapper.HandlerListener, injecting metrics into the context
 	Listener struct {
 		apiClient *APIClient
+		config    *Config
+	}
+
+	// Config gives options for how the listener should work
+	Config struct {
+		APIKey               string
+		AppKey               string
+		ShouldRetryOnFailure bool
+		BatchInterval        time.Duration
 	}
 )
 
 // MakeListener initializes a new metrics lambda listener
-func MakeListener() Listener {
-	apiClient := MakeAPIClient(baseAPIURL, "", "")
+func MakeListener(config Config) Listener {
+	apiClient := MakeAPIClient(baseAPIURL, config.APIKey, config.AppKey)
+	if config.BatchInterval <= 0 {
+		config.BatchInterval = defaultBatchInterval
+	}
 
 	// Do this in the background, doesn't matter if it returns
 	go apiClient.PrewarmConnection()
 
 	return Listener{
 		apiClient,
+		&config,
 	}
 }
 
@@ -28,7 +42,7 @@ func MakeListener() Listener {
 func (l *Listener) HandlerStarted(ctx context.Context, msg json.RawMessage) context.Context {
 
 	ts := MakeTimeService()
-	pr := MakeProcessor(l.apiClient, ts, float64(defaultBatchInterval), false)
+	pr := MakeProcessor(l.apiClient, ts, float64(l.config.BatchInterval), l.config.ShouldRetryOnFailure)
 
 	ctx = AddProcessor(ctx, pr)
 	pr.StartProcessing()

From 72082e040c7c4553794459d18813db7a5f31c07d Mon Sep 17 00:00:00 2001
From: "darcy.rayner"
Date: Tue, 28 May 2019 16:02:34 -0400
Subject: [PATCH 13/26] Add documentation to ddlambda.go

---
 ddlambda.go | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/ddlambda.go b/ddlambda.go
index 376c8eda..7a58e0a6 100644
--- a/ddlambda.go
+++ b/ddlambda.go
@@ -14,10 +14,16 @@ type (
 	// Config gives options for how ddlambda should behave
 	Config struct {
-		APIKey               string
-		AppKey               string
+		// APIKey is your Datadog API key. This is used for sending metrics.
+		APIKey string
+		// AppKey is your Datadog App key. This is used for sending metrics.
+		AppKey string
+		// ShouldRetryOnFailure is used to turn on retry logic when sending metrics via the API. This can negatively affect the performance of your lambda,
+		// and should only be turned on if you can't afford to lose metrics data under poor network conditions.
 		ShouldRetryOnFailure bool
-		BatchInterval        time.Duration
+		// BatchInterval is the period of time over which metrics are grouped together for processing to be sent to the API or written to logs.
+		// Any pending metrics are flushed at the end of the lambda.
+ BatchInterval time.Duration } ) From 1742f8140ae31bcdac7ac454a9a1506945eb39ea Mon Sep 17 00:00:00 2001 From: "darcy.rayner" Date: Tue, 28 May 2019 16:02:56 -0400 Subject: [PATCH 14/26] Remove useless assignment warning --- internal/trace/context.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/trace/context.go b/internal/trace/context.go index 513008bb..7671d89a 100644 --- a/internal/trace/context.go +++ b/internal/trace/context.go @@ -73,7 +73,7 @@ func GetTraceHeaders(ctx context.Context, useCurrentSegmentAsParent bool) map[st } func addTraceContextToXRay(ctx context.Context, traceContext map[string]string) error { - ctx, segment := xray.BeginSubsegment(ctx, xraySubsegmentName) + _, segment := xray.BeginSubsegment(ctx, xraySubsegmentName) err := segment.AddMetadataToNamespace(xraySubsegmentKey, xraySubsegmentNamespace, traceContext) if err != nil { return fmt.Errorf("couldn't save trace context to XRay: %v", err) From e9de789665d570ce6ebc0903e9d517f15b3abdf7 Mon Sep 17 00:00:00 2001 From: "darcy.rayner" Date: Tue, 28 May 2019 16:03:16 -0400 Subject: [PATCH 15/26] Update README --- README.md | 121 ++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 118 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index ca860440..4520a11a 100644 --- a/README.md +++ b/README.md @@ -8,6 +8,11 @@ Datadog's Lambda Go client library enables distributed tracing between serverful go get github.com/DataDog/dd-lambda-go ``` +The following Datadog environment variables should be defined via the AWS CLI or Serverless Framework: + +- DATADOG_API_KEY +- DATADOG_APP_KEY + ## Usage Datadog needs to be able to read headers from the incoming Lambda event. Wrap your Lambda handler function like so: @@ -22,7 +27,14 @@ import ( func main() { // Wrap your lambda handler like this - lambda.Start( ddlambda.WrapHandler(myHandler)) + lambda.Start( ddlambda.WrapHandler(myHandler, nil)) + /* OR with manual configuration options + lambda.Start(ddlambda.WrapHandler(myHandler, &ddlambda.Config{ + BatchInterval: time.Seconds * 15 + APIKey: "my-api-key", + APPKey: "my-app-key", + })) + */ } func myHandler(ctx context.Context, event MyEvent) (string, error) { @@ -30,7 +42,34 @@ func myHandler(ctx context.Context, event MyEvent) (string, error) { } ``` -Make sure any outbound requests have Datadog's tracing headers. + + +## Custom Metrics + +Custom metrics can be submitted using the `DistributionMetric` function. The metrics are submitted as [distribution metrics](https://docs.datadoghq.com/graphing/metrics/distributions/). + +```go +ddlambda.DistributionMetric( + ctx, // Use the context object, (or child), passed into your handler + "coffee_house.order_value", // Metric name + 12.45, // The value + "product:latte", "order:online" // Associated tags +) +``` + +### VPC + +If your Lambda function is associated with a VPC, you need to ensure it has access to the [public internet](https://aws.amazon.com/premiumsupport/knowledge-center/internet-access-lambda-function/). + +## Distributed Tracing + +[Distributed tracing](https://docs.datadoghq.com/tracing/guide/distributed_tracing/?tab=python) allows you to propagate a trace context from a service running on a host to a service running on AWS Lambda, and vice versa, so you can see performance end-to-end. Linking is implemented by injecting Datadog trace context into the HTTP request headers. 
+ +Distributed tracing headers are language agnostic, e.g., a trace can be propagated between a Java service running on a host to a Lambda function written in Go. + +Because the trace context is propagated through HTTP request headers, the Lambda function needs to be triggered by AWS API Gateway or AWS Application Load Balancer. + +To enable this feature, make sure any outbound requests have Datadog's tracing headers. ```go req, err := http.NewRequest("GET", "http://api.youcompany.com/status") @@ -42,8 +81,84 @@ Make sure any outbound requests have Datadog's tracing headers. } ``` +## Sampling + +The traces for your Lambda function are converted by Datadog from AWS X-Ray traces. X-Ray needs to sample the traces that the Datadog tracing agent decides to sample, in order to collect as many complete traces as possible. You can create X-Ray sampling rules to ensure requests with header `x-datadog-sampling-priority:1` or `x-datadog-sampling-priority:2` via API Gateway always get sampled by X-Ray. + +These rules can be created using the following AWS CLI command. + +```bash +aws xray create-sampling-rule --cli-input-json file://datadog-sampling-priority-1.json +aws xray create-sampling-rule --cli-input-json file://datadog-sampling-priority-2.json +``` + +The file content for `datadog-sampling-priority-1.json`: + +```json +{ + "SamplingRule": { + "RuleName": "Datadog-Sampling-Priority-1", + "ResourceARN": "*", + "Priority": 9998, + "FixedRate": 1, + "ReservoirSize": 100, + "ServiceName": "*", + "ServiceType": "AWS::APIGateway::Stage", + "Host": "*", + "HTTPMethod": "*", + "URLPath": "*", + "Version": 1, + "Attributes": { + "x-datadog-sampling-priority": "1" + } + } +} +``` + +The file content for `datadog-sampling-priority-2.json`: + +```json +{ + "SamplingRule": { + "RuleName": "Datadog-Sampling-Priority-2", + "ResourceARN": "*", + "Priority": 9999, + "FixedRate": 1, + "ReservoirSize": 100, + "ServiceName": "*", + "ServiceType": "AWS::APIGateway::Stage", + "Host": "*", + "HTTPMethod": "*", + "URLPath": "*", + "Version": 1, + "Attributes": { + "x-datadog-sampling-priority": "2" + } + } +} +``` + + ## Non-proxy integration If your Lambda function is triggered by API Gateway via the non-proxy integration, then you have to set up a mapping template, which passes the Datadog trace context from the incoming HTTP request headers to the Lambda function via the event object. -If your Lambda function is deployed by the Serverless Framework, such a mapping template gets created by default. \ No newline at end of file +If your Lambda function is deployed by the Serverless Framework, such a mapping template gets created by default. + +## Opening Issues + +If you encounter a bug with this package, we want to hear about it. Before opening a new issue, search the existing issues to avoid duplicates. + +When opening an issue, include the Datadog Lambda Layer version, Python version, and stack trace if available. In addition, include the steps to reproduce when appropriate. + +You can also open an issue for a feature request. + +## Contributing + +If you find an issue with this package and have a fix, please feel free to open a pull request following the procedures. + +## License + +Unless explicitly stated otherwise all files in this repository are licensed under the Apache License Version 2.0. + +This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2019 Datadog, Inc. 
\ No newline at end of file From ecf74bf82f0bff8fdbaefb60ad0683474560f859 Mon Sep 17 00:00:00 2001 From: "darcy.rayner" Date: Tue, 28 May 2019 21:47:32 -0400 Subject: [PATCH 16/26] Fix default batch interval being zero --- README.md | 1 - internal/metrics/batcher.go | 2 +- internal/metrics/listener_test.go | 25 +++++++++++++++++++++++++ internal/metrics/processor.go | 3 ++- 4 files changed, 28 insertions(+), 3 deletions(-) create mode 100644 internal/metrics/listener_test.go diff --git a/README.md b/README.md index 4520a11a..6af0c866 100644 --- a/README.md +++ b/README.md @@ -138,7 +138,6 @@ The file content for `datadog-sampling-priority-2.json`: } ``` - ## Non-proxy integration If your Lambda function is triggered by API Gateway via the non-proxy integration, then you have to set up a mapping template, which passes the Datadog trace context from the incoming HTTP request headers to the Lambda function via the event object. diff --git a/internal/metrics/batcher.go b/internal/metrics/batcher.go index 1939907c..7db6d551 100644 --- a/internal/metrics/batcher.go +++ b/internal/metrics/batcher.go @@ -45,7 +45,7 @@ func (b *Batcher) AddMetric(timestamp time.Time, metric Metric) { func (b *Batcher) ToAPIMetrics(timestamp time.Time) []APIMetric { ar := []APIMetric{} - interval := time.Duration(0) // TODO Get actual interval + interval := time.Duration(b.batchInterval) / time.Second for _, metric := range b.metrics { values := metric.ToAPIMetric(timestamp, interval) diff --git a/internal/metrics/listener_test.go b/internal/metrics/listener_test.go new file mode 100644 index 00000000..1ca924f0 --- /dev/null +++ b/internal/metrics/listener_test.go @@ -0,0 +1,25 @@ +package metrics + +import ( + "context" + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestHandlerAddsProcessorToContext(t *testing.T) { + listener := MakeListener(Config{}) + ctx := listener.HandlerStarted(context.Background(), json.RawMessage{}) + pr := GetProcessor(ctx) + assert.NotNil(t, pr) +} + +func TestHandlerFinishesProcessing(t *testing.T) { + listener := MakeListener(Config{}) + ctx := listener.HandlerStarted(context.Background(), json.RawMessage{}) + + pr := GetProcessor(ctx).(*processor) + listener.HandlerFinished(ctx) + assert.False(t, pr.isProcessing) +} diff --git a/internal/metrics/processor.go b/internal/metrics/processor.go index e2c81f30..adb392f3 100644 --- a/internal/metrics/processor.go +++ b/internal/metrics/processor.go @@ -68,11 +68,12 @@ func (p *processor) FinishProcessing() { // Closes the metrics channel, and waits for the last send to complete close(p.metricsChan) p.waitGroup.Wait() + p.isProcessing = false } func (p *processor) processMetrics() { - ticker := p.timeService.NewTicker(time.Duration(p.batchInterval) * time.Second) + ticker := p.timeService.NewTicker(time.Duration(p.batchInterval)) shouldExit := false for !shouldExit { From f76aac10a23fd8626ac380e6c28036d9e6bbbc2f Mon Sep 17 00:00:00 2001 From: "darcy.rayner" Date: Tue, 28 May 2019 21:58:51 -0400 Subject: [PATCH 17/26] Store batch interval as time.Duration value --- internal/metrics/batcher.go | 8 ++++---- internal/metrics/listener.go | 2 +- internal/metrics/processor.go | 6 +++--- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/internal/metrics/batcher.go b/internal/metrics/batcher.go index 7db6d551..a6748802 100644 --- a/internal/metrics/batcher.go +++ b/internal/metrics/batcher.go @@ -12,7 +12,7 @@ type ( // Batcher batches Batcher struct { metrics map[string]Metric - batchInterval float64 + 
batchInterval time.Duration } // BatchKey identifies a batch of metrics BatchKey struct { @@ -24,7 +24,7 @@ type ( ) // MakeBatcher creates a new batcher object -func MakeBatcher(batchInterval float64) *Batcher { +func MakeBatcher(batchInterval time.Duration) *Batcher { return &Batcher{ batchInterval: batchInterval, metrics: map[string]Metric{}, @@ -45,7 +45,7 @@ func (b *Batcher) AddMetric(timestamp time.Time, metric Metric) { func (b *Batcher) ToAPIMetrics(timestamp time.Time) []APIMetric { ar := []APIMetric{} - interval := time.Duration(b.batchInterval) / time.Second + interval := b.batchInterval / time.Second for _, metric := range b.metrics { values := metric.ToAPIMetric(timestamp, interval) @@ -57,7 +57,7 @@ func (b *Batcher) ToAPIMetrics(timestamp time.Time) []APIMetric { } func (b *Batcher) getInterval(timestamp time.Time) float64 { - return float64(timestamp.Unix()) - math.Mod(float64(timestamp.Unix()), b.batchInterval) + return float64(timestamp.Unix()) - math.Mod(float64(timestamp.Unix()), float64(b.batchInterval)) } func (b *Batcher) getStringKey(timestamp time.Time, bk BatchKey) string { diff --git a/internal/metrics/listener.go b/internal/metrics/listener.go index ca627242..7c73d81d 100644 --- a/internal/metrics/listener.go +++ b/internal/metrics/listener.go @@ -42,7 +42,7 @@ func MakeListener(config Config) Listener { func (l *Listener) HandlerStarted(ctx context.Context, msg json.RawMessage) context.Context { ts := MakeTimeService() - pr := MakeProcessor(l.apiClient, ts, float64(l.config.BatchInterval), l.config.ShouldRetryOnFailure) + pr := MakeProcessor(l.apiClient, ts, l.config.BatchInterval, l.config.ShouldRetryOnFailure) ctx = AddProcessor(ctx, pr) pr.StartProcessing() diff --git a/internal/metrics/processor.go b/internal/metrics/processor.go index adb392f3..4df1bded 100644 --- a/internal/metrics/processor.go +++ b/internal/metrics/processor.go @@ -22,7 +22,7 @@ type ( metricsChan chan Metric timeService TimeService waitGroup sync.WaitGroup - batchInterval float64 + batchInterval time.Duration client Client batcher *Batcher shouldRetryOnFail bool @@ -31,7 +31,7 @@ type ( ) // MakeProcessor creates a new metrics context -func MakeProcessor(client Client, timeService TimeService, batchInterval float64, shouldRetryOnFail bool) Processor { +func MakeProcessor(client Client, timeService TimeService, batchInterval time.Duration, shouldRetryOnFail bool) Processor { batcher := MakeBatcher(batchInterval) return &processor{ @@ -73,7 +73,7 @@ func (p *processor) FinishProcessing() { func (p *processor) processMetrics() { - ticker := p.timeService.NewTicker(time.Duration(p.batchInterval)) + ticker := p.timeService.NewTicker(p.batchInterval) shouldExit := false for !shouldExit { From 4a3965bcecace91e69c5104c531602daf75eec04 Mon Sep 17 00:00:00 2001 From: "darcy.rayner" Date: Wed, 29 May 2019 10:16:44 -0400 Subject: [PATCH 18/26] Add runtime version tag to all metrics --- ddlambda.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/ddlambda.go b/ddlambda.go index 7a58e0a6..a7f905d4 100644 --- a/ddlambda.go +++ b/ddlambda.go @@ -2,8 +2,10 @@ package ddlambda import ( "context" + "fmt" "net/http" "os" + "runtime" "time" "github.com/DataDog/dd-lambda-go/internal/metrics" @@ -62,6 +64,10 @@ func DistributionMetric(ctx context.Context, metric string, value float64, tags if pr == nil { return } + + // We add our own runtime tag to the metric for version tracking + tags = append(tags, getRuntimeTag()) + m := metrics.Distribution{ Name: metric, Tags: tags, @@ -93,3 +99,8 @@ 
func (cfg *Config) toMetricsConfig() metrics.Config { } return mc } + +func getRuntimeTag() string { + v := runtime.Version() + return fmt.Sprintf("dd_lambda_layer:datadog-%s", v) +} From 1045f0e3eadcfbd197a08d7069a4432aa3035651 Mon Sep 17 00:00:00 2001 From: "darcy.rayner" Date: Wed, 29 May 2019 11:57:30 -0400 Subject: [PATCH 19/26] Improve documentation --- README.md | 8 +++----- ddlambda.go | 6 ++++-- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 6af0c866..47952d23 100644 --- a/README.md +++ b/README.md @@ -32,7 +32,7 @@ func main() { lambda.Start(ddlambda.WrapHandler(myHandler, &ddlambda.Config{ BatchInterval: time.Seconds * 15 APIKey: "my-api-key", - APPKey: "my-app-key", + AppKey: "my-app-key", })) */ } @@ -42,15 +42,13 @@ func myHandler(ctx context.Context, event MyEvent) (string, error) { } ``` - - ## Custom Metrics Custom metrics can be submitted using the `DistributionMetric` function. The metrics are submitted as [distribution metrics](https://docs.datadoghq.com/graphing/metrics/distributions/). ```go -ddlambda.DistributionMetric( - ctx, // Use the context object, (or child), passed into your handler +ddlambda.Distribution( + ctx, // Use the context object, (or child), that was passed into your handler "coffee_house.order_value", // Metric name 12.45, // The value "product:latte", "order:online" // Associated tags diff --git a/ddlambda.go b/ddlambda.go index a7f905d4..14926609 100644 --- a/ddlambda.go +++ b/ddlambda.go @@ -39,6 +39,8 @@ const ( // WrapHandler is used to instrument your lambda functions, reading in context from API Gateway. // It returns a modified handler that can be passed directly to the lambda.Start function. func WrapHandler(handler interface{}, cfg *Config) interface{} { + + // Set up state that is shared between handler invocations tl := trace.Listener{} ml := metrics.MakeListener(cfg.toMetricsConfig()) return wrapper.WrapHandlerWithListeners(handler, &tl, &ml) @@ -58,8 +60,8 @@ func AddTraceHeaders(ctx context.Context, req *http.Request) { } } -// DistributionMetric sends a distribution metric to DataDog -func DistributionMetric(ctx context.Context, metric string, value float64, tags ...string) { +// Distribution sends a distribution metric to DataDog +func Distribution(ctx context.Context, metric string, value float64, tags ...string) { pr := metrics.GetProcessor(ctx) if pr == nil { return From 3baaf4637ab4fa0a0abe24248252131dabecd231 Mon Sep 17 00:00:00 2001 From: "darcy.rayner" Date: Wed, 29 May 2019 12:04:12 -0400 Subject: [PATCH 20/26] Improve comments --- internal/metrics/batcher.go | 2 +- internal/metrics/processor.go | 10 +++++----- internal/wrapper/wrap_handler.go | 4 ++-- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/internal/metrics/batcher.go b/internal/metrics/batcher.go index a6748802..b9212ba6 100644 --- a/internal/metrics/batcher.go +++ b/internal/metrics/batcher.go @@ -9,7 +9,7 @@ import ( ) type ( - // Batcher batches + // Batcher aggregates metrics with common properties,(metric name, tags, type etc) Batcher struct { metrics map[string]Metric batchInterval time.Duration diff --git a/internal/metrics/processor.go b/internal/metrics/processor.go index 4df1bded..f0ef67dd 100644 --- a/internal/metrics/processor.go +++ b/internal/metrics/processor.go @@ -8,7 +8,7 @@ import ( ) type ( - // Processor is used to batch metrics on a background thread, and send them on to a client. + // Processor is used to batch metrics on a background thread, and send them on to a client periodically. 
Processor interface { // AddMetric sends a metric to the agent AddMetric(metric Metric) @@ -77,21 +77,21 @@ func (p *processor) processMetrics() { shouldExit := false for !shouldExit { - shouldProcess := false + shouldSendBatch := false // Batches metrics until timeout is reached select { case m, ok := <-p.metricsChan: if !ok { // The channel has now been closed - shouldProcess = true + shouldSendBatch = true shouldExit = true } else { p.batcher.AddMetric(p.timeService.Now(), m) } case <-ticker.C: - shouldProcess = true + shouldSendBatch = true } - if shouldProcess { + if shouldSendBatch { if shouldExit && p.shouldRetryOnFail { // If we are shutting down, and we just failed to send our last batch, do a retry bo := backoff.WithMaxRetries(backoff.NewConstantBackOff(defaultRetryInterval), 2) diff --git a/internal/wrapper/wrap_handler.go b/internal/wrapper/wrap_handler.go index db543761..41949ce6 100644 --- a/internal/wrapper/wrap_handler.go +++ b/internal/wrapper/wrap_handler.go @@ -15,7 +15,7 @@ type ( } ) -// WrapHandlerWithListeners wraps a lambda handler to capture context and adds the DataDog tracing context. +// WrapHandlerWithListeners wraps a lambda handler, and calls listeners before and after every invocation. func WrapHandlerWithListeners(handler interface{}, listeners ...HandlerListener) interface{} { err := validateHandler(handler) @@ -24,6 +24,7 @@ func WrapHandlerWithListeners(handler interface{}, listeners ...HandlerListener) return handler } + // Return custom handler, to be called once per invocation return func(ctx context.Context, msg json.RawMessage) (interface{}, error) { for _, listener := range listeners { ctx = listener.HandlerStarted(ctx, msg) @@ -72,7 +73,6 @@ func validateHandler(handler interface{}) error { func callHandler(ctx context.Context, msg json.RawMessage, handler interface{}) (interface{}, error) { ev, err := unmarshalEventForHandler(msg, handler) if err != nil { - // TODO Log error here return nil, err } handlerType := reflect.TypeOf(handler) From e1a0aaf712da277e41052127e08b61608bdcab96 Mon Sep 17 00:00:00 2001 From: "darcy.rayner" Date: Wed, 29 May 2019 13:57:19 -0400 Subject: [PATCH 21/26] Make processor automatically cancel using context object. --- README.md | 6 ++++-- ddlambda.go | 6 ++++++ internal/metrics/context_test.go | 2 +- internal/metrics/listener.go | 3 ++- internal/metrics/processor.go | 14 +++++++++++-- internal/metrics/processor_test.go | 32 +++++++++++++++++++++++++++--- internal/wrapper/wrap_handler.go | 6 ++++++ 7 files changed, 60 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index 47952d23..1860e65f 100644 --- a/README.md +++ b/README.md @@ -44,11 +44,13 @@ func myHandler(ctx context.Context, event MyEvent) (string, error) { ## Custom Metrics -Custom metrics can be submitted using the `DistributionMetric` function. The metrics are submitted as [distribution metrics](https://docs.datadoghq.com/graphing/metrics/distributions/). +Custom metrics can be submitted using the `Distribution` function. The metrics are submitted as [distribution metrics](https://docs.datadoghq.com/graphing/metrics/distributions/). 
```go + ddlambda.Distribution( - ctx, // Use the context object, (or child), that was passed into your handler + // Use the context object, (or a child context), that was passed into your handler function, or grab it globally using ddlambda.GetContext() + ctx, "coffee_house.order_value", // Metric name 12.45, // The value "product:latte", "order:online" // Associated tags diff --git a/ddlambda.go b/ddlambda.go index 14926609..57f32d65 100644 --- a/ddlambda.go +++ b/ddlambda.go @@ -60,6 +60,12 @@ func AddTraceHeaders(ctx context.Context, req *http.Request) { } } +// GetContext retrieves the last created lambda context. +// Only use this if you aren't manually passing context through your call hierarchy. +func GetContext() context.Context { + return wrapper.CurrentContext +} + // Distribution sends a distribution metric to DataDog func Distribution(ctx context.Context, metric string, value float64, tags ...string) { pr := metrics.GetProcessor(ctx) diff --git a/internal/metrics/context_test.go b/internal/metrics/context_test.go index 1822798c..597f2294 100644 --- a/internal/metrics/context_test.go +++ b/internal/metrics/context_test.go @@ -14,7 +14,7 @@ func TestGetProcessorEmptyContext(t *testing.T) { } func TestGetProcessorSuccess(t *testing.T) { - ctx := AddProcessor(context.Background(), MakeProcessor(nil, nil, 0, false)) + ctx := AddProcessor(context.Background(), MakeProcessor(context.Background(), nil, nil, 0, false)) result := GetProcessor(ctx) assert.NotNil(t, result) } diff --git a/internal/metrics/listener.go b/internal/metrics/listener.go index 7c73d81d..8e20f24d 100644 --- a/internal/metrics/listener.go +++ b/internal/metrics/listener.go @@ -42,9 +42,10 @@ func MakeListener(config Config) Listener { func (l *Listener) HandlerStarted(ctx context.Context, msg json.RawMessage) context.Context { ts := MakeTimeService() - pr := MakeProcessor(l.apiClient, ts, l.config.BatchInterval, l.config.ShouldRetryOnFailure) + pr := MakeProcessor(ctx, l.apiClient, ts, l.config.BatchInterval, l.config.ShouldRetryOnFailure) ctx = AddProcessor(ctx, pr) + pr.StartProcessing() return ctx diff --git a/internal/metrics/processor.go b/internal/metrics/processor.go index f0ef67dd..9307a087 100644 --- a/internal/metrics/processor.go +++ b/internal/metrics/processor.go @@ -1,6 +1,7 @@ package metrics import ( + "context" "sync" "time" @@ -19,6 +20,7 @@ type ( } processor struct { + context context.Context metricsChan chan Metric timeService TimeService waitGroup sync.WaitGroup @@ -31,10 +33,11 @@ type ( ) // MakeProcessor creates a new metrics context -func MakeProcessor(client Client, timeService TimeService, batchInterval time.Duration, shouldRetryOnFail bool) Processor { +func MakeProcessor(ctx context.Context, client Client, timeService TimeService, batchInterval time.Duration, shouldRetryOnFail bool) Processor { batcher := MakeBatcher(batchInterval) return &processor{ + context: ctx, metricsChan: make(chan Metric, 2000), batchInterval: batchInterval, waitGroup: sync.WaitGroup{}, @@ -68,18 +71,23 @@ func (p *processor) FinishProcessing() { // Closes the metrics channel, and waits for the last send to complete close(p.metricsChan) p.waitGroup.Wait() - p.isProcessing = false } func (p *processor) processMetrics() { ticker := p.timeService.NewTicker(p.batchInterval) + doneChan := p.context.Done() + shouldExit := false for !shouldExit { shouldSendBatch := false // Batches metrics until timeout is reached select { + case <-doneChan: + // This process is being cancelled by the context, exit without flushing + 
shouldExit = true + close(p.metricsChan) case m, ok := <-p.metricsChan: if !ok { // The channel has now been closed @@ -89,6 +97,7 @@ func (p *processor) processMetrics() { p.batcher.AddMetric(p.timeService.Now(), m) } case <-ticker.C: + // We are ready to send a batch to our backend shouldSendBatch = true } if shouldSendBatch { @@ -102,6 +111,7 @@ func (p *processor) processMetrics() { } } ticker.Stop() + p.isProcessing = false p.waitGroup.Done() } diff --git a/internal/metrics/processor_test.go b/internal/metrics/processor_test.go index e31a8d2b..ac7d8a43 100644 --- a/internal/metrics/processor_test.go +++ b/internal/metrics/processor_test.go @@ -1,6 +1,7 @@ package metrics import ( + "context" "errors" "testing" "time" @@ -58,7 +59,7 @@ func TestProcessorBatches(t *testing.T) { mts.now, _ = time.Parse(time.RFC3339, "2006-01-02T15:04:05Z") nowUnix := float64(mts.now.Unix()) - processor := MakeProcessor(&mc, &mts, 1000, false) + processor := MakeProcessor(context.Background(), &mc, &mts, 1000, false) d1 := Distribution{ Name: "metric-1", @@ -104,7 +105,7 @@ func TestProcessorBatchesPerTick(t *testing.T) { secondTimeUnix := float64(secondTime.Unix()) mts.now = firstTime - processor := MakeProcessor(&mc, &mts, 1000, false) + processor := MakeProcessor(context.Background(), &mc, &mts, 1000, false) d1 := Distribution{ Name: "metric-1", @@ -179,7 +180,7 @@ func TestProcessorPerformsRetry(t *testing.T) { mts.now, _ = time.Parse(time.RFC3339, "2006-01-02T15:04:05Z") shouldRetry := true - processor := MakeProcessor(&mc, &mts, 1000, shouldRetry) + processor := MakeProcessor(context.Background(), &mc, &mts, 1000, shouldRetry) d1 := Distribution{ Name: "metric-1", @@ -195,3 +196,28 @@ func TestProcessorPerformsRetry(t *testing.T) { assert.Equal(t, 3, mc.sendMetricsCalledCount) } + +func TestProcessorCancelsWithContext(t *testing.T) { + mc := makeMockClient() + mts := makeMockTimeService() + + mts.now, _ = time.Parse(time.RFC3339, "2006-01-02T15:04:05Z") + + shouldRetry := true + ctx, cancelFunc := context.WithCancel(context.Background()) + processor := MakeProcessor(ctx, &mc, &mts, 1000, shouldRetry) + + d1 := Distribution{ + Name: "metric-1", + Tags: []string{"a", "b", "c"}, + Values: []float64{1, 2, 3}, + } + + processor.AddMetric(&d1) + + // After calling cancelFunc, no metrics should be processed/sent + cancelFunc() + processor.FinishProcessing() + + assert.Equal(t, 0, mc.sendMetricsCalledCount) +} diff --git a/internal/wrapper/wrap_handler.go b/internal/wrapper/wrap_handler.go index 41949ce6..f24a28df 100644 --- a/internal/wrapper/wrap_handler.go +++ b/internal/wrapper/wrap_handler.go @@ -7,6 +7,11 @@ import ( "reflect" ) +var ( + // CurrentContext is the last create lambda context object. 
+ CurrentContext context.Context +) + type ( // HandlerListener is a point where listener logic can be injected into a handler HandlerListener interface { @@ -29,6 +34,7 @@ func WrapHandlerWithListeners(handler interface{}, listeners ...HandlerListener) for _, listener := range listeners { ctx = listener.HandlerStarted(ctx, msg) } + CurrentContext = ctx result, err := callHandler(ctx, msg, handler) for _, listener := range listeners { listener.HandlerFinished(ctx) From 5d8e6743a44c0c9c1d259aa55e09fc3811501c65 Mon Sep 17 00:00:00 2001 From: "darcy.rayner" Date: Wed, 29 May 2019 14:30:21 -0400 Subject: [PATCH 22/26] Make http requests respect context cancellation --- README.md | 5 ++++- internal/metrics/api.go | 17 ++++++++++++----- internal/metrics/listener.go | 5 ++++- internal/metrics/processor.go | 3 +-- 4 files changed, 21 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index 1860e65f..67c30903 100644 --- a/README.md +++ b/README.md @@ -48,8 +48,10 @@ Custom metrics can be submitted using the `Distribution` function. The metrics a ```go + ddlambda.Distribution( - // Use the context object, (or a child context), that was passed into your handler function, or grab it globally using ddlambda.GetContext() + // Context, (ctx), should be the same object passed into your lambda handler function, (or a child). + // If you don't want to pass the context through your call hierarchy, you can use ddlambda.GetContext() ctx, "coffee_house.order_value", // Metric name 12.45, // The value @@ -74,6 +76,7 @@ To enable this feature, make sure any outbound requests have Datadog's tracing h ```go req, err := http.NewRequest("GET", "http://api.youcompany.com/status") // Use the same Context object given to your lambda handler. + // If you don't want to pass the context through your call hierarchy, you can use ddlambda.GetContext() ddlambda.AddTraceHeaders(ctx, req) client := http.Client{} diff --git a/internal/metrics/api.go b/internal/metrics/api.go index 4e22fe88..7f55e73f 100644 --- a/internal/metrics/api.go +++ b/internal/metrics/api.go @@ -2,6 +2,7 @@ package metrics import ( "bytes" + "context" "encoding/json" "fmt" "net/http" @@ -19,6 +20,7 @@ type ( appKey string baseAPIURL string httpClient *http.Client + context context.Context } postMetricsModel struct { @@ -27,13 +29,14 @@ type ( ) // MakeAPIClient creates a new API client with the given api and app keys -func MakeAPIClient(baseAPIURL, apiKey, appKey string) *APIClient { +func MakeAPIClient(ctx context.Context, baseAPIURL, apiKey, appKey string) *APIClient { httpClient := &http.Client{} return &APIClient{ - apiKey, - appKey, - baseAPIURL, - httpClient, + apiKey: apiKey, + appKey: appKey, + baseAPIURL: baseAPIURL, + httpClient: httpClient, + context: ctx, } } @@ -43,6 +46,8 @@ func (cl *APIClient) PrewarmConnection() error { if err != nil { return fmt.Errorf("Couldn't create prewarming request: %v", err) } + req = req.WithContext(cl.context) + cl.addAPICredentials(req) _, err = cl.httpClient.Do(req) if err != nil { @@ -63,6 +68,8 @@ func (cl *APIClient) SendMetrics(metrics []APIMetric) error { if err != nil { return fmt.Errorf("Couldn't create send metrics request:%v", err) } + req = req.WithContext(cl.context) + defer req.Body.Close() cl.addAPICredentials(req) diff --git a/internal/metrics/listener.go b/internal/metrics/listener.go index 8e20f24d..96958de8 100644 --- a/internal/metrics/listener.go +++ b/internal/metrics/listener.go @@ -24,7 +24,7 @@ type ( // MakeListener initializes a new metrics lambda listener func 
MakeListener(config Config) Listener {
-	apiClient := MakeAPIClient(baseAPIURL, config.APIKey, config.AppKey)
+	apiClient := MakeAPIClient(context.Background(), baseAPIURL, config.APIKey, config.AppKey)
 	if config.BatchInterval <= 0 {
 		config.BatchInterval = defaultBatchInterval
 	}
@@ -45,6 +45,9 @@ func (l *Listener) HandlerStarted(ctx context.Context, msg json.RawMessage) cont
 	pr := MakeProcessor(ctx, l.apiClient, ts, l.config.BatchInterval, l.config.ShouldRetryOnFailure)
 
 	ctx = AddProcessor(ctx, pr)
+	// Setting the context on the client will mean that future requests will be cancelled correctly
+	// if the lambda times out.
+	l.apiClient.context = ctx
 
 	pr.StartProcessing()
 
diff --git a/internal/metrics/processor.go b/internal/metrics/processor.go
index 9307a087..7c110696 100644
--- a/internal/metrics/processor.go
+++ b/internal/metrics/processor.go
@@ -78,14 +78,13 @@ func (p *processor) processMetrics() {
 
 	ticker := p.timeService.NewTicker(p.batchInterval)
 	doneChan := p.context.Done()
-
 	shouldExit := false
 	for !shouldExit {
 		shouldSendBatch := false
 		// Batches metrics until timeout is reached
 		select {
 		case <-doneChan:
-			// This process is being cancelled by the context, exit without flushing
+			// This process is being cancelled by the context (probably due to a lambda deadline); exit without flushing.
 			shouldExit = true
 			close(p.metricsChan)
 		case m, ok := <-p.metricsChan:

From 52e376c44a9880caaf467d48300753981b0db7c2 Mon Sep 17 00:00:00 2001
From: "darcy.rayner"
Date: Wed, 29 May 2019 14:57:17 -0400
Subject: [PATCH 23/26] Add option to call Distribution method without context

---
 ddlambda.go | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/ddlambda.go b/ddlambda.go
index 57f32d65..1985158f 100644
--- a/ddlambda.go
+++ b/ddlambda.go
@@ -66,9 +66,9 @@ func GetContext() context.Context {
 	return wrapper.CurrentContext
 }
 
-// Distribution sends a distribution metric to DataDog
-func Distribution(ctx context.Context, metric string, value float64, tags ...string) {
-	pr := metrics.GetProcessor(ctx)
+// DistributionWithContext sends a distribution metric to DataDog
+func DistributionWithContext(ctx context.Context, metric string, value float64, tags ...string) {
+	pr := metrics.GetProcessor(ctx)
 	if pr == nil {
 		return
 	}
@@ -85,6 +85,11 @@ func Distribution(ctx context.Context, metric string, value float64, tags ...str
 	m.AddPoint(value)
 	pr.AddMetric(&m)
 }
+
+// Distribution sends a distribution metric to DataDog
+func Distribution(metric string, value float64, tags ...string) {
+	DistributionWithContext(GetContext(), metric, value, tags...)
+} + func (cfg *Config) toMetricsConfig() metrics.Config { mc := metrics.Config{ From 23a8e06debb8f4343324cbfdd1afb2565fb80034 Mon Sep 17 00:00:00 2001 From: "darcy.rayner" Date: Wed, 29 May 2019 16:42:31 -0400 Subject: [PATCH 24/26] Add go mod support --- go.mod | 11 +++++++++++ go.sum | 6 ++++++ 2 files changed, 17 insertions(+) create mode 100644 go.mod create mode 100644 go.sum diff --git a/go.mod b/go.mod new file mode 100644 index 00000000..f11c2895 --- /dev/null +++ b/go.mod @@ -0,0 +1,11 @@ +module github.com/DataDog/dd-lambda-go + +go 1.12 + +require ( + github.com/aws/aws-sdk-go v1.19.40 // indirect + github.com/aws/aws-xray-sdk-go v1.0.0-rc.9 + github.com/cenkalti/backoff v2.1.1+incompatible + github.com/cihub/seelog v0.0.0-20170130134532-f561c5e57575 // indirect + github.com/pkg/errors v0.8.1 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 00000000..13c1f3e1 --- /dev/null +++ b/go.sum @@ -0,0 +1,6 @@ +github.com/aws/aws-sdk-go v1.19.40/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= +github.com/aws/aws-xray-sdk-go v1.0.0-rc.9/go.mod h1:XtMKdBQfpVut+tJEwI7+dJFRxxRdxHDyVNp2tHXRq04= +github.com/cenkalti/backoff v2.1.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= +github.com/cihub/seelog v0.0.0-20170130134532-f561c5e57575/go.mod h1:9d6lWj8KzO/fd/NrVaLscBKmPigpZpn5YawRPw+e3Yo= +github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= From 0a90e990f26bd6d4d11cda14f4201f882e2cc68f Mon Sep 17 00:00:00 2001 From: "darcy.rayner" Date: Wed, 29 May 2019 17:53:27 -0400 Subject: [PATCH 25/26] Add backoff to vendor folder --- vendor/github.com/cenkalti/backoff/.gitignore | 22 +++ .../github.com/cenkalti/backoff/.travis.yml | 10 ++ vendor/github.com/cenkalti/backoff/LICENSE | 20 +++ vendor/github.com/cenkalti/backoff/README.md | 30 ++++ vendor/github.com/cenkalti/backoff/backoff.go | 66 ++++++++ vendor/github.com/cenkalti/backoff/context.go | 63 ++++++++ .../cenkalti/backoff/exponential.go | 153 ++++++++++++++++++ vendor/github.com/cenkalti/backoff/go.mod | 3 + vendor/github.com/cenkalti/backoff/retry.go | 82 ++++++++++ vendor/github.com/cenkalti/backoff/ticker.go | 82 ++++++++++ vendor/github.com/cenkalti/backoff/tries.go | 35 ++++ 11 files changed, 566 insertions(+) create mode 100644 vendor/github.com/cenkalti/backoff/.gitignore create mode 100644 vendor/github.com/cenkalti/backoff/.travis.yml create mode 100644 vendor/github.com/cenkalti/backoff/LICENSE create mode 100644 vendor/github.com/cenkalti/backoff/README.md create mode 100644 vendor/github.com/cenkalti/backoff/backoff.go create mode 100644 vendor/github.com/cenkalti/backoff/context.go create mode 100644 vendor/github.com/cenkalti/backoff/exponential.go create mode 100644 vendor/github.com/cenkalti/backoff/go.mod create mode 100644 vendor/github.com/cenkalti/backoff/retry.go create mode 100644 vendor/github.com/cenkalti/backoff/ticker.go create mode 100644 vendor/github.com/cenkalti/backoff/tries.go diff --git a/vendor/github.com/cenkalti/backoff/.gitignore b/vendor/github.com/cenkalti/backoff/.gitignore new file mode 100644 index 00000000..00268614 --- /dev/null +++ b/vendor/github.com/cenkalti/backoff/.gitignore @@ -0,0 +1,22 @@ +# Compiled Object files, Static and Dynamic libs (Shared Objects) +*.o +*.a +*.so + +# Folders +_obj +_test + +# Architecture specific extensions/prefixes +*.[568vq] +[568vq].out + +*.cgo1.go +*.cgo2.c 
+_cgo_defun.c +_cgo_gotypes.go +_cgo_export.* + +_testmain.go + +*.exe diff --git a/vendor/github.com/cenkalti/backoff/.travis.yml b/vendor/github.com/cenkalti/backoff/.travis.yml new file mode 100644 index 00000000..47a6a46e --- /dev/null +++ b/vendor/github.com/cenkalti/backoff/.travis.yml @@ -0,0 +1,10 @@ +language: go +go: + - 1.7 + - 1.x + - tip +before_install: + - go get github.com/mattn/goveralls + - go get golang.org/x/tools/cmd/cover +script: + - $HOME/gopath/bin/goveralls -service=travis-ci diff --git a/vendor/github.com/cenkalti/backoff/LICENSE b/vendor/github.com/cenkalti/backoff/LICENSE new file mode 100644 index 00000000..89b81799 --- /dev/null +++ b/vendor/github.com/cenkalti/backoff/LICENSE @@ -0,0 +1,20 @@ +The MIT License (MIT) + +Copyright (c) 2014 Cenk Altı + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/vendor/github.com/cenkalti/backoff/README.md b/vendor/github.com/cenkalti/backoff/README.md new file mode 100644 index 00000000..55ebc98f --- /dev/null +++ b/vendor/github.com/cenkalti/backoff/README.md @@ -0,0 +1,30 @@ +# Exponential Backoff [![GoDoc][godoc image]][godoc] [![Build Status][travis image]][travis] [![Coverage Status][coveralls image]][coveralls] + +This is a Go port of the exponential backoff algorithm from [Google's HTTP Client Library for Java][google-http-java-client]. + +[Exponential backoff][exponential backoff wiki] +is an algorithm that uses feedback to multiplicatively decrease the rate of some process, +in order to gradually find an acceptable rate. +The retries exponentially increase and stop increasing when a certain threshold is met. + +## Usage + +See https://godoc.org/github.com/cenkalti/backoff#pkg-examples + +## Contributing + +* I would like to keep this library as small as possible. +* Please don't send a PR without opening an issue and discussing it first. +* If proposed change is not a common use case, I will probably not accept it. 
+ +[godoc]: https://godoc.org/github.com/cenkalti/backoff +[godoc image]: https://godoc.org/github.com/cenkalti/backoff?status.png +[travis]: https://travis-ci.org/cenkalti/backoff +[travis image]: https://travis-ci.org/cenkalti/backoff.png?branch=master +[coveralls]: https://coveralls.io/github/cenkalti/backoff?branch=master +[coveralls image]: https://coveralls.io/repos/github/cenkalti/backoff/badge.svg?branch=master + +[google-http-java-client]: https://github.com/google/google-http-java-client/blob/da1aa993e90285ec18579f1553339b00e19b3ab5/google-http-client/src/main/java/com/google/api/client/util/ExponentialBackOff.java +[exponential backoff wiki]: http://en.wikipedia.org/wiki/Exponential_backoff + +[advanced example]: https://godoc.org/github.com/cenkalti/backoff#example_ diff --git a/vendor/github.com/cenkalti/backoff/backoff.go b/vendor/github.com/cenkalti/backoff/backoff.go new file mode 100644 index 00000000..3676ee40 --- /dev/null +++ b/vendor/github.com/cenkalti/backoff/backoff.go @@ -0,0 +1,66 @@ +// Package backoff implements backoff algorithms for retrying operations. +// +// Use Retry function for retrying operations that may fail. +// If Retry does not meet your needs, +// copy/paste the function into your project and modify as you wish. +// +// There is also Ticker type similar to time.Ticker. +// You can use it if you need to work with channels. +// +// See Examples section below for usage examples. +package backoff + +import "time" + +// BackOff is a backoff policy for retrying an operation. +type BackOff interface { + // NextBackOff returns the duration to wait before retrying the operation, + // or backoff. Stop to indicate that no more retries should be made. + // + // Example usage: + // + // duration := backoff.NextBackOff(); + // if (duration == backoff.Stop) { + // // Do not retry operation. + // } else { + // // Sleep for duration and retry operation. + // } + // + NextBackOff() time.Duration + + // Reset to initial state. + Reset() +} + +// Stop indicates that no more retries should be made for use in NextBackOff(). +const Stop time.Duration = -1 + +// ZeroBackOff is a fixed backoff policy whose backoff time is always zero, +// meaning that the operation is retried immediately without waiting, indefinitely. +type ZeroBackOff struct{} + +func (b *ZeroBackOff) Reset() {} + +func (b *ZeroBackOff) NextBackOff() time.Duration { return 0 } + +// StopBackOff is a fixed backoff policy that always returns backoff.Stop for +// NextBackOff(), meaning that the operation should never be retried. +type StopBackOff struct{} + +func (b *StopBackOff) Reset() {} + +func (b *StopBackOff) NextBackOff() time.Duration { return Stop } + +// ConstantBackOff is a backoff policy that always returns the same backoff delay. +// This is in contrast to an exponential backoff policy, +// which returns a delay that grows longer as you call NextBackOff() over and over again. 
+type ConstantBackOff struct { + Interval time.Duration +} + +func (b *ConstantBackOff) Reset() {} +func (b *ConstantBackOff) NextBackOff() time.Duration { return b.Interval } + +func NewConstantBackOff(d time.Duration) *ConstantBackOff { + return &ConstantBackOff{Interval: d} +} diff --git a/vendor/github.com/cenkalti/backoff/context.go b/vendor/github.com/cenkalti/backoff/context.go new file mode 100644 index 00000000..7706faa2 --- /dev/null +++ b/vendor/github.com/cenkalti/backoff/context.go @@ -0,0 +1,63 @@ +package backoff + +import ( + "context" + "time" +) + +// BackOffContext is a backoff policy that stops retrying after the context +// is canceled. +type BackOffContext interface { + BackOff + Context() context.Context +} + +type backOffContext struct { + BackOff + ctx context.Context +} + +// WithContext returns a BackOffContext with context ctx +// +// ctx must not be nil +func WithContext(b BackOff, ctx context.Context) BackOffContext { + if ctx == nil { + panic("nil context") + } + + if b, ok := b.(*backOffContext); ok { + return &backOffContext{ + BackOff: b.BackOff, + ctx: ctx, + } + } + + return &backOffContext{ + BackOff: b, + ctx: ctx, + } +} + +func ensureContext(b BackOff) BackOffContext { + if cb, ok := b.(BackOffContext); ok { + return cb + } + return WithContext(b, context.Background()) +} + +func (b *backOffContext) Context() context.Context { + return b.ctx +} + +func (b *backOffContext) NextBackOff() time.Duration { + select { + case <-b.ctx.Done(): + return Stop + default: + } + next := b.BackOff.NextBackOff() + if deadline, ok := b.ctx.Deadline(); ok && deadline.Sub(time.Now()) < next { + return Stop + } + return next +} diff --git a/vendor/github.com/cenkalti/backoff/exponential.go b/vendor/github.com/cenkalti/backoff/exponential.go new file mode 100644 index 00000000..a031a659 --- /dev/null +++ b/vendor/github.com/cenkalti/backoff/exponential.go @@ -0,0 +1,153 @@ +package backoff + +import ( + "math/rand" + "time" +) + +/* +ExponentialBackOff is a backoff implementation that increases the backoff +period for each retry attempt using a randomization function that grows exponentially. + +NextBackOff() is calculated using the following formula: + + randomized interval = + RetryInterval * (random value in range [1 - RandomizationFactor, 1 + RandomizationFactor]) + +In other words NextBackOff() will range between the randomization factor +percentage below and above the retry interval. + +For example, given the following parameters: + + RetryInterval = 2 + RandomizationFactor = 0.5 + Multiplier = 2 + +the actual backoff period used in the next retry attempt will range between 1 and 3 seconds, +multiplied by the exponential, that is, between 2 and 6 seconds. + +Note: MaxInterval caps the RetryInterval and not the randomized interval. + +If the time elapsed since an ExponentialBackOff instance is created goes past the +MaxElapsedTime, then the method NextBackOff() starts returning backoff.Stop. + +The elapsed time can be reset by calling Reset(). + +Example: Given the following default arguments, for 10 tries the sequence will be, +and assuming we go over the MaxElapsedTime on the 10th try: + + Request # RetryInterval (seconds) Randomized Interval (seconds) + + 1 0.5 [0.25, 0.75] + 2 0.75 [0.375, 1.125] + 3 1.125 [0.562, 1.687] + 4 1.687 [0.8435, 2.53] + 5 2.53 [1.265, 3.795] + 6 3.795 [1.897, 5.692] + 7 5.692 [2.846, 8.538] + 8 8.538 [4.269, 12.807] + 9 12.807 [6.403, 19.210] + 10 19.210 backoff.Stop + +Note: Implementation is not thread-safe. 
+*/ +type ExponentialBackOff struct { + InitialInterval time.Duration + RandomizationFactor float64 + Multiplier float64 + MaxInterval time.Duration + // After MaxElapsedTime the ExponentialBackOff stops. + // It never stops if MaxElapsedTime == 0. + MaxElapsedTime time.Duration + Clock Clock + + currentInterval time.Duration + startTime time.Time +} + +// Clock is an interface that returns current time for BackOff. +type Clock interface { + Now() time.Time +} + +// Default values for ExponentialBackOff. +const ( + DefaultInitialInterval = 500 * time.Millisecond + DefaultRandomizationFactor = 0.5 + DefaultMultiplier = 1.5 + DefaultMaxInterval = 60 * time.Second + DefaultMaxElapsedTime = 15 * time.Minute +) + +// NewExponentialBackOff creates an instance of ExponentialBackOff using default values. +func NewExponentialBackOff() *ExponentialBackOff { + b := &ExponentialBackOff{ + InitialInterval: DefaultInitialInterval, + RandomizationFactor: DefaultRandomizationFactor, + Multiplier: DefaultMultiplier, + MaxInterval: DefaultMaxInterval, + MaxElapsedTime: DefaultMaxElapsedTime, + Clock: SystemClock, + } + b.Reset() + return b +} + +type systemClock struct{} + +func (t systemClock) Now() time.Time { + return time.Now() +} + +// SystemClock implements Clock interface that uses time.Now(). +var SystemClock = systemClock{} + +// Reset the interval back to the initial retry interval and restarts the timer. +func (b *ExponentialBackOff) Reset() { + b.currentInterval = b.InitialInterval + b.startTime = b.Clock.Now() +} + +// NextBackOff calculates the next backoff interval using the formula: +// Randomized interval = RetryInterval +/- (RandomizationFactor * RetryInterval) +func (b *ExponentialBackOff) NextBackOff() time.Duration { + // Make sure we have not gone over the maximum elapsed time. + if b.MaxElapsedTime != 0 && b.GetElapsedTime() > b.MaxElapsedTime { + return Stop + } + defer b.incrementCurrentInterval() + return getRandomValueFromInterval(b.RandomizationFactor, rand.Float64(), b.currentInterval) +} + +// GetElapsedTime returns the elapsed time since an ExponentialBackOff instance +// is created and is reset when Reset() is called. +// +// The elapsed time is computed using time.Now().UnixNano(). It is +// safe to call even while the backoff policy is used by a running +// ticker. +func (b *ExponentialBackOff) GetElapsedTime() time.Duration { + return b.Clock.Now().Sub(b.startTime) +} + +// Increments the current interval by multiplying it with the multiplier. +func (b *ExponentialBackOff) incrementCurrentInterval() { + // Check for overflow, if overflow is detected set the current interval to the max interval. + if float64(b.currentInterval) >= float64(b.MaxInterval)/b.Multiplier { + b.currentInterval = b.MaxInterval + } else { + b.currentInterval = time.Duration(float64(b.currentInterval) * b.Multiplier) + } +} + +// Returns a random value from the following interval: +// [randomizationFactor * currentInterval, randomizationFactor * currentInterval]. +func getRandomValueFromInterval(randomizationFactor, random float64, currentInterval time.Duration) time.Duration { + var delta = randomizationFactor * float64(currentInterval) + var minInterval = float64(currentInterval) - delta + var maxInterval = float64(currentInterval) + delta + + // Get a random value from the range [minInterval, maxInterval]. + // The formula used below has a +1 because if the minInterval is 1 and the maxInterval is 3 then + // we want a 33% chance for selecting either 1, 2 or 3. 
+ return time.Duration(minInterval + (random * (maxInterval - minInterval + 1))) +} diff --git a/vendor/github.com/cenkalti/backoff/go.mod b/vendor/github.com/cenkalti/backoff/go.mod new file mode 100644 index 00000000..479e62ad --- /dev/null +++ b/vendor/github.com/cenkalti/backoff/go.mod @@ -0,0 +1,3 @@ +module github.com/cenkalti/backoff/v3 + +go 1.12 diff --git a/vendor/github.com/cenkalti/backoff/retry.go b/vendor/github.com/cenkalti/backoff/retry.go new file mode 100644 index 00000000..e936a506 --- /dev/null +++ b/vendor/github.com/cenkalti/backoff/retry.go @@ -0,0 +1,82 @@ +package backoff + +import "time" + +// An Operation is executing by Retry() or RetryNotify(). +// The operation will be retried using a backoff policy if it returns an error. +type Operation func() error + +// Notify is a notify-on-error function. It receives an operation error and +// backoff delay if the operation failed (with an error). +// +// NOTE that if the backoff policy stated to stop retrying, +// the notify function isn't called. +type Notify func(error, time.Duration) + +// Retry the operation o until it does not return error or BackOff stops. +// o is guaranteed to be run at least once. +// +// If o returns a *PermanentError, the operation is not retried, and the +// wrapped error is returned. +// +// Retry sleeps the goroutine for the duration returned by BackOff after a +// failed operation returns. +func Retry(o Operation, b BackOff) error { return RetryNotify(o, b, nil) } + +// RetryNotify calls notify function with the error and wait duration +// for each failed attempt before sleep. +func RetryNotify(operation Operation, b BackOff, notify Notify) error { + var err error + var next time.Duration + var t *time.Timer + + cb := ensureContext(b) + + b.Reset() + for { + if err = operation(); err == nil { + return nil + } + + if permanent, ok := err.(*PermanentError); ok { + return permanent.Err + } + + if next = cb.NextBackOff(); next == Stop { + return err + } + + if notify != nil { + notify(err, next) + } + + if t == nil { + t = time.NewTimer(next) + defer t.Stop() + } else { + t.Reset(next) + } + + select { + case <-cb.Context().Done(): + return err + case <-t.C: + } + } +} + +// PermanentError signals that the operation should not be retried. +type PermanentError struct { + Err error +} + +func (e *PermanentError) Error() string { + return e.Err.Error() +} + +// Permanent wraps the given err in a *PermanentError. +func Permanent(err error) *PermanentError { + return &PermanentError{ + Err: err, + } +} diff --git a/vendor/github.com/cenkalti/backoff/ticker.go b/vendor/github.com/cenkalti/backoff/ticker.go new file mode 100644 index 00000000..e41084b0 --- /dev/null +++ b/vendor/github.com/cenkalti/backoff/ticker.go @@ -0,0 +1,82 @@ +package backoff + +import ( + "sync" + "time" +) + +// Ticker holds a channel that delivers `ticks' of a clock at times reported by a BackOff. +// +// Ticks will continue to arrive when the previous operation is still running, +// so operations that take a while to fail could run in quick succession. +type Ticker struct { + C <-chan time.Time + c chan time.Time + b BackOffContext + stop chan struct{} + stopOnce sync.Once +} + +// NewTicker returns a new Ticker containing a channel that will send +// the time at times specified by the BackOff argument. Ticker is +// guaranteed to tick at least once. The channel is closed when Stop +// method is called or BackOff stops. 
It is not safe to manipulate the +// provided backoff policy (notably calling NextBackOff or Reset) +// while the ticker is running. +func NewTicker(b BackOff) *Ticker { + c := make(chan time.Time) + t := &Ticker{ + C: c, + c: c, + b: ensureContext(b), + stop: make(chan struct{}), + } + t.b.Reset() + go t.run() + return t +} + +// Stop turns off a ticker. After Stop, no more ticks will be sent. +func (t *Ticker) Stop() { + t.stopOnce.Do(func() { close(t.stop) }) +} + +func (t *Ticker) run() { + c := t.c + defer close(c) + + // Ticker is guaranteed to tick at least once. + afterC := t.send(time.Now()) + + for { + if afterC == nil { + return + } + + select { + case tick := <-afterC: + afterC = t.send(tick) + case <-t.stop: + t.c = nil // Prevent future ticks from being sent to the channel. + return + case <-t.b.Context().Done(): + return + } + } +} + +func (t *Ticker) send(tick time.Time) <-chan time.Time { + select { + case t.c <- tick: + case <-t.stop: + return nil + } + + next := t.b.NextBackOff() + if next == Stop { + t.Stop() + return nil + } + + return time.After(next) +} diff --git a/vendor/github.com/cenkalti/backoff/tries.go b/vendor/github.com/cenkalti/backoff/tries.go new file mode 100644 index 00000000..cfeefd9b --- /dev/null +++ b/vendor/github.com/cenkalti/backoff/tries.go @@ -0,0 +1,35 @@ +package backoff + +import "time" + +/* +WithMaxRetries creates a wrapper around another BackOff, which will +return Stop if NextBackOff() has been called too many times since +the last time Reset() was called + +Note: Implementation is not thread-safe. +*/ +func WithMaxRetries(b BackOff, max uint64) BackOff { + return &backOffTries{delegate: b, maxTries: max} +} + +type backOffTries struct { + delegate BackOff + maxTries uint64 + numTries uint64 +} + +func (b *backOffTries) NextBackOff() time.Duration { + if b.maxTries > 0 { + if b.maxTries <= b.numTries { + return Stop + } + b.numTries++ + } + return b.delegate.NextBackOff() +} + +func (b *backOffTries) Reset() { + b.numTries = 0 + b.delegate.Reset() +} From 1b2697512b15708f7c475f51d8a8f00e3f3db234 Mon Sep 17 00:00:00 2001 From: "darcy.rayner" Date: Wed, 29 May 2019 17:53:42 -0400 Subject: [PATCH 26/26] Remove whitespace from readme --- README.md | 64 +++++++++++++++++++++++++++---------------------------- 1 file changed, 32 insertions(+), 32 deletions(-) diff --git a/README.md b/README.md index 67c30903..176c4c39 100644 --- a/README.md +++ b/README.md @@ -27,7 +27,7 @@ import ( func main() { // Wrap your lambda handler like this - lambda.Start( ddlambda.WrapHandler(myHandler, nil)) + lambda.Start(ddlambda.WrapHandler(myHandler, nil)) /* OR with manual configuration options lambda.Start(ddlambda.WrapHandler(myHandler, &ddlambda.Config{ BatchInterval: time.Seconds * 15 @@ -99,22 +99,22 @@ The file content for `datadog-sampling-priority-1.json`: ```json { - "SamplingRule": { - "RuleName": "Datadog-Sampling-Priority-1", - "ResourceARN": "*", - "Priority": 9998, - "FixedRate": 1, - "ReservoirSize": 100, - "ServiceName": "*", - "ServiceType": "AWS::APIGateway::Stage", - "Host": "*", - "HTTPMethod": "*", - "URLPath": "*", - "Version": 1, - "Attributes": { - "x-datadog-sampling-priority": "1" - } + "SamplingRule": { + "RuleName": "Datadog-Sampling-Priority-1", + "ResourceARN": "*", + "Priority": 9998, + "FixedRate": 1, + "ReservoirSize": 100, + "ServiceName": "*", + "ServiceType": "AWS::APIGateway::Stage", + "Host": "*", + "HTTPMethod": "*", + "URLPath": "*", + "Version": 1, + "Attributes": { + "x-datadog-sampling-priority": "1" } + } } ``` @@ 
-122,22 +122,22 @@ The file content for `datadog-sampling-priority-2.json`: ```json { - "SamplingRule": { - "RuleName": "Datadog-Sampling-Priority-2", - "ResourceARN": "*", - "Priority": 9999, - "FixedRate": 1, - "ReservoirSize": 100, - "ServiceName": "*", - "ServiceType": "AWS::APIGateway::Stage", - "Host": "*", - "HTTPMethod": "*", - "URLPath": "*", - "Version": 1, - "Attributes": { - "x-datadog-sampling-priority": "2" - } + "SamplingRule": { + "RuleName": "Datadog-Sampling-Priority-2", + "ResourceARN": "*", + "Priority": 9999, + "FixedRate": 1, + "ReservoirSize": 100, + "ServiceName": "*", + "ServiceType": "AWS::APIGateway::Stage", + "Host": "*", + "HTTPMethod": "*", + "URLPath": "*", + "Version": 1, + "Attributes": { + "x-datadog-sampling-priority": "2" } + } } ``` @@ -163,4 +163,4 @@ If you find an issue with this package and have a fix, please feel free to open Unless explicitly stated otherwise all files in this repository are licensed under the Apache License Version 2.0. -This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2019 Datadog, Inc. \ No newline at end of file +This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2019 Datadog, Inc.
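
Taken together, the patches above leave the package with a small public surface: `WrapHandler` plus `Config`, `Distribution`/`DistributionWithContext`, `GetContext`, and `AddTraceHeaders`. The sketch below is a minimal end-to-end example of that surface as it stands at the end of the series; it is illustrative only, and the event type, handler name, endpoint URL, and metric values are assumptions rather than part of the patches.

```go
package main

import (
	"context"
	"net/http"
	"time"

	ddlambda "github.com/DataDog/dd-lambda-go"
	"github.com/aws/aws-lambda-go/lambda"
)

// MyEvent is a placeholder payload type for illustration only.
type MyEvent struct {
	OrderID string `json:"order_id"`
}

func main() {
	// Wrap the handler so the trace and metrics listeners run around every invocation.
	// API/App keys fall back to the DATADOG_API_KEY / DATADOG_APP_KEY environment variables when omitted.
	lambda.Start(ddlambda.WrapHandler(handleOrder, &ddlambda.Config{
		BatchInterval:        time.Second * 15,
		ShouldRetryOnFailure: false, // keep retries off unless dropped metrics are unacceptable
	}))
}

func handleOrder(ctx context.Context, ev MyEvent) (string, error) {
	// Submit a custom distribution metric; the wrapped context carries the metrics processor.
	ddlambda.DistributionWithContext(ctx, "coffee_house.order_value", 12.45, "product:latte")

	// Propagate the Datadog trace context on an outbound request.
	req, err := http.NewRequest("GET", "https://api.example.com/status", nil)
	if err != nil {
		return "", err
	}
	ddlambda.AddTraceHeaders(ctx, req)

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()

	return "ok", nil
}
```

Because the wrapper's listeners run around every invocation, the metrics processor is started in `HandlerStarted` and flushed in `HandlerFinished`, so the handler itself never needs an explicit flush call.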