Skip to content

Commit

Permalink
atlas-cloudwatch: Normalize high-resolution scrape timestamps
Browse files Browse the repository at this point in the history
Fix backfill detection to compare data point timestamps against the normalized request end time
Fix some logging to avoid overcounting stale data points and backfills
  • Loading branch information
manolama committed Jan 30, 2025
1 parent 2171182 commit 21bc5ca
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,8 @@ class CloudWatchDebugger(
incomingMatch: IncomingMatch,
timestamp: Long,
category: MetricCategory,
dataPoints: java.util.List[Datapoint] = Collections.emptyList()
dataPoints: java.util.List[Datapoint] = Collections.emptyList(),
dpAdded: Option[Datapoint] = None
): Unit = {
if (config.isEmpty) return

Expand Down Expand Up @@ -273,6 +274,9 @@ class CloudWatchDebugger(
category.dimensions.foreach(d => json.writeString(d))
json.writeEndArray()
json.writeEndObject()
dpAdded.map { dp =>
json.writeNumberField("dpAdded", dp.timestamp().toEpochMilli)
}

if (!dataPoints.isEmpty) {
var step = 0L
Expand Down Expand Up @@ -471,7 +475,8 @@ object IncomingMatch extends Enumeration {

type IncomingMatch = Value

val DroppedNS, DroppedMetric, DroppedTag, DroppedFilter, DroppedOld, DroppedEmpty, Accepted =
val Stale, DroppedNS, DroppedMetric, DroppedTag, DroppedFilter, DroppedOld, DroppedEmpty,
Accepted =
Value
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -412,14 +412,20 @@ class CloudWatchPoller(
@Override def run(): Unit = {
try {
var start = now.minusSeconds(minCacheEntries * category.period)
if (category.period < 60) {
val request = if (category.period < 60) {
// normalize timestamps for, hopefully, an improved alignment.
var ts = now.toEpochMilli
ts = ts - (ts % (category.period * 1000))
registry
.counter(hrmRequest.withTags("period", category.period.toString))
.increment()
start = now.minusSeconds(2 * category.period)
start = Instant.ofEpochMilli(ts - (2 * category.period * 1000))
MetricMetadata(category, definition, metric.dimensions.asScala.toList)
.toGetRequest(start, Instant.ofEpochMilli(ts))
} else {
MetricMetadata(category, definition, metric.dimensions.asScala.toList)
.toGetRequest(start, now)
}
val request = MetricMetadata(category, definition, metric.dimensions.asScala.toList)
.toGetRequest(start, now)

val response = client.getMetricStatistics(request)
val dimensions = request.dimensions().asScala.toList.toBuffer
Expand All @@ -430,10 +436,19 @@ class CloudWatchPoller(
debugger.debugPolled(metric, IncomingMatch.DroppedEmpty, nowMillis, category)
} else if (category.period < 60) {
// high res path where we publish to the registry for LWC to take it
response
val dpList = response
.datapoints()
.asScala
.foreach { dp =>
if (dpList.isEmpty) {
debugger.debugPolled(
metric,
IncomingMatch.DroppedEmpty,
request.endTime().toEpochMilli,
category
)
} else {
var foundValidData = false
dpList.foreach { dp =>
val firehoseMetric = FirehoseMetric(
"",
metric.namespace(),
Expand All @@ -448,20 +463,21 @@ class CloudWatchPoller(
debugger.debugPolled(
metric,
IncomingMatch.Accepted,
nowMillis,
request.endTime().toEpochMilli,
category,
response.datapoints()
response.datapoints(),
Some(dp)
)

val metaData =
MetricMetadata(category, definition, toAWSDimensions(firehoseMetric))
registry.counter(polledPublishPath.withTag("path", "registry")).increment()
processor.sendToRegistry(metaData, firehoseMetric, nowMillis)
foundValidData = true

// lag tracking
val now = System.currentTimeMillis()
val ts = dp.timestamp().toEpochMilli
if (now - ts > category.period * 1000) {
if (request.endTime().toEpochMilli - ts > category.period * 1000) {
registry
.counter(
hrmBackfill.withTags(
Expand All @@ -482,33 +498,36 @@ class CloudWatchPoller(
definition.name
)
)
.record(now - ts)
} else if (dp.timestamp().toEpochMilli <= prev) {
registry
.counter(
hrmStale.withTags(
"aws.namespace",
category.namespace,
"aws.metric",
definition.name
)
.record(System.currentTimeMillis() - ts)
}
}

if (!foundValidData) {
registry
.counter(
hrmStale.withTags(
"aws.namespace",
category.namespace,
"aws.metric",
definition.name
)
.increment()
} else {
debugger.debugPolled(
metric,
IncomingMatch.DroppedEmpty,
nowMillis,
category,
response.datapoints()
)
}
.increment()
debugger.debugPolled(
metric,
IncomingMatch.Stale,
request.endTime().toEpochMilli,
category,
response.datapoints()
)
}
}

} else {
debugger.debugPolled(
metric,
IncomingMatch.Accepted,
nowMillis,
request.endTime().toEpochMilli,
category,
response.datapoints()
)
Expand Down

0 comments on commit 21bc5ca

Please sign in to comment.