diff --git a/atlas-slotting/README.md b/atlas-slotting/README.md index f7d66a17..296f15ae 100644 --- a/atlas-slotting/README.md +++ b/atlas-slotting/README.md @@ -84,7 +84,7 @@ point at which slots are calculated. The instance is sized appropriately for the Healthcheck GET /healthcheck - List of Available AutoScalingGroups + List of AutoScalingGroups GET /api/v1/autoScalingGroups All AutoScalingGroup Details @@ -93,10 +93,16 @@ point at which slots are calculated. The instance is sized appropriately for the Single AutoScalingGroup Details GET /api/v1/autoScalingGroups/:asgName - List of AutoScalingGroups Matching a Cluster Name + List of Clusters + GET /api/v1/clusters + + Map of Clusters to Lists of AutoScalingGroup Details + GET /api/v1/clusters?verbose=true + + List of AutoScalingGroups Matching a Cluster GET /api/v1/clusters/:clusterName - All AutoScalingGroup Details Matching a Cluster Name + All AutoScalingGroup Details Matching a Cluster GET /api/v1/clusters/:clusterName?verbose=true @@ -168,7 +174,11 @@ export NETFLIX_STACK="local" ``` ``` -sbt "project atlas-slotting" clean compile test +# verbose form, from project root +sbt "project atlas-slotting" test + +# short form, from project root +sbt atlas-slotting/test ``` Using `sbt` to `run` the project no longer works for version > 1.5.6, due to [sbt/issues/6767](https://github.com/sbt/sbt/issues/6767). diff --git a/atlas-slotting/src/main/scala/com/netflix/atlas/slotting/DynamoOps.scala b/atlas-slotting/src/main/scala/com/netflix/atlas/slotting/DynamoOps.scala index 071c9e9a..1f612288 100644 --- a/atlas-slotting/src/main/scala/com/netflix/atlas/slotting/DynamoOps.scala +++ b/atlas-slotting/src/main/scala/com/netflix/atlas/slotting/DynamoOps.scala @@ -245,7 +245,7 @@ trait DynamoOps extends StrictLogging { } } - def syncCapacity( + private def syncCapacity( ddbClient: DynamoDbClient, tableName: String, desiredRead: Long, @@ -284,7 +284,7 @@ trait DynamoOps extends StrictLogging { table.tableStatusAsString() } - def createTable( + private def createTable( ddbClient: DynamoDbClient, tableName: String, desiredRead: Long, diff --git a/atlas-slotting/src/main/scala/com/netflix/atlas/slotting/Grouping.scala b/atlas-slotting/src/main/scala/com/netflix/atlas/slotting/Grouping.scala index 8695089c..2e1d4a1d 100644 --- a/atlas-slotting/src/main/scala/com/netflix/atlas/slotting/Grouping.scala +++ b/atlas-slotting/src/main/scala/com/netflix/atlas/slotting/Grouping.scala @@ -139,7 +139,9 @@ trait Grouping extends StrictLogging { * @return * A list of case classes with selected fields representing the Instances. */ - def mkAsgInstanceDetailsList(instances: java.util.List[AsgInstance]): List[AsgInstanceDetails] = { + private def mkAsgInstanceDetailsList( + instances: java.util.List[AsgInstance] + ): List[AsgInstanceDetails] = { instances.asScala.toList .map { i => AsgInstanceDetails( @@ -286,7 +288,7 @@ trait Grouping extends StrictLogging { * @return * A map of instanceIds to slot numbers. */ - def mkSlotMap(asgDetails: SlottedAsgDetails): Map[String, Int] = { + private def mkSlotMap(asgDetails: SlottedAsgDetails): Map[String, Int] = { asgDetails.instances .map(i => i.instanceId -> i.slot) .toMap diff --git a/atlas-slotting/src/main/scala/com/netflix/atlas/slotting/SlottingApi.scala b/atlas-slotting/src/main/scala/com/netflix/atlas/slotting/SlottingApi.scala index 675dee0f..aafc4f95 100644 --- a/atlas-slotting/src/main/scala/com/netflix/atlas/slotting/SlottingApi.scala +++ b/atlas-slotting/src/main/scala/com/netflix/atlas/slotting/SlottingApi.scala @@ -38,8 +38,6 @@ import org.apache.pekko.http.scaladsl.server.Route import org.apache.pekko.http.scaladsl.server.RouteResult import org.apache.pekko.http.scaladsl.server.directives.CachingDirectives.* -import scala.util.matching.Regex - class SlottingApi(system: ActorSystem, slottingCache: SlottingCache) extends WebApi with StrictLogging { @@ -69,7 +67,6 @@ class SlottingApi(system: ActorSystem, slottingCache: SlottingCache) extractRequest { request => val compress = useGzip(request) - // standard endpoints pathPrefix("api" / "v1") { endpointPath("autoScalingGroups") { get { @@ -87,13 +84,24 @@ class SlottingApi(system: ActorSystem, slottingCache: SlottingCache) complete(singleAsg(compress, slottingCache, asgName)) } } ~ + endpointPath("clusters") { + get { + parameters("verbose".as[Boolean].?) { verbose => + if (verbose.contains(true)) { + complete(verboseClusterList(compress, slottingCache)) + } else { + complete(indexClusterList(compress, slottingCache)) + } + } + } + } ~ endpointPath("clusters", Remaining) { clusterName => get { parameters("verbose".as[Boolean].?) { verbose => if (verbose.contains(true)) { - complete(verboseClusterList(compress, slottingCache, clusterName)) + complete(verboseSingleClusterList(compress, slottingCache, clusterName)) } else { - complete(indexClusterList(compress, slottingCache, clusterName)) + complete(indexSingleClusterList(compress, slottingCache, clusterName)) } } } @@ -110,10 +118,6 @@ class SlottingApi(system: ActorSystem, slottingCache: SlottingCache) object SlottingApi { - val autoScalingGroupsExpand: Regex = "autoScalingGroups(?:;_expand.*|;_pp;_expand.*)".r - - val stripEddaArgs: Regex = "(?:;_.*|:\\(.*)".r - private def useGzip(request: HttpRequest): Boolean = { request.headers.exists { case enc: `Accept-Encoding` => enc.encodings.exists(_.matches(HttpEncodings.gzip)) @@ -121,7 +125,7 @@ object SlottingApi { } } - def mkResponse(compress: Boolean, statusCode: StatusCode, data: Any): HttpResponse = { + private def mkResponse(compress: Boolean, statusCode: StatusCode, data: Any): HttpResponse = { // We compress locally rather than relying on the encodeResponse directive to ensure the // cache will have a strict entity that can be reused. if (compress) { @@ -138,15 +142,19 @@ object SlottingApi { } } - def indexAsgList(compress: Boolean, slottingCache: SlottingCache): HttpResponse = { + private def indexAsgList(compress: Boolean, slottingCache: SlottingCache): HttpResponse = { mkResponse(compress, StatusCodes.OK, slottingCache.asgs.keySet) } - def verboseAsgList(compress: Boolean, slottingCache: SlottingCache): HttpResponse = { + private def verboseAsgList(compress: Boolean, slottingCache: SlottingCache): HttpResponse = { mkResponse(compress, StatusCodes.OK, slottingCache.asgs.values.toList) } - def singleAsg(compress: Boolean, slottingCache: SlottingCache, asgName: String): HttpResponse = { + private def singleAsg( + compress: Boolean, + slottingCache: SlottingCache, + asgName: String + ): HttpResponse = { slottingCache.asgs.get(asgName) match { case Some(slottedAsgDetails) => mkResponse(compress, StatusCodes.OK, slottedAsgDetails) @@ -155,7 +163,31 @@ object SlottingApi { } } - def indexClusterList( + private def indexClusterList( + compress: Boolean, + slottingCache: SlottingCache + ): HttpResponse = { + mkResponse( + compress, + StatusCodes.OK, + slottingCache.asgs.keySet.map { asgName => + ServerGroup.parse(asgName).cluster() + } + ) + } + + private def verboseClusterList( + compress: Boolean, + slottingCache: SlottingCache + ): HttpResponse = { + mkResponse( + compress, + StatusCodes.OK, + slottingCache.asgs.values.toList.groupBy(_.cluster) + ) + } + + private def indexSingleClusterList( compress: Boolean, slottingCache: SlottingCache, clusterName: String @@ -169,7 +201,7 @@ object SlottingApi { ) } - def verboseClusterList( + private def verboseSingleClusterList( compress: Boolean, slottingCache: SlottingCache, clusterName: String @@ -183,7 +215,7 @@ object SlottingApi { ) } - def serviceDescription(request: HttpRequest): HttpResponse = { + private def serviceDescription(request: HttpRequest): HttpResponse = { val scheme = request.uri.scheme val host = request.headers.filter(_.name == "Host").map(_.value).head @@ -197,6 +229,8 @@ object SlottingApi { s"$scheme://$host/api/v1/autoScalingGroups", s"$scheme://$host/api/v1/autoScalingGroups?verbose=true", s"$scheme://$host/api/v1/autoScalingGroups/:name", + s"$scheme://$host/api/v1/clusters", + s"$scheme://$host/api/v1/clusters?verbose=true", s"$scheme://$host/api/v1/clusters/:name", s"$scheme://$host/api/v1/clusters/:name?verbose=true" ) diff --git a/atlas-slotting/src/main/scala/com/netflix/atlas/slotting/SlottingCache.scala b/atlas-slotting/src/main/scala/com/netflix/atlas/slotting/SlottingCache.scala index cc108997..6b99e762 100644 --- a/atlas-slotting/src/main/scala/com/netflix/atlas/slotting/SlottingCache.scala +++ b/atlas-slotting/src/main/scala/com/netflix/atlas/slotting/SlottingCache.scala @@ -17,7 +17,7 @@ package com.netflix.atlas.slotting import scala.collection.immutable.SortedMap -class SlottingCache() { +class SlottingCache { @volatile var asgs: SortedMap[String, SlottedAsgDetails] = SortedMap.empty[String, SlottedAsgDetails] diff --git a/atlas-slotting/src/main/scala/com/netflix/atlas/slotting/SlottingService.scala b/atlas-slotting/src/main/scala/com/netflix/atlas/slotting/SlottingService.scala index cfe83554..12f5e2ed 100644 --- a/atlas-slotting/src/main/scala/com/netflix/atlas/slotting/SlottingService.scala +++ b/atlas-slotting/src/main/scala/com/netflix/atlas/slotting/SlottingService.scala @@ -49,7 +49,7 @@ class SlottingService( with DynamoOps with StrictLogging { - type Item = java.util.Map[String, AttributeValue] + private type Item = java.util.Map[String, AttributeValue] private val clock = registry.clock() @@ -78,7 +78,7 @@ class SlottingService( .withId(registry.createId("last.update", "id", "asgs")) .monitorValue(new AtomicLong(clock.wallTime()), Functions.AGE) - def crawlAsgsTask(): Unit = { + private def crawlAsgsTask(): Unit = { if (!asgsAvailable) { val start = registry.clock().monotonicTime() var elapsed = 0L @@ -109,7 +109,7 @@ class SlottingService( * on the minute boundary when the task runs. * */ - def crawlAutoScalingGroups( + private def crawlAutoScalingGroups( pageSize: Int, includedApps: Set[String] ): Map[String, AsgDetails] = { @@ -148,7 +148,7 @@ class SlottingService( .withId(registry.createId("last.update", "id", "instances")) .monitorValue(new AtomicLong(clock.wallTime()), Functions.AGE) - def crawlInstancesTask(): Unit = { + private def crawlInstancesTask(): Unit = { if (!instanceInfoAvailable) { val start = registry.clock().monotonicTime() var elapsed = 0L @@ -181,7 +181,7 @@ class SlottingService( * on the minute boundary when the task runs. * */ - def crawlInstances(pageSize: Int): Map[String, Ec2InstanceDetails] = { + private def crawlInstances(pageSize: Int): Map[String, Ec2InstanceDetails] = { val request = DescribeInstancesRequest .builder() .maxResults(pageSize) @@ -235,7 +235,7 @@ class SlottingService( private val cutoffInterval = config.getDuration("slotting.cutoff-interval") - def janitorTask(): Unit = { + private def janitorTask(): Unit = { var count = 0 val request = oldItemsScanRequest(tableName, cutoffInterval) ddbClient @@ -281,11 +281,11 @@ class SlottingService( override def stopImpl(): Unit = {} - def allDataAvailable: Boolean = { + private def allDataAvailable: Boolean = { asgsAvailable && instanceInfoAvailable } - def fmtTime(elapsed: Long): String = { + private def fmtTime(elapsed: Long): String = { f"${elapsed / 1000000000d}%.2f seconds" } @@ -301,7 +301,7 @@ class SlottingService( * set of remaining ASGs is updated while updating slots, this operation will be performed after crawling * ASGs if the instance data is available. */ - def updateSlots(): Unit = { + private def updateSlots(): Unit = { // Instance data is updated in another thread. Capture it to ensure there is a consistent copy used for // this update operation. val instanceData = instanceInfo @@ -333,7 +333,7 @@ class SlottingService( private val slotsChangedId = registry.createId("slots.changed") private val slotsErrorsId = registry.createId("slots.errors") - def updateItem( + private def updateItem( name: String, item: Item, newAsgDetails: AsgDetails, @@ -369,7 +369,7 @@ class SlottingService( remainingAsgs = remainingAsgs - name } - def deactivateItem(name: String): Unit = { + private def deactivateItem(name: String): Unit = { try { logger.info(s"deactivate asg $name") ddbClient.updateItem(deactivateAsgItemRequest(tableName, name)) @@ -380,7 +380,7 @@ class SlottingService( } } - def addItem(name: String, newAsgDetails: AsgDetails): Unit = { + private def addItem(name: String, newAsgDetails: AsgDetails): Unit = { val newData = mkNewDataAssignSlots(newAsgDetails, instanceInfo) try { logger.info(s"assign slots for asg $name") diff --git a/atlas-slotting/src/test/resources/atlas_app-main-all-v002.json b/atlas-slotting/src/test/resources/atlas_app-main-all-v002.json new file mode 100644 index 00000000..f27bfe57 --- /dev/null +++ b/atlas-slotting/src/test/resources/atlas_app-main-all-v002.json @@ -0,0 +1,40 @@ +{ + "name": "atlas_app-main-all-v002", + "cluster": "atlas_app-main-all", + "createdTime": 1552023610994, + "desiredCapacity": 3, + "maxSize": 6, + "minSize": 0, + "instances": [ + { + "availabilityZone": "us-west-2b", + "imageId": "ami-001", + "instanceId": "i-101", + "instanceType": "r4.large", + "launchTime": 1552023619000, + "lifecycleState": "InService", + "privateIpAddress": "192.168.2.1", + "slot": 0 + }, + { + "availabilityZone": "us-west-2a", + "imageId": "ami-001", + "instanceId": "i-102", + "instanceType": "r4.large", + "launchTime": 1552023619000, + "lifecycleState": "InService", + "privateIpAddress": "192.168.2.2", + "slot": 1 + }, + { + "availabilityZone": "us-west-2b", + "imageId": "ami-001", + "instanceId": "i-103", + "instanceType": "r4.large", + "launchTime": 1552023619000, + "lifecycleState": "InService", + "privateIpAddress": "192.168.2.3", + "slot": 2 + } + ] +} \ No newline at end of file diff --git a/atlas-slotting/src/test/scala/com/netflix/atlas/slotting/SlottingApiSuite.scala b/atlas-slotting/src/test/scala/com/netflix/atlas/slotting/SlottingApiSuite.scala index a1d394a4..df938392 100644 --- a/atlas-slotting/src/test/scala/com/netflix/atlas/slotting/SlottingApiSuite.scala +++ b/atlas-slotting/src/test/scala/com/netflix/atlas/slotting/SlottingApiSuite.scala @@ -58,11 +58,11 @@ class SlottingApiSuite extends MUnitRouteSuite { val endpoints = description("endpoints").asInstanceOf[List[String]] assertResponse(response, StatusCodes.OK) assertEquals(description("description"), "Atlas Slotting Service") - assertEquals(endpoints.size, 6) + assertEquals(endpoints.size, 8) } } - test("bad url parameters (standard api)") { + test("bad url parameters - known parameter with unexpected value is a bad request") { val malformed = "MalformedQueryParamRejection(verbose,'foo' is not a valid Boolean value,None)" Get("/api/v1/autoScalingGroups?verbose=foo") ~> routes ~> check { @@ -70,6 +70,9 @@ class SlottingApiSuite extends MUnitRouteSuite { val res = Json.decode[Message](responseAs[String]) assertEquals(res, Message(malformed)) } + } + + test("bad url parameters - unknown parameter is ignored") { Get("/api/v1/autoScalingGroups?foo=bar") ~> routes ~> check { assertResponse(response, StatusCodes.OK) val res = Json.decode[List[String]](responseAs[String]) @@ -77,32 +80,55 @@ class SlottingApiSuite extends MUnitRouteSuite { } } - test("no cache data (standard api)") { + test("no cache data - all autoScalingGroups") { Get("/api/v1/autoScalingGroups") ~> routes ~> check { assertResponse(response, StatusCodes.OK) val res = Json.decode[List[String]](responseAs[String]) assertEquals(res, List.empty[String]) } + } + + test("no cache data - all autoScalingGroups, verbose") { Get("/api/v1/autoScalingGroups?verbose=true") ~> routes ~> check { assertResponse(response, StatusCodes.OK) val res = Json.decode[List[String]](responseAs[String]) assertEquals(res, List.empty[String]) } + } + + test("no cache data - one autoScalingGroup") { Get("/api/v1/autoScalingGroups/atlas_app-main-all-v001") ~> routes ~> check { assertResponse(response, StatusCodes.NotFound) val res = Json.decode[Message](responseAs[String]) assertEquals(res, Message("Not Found")) } + } + + test("no cache data - all clusters") { Get("/api/v1/clusters") ~> routes ~> check { - assertResponse(response, StatusCodes.NotFound) - val res = Json.decode[Message](responseAs[String]) - assertEquals(res, Message("path not found: /api/v1/clusters")) + assertResponse(response, StatusCodes.OK) + val res = Json.decode[List[String]](responseAs[String]) + assertEquals(res, List.empty[String]) + } + } + + test("no cache data - all clusters, verbose") { + Get("/api/v1/clusters?verbose=true") ~> routes ~> check { + assertResponse(response, StatusCodes.OK) + val res = Json.decode[Map[String, List[SlottedAsgDetails]]](responseAs[String]) + assertEquals(res, Map.empty[String, List[SlottedAsgDetails]]) } + } + + test("no cache data - one cluster") { Get("/api/v1/clusters/atlas_app-main-all") ~> routes ~> check { assertResponse(response, StatusCodes.OK) val res = Json.decode[List[String]](responseAs[String]) assertEquals(res, List.empty[String]) } + } + + test("no cache data - one cluster, verbose") { Get("/api/v1/clusters/atlas_app-main-all?verbose=true") ~> routes ~> check { assertResponse(response, StatusCodes.OK) val res = Json.decode[List[String]](responseAs[String]) @@ -113,23 +139,44 @@ class SlottingApiSuite extends MUnitRouteSuite { test("load cache") { slottingCache.asgs = SortedMap( "atlas_app-main-all-v001" -> loadSlottedAsgDetails("/atlas_app-main-all-v001.json"), + "atlas_app-main-all-v002" -> loadSlottedAsgDetails("/atlas_app-main-all-v002.json"), "atlas_app-main-none-v001" -> loadSlottedAsgDetails("/atlas_app-main-none-v001.json") ) - assertEquals(slottingCache.asgs.size, 2) + assertEquals(slottingCache.asgs.size, 3) } - test("cache data (standard api)") { + test("cache data - all autoScalingGroups") { Get("/api/v1/autoScalingGroups") ~> routes ~> check { assertResponse(response, StatusCodes.OK) val res = Json.decode[List[String]](responseAs[String]) - assertEquals(res, List("atlas_app-main-all-v001", "atlas_app-main-none-v001")) + assertEquals( + res, + List( + "atlas_app-main-all-v001", + "atlas_app-main-all-v002", + "atlas_app-main-none-v001" + ) + ) } + } + + test("cache data - all autoScalingGroups, verbose") { Get("/api/v1/autoScalingGroups?verbose=true") ~> routes ~> check { assertResponse(response, StatusCodes.OK) val res = Json.decode[List[SlottedAsgDetails]](responseAs[String]) - assertEquals(res.map(_.name), List("atlas_app-main-all-v001", "atlas_app-main-none-v001")) + assertEquals( + res.map(_.name), + List( + "atlas_app-main-all-v001", + "atlas_app-main-all-v002", + "atlas_app-main-none-v001" + ) + ) } + } + + test("cache data - one autoScalingGroup") { Get("/api/v1/autoScalingGroups/atlas_app-main-all-v001") ~> routes ~> check { assertResponse(response, StatusCodes.OK) val res = Json.decode[SlottedAsgDetails](responseAs[String]) @@ -151,25 +198,80 @@ class SlottingApiSuite extends MUnitRouteSuite { assertEquals(res.instances.head.instanceType, "r4.large") assertEquals(res.instances.head.lifecycleState, "InService") } - Get("/api/v1/clusters/atlas_app-main") ~> routes ~> check { + } + + test("cache data - all clusters") { + Get("/api/v1/clusters") ~> routes ~> check { + assertResponse(response, StatusCodes.OK) + val res = Json.decode[List[String]](responseAs[String]) + assertEquals( + res, + List( + "atlas_app-main-all", + "atlas_app-main-none" + ) + ) + } + } + + test("cache data - all clusters, verbose") { + Get("/api/v1/clusters?verbose=true") ~> routes ~> check { + assertResponse(response, StatusCodes.OK) + println(responseAs[String]) + val res = Json.decode[Map[String, List[SlottedAsgDetails]]](responseAs[String]) + assertEquals(res.size, 2) + assertEquals(res("atlas_app-main-all").size, 2) + assertEquals( + res("atlas_app-main-all").map(_.name), + List( + "atlas_app-main-all-v001", + "atlas_app-main-all-v002" + ) + ) + } + } + + test("cache data - one cluster, NOT exists") { + Get("/api/v1/clusters/atlas_app-foo") ~> routes ~> check { assertResponse(response, StatusCodes.OK) val res = Json.decode[List[String]](responseAs[String]) assertEquals(res, List.empty[String]) } + } + + test("cache data - one cluster, exists") { Get("/api/v1/clusters/atlas_app-main-all") ~> routes ~> check { assertResponse(response, StatusCodes.OK) val res = Json.decode[List[String]](responseAs[String]) - assertEquals(res, List("atlas_app-main-all-v001")) + assertEquals( + res, + List( + "atlas_app-main-all-v001", + "atlas_app-main-all-v002" + ) + ) } + } + + test("cache data - another cluster, exists") { Get("/api/v1/clusters/atlas_app-main-none") ~> routes ~> check { assertResponse(response, StatusCodes.OK) val res = Json.decode[List[String]](responseAs[String]) assertEquals(res, List("atlas_app-main-none-v001")) } + } + + test("cache data - one cluster, verbose") { Get("/api/v1/clusters/atlas_app-main-all?verbose=true") ~> routes ~> check { assertResponse(response, StatusCodes.OK) val res = Json.decode[List[SlottedAsgDetails]](responseAs[String]) - assertEquals(res.map(_.name), List("atlas_app-main-all-v001")) + assertEquals( + res.map(_.name), + List( + "atlas_app-main-all-v001", + "atlas_app-main-all-v002" + ) + ) } } }