Skip to content
This repository has been archived by the owner on Feb 12, 2022. It is now read-only.

Commit

Permalink
Fixes the issue starport incorrect next_run_time when scheduling fails (
Browse files Browse the repository at this point in the history
  • Loading branch information
realstraw authored Aug 13, 2020
1 parent e450896 commit 307d14a
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,19 @@ object StartScheduledPipelines extends StarportActivity {
dispatchedPipelines.inc()
// activation successful - delete the failure counter
db.run(ScheduleFailureCounters().filter(_.pipelineId === pipeline.id.get).delete).waitForResult

// IMPORTANT_TODO: this is a temporary fix, the next runtime should really be updated after
// callint `retrieve`, not here. In a distributed setting where dispatcher is remote we
// cannot assume successfully sending to dispatcher results in success schedule. The reason
// that we cannot simply add this to the updated schedule, is that it currently works to
// add the scheduled information in a batch manner, when process get's killed in the middle
// a large number of pipelins might get rescheduled in the next run, which can be very
// problematic.
//
// update the next run time for this pipeline
updateNextRunTime(pipeline, options)
}

// update the next run time for this pipeline
updateNextRunTime(pipeline, options)
}

def run(options: SchedulerOptions): Unit = {
Expand All @@ -115,7 +124,8 @@ object StartScheduledPipelines extends StarportActivity {
val failedPipelines = metrics.register("counter.failed-pipeline-deployment-count", new Counter())

val taskDispatcher: TaskDispatcher = conf.dispatcherType match {
case "default" => new TaskDispatcherImpl()
case "default" =>
new TaskDispatcherImpl()
case x =>
// pipelines scheduled in a previous run should be fetched here if the dispatcher is remote
throw new NotImplementedError(s"there is no task dispatcher implementation for $x")
Expand Down Expand Up @@ -148,7 +158,16 @@ object StartScheduledPipelines extends StarportActivity {
new ForkJoinPool(parallel * Runtime.getRuntime.availableProcessors)
)

parPipelineModels.foreach(p => processPipeline(taskDispatcher, p, options, localJars(p.jar), dispatchedPipelines, failedPipelines))
parPipelineModels.foreach(p =>
processPipeline(
taskDispatcher,
p,
options,
localJars(p.jar),
dispatchedPipelines,
failedPipelines
)
)

// retrieve the scheduled pipelines and save the information in the db
val scheduledPipelines = taskDispatcher.retrieve(conf)
Expand Down
2 changes: 1 addition & 1 deletion version.sbt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version in ThisBuild := "7.0.0"
version in ThisBuild := "7.0.1"

0 comments on commit 307d14a

Please sign in to comment.