Skip to content

Commit

Permalink
Add metadata on Kafka sink metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
Naireen committed Jan 6, 2025
1 parent e9e2b16 commit fc93823
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,7 @@ message MonitoringInfo {
SPANNER_TABLE_ID = 25 [(label_props) = { name: "SPANNER_TABLE_ID" }];
SPANNER_INSTANCE_ID = 26 [(label_props) = { name: "SPANNER_INSTANCE_ID" }];
SPANNER_QUERY_NAME = 27 [(label_props) = { name: "SPANNER_QUERY_NAME" }];
PER_WORKER_METRIC = 28 [(label_props) = { name: "PER_WORKER_METRIC" }];
}

// A set of key and value labels which define the scope of the metric. For
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,10 @@ public MetricUpdates getUpdates() {
if (metricName instanceof MonitoringInfoMetricName) {
MonitoringInfoMetricName monitoringInfoName = (MonitoringInfoMetricName) metricName;
// Represents a specific MonitoringInfo for a specific URN.
LOG.info("xxx parse labels");
builder.setUrn(monitoringInfoName.getUrn());
for (Entry<String, String> e : monitoringInfoName.getLabels().entrySet()) {
LOG.info("xxx {} key: {} value: {}", metricName.getName(), e.getKey(), e.getValue());
builder.setLabel(e.getKey(), e.getValue());
}
} else { // Represents a user counter.
Expand All @@ -268,6 +270,14 @@ public MetricUpdates getUpdates() {
.setLabel(MonitoringInfoConstants.Labels.NAME, metricKey.metricName().getName())
.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, metricKey.stepName());
}

// Based on the namespace, add the per-worker metric label so that runners can process these
// sink metrics separately.
if (metricName.getNamespace().equals("BigQuerySink")
|| metricName.getNamespace().equals("KafkaSink")) {
LOG.info("xxx added labels");
builder.setLabel(MonitoringInfoConstants.Labels.PER_WORKER_METRIC, "true");
}
return builder;
}

Expand Down Expand Up @@ -420,6 +430,7 @@ public Map<String, ByteString> getMonitoringData(ShortIdMap shortIds) {
String shortId = getShortId(metricName, this::gaugeToMonitoringMetadata, shortIds);
if (shortId != null) {
builder.put(shortId, encodeInt64Gauge(gaugeCell.getCumulative()));
LOG.info("xxx metricName {} builder {}", metricName.toString(), builder.toString());
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ public static final class Labels {
public static final String SPANNER_DATABASE_ID = "SPANNER_DATABASE_ID";
public static final String SPANNER_INSTANCE_ID = "SPANNER_INSTANCE_ID";
public static final String SPANNER_QUERY_NAME = "SPANNER_QUERY_NAME";
public static final String PER_WORKER_METRIC = "PER_WORKER_METRIC";

static {
// Validate that compile time constants match the values stored in the protos.
Expand Down Expand Up @@ -148,6 +149,7 @@ public static final class Labels {
SPANNER_INSTANCE_ID.equals(extractLabel(MonitoringInfoLabels.SPANNER_INSTANCE_ID)));
checkArgument(
SPANNER_QUERY_NAME.equals(extractLabel(MonitoringInfoLabels.SPANNER_QUERY_NAME)));
checkArgument(PER_WORKER_METRIC.equals(extractLabel(MonitoringInfoLabels.PER_WORKER_METRIC)));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@

/** A set of functions used to encode and decode common monitoring info types. */
public class MonitoringInfoEncodings {

private static final Coder<Long> VARINT_CODER = VarLongCoder.of();
private static final Coder<Double> DOUBLE_CODER = DoubleCoder.of();
private static final IterableCoder<String> STRING_SET_CODER =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,54 @@ public void testMonitoringInfosArePopulatedForUserCounters() {
assertThat(actualMonitoringInfos, containsInAnyOrder(builder1.build(), builder2.build()));
}

@Test
public void testMonitoringInfosLabelsArePopulatedForSinkCounter() {
  MetricsContainerImpl container = new MetricsContainerImpl("step1");

  // Two counters in the sink namespaces and one in an unrelated namespace.
  CounterCell kafkaCounter = container.getCounter(MetricName.named("KafkaSink", "name1"));
  CounterCell bigQueryCounter = container.getCounter(MetricName.named("BigQuerySink", "name2"));
  CounterCell otherCounter = container.getCounter(MetricName.named("PS", "name3"));

  kafkaCounter.inc(2L);
  bigQueryCounter.inc(4L);
  otherCounter.inc(5L);

  // Expected info for the "KafkaSink" counter: carries the PER_WORKER_METRIC label.
  SimpleMonitoringInfoBuilder expectedKafka = new SimpleMonitoringInfoBuilder();
  expectedKafka
      .setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64)
      .setLabel(MonitoringInfoConstants.Labels.NAMESPACE, "KafkaSink")
      .setLabel(MonitoringInfoConstants.Labels.NAME, "name1")
      .setLabel(MonitoringInfoConstants.Labels.PER_WORKER_METRIC, "true")
      .setInt64SumValue(2)
      .setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "step1");

  // Expected info for the "BigQuerySink" counter: carries the PER_WORKER_METRIC label.
  SimpleMonitoringInfoBuilder expectedBigQuery = new SimpleMonitoringInfoBuilder();
  expectedBigQuery
      .setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64)
      .setLabel(MonitoringInfoConstants.Labels.NAMESPACE, "BigQuerySink")
      .setLabel(MonitoringInfoConstants.Labels.NAME, "name2")
      .setLabel(MonitoringInfoConstants.Labels.PER_WORKER_METRIC, "true")
      .setInt64SumValue(4)
      .setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "step1");

  // "PS" is not a supported namespace, so the extra metadata label must be absent.
  SimpleMonitoringInfoBuilder expectedOther = new SimpleMonitoringInfoBuilder();
  expectedOther
      .setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64)
      .setLabel(MonitoringInfoConstants.Labels.NAMESPACE, "PS")
      .setLabel(MonitoringInfoConstants.Labels.NAME, "name3")
      .setInt64SumValue(5)
      .setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "step1");

  // Copy the reported infos into a list so they can be matched irrespective of order.
  ArrayList<MonitoringInfo> reportedInfos = new ArrayList<>();
  container.getMonitoringInfos().forEach(reportedInfos::add);

  assertThat(
      reportedInfos,
      containsInAnyOrder(
          expectedKafka.build(), expectedBigQuery.build(), expectedOther.build()));
}

@Test
public void testMonitoringInfosArePopulatedForUserDistributions() {
MetricsContainerImpl testObject = new MetricsContainerImpl("step1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.beam.sdk.metrics.Gauge;
import org.apache.beam.sdk.metrics.Histogram;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -40,6 +41,8 @@ public interface KafkaMetrics {

void updateKafkaMetrics();

void recordBacklogBytesNumber(String topic, int partitionId, long backlog);

/** No-op implementation of {@code KafkaMetrics}. */
class NoOpKafkaMetrics implements KafkaMetrics {
private NoOpKafkaMetrics() {}
Expand All @@ -53,6 +56,9 @@ public void updateBacklogBytes(String topic, int partitionId, long elapsedTime)
@Override
public void updateKafkaMetrics() {}

/** Intentionally does nothing: the no-op implementation records no backlog gauge. */
@Override
public void recordBacklogBytesNumber(String topic, int partitionId, long backlog) {}

private static NoOpKafkaMetrics singleton = new NoOpKafkaMetrics();

static NoOpKafkaMetrics getInstance() {
Expand Down Expand Up @@ -108,6 +114,8 @@ public void updateSuccessfulRpcMetrics(String topic, Duration elapsedTime) {
}

/**
* This is for tracking backlog bytes to be added to the Metric Container at a later time.
*
* @param topicName topicName
* @param partitionId partitionId for the topic Only included in the metric key if
* 'supportsMetricsDeletion' is enabled.
Expand Down Expand Up @@ -151,6 +159,23 @@ private void recordBacklogBytes() {
}
}

/**
 * Records the backlog bytes for a topic partition on the current thread by setting a gauge in
 * the {@code "KafkaSink"} namespace.
 *
 * @param topicName name of the topic the backlog belongs to
 * @param partitionId partition of the topic; passed together with {@code topicName} to {@code
 *     KafkaSinkMetrics.getMetricGaugeName} to derive the gauge name
 * @param backlogBytes backlog value to set on the gauge
 */
@Override
public void recordBacklogBytesNumber(String topicName, int partitionId, long backlogBytes) {
  String gaugeName = KafkaSinkMetrics.getMetricGaugeName(topicName, partitionId).getName();
  Gauge partitionBacklogGauge = Metrics.gauge("KafkaSink", gaugeName);
  partitionBacklogGauge.set(backlogBytes);
}

/**
* Export all metrics recorded in this instance to the underlying {@code perWorkerMetrics}
* containers. This function will only report metrics once per instance. Subsequent calls to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,7 @@ public ProcessContinuation processElement(
}
}
}

// TODO: This seems incorrect; it can produce negative values.
backlogBytes.set(
(long)
(BigDecimal.valueOf(
Expand All @@ -558,6 +558,19 @@ public ProcessContinuation processElement(
.subtract(BigDecimal.valueOf(expectedOffset), MathContext.DECIMAL128)
.doubleValue()
* avgRecordSize.estimateRecordByteSizeToOffsetCountRatio()));
KafkaMetrics kafkaResults = KafkaSinkMetrics.kafkaMetrics();

kafkaResults.recordBacklogBytesNumber(
kafkaSourceDescriptor.getTopic(),
kafkaSourceDescriptor.getPartition(),
(long)
(BigDecimal.valueOf(
Preconditions.checkStateNotNull(
offsetEstimatorCache.get(kafkaSourceDescriptor).estimate()))
.subtract(BigDecimal.valueOf(expectedOffset), MathContext.DECIMAL128)
.doubleValue()
* avgRecordSize
.estimateRecordByteSizeToOffsetCountRatio())); // is this correct?
}
}
}
Expand Down

0 comments on commit fc93823

Please sign in to comment.