diff --git a/README.md b/README.md index ca860440..176c4c39 100644 --- a/README.md +++ b/README.md @@ -8,6 +8,11 @@ Datadog's Lambda Go client library enables distributed tracing between serverful go get github.com/DataDog/dd-lambda-go ``` +The following Datadog environment variables should be defined via the AWS CLI or Serverless Framework: + +- DATADOG_API_KEY +- DATADOG_APP_KEY + ## Usage Datadog needs to be able to read headers from the incoming Lambda event. Wrap your Lambda handler function like so: @@ -22,7 +27,14 @@ import ( func main() { // Wrap your lambda handler like this - lambda.Start( ddlambda.WrapHandler(myHandler)) + lambda.Start(ddlambda.WrapHandler(myHandler, nil)) + /* OR with manual configuration options + lambda.Start(ddlambda.WrapHandler(myHandler, &ddlambda.Config{ + BatchInterval: time.Second * 15, + APIKey: "my-api-key", + AppKey: "my-app-key", + })) + */ } func myHandler(ctx context.Context, event MyEvent) (string, error) { @@ -30,11 +42,41 @@ func myHandler(ctx context.Context, event MyEvent) (string, error) { } ``` -Make sure any outbound requests have Datadog's tracing headers. +## Custom Metrics + +Custom metrics can be submitted using the `Distribution` function, or `DistributionWithContext` (shown below) when you want to pass the handler's context explicitly. The metrics are submitted as [distribution metrics](https://docs.datadoghq.com/graphing/metrics/distributions/). + +```go + + +ddlambda.DistributionWithContext( + // Context, (ctx), should be the same object passed into your lambda handler function, (or a child). + // If you don't want to pass the context through your call hierarchy, you can use ddlambda.GetContext() + ctx, + "coffee_house.order_value", // Metric name + 12.45, // The value + "product:latte", "order:online", // Associated tags +) +``` + +### VPC + +If your Lambda function is associated with a VPC, you need to ensure it has access to the [public internet](https://aws.amazon.com/premiumsupport/knowledge-center/internet-access-lambda-function/). 
+ +## Distributed Tracing + +[Distributed tracing](https://docs.datadoghq.com/tracing/guide/distributed_tracing/?tab=python) allows you to propagate a trace context from a service running on a host to a service running on AWS Lambda, and vice versa, so you can see performance end-to-end. Linking is implemented by injecting Datadog trace context into the HTTP request headers. + +Distributed tracing headers are language agnostic, e.g., a trace can be propagated between a Java service running on a host to a Lambda function written in Go. + +Because the trace context is propagated through HTTP request headers, the Lambda function needs to be triggered by AWS API Gateway or AWS Application Load Balancer. + +To enable this feature, make sure any outbound requests have Datadog's tracing headers. ```go req, err := http.NewRequest("GET", "http://api.youcompany.com/status") // Use the same Context object given to your lambda handler. + // If you don't want to pass the context through your call hierarchy, you can use ddlambda.GetContext() ddlambda.AddTraceHeaders(ctx, req) client := http.Client{} @@ -42,8 +84,83 @@ Make sure any outbound requests have Datadog's tracing headers. } ``` +## Sampling + +The traces for your Lambda function are converted by Datadog from AWS X-Ray traces. X-Ray needs to sample the traces that the Datadog tracing agent decides to sample, in order to collect as many complete traces as possible. You can create X-Ray sampling rules to ensure requests with header `x-datadog-sampling-priority:1` or `x-datadog-sampling-priority:2` via API Gateway always get sampled by X-Ray. + +These rules can be created using the following AWS CLI command. 
+ +```bash +aws xray create-sampling-rule --cli-input-json file://datadog-sampling-priority-1.json +aws xray create-sampling-rule --cli-input-json file://datadog-sampling-priority-2.json +``` + +The file content for `datadog-sampling-priority-1.json`: + +```json +{ + "SamplingRule": { + "RuleName": "Datadog-Sampling-Priority-1", + "ResourceARN": "*", + "Priority": 9998, + "FixedRate": 1, + "ReservoirSize": 100, + "ServiceName": "*", + "ServiceType": "AWS::APIGateway::Stage", + "Host": "*", + "HTTPMethod": "*", + "URLPath": "*", + "Version": 1, + "Attributes": { + "x-datadog-sampling-priority": "1" + } + } +} +``` + +The file content for `datadog-sampling-priority-2.json`: + +```json +{ + "SamplingRule": { + "RuleName": "Datadog-Sampling-Priority-2", + "ResourceARN": "*", + "Priority": 9999, + "FixedRate": 1, + "ReservoirSize": 100, + "ServiceName": "*", + "ServiceType": "AWS::APIGateway::Stage", + "Host": "*", + "HTTPMethod": "*", + "URLPath": "*", + "Version": 1, + "Attributes": { + "x-datadog-sampling-priority": "2" + } + } +} +``` + ## Non-proxy integration If your Lambda function is triggered by API Gateway via the non-proxy integration, then you have to set up a mapping template, which passes the Datadog trace context from the incoming HTTP request headers to the Lambda function via the event object. -If your Lambda function is deployed by the Serverless Framework, such a mapping template gets created by default. \ No newline at end of file +If your Lambda function is deployed by the Serverless Framework, such a mapping template gets created by default. + +## Opening Issues + +If you encounter a bug with this package, we want to hear about it. Before opening a new issue, search the existing issues to avoid duplicates. + +When opening an issue, include the Datadog Lambda Layer version, Go version, and stack trace if available. In addition, include the steps to reproduce when appropriate. + +You can also open an issue for a feature request. 
+ +## Contributing + +If you find an issue with this package and have a fix, please feel free to open a pull request following the procedures. + +## License + +Unless explicitly stated otherwise all files in this repository are licensed under the Apache License Version 2.0. + +This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2019 Datadog, Inc. diff --git a/ddlambda.go b/ddlambda.go index 0cf94c76..1985158f 100644 --- a/ddlambda.go +++ b/ddlambda.go @@ -2,16 +2,48 @@ package ddlambda import ( "context" + "fmt" "net/http" + "os" + "runtime" + "time" + "github.com/DataDog/dd-lambda-go/internal/metrics" "github.com/DataDog/dd-lambda-go/internal/trace" + "github.com/DataDog/dd-lambda-go/internal/wrapper" +) + +type ( + // Config gives options for how ddlambda should behave + Config struct { + // APIKey is your Datadog API key. This is used for sending metrics. + APIKey string + // AppKey is your Datadog App key. This is used for sending metrics. + AppKey string + // ShouldRetryOnFailure is used to turn on retry logic when sending metrics via the API. This can negatively affect the performance of your lambda, + // and should only be turned on if you can't afford to lose metrics data under poor network conditions. + ShouldRetryOnFailure bool + // BatchInterval is the period of time over which metrics are grouped together for processing to be sent to the API or written to logs. + // Any pending metrics are flushed at the end of the lambda. + BatchInterval time.Duration + } +) + +const ( + // DatadogAPIKeyEnvVar is the environment variable that will be used as an API key by default + DatadogAPIKeyEnvVar = "DATADOG_API_KEY" + // DatadogAPPKeyEnvVar is the environment variable that will be used as an App key by default + DatadogAPPKeyEnvVar = "DATADOG_APP_KEY" ) // WrapHandler is used to instrument your lambda functions, reading in context from API Gateway. 
// It returns a modified handler that can be passed directly to the lambda.Start function. -func WrapHandler(handler interface{}) interface{} { - hl := trace.Listener{} - return trace.WrapHandlerWithListener(handler, &hl) +func WrapHandler(handler interface{}, cfg *Config) interface{} { + + // Set up state that is shared between handler invocations + tl := trace.Listener{} + ml := metrics.MakeListener(cfg.toMetricsConfig()) + return wrapper.WrapHandlerWithListeners(handler, &tl, &ml) } // GetTraceHeaders reads a map containing the DataDog trace headers from a context object. @@ -27,3 +59,61 @@ func AddTraceHeaders(ctx context.Context, req *http.Request) { req.Header.Add(key, value) } } + +// GetContext retrieves the last created lambda context. +// Only use this if you aren't manually passing context through your call hierarchy. +func GetContext() context.Context { + return wrapper.CurrentContext +} + +// DistributionWithContext sends a distribution metric to DataDog using the metrics processor stored in ctx; it is a no-op when ctx carries no processor. +func DistributionWithContext(ctx context.Context, metric string, value float64, tags ...string) { + pr := metrics.GetProcessor(ctx) + if pr == nil { + return + } + + // We add our own runtime tag to the metric for version tracking + tags = append(tags, getRuntimeTag()) + + m := metrics.Distribution{ + Name: metric, + Tags: tags, + Values: []float64{}, + } + m.AddPoint(value) + pr.AddMetric(&m) +} + +// Distribution sends a distribution metric to DataDog, using the context returned by GetContext +func Distribution(metric string, value float64, tags ...string) { + DistributionWithContext(GetContext(), metric, value, tags...) 
+} + +func (cfg *Config) toMetricsConfig() metrics.Config { + + mc := metrics.Config{ + ShouldRetryOnFailure: false, + } + + if cfg != nil { + mc.BatchInterval = cfg.BatchInterval + mc.ShouldRetryOnFailure = cfg.ShouldRetryOnFailure + mc.APIKey = cfg.APIKey + mc.AppKey = cfg.AppKey + } + + if mc.APIKey == "" { + mc.APIKey = os.Getenv(DatadogAPIKeyEnvVar) + + } + if mc.AppKey == "" { + mc.AppKey = os.Getenv(DatadogAPPKeyEnvVar) + } + return mc +} + +func getRuntimeTag() string { + v := runtime.Version() + return fmt.Sprintf("dd_lambda_layer:datadog-%s", v) +} diff --git a/go.mod b/go.mod new file mode 100644 index 00000000..f11c2895 --- /dev/null +++ b/go.mod @@ -0,0 +1,11 @@ +module github.com/DataDog/dd-lambda-go + +go 1.12 + +require ( + github.com/aws/aws-sdk-go v1.19.40 // indirect + github.com/aws/aws-xray-sdk-go v1.0.0-rc.9 + github.com/cenkalti/backoff v2.1.1+incompatible + github.com/cihub/seelog v0.0.0-20170130134532-f561c5e57575 // indirect + github.com/pkg/errors v0.8.1 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 00000000..13c1f3e1 --- /dev/null +++ b/go.sum @@ -0,0 +1,6 @@ +github.com/aws/aws-sdk-go v1.19.40/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= +github.com/aws/aws-xray-sdk-go v1.0.0-rc.9/go.mod h1:XtMKdBQfpVut+tJEwI7+dJFRxxRdxHDyVNp2tHXRq04= +github.com/cenkalti/backoff v2.1.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= +github.com/cihub/seelog v0.0.0-20170130134532-f561c5e57575/go.mod h1:9d6lWj8KzO/fd/NrVaLscBKmPigpZpn5YawRPw+e3Yo= +github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= diff --git a/internal/metrics/api.go b/internal/metrics/api.go new file mode 100644 index 00000000..7f55e73f --- /dev/null +++ b/internal/metrics/api.go @@ -0,0 +1,105 @@ +package metrics + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + 
"net/http" +) + +type ( + // Client sends metrics to Datadog + Client interface { + SendMetrics(metrics []APIMetric) error + } + + // APIClient sends metrics to Datadog, via the Datadog API + APIClient struct { + apiKey string + appKey string + baseAPIURL string + httpClient *http.Client + context context.Context + } + + postMetricsModel struct { + Series []APIMetric `json:"series"` + } +) + +// MakeAPIClient creates a new API client with the given api and app keys +func MakeAPIClient(ctx context.Context, baseAPIURL, apiKey, appKey string) *APIClient { + httpClient := &http.Client{} + return &APIClient{ + apiKey: apiKey, + appKey: appKey, + baseAPIURL: baseAPIURL, + httpClient: httpClient, + context: ctx, + } +} + +// PrewarmConnection sends a redundant GET request to the Datadog API to prewarm the TLS connection +func (cl *APIClient) PrewarmConnection() error { + req, err := http.NewRequest("GET", cl.makeRoute("validate"), nil) + if err != nil { + return fmt.Errorf("Couldn't create prewarming request: %v", err) + } + req = req.WithContext(cl.context) + + cl.addAPICredentials(req) + resp, err := cl.httpClient.Do(req) + if err != nil { + return fmt.Errorf("Couldn't contact server for prewarm request: %v", err) + } + return resp.Body.Close() +} + +// SendMetrics posts a batch metrics payload to the Datadog API +func (cl *APIClient) SendMetrics(metrics []APIMetric) error { + content, err := marshalAPIMetricsModel(metrics) + if err != nil { + return fmt.Errorf("Couldn't marshal metrics model: %v", err) + } + body := bytes.NewBuffer(content) + + req, err := http.NewRequest("POST", cl.makeRoute("series"), body) + if err != nil { + return fmt.Errorf("Couldn't create send metrics request:%v", err) + } + req = req.WithContext(cl.context) + + defer req.Body.Close() + + cl.addAPICredentials(req) + + resp, err := cl.httpClient.Do(req) + + if err != nil { + return fmt.Errorf("Failed to send metrics to API: %v", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusCreated { + return 
fmt.Errorf("Failed to send metrics to API. Status Code %d", resp.StatusCode) + } + return nil +} + +func (cl *APIClient) addAPICredentials(req *http.Request) { + query := req.URL.Query() + query.Add(apiKeyParam, cl.apiKey) + query.Add(appKeyParam, cl.appKey) + req.URL.RawQuery = query.Encode() +} + +func (cl *APIClient) makeRoute(route string) string { + return fmt.Sprintf("%s/%s", cl.baseAPIURL, route) +} + +func marshalAPIMetricsModel(metrics []APIMetric) ([]byte, error) { + pm := postMetricsModel{} + pm.Series = metrics + return json.Marshal(pm) +} diff --git a/internal/metrics/api_test.go b/internal/metrics/api_test.go new file mode 100644 index 00000000..e482e146 --- /dev/null +++ b/internal/metrics/api_test.go @@ -0,0 +1,130 @@ +package metrics + +import ( + "io/ioutil" + "net/http" + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/assert" +) + +const ( + mockAPIKey = "12345" + mockAppKey = "678910" +) + +func TestAddAPICredentials(t *testing.T) { + cl := MakeAPIClient("", mockAPIKey, mockAppKey) + req, _ := http.NewRequest("GET", "http://some-api.com/endpoint", nil) + cl.addAPICredentials(req) + assert.Equal(t, "http://some-api.com/endpoint?api_key=12345&application_key=678910", req.URL.String()) +} + +func TestPrewarmConnection(t *testing.T) { + + called := false + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + called = true + assert.Equal(t, "/validate?api_key=12345&application_key=678910", r.URL.String()) + })) + defer server.Close() + + cl := MakeAPIClient(server.URL, mockAPIKey, mockAppKey) + err := cl.PrewarmConnection() + + assert.NoError(t, err) + assert.True(t, called) +} + +func TestSendMetricsSuccess(t *testing.T) { + called := false + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + called = true + w.WriteHeader(http.StatusCreated) + body, _ := ioutil.ReadAll(r.Body) + s := string(body) + + assert.Equal(t, 
"/series?api_key=12345&application_key=678910", r.URL.String()) + assert.Equal(t, "{\"series\":[{\"metric\":\"metric-1\",\"tags\":[\"a\",\"b\",\"c\"],\"type\":\"distribution\",\"points\":[[1,2],[3,4],[5,6]]}]}", s) + + })) + defer server.Close() + + am := []APIMetric{ + { + Name: "metric-1", + Host: nil, + Tags: []string{"a", "b", "c"}, + MetricType: DistributionType, + Points: [][]float64{ + {float64(1), float64(2)}, {float64(3), float64(4)}, {float64(5), float64(6)}, + }, + }, + } + + cl := MakeAPIClient(server.URL, mockAPIKey, mockAppKey) + err := cl.SendMetrics(am) + + assert.NoError(t, err) + assert.True(t, called) +} + +func TestSendMetricsBadRequest(t *testing.T) { + called := false + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + called = true + w.WriteHeader(http.StatusForbidden) + body, _ := ioutil.ReadAll(r.Body) + s := string(body) + + assert.Equal(t, "/series?api_key=12345&application_key=678910", r.URL.String()) + assert.Equal(t, "{\"series\":[{\"metric\":\"metric-1\",\"tags\":[\"a\",\"b\",\"c\"],\"type\":\"distribution\",\"points\":[[1,2],[3,4],[5,6]]}]}", s) + + })) + defer server.Close() + + am := []APIMetric{ + { + Name: "metric-1", + Host: nil, + Tags: []string{"a", "b", "c"}, + MetricType: DistributionType, + Points: [][]float64{ + {float64(1), float64(2)}, {float64(3), float64(4)}, {float64(5), float64(6)}, + }, + }, + } + + cl := MakeAPIClient(server.URL, mockAPIKey, mockAppKey) + err := cl.SendMetrics(am) + + assert.Error(t, err) + assert.True(t, called) +} + +func TestSendMetricsCantReachServer(t *testing.T) { + called := false + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + called = true + })) + defer server.Close() + + am := []APIMetric{ + { + Name: "metric-1", + Host: nil, + Tags: []string{"a", "b", "c"}, + MetricType: DistributionType, + Points: [][]float64{ + {float64(1), float64(2)}, {float64(3), float64(4)}, {float64(5), float64(6)}, + }, 
+ }, + } + + cl := MakeAPIClient("httpa:///badly-formatted-url", mockAPIKey, mockAppKey) + err := cl.SendMetrics(am) + + assert.Error(t, err) + assert.False(t, called) +} diff --git a/internal/metrics/batcher.go b/internal/metrics/batcher.go new file mode 100644 index 00000000..b9212ba6 --- /dev/null +++ b/internal/metrics/batcher.go @@ -0,0 +1,78 @@ +package metrics + +import ( + "fmt" + "math" + "sort" + "strings" + "time" +) + +type ( + // Batcher aggregates metrics with common properties,(metric name, tags, type etc) + Batcher struct { + metrics map[string]Metric + batchInterval time.Duration + } + // BatchKey identifies a batch of metrics + BatchKey struct { + metricType MetricType + name string + tags []string + host *string + } +) + +// MakeBatcher creates a new batcher object +func MakeBatcher(batchInterval time.Duration) *Batcher { + return &Batcher{ + batchInterval: batchInterval, + metrics: map[string]Metric{}, + } +} + +// AddMetric adds a point to a given metric +func (b *Batcher) AddMetric(timestamp time.Time, metric Metric) { + sk := b.getStringKey(timestamp, metric.ToBatchKey()) + if existing, ok := b.metrics[sk]; ok { + existing.Join(metric) + } else { + b.metrics[sk] = metric + } +} + +// ToAPIMetrics converts the current batch of metrics into API metrics +func (b *Batcher) ToAPIMetrics(timestamp time.Time) []APIMetric { + + ar := []APIMetric{} + interval := b.batchInterval / time.Second + + for _, metric := range b.metrics { + values := metric.ToAPIMetric(timestamp, interval) + for _, val := range values { + ar = append(ar, val) + } + } + return ar +} + +func (b *Batcher) getInterval(timestamp time.Time) float64 { + return float64(timestamp.Unix()) - math.Mod(float64(timestamp.Unix()), b.batchInterval.Seconds()) +} + +func (b *Batcher) getStringKey(timestamp time.Time, bk BatchKey) string { + interval := b.getInterval(timestamp) + tagKey := getTagKey(bk.tags) + + if bk.host != nil { + return fmt.Sprintf("(%g)-(%s)-(%s)-(%s)-(%s)", interval, 
bk.metricType, bk.name, tagKey, *bk.host) + } + return fmt.Sprintf("(%g)-(%s)-(%s)-(%s)", interval, bk.metricType, bk.name, tagKey) +} + +func getTagKey(tags []string) string { + sortedTags := make([]string, len(tags)) + copy(sortedTags, tags) + sort.Strings(sortedTags) + return strings.Join(sortedTags, ":") +} diff --git a/internal/metrics/batcher_test.go b/internal/metrics/batcher_test.go new file mode 100644 index 00000000..baeb2773 --- /dev/null +++ b/internal/metrics/batcher_test.go @@ -0,0 +1,140 @@ +package metrics + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestGetMetricDifferentTagOrder(t *testing.T) { + + tm := time.Now() + batcher := MakeBatcher(10) + dm1 := Distribution{ + Name: "metric-1", + Values: []float64{1, 2}, + Tags: []string{"a", "b", "c"}, + } + dm2 := Distribution{ + Name: "metric-1", + Values: []float64{3, 4}, + Tags: []string{"c", "b", "a"}, + } + + batcher.AddMetric(tm, &dm1) + batcher.AddMetric(tm, &dm2) + + assert.Equal(t, []float64{1, 2, 3, 4}, dm1.Values) +} + +func TestGetMetricFailDifferentName(t *testing.T) { + + tm := time.Now() + batcher := MakeBatcher(10) + + dm1 := Distribution{ + Name: "metric-1", + Values: []float64{1, 2}, + Tags: []string{"a", "b", "c"}, + } + dm2 := Distribution{ + Name: "metric-2", + Values: []float64{3, 4}, + Tags: []string{"c", "b", "a"}, + } + + batcher.AddMetric(tm, &dm1) + batcher.AddMetric(tm, &dm2) + + assert.Equal(t, []float64{1, 2}, dm1.Values) + +} + +func TestGetMetricFailDifferentHost(t *testing.T) { + tm := time.Now() + batcher := MakeBatcher(10) + + host1 := "my-host-1" + host2 := "my-host-2" + + dm1 := Distribution{ + Name: "metric-1", + Values: []float64{1, 2}, + Tags: []string{"a", "b", "c"}, + Host: &host1, + } + dm2 := Distribution{ + Name: "metric-1", + Values: []float64{3, 4}, + Tags: []string{"a", "b", "c"}, + Host: &host2, + } + + batcher.AddMetric(tm, &dm1) + batcher.AddMetric(tm, &dm2) + + assert.Equal(t, []float64{1, 2}, dm1.Values) +} + 
+func TestGetMetricSameHost(t *testing.T) { + + tm := time.Now() + batcher := MakeBatcher(10) + + host := "my-host" + + dm1 := Distribution{ + Name: "metric-1", + Values: []float64{1, 2}, + Tags: []string{"a", "b", "c"}, + Host: &host, + } + dm2 := Distribution{ + Name: "metric-1", + Values: []float64{3, 4}, + Tags: []string{"a", "b", "c"}, + Host: &host, + } + + batcher.AddMetric(tm, &dm1) + batcher.AddMetric(tm, &dm2) + + assert.Equal(t, []float64{1, 2, 3, 4}, dm1.Values) +} + +func TestToAPIMetricsSameInterval(t *testing.T) { + tm := time.Now() + hostname := "host-1" + + batcher := MakeBatcher(10) + dm := Distribution{ + Name: "metric-1", + Tags: []string{"a", "b", "c"}, + Host: &hostname, + Values: []float64{}, + } + + dm.AddPoint(1) + dm.AddPoint(2) + dm.AddPoint(3) + + batcher.AddMetric(tm, &dm) + + floatTime := float64(tm.Unix()) + result := batcher.ToAPIMetrics(tm) + expected := []APIMetric{ + { + Name: "metric-1", + Host: &hostname, + Tags: []string{"a", "b", "c"}, + MetricType: DistributionType, + Interval: nil, + Points: [][]float64{ + {floatTime, 1}, {floatTime, 2}, {floatTime, 3}, + }, + }, + } + + assert.Equal(t, expected, result) +} diff --git a/internal/metrics/constants.go b/internal/metrics/constants.go new file mode 100644 index 00000000..4d107137 --- /dev/null +++ b/internal/metrics/constants.go @@ -0,0 +1,20 @@ +package metrics + +import "time" + +const ( + baseAPIURL = "https://api.datadoghq.com/api/v1" + apiKeyParam = "api_key" + appKeyParam = "application_key" + defaultRetryInterval = time.Millisecond * 250 + defaultBatchInterval = time.Second * 15 +) + +// MetricType enumerates all the available metric types +type MetricType string + +const ( + + // DistributionType represents a distribution metric + DistributionType MetricType = "distribution" +) diff --git a/internal/metrics/context.go b/internal/metrics/context.go new file mode 100644 index 00000000..c1cf936d --- /dev/null +++ b/internal/metrics/context.go @@ -0,0 +1,21 @@ +package 
metrics + +import "context" + +type contextKeytype int + +var traceContextKey = new(contextKeytype) + +// GetProcessor retrieves the processor from a context object. +func GetProcessor(ctx context.Context) Processor { + result := ctx.Value(traceContextKey) + if result == nil { + return nil + } + return result.(Processor) +} + +// AddProcessor adds a processor to a context object +func AddProcessor(ctx context.Context, processor Processor) context.Context { + return context.WithValue(ctx, traceContextKey, processor) +} diff --git a/internal/metrics/context_test.go b/internal/metrics/context_test.go new file mode 100644 index 00000000..597f2294 --- /dev/null +++ b/internal/metrics/context_test.go @@ -0,0 +1,20 @@ +package metrics + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestGetProcessorEmptyContext(t *testing.T) { + ctx := context.Background() + result := GetProcessor(ctx) + assert.Nil(t, result) +} + +func TestGetProcessorSuccess(t *testing.T) { + ctx := AddProcessor(context.Background(), MakeProcessor(context.Background(), nil, nil, 0, false)) + result := GetProcessor(ctx) + assert.NotNil(t, result) +} diff --git a/internal/metrics/listener.go b/internal/metrics/listener.go new file mode 100644 index 00000000..96958de8 --- /dev/null +++ b/internal/metrics/listener.go @@ -0,0 +1,63 @@ +package metrics + +import ( + "context" + "encoding/json" + "time" +) + +type ( + // Listener implements wrapper.HandlerListener, injecting metrics into the context + Listener struct { + apiClient *APIClient + config *Config + } + + // Config gives options for how the listener should work + Config struct { + APIKey string + AppKey string + ShouldRetryOnFailure bool + BatchInterval time.Duration + } +) + +// MakeListener initializes a new metrics lambda listener +func MakeListener(config Config) Listener { + apiClient := MakeAPIClient(context.Background(), baseAPIURL, config.APIKey, config.AppKey) + if config.BatchInterval <= 0 { + 
config.BatchInterval = defaultBatchInterval + } + + // Do this in the background, doesn't matter if it returns + go apiClient.PrewarmConnection() + + return Listener{ + apiClient, + &config, + } +} + +// HandlerStarted adds metrics service to the context +func (l *Listener) HandlerStarted(ctx context.Context, msg json.RawMessage) context.Context { + + ts := MakeTimeService() + pr := MakeProcessor(ctx, l.apiClient, ts, l.config.BatchInterval, l.config.ShouldRetryOnFailure) + + ctx = AddProcessor(ctx, pr) + // Setting the context on the client will mean that future requests will be cancelled correctly + // if the lambda times out. + l.apiClient.context = ctx + + pr.StartProcessing() + + return ctx +} + +// HandlerFinished implemented as part of the wrapper.HandlerListener interface +func (l *Listener) HandlerFinished(ctx context.Context) { + pr := GetProcessor(ctx) + if pr != nil { + pr.FinishProcessing() + } +} diff --git a/internal/metrics/listener_test.go b/internal/metrics/listener_test.go new file mode 100644 index 00000000..1ca924f0 --- /dev/null +++ b/internal/metrics/listener_test.go @@ -0,0 +1,25 @@ +package metrics + +import ( + "context" + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestHandlerAddsProcessorToContext(t *testing.T) { + listener := MakeListener(Config{}) + ctx := listener.HandlerStarted(context.Background(), json.RawMessage{}) + pr := GetProcessor(ctx) + assert.NotNil(t, pr) +} + +func TestHandlerFinishesProcessing(t *testing.T) { + listener := MakeListener(Config{}) + ctx := listener.HandlerStarted(context.Background(), json.RawMessage{}) + + pr := GetProcessor(ctx).(*processor) + listener.HandlerFinished(ctx) + assert.False(t, pr.isProcessing) +} diff --git a/internal/metrics/model.go b/internal/metrics/model.go new file mode 100644 index 00000000..b8d94d35 --- /dev/null +++ b/internal/metrics/model.go @@ -0,0 +1,82 @@ +package metrics + +import ( + "time" +) + +type ( + // Metric represents a metric that 
can have any kind of + Metric interface { + AddPoint(value float64) + ToAPIMetric(timestamp time.Time, interval time.Duration) []APIMetric + ToBatchKey() BatchKey + Join(metric Metric) + } + + // APIMetric is a metric that can be marshalled to send to the metrics API + APIMetric struct { + Name string `json:"metric"` + Host *string `json:"host,omitempty"` + Tags []string `json:"tags,omitempty"` + MetricType MetricType `json:"type"` + Interval *float64 `json:"interval,omitempty"` + Points [][]float64 `json:"points"` + } + + // Distribution is a type of metric that is aggregated over multiple hosts + Distribution struct { + Name string + Tags []string + Host *string + Values []float64 + } +) + +// AddPoint adds a point to the distribution metric +func (d *Distribution) AddPoint(value float64) { + d.Values = append(d.Values, value) +} + +// ToBatchKey returns a key that can be used to batch the metric +func (d *Distribution) ToBatchKey() BatchKey { + return BatchKey{ + name: d.Name, + host: d.Host, + tags: d.Tags, + metricType: DistributionType, + } +} + +// Join creates a union between two metric sets +func (d *Distribution) Join(metric Metric) { + otherDist, ok := metric.(*Distribution) + if !ok { + return + } + for _, val := range otherDist.Values { + d.AddPoint(val) + } + +} + +// ToAPIMetric converts a distribution into an API ready format. 
+func (d *Distribution) ToAPIMetric(timestamp time.Time, interval time.Duration) []APIMetric { + points := make([][]float64, len(d.Values)) + + currentTime := float64(timestamp.Unix()) + + for i, val := range d.Values { + points[i] = []float64{currentTime, val} + } + + return []APIMetric{ + APIMetric{ + Name: d.Name, + Host: d.Host, + Tags: d.Tags, + MetricType: DistributionType, + Points: points, + Interval: nil, + }, + } +} diff --git a/internal/metrics/processor.go b/internal/metrics/processor.go new file mode 100644 index 00000000..7c110696 --- /dev/null +++ b/internal/metrics/processor.go @@ -0,0 +1,130 @@ +package metrics + +import ( + "context" + "sync" + "time" + + "github.com/cenkalti/backoff" +) + +type ( + // Processor is used to batch metrics on a background thread, and send them on to a client periodically. + Processor interface { + // AddMetric sends a metric to the agent + AddMetric(metric Metric) + // StartProcessing begins processing metrics asynchronously + StartProcessing() + // FinishProcessing shuts down the agent, and tries to flush any remaining metrics + FinishProcessing() + } + + processor struct { + context context.Context + metricsChan chan Metric + timeService TimeService + waitGroup sync.WaitGroup + batchInterval time.Duration + client Client + batcher *Batcher + shouldRetryOnFail bool + isProcessing bool + } +) + +// MakeProcessor creates a new metrics context +func MakeProcessor(ctx context.Context, client Client, timeService TimeService, batchInterval time.Duration, shouldRetryOnFail bool) Processor { + batcher := MakeBatcher(batchInterval) + + return &processor{ + context: ctx, + metricsChan: make(chan Metric, 2000), + batchInterval: batchInterval, + waitGroup: sync.WaitGroup{}, + client: client, + batcher: batcher, + shouldRetryOnFail: shouldRetryOnFail, + timeService: timeService, + isProcessing: false, + } +} + +func (p *processor) AddMetric(metric Metric) { + // We use a large buffer in the metrics channel, to make this 
operation non-blocking. + // However, if the channel does fill up, this will become a blocking operation. + p.metricsChan <- metric +} + +func (p *processor) StartProcessing() { + if !p.isProcessing { + p.isProcessing = true + p.waitGroup.Add(1) + go p.processMetrics() + } + +} + +func (p *processor) FinishProcessing() { + if !p.isProcessing { + p.StartProcessing() + } + // Closes the metrics channel, and waits for the last send to complete + close(p.metricsChan) + p.waitGroup.Wait() +} + +func (p *processor) processMetrics() { + + ticker := p.timeService.NewTicker(p.batchInterval) + + doneChan := p.context.Done() + shouldExit := false + for !shouldExit { + shouldSendBatch := false + // Batches metrics until timeout is reached + select { + case <-doneChan: + // This process is being cancelled by the context,(probably due to a lambda deadline), exit without flushing. + shouldExit = true + // The metrics channel is closed only by FinishProcessing; closing it here as well would panic on the second close. + case m, ok := <-p.metricsChan: + if !ok { + // The channel has now been closed + shouldSendBatch = true + shouldExit = true + } else { + p.batcher.AddMetric(p.timeService.Now(), m) + } + case <-ticker.C: + // We are ready to send a batch to our backend + shouldSendBatch = true + } + if shouldSendBatch { + if shouldExit && p.shouldRetryOnFail { + // If we are shutting down, and we just failed to send our last batch, do a retry + bo := backoff.WithMaxRetries(backoff.NewConstantBackOff(defaultRetryInterval), 2) + backoff.Retry(p.sendMetricsBatch, bo) + } else { + p.sendMetricsBatch() + } + } + } + ticker.Stop() + p.isProcessing = false + p.waitGroup.Done() +} + +func (p *processor) sendMetricsBatch() error { + mts := p.batcher.ToAPIMetrics(p.timeService.Now()) + if len(mts) > 0 { + err := p.client.SendMetrics(mts) + if err != nil { + return err + } + // All the metrics in the batcher were sent successfully, + // the batcher can now be cleared. If there was an error, + // the metrics will stay in the batcher and be sent in the next cycle. 
+ p.batcher = MakeBatcher(p.batchInterval) + } + return nil +} diff --git a/internal/metrics/processor_test.go b/internal/metrics/processor_test.go new file mode 100644 index 00000000..ac7d8a43 --- /dev/null +++ b/internal/metrics/processor_test.go @@ -0,0 +1,223 @@ +package metrics + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +type ( + mockClient struct { + batches chan []APIMetric + sendMetricsCalledCount int + err error + } + + mockTimeService struct { + now time.Time + tickerChan chan time.Time + } +) + +func makeMockClient() mockClient { + return mockClient{ + batches: make(chan []APIMetric, 10), + err: nil, + } +} + +func makeMockTimeService() mockTimeService { + return mockTimeService{ + now: time.Now(), + tickerChan: make(chan time.Time), + } +} + +func (mc *mockClient) SendMetrics(mts []APIMetric) error { + mc.sendMetricsCalledCount++ + mc.batches <- mts + return mc.err +} + +func (ts *mockTimeService) NewTicker(duration time.Duration) *time.Ticker { + return &time.Ticker{ + C: ts.tickerChan, + } +} + +func (ts *mockTimeService) Now() time.Time { + return ts.now +} + +func TestProcessorBatches(t *testing.T) { + mc := makeMockClient() + mts := makeMockTimeService() + + mts.now, _ = time.Parse(time.RFC3339, "2006-01-02T15:04:05Z") + nowUnix := float64(mts.now.Unix()) + + processor := MakeProcessor(context.Background(), &mc, &mts, 1000, false) + + d1 := Distribution{ + Name: "metric-1", + Tags: []string{"a", "b", "c"}, + Values: []float64{1, 2, 3}, + } + d2 := Distribution{ + Name: "metric-1", + Tags: []string{"a", "b", "c"}, + Values: []float64{4, 5, 6}, + } + + processor.AddMetric(&d1) + processor.AddMetric(&d2) + + processor.StartProcessing() + processor.FinishProcessing() + + firstBatch := <-mc.batches + + assert.Equal(t, []APIMetric{{ + Name: "metric-1", + Tags: []string{"a", "b", "c"}, + MetricType: DistributionType, + Points: [][]float64{ + []float64{nowUnix, 1}, + []float64{nowUnix, 2}, + 
[]float64{nowUnix, 3}, + []float64{nowUnix, 4}, + []float64{nowUnix, 5}, + []float64{nowUnix, 6}, + }, + }}, firstBatch) +} + +func TestProcessorBatchesPerTick(t *testing.T) { + mc := makeMockClient() + mts := makeMockTimeService() + + firstTime, _ := time.Parse(time.RFC3339, "2006-01-02T15:04:05Z") + firstTimeUnix := float64(firstTime.Unix()) + secondTime, _ := time.Parse(time.RFC3339, "2007-01-02T15:04:05Z") + secondTimeUnix := float64(secondTime.Unix()) + mts.now = firstTime + + processor := MakeProcessor(context.Background(), &mc, &mts, 1000, false) + + d1 := Distribution{ + Name: "metric-1", + Tags: []string{"a", "b", "c"}, + Values: []float64{1, 2}, + } + d2 := Distribution{ + Name: "metric-1", + Tags: []string{"a", "b", "c"}, + Values: []float64{3}, + } + d3 := Distribution{ + Name: "metric-1", + Tags: []string{"a", "b", "c"}, + Values: []float64{4, 5}, + } + d4 := Distribution{ + Name: "metric-1", + Tags: []string{"a", "b", "c"}, + Values: []float64{6}, + } + + processor.StartProcessing() + + processor.AddMetric(&d1) + processor.AddMetric(&d2) + + // This wait is necessary to make sure both metrics have been added to the batch + <-time.Tick(time.Millisecond * 10) + // Sending time to the ticker channel will flush the batch. 
+ mts.tickerChan <- firstTime + firstBatch := <-mc.batches + mts.now = secondTime + + processor.AddMetric(&d3) + processor.AddMetric(&d4) + + processor.FinishProcessing() + secondBatch := <-mc.batches + batches := [][]APIMetric{firstBatch, secondBatch} + + assert.Equal(t, [][]APIMetric{ + []APIMetric{ + { + Name: "metric-1", + Tags: []string{"a", "b", "c"}, + MetricType: DistributionType, + Points: [][]float64{ + []float64{firstTimeUnix, 1}, + []float64{firstTimeUnix, 2}, + []float64{firstTimeUnix, 3}, + }, + }}, + []APIMetric{ + { + Name: "metric-1", + Tags: []string{"a", "b", "c"}, + MetricType: DistributionType, + Points: [][]float64{ + []float64{secondTimeUnix, 4}, + []float64{secondTimeUnix, 5}, + []float64{secondTimeUnix, 6}, + }, + }}, + }, batches) +} + +func TestProcessorPerformsRetry(t *testing.T) { + mc := makeMockClient() + mts := makeMockTimeService() + + mts.now, _ = time.Parse(time.RFC3339, "2006-01-02T15:04:05Z") + + shouldRetry := true + processor := MakeProcessor(context.Background(), &mc, &mts, 1000, shouldRetry) + + d1 := Distribution{ + Name: "metric-1", + Tags: []string{"a", "b", "c"}, + Values: []float64{1, 2, 3}, + } + + mc.err = errors.New("Some error") + + processor.AddMetric(&d1) + + processor.FinishProcessing() + + assert.Equal(t, 3, mc.sendMetricsCalledCount) +} + +func TestProcessorCancelsWithContext(t *testing.T) { + mc := makeMockClient() + mts := makeMockTimeService() + + mts.now, _ = time.Parse(time.RFC3339, "2006-01-02T15:04:05Z") + + shouldRetry := true + ctx, cancelFunc := context.WithCancel(context.Background()) + processor := MakeProcessor(ctx, &mc, &mts, 1000, shouldRetry) + + d1 := Distribution{ + Name: "metric-1", + Tags: []string{"a", "b", "c"}, + Values: []float64{1, 2, 3}, + } + + processor.AddMetric(&d1) + + // After calling cancelFunc, no metrics should be processed/sent + cancelFunc() + processor.FinishProcessing() + + assert.Equal(t, 0, mc.sendMetricsCalledCount) +} diff --git a/internal/metrics/time.go 
b/internal/metrics/time.go new file mode 100644 index 00000000..c918c251 --- /dev/null +++ b/internal/metrics/time.go @@ -0,0 +1,27 @@ +package metrics + +import "time" + +type ( + //TimeService wraps common time related operations + TimeService interface { + NewTicker(duration time.Duration) *time.Ticker + Now() time.Time + } + + timeService struct { + } +) + +// MakeTimeService creates a new time service +func MakeTimeService() TimeService { + return &timeService{} +} + +func (ts *timeService) NewTicker(duration time.Duration) *time.Ticker { + return time.NewTicker(duration) +} + +func (ts *timeService) Now() time.Time { + return time.Now() +} diff --git a/internal/trace/testdata/apig-event-metadata.json b/internal/testdata/apig-event-metadata.json similarity index 100% rename from internal/trace/testdata/apig-event-metadata.json rename to internal/testdata/apig-event-metadata.json diff --git a/internal/trace/testdata/apig-event-no-metadata.json b/internal/testdata/apig-event-no-metadata.json similarity index 100% rename from internal/trace/testdata/apig-event-no-metadata.json rename to internal/testdata/apig-event-no-metadata.json diff --git a/internal/trace/testdata/invalid.json b/internal/testdata/invalid.json similarity index 100% rename from internal/trace/testdata/invalid.json rename to internal/testdata/invalid.json diff --git a/internal/trace/testdata/non-proxy-metadata.json b/internal/testdata/non-proxy-metadata.json similarity index 100% rename from internal/trace/testdata/non-proxy-metadata.json rename to internal/testdata/non-proxy-metadata.json diff --git a/internal/trace/testdata/non-proxy-no-metadata.json b/internal/testdata/non-proxy-no-metadata.json similarity index 100% rename from internal/trace/testdata/non-proxy-no-metadata.json rename to internal/testdata/non-proxy-no-metadata.json diff --git a/internal/trace/context.go b/internal/trace/context.go index 513008bb..7671d89a 100644 --- a/internal/trace/context.go +++ b/internal/trace/context.go 
@@ -73,7 +73,7 @@ func GetTraceHeaders(ctx context.Context, useCurrentSegmentAsParent bool) map[st } func addTraceContextToXRay(ctx context.Context, traceContext map[string]string) error { - ctx, segment := xray.BeginSubsegment(ctx, xraySubsegmentName) + _, segment := xray.BeginSubsegment(ctx, xraySubsegmentName) err := segment.AddMetadataToNamespace(xraySubsegmentKey, xraySubsegmentNamespace, traceContext) if err != nil { return fmt.Errorf("couldn't save trace context to XRay: %v", err) diff --git a/internal/trace/context_test.go b/internal/trace/context_test.go index e7baafa7..ed1e3c5f 100644 --- a/internal/trace/context_test.go +++ b/internal/trace/context_test.go @@ -2,6 +2,8 @@ package trace import ( "context" + "encoding/json" + "io/ioutil" "testing" "github.com/aws/aws-xray-sdk-go/header" @@ -27,8 +29,18 @@ func mockLambdaTraceContext(ctx context.Context, traceID, parentID string, sampl return context.WithValue(ctx, xray.LambdaTraceHeaderKey, headerString) } +func loadRawJSON(t *testing.T, filename string) *json.RawMessage { + bytes, err := ioutil.ReadFile(filename) + if err != nil { + assert.Fail(t, "Couldn't find JSON file") + return nil + } + msg := json.RawMessage{} + msg.UnmarshalJSON(bytes) + return &msg +} func TestUnmarshalEventForTraceMetadataNonProxyEvent(t *testing.T) { - ev := loadRawJSON(t, "testdata/apig-event-metadata.json") + ev := loadRawJSON(t, "../testdata/apig-event-metadata.json") headers, ok := unmarshalEventForTraceContext(*ev) assert.True(t, ok) @@ -42,14 +54,14 @@ func TestUnmarshalEventForTraceMetadataNonProxyEvent(t *testing.T) { } func TestUnmarshalEventForInvalidData(t *testing.T) { - ev := loadRawJSON(t, "testdata/invalid.json") + ev := loadRawJSON(t, "../testdata/invalid.json") _, ok := unmarshalEventForTraceContext(*ev) assert.False(t, ok) } func TestUnmarshalEventForMissingData(t *testing.T) { - ev := loadRawJSON(t, "testdata/non-proxy-no-metadata.json") + ev := loadRawJSON(t, "../testdata/non-proxy-no-metadata.json") _, ok 
:= unmarshalEventForTraceContext(*ev) assert.False(t, ok) @@ -108,7 +120,7 @@ func TestXrayTraceContextWithSegment(t *testing.T) { } func TestExtractTraceContextFromContext(t *testing.T) { - ev := loadRawJSON(t, "testdata/apig-event-no-metadata.json") + ev := loadRawJSON(t, "../testdata/apig-event-no-metadata.json") ctx := mockLambdaTraceContext(context.Background(), "1-5ce31dc2-2c779014b90ce44db5e03875", "779014b90ce44db5e03875", true) newCTX, err := ExtractTraceContext(ctx, *ev) @@ -120,7 +132,7 @@ func TestExtractTraceContextFromContext(t *testing.T) { assert.NotNil(t, headers[parentIDHeader]) } func TestExtractTraceContextFromEvent(t *testing.T) { - ev := loadRawJSON(t, "testdata/apig-event-metadata.json") + ev := loadRawJSON(t, "../testdata/apig-event-metadata.json") ctx := mockLambdaTraceContext(context.Background(), "1-5ce31dc2-2c779014b90ce44db5e03875", "779014b90ce44db5e03875", true) newCTX, err := ExtractTraceContext(ctx, *ev) @@ -136,7 +148,7 @@ func TestExtractTraceContextFromEvent(t *testing.T) { } func TestExtractTraceContextFail(t *testing.T) { - ev := loadRawJSON(t, "testdata/apig-event-no-metadata.json") + ev := loadRawJSON(t, "../testdata/apig-event-no-metadata.json") ctx := context.Background() _, err := ExtractTraceContext(ctx, *ev) @@ -144,7 +156,7 @@ func TestExtractTraceContextFail(t *testing.T) { } func TestGetTraceHeadersWithUpdatedParent(t *testing.T) { - ev := loadRawJSON(t, "testdata/apig-event-metadata.json") + ev := loadRawJSON(t, "../testdata/apig-event-metadata.json") ctx := mockLambdaTraceContext(context.Background(), "1-5ce31dc2-2c779014b90ce44db5e03875", "779014b90ce44db5e03874", true) ctx, _ = ExtractTraceContext(ctx, *ev) diff --git a/internal/trace/wrap_handler.go b/internal/wrapper/wrap_handler.go similarity index 85% rename from internal/trace/wrap_handler.go rename to internal/wrapper/wrap_handler.go index a616c47a..f24a28df 100644 --- a/internal/trace/wrap_handler.go +++ b/internal/wrapper/wrap_handler.go @@ -1,4 +1,4 @@ 
-package trace +package wrapper import ( "context" @@ -7,6 +7,11 @@ import ( "reflect" ) +var ( + // CurrentContext is the last create lambda context object. + CurrentContext context.Context +) + type ( // HandlerListener is a point where listener logic can be injected into a handler HandlerListener interface { @@ -15,8 +20,8 @@ type ( } ) -// WrapHandlerWithListener wraps a lambda handler to capture context and adds the DataDog tracing context. -func WrapHandlerWithListener(handler interface{}, hl HandlerListener) interface{} { +// WrapHandlerWithListeners wraps a lambda handler, and calls listeners before and after every invocation. +func WrapHandlerWithListeners(handler interface{}, listeners ...HandlerListener) interface{} { err := validateHandler(handler) if err != nil { @@ -24,10 +29,16 @@ func WrapHandlerWithListener(handler interface{}, hl HandlerListener) interface{ return handler } + // Return custom handler, to be called once per invocation return func(ctx context.Context, msg json.RawMessage) (interface{}, error) { - ctx = hl.HandlerStarted(ctx, msg) + for _, listener := range listeners { + ctx = listener.HandlerStarted(ctx, msg) + } + CurrentContext = ctx result, err := callHandler(ctx, msg, handler) - hl.HandlerFinished(ctx) + for _, listener := range listeners { + listener.HandlerFinished(ctx) + } return result, err } } @@ -68,7 +79,6 @@ func validateHandler(handler interface{}) error { func callHandler(ctx context.Context, msg json.RawMessage, handler interface{}) (interface{}, error) { ev, err := unmarshalEventForHandler(msg, handler) if err != nil { - // TODO Log error here return nil, err } handlerType := reflect.TypeOf(handler) diff --git a/internal/trace/wrap_handler_test.go b/internal/wrapper/wrap_handler_test.go similarity index 86% rename from internal/trace/wrap_handler_test.go rename to internal/wrapper/wrap_handler_test.go index 43e566ac..6a6644fa 100644 --- a/internal/trace/wrap_handler_test.go +++ b/internal/wrapper/wrap_handler_test.go 
@@ -1,4 +1,4 @@ -package trace +package wrapper import ( "context" @@ -41,7 +41,7 @@ func runHandlerWithJSON(t *testing.T, filename string, handler interface{}) (*mo mhl := mockHandlerListener{} - wrappedHandler := WrapHandlerWithListener(handler, &mhl).(func(context.Context, json.RawMessage) (interface{}, error)) + wrappedHandler := WrapHandlerWithListeners(handler, &mhl).(func(context.Context, json.RawMessage) (interface{}, error)) response, err := wrappedHandler(ctx, *payload) return &mhl, response, err @@ -130,7 +130,7 @@ func TestWrapHandlerAPIGEvent(t *testing.T) { return 5, nil } - _, response, err := runHandlerWithJSON(t, "testdata/apig-event-no-metadata.json", handler) + _, response, err := runHandlerWithJSON(t, "../testdata/apig-event-no-metadata.json", handler) assert.True(t, called) assert.NoError(t, err) @@ -146,7 +146,7 @@ func TestWrapHandlerNonProxyEvent(t *testing.T) { return 5, nil } - _, response, err := runHandlerWithJSON(t, "testdata/non-proxy-no-metadata.json", handler) + _, response, err := runHandlerWithJSON(t, "../testdata/non-proxy-no-metadata.json", handler) assert.True(t, called) assert.NoError(t, err) @@ -162,7 +162,7 @@ func TestWrapHandlerEventArgumentOnly(t *testing.T) { return 5, nil } - _, response, err := runHandlerWithJSON(t, "testdata/non-proxy-no-metadata.json", handler) + _, response, err := runHandlerWithJSON(t, "../testdata/non-proxy-no-metadata.json", handler) assert.True(t, called) assert.NoError(t, err) @@ -177,7 +177,7 @@ func TestWrapHandlerNoArguments(t *testing.T) { return 5, nil } - _, response, err := runHandlerWithJSON(t, "testdata/non-proxy-no-metadata.json", handler) + _, response, err := runHandlerWithJSON(t, "../testdata/non-proxy-no-metadata.json", handler) assert.True(t, called) assert.NoError(t, err) @@ -192,7 +192,7 @@ func TestWrapHandlerInvalidData(t *testing.T) { return 5, nil } - _, response, err := runHandlerWithJSON(t, "testdata/invalid.json", handler) + _, response, err := runHandlerWithJSON(t, 
"../testdata/invalid.json", handler) assert.False(t, called) assert.Error(t, err) @@ -208,7 +208,7 @@ func TestWrapHandlerReturnsError(t *testing.T) { return 5, defaultErr } - _, response, err := runHandlerWithJSON(t, "testdata/non-proxy-no-metadata.json", handler) + _, response, err := runHandlerWithJSON(t, "../testdata/non-proxy-no-metadata.json", handler) assert.True(t, called) assert.Equal(t, defaultErr, err) @@ -222,7 +222,7 @@ func TestWrapHandlerReturnsOriginalHandlerIfInvalid(t *testing.T) { } mhl := mockHandlerListener{} - wrappedHandler := WrapHandlerWithListener(handler, &mhl) + wrappedHandler := WrapHandlerWithListeners(handler, &mhl) assert.Equal(t, reflect.ValueOf(handler).Pointer(), reflect.ValueOf(wrappedHandler).Pointer()) diff --git a/vendor/github.com/cenkalti/backoff/.gitignore b/vendor/github.com/cenkalti/backoff/.gitignore new file mode 100644 index 00000000..00268614 --- /dev/null +++ b/vendor/github.com/cenkalti/backoff/.gitignore @@ -0,0 +1,22 @@ +# Compiled Object files, Static and Dynamic libs (Shared Objects) +*.o +*.a +*.so + +# Folders +_obj +_test + +# Architecture specific extensions/prefixes +*.[568vq] +[568vq].out + +*.cgo1.go +*.cgo2.c +_cgo_defun.c +_cgo_gotypes.go +_cgo_export.* + +_testmain.go + +*.exe diff --git a/vendor/github.com/cenkalti/backoff/.travis.yml b/vendor/github.com/cenkalti/backoff/.travis.yml new file mode 100644 index 00000000..47a6a46e --- /dev/null +++ b/vendor/github.com/cenkalti/backoff/.travis.yml @@ -0,0 +1,10 @@ +language: go +go: + - 1.7 + - 1.x + - tip +before_install: + - go get github.com/mattn/goveralls + - go get golang.org/x/tools/cmd/cover +script: + - $HOME/gopath/bin/goveralls -service=travis-ci diff --git a/vendor/github.com/cenkalti/backoff/LICENSE b/vendor/github.com/cenkalti/backoff/LICENSE new file mode 100644 index 00000000..89b81799 --- /dev/null +++ b/vendor/github.com/cenkalti/backoff/LICENSE @@ -0,0 +1,20 @@ +The MIT License (MIT) + +Copyright (c) 2014 Cenk Altı + +Permission is 
hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/vendor/github.com/cenkalti/backoff/README.md b/vendor/github.com/cenkalti/backoff/README.md new file mode 100644 index 00000000..55ebc98f --- /dev/null +++ b/vendor/github.com/cenkalti/backoff/README.md @@ -0,0 +1,30 @@ +# Exponential Backoff [![GoDoc][godoc image]][godoc] [![Build Status][travis image]][travis] [![Coverage Status][coveralls image]][coveralls] + +This is a Go port of the exponential backoff algorithm from [Google's HTTP Client Library for Java][google-http-java-client]. + +[Exponential backoff][exponential backoff wiki] +is an algorithm that uses feedback to multiplicatively decrease the rate of some process, +in order to gradually find an acceptable rate. +The retries exponentially increase and stop increasing when a certain threshold is met. + +## Usage + +See https://godoc.org/github.com/cenkalti/backoff#pkg-examples + +## Contributing + +* I would like to keep this library as small as possible. 
+* Please don't send a PR without opening an issue and discussing it first. +* If proposed change is not a common use case, I will probably not accept it. + +[godoc]: https://godoc.org/github.com/cenkalti/backoff +[godoc image]: https://godoc.org/github.com/cenkalti/backoff?status.png +[travis]: https://travis-ci.org/cenkalti/backoff +[travis image]: https://travis-ci.org/cenkalti/backoff.png?branch=master +[coveralls]: https://coveralls.io/github/cenkalti/backoff?branch=master +[coveralls image]: https://coveralls.io/repos/github/cenkalti/backoff/badge.svg?branch=master + +[google-http-java-client]: https://github.com/google/google-http-java-client/blob/da1aa993e90285ec18579f1553339b00e19b3ab5/google-http-client/src/main/java/com/google/api/client/util/ExponentialBackOff.java +[exponential backoff wiki]: http://en.wikipedia.org/wiki/Exponential_backoff + +[advanced example]: https://godoc.org/github.com/cenkalti/backoff#example_ diff --git a/vendor/github.com/cenkalti/backoff/backoff.go b/vendor/github.com/cenkalti/backoff/backoff.go new file mode 100644 index 00000000..3676ee40 --- /dev/null +++ b/vendor/github.com/cenkalti/backoff/backoff.go @@ -0,0 +1,66 @@ +// Package backoff implements backoff algorithms for retrying operations. +// +// Use Retry function for retrying operations that may fail. +// If Retry does not meet your needs, +// copy/paste the function into your project and modify as you wish. +// +// There is also Ticker type similar to time.Ticker. +// You can use it if you need to work with channels. +// +// See Examples section below for usage examples. +package backoff + +import "time" + +// BackOff is a backoff policy for retrying an operation. +type BackOff interface { + // NextBackOff returns the duration to wait before retrying the operation, + // or backoff. Stop to indicate that no more retries should be made. 
+ // + // Example usage: + // + // duration := backoff.NextBackOff(); + // if (duration == backoff.Stop) { + // // Do not retry operation. + // } else { + // // Sleep for duration and retry operation. + // } + // + NextBackOff() time.Duration + + // Reset to initial state. + Reset() +} + +// Stop indicates that no more retries should be made for use in NextBackOff(). +const Stop time.Duration = -1 + +// ZeroBackOff is a fixed backoff policy whose backoff time is always zero, +// meaning that the operation is retried immediately without waiting, indefinitely. +type ZeroBackOff struct{} + +func (b *ZeroBackOff) Reset() {} + +func (b *ZeroBackOff) NextBackOff() time.Duration { return 0 } + +// StopBackOff is a fixed backoff policy that always returns backoff.Stop for +// NextBackOff(), meaning that the operation should never be retried. +type StopBackOff struct{} + +func (b *StopBackOff) Reset() {} + +func (b *StopBackOff) NextBackOff() time.Duration { return Stop } + +// ConstantBackOff is a backoff policy that always returns the same backoff delay. +// This is in contrast to an exponential backoff policy, +// which returns a delay that grows longer as you call NextBackOff() over and over again. +type ConstantBackOff struct { + Interval time.Duration +} + +func (b *ConstantBackOff) Reset() {} +func (b *ConstantBackOff) NextBackOff() time.Duration { return b.Interval } + +func NewConstantBackOff(d time.Duration) *ConstantBackOff { + return &ConstantBackOff{Interval: d} +} diff --git a/vendor/github.com/cenkalti/backoff/context.go b/vendor/github.com/cenkalti/backoff/context.go new file mode 100644 index 00000000..7706faa2 --- /dev/null +++ b/vendor/github.com/cenkalti/backoff/context.go @@ -0,0 +1,63 @@ +package backoff + +import ( + "context" + "time" +) + +// BackOffContext is a backoff policy that stops retrying after the context +// is canceled. 
+type BackOffContext interface { + BackOff + Context() context.Context +} + +type backOffContext struct { + BackOff + ctx context.Context +} + +// WithContext returns a BackOffContext with context ctx +// +// ctx must not be nil +func WithContext(b BackOff, ctx context.Context) BackOffContext { + if ctx == nil { + panic("nil context") + } + + if b, ok := b.(*backOffContext); ok { + return &backOffContext{ + BackOff: b.BackOff, + ctx: ctx, + } + } + + return &backOffContext{ + BackOff: b, + ctx: ctx, + } +} + +func ensureContext(b BackOff) BackOffContext { + if cb, ok := b.(BackOffContext); ok { + return cb + } + return WithContext(b, context.Background()) +} + +func (b *backOffContext) Context() context.Context { + return b.ctx +} + +func (b *backOffContext) NextBackOff() time.Duration { + select { + case <-b.ctx.Done(): + return Stop + default: + } + next := b.BackOff.NextBackOff() + if deadline, ok := b.ctx.Deadline(); ok && deadline.Sub(time.Now()) < next { + return Stop + } + return next +} diff --git a/vendor/github.com/cenkalti/backoff/exponential.go b/vendor/github.com/cenkalti/backoff/exponential.go new file mode 100644 index 00000000..a031a659 --- /dev/null +++ b/vendor/github.com/cenkalti/backoff/exponential.go @@ -0,0 +1,153 @@ +package backoff + +import ( + "math/rand" + "time" +) + +/* +ExponentialBackOff is a backoff implementation that increases the backoff +period for each retry attempt using a randomization function that grows exponentially. + +NextBackOff() is calculated using the following formula: + + randomized interval = + RetryInterval * (random value in range [1 - RandomizationFactor, 1 + RandomizationFactor]) + +In other words NextBackOff() will range between the randomization factor +percentage below and above the retry interval. 
+ +For example, given the following parameters: + + RetryInterval = 2 + RandomizationFactor = 0.5 + Multiplier = 2 + +the actual backoff period used in the next retry attempt will range between 1 and 3 seconds, +multiplied by the exponential, that is, between 2 and 6 seconds. + +Note: MaxInterval caps the RetryInterval and not the randomized interval. + +If the time elapsed since an ExponentialBackOff instance is created goes past the +MaxElapsedTime, then the method NextBackOff() starts returning backoff.Stop. + +The elapsed time can be reset by calling Reset(). + +Example: Given the following default arguments, for 10 tries the sequence will be, +and assuming we go over the MaxElapsedTime on the 10th try: + + Request # RetryInterval (seconds) Randomized Interval (seconds) + + 1 0.5 [0.25, 0.75] + 2 0.75 [0.375, 1.125] + 3 1.125 [0.562, 1.687] + 4 1.687 [0.8435, 2.53] + 5 2.53 [1.265, 3.795] + 6 3.795 [1.897, 5.692] + 7 5.692 [2.846, 8.538] + 8 8.538 [4.269, 12.807] + 9 12.807 [6.403, 19.210] + 10 19.210 backoff.Stop + +Note: Implementation is not thread-safe. +*/ +type ExponentialBackOff struct { + InitialInterval time.Duration + RandomizationFactor float64 + Multiplier float64 + MaxInterval time.Duration + // After MaxElapsedTime the ExponentialBackOff stops. + // It never stops if MaxElapsedTime == 0. + MaxElapsedTime time.Duration + Clock Clock + + currentInterval time.Duration + startTime time.Time +} + +// Clock is an interface that returns current time for BackOff. +type Clock interface { + Now() time.Time +} + +// Default values for ExponentialBackOff. +const ( + DefaultInitialInterval = 500 * time.Millisecond + DefaultRandomizationFactor = 0.5 + DefaultMultiplier = 1.5 + DefaultMaxInterval = 60 * time.Second + DefaultMaxElapsedTime = 15 * time.Minute +) + +// NewExponentialBackOff creates an instance of ExponentialBackOff using default values. 
+func NewExponentialBackOff() *ExponentialBackOff { + b := &ExponentialBackOff{ + InitialInterval: DefaultInitialInterval, + RandomizationFactor: DefaultRandomizationFactor, + Multiplier: DefaultMultiplier, + MaxInterval: DefaultMaxInterval, + MaxElapsedTime: DefaultMaxElapsedTime, + Clock: SystemClock, + } + b.Reset() + return b +} + +type systemClock struct{} + +func (t systemClock) Now() time.Time { + return time.Now() +} + +// SystemClock implements Clock interface that uses time.Now(). +var SystemClock = systemClock{} + +// Reset the interval back to the initial retry interval and restarts the timer. +func (b *ExponentialBackOff) Reset() { + b.currentInterval = b.InitialInterval + b.startTime = b.Clock.Now() +} + +// NextBackOff calculates the next backoff interval using the formula: +// Randomized interval = RetryInterval +/- (RandomizationFactor * RetryInterval) +func (b *ExponentialBackOff) NextBackOff() time.Duration { + // Make sure we have not gone over the maximum elapsed time. + if b.MaxElapsedTime != 0 && b.GetElapsedTime() > b.MaxElapsedTime { + return Stop + } + defer b.incrementCurrentInterval() + return getRandomValueFromInterval(b.RandomizationFactor, rand.Float64(), b.currentInterval) +} + +// GetElapsedTime returns the elapsed time since an ExponentialBackOff instance +// is created and is reset when Reset() is called. +// +// The elapsed time is computed using time.Now().UnixNano(). It is +// safe to call even while the backoff policy is used by a running +// ticker. +func (b *ExponentialBackOff) GetElapsedTime() time.Duration { + return b.Clock.Now().Sub(b.startTime) +} + +// Increments the current interval by multiplying it with the multiplier. +func (b *ExponentialBackOff) incrementCurrentInterval() { + // Check for overflow, if overflow is detected set the current interval to the max interval. 
+ if float64(b.currentInterval) >= float64(b.MaxInterval)/b.Multiplier { + b.currentInterval = b.MaxInterval + } else { + b.currentInterval = time.Duration(float64(b.currentInterval) * b.Multiplier) + } +} + +// Returns a random value from the following interval: +// [randomizationFactor * currentInterval, randomizationFactor * currentInterval]. +func getRandomValueFromInterval(randomizationFactor, random float64, currentInterval time.Duration) time.Duration { + var delta = randomizationFactor * float64(currentInterval) + var minInterval = float64(currentInterval) - delta + var maxInterval = float64(currentInterval) + delta + + // Get a random value from the range [minInterval, maxInterval]. + // The formula used below has a +1 because if the minInterval is 1 and the maxInterval is 3 then + // we want a 33% chance for selecting either 1, 2 or 3. + return time.Duration(minInterval + (random * (maxInterval - minInterval + 1))) +} diff --git a/vendor/github.com/cenkalti/backoff/go.mod b/vendor/github.com/cenkalti/backoff/go.mod new file mode 100644 index 00000000..479e62ad --- /dev/null +++ b/vendor/github.com/cenkalti/backoff/go.mod @@ -0,0 +1,3 @@ +module github.com/cenkalti/backoff/v3 + +go 1.12 diff --git a/vendor/github.com/cenkalti/backoff/retry.go b/vendor/github.com/cenkalti/backoff/retry.go new file mode 100644 index 00000000..e936a506 --- /dev/null +++ b/vendor/github.com/cenkalti/backoff/retry.go @@ -0,0 +1,82 @@ +package backoff + +import "time" + +// An Operation is executing by Retry() or RetryNotify(). +// The operation will be retried using a backoff policy if it returns an error. +type Operation func() error + +// Notify is a notify-on-error function. It receives an operation error and +// backoff delay if the operation failed (with an error). +// +// NOTE that if the backoff policy stated to stop retrying, +// the notify function isn't called. 
+type Notify func(error, time.Duration) + +// Retry the operation o until it does not return error or BackOff stops. +// o is guaranteed to be run at least once. +// +// If o returns a *PermanentError, the operation is not retried, and the +// wrapped error is returned. +// +// Retry sleeps the goroutine for the duration returned by BackOff after a +// failed operation returns. +func Retry(o Operation, b BackOff) error { return RetryNotify(o, b, nil) } + +// RetryNotify calls notify function with the error and wait duration +// for each failed attempt before sleep. +func RetryNotify(operation Operation, b BackOff, notify Notify) error { + var err error + var next time.Duration + var t *time.Timer + + cb := ensureContext(b) + + b.Reset() + for { + if err = operation(); err == nil { + return nil + } + + if permanent, ok := err.(*PermanentError); ok { + return permanent.Err + } + + if next = cb.NextBackOff(); next == Stop { + return err + } + + if notify != nil { + notify(err, next) + } + + if t == nil { + t = time.NewTimer(next) + defer t.Stop() + } else { + t.Reset(next) + } + + select { + case <-cb.Context().Done(): + return err + case <-t.C: + } + } +} + +// PermanentError signals that the operation should not be retried. +type PermanentError struct { + Err error +} + +func (e *PermanentError) Error() string { + return e.Err.Error() +} + +// Permanent wraps the given err in a *PermanentError. +func Permanent(err error) *PermanentError { + return &PermanentError{ + Err: err, + } +} diff --git a/vendor/github.com/cenkalti/backoff/ticker.go b/vendor/github.com/cenkalti/backoff/ticker.go new file mode 100644 index 00000000..e41084b0 --- /dev/null +++ b/vendor/github.com/cenkalti/backoff/ticker.go @@ -0,0 +1,82 @@ +package backoff + +import ( + "sync" + "time" +) + +// Ticker holds a channel that delivers `ticks' of a clock at times reported by a BackOff. 
+// +// Ticks will continue to arrive when the previous operation is still running, +// so operations that take a while to fail could run in quick succession. +type Ticker struct { + C <-chan time.Time + c chan time.Time + b BackOffContext + stop chan struct{} + stopOnce sync.Once +} + +// NewTicker returns a new Ticker containing a channel that will send +// the time at times specified by the BackOff argument. Ticker is +// guaranteed to tick at least once. The channel is closed when Stop +// method is called or BackOff stops. It is not safe to manipulate the +// provided backoff policy (notably calling NextBackOff or Reset) +// while the ticker is running. +func NewTicker(b BackOff) *Ticker { + c := make(chan time.Time) + t := &Ticker{ + C: c, + c: c, + b: ensureContext(b), + stop: make(chan struct{}), + } + t.b.Reset() + go t.run() + return t +} + +// Stop turns off a ticker. After Stop, no more ticks will be sent. +func (t *Ticker) Stop() { + t.stopOnce.Do(func() { close(t.stop) }) +} + +func (t *Ticker) run() { + c := t.c + defer close(c) + + // Ticker is guaranteed to tick at least once. + afterC := t.send(time.Now()) + + for { + if afterC == nil { + return + } + + select { + case tick := <-afterC: + afterC = t.send(tick) + case <-t.stop: + t.c = nil // Prevent future ticks from being sent to the channel. 
+ return + case <-t.b.Context().Done(): + return + } + } +} + +func (t *Ticker) send(tick time.Time) <-chan time.Time { + select { + case t.c <- tick: + case <-t.stop: + return nil + } + + next := t.b.NextBackOff() + if next == Stop { + t.Stop() + return nil + } + + return time.After(next) +} diff --git a/vendor/github.com/cenkalti/backoff/tries.go b/vendor/github.com/cenkalti/backoff/tries.go new file mode 100644 index 00000000..cfeefd9b --- /dev/null +++ b/vendor/github.com/cenkalti/backoff/tries.go @@ -0,0 +1,35 @@ +package backoff + +import "time" + +/* +WithMaxRetries creates a wrapper around another BackOff, which will +return Stop if NextBackOff() has been called too many times since +the last time Reset() was called + +Note: Implementation is not thread-safe. +*/ +func WithMaxRetries(b BackOff, max uint64) BackOff { + return &backOffTries{delegate: b, maxTries: max} +} + +type backOffTries struct { + delegate BackOff + maxTries uint64 + numTries uint64 +} + +func (b *backOffTries) NextBackOff() time.Duration { + if b.maxTries > 0 { + if b.maxTries <= b.numTries { + return Stop + } + b.numTries++ + } + return b.delegate.NextBackOff() +} + +func (b *backOffTries) Reset() { + b.numTries = 0 + b.delegate.Reset() +}