From 21dead4e7bab12b8c053e21ab3ccc0942b9c1b12 Mon Sep 17 00:00:00 2001 From: brharrington Date: Fri, 15 Mar 2024 14:17:18 -0500 Subject: [PATCH] aggr: set dstype when loading tags (#542) This avoids some allocations to add it in later when the counter or max gauge is created. --- .../src/main/resources/application.conf | 4 --- .../atlas/aggregator/PayloadDecoder.scala | 29 ++++++++++--------- .../atlas/aggregator/UpdateApiSuite.scala | 13 ++++++--- 3 files changed, 24 insertions(+), 22 deletions(-) diff --git a/atlas-aggregator/src/main/resources/application.conf b/atlas-aggregator/src/main/resources/application.conf index 3ca3afd9..a4ee8a2f 100644 --- a/atlas-aggregator/src/main/resources/application.conf +++ b/atlas-aggregator/src/main/resources/application.conf @@ -29,10 +29,6 @@ atlas.aggregator { // Should the aggregation of gauges be delayed until the final eval step? delay-gauge-aggregation = true - // Determines whether or not to include the aggregator node as a tag on counters. - // If false it will use atlas.dstype=sum instead. - include-aggr-tag = false - allowed-characters = "-._A-Za-z0-9^~" validation { diff --git a/atlas-aggregator/src/main/scala/com/netflix/atlas/aggregator/PayloadDecoder.scala b/atlas-aggregator/src/main/scala/com/netflix/atlas/aggregator/PayloadDecoder.scala index 43bd5545..9936e9bb 100644 --- a/atlas-aggregator/src/main/scala/com/netflix/atlas/aggregator/PayloadDecoder.scala +++ b/atlas-aggregator/src/main/scala/com/netflix/atlas/aggregator/PayloadDecoder.scala @@ -98,17 +98,16 @@ class PayloadDecoder( numDatapoints += 1 val numTags = parser.getIntValue val result = loadTags(numTags, strings, nameIdx, parser) - val op = nextInt(parser) val value = nextDouble(parser) result match { case Left(vr) => validationResults += vr - case Right(id) => + case Right((id, op)) => op match { case ADD => // Add the aggr tag to avoid values getting deduped on the backend logger.debug(s"received updated, ADD $id $value") - aggregator.add(id.withTag(aggrTag), value) + aggregator.add(id, value) case MAX => logger.debug(s"received updated, MAX $id $value") aggregator.max(id, value) @@ -147,11 +146,11 @@ class PayloadDecoder( strings: Array[String], nameIdx: Int, parser: JsonParser - ): Either[ValidationResult, Id] = { + ): Either[ValidationResult, (Id, Int)] = { var result: String = TagRule.Pass var numUserTags = 0 var name: String = null - val tags = new Array[String](2 * n) + val tags = new Array[String](2 * n + 2) var pos = 0 var i = 0 while (i < n) { @@ -179,9 +178,18 @@ class PayloadDecoder( i += 1 } + val op = nextInt(parser) + val dsType = op match { + case ADD => "sum" + case MAX => "gauge" + } + tags(pos) = TagKey.dsType + tags(pos + 1) = dsType + pos += 2 + if (name == null) { // This should never happen if clients are working properly - val builder = new SmallHashMap.Builder[String, String](n) + val builder = new SmallHashMap.Builder[String, String](n + 1) var i = 0 while (i < pos) { builder.add(tags(i), tags(i + 1)) @@ -202,7 +210,7 @@ class PayloadDecoder( ) ) } else { - Right(rollupFunction(id)) + Right(rollupFunction(id) -> op) } } else { // Tag rule check failed @@ -249,13 +257,6 @@ object PayloadDecoder { rollupPolicies = idx } - private val aggrTag = { - if (config.getBoolean("include-aggr-tag")) - Tag.of("atlas.aggr", NetflixEnvironment.instanceId()) - else - Tag.of(TagKey.dsType, "sum") - } - private val maxUserTags = config.getInt("validation.max-user-tags") private val validator = { diff --git a/atlas-aggregator/src/test/scala/com/netflix/atlas/aggregator/UpdateApiSuite.scala b/atlas-aggregator/src/test/scala/com/netflix/atlas/aggregator/UpdateApiSuite.scala index da32912a..0b982df0 100644 --- a/atlas-aggregator/src/test/scala/com/netflix/atlas/aggregator/UpdateApiSuite.scala +++ b/atlas-aggregator/src/test/scala/com/netflix/atlas/aggregator/UpdateApiSuite.scala @@ -240,7 +240,10 @@ class UpdateApiSuite extends FunSuite { val tags = SmallHashMap("foo" -> "bar") val msg = validationTest(tags, StatusCodes.BadRequest) assertEquals(msg.errorCount, 1) - assertEquals(msg.message, List("missing key 'name' (tags={\"foo\":\"bar\"})")) + assertEquals( + msg.message, + List("missing key 'name' (tags={\"foo\":\"bar\",\"atlas.dstype\":\"sum\"})") + ) } test("validation: name too long") { @@ -250,7 +253,9 @@ class UpdateApiSuite extends FunSuite { assertEquals(msg.errorCount, 1) assertEquals( msg.message, - List(s"value too long: name = [$name] (300 > 255) (tags={\"name\":\"$name\"})") + List( + s"value too long: name = [$name] (300 > 255) (tags={\"name\":\"$name\",\"atlas.dstype\":\"sum\"})" + ) ) } @@ -278,7 +283,7 @@ class UpdateApiSuite extends FunSuite { assertEquals( msg.message, List( - "invalid key for reserved prefix 'nf.': nf.foo (tags={\"name\":\"test\",\"nf.foo\":\"bar\"})" + "invalid key for reserved prefix 'nf.': nf.foo (tags={\"name\":\"test\",\"atlas.dstype\":\"sum\",\"nf.foo\":\"bar\"})" ) ) } @@ -293,7 +298,7 @@ class UpdateApiSuite extends FunSuite { assertEquals( msg.message, List( - "invalid key for reserved prefix 'nf.': nf.foo (tags={\"name\":\"test\",\"nf.foo\":\"bar\"})" + "invalid key for reserved prefix 'nf.': nf.foo (tags={\"name\":\"test\",\"atlas.dstype\":\"sum\",\"nf.foo\":\"bar\"})" ) ) }