-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
api: Add remote API with write client; add remote handler. #1658
base: main
Are you sure you want to change the base?
Conversation
17b12a6
to
1985d61
Compare
Signed-off-by: bwplotka <[email protected]>
1985d61
to
bffa922
Compare
// | ||
// It is not safe to use the returned API from multiple goroutines, create a | ||
// separate *API for each goroutine. | ||
func NewAPI(c api.Client, opts ...APIOption) (*API, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could consider removing error from here and from APIOption
as we don't have any option with validation or so. However I added it for future compatibility (we won't be able to easily extend it without breaking compatibility). Note: Currently api/ is experimenta... 🙈 I think I will remove error here and hope for the best, YAGNI.
func NewAPI(c api.Client, opts ...APIOption) (*API, error) { | |
func NewAPI(c api.Client, opts ...APIOption) *API { |
} | ||
|
||
// DesymbolizeLabels decodes label references, with given symbols to labels. | ||
func DesymbolizeLabels(labelRefs []uint32, symbols, buf []string) []string { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah I just ran into DesymbolizeLabels not being public in prometheus codebase here open-telemetry/opentelemetry-collector-contrib#35751 (comment)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you use labels.Labels or some other struct (for labels in Otel)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the translation layer we have createAttributes
https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/translator/prometheusremotewrite/helper.go#L99 which returns []prompb.Label
and here we store them https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/translator/prometheusremotewrite/metrics_to_prw_v2.go#L119
|
||
// NewRemoteWriteHandler returns HTTP handler that receives Remote Write 2.0 | ||
// protocol https://prometheus.io/docs/specs/remote_write_spec_2_0/. | ||
func NewRemoteWriteHandler(logger *slog.Logger, store writeStorage) http.Handler { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's bit inconsistent with NewRemoteAPI in terms of options, let's unify one to one or another 🤔
// WriteProtoFullName represents the fully qualified name of the protobuf message | ||
// to use in Remote write 1.0 and 2.0 protocols. | ||
// See https://prometheus.io/docs/specs/remote_write_spec_2_0/#protocol. | ||
type WriteProtoFullName protoreflect.FullName |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Change to string, bit too hard to use
const ( | ||
// WriteProtoFullNameV1 represents the `prometheus.WriteRequest` protobuf | ||
// message introduced in the https://prometheus.io/docs/specs/remote_write_spec/. | ||
// DEPRECATED: Use WriteProtoFullNameV2 instead. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For receivers who wants to support it for longer, deprecation might cause some friction, but maybe worth it? 🤔
Example use of handler in |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Both API (client) and handler supports 1.0 and 2.0 Proto messages, but I intentionally only host writev2.Request generated Go code as we plan to deprecate 1.0 and users should use writev2 now. Both API and handler also will work against protobuf message types that use custom generators e.g. with gogoproto or anything else.
Given that the spec is only just finalized recently and (afaik) still no receivers like Mimir support v2, we should include v1 request code imo.
} | ||
|
||
func (r *API) attemptWrite(ctx context.Context, compr Compression, proto WriteProtoFullName, payload []byte, attempt int) (WriteResponseStats, error) { | ||
u := r.client.URL("api/v1/write", nil) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should be configurable imo, not all remote write receivers use the same endpoint
rs, err := parseWriteResponseStats(resp) | ||
if err != nil { | ||
r.opts.logger.Warn("parsing rw write statistics failed; partial or no stats", "err", err) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we only try to get write response stats/log a warning if the request type was v2?
return &handler{logger: logger, store: store} | ||
} | ||
|
||
func parseProtoMsg(contentType string) (WriteProtoFullName, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
a comment with an example of what the string might look like would be useful for people who haven't read or fully understood the v2 spec
// Copyright (c) The EfficientGo Authors. | ||
// Licensed under the Apache License 2.0. | ||
|
||
// Initially copied from Cortex project. | ||
|
||
// Package backoff implements backoff timers which increases wait time on every retry, incredibly useful | ||
// in distributed system timeout functionalities. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we be pulling this via go.mod
as opposed to having copies of the files? or is this the way we include dependencies for the client library?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We try as hard as we can to avoid dependencies. I'm in favor of copying other libraries code (respecting copyright) if possible
} | ||
|
||
// Read the request body. | ||
body, err := io.ReadAll(r.Body) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It might be out of scope for Prometheus but it would be nice to limit the request body size to protect the handler. https://github.com/cortexproject/cortex/blob/master/pkg/util/http.go#L180
return | ||
} | ||
|
||
decompressed, err := snappy.Decode(nil, body) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similar here. It would be nice if we can check snappy decoded length and see if it exceeds the max size https://github.com/cortexproject/cortex/blob/master/pkg/util/http.go#L247
return &handler{logger: logger, store: store} | ||
} | ||
|
||
func parseProtoMsg(contentType string) (WriteProtoFullName, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this something we can expose?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 to exposing this
h.logger.Error("Error decompressing remote write request", "err", err.Error()) | ||
http.Error(w, err.Error(), http.StatusBadRequest) | ||
return | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we can expose a hook for the logic of decoding and decompressing part so that downstream project can just use their existing code without many adjustment.
func decodeRequestBody(r *http.Request) (body []byte, err error)
The main logic that can benefit us is handling v1 and v2 proto and response headers. It would be nice to still reuse the other code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup some requestValidatorDecoder interface that can be overridden downstream would be cool to have.
} | ||
|
||
req.Header.Add("Content-Encoding", string(compr)) | ||
req.Header.Set("Content-Type", contentTypeHeader(proto)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe there should be an option to specify custom headers here too?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can do that with tripperware though 🤔
|
||
err = fmt.Errorf("server returned HTTP status %s: %s", resp.Status, body) | ||
if resp.StatusCode/100 == 5 || | ||
(r.opts.retryOnRateLimit && resp.StatusCode == http.StatusTooManyRequests) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should retryOn429 be configurable here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure!
h.logger.Error("Error decompressing remote write request", "err", err.Error()) | ||
http.Error(w, err.Error(), http.StatusBadRequest) | ||
return | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup some requestValidatorDecoder interface that can be overridden downstream would be cool to have.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe more of a future comment, but we can probably have some common/model codec here too, since we already have the types, like https://github.com/prometheus/prometheus/blob/main/prompb/io/prometheus/write/v2/codec.go, that would translate to common/model
types. Would be good for downstream clients, that can choose to not import prometheus at all.
return &handler{logger: logger, store: store} | ||
} | ||
|
||
func parseProtoMsg(contentType string) (WriteProtoFullName, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 to exposing this
code = http.StatusInternalServerError | ||
} | ||
if code/5 == 100 { // 5xx | ||
h.logger.Error("Error while remote writing the v2 request", "err", storeErr.Error()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
store.Store can ingest both I think
h.logger.Error("Error while remote writing the v2 request", "err", storeErr.Error()) | |
h.logger.Error("Error while storing the remote write request", "err", storeErr.Error()) |
This proposes Remote Write API directly in client_golang main module. No dependency added (some new small packages are added under the internal vendored code).
Old attempt: #1656
Both API (client) and handler supports 1.0 and 2.0 Proto messages, but I intentionally only host
writev2.Request
generated Go code as we plan to deprecate 1.0 and users should usewritev2
now. Both API and handler also will work against protobuf message types that use custom generators e.g. with gogoproto or anything else.TODO: