This repository has been archived by the owner on Feb 12, 2022. It is now read-only.

Commit

Switching backend to akka (#61)
realstraw authored Aug 18, 2020
1 parent 307d14a commit 5feaff6
Showing 12 changed files with 501 additions and 347 deletions.
19 changes: 19 additions & 0 deletions .scalafmt.conf
@@ -0,0 +1,19 @@
version = 2.6.3

// Line Length
maxColumn = 100 // Line length 100 characters; note that long strings won't be broken into new lines

// 2 spaces indent for function param definitions
continuationIndent.defnSite = 2

docstrings = JavaDoc

// No need to always add a newline before multiline definitions
newlines.alwaysBeforeMultilineDef = false

// No align on case matches
align.preset = none

rewrite.rules = [SortImports]

binPack.parentConstructors = Always
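
A note on usage: this file only takes effect through whatever scalafmt runner the build uses. A typical sbt wiring would be the following (the plugin version is an assumption; this commit adds only the config file):

// project/plugins.sbt (hypothetical wiring, not part of this commit)
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.2")

With that in place, sbt scalafmtAll formats all sources and sbt scalafmtCheckAll fails on unformatted code, which is the usual CI guard.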
2 changes: 1 addition & 1 deletion .travis.yml
@@ -3,7 +3,7 @@ language: scala
jdk: openjdk8

scala:
-  - 2.12.6
+  - 2.12.12

notifications:
email:
15 changes: 9 additions & 6 deletions build.sbt
@@ -1,7 +1,8 @@
-val awsSdkVersion = "1.11.820"
+val awsSdkVersion = "1.11.841"
val slickVersion = "3.3.2"
+val akkaVersion = "2.6.8"

-val scalaTestArtifact = "org.scalatest" %% "scalatest" % "3.2.0" % Test
+val scalaTestArtifact = "org.scalatest" %% "scalatest" % "3.2.1" % Test
val slickArtifact = "com.typesafe.slick" %% "slick" % slickVersion
val slickHikaricpArtifact = "com.typesafe.slick" %% "slick-hikaricp" % slickVersion
val scoptArtifact = "com.github.scopt" %% "scopt" % "3.7.1"
Expand All @@ -16,15 +17,16 @@ val awsSdkSSM = "com.amazonaws" % "aws-java-sdk-ssm"
val awsSdkSNS = "com.amazonaws" % "aws-java-sdk-sns" % awsSdkVersion
val awsSdkCloudWatch = "com.amazonaws" % "aws-java-sdk-cloudwatch" % awsSdkVersion
val stubbornArtifact = "com.krux" %% "stubborn" % "1.4.1"
-val metricsGraphite = "io.dropwizard.metrics" % "metrics-graphite" % "4.1.10.1"
+val metricsGraphite = "io.dropwizard.metrics" % "metrics-graphite" % "4.1.12.1"
val postgreSqlJdbc = "org.postgresql" % "postgresql" % "42.2.14"
-val awsLambdaEvents = "com.amazonaws" % "aws-lambda-java-events" % "3.1.0"
+val awsLambdaEvents = "com.amazonaws" % "aws-lambda-java-events" % "3.2.0"
val awsLambdaCore = "com.amazonaws" % "aws-lambda-java-core" % "1.2.1"
val cloudwatchMetrics = "io.github.azagniotov" % "dropwizard-metrics-cloudwatch" % "1.0.13"
+val akkaActorArtifact = "com.typesafe.akka" %% "akka-actor-typed" % akkaVersion

lazy val commonSettings = Seq(
scalacOptions ++= Seq("-deprecation", "-feature", "-Xlint", "-Xfatal-warnings"),
-scalaVersion := "2.12.11",
+scalaVersion := "2.12.12",
libraryDependencies += scalaTestArtifact,
organization := "com.krux",
test in assembly := {}, // skip test during assembly
@@ -62,7 +64,8 @@ lazy val core = (project in file("starport-core")).
stubbornArtifact,
metricsGraphite,
postgreSqlJdbc,
-cloudwatchMetrics
+cloudwatchMetrics,
+akkaActorArtifact
),
fork := true
)
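
For orientation: akka-actor-typed is the only genuinely new dependency here, and it is what the new scheduling backend builds on (a sketch of that service follows the last file section below). A minimal, generic example of the typed-actor API it brings in, illustrative only and not code from this commit:

import akka.actor.typed.{ActorSystem, Behavior}
import akka.actor.typed.scaladsl.Behaviors

// A behavior that logs every message it receives and keeps the same behavior.
object Greeter {
  def apply(): Behavior[String] =
    Behaviors.receive { (context, message) =>
      context.log.info(s"received: $message")
      Behaviors.same
    }
}

object GreeterDemo extends App {
  // The guardian behavior doubles as the system's root actor.
  val system: ActorSystem[String] = ActorSystem(Greeter(), "greeter-demo")
  system ! "hello"
  system.terminate()
}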
@@ -1,21 +1,20 @@
package com.krux.starport

import java.time.LocalDateTime
-import java.util.concurrent.{ForkJoinPool, TimeUnit}
+import java.util.concurrent.TimeUnit

-import com.codahale.metrics.{Counter, MetricRegistry}
-import scala.collection.parallel.ForkJoinTaskSupport
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+import com.codahale.metrics.MetricRegistry
import slick.jdbc.PostgresProfile.api._

import com.krux.hyperion.expression.{Duration => HDuration}
import com.krux.starport.cli.{SchedulerOptionParser, SchedulerOptions}
-import com.krux.starport.db.record.{Pipeline, ScheduledPipeline, SchedulerMetric}
-import com.krux.starport.db.table.{Pipelines, ScheduleFailureCounters, ScheduledPipelines, SchedulerMetrics}
-import com.krux.starport.dispatcher.impl.TaskDispatcherImpl
-import com.krux.starport.dispatcher.TaskDispatcher
+import com.krux.starport.db.record.{Pipeline, SchedulerMetric}
+import com.krux.starport.db.table.{Pipelines, SchedulerMetrics}
import com.krux.starport.metric.{ConstantValueGauge, MetricSettings, SimpleTimerGauge}
-import com.krux.starport.util.{ErrorHandler, S3FileHandler}
+import com.krux.starport.system.ScheduleService
+import com.krux.starport.util.S3FileHandler

object StartScheduledPipelines extends StarportActivity {

Expand All @@ -25,8 +24,6 @@ object StartScheduledPipelines extends StarportActivity {

val scheduleTimer = metrics.timer("timers.pipeline_scheduling_time")

-val extraEnvs = conf.extraEnvs.toSeq
-
/**
* return a map of remote jar to local jar
*/
@@ -59,128 +56,48 @@
    result
  }

-  private def updateScheduledPipelines(scheduledPipelines: Seq[ScheduledPipeline]) = {
-    scheduledPipelines.isEmpty match {
-      case true => ()
-      case false =>
-        val insertAction = DBIO.seq(ScheduledPipelines() ++= scheduledPipelines)
-        db.run(insertAction).waitForResult
-    }
-  }
-
-  private def updateNextRunTime(pipelineRecord: Pipeline, options: SchedulerOptions) = {
-    // update the next runtime in the database
-    val newNextRunTime = nextRunTime(pipelineRecord.nextRunTime.get, HDuration(pipelineRecord.period), options.scheduledEnd)
-    val updateQuery = Pipelines().filter(_.id === pipelineRecord.id).map(_.nextRunTime)
-    logger.debug(s"Update with query ${updateQuery.updateStatement}")
-    val updateAction = updateQuery.update(Some(newNextRunTime))
-    db.run(updateAction).waitForResult
-  }
-
-
-  private def processPipeline(dispatcher: TaskDispatcher, pipeline: Pipeline, options: SchedulerOptions, jar: String, dispatchedPipelines: Counter, failedPipelines: Counter): Unit = {
-    val timerInst = scheduleTimer.time()
-    logger.info(s"Dispatching pipeline ${pipeline.name}")
-
-    dispatcher.dispatch(pipeline, options, jar, conf) match {
-      case Left(ex) =>
-        ErrorHandler.pipelineScheduleFailed(pipeline, ex.getMessage())
-        logger.warn(
-          s"failed to deploy pipeline ${pipeline.name} in ${TimeUnit.SECONDS.convert(timerInst.stop(), TimeUnit.NANOSECONDS)}"
-        )
-        failedPipelines.inc()
-      case Right(r) =>
-        logger.info(
-          s"dispatched pipeline ${pipeline.name} in ${TimeUnit.SECONDS.convert(timerInst.stop(), TimeUnit.NANOSECONDS)}"
-        )
-        dispatchedPipelines.inc()
-        // activation successful - delete the failure counter
-        db.run(ScheduleFailureCounters().filter(_.pipelineId === pipeline.id.get).delete).waitForResult
-
-        // IMPORTANT_TODO: this is a temporary fix; the next runtime should really be updated after
-        // calling `retrieve`, not here. In a distributed setting where the dispatcher is remote, we
-        // cannot assume that successfully sending to the dispatcher results in a successful schedule.
-        // The reason we cannot simply add this to the updated schedule is that the scheduled
-        // information is currently added in a batch manner; if the process gets killed in the
-        // middle, a large number of pipelines might get rescheduled in the next run, which can be
-        // very problematic.
-        //
-        // update the next run time for this pipeline
-        updateNextRunTime(pipeline, options)
-    }
-
-  }

  def run(options: SchedulerOptions): Unit = {

    logger.info(s"run with options: $options")

    val actualStart = options.actualStart
    db.run(DBIO.seq(SchedulerMetrics() += SchedulerMetric(actualStart))).waitForResult

-    // in case of the default taskDispatcher, the dispatchedPipelines and successfulPipelines metrics should be exactly the same
-    val dispatchedPipelines = metrics.register("counter.successful-pipeline-dispatch-count", new Counter())
-    val successfulPipelines = metrics.register("counter.successful-pipeline-deployment-count", new Counter())
-    val failedPipelines = metrics.register("counter.failed-pipeline-deployment-count", new Counter())
-
-    val taskDispatcher: TaskDispatcher = conf.dispatcherType match {
-      case "default" =>
-        new TaskDispatcherImpl()
-      case x =>
-        // pipelines scheduled in a previous run should be fetched here if the dispatcher is remote
-        throw new NotImplementedError(s"there is no task dispatcher implementation for $x")
-    }
-
-    // fetch the pipelines which may have been scheduled in a previous run but are not present in the database yet;
-    // this operation is a no-op in case of a local dispatcher like the TaskDispatcherImpl
-    val previouslyScheduledPipelines = taskDispatcher.retrieve(conf)
-    successfulPipelines.inc(previouslyScheduledPipelines.length)
-    updateScheduledPipelines(previouslyScheduledPipelines)
-
    val pipelineModels = pendingPipelineRecords(options.scheduledEnd)
-    db.run(DBIO.seq(
+    db.run(
+      DBIO.seq(
        SchedulerMetrics()
          .filter(_.startTime === actualStart)
          .map(_.pipelineCount)
          .update(Option(pipelineModels.size))
-    ))
-      .waitForResult
+      )
+    ).waitForResult
    metrics.register("gauges.pipeline_count", new ConstantValueGauge(pipelineModels.size))

    // TODO: this variable can be moved to the implementation
    lazy val localJars = getLocalJars(pipelineModels)

    // execute all jars
-    val parPipelineModels = pipelineModels.par
-
-    if (parallel > 0)
-      parPipelineModels.tasksupport = new ForkJoinTaskSupport(
-        new ForkJoinPool(parallel * Runtime.getRuntime.availableProcessors)
-      )
-
-    parPipelineModels.foreach(p =>
-      processPipeline(
-        taskDispatcher,
-        p,
+    Await.result(
+      ScheduleService.schedule(
        options,
-        localJars(p.jar),
-        dispatchedPipelines,
-        failedPipelines
-      )
+        localJars,
+        parallel * Runtime.getRuntime.availableProcessors,
+        pipelineModels.toList,
+        conf,
+        scheduleTimer
+      ),
+      1.hour
    )

-    // retrieve the scheduled pipelines and save the information in the db
-    val scheduledPipelines = taskDispatcher.retrieve(conf)
-    successfulPipelines.inc(scheduledPipelines.length)
-    updateScheduledPipelines(scheduledPipelines)
-
-    db.run(DBIO.seq(
+    db.run(
+      DBIO.seq(
        SchedulerMetrics()
          .filter(_.startTime === actualStart)
          .map(_.endTime)
          .update(Option(currentTimeUTC().toLocalDateTime))
-    ))
-      .waitForResult
+      )
+    ).waitForResult

+    ScheduleService.terminate
  }

/**
Expand All @@ -200,7 +117,7 @@ object StartScheduledPipelines extends StarportActivity {
      case None => ErrorExit.invalidCommandlineArguments(logger)
    }

-    val timeSpan = (System.nanoTime - start) / 1E9
+    val timeSpan = (System.nanoTime - start) / 1e9
    logger.info(s"All pipelines scheduled in $timeSpan seconds")
  } finally {
    mainTimer.stop()

This file was deleted.

This file was deleted.

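The diffs for the remaining files, including the new com.krux.starport.system.ScheduleService, did not load in this view; the two deleted files are likely the dispatcher sources whose imports were removed above. Going only by the call sites in run(), where ScheduleService.schedule(options, localJars, parallelism, pipelines, conf, scheduleTimer) is awaited with Await.result(..., 1.hour) and followed by ScheduleService.terminate, a plausible shape is an object that runs the dispatch work on an Akka actor system and returns a Future. A hedged sketch under those assumptions; every name and signature here is guessed from the call sites, not taken from the actual file:

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors

// Hypothetical sketch of a ScheduleService-style backend: async work runs on
// an actor system's executor, the caller blocks until it finishes, and the
// system is torn down afterwards. The real schedule(...) takes the scheduler
// options, jar map, parallelism, pipeline list, config, and timer seen at the
// call site; it is reduced to a list of thunks here to stay self-contained.
object ScheduleServiceSketch {

  private val system: ActorSystem[Nothing] =
    ActorSystem[Nothing](Behaviors.empty, "starport-scheduler")

  private implicit val ec: ExecutionContext = system.executionContext

  // Dispatch every task, returning a Future that completes when all are done.
  // Bounding concurrency to the parallelism seen in run() would be done with a
  // dedicated ExecutionContext; omitted here for brevity.
  def schedule(tasks: List[() => Unit]): Future[Unit] =
    Future.traverse(tasks)(task => Future(task())).map(_ => ())

  def terminate(): Unit = system.terminate()
}

object SchedulerDemo extends App {
  val work = List(() => println("pipeline A"), () => println("pipeline B"))
  Await.result(ScheduleServiceSketch.schedule(work), 1.hour) // mirrors run()
  ScheduleServiceSketch.terminate()
}

Blocking with Await.result in a batch entry point like this is a reasonable design: the process exists only to drain the pending pipelines, and terminating the actor system afterwards lets the JVM exit cleanly.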

