Skip to content

Commit

Permalink
feat: Add gRPC server metrics (#1031)
Browse files Browse the repository at this point in the history
Add server metrics defined in gRFC A66: OpenTelemetry Metrics https://github.com/grpc/proposal/blob/master/A66-otel-stats.md
  • Loading branch information
DNVindhya authored Jan 22, 2024
1 parent f72eda3 commit a188d8a
Show file tree
Hide file tree
Showing 5 changed files with 603 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright (c) 2016-2023 The gRPC-Spring Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package net.devh.boot.grpc.server.metrics;

import java.time.Duration;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.binder.BaseUnits;

/*
* The instruments used to record metrics on server.
*/
public final class MetricsServerInstruments {

private MetricsServerInstruments() {}

/*
* Server side metrics defined in gRFC <a
* href="https://github.com/grpc/proposal/blob/master/A66-otel-stats.md">A66</a>. Please note that these are the
* names used for instrumentation and can be changed by exporters in an unpredictable manner depending on the
* destination.
*/
private static final String SERVER_CALL_STARTED = "grpc.server.call.started";
private static final String SERVER_SENT_COMPRESSED_MESSAGE_SIZE =
"grpc.server.call.sent_total_compressed_message_size";
private static final String SERVER_RECEIVED_COMPRESSED_MESSAGE_SIZE =
"grpc.server.call.rcvd_total_compressed_message_size";
private static final String SERVER_CALL_DURATION =
"grpc.server.call.duration";
private static final double[] DEFAULT_SIZE_BUCKETS =
new double[] {1024d, 2048d, 4096d, 16384d, 65536d, 262144d, 1048576d,
4194304d, 16777216d, 67108864d, 268435456d, 1073741824d, 4294967296d};
private static final Duration[] DEFAULT_LATENCY_BUCKETS =
new Duration[] {Duration.ofNanos(10000), Duration.ofNanos(50000), Duration.ofNanos(100000),
Duration.ofNanos(300000), Duration.ofNanos(600000), Duration.ofNanos(800000),
Duration.ofMillis(1), Duration.ofMillis(2), Duration.ofMillis(3), Duration.ofMillis(4),
Duration.ofMillis(5), Duration.ofMillis(6), Duration.ofMillis(8), Duration.ofMillis(10),
Duration.ofMillis(13), Duration.ofMillis(16), Duration.ofMillis(20), Duration.ofMillis(25),
Duration.ofMillis(30), Duration.ofMillis(40), Duration.ofMillis(50), Duration.ofMillis(65),
Duration.ofMillis(80), Duration.ofMillis(100), Duration.ofMillis(130), Duration.ofMillis(160),
Duration.ofMillis(200), Duration.ofMillis(250), Duration.ofMillis(300), Duration.ofMillis(400),
Duration.ofMillis(500), Duration.ofMillis(650), Duration.ofMillis(800),
Duration.ofSeconds(1), Duration.ofSeconds(2), Duration.ofSeconds(5), Duration.ofSeconds(10),
Duration.ofSeconds(20), Duration.ofSeconds(50), Duration.ofSeconds(100)};

static MetricsServerMeters newServerMetricsMeters(MeterRegistry registry) {
MetricsServerMeters.Builder builder = MetricsServerMeters.newBuilder();

builder.setServerCallCounter(Counter.builder(SERVER_CALL_STARTED)
.description("The total number of RPC attempts started from the server side, including "
+ "those that have not completed.")
.baseUnit("call")
.withRegistry(registry));

builder.setSentMessageSizeDistribution(DistributionSummary.builder(
SERVER_SENT_COMPRESSED_MESSAGE_SIZE)
.description("Compressed message bytes sent per server call")
.baseUnit(BaseUnits.BYTES)
.serviceLevelObjectives(DEFAULT_SIZE_BUCKETS)
.withRegistry(registry));

builder.setReceivedMessageSizeDistribution(DistributionSummary.builder(
SERVER_RECEIVED_COMPRESSED_MESSAGE_SIZE)
.description("Compressed message bytes received per server call")
.baseUnit(BaseUnits.BYTES)
.serviceLevelObjectives(DEFAULT_SIZE_BUCKETS)
.withRegistry(registry));

builder.setServerCallDuration(Timer.builder(SERVER_CALL_DURATION)
.description("Time taken to complete a call from server transport's perspective")
.serviceLevelObjectives(DEFAULT_LATENCY_BUCKETS)
.withRegistry(registry));

return builder.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Copyright (c) 2016-2023 The gRPC-Spring Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package net.devh.boot.grpc.server.metrics;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Meter.MeterProvider;
import io.micrometer.core.instrument.Timer;

/*
* Collection of server metrics meters.
*/
public class MetricsServerMeters {

private MeterProvider<Counter> serverCallCounter;
private MeterProvider<DistributionSummary> sentMessageSizeDistribution;
private MeterProvider<DistributionSummary> receivedMessageSizeDistribution;
private MeterProvider<Timer> serverCallDuration;

private MetricsServerMeters(Builder builder) {
this.serverCallCounter = builder.serverCallCounter;
this.sentMessageSizeDistribution = builder.sentMessageSizeDistribution;
this.receivedMessageSizeDistribution = builder.receivedMessageSizeDistribution;
this.serverCallDuration = builder.serverCallDuration;
}

public MeterProvider<Counter> getServerCallCounter() {
return this.serverCallCounter;
}

public MeterProvider<DistributionSummary> getSentMessageSizeDistribution() {
return this.sentMessageSizeDistribution;
}

public MeterProvider<DistributionSummary> getReceivedMessageSizeDistribution() {
return this.receivedMessageSizeDistribution;
}

public MeterProvider<Timer> getServerCallDuration() {
return this.serverCallDuration;
}

public static Builder newBuilder() {
return new Builder();
}

static class Builder {

private MeterProvider<Counter> serverCallCounter;
private MeterProvider<DistributionSummary> sentMessageSizeDistribution;
private MeterProvider<DistributionSummary> receivedMessageSizeDistribution;
private MeterProvider<Timer> serverCallDuration;

private Builder() {}

public Builder setServerCallCounter(MeterProvider<Counter> counter) {
this.serverCallCounter = counter;
return this;
}

public Builder setSentMessageSizeDistribution(MeterProvider<DistributionSummary> distribution) {
this.sentMessageSizeDistribution = distribution;
return this;
}

public Builder setReceivedMessageSizeDistribution(MeterProvider<DistributionSummary> distribution) {
this.receivedMessageSizeDistribution = distribution;
return this;
}

public Builder setServerCallDuration(MeterProvider<Timer> timer) {
this.serverCallDuration = timer;
return this;
}

public MetricsServerMeters build() {
return new MetricsServerMeters(this);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*
* Copyright (c) 2016-2023 The gRPC-Spring Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package net.devh.boot.grpc.server.metrics;

import static com.google.common.base.Preconditions.checkNotNull;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.function.Supplier;

import com.google.common.base.Stopwatch;

import io.grpc.Metadata;
import io.grpc.ServerStreamTracer;
import io.grpc.Status;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;

/**
* Provides factories for {@link io.grpc.StreamTracer} that records metrics.
*
* <p>
* On the server-side, there is only one ServerStream per each ServerCall, and ServerStream starts earlier than the
* ServerCall. Therefore, only one tracer is created per stream/call and it's the tracer that reports the metrics
* summary.
*
* <b>Note:</b> This class uses experimental grpc-java-API features.
*/
public final class MetricsServerStreamTracers {

private static final Supplier<Stopwatch> STOPWATCH_SUPPLIER = Stopwatch::createUnstarted;
private final Supplier<Stopwatch> stopwatchSupplier;

public MetricsServerStreamTracers() {
this(STOPWATCH_SUPPLIER);
}

public MetricsServerStreamTracers(Supplier<Stopwatch> stopwatchSupplier) {
this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
}

/**
* Returns a {@link io.grpc.ServerStreamTracer.Factory} with default metrics definitions.
*
* @param registry The MeterRegistry used to create the metrics.
*/
public ServerStreamTracer.Factory getMetricsServerTracerFactory(MeterRegistry registry) {
return new MetricsServerTracerFactory(registry);
}

/**
* Returns a {@link io.grpc.ServerStreamTracer.Factory} with metrics definitions from custom
* {@link MetricsServerMeters}.
*
* @param meters The MetricsServerMeters used to configure the metrics definitions.
*/
public ServerStreamTracer.Factory getMetricsServerTracerFactory(MetricsServerMeters meters) {
return new MetricsServerTracerFactory(meters);
}

private static final class ServerTracer extends ServerStreamTracer {
private final MetricsServerStreamTracers tracer;
private final String fullMethodName;
private final MetricsServerMeters metricsServerMeters;
private final Stopwatch stopwatch;
private static final AtomicLongFieldUpdater<ServerTracer> outboundWireSizeUpdater =
AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "outboundWireSize");
private static final AtomicLongFieldUpdater<ServerTracer> inboundWireSizeUpdater =
AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "inboundWireSize");
private static final AtomicIntegerFieldUpdater<ServerTracer> streamClosedUpdater =
AtomicIntegerFieldUpdater.newUpdater(ServerTracer.class, "streamClosed");
private volatile long outboundWireSize;
private volatile long inboundWireSize;
private volatile int streamClosed;


ServerTracer(MetricsServerStreamTracers tracer, String fullMethodName, MetricsServerMeters meters) {
this.tracer = checkNotNull(tracer, "tracer");
this.fullMethodName = fullMethodName;
this.metricsServerMeters = meters;
// start stopwatch
this.stopwatch = tracer.stopwatchSupplier.get().start();
}

@Override
public void serverCallStarted(ServerCallInfo<?, ?> callInfo) {
this.metricsServerMeters.getServerCallCounter()
.withTags(Tags.of("grpc.method", this.fullMethodName))
.increment();
}

@Override
public void outboundWireSize(long bytes) {
outboundWireSizeUpdater.getAndAdd(this, bytes);
}

@Override
public void inboundWireSize(long bytes) {
inboundWireSizeUpdater.getAndAdd(this, bytes);
}

@Override
public void streamClosed(Status status) {
if (streamClosedUpdater.getAndSet(this, 1) != 0) {
return;
}
long callLatencyNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS);

Tags serverMetricTags =
Tags.of("grpc.method", this.fullMethodName, "grpc.status", status.getCode().toString());
this.metricsServerMeters.getServerCallDuration()
.withTags(serverMetricTags)
.record(callLatencyNanos, TimeUnit.NANOSECONDS);
this.metricsServerMeters.getSentMessageSizeDistribution()
.withTags(serverMetricTags)
.record(outboundWireSize);
this.metricsServerMeters.getReceivedMessageSizeDistribution()
.withTags(serverMetricTags)
.record(inboundWireSize);
}
}

final class MetricsServerTracerFactory extends ServerStreamTracer.Factory {

private final MetricsServerMeters metricsServerMeters;

MetricsServerTracerFactory(MeterRegistry registry) {
this(MetricsServerInstruments.newServerMetricsMeters(registry));
}

MetricsServerTracerFactory(MetricsServerMeters metricsServerMeters) {
this.metricsServerMeters = metricsServerMeters;
}

@Override
public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata headers) {
return new ServerTracer(MetricsServerStreamTracers.this, fullMethodName, this.metricsServerMeters);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright (c) 2016-2023 The gRPC-Spring Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package net.devh.boot.grpc.server.metrics;

import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import com.google.common.base.Stopwatch;
import com.google.common.base.Ticker;

/**
* A manipulated clock that exports a {@link com.google.common.base.Ticker}.
*/
public final class FakeClock {
private long currentTimeNanos;
private final Ticker ticker =
new Ticker() {
@Override
public long read() {
return currentTimeNanos;
}
};

private final Supplier<Stopwatch> stopwatchSupplier = () -> Stopwatch.createUnstarted(ticker);

/**
* Forward the time by the given duration.
*/
public void forwardTime(long value, TimeUnit unit) {
currentTimeNanos += unit.toNanos(value);
}

/**
* Provides a stopwatch instance that uses the fake clock ticker.
*/
public Supplier<Stopwatch> getStopwatchSupplier() {
return stopwatchSupplier;
}
}
Loading

0 comments on commit a188d8a

Please sign in to comment.