Skip to content

Commit

Permalink
lwc-cloudwatch: use async AWS client (#598)
Browse files Browse the repository at this point in the history
Switch to using async client for CloudWatch puts.
  • Loading branch information
brharrington authored Oct 29, 2024
1 parent f8be43f commit 1f8ab1d
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ import com.netflix.spectator.api.Registry
import com.netflix.spectator.api.patterns.PolledMeter
import com.typesafe.config.Config
import com.typesafe.scalalogging.StrictLogging
import software.amazon.awssdk.services.cloudwatch.CloudWatchClient
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient
import software.amazon.awssdk.services.cloudwatch.model.Dimension
import software.amazon.awssdk.services.cloudwatch.model.MetricDatum
import software.amazon.awssdk.services.cloudwatch.model.PutMetricDataRequest
Expand Down Expand Up @@ -118,12 +118,13 @@ class ForwardingService(
region: String,
account: String,
request: PutMetricDataRequest
): PutMetricDataResponse = {
): Future[PutMetricDataResponse] = {
import scala.jdk.FutureConverters.*
if (putEnabled) {
val cwClient = clientFactory.getInstance(region, classOf[CloudWatchClient], account)
cwClient.putMetricData(request)
val cwClient = clientFactory.getInstance(region, classOf[CloudWatchAsyncClient], account)
cwClient.putMetricData(request).asScala
} else {
PutMetricDataResponse.builder().build()
Future.successful(PutMetricDataResponse.builder().build())
}
}

Expand All @@ -144,7 +145,7 @@ class ForwardingService(
numDataSources.set(dss.sources().size())
dss
}
.via(Flow.fromProcessor(() => evaluator.createStreamsProcessor()))
.via(evaluator.createStreamsFlow)
.via(toMetricDatum(config, registry, configStats))
.via(sendToCloudWatch(lastSuccessfulPutTime, namespace, put))
.via(sendToAdmin(adminUri, client))
Expand Down Expand Up @@ -173,11 +174,6 @@ class ForwardingService(

object ForwardingService extends StrictLogging {

// Pool for CloudWatch requests
private val executor =
Executors.newThreadPerTaskExecutor(Thread.ofVirtual().name("aws-publisher").factory())
private val awsEC: ExecutionContext = ExecutionContext.fromExecutor(executor)

//
// Constants for interacting with CloudWatch. For more details see:
//
Expand Down Expand Up @@ -206,7 +202,7 @@ object ForwardingService extends StrictLogging {
// Helpers for constructing parts of the stream
//

type PutFunction = (String, String, PutMetricDataRequest) => PutMetricDataResponse
type PutFunction = (String, String, PutMetricDataRequest) => Future[PutMetricDataResponse]

type Client = Flow[(HttpRequest, AccessLogger), (Try[HttpResponse], AccessLogger), NotUsed]

Expand Down Expand Up @@ -439,7 +435,7 @@ object ForwardingService extends StrictLogging {

val region = data.head.accountDatum.get.region
val account = data.head.accountDatum.get.account
val future = Future(doPut(region, account, request))(awsEC)
val future = doPut(region, account, request)
Source
.future(future)
.map { _ =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,9 +361,9 @@ class ForwardingServiceSuite extends FunSuite {
region: String,
account: String,
request: PutMetricDataRequest
): PutMetricDataResponse = {
): Future[PutMetricDataResponse] = {
requests += AccountRequest(region, account, request)
PutMetricDataResponse.builder().build()
Future.successful(PutMetricDataResponse.builder().build())
}
val future = Source(msgs)
.via(sendToCloudWatch(new AtomicLong(), ns, doPut))
Expand Down

0 comments on commit 1f8ab1d

Please sign in to comment.