Skip to content
This repository has been archived by the owner on Feb 12, 2022. It is now read-only.

Commit

Permalink
Make prefix available in failure notifications (#62)
Browse files Browse the repository at this point in the history
  • Loading branch information
realstraw authored Aug 25, 2020
1 parent 5feaff6 commit a767b24
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 16 deletions.
6 changes: 3 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
val awsSdkVersion = "1.11.841"
val awsSdkVersion = "1.11.848"
val slickVersion = "3.3.2"
val akkaVersion = "2.6.8"

val scalaTestArtifact = "org.scalatest" %% "scalatest" % "3.2.1" % Test
val scalaTestArtifact = "org.scalatest" %% "scalatest" % "3.2.2" % Test
val slickArtifact = "com.typesafe.slick" %% "slick" % slickVersion
val slickHikaricpArtifact = "com.typesafe.slick" %% "slick-hikaricp" % slickVersion
val scoptArtifact = "com.github.scopt" %% "scopt" % "3.7.1"
Expand All @@ -18,7 +18,7 @@ val awsSdkSNS = "com.amazonaws" % "aws-java-sdk-sns"
val awsSdkCloudWatch = "com.amazonaws" % "aws-java-sdk-cloudwatch" % awsSdkVersion
val stubbornArtifact = "com.krux" %% "stubborn" % "1.4.1"
val metricsGraphite = "io.dropwizard.metrics" % "metrics-graphite" % "4.1.12.1"
val postgreSqlJdbc = "org.postgresql" % "postgresql" % "42.2.14"
val postgreSqlJdbc = "org.postgresql" % "postgresql" % "42.2.16"
val awsLambdaEvents = "com.amazonaws" % "aws-lambda-java-events" % "3.2.0"
val awsLambdaCore = "com.amazonaws" % "aws-lambda-java-core" % "1.2.1"
val cloudwatchMetrics = "io.github.azagniotov" % "dropwizard-metrics-cloudwatch" % "1.0.13"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import com.krux.starport.db.WaitForIt
import com.krux.starport.Logging
import com.krux.starport.util.notification.Notify


/**
* Handles database related logics for handling piplines
*/
Expand All @@ -27,30 +26,33 @@ object ErrorHandler extends Logging with WaitForIt {
*
* @return the SES send ID
*/
def pipelineScheduleFailed(pipeline: Pipeline, allOutput: String)
(implicit conf: StarportSettings, ec: ExecutionContext): String = {
def pipelineScheduleFailed(pipeline: Pipeline, allOutput: String)(implicit
conf: StarportSettings,
ec: ExecutionContext
): String = {

val db = conf.jdbc.db

val pipelineId = pipeline.id.get

def handlePipelineAndNotify(failureCount: Int): Future[String] = {
if (failureCount >= MaxSchedulingFailure) { // deactivate the pipeline if it reaches max # of failures
if (failureCount >= MaxSchedulingFailure) { // deactivate the pipeline if it reaches max # of failures
val deactivatePipelineQuery =
Pipelines().filter(_.id === pipelineId).map(_.isActive).update(false)

db.run(deactivatePipelineQuery).map { _ =>
Notify(
s"[ACTION NEEDED] Pipeline ${pipeline.name} has been deactivated due to scheduling failure",
s"[ACTION NEEDED] (${conf.pipelinePrefix}) Pipeline ${pipeline.name} has been deactivated due to scheduling failure",
allOutput,
pipeline
)
}
} else { // increment the count, and send the notification
} else { // increment the count, and send the notification
val newCount = failureCount + 1
val setScheduleFailureCountQuery = ScheduleFailureCounters()
.insertOrUpdate(
ScheduleFailureCounter(pipelineId,
ScheduleFailureCounter(
pipelineId,
newCount,
DateTimeFunctions.currentTimeUTC().toLocalDateTime()
)
Expand All @@ -59,7 +61,7 @@ object ErrorHandler extends Logging with WaitForIt {
// Set schedule failure count
db.run(setScheduleFailureCountQuery).map { _ =>
Notify(
s"[ACTION NEEDED] Pipeline ${pipeline.name} failed to schedule ($newCount/$MaxSchedulingFailure)",
s"[ACTION NEEDED] (${conf.pipelinePrefix}) Pipeline ${pipeline.name} failed to schedule ($newCount/$MaxSchedulingFailure)",
s"It will be deactivated after the number of schedule failures reach $MaxSchedulingFailure\n\n$allOutput",
pipeline
)
Expand All @@ -68,7 +70,8 @@ object ErrorHandler extends Logging with WaitForIt {
}

val result: Future[String] = for {
failureCounts <- db.run(ScheduleFailureCounters().filter(c => c.pipelineId === pipelineId).take(1).result)
failureCounts <-
db.run(ScheduleFailureCounters().filter(c => c.pipelineId === pipelineId).take(1).result)
failureCount = failureCounts.headOption.map(_.failureCount).getOrElse(0)
notifyReqId <- handlePipelineAndNotify(failureCount)
} yield notifyReqId
Expand All @@ -80,12 +83,13 @@ object ErrorHandler extends Logging with WaitForIt {
/**
* @return the SES send ID
*/
def cleanupActivityFailed(pipeline: Pipeline, stackTrace: Array[StackTraceElement])
(implicit conf: StarportSettings): String = {
def cleanupActivityFailed(pipeline: Pipeline, stackTrace: Array[StackTraceElement])(implicit
conf: StarportSettings
): String = {
val stackTraceMessage = stackTrace.mkString("\n")
logger.warn(s"cleanup activity failed for pipeline ${pipeline.id}, because: $stackTraceMessage")
Notify(
s"[Starport Cleanup Failure] cleanup activity failed for ${pipeline.name}",
s"[Starport Cleanup Failure] (${conf.pipelinePrefix}) cleanup activity failed for ${pipeline.name}",
stackTraceMessage,
pipeline
)
Expand Down
2 changes: 1 addition & 1 deletion version.sbt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version in ThisBuild := "7.1.0"
version in ThisBuild := "7.1.1"

0 comments on commit a767b24

Please sign in to comment.