From bc21ee9d55d49b32e1258bd2e932f0a7856c0452 Mon Sep 17 00:00:00 2001 From: Chris Larsen Date: Tue, 28 Jan 2025 14:18:52 -0800 Subject: [PATCH] atlas cloudwatch: Cache the last timestamp of a high res poll. For some aggregations, we were repeating the same timestamp, resulting in higher than expected values combined in the registry. Now we'll only emit new values. --- .../atlas/cloudwatch/CloudWatchPoller.scala | 50 +++++++++++++------ 1 file changed, 36 insertions(+), 14 deletions(-) diff --git a/atlas-cloudwatch/src/main/scala/com/netflix/atlas/cloudwatch/CloudWatchPoller.scala b/atlas-cloudwatch/src/main/scala/com/netflix/atlas/cloudwatch/CloudWatchPoller.scala index 540f37e7..d549829b 100644 --- a/atlas-cloudwatch/src/main/scala/com/netflix/atlas/cloudwatch/CloudWatchPoller.scala +++ b/atlas-cloudwatch/src/main/scala/com/netflix/atlas/cloudwatch/CloudWatchPoller.scala @@ -138,6 +138,8 @@ class CloudWatchPoller( private[cloudwatch] val awsRequestLimit = config.getInt("atlas.cloudwatch.account.polling.requestLimit") + private[cloudwatch] val highResTimeCache = new ConcurrentHashMap[Long, Long]() + { // Scheduling Pollers Based on Offset offsetMap.foreach { @@ -420,17 +422,7 @@ class CloudWatchPoller( if (response.datapoints().isEmpty) { debugger.debugPolled(metric, IncomingMatch.DroppedEmpty, nowMillis, category) - } else { - debugger.debugPolled( - metric, - IncomingMatch.Accepted, - nowMillis, - category, - response.datapoints() - ) - } - - if (category.period < 60) { + } else if (category.period < 60) { response .datapoints() .asScala @@ -443,11 +435,41 @@ class CloudWatchPoller( dimensions.toList, dp ) - val metaData = MetricMetadata(category, definition, toAWSDimensions(firehoseMetric)) - registry.counter(polledPublishPath.withTag("path", "registry")).increment() - processor.sendToRegistry(metaData, firehoseMetric, nowMillis) + + val prev = highResTimeCache.getOrDefault(firehoseMetric.xxHash, 0) + if (dp.timestamp().toEpochMilli > prev) { + highResTimeCache.put(firehoseMetric.xxHash, dp.timestamp().toEpochMilli) + debugger.debugPolled( + metric, + IncomingMatch.Accepted, + nowMillis, + category, + response.datapoints() + ) + + val metaData = + MetricMetadata(category, definition, toAWSDimensions(firehoseMetric)) + registry.counter(polledPublishPath.withTag("path", "registry")).increment() + processor.sendToRegistry(metaData, firehoseMetric, nowMillis) + } else { + debugger.debugPolled( + metric, + IncomingMatch.DroppedEmpty, + nowMillis, + category, + response.datapoints() + ) + } } } else { + debugger.debugPolled( + metric, + IncomingMatch.Accepted, + nowMillis, + category, + response.datapoints() + ) + response .datapoints() .asScala