Skip to content

Commit

Permalink
add api endpoints for clusters (#601)
Browse files Browse the repository at this point in the history
Provide a few additional views of the data, to assist with exploration and
to offer an alternate means of parsing.

Fix warnings for methods that can be marked private.
  • Loading branch information
copperlight authored Nov 2, 2024
1 parent a1b999f commit 4fd634e
Show file tree
Hide file tree
Showing 8 changed files with 238 additions and 50 deletions.
18 changes: 14 additions & 4 deletions atlas-slotting/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ point at which slots are calculated. The instance is sized appropriately for the
<td width="30%">Healthcheck
<td><code>GET /healthcheck</code>
<tr>
<td width="30%">List of Available AutoScalingGroups
<td width="30%">List of AutoScalingGroups
<td><code>GET /api/v1/autoScalingGroups</code>
<tr>
<td width="30%">All AutoScalingGroup Details
Expand All @@ -93,10 +93,16 @@ point at which slots are calculated. The instance is sized appropriately for the
<td width="30%">Single AutoScalingGroup Details
<td><code>GET /api/v1/autoScalingGroups/:asgName</code>
<tr>
<td width="30%">List of AutoScalingGroups Matching a Cluster Name
<td width="30%">List of Clusters
<td><code>GET /api/v1/clusters</code>
<tr>
<td width="30%">Map of Clusters to Lists of AutoScalingGroup Details
<td><code>GET /api/v1/clusters?verbose=true</code>
<tr>
<td width="30%">List of AutoScalingGroups Matching a Cluster
<td><code>GET /api/v1/clusters/:clusterName</code>
<tr>
<td width="30%">All AutoScalingGroup Details Matching a Cluster Name
<td width="30%">All AutoScalingGroup Details Matching a Cluster
<td><code>GET /api/v1/clusters/:clusterName?verbose=true</code>
</table>

Expand Down Expand Up @@ -168,7 +174,11 @@ export NETFLIX_STACK="local"
```

```
sbt "project atlas-slotting" clean compile test
# verbose form, from project root
sbt "project atlas-slotting" test
# short form, from project root
sbt atlas-slotting/test
```

Using `sbt` to `run` the project no longer works for version > 1.5.6, due to [sbt/issues/6767](https://github.com/sbt/sbt/issues/6767).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ trait DynamoOps extends StrictLogging {
}
}

def syncCapacity(
private def syncCapacity(
ddbClient: DynamoDbClient,
tableName: String,
desiredRead: Long,
Expand Down Expand Up @@ -284,7 +284,7 @@ trait DynamoOps extends StrictLogging {
table.tableStatusAsString()
}

def createTable(
private def createTable(
ddbClient: DynamoDbClient,
tableName: String,
desiredRead: Long,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,9 @@ trait Grouping extends StrictLogging {
* @return
* A list of case classes with selected fields representing the Instances.
*/
def mkAsgInstanceDetailsList(instances: java.util.List[AsgInstance]): List[AsgInstanceDetails] = {
private def mkAsgInstanceDetailsList(
instances: java.util.List[AsgInstance]
): List[AsgInstanceDetails] = {
instances.asScala.toList
.map { i =>
AsgInstanceDetails(
Expand Down Expand Up @@ -286,7 +288,7 @@ trait Grouping extends StrictLogging {
* @return
* A map of instanceIds to slot numbers.
*/
def mkSlotMap(asgDetails: SlottedAsgDetails): Map[String, Int] = {
private def mkSlotMap(asgDetails: SlottedAsgDetails): Map[String, Int] = {
asgDetails.instances
.map(i => i.instanceId -> i.slot)
.toMap
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ import org.apache.pekko.http.scaladsl.server.Route
import org.apache.pekko.http.scaladsl.server.RouteResult
import org.apache.pekko.http.scaladsl.server.directives.CachingDirectives.*

import scala.util.matching.Regex

class SlottingApi(system: ActorSystem, slottingCache: SlottingCache)
extends WebApi
with StrictLogging {
Expand Down Expand Up @@ -69,7 +67,6 @@ class SlottingApi(system: ActorSystem, slottingCache: SlottingCache)
extractRequest { request =>
val compress = useGzip(request)

// standard endpoints
pathPrefix("api" / "v1") {
endpointPath("autoScalingGroups") {
get {
Expand All @@ -87,13 +84,24 @@ class SlottingApi(system: ActorSystem, slottingCache: SlottingCache)
complete(singleAsg(compress, slottingCache, asgName))
}
} ~
endpointPath("clusters") {
get {
parameters("verbose".as[Boolean].?) { verbose =>
if (verbose.contains(true)) {
complete(verboseClusterList(compress, slottingCache))
} else {
complete(indexClusterList(compress, slottingCache))
}
}
}
} ~
endpointPath("clusters", Remaining) { clusterName =>
get {
parameters("verbose".as[Boolean].?) { verbose =>
if (verbose.contains(true)) {
complete(verboseClusterList(compress, slottingCache, clusterName))
complete(verboseSingleClusterList(compress, slottingCache, clusterName))
} else {
complete(indexClusterList(compress, slottingCache, clusterName))
complete(indexSingleClusterList(compress, slottingCache, clusterName))
}
}
}
Expand All @@ -110,18 +118,14 @@ class SlottingApi(system: ActorSystem, slottingCache: SlottingCache)

object SlottingApi {

val autoScalingGroupsExpand: Regex = "autoScalingGroups(?:;_expand.*|;_pp;_expand.*)".r

val stripEddaArgs: Regex = "(?:;_.*|:\\(.*)".r

private def useGzip(request: HttpRequest): Boolean = {
request.headers.exists {
case enc: `Accept-Encoding` => enc.encodings.exists(_.matches(HttpEncodings.gzip))
case _ => false
}
}

def mkResponse(compress: Boolean, statusCode: StatusCode, data: Any): HttpResponse = {
private def mkResponse(compress: Boolean, statusCode: StatusCode, data: Any): HttpResponse = {
// We compress locally rather than relying on the encodeResponse directive to ensure the
// cache will have a strict entity that can be reused.
if (compress) {
Expand All @@ -138,15 +142,19 @@ object SlottingApi {
}
}

def indexAsgList(compress: Boolean, slottingCache: SlottingCache): HttpResponse = {
private def indexAsgList(compress: Boolean, slottingCache: SlottingCache): HttpResponse = {
mkResponse(compress, StatusCodes.OK, slottingCache.asgs.keySet)
}

def verboseAsgList(compress: Boolean, slottingCache: SlottingCache): HttpResponse = {
private def verboseAsgList(compress: Boolean, slottingCache: SlottingCache): HttpResponse = {
mkResponse(compress, StatusCodes.OK, slottingCache.asgs.values.toList)
}

def singleAsg(compress: Boolean, slottingCache: SlottingCache, asgName: String): HttpResponse = {
private def singleAsg(
compress: Boolean,
slottingCache: SlottingCache,
asgName: String
): HttpResponse = {
slottingCache.asgs.get(asgName) match {
case Some(slottedAsgDetails) =>
mkResponse(compress, StatusCodes.OK, slottedAsgDetails)
Expand All @@ -155,7 +163,31 @@ object SlottingApi {
}
}

def indexClusterList(
private def indexClusterList(
compress: Boolean,
slottingCache: SlottingCache
): HttpResponse = {
mkResponse(
compress,
StatusCodes.OK,
slottingCache.asgs.keySet.map { asgName =>
ServerGroup.parse(asgName).cluster()
}
)
}

private def verboseClusterList(
compress: Boolean,
slottingCache: SlottingCache
): HttpResponse = {
mkResponse(
compress,
StatusCodes.OK,
slottingCache.asgs.values.toList.groupBy(_.cluster)
)
}

private def indexSingleClusterList(
compress: Boolean,
slottingCache: SlottingCache,
clusterName: String
Expand All @@ -169,7 +201,7 @@ object SlottingApi {
)
}

def verboseClusterList(
private def verboseSingleClusterList(
compress: Boolean,
slottingCache: SlottingCache,
clusterName: String
Expand All @@ -183,7 +215,7 @@ object SlottingApi {
)
}

def serviceDescription(request: HttpRequest): HttpResponse = {
private def serviceDescription(request: HttpRequest): HttpResponse = {
val scheme = request.uri.scheme
val host = request.headers.filter(_.name == "Host").map(_.value).head

Expand All @@ -197,6 +229,8 @@ object SlottingApi {
s"$scheme://$host/api/v1/autoScalingGroups",
s"$scheme://$host/api/v1/autoScalingGroups?verbose=true",
s"$scheme://$host/api/v1/autoScalingGroups/:name",
s"$scheme://$host/api/v1/clusters",
s"$scheme://$host/api/v1/clusters?verbose=true",
s"$scheme://$host/api/v1/clusters/:name",
s"$scheme://$host/api/v1/clusters/:name?verbose=true"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ package com.netflix.atlas.slotting

import scala.collection.immutable.SortedMap

class SlottingCache() {
class SlottingCache {

@volatile
var asgs: SortedMap[String, SlottedAsgDetails] = SortedMap.empty[String, SlottedAsgDetails]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class SlottingService(
with DynamoOps
with StrictLogging {

type Item = java.util.Map[String, AttributeValue]
private type Item = java.util.Map[String, AttributeValue]

private val clock = registry.clock()

Expand Down Expand Up @@ -78,7 +78,7 @@ class SlottingService(
.withId(registry.createId("last.update", "id", "asgs"))
.monitorValue(new AtomicLong(clock.wallTime()), Functions.AGE)

def crawlAsgsTask(): Unit = {
private def crawlAsgsTask(): Unit = {
if (!asgsAvailable) {
val start = registry.clock().monotonicTime()
var elapsed = 0L
Expand Down Expand Up @@ -109,7 +109,7 @@ class SlottingService(
* on the minute boundary when the task runs.
*
*/
def crawlAutoScalingGroups(
private def crawlAutoScalingGroups(
pageSize: Int,
includedApps: Set[String]
): Map[String, AsgDetails] = {
Expand Down Expand Up @@ -148,7 +148,7 @@ class SlottingService(
.withId(registry.createId("last.update", "id", "instances"))
.monitorValue(new AtomicLong(clock.wallTime()), Functions.AGE)

def crawlInstancesTask(): Unit = {
private def crawlInstancesTask(): Unit = {
if (!instanceInfoAvailable) {
val start = registry.clock().monotonicTime()
var elapsed = 0L
Expand Down Expand Up @@ -181,7 +181,7 @@ class SlottingService(
* on the minute boundary when the task runs.
*
*/
def crawlInstances(pageSize: Int): Map[String, Ec2InstanceDetails] = {
private def crawlInstances(pageSize: Int): Map[String, Ec2InstanceDetails] = {
val request = DescribeInstancesRequest
.builder()
.maxResults(pageSize)
Expand Down Expand Up @@ -235,7 +235,7 @@ class SlottingService(

private val cutoffInterval = config.getDuration("slotting.cutoff-interval")

def janitorTask(): Unit = {
private def janitorTask(): Unit = {
var count = 0
val request = oldItemsScanRequest(tableName, cutoffInterval)
ddbClient
Expand Down Expand Up @@ -281,11 +281,11 @@ class SlottingService(

override def stopImpl(): Unit = {}

def allDataAvailable: Boolean = {
private def allDataAvailable: Boolean = {
asgsAvailable && instanceInfoAvailable
}

def fmtTime(elapsed: Long): String = {
private def fmtTime(elapsed: Long): String = {
f"${elapsed / 1000000000d}%.2f seconds"
}

Expand All @@ -301,7 +301,7 @@ class SlottingService(
* set of remaining ASGs is updated while updating slots, this operation will be performed after crawling
* ASGs if the instance data is available.
*/
def updateSlots(): Unit = {
private def updateSlots(): Unit = {
// Instance data is updated in another thread. Capture it to ensure there is a consistent copy used for
// this update operation.
val instanceData = instanceInfo
Expand Down Expand Up @@ -333,7 +333,7 @@ class SlottingService(
private val slotsChangedId = registry.createId("slots.changed")
private val slotsErrorsId = registry.createId("slots.errors")

def updateItem(
private def updateItem(
name: String,
item: Item,
newAsgDetails: AsgDetails,
Expand Down Expand Up @@ -369,7 +369,7 @@ class SlottingService(
remainingAsgs = remainingAsgs - name
}

def deactivateItem(name: String): Unit = {
private def deactivateItem(name: String): Unit = {
try {
logger.info(s"deactivate asg $name")
ddbClient.updateItem(deactivateAsgItemRequest(tableName, name))
Expand All @@ -380,7 +380,7 @@ class SlottingService(
}
}

def addItem(name: String, newAsgDetails: AsgDetails): Unit = {
private def addItem(name: String, newAsgDetails: AsgDetails): Unit = {
val newData = mkNewDataAssignSlots(newAsgDetails, instanceInfo)
try {
logger.info(s"assign slots for asg $name")
Expand Down
40 changes: 40 additions & 0 deletions atlas-slotting/src/test/resources/atlas_app-main-all-v002.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
{
"name": "atlas_app-main-all-v002",
"cluster": "atlas_app-main-all",
"createdTime": 1552023610994,
"desiredCapacity": 3,
"maxSize": 6,
"minSize": 0,
"instances": [
{
"availabilityZone": "us-west-2b",
"imageId": "ami-001",
"instanceId": "i-101",
"instanceType": "r4.large",
"launchTime": 1552023619000,
"lifecycleState": "InService",
"privateIpAddress": "192.168.2.1",
"slot": 0
},
{
"availabilityZone": "us-west-2a",
"imageId": "ami-001",
"instanceId": "i-102",
"instanceType": "r4.large",
"launchTime": 1552023619000,
"lifecycleState": "InService",
"privateIpAddress": "192.168.2.2",
"slot": 1
},
{
"availabilityZone": "us-west-2b",
"imageId": "ami-001",
"instanceId": "i-103",
"instanceType": "r4.large",
"launchTime": 1552023619000,
"lifecycleState": "InService",
"privateIpAddress": "192.168.2.3",
"slot": 2
}
]
}
Loading

0 comments on commit 4fd634e

Please sign in to comment.