diff --git a/CHANGELOG.md b/CHANGELOG.md index 340b879f0..04e17cca2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ OpenTelemetry Go Automatic Instrumentation adheres to [Semantic Versioning](http ### Added +- Support `github.com/go-redis/redis/v8`. ([#1525](https://github.com/open-telemetry/opentelemetry-go-instrumentation/pull/1525)) - Support `SELECT`, `INSERT`, `UPDATE`, and `DELETE` for database span names and `db.operation.name` attribute. ([#1253](https://github.com/open-telemetry/opentelemetry-go-instrumentation/pull/1253)) - Support `go.opentelemetry.io/otel@v1.33.0`. ([#1417](https://github.com/open-telemetry/opentelemetry-go-instrumentation/pull/1417)) - Support `google.golang.org/grpc` `1.69.0`. ([#1417](https://github.com/open-telemetry/opentelemetry-go-instrumentation/pull/1417)) diff --git a/COMPATIBILITY.md b/COMPATIBILITY.md index 87bd6e0e7..f87c5b7f9 100644 --- a/COMPATIBILITY.md +++ b/COMPATIBILITY.md @@ -19,6 +19,7 @@ Tracing instrumentation is provided for the following Go libraries. - [`github.com/segmentio/kafka-go`](#githubcomsegmentiokafka-go) - [`google.golang.org/grpc`](#googlegolangorggrpc) - [`net/http`](#nethttp) +- [`github.com/go-redis/redis/v8`](#githubcomgo-redisredisv8) ### database/sql @@ -51,3 +52,9 @@ Supported version ranges: Supported version ranges: - `go1.12` to `go1.23.4` + +### github.com/go-redis/redis/v8 + +[Package documentation](https://pkg.go.dev/github.com/go-redis/redis/v8) + +- `v8.0.0` to `v8.11.5` \ No newline at end of file diff --git a/examples/httpRedis/Dockerfile b/examples/httpRedis/Dockerfile new file mode 100644 index 000000000..e8983bdf0 --- /dev/null +++ b/examples/httpRedis/Dockerfile @@ -0,0 +1,7 @@ +FROM golang:1.23.4@sha256:70031844b8c225351d0bb63e2c383f80db85d92ba894e3da7e13bcf80efa9a37 +WORKDIR /app +COPY ./*.go . 
+RUN go mod init main +RUN go mod tidy +RUN go build -o main +ENTRYPOINT ["/app/main"] \ No newline at end of file diff --git a/examples/httpRedis/README.md b/examples/httpRedis/README.md new file mode 100644 index 000000000..e9003bfd9 --- /dev/null +++ b/examples/httpRedis/README.md @@ -0,0 +1,82 @@ +# Example of Auto instrumentation of HTTP server + Redis + +This example only test [go-redis/v8](https://pkg.go.dev/github.com/go-redis/redis/v8). + +**It is highly recommended to deploy the demo using docker compose**. + + +## Docker compose + +Setup the example: + +``` +docker compose up +``` + +Add a key-value pair to redis using below command: + +``` +curl -X POST http://localhost:8080/set -d '{"key":"name", "value":"Alice"}' +``` + +Every hit to the server should generate a trace that we can observe in [Jaeger UI](http://localhost:16686/). + + +## Local deployment + +### Setup OpenTelemetry Collector and Jaeger + +You can setup a local [OpenTelemetry Collector](https://github.com/open-telemetry/opentelemetry-collector) and start it. + +Assuming you've exposed port `4318`, and configured the [Jaeger](http://jaegertracing.io/docs) backend service in collector. + + +### Setup auto-instrumentation binary + +Build the binary + +``` +make build +``` + +You will get binary `otel-go-instrumentation` in current directory. 
Then start instrumenting the target app + +``` +sudo OTEL_GO_AUTO_TARGET_EXE= OTEL_SERVICE_NAME=eBPFApp OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4318 OTEL_GO_AUTO_INCLUDE_DB_STATEMENT=true ./otel-go-instrumentation +``` + +### Setup and run the demo +Build and run + +``` +go build -o main + +./main +``` + +Set + +``` +curl -X POST http://localhost:8080/set -d '{"key":"name", "value":"Alice"}' +``` + +Get + +``` +curl -X POST http://localhost:8080/get -d '{"key":"name"}' +``` + +Sadd + +``` +curl -X POST http://localhost:8080/sadd -d '{"key":"mySet", "values":["val1", "val2", "val3", "val4"]}' +``` + +Pipelining mode + +``` +curl -X POST http://localhost:8080/pipeline -d '{ "commands": [ {"command": "set", "key": "key1", "value": "value1"}, {"command": "get", "key": "key1"}, {"command": "sadd", "key": "mySet", "value": "value1"} ] }' +``` + + +You can observe the trace in [Jaeger UI](http://localhost:16686/). diff --git a/examples/httpRedis/docker-compose.yaml b/examples/httpRedis/docker-compose.yaml new file mode 100644 index 000000000..db715211e --- /dev/null +++ b/examples/httpRedis/docker-compose.yaml @@ -0,0 +1,70 @@ +version: "3.9" + +networks: + default: + name: http-redis-network + driver: bridge + +services: + http-redis: + depends_on: + - jaeger + - redis + build: + context: . + dockerfile: ./Dockerfile + pid: "host" + ports: + - "8080:8080" + volumes: + - /proc:/host/proc + environment: + - REDIS_HOST=redis + - REDIS_PORT=6379 + + redis: + image: redis:latest + ports: + - "6379:6379" + volumes: + - redis_data:/data + restart: unless-stopped + deploy: + resources: + limits: + memory: 256M + + go-auto: + depends_on: + - http-redis + build: + context: ../.. 
+ dockerfile: Dockerfile + privileged: true + pid: "host" + environment: + - OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger:4318 + - OTEL_GO_AUTO_TARGET_EXE=/app/main + - OTEL_GO_AUTO_INCLUDE_DB_STATEMENT=true + - OTEL_SERVICE_NAME=eBPF-httpRedis + - OTEL_PROPAGATORS=tracecontext,baggage + - CGO_ENABLED=1 + volumes: + - /proc:/host/proc + + jaeger: + image: jaegertracing/all-in-one:1.60@sha256:4fd2d70fa347d6a47e79fcb06b1c177e6079f92cba88b083153d56263082135e + ports: + - "16686:16686" + - "14268:14268" + environment: + - COLLECTOR_OTLP_ENABLED=true + - LOG_LEVEL=debug + deploy: + resources: + limits: + memory: 300M + restart: unless-stopped + +volumes: + redis_data: \ No newline at end of file diff --git a/examples/httpRedis/go.mod b/examples/httpRedis/go.mod new file mode 100644 index 000000000..6c85e1931 --- /dev/null +++ b/examples/httpRedis/go.mod @@ -0,0 +1,10 @@ +module go.opentelemetry.io/auto/examples/httpRedis + +go 1.23.1 + +require github.com/go-redis/redis/v8 v8.11.5 + +require ( + github.com/cespare/xxhash/v2 v2.1.2 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect +) diff --git a/examples/httpRedis/go.sum b/examples/httpRedis/go.sum new file mode 100644 index 000000000..17906ec30 --- /dev/null +++ b/examples/httpRedis/go.sum @@ -0,0 +1,24 @@ +github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= +github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= +github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= +github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= 
+github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= +github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= +github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= +github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= +github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU= +github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE= +github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs= +golang.org/x/net v0.0.0-20210428140749-89ef3d95e781 h1:DzZ89McO9/gWPsQXS/FVKAlG02ZjaQ6AlZRBimEYOd0= +golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= +golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e h1:fLOSk5Q00efkSvAm+4xcoXD+RRmLmmulPn5I3Y9F2EM= +golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= diff --git a/examples/httpRedis/main.go b/examples/httpRedis/main.go new file mode 100644 index 000000000..44d837b55 --- /dev/null +++ b/examples/httpRedis/main.go @@ -0,0 +1,226 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package main + +import ( + "encoding/json" + "fmt" + "log" + "net/http" + "os" + "time" + + "github.com/go-redis/redis/v8" +) + +var rdb *redis.Client + +func main() { + initRedis() + + http.HandleFunc("/set", 
setHandler) + http.HandleFunc("/get", getHandler) + http.HandleFunc("/setex", setexHandler) + http.HandleFunc("/sadd", saddHandler) + http.HandleFunc("/pipeline", pipelineHandler) + + log.Println("Starting server on :8080") + log.Fatal(http.ListenAndServe(":8080", nil)) +} + +func initRedis() { + redisHost := os.Getenv("REDIS_HOST") + redisPort := os.Getenv("REDIS_PORT") + redisAddr := fmt.Sprintf("%s:%s", redisHost, redisPort) + if redisHost == "" || redisPort == "" { + redisAddr = "localhost:6379" + } + rdb = redis.NewClient(&redis.Options{ + Addr: redisAddr, + Password: "", + DB: 0, + }) +} + +func setHandler(w http.ResponseWriter, r *http.Request) { + var req struct { + Key string `json:"key"` + Value string `json:"value"` + } + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + err := rdb.Set(r.Context(), req.Key, req.Value, 0).Err() + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusOK) + _, err = w.Write([]byte("Key-Value pair set successfully\n")) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } +} + +func getHandler(w http.ResponseWriter, r *http.Request) { + var req struct { + Key string `json:"key"` + } + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + value, err := rdb.Get(r.Context(), req.Key).Result() + if err == redis.Nil { + http.Error(w, "Key does not exist", http.StatusNotFound) + return + } else if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "application/json") + err = json.NewEncoder(w).Encode(map[string]string{ + "key": req.Key, + "value": value, + }) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } +} + +func setexHandler(w http.ResponseWriter, r *http.Request) { 
+ var req struct { + Key string `json:"key"` + Value string `json:"value"` + } + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + result, err := rdb.SetEX(r.Context(), req.Key, req.Value, time.Second).Result() + if err == redis.Nil { + http.Error(w, "Key does not exist", http.StatusNotFound) + return + } else if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "application/json") + err = json.NewEncoder(w).Encode(map[string]string{ + "key": req.Key, + "value": result, + }) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } +} + +func saddHandler(w http.ResponseWriter, r *http.Request) { + var req struct { + Key string `json:"key"` + Values []string `json:"values"` + } + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + err := rdb.SAdd(r.Context(), req.Key, req.Values).Err() + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "application/json") + err = json.NewEncoder(w).Encode(map[string]interface{}{ + "key": req.Key, + "values": req.Values, + "status": "added successfully", + }) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } +} + +func pipelineHandler(w http.ResponseWriter, r *http.Request) { + var req struct { + Commands []struct { + Command string `json:"command"` + Key string `json:"key"` + Value string `json:"value"` + } `json:"commands"` + } + + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + pipe := rdb.Pipeline() + + var results []redis.Cmder + for _, cmd := range req.Commands { + switch cmd.Command { + case "set": + results = append(results, pipe.Set(r.Context(), cmd.Key, cmd.Value, 2)) + case 
"get": + results = append(results, pipe.Get(r.Context(), cmd.Key)) + case "sadd": + results = append(results, pipe.SAdd(r.Context(), cmd.Key, cmd.Value)) + default: + http.Error(w, "Unsupported command", http.StatusBadRequest) + return + } + } + + _, err := pipe.Exec(r.Context()) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + var response []map[string]interface{} + for _, result := range results { + if result.Err() != nil { + response = append(response, map[string]interface{}{ + "command": result.Name(), + "error": result.Err().Error(), + }) + } else { + switch cmd := result.(type) { + case *redis.StatusCmd: + response = append(response, map[string]interface{}{ + "command": result.Name(), + "result": cmd.Val(), + }) + case *redis.StringCmd: + response = append(response, map[string]interface{}{ + "command": result.Name(), + "result": cmd.Val(), + }) + case *redis.IntCmd: + response = append(response, map[string]interface{}{ + "command": result.Name(), + "result": cmd.Val(), + }) + default: + response = append(response, map[string]interface{}{ + "command": result.Name(), + "result": "unknown result type", + }) + } + } + } + + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(response); err != nil { + http.Error(w, "Failed to encode response", http.StatusInternalServerError) + log.Printf("Failed to encode response: %v", err) + return + } +} diff --git a/internal/pkg/instrumentation/bpf/github.com/redis/go-redis/bpf/probe.bpf.c b/internal/pkg/instrumentation/bpf/github.com/redis/go-redis/bpf/probe.bpf.c new file mode 100644 index 000000000..a7a87e400 --- /dev/null +++ b/internal/pkg/instrumentation/bpf/github.com/redis/go-redis/bpf/probe.bpf.c @@ -0,0 +1,175 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#include "arguments.h" +#include "trace/span_context.h" +#include "go_context.h" +#include "go_types.h" +#include "uprobe.h" +#include 
"trace/start_span.h" + +char __license[] SEC("license") = "Dual MIT/GPL"; + +#define MAX_QUERY_SIZE 256 +#define MAX_CONCURRENT 50 +#define MAX_RESP_BUF_SIZE 256 +#define MAX_SUBCMD_CNT 10 + +struct sql_request_t { + BASE_SPAN_PROPERTIES + u8 resp_msg[MAX_QUERY_SIZE]; + int segs; // segs only be set in redis pipeline mode +}; + +struct { + __uint(type, BPF_MAP_TYPE_HASH); + __type(key, void*); + __type(value, struct sql_request_t); + __uint(max_entries, MAX_CONCURRENT); +} sql_events SEC(".maps"); + +struct +{ + __uint(type, BPF_MAP_TYPE_LRU_HASH); + __type(key, void*); + __type(value, void*); + __uint(max_entries, MAX_CONCURRENT); +} writer_conn SEC(".maps"); + +// Storage the segments of db stmt. Example: `set name alice px 2`, the segments is 5. +struct +{ + __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY); + __uint(key_size, sizeof(u32)); + __uint(value_size, sizeof(u64)); + __uint(max_entries, 1); +} stmt_segments SEC(".maps"); + +// Injected in init +volatile const bool should_include_db_statement; + +// func (c *baseClient) generalProcessPipeline(ctx context.Context, cmds []Cmder, p pipelineProcessor) error +SEC("uprobe/processPipeline") +int uprobe_processPipeline(struct pt_regs *ctx) { + if (!should_include_db_statement) { + return 0; + } + u64 cmds_len_pos = 5; + u64 cmds_len = (u64)get_argument(ctx, cmds_len_pos); + + u64 cmds_ptr_pos = 4; + u64 cmds_ptr = (u64)get_argument(ctx, cmds_ptr_pos); + + u64 ele_ptr = 0; + u64 segs = 0; + for (u64 i = 0; i < MAX_SUBCMD_CNT; i++) { + if (i >= cmds_len) { + break; + } + // 8 = iface.tab + bpf_probe_read(&ele_ptr, sizeof(ele_ptr), (void *)(cmds_ptr+8)); + u64 subcmd_segs = 0; + /* + type StatusCmd struct { + ctx context.Context + args []interface{} <----- target field + err error + keyPos int8 + _readTimeout *time.Duration + val string + } + */ + // 24 = 16(StatusCmd.ctx) + 8(StatusCmd.args.array) + bpf_probe_read(&subcmd_segs, sizeof(subcmd_segs), (void *)(ele_ptr+24)); + cmds_ptr += 16; + segs += subcmd_segs; + } + u32 
map_id = 0; + bpf_map_update_elem(&stmt_segments, &map_id, &segs, BPF_ANY); + return 0; +} + +// func (cn *Conn) WithWriter(ctx context.Context, timeout time.Duration, fn func(wr *proto.Writer) error) +SEC("uprobe/WithWriter") +int uprobe_WithWriter(struct pt_regs *ctx) { + void *conn_ptr = get_argument(ctx, 1); + + struct go_iface go_context = {0}; + get_Go_context(ctx, 2, 0, true, &go_context); + + struct sql_request_t sql_request = {0}; + sql_request.start_time = bpf_ktime_get_ns(); + get_Go_context(ctx, 2, 0, true, &go_context); + start_span_params_t start_span_params = { + .ctx = ctx, + .go_context = &go_context, + .psc = &sql_request.psc, + .sc = &sql_request.sc, + .get_parent_span_context_fn = NULL, + .get_parent_span_context_arg = NULL, + }; + start_span(&start_span_params); + + void *key = get_consistent_key(ctx, go_context.data); + bpf_map_update_elem(&writer_conn, &key, &conn_ptr, 0); + bpf_map_update_elem(&sql_events, &key, &sql_request, 0); + return 0; +} + + +// func (cn *Conn) WithWriter(ctx context.Context, timeout time.Duration, fn func(wr *proto.Writer) error) +SEC("uprobe/WithWriter") +int uprobe_WithWriter_Returns(struct pt_regs *ctx) { + u64 end_time = bpf_ktime_get_ns(); + + int segs = 0; + u32 map_id = 0; + u64 *value; + value = bpf_map_lookup_elem(&stmt_segments, &map_id); + if (value == NULL) { + bpf_printk("map stmt_segments lookup failed"); + } else { + segs = (int)*value; + } + + struct go_iface go_context = {0}; + get_Go_context(ctx, 2, 0, true, &go_context); + void *key = get_consistent_key(ctx, go_context.data); + void **conn_ptr = bpf_map_lookup_elem(&writer_conn, &key); + if (!conn_ptr) { + bpf_printk("map writer_conn lookup failed"); + return 0; + } + + u64 bw_offset = 32; + u64 bw_ptr; + bpf_probe_read(&bw_ptr, sizeof(bw_ptr), (void *)(*conn_ptr+bw_offset)); + + u64 buf_ele_ptr; + bpf_probe_read(&buf_ele_ptr, sizeof(buf_ele_ptr), (void *)(bw_ptr + 16)); + + struct sql_request_t *sql_request = bpf_map_lookup_elem(&sql_events, 
&key); + if (!sql_request) { + bpf_printk("[uprobe_WithWriter_Returns] map sql_request_t looup failed"); + return 0; + } + + // Only obtain resp buf when necessary + if (should_include_db_statement) { + u8 buf_ele; + for (u64 i = 0; i < MAX_RESP_BUF_SIZE; i++) { + bpf_probe_read(&buf_ele, sizeof(buf_ele), (void *)buf_ele_ptr); + sql_request->resp_msg[i] = buf_ele; + buf_ele_ptr += 1; + } + } + sql_request->segs = segs; + sql_request->end_time = end_time; + output_span_event(ctx, sql_request, sizeof(*sql_request), &sql_request->sc); + stop_tracking_span(&sql_request->sc, &sql_request->psc); + bpf_map_delete_elem(&writer_conn, &key); + + u64 zero = 0; + bpf_map_update_elem(&stmt_segments, &map_id, &zero, BPF_ANY); + return 0; +} diff --git a/internal/pkg/instrumentation/bpf/github.com/redis/go-redis/bpf_arm64_bpfel.go b/internal/pkg/instrumentation/bpf/github.com/redis/go-redis/bpf_arm64_bpfel.go new file mode 100644 index 000000000..b47f59915 --- /dev/null +++ b/internal/pkg/instrumentation/bpf/github.com/redis/go-redis/bpf_arm64_bpfel.go @@ -0,0 +1,197 @@ +// Code generated by bpf2go; DO NOT EDIT. +//go:build arm64 + +package redis + +import ( + "bytes" + _ "embed" + "fmt" + "io" + + "github.com/cilium/ebpf" +) + +type bpfSliceArrayBuff struct{ Buff [1024]uint8 } + +type bpfSpanContext struct { + TraceID [16]uint8 + SpanID [8]uint8 + TraceFlags uint8 + Padding [7]uint8 +} + +type bpfSqlRequestT struct { + StartTime uint64 + EndTime uint64 + Sc bpfSpanContext + Psc bpfSpanContext + RespMsg [256]uint8 + Segs int32 + _ [4]byte +} + +// loadBpf returns the embedded CollectionSpec for bpf. +func loadBpf() (*ebpf.CollectionSpec, error) { + reader := bytes.NewReader(_BpfBytes) + spec, err := ebpf.LoadCollectionSpecFromReader(reader) + if err != nil { + return nil, fmt.Errorf("can't load bpf: %w", err) + } + + return spec, err +} + +// loadBpfObjects loads bpf and converts it into a struct. 
+// +// The following types are suitable as obj argument: +// +// *bpfObjects +// *bpfPrograms +// *bpfMaps +// +// See ebpf.CollectionSpec.LoadAndAssign documentation for details. +func loadBpfObjects(obj interface{}, opts *ebpf.CollectionOptions) error { + spec, err := loadBpf() + if err != nil { + return err + } + + return spec.LoadAndAssign(obj, opts) +} + +// bpfSpecs contains maps and programs before they are loaded into the kernel. +// +// It can be passed ebpf.CollectionSpec.Assign. +type bpfSpecs struct { + bpfProgramSpecs + bpfMapSpecs + bpfVariableSpecs +} + +// bpfProgramSpecs contains programs before they are loaded into the kernel. +// +// It can be passed ebpf.CollectionSpec.Assign. +type bpfProgramSpecs struct { + UprobeWithWriter *ebpf.ProgramSpec `ebpf:"uprobe_WithWriter"` + UprobeWithWriterReturns *ebpf.ProgramSpec `ebpf:"uprobe_WithWriter_Returns"` + UprobeProcessPipeline *ebpf.ProgramSpec `ebpf:"uprobe_processPipeline"` +} + +// bpfMapSpecs contains maps before they are loaded into the kernel. +// +// It can be passed ebpf.CollectionSpec.Assign. +type bpfMapSpecs struct { + AllocMap *ebpf.MapSpec `ebpf:"alloc_map"` + Events *ebpf.MapSpec `ebpf:"events"` + GoContextToSc *ebpf.MapSpec `ebpf:"go_context_to_sc"` + ProbeActiveSamplerMap *ebpf.MapSpec `ebpf:"probe_active_sampler_map"` + SamplersConfigMap *ebpf.MapSpec `ebpf:"samplers_config_map"` + SliceArrayBuffMap *ebpf.MapSpec `ebpf:"slice_array_buff_map"` + SqlEvents *ebpf.MapSpec `ebpf:"sql_events"` + StmtSegments *ebpf.MapSpec `ebpf:"stmt_segments"` + TrackedSpansBySc *ebpf.MapSpec `ebpf:"tracked_spans_by_sc"` + WriterConn *ebpf.MapSpec `ebpf:"writer_conn"` +} + +// bpfVariableSpecs contains global variables before they are loaded into the kernel. +// +// It can be passed ebpf.CollectionSpec.Assign. 
+type bpfVariableSpecs struct { + EndAddr *ebpf.VariableSpec `ebpf:"end_addr"` + Hex *ebpf.VariableSpec `ebpf:"hex"` + IsRegistersAbi *ebpf.VariableSpec `ebpf:"is_registers_abi"` + ShouldIncludeDbStatement *ebpf.VariableSpec `ebpf:"should_include_db_statement"` + StartAddr *ebpf.VariableSpec `ebpf:"start_addr"` + TotalCpus *ebpf.VariableSpec `ebpf:"total_cpus"` +} + +// bpfObjects contains all objects after they have been loaded into the kernel. +// +// It can be passed to loadBpfObjects or ebpf.CollectionSpec.LoadAndAssign. +type bpfObjects struct { + bpfPrograms + bpfMaps + bpfVariables +} + +func (o *bpfObjects) Close() error { + return _BpfClose( + &o.bpfPrograms, + &o.bpfMaps, + ) +} + +// bpfMaps contains all maps after they have been loaded into the kernel. +// +// It can be passed to loadBpfObjects or ebpf.CollectionSpec.LoadAndAssign. +type bpfMaps struct { + AllocMap *ebpf.Map `ebpf:"alloc_map"` + Events *ebpf.Map `ebpf:"events"` + GoContextToSc *ebpf.Map `ebpf:"go_context_to_sc"` + ProbeActiveSamplerMap *ebpf.Map `ebpf:"probe_active_sampler_map"` + SamplersConfigMap *ebpf.Map `ebpf:"samplers_config_map"` + SliceArrayBuffMap *ebpf.Map `ebpf:"slice_array_buff_map"` + SqlEvents *ebpf.Map `ebpf:"sql_events"` + StmtSegments *ebpf.Map `ebpf:"stmt_segments"` + TrackedSpansBySc *ebpf.Map `ebpf:"tracked_spans_by_sc"` + WriterConn *ebpf.Map `ebpf:"writer_conn"` +} + +func (m *bpfMaps) Close() error { + return _BpfClose( + m.AllocMap, + m.Events, + m.GoContextToSc, + m.ProbeActiveSamplerMap, + m.SamplersConfigMap, + m.SliceArrayBuffMap, + m.SqlEvents, + m.StmtSegments, + m.TrackedSpansBySc, + m.WriterConn, + ) +} + +// bpfVariables contains all global variables after they have been loaded into the kernel. +// +// It can be passed to loadBpfObjects or ebpf.CollectionSpec.LoadAndAssign. 
+type bpfVariables struct { + EndAddr *ebpf.Variable `ebpf:"end_addr"` + Hex *ebpf.Variable `ebpf:"hex"` + IsRegistersAbi *ebpf.Variable `ebpf:"is_registers_abi"` + ShouldIncludeDbStatement *ebpf.Variable `ebpf:"should_include_db_statement"` + StartAddr *ebpf.Variable `ebpf:"start_addr"` + TotalCpus *ebpf.Variable `ebpf:"total_cpus"` +} + +// bpfPrograms contains all programs after they have been loaded into the kernel. +// +// It can be passed to loadBpfObjects or ebpf.CollectionSpec.LoadAndAssign. +type bpfPrograms struct { + UprobeWithWriter *ebpf.Program `ebpf:"uprobe_WithWriter"` + UprobeWithWriterReturns *ebpf.Program `ebpf:"uprobe_WithWriter_Returns"` + UprobeProcessPipeline *ebpf.Program `ebpf:"uprobe_processPipeline"` +} + +func (p *bpfPrograms) Close() error { + return _BpfClose( + p.UprobeWithWriter, + p.UprobeWithWriterReturns, + p.UprobeProcessPipeline, + ) +} + +func _BpfClose(closers ...io.Closer) error { + for _, closer := range closers { + if err := closer.Close(); err != nil { + return err + } + } + return nil +} + +// Do not access this directly. +// +//go:embed bpf_arm64_bpfel.o +var _BpfBytes []byte diff --git a/internal/pkg/instrumentation/bpf/github.com/redis/go-redis/bpf_x86_bpfel.go b/internal/pkg/instrumentation/bpf/github.com/redis/go-redis/bpf_x86_bpfel.go new file mode 100644 index 000000000..d49caac18 --- /dev/null +++ b/internal/pkg/instrumentation/bpf/github.com/redis/go-redis/bpf_x86_bpfel.go @@ -0,0 +1,197 @@ +// Code generated by bpf2go; DO NOT EDIT. 
+//go:build 386 || amd64 + +package redis + +import ( + "bytes" + _ "embed" + "fmt" + "io" + + "github.com/cilium/ebpf" +) + +type bpfSliceArrayBuff struct{ Buff [1024]uint8 } + +type bpfSpanContext struct { + TraceID [16]uint8 + SpanID [8]uint8 + TraceFlags uint8 + Padding [7]uint8 +} + +type bpfSqlRequestT struct { + StartTime uint64 + EndTime uint64 + Sc bpfSpanContext + Psc bpfSpanContext + RespMsg [256]uint8 + Segs int32 + _ [4]byte +} + +// loadBpf returns the embedded CollectionSpec for bpf. +func loadBpf() (*ebpf.CollectionSpec, error) { + reader := bytes.NewReader(_BpfBytes) + spec, err := ebpf.LoadCollectionSpecFromReader(reader) + if err != nil { + return nil, fmt.Errorf("can't load bpf: %w", err) + } + + return spec, err +} + +// loadBpfObjects loads bpf and converts it into a struct. +// +// The following types are suitable as obj argument: +// +// *bpfObjects +// *bpfPrograms +// *bpfMaps +// +// See ebpf.CollectionSpec.LoadAndAssign documentation for details. +func loadBpfObjects(obj interface{}, opts *ebpf.CollectionOptions) error { + spec, err := loadBpf() + if err != nil { + return err + } + + return spec.LoadAndAssign(obj, opts) +} + +// bpfSpecs contains maps and programs before they are loaded into the kernel. +// +// It can be passed ebpf.CollectionSpec.Assign. +type bpfSpecs struct { + bpfProgramSpecs + bpfMapSpecs + bpfVariableSpecs +} + +// bpfProgramSpecs contains programs before they are loaded into the kernel. +// +// It can be passed ebpf.CollectionSpec.Assign. +type bpfProgramSpecs struct { + UprobeWithWriter *ebpf.ProgramSpec `ebpf:"uprobe_WithWriter"` + UprobeWithWriterReturns *ebpf.ProgramSpec `ebpf:"uprobe_WithWriter_Returns"` + UprobeProcessPipeline *ebpf.ProgramSpec `ebpf:"uprobe_processPipeline"` +} + +// bpfMapSpecs contains maps before they are loaded into the kernel. +// +// It can be passed ebpf.CollectionSpec.Assign. 
+type bpfMapSpecs struct { + AllocMap *ebpf.MapSpec `ebpf:"alloc_map"` + Events *ebpf.MapSpec `ebpf:"events"` + GoContextToSc *ebpf.MapSpec `ebpf:"go_context_to_sc"` + ProbeActiveSamplerMap *ebpf.MapSpec `ebpf:"probe_active_sampler_map"` + SamplersConfigMap *ebpf.MapSpec `ebpf:"samplers_config_map"` + SliceArrayBuffMap *ebpf.MapSpec `ebpf:"slice_array_buff_map"` + SqlEvents *ebpf.MapSpec `ebpf:"sql_events"` + StmtSegments *ebpf.MapSpec `ebpf:"stmt_segments"` + TrackedSpansBySc *ebpf.MapSpec `ebpf:"tracked_spans_by_sc"` + WriterConn *ebpf.MapSpec `ebpf:"writer_conn"` +} + +// bpfVariableSpecs contains global variables before they are loaded into the kernel. +// +// It can be passed ebpf.CollectionSpec.Assign. +type bpfVariableSpecs struct { + EndAddr *ebpf.VariableSpec `ebpf:"end_addr"` + Hex *ebpf.VariableSpec `ebpf:"hex"` + IsRegistersAbi *ebpf.VariableSpec `ebpf:"is_registers_abi"` + ShouldIncludeDbStatement *ebpf.VariableSpec `ebpf:"should_include_db_statement"` + StartAddr *ebpf.VariableSpec `ebpf:"start_addr"` + TotalCpus *ebpf.VariableSpec `ebpf:"total_cpus"` +} + +// bpfObjects contains all objects after they have been loaded into the kernel. +// +// It can be passed to loadBpfObjects or ebpf.CollectionSpec.LoadAndAssign. +type bpfObjects struct { + bpfPrograms + bpfMaps + bpfVariables +} + +func (o *bpfObjects) Close() error { + return _BpfClose( + &o.bpfPrograms, + &o.bpfMaps, + ) +} + +// bpfMaps contains all maps after they have been loaded into the kernel. +// +// It can be passed to loadBpfObjects or ebpf.CollectionSpec.LoadAndAssign. 
+type bpfMaps struct { + AllocMap *ebpf.Map `ebpf:"alloc_map"` + Events *ebpf.Map `ebpf:"events"` + GoContextToSc *ebpf.Map `ebpf:"go_context_to_sc"` + ProbeActiveSamplerMap *ebpf.Map `ebpf:"probe_active_sampler_map"` + SamplersConfigMap *ebpf.Map `ebpf:"samplers_config_map"` + SliceArrayBuffMap *ebpf.Map `ebpf:"slice_array_buff_map"` + SqlEvents *ebpf.Map `ebpf:"sql_events"` + StmtSegments *ebpf.Map `ebpf:"stmt_segments"` + TrackedSpansBySc *ebpf.Map `ebpf:"tracked_spans_by_sc"` + WriterConn *ebpf.Map `ebpf:"writer_conn"` +} + +func (m *bpfMaps) Close() error { + return _BpfClose( + m.AllocMap, + m.Events, + m.GoContextToSc, + m.ProbeActiveSamplerMap, + m.SamplersConfigMap, + m.SliceArrayBuffMap, + m.SqlEvents, + m.StmtSegments, + m.TrackedSpansBySc, + m.WriterConn, + ) +} + +// bpfVariables contains all global variables after they have been loaded into the kernel. +// +// It can be passed to loadBpfObjects or ebpf.CollectionSpec.LoadAndAssign. +type bpfVariables struct { + EndAddr *ebpf.Variable `ebpf:"end_addr"` + Hex *ebpf.Variable `ebpf:"hex"` + IsRegistersAbi *ebpf.Variable `ebpf:"is_registers_abi"` + ShouldIncludeDbStatement *ebpf.Variable `ebpf:"should_include_db_statement"` + StartAddr *ebpf.Variable `ebpf:"start_addr"` + TotalCpus *ebpf.Variable `ebpf:"total_cpus"` +} + +// bpfPrograms contains all programs after they have been loaded into the kernel. +// +// It can be passed to loadBpfObjects or ebpf.CollectionSpec.LoadAndAssign. 
+type bpfPrograms struct { + UprobeWithWriter *ebpf.Program `ebpf:"uprobe_WithWriter"` + UprobeWithWriterReturns *ebpf.Program `ebpf:"uprobe_WithWriter_Returns"` + UprobeProcessPipeline *ebpf.Program `ebpf:"uprobe_processPipeline"` +} + +func (p *bpfPrograms) Close() error { + return _BpfClose( + p.UprobeWithWriter, + p.UprobeWithWriterReturns, + p.UprobeProcessPipeline, + ) +} + +func _BpfClose(closers ...io.Closer) error { + for _, closer := range closers { + if err := closer.Close(); err != nil { + return err + } + } + return nil +} + +// Do not access this directly. +// +//go:embed bpf_x86_bpfel.o +var _BpfBytes []byte diff --git a/internal/pkg/instrumentation/bpf/github.com/redis/go-redis/probe.go b/internal/pkg/instrumentation/bpf/github.com/redis/go-redis/probe.go new file mode 100644 index 000000000..c661b2a8e --- /dev/null +++ b/internal/pkg/instrumentation/bpf/github.com/redis/go-redis/probe.go @@ -0,0 +1,144 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package redis + +import ( + "log/slog" + "os" + "strconv" + "strings" + + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" + semconv "go.opentelemetry.io/otel/semconv/v1.26.0" + "go.opentelemetry.io/otel/trace" + + "go.opentelemetry.io/auto/internal/pkg/instrumentation/context" + "go.opentelemetry.io/auto/internal/pkg/instrumentation/probe" + "go.opentelemetry.io/auto/internal/pkg/instrumentation/utils" +) + +//go:generate go run github.com/cilium/ebpf/cmd/bpf2go -target amd64,arm64 bpf ./bpf/probe.bpf.c + +const ( + // pkg is the package being instrumented. + pkg = "github.com/go-redis/redis/v8" + + // IncludeDBStatementEnvVar is the environment variable to opt-in for sql query inclusion in the trace. + IncludeDBStatementEnvVar = "OTEL_GO_AUTO_INCLUDE_DB_STATEMENT" + + // TODO: + // ParseDBStatementEnvVar is the environment variable to opt-in for sql query operation in the trace. 
	// ParseDBStatementEnvVar = "OTEL_GO_AUTO_PARSE_DB_STATEMENT".
)

// New returns a new [probe.Probe] that instruments go-redis clients by
// attaching uprobes to the connection writer and the pipeline entry points.
//
// Parameters:
//   - logger: structured logger used by the probe machinery.
//   - version: instrumentation version recorded on the produced telemetry.
func New(logger *slog.Logger, version string) probe.Probe {
	id := probe.ID{
		SpanKind:        trace.SpanKindClient,
		InstrumentedPkg: pkg,
	}
	return &probe.SpanProducer[bpfObjects, event]{
		Base: probe.Base[bpfObjects, event]{
			ID:     id,
			Logger: logger,
			Consts: []probe.Const{
				probe.RegistersABIConst{},
				probe.AllocationConst{},
				// Mirror of the OTEL_GO_AUTO_INCLUDE_DB_STATEMENT opt-in,
				// pushed into the eBPF program as a constant at load time.
				probe.KeyValConst{
					Key: "should_include_db_statement",
					Val: shouldIncludeDBStatement(),
				},
			},
			Uprobes: []probe.Uprobe{
				{
					Sym:         "github.com/go-redis/redis/v8/internal/pool.(*Conn).WithWriter",
					EntryProbe:  "uprobe_WithWriter",
					ReturnProbe: "uprobe_WithWriter_Returns",
					FailureMode: probe.FailureModeIgnore,
				},
				// `go-redis/v9` requires comprehensive testing.
				// NOTE(review): the v9 symbols below are registered even though
				// `pkg` (and COMPATIBILITY.md) only advertises v8 support —
				// confirm this is intentional.
				{
					Sym:         "github.com/redis/go-redis/v9/internal/pool.(*Conn).WithWriter",
					EntryProbe:  "uprobe_WithWriter",
					ReturnProbe: "uprobe_WithWriter_Returns",
					FailureMode: probe.FailureModeIgnore,
				},

				// Pipelining mode
				// NOTE(review): "uprobe_processPipeline_Returns" has no matching
				// program in the generated bpfPrograms bindings — verify the
				// bindings are up to date with the bpf source.
				{
					Sym:         "github.com/go-redis/redis/v8.(*baseClient).generalProcessPipeline",
					EntryProbe:  "uprobe_processPipeline",
					ReturnProbe: "uprobe_processPipeline_Returns",
					FailureMode: probe.FailureModeIgnore,
				},
				{
					Sym:         "github.com/redis/go-redis/v9.(*baseClient).generalProcessPipeline",
					EntryProbe:  "uprobe_processPipeline",
					ReturnProbe: "uprobe_processPipeline_Returns",
					FailureMode: probe.FailureModeIgnore,
				},
			},

			SpecFn: loadBpf,
		},
		Version:   version,
		SchemaURL: semconv.SchemaURL,
		ProcessFn: processFn,
	}
}

// event represents a single Redis client request-response captured by the
// eBPF probes.
+type event struct { + context.BaseSpanProperties + RespMsg [256]byte + Segs int32 // Only be set under pipelining mode +} + +func processFn(e *event) ptrace.SpanSlice { + spans := ptrace.NewSpanSlice() + span := spans.AppendEmpty() + span.SetName("DB") + span.SetKind(ptrace.SpanKindClient) + span.SetStartTimestamp(utils.BootOffsetToTimestamp(e.StartTime)) + span.SetEndTimestamp(utils.BootOffsetToTimestamp(e.EndTime)) + span.SetTraceID(pcommon.TraceID(e.SpanContext.TraceID)) + span.SetSpanID(pcommon.SpanID(e.SpanContext.SpanID)) + span.SetFlags(uint32(trace.FlagsSampled)) + + if e.ParentSpanContext.SpanID.IsValid() { + span.SetParentSpanID(pcommon.SpanID(e.ParentSpanContext.SpanID)) + } + shouldIncludeStmt := shouldIncludeDBStatement() + stmts, err := utils.ParseRESP(e.Segs, e.RespMsg[:]) + var query string + if err != nil && shouldIncludeStmt { + query = "[DB STATEMENT NOT SUPPORTED]" + } else if shouldIncludeStmt { + query = strings.Join(stmts, ", ") + } + if query != "" { + span.Attributes().PutStr(string(semconv.DBQueryTextKey), query) + } + + // TODO: + // Add attr semconv.DBOperationNameKey(db.operation.name) & semconv.DBCollectionNameKey(db.collection.name) + // which means to complete the logic like `ParseDBStatementEnvVar` in database/sql + + return spans +} + +// shouldIncludeDBStatement returns if the user has configured SQL queries to be included. 
func shouldIncludeDBStatement() bool {
	// Opt-in only: any value strconv.ParseBool accepts as true ("1", "t",
	// "TRUE", ...) enables capture; anything else — including an unset or
	// unparsable value — disables it.
	val := os.Getenv(IncludeDBStatementEnvVar)
	if val != "" {
		boolVal, err := strconv.ParseBool(val)
		if err == nil {
			return boolVal
		}
	}

	return false
}
diff --git a/internal/pkg/instrumentation/bpf/github.com/redis/go-redis/probe_test.go b/internal/pkg/instrumentation/bpf/github.com/redis/go-redis/probe_test.go
new file mode 100644
index 000000000..ebb56af33
--- /dev/null
+++ b/internal/pkg/instrumentation/bpf/github.com/redis/go-redis/probe_test.go
@@ -0,0 +1,55 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package redis

import (
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"go.opentelemetry.io/collector/pdata/pcommon"
	"go.opentelemetry.io/collector/pdata/ptrace"
	semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
	"go.opentelemetry.io/otel/trace"

	"go.opentelemetry.io/auto/internal/pkg/instrumentation/context"
	"go.opentelemetry.io/auto/internal/pkg/instrumentation/utils"
)

// TestProbeConvertEvent verifies that processFn turns a raw eBPF event into
// the expected span, including the db statement attribute when capture is
// opted in via the environment variable.
func TestProbeConvertEvent(t *testing.T) {
	t.Setenv(IncludeDBStatementEnvVar, "true")
	start := time.Unix(0, time.Now().UnixNano()) // No wall clock.
	end := start.Add(1 * time.Second)

	startOffset := utils.TimeToBootOffset(start)
	endOffset := utils.TimeToBootOffset(end)

	traceID := trace.TraceID{1}
	spanID := trace.SpanID{1}

	got := processFn(&event{
		BaseSpanProperties: context.BaseSpanProperties{
			StartTime:   startOffset,
			EndTime:     endOffset,
			SpanContext: context.EBPFSpanContext{TraceID: traceID, SpanID: spanID},
		},
		// RESP encoding of "set key value". Segs is left zero, so ParseRESP
		// extracts only this first command.
		// "set key value"
		RespMsg: [256]byte{0x2A, 0x33, 0x0D, 0x0A, 0x24, 0x33, 0x0D, 0x0A, 0x73, 0x65, 0x74, 0x0D, 0x0A, 0x24, 0x33, 0x0D, 0x0A, 0x6B, 0x65, 0x79, 0x0D, 0x0A, 0x24, 0x35, 0x0D, 0x0A, 0x76, 0x61, 0x6C, 0x75, 0x65, 0x0D, 0x0A},
	})

	// ParentSpanID is expected to stay unset: the event above carries a
	// zero (invalid) ParentSpanContext.
	want := func() ptrace.SpanSlice {
		spans := ptrace.NewSpanSlice()
		span := spans.AppendEmpty()
		span.SetName("DB")
		span.SetKind(ptrace.SpanKindClient)
		span.SetStartTimestamp(utils.BootOffsetToTimestamp(startOffset))
		span.SetEndTimestamp(utils.BootOffsetToTimestamp(endOffset))
		span.SetTraceID(pcommon.TraceID(traceID))
		span.SetSpanID(pcommon.SpanID(spanID))
		span.SetFlags(uint32(trace.FlagsSampled))
		utils.Attributes(span.Attributes(), semconv.DBQueryText("set key value"))
		return spans
	}()
	assert.Equal(t, want, got)
}
diff --git a/internal/pkg/instrumentation/manager.go b/internal/pkg/instrumentation/manager.go
index 8fcf9e3f2..8f69dd28e 100644
--- a/internal/pkg/instrumentation/manager.go
+++ b/internal/pkg/instrumentation/manager.go
@@ -16,6 +16,7 @@ import (
 	"go.opentelemetry.io/otel/trace"
 
 	dbSql "go.opentelemetry.io/auto/internal/pkg/instrumentation/bpf/database/sql"
+	goRedis "go.opentelemetry.io/auto/internal/pkg/instrumentation/bpf/github.com/redis/go-redis"
 	kafkaConsumer "go.opentelemetry.io/auto/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/consumer"
 	kafkaProducer "go.opentelemetry.io/auto/internal/pkg/instrumentation/bpf/github.com/segmentio/kafka-go/producer"
 	autosdk "go.opentelemetry.io/auto/internal/pkg/instrumentation/bpf/go.opentelemetry.io/auto/sdk"
@@ -417,6 +418,7 @@ func (m
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

// RESP (REdis Serialization Protocol) parsing helpers used by the go-redis
// probe to reconstruct db statements from captured wire payloads.

// parseNextResp parses one RESP array command ("*N\r\n$len\r\n<data>\r\n...")
// from the beginning of input.
//
// Returns: the raw command text, the number of segments (array elements),
// the unconsumed remainder of input, and an error when input does not start
// with a well-formed command.
func parseNextResp(input string) (cmd string, segCount int, leftover string, err error) {
	if !strings.HasPrefix(input, "*") {
		return "", 0, input, errors.New("not a valid RESP array: missing '*'")
	}

	firstLineEnd := strings.Index(input, "\r\n")
	if firstLineEnd == -1 {
		return "", 0, input, errors.New("invalid RESP: missing CRLF after '*' line")
	}

	arrayCount, convErr := strconv.Atoi(input[1:firstLineEnd])
	if convErr != nil {
		return "", 0, input, fmt.Errorf("invalid array count: %w", convErr)
	}
	segCount = arrayCount

	pos := firstLineEnd + 2
	for i := 0; i < arrayCount; i++ {
		if pos >= len(input) || input[pos] != '$' {
			return "", 0, input, fmt.Errorf("invalid bulk string (missing '$') at index %d", i)
		}

		lenLineEnd := strings.Index(input[pos:], "\r\n")
		if lenLineEnd == -1 {
			return "", 0, input, errors.New("invalid RESP: missing CRLF in bulk length line")
		}
		lenLineEnd += pos

		bulkLen, convErr := strconv.Atoi(input[pos+1 : lenLineEnd])
		if convErr != nil {
			return "", 0, input, fmt.Errorf("invalid bulk length: %w", convErr)
		}
		// Null bulk strings ("$-1") never appear in client commands; a
		// negative length would otherwise move the cursor backwards.
		if bulkLen < 0 {
			return "", 0, input, fmt.Errorf("negative bulk length %d at index %d", bulkLen, i)
		}

		// Skip "$<len>\r\n".
		pos = lenLineEnd + 2

		if pos+bulkLen+2 > len(input) {
			return "", 0, input, errors.New("not enough data for bulk string content")
		}

		pos += bulkLen
		if input[pos:pos+2] != "\r\n" {
			return "", 0, input, errors.New("missing CRLF after bulk string data")
		}
		pos += 2
	}

	return input[:pos], segCount, input[pos:], nil
}

// parseSingleResp parses and returns only the first RESP command in input.
func parseSingleResp(input string) (string, error) {
	cmd, _, _, err := parseNextResp(input)
	if err != nil {
		return "", err
	}
	return cmd, nil
}

// parsePipelineWithTotalSegs parses pipelined RESP commands from input until
// exactly totalSegs segments have been consumed. totalSegs == 0 means "parse
// only the first command". Truncated trailing commands (buffers captured by
// eBPF may be cut off) are tolerated: parsing stops at a remainder that
// starts with '*' and the segments gathered so far are returned.
func parsePipelineWithTotalSegs(totalSegs int, input string) (string, error) {
	if totalSegs < 0 {
		return "", errors.New("totalSegs must not be negative")
	}
	if totalSegs == 0 {
		// Parse ONLY the first valid command.
		return parseSingleResp(input)
	}

	var sb strings.Builder
	parsed := 0
	rest := input

	for parsed < totalSegs {
		cmd, segCount, leftover, err := parseNextResp(rest)
		if err != nil {
			// The RESP packets obtained from eBPF may not be complete.
			// If the remainder looks like the start of another command
			// ("*"), assume truncation and keep what we have so far.
			if len(leftover) > 0 && leftover[0] == '*' {
				break
			}
			return "", fmt.Errorf("cannot parse the next command: %w", err)
		}

		sb.WriteString(cmd) // strings.Builder.WriteString never fails.
		parsed += segCount
		rest = leftover

		if parsed > totalSegs {
			return "", fmt.Errorf("parsed segments (%d) exceed totalSegs (%d)", parsed, totalSegs)
		}
		// No more data but the required segment count was not reached.
		if strings.TrimSpace(rest) == "" && parsed < totalSegs {
			return "", fmt.Errorf("no more data but totalSegs not reached: need %d, got %d", totalSegs, parsed)
		}
	}

	return sb.String(), nil
}

// ParseRESP parses the given RESP data and supports the following types:
// 1. *N -- array
// 2. $N -- bulk string
// 3. +xxx\r\n -- simple string
// 4. -xxx\r\n -- error message
// 5. :xxx\r\n -- integer
//
// Returns something like []string{"set key1 value1", "GET key1", "OK", "1000", "Error message"}, etc.
func ParseRESP(segs int32, respMsg []byte) ([]string, error) {
	// The eBPF buffer is fixed-size and NUL-padded; trim at the first NUL
	// (stdlib equivalent of unix.ByteSliceToString).
	raw := respMsg
	if i := bytes.IndexByte(raw, 0); i >= 0 {
		raw = raw[:i]
	}

	respData, err := parsePipelineWithTotalSegs(int(segs), string(raw))
	if err != nil {
		log.Printf("RESP parse err: %s\n", err.Error())
		return nil, err
	}

	r := bytes.NewReader([]byte(respData))
	var commands []string
	for {
		cmd, err := parseOne(r)
		if err != nil {
			if errors.Is(err, io.EOF) && cmd == "" {
				break
			}
			return nil, err
		}
		if cmd == "" {
			break
		}
		commands = append(commands, cmd)
	}
	return commands, nil
}

// parseOne parses a single top-level resp data:
//
//	*N   -- Array, corresponding to a command (e.g., set key value1)
//	$N   -- Bulk string
//	+xxx -- Simple string (e.g., +OK)
//	-xxx -- Error message
//	:xxx -- Integer
//
// An unknown leading byte is pushed back and reported as io.EOF, which the
// caller treats as end of input.
func parseOne(r *bytes.Reader) (string, error) {
	b, err := r.ReadByte()
	if err != nil {
		return "", err
	}

	switch b {
	case '*':
		count, err := readIntCRLF(r)
		if err != nil {
			return "", err
		}
		// Guard: a negative count would panic make() below.
		if count < 0 {
			return "", fmt.Errorf("parse error: negative array length %d", count)
		}
		parts := make([]string, 0, count)
		for i := 0; i < count; i++ {
			bulkType, err := r.ReadByte()
			if err != nil {
				return "", err
			}
			if bulkType != '$' {
				return "", fmt.Errorf("parse error: expected '$' but got '%c'", bulkType)
			}
			strVal, err := parseBulkString(r)
			if err != nil {
				return "", err
			}
			parts = append(parts, strVal)
		}
		return strings.Join(parts, " "), nil

	case '$':
		return parseBulkString(r)

	case '+', '-', ':':
		// Simple strings, errors, and integers are all single-line payloads.
		return readLine(r)

	default:
		if err := r.UnreadByte(); err != nil {
			return "", err
		}
		return "", io.EOF
	}
}

// parseBulkString parse "$N\r\n...N bytes...\r\n" (the leading '$' has
// already been consumed). A negative length (null bulk string) yields "".
func parseBulkString(r *bytes.Reader) (string, error) {
	length, err := readIntCRLF(r)
	if err != nil {
		return "", err
	}
	if length < 0 {
		return "", nil
	}

	// io.ReadFull guarantees len(buf) bytes on nil error.
	buf := make([]byte, length)
	if _, err := io.ReadFull(r, buf); err != nil {
		return "", err
	}

	if err := discardCRLF(r); err != nil {
		return "", err
	}
	return string(buf), nil
}

// readIntCRLF parses an integer string in the format "123\r\n".
func readIntCRLF(r *bytes.Reader) (int, error) {
	line, err := readLine(r)
	if err != nil {
		return 0, err
	}
	i, err := strconv.Atoi(line)
	if err != nil {
		return 0, fmt.Errorf("parse int error: %w", err)
	}
	return i, nil
}

// readLine reads a line until it encounters the "\r\n" sequence,
// and returns the portion of the line excluding the "\r\n".
func readLine(r *bytes.Reader) (string, error) {
	var sb strings.Builder
	for {
		b, err := r.ReadByte()
		if err != nil {
			return "", err
		}
		if b == '\r' {
			// Check whether the next byte is '\n'.
			b2, err := r.ReadByte()
			if err != nil {
				return "", err
			}
			if b2 == '\n' {
				break
			}
			return "", fmt.Errorf("parse error: expected LF after CR, got '%c'", b2)
		}
		_ = sb.WriteByte(b)
	}
	return sb.String(), nil
}

// discardCRLF discard "\r\n".
func discardCRLF(r *bytes.Reader) error {
	b, err := r.ReadByte()
	if err != nil {
		return err
	}
	if b != '\r' {
		return fmt.Errorf("expected CR but got '%c'", b)
	}
	b, err = r.ReadByte()
	if err != nil {
		return err
	}
	if b != '\n' {
		return fmt.Errorf("expected LF but got '%c'", b)
	}
	return nil
}
+func TestParseNextResp(t *testing.T) { + tests := []struct { + name string + input string + wantSegCount int + wantErr bool + wantCmdPrefix string // optional check to see if parsed cmd has a certain prefix + }{ + { + name: "Valid single command - 2 segments", + input: "*2\r\n$3\r\nGET\r\n$3\r\nkey\r\n", + wantSegCount: 2, + wantErr: false, + wantCmdPrefix: "*2\r\n", + }, + { + name: "Valid single command - 3 segments", + input: "*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n", + wantSegCount: 3, + wantErr: false, + wantCmdPrefix: "*3\r\n", + }, + { + name: "Missing '*' at the start", + input: "2\r\n$3\r\nGET\r\n$3\r\nkey\r\n", + wantSegCount: 0, + wantErr: true, + }, + { + name: "Missing CRLF after array count", + input: "*3$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n", + wantSegCount: 0, + wantErr: true, + }, + { + name: "Invalid bulk length", + input: "*1\r\n$xyz\r\nabcdef\r\n", + wantSegCount: 0, + wantErr: true, + }, + { + name: "Not enough data for bulk string content", + input: "*1\r\n$5\r\nhi\r\n", // bulkLen=5 but only 2 bytes "hi" + wantSegCount: 0, + wantErr: true, + }, + } + + for _, tc := range tests { + tc := tc // capture range variable + t.Run(tc.name, func(t *testing.T) { + cmd, segCount, leftover, err := parseNextResp(tc.input) + if (err != nil) != tc.wantErr { + t.Errorf("parseNextResp() error = %v, wantErr = %v", err, tc.wantErr) + return + } + + if segCount != tc.wantSegCount { + t.Errorf("parseNextResp() segCount = %d, want = %d", segCount, tc.wantSegCount) + } + + if !tc.wantErr { + // Check that the returned command starts with the expected prefix (if any) + if tc.wantCmdPrefix != "" && !strings.HasPrefix(cmd, tc.wantCmdPrefix) { + t.Errorf("parseNextResp() cmd prefix mismatch, got: %q, want prefix: %q", cmd, tc.wantCmdPrefix) + } + // Leftover should be what's left after reading one command + if leftover != "" && leftover == tc.input { + t.Errorf("parseNextResp() leftover not consumed, leftover=%q", leftover) + } + } + }) + } +} + +// 
TestParsePipelineWithTotalSegs tests the parsePipelineWithTotalSegs function, +// which parses multiple RESP commands in pipeline and checks if the sum of +// all command segments equals a specified totalSegs. +func TestParsePipelineWithTotalSegs(t *testing.T) { + // Construct a pipeline of three commands: + // Cmd1: *2\r\n$3\r\nGET\r\n$3\r\nkey\r\n -> 2 segments + // Cmd2: *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n -> 3 segments + // Cmd3: *2\r\n$3\r\nGET\r\n$3\r\nkey\r\n -> 2 segments + // Total = 7 segments + pipeline := "" + + "*2\r\n$3\r\nGET\r\n$3\r\nkey\r\n" + + "*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n" + + "*2\r\n$3\r\nGET\r\n$3\r\nkey\r\n" + + "Some leftover data" + + // A pipeline that extends the above with one more command (2 segments). + // So total = 9 segments now. + extendedPipeline := pipeline + "*2\r\n$3\r\nGET\r\n$3\r\nkey\r\n" + truncatedPipeline := "" + + "*2\r\n$3\r\nGET\r\n$3\r\nkey\r\n" + + "*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n" + + "*2\r\n$3\r\nGET\r\n$3\r\nkey\r\n" + + "*2\r" // "*2\r" is a truncated stmt + + tests := []struct { + name string + input string + totalSegs int + wantErr bool + equalTo string + }{ + { + name: "Exact match: totalSegs=5 on pipeline of 7 segments", + input: pipeline, + totalSegs: 5, + wantErr: false, + equalTo: "*2\r\n$3\r\nGET\r\n$3\r\nkey\r\n*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n", + }, + { + name: "Exact match: totalSegs=7 on pipeline of 7 segments", + input: pipeline, + totalSegs: 7, + wantErr: false, + equalTo: "*2\r\n$3\r\nGET\r\n$3\r\nkey\r\n*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n*2\r\n$3\r\nGET\r\n$3\r\nkey\r\n", + }, + { + name: "Exact match: totalSegs=8 on pipeline of 7 segments", + input: pipeline + "END", + totalSegs: 8, + wantErr: true, + equalTo: "", + }, + { + name: "Less segs than needed: totalSegs=6 on pipeline of 7", + input: pipeline, + totalSegs: 6, + wantErr: true, + equalTo: "", + }, + { + name: "Extended pipeline total=7, but we only ask for the first 
7 segs", + input: extendedPipeline, + totalSegs: 7, + wantErr: false, + equalTo: "*2\r\n$3\r\nGET\r\n$3\r\nkey\r\n*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n*2\r\n$3\r\nGET\r\n$3\r\nkey\r\n", + }, + { + name: "Truncated pipeline total=7, but we only ask for the first 8 segs", + input: truncatedPipeline, + totalSegs: 8, + wantErr: false, + equalTo: "*2\r\n$3\r\nGET\r\n$3\r\nkey\r\n*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n*2\r\n$3\r\nGET\r\n$3\r\nkey\r\n", + }, + { + name: "Extended pipeline total=7, but we only ask for 8 segs", + input: extendedPipeline, + totalSegs: 8, + wantErr: true, + equalTo: "", + }, + { + name: "Extended pipeline total=7, ask for 9 => success", + input: extendedPipeline, + totalSegs: 9, + wantErr: true, + equalTo: "", + }, + { + name: "Zero totalSegs => error", + input: pipeline, + totalSegs: 0, + wantErr: false, + equalTo: "*2\r\n$3\r\nGET\r\n$3\r\nkey\r\n", + }, + { + name: "Negative totalSegs => error", + input: pipeline, + totalSegs: -1, + wantErr: true, + equalTo: "", + }, + } + + for _, tc := range tests { + tc := tc + t.Run(tc.name, func(t *testing.T) { + got, err := parsePipelineWithTotalSegs(tc.totalSegs, tc.input) + if (err != nil) != tc.wantErr { + t.Errorf("parsePipelineWithTotalSegs() error = %v, wantErr = %v", + err, tc.wantErr) + return + } + if !tc.wantErr && tc.equalTo != "" && got != tc.equalTo { + t.Errorf("parsePipelineWithTotalSegs() output does not contain %q\nOutput:\n%q", + tc.equalTo, got) + } + }) + } +}