Kafka add gauge v1 #33408
base: master
Codecov Report: All modified and coverable lines are covered by tests ✅
Additional details and impacted files:
@@ Coverage Diff @@
## master #33408 +/- ##
============================================
+ Coverage 58.86% 58.95% +0.08%
- Complexity 3112 3183 +71
============================================
Files 1130 1133 +3
Lines 174419 174576 +157
Branches 3343 3366 +23
============================================
+ Hits 102680 102916 +236
+ Misses 68392 68312 -80
- Partials 3347 3348 +1
Flags with carried forward coverage won't be shown.
Branch updated from 2791b23 to b91a77f.
R: @sjvanrossum for the KafkaIO part, thanks in advance!
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment
Branch updated from b91a77f to 948dfe6.
Run Java PreCommit
R: @johnjcasey for the SDK portion of it.
.../java/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverter.java (outdated, resolved)
Run Java_Pulsar_IO_Direct PreCommit
Branch updated from 11bee27 to b9d2f2b.
Run Java PreCommit
Run Java_GCP_IO_Direct PreCommit
Run Java PreCommit
Run Java PreCommit
Run Java_GCP_IO_Direct PreCommit
Run Java_GCP_IO_Direct PreCommit
Run Java PreCommit
sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/NoOpGauge.java (outdated, resolved)
...a/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverterTest.java (outdated, resolved)
/**
 * @param topicName topicName
 * @param partitionId partitionId for the topic Only included in the metric key if
 * 'supportsMetricsDeletion' is enabled.
 * @param backlog backlog for the topic Only included in the metric key if
 * 'supportsMetricsDeletion' is enabled.
 */
" Only" -> ". Only"?
Removed, thanks for catching that.
@@ -71,11 +79,17 @@ abstract class KafkaMetricsImpl implements KafkaMetrics {

abstract HashMap<String, ConcurrentLinkedQueue<Duration>> perTopicRpcLatencies();

static ConcurrentHashMap<String, Gauge> backlogGauges = new ConcurrentHashMap<String, Gauge>();

abstract HashMap<String, Long> perTopicPartitionBacklogs();
If an instance of this class may be concurrently updated, then HashMap needs to be replaced (ditto for the existing HashMap fields). Use ConcurrentHashMap instead.
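The reviewer's point about HashMap fields can be sketched as follows. This is a minimal illustration, not the PR's code: the class ConcurrentLatencies and its methods are hypothetical, and it only assumes that several threads may record latencies for the same topic at once. ConcurrentHashMap.computeIfAbsent runs atomically, so concurrent first writes for a topic end up in one shared queue; a check-then-put on a plain HashMap can lose entries or corrupt the map under the same load.

```java
import java.time.Duration;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: per-topic latency queues that tolerate concurrent writers.
final class ConcurrentLatencies {
  // computeIfAbsent on ConcurrentHashMap is atomic per key, so two threads
  // recording the first latency for a topic cannot each create a separate queue.
  final ConcurrentHashMap<String, Queue<Duration>> perTopicRpcLatencies =
      new ConcurrentHashMap<>();

  void recordRpcLatency(String topic, Duration latency) {
    perTopicRpcLatencies.computeIfAbsent(topic, k -> new ConcurrentLinkedQueue<>()).add(latency);
  }

  // Number of latencies recorded for a topic (0 if none).
  int count(String topic) {
    Queue<Duration> q = perTopicRpcLatencies.get(topic);
    return q == null ? 0 : q.size();
  }

  public static void main(String[] args) throws InterruptedException {
    ConcurrentLatencies metrics = new ConcurrentLatencies();
    ExecutorService pool = Executors.newFixedThreadPool(8);
    for (int i = 0; i < 1000; i++) {
      final long millis = i;
      pool.execute(() -> metrics.recordRpcLatency("orders", Duration.ofMillis(millis)));
    }
    pool.shutdown();
    pool.awaitTermination(10, TimeUnit.SECONDS);
    System.out.println(metrics.count("orders")); // 1000
  }
}
```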
Slightly unrelated, but why doesn't perTopicRpcLatencies use a gauge or sum as the value type?
What would the sum represent? The sum of latencies? Each individual latency is important, and a sum would lose that information.
A gauge isn't a clear fit either: if two concurrent RPCs complete, which value do you report?
A histogram of values provides more information (and lets us see the spread of values).
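The argument against a sum can be made concrete with two hypothetical latency samples that have the same total but very different tails. This is a plain-Java sketch, not Beam's histogram API: the class LatencySummary and its nearest-rank percentile helper are illustrative assumptions, and the percentile helper assumes a non-empty queue.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch: why keeping individual latencies beats a single sum.
final class LatencySummary {
  // Collapses all latencies into one number, discarding the distribution.
  static long sumMillis(Queue<Duration> latencies) {
    long total = 0;
    for (Duration d : latencies) {
      total += d.toMillis();
    }
    return total;
  }

  // Nearest-rank percentile (p in (0, 100]); assumes at least one latency.
  static long percentileMillis(Queue<Duration> latencies, double p) {
    List<Long> sorted = new ArrayList<>();
    for (Duration d : latencies) {
      sorted.add(d.toMillis());
    }
    Collections.sort(sorted);
    int rank = (int) Math.ceil(p / 100.0 * sorted.size());
    return sorted.get(Math.max(rank, 1) - 1);
  }

  static Queue<Duration> of(long... millis) {
    Queue<Duration> q = new ConcurrentLinkedQueue<>();
    for (long m : millis) {
      q.add(Duration.ofMillis(m));
    }
    return q;
  }

  public static void main(String[] args) {
    Queue<Duration> steady = of(10, 10, 10, 10);
    Queue<Duration> spiky = of(1, 1, 1, 37);
    // Same sum (40 ms each), but the tail latency differs sharply.
    System.out.println(sumMillis(steady) + " " + sumMillis(spiky)); // 40 40
    System.out.println(percentileMillis(steady, 99) + " " + percentileMillis(spiky, 99)); // 10 37
  }
}
```

A histogram keeps enough of this distribution to surface the 37 ms outlier, which both a sum and a single last-value gauge would hide.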
@@ -40,6 +42,7 @@ public class KafkaSinkMetrics {

// Base Metric names
private static final String RPC_LATENCY = "RpcLatency";
private static final String ESTIAMTED_BACKLOG_SIZE = "EstimatedBacklogSize";
Nit: ESTIAMTED_BACKLOG_SIZE -> ESTIMATED_BACKLOG_SIZE
Done
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSinkMetrics.java (outdated, resolved)
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSinkMetrics.java (outdated, resolved)
for (PartitionState<K, V> p : partitionStates) {
  long pBacklog = p.approxBacklogInBytes();
  if (pBacklog == UnboundedReader.BACKLOG_UNKNOWN) {
    return UnboundedReader.BACKLOG_UNKNOWN;
  }
  backlogBytes += pBacklog;
}

return backlogBytes;
This method should remain as it is, right? getSplitBacklogBytes may affect autoscaling behavior according to the docs, and this change would make this override return either 0 or UnboundedReader.BACKLOG_UNKNOWN instead of the split's approximate backlog bytes across assigned partitions (or UnboundedReader.BACKLOG_UNKNOWN).
Yes, I agree; this was deleted accidentally. Reverted.
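The restored aggregation in the loop above can be sketched in isolation. This is a self-contained approximation, not the KafkaIO reader itself: per-partition backlogs are passed in as a plain list instead of PartitionState objects, and the BACKLOG_UNKNOWN constant here is assumed to mirror the value of UnboundedReader.BACKLOG_UNKNOWN (-1). The key behavior is that a single unknown partition makes the whole split's backlog unknown instead of silently undercounting it.

```java
import java.util.List;

// Hypothetical sketch of split-level backlog aggregation across partitions.
final class SplitBacklog {
  // Assumed to mirror UnboundedSource.UnboundedReader.BACKLOG_UNKNOWN.
  static final long BACKLOG_UNKNOWN = -1L;

  // Sums per-partition backlog bytes; any unknown partition makes the total unknown.
  static long splitBacklogBytes(List<Long> partitionBacklogs) {
    long backlogBytes = 0;
    for (long pBacklog : partitionBacklogs) {
      if (pBacklog == BACKLOG_UNKNOWN) {
        return BACKLOG_UNKNOWN;
      }
      backlogBytes += pBacklog;
    }
    return backlogBytes;
  }

  public static void main(String[] args) {
    System.out.println(splitBacklogBytes(List.of(100L, 250L))); // 350
    System.out.println(splitBacklogBytes(List.of(100L, BACKLOG_UNKNOWN))); // -1
  }
}
```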
@@ -743,6 +747,16 @@ private void reportBacklog() {
backlogElementsOfSplit.set(splitBacklogMessages);
}

private void reportBacklogMetrics() {
Looks like this can be merged with reportBacklog (potentially renaming that method to reportBacklogMetrics or updateBacklogMetrics).
No, I explicitly moved it out to be separate, since reportBacklog() is called twice, and we only need to do this once (when we advance to the next record).
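The design choice in this reply can be illustrated with a small sketch that counts calls. Everything here is hypothetical: the class BacklogReportingSketch and the method names refreshAfterPoll and advanceToNextRecord stand in for the two call sites of reportBacklog() mentioned in the thread, whose real locations in the reader are not shown in the diff. Keeping reportBacklogMetrics() separate means the per-partition metrics are published once per record advance, even though the backlog bookkeeping runs more often.

```java
// Hypothetical sketch: backlog bookkeeping on two paths, metrics publishing on one.
final class BacklogReportingSketch {
  int backlogUpdates = 0;  // how many times reportBacklog() ran
  int metricPublishes = 0; // how many times reportBacklogMetrics() ran

  // Updates split-level backlog state; reachable from more than one path.
  void reportBacklog() {
    backlogUpdates++;
  }

  // Publishes per-partition backlog metrics; kept separate so it runs once per advance.
  void reportBacklogMetrics() {
    metricPublishes++;
  }

  // Hypothetical path that refreshes the backlog without moving to a new record.
  void refreshAfterPoll() {
    reportBacklog();
  }

  // Hypothetical advance to the next record: refresh the backlog, then publish metrics once.
  void advanceToNextRecord() {
    reportBacklog();
    reportBacklogMetrics();
  }

  public static void main(String[] args) {
    BacklogReportingSketch r = new BacklogReportingSketch();
    r.refreshAfterPoll();
    r.advanceToNextRecord();
    System.out.println(r.backlogUpdates + " " + r.metricPublishes); // 2 1
  }
}
```

Merging the two methods would publish metrics on every reportBacklog() call, which is the duplication the author wanted to avoid.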
Branch updated from 042a247 to 9e11bb6.
Branch updated from 9e11bb6 to 8f9a278.
Flink test is failing, likely due to #33510.
Add per-worker gauge support and use it to report per-partition backlog for KafkaIO on the Dataflow legacy Java worker.
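The per-partition gauge idea in this description, together with the static backlogGauges map in the KafkaMetricsImpl diff, can be sketched as a lazily populated registry. This is an assumption-laden illustration: LastValueGauge is a hypothetical stand-in for Beam's Gauge so the example stays self-contained, and the "topic:partition" key format is invented for the example. A static map, as in the diff, is shared by every reader in the same worker JVM, and a gauge keeps only the most recently set value.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: one last-value gauge per topic-partition, shared per worker.
final class PartitionBacklogGauges {
  // Stand-in for a metrics gauge: reports the most recently set value.
  static final class LastValueGauge {
    private final AtomicLong value = new AtomicLong();

    void set(long v) {
      value.set(v);
    }

    long get() {
      return value.get();
    }
  }

  // Static, so all readers in this JVM share one gauge per "topic:partition" key.
  private static final Map<String, LastValueGauge> backlogGauges = new ConcurrentHashMap<>();

  static LastValueGauge gaugeFor(String topic, int partition) {
    return backlogGauges.computeIfAbsent(topic + ":" + partition, k -> new LastValueGauge());
  }

  static void recordBacklog(String topic, int partition, long backlog) {
    gaugeFor(topic, partition).set(backlog);
  }

  public static void main(String[] args) {
    recordBacklog("orders", 0, 120);
    recordBacklog("orders", 0, 80); // overwrites: only the latest backlog is kept
    recordBacklog("orders", 1, 5);
    System.out.println(gaugeFor("orders", 0).get() + " " + gaugeFor("orders", 1).get()); // 80 5
  }
}
```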