Skip to content
This repository has been archived by the owner on Feb 12, 2022. It is now read-only.

Commit

Permalink
⏭ update to hyperion6 (#58)
Browse files Browse the repository at this point in the history
  • Loading branch information
realstraw authored Jul 13, 2020
1 parent 3d2fb6b commit e450896
Show file tree
Hide file tree
Showing 39 changed files with 282 additions and 272 deletions.
30 changes: 15 additions & 15 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,30 +1,30 @@
val awsSdkVersion = "1.11.618"
val slickVersion = "3.2.3"
val awsSdkVersion = "1.11.820"
val slickVersion = "3.3.2"

val scalaTestArtifact = "org.scalatest" %% "scalatest" % "3.0.5" % "test"
val scalaTestArtifact = "org.scalatest" %% "scalatest" % "3.2.0" % Test
val slickArtifact = "com.typesafe.slick" %% "slick" % slickVersion
val slickHikaricpArtifact = "com.typesafe.slick" %% "slick-hikaricp" % slickVersion
val scoptArtifact = "com.github.scopt" %% "scopt" % "3.7.0"
val configArtifact = "com.typesafe" % "config" % "1.3.3"
val nscalaTimeArtifact = "com.github.nscala-time" %% "nscala-time" % "2.18.0"
val hyperionArtifact = "com.krux" %% "hyperion" % "5.4.1"
val slf4jApiArtifact = "org.slf4j" % "slf4j-api" % "1.7.12"
val logbackClassicArtifact = "ch.qos.logback" % "logback-classic" % "1.1.7"
val scoptArtifact = "com.github.scopt" %% "scopt" % "3.7.1"
val configArtifact = "com.typesafe" % "config" % "1.4.0"
val nscalaTimeArtifact = "com.github.nscala-time" %% "nscala-time" % "2.24.0"
val hyperionArtifact = "com.krux" %% "hyperion" % "6.0.0"
val slf4jApiArtifact = "org.slf4j" % "slf4j-api" % "1.7.30"
val logbackClassicArtifact = "ch.qos.logback" % "logback-classic" % "1.2.3"
val awsSdkS3 = "com.amazonaws" % "aws-java-sdk-s3" % awsSdkVersion
val awsSdkSES = "com.amazonaws" % "aws-java-sdk-ses" % awsSdkVersion
val awsSdkSSM = "com.amazonaws" % "aws-java-sdk-ssm" % awsSdkVersion
val awsSdkSNS = "com.amazonaws" % "aws-java-sdk-sns" % awsSdkVersion
val awsSdkCloudWatch = "com.amazonaws" % "aws-java-sdk-cloudwatch" % awsSdkVersion
val stubbornArtifact = "com.krux" %% "stubborn" % "1.3.0"
val metricsGraphite = "io.dropwizard.metrics" % "metrics-graphite" % "4.0.2"
val postgreSqlJdbc = "org.postgresql" % "postgresql" % "42.2.4"
val awsLambdaEvents = "com.amazonaws" % "aws-lambda-java-events" % "2.2.1"
val awsLambdaCore = "com.amazonaws" % "aws-lambda-java-core" % "1.2.0"
val stubbornArtifact = "com.krux" %% "stubborn" % "1.4.1"
val metricsGraphite = "io.dropwizard.metrics" % "metrics-graphite" % "4.1.10.1"
val postgreSqlJdbc = "org.postgresql" % "postgresql" % "42.2.14"
val awsLambdaEvents = "com.amazonaws" % "aws-lambda-java-events" % "3.1.0"
val awsLambdaCore = "com.amazonaws" % "aws-lambda-java-core" % "1.2.1"
val cloudwatchMetrics = "io.github.azagniotov" % "dropwizard-metrics-cloudwatch" % "1.0.13"

lazy val commonSettings = Seq(
scalacOptions ++= Seq("-deprecation", "-feature", "-Xlint", "-Xfatal-warnings"),
scalaVersion := "2.12.10",
scalaVersion := "2.12.11",
libraryDependencies += scalaTestArtifact,
organization := "com.krux",
test in assembly := {}, // skip test during assembly
Expand Down
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version=1.3.5
sbt.version=1.3.13
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package com.krux.starport

import java.time.LocalDateTime

import com.codahale.metrics.MetricRegistry
import com.github.nscala_time.time.Imports._
import slick.jdbc.PostgresProfile.api._

import com.krux.hyperion.client.{AwsClient, AwsClientForId}
import com.krux.starport.db.record.FailedPipeline
import com.krux.starport.db.table.{FailedPipelines, Pipelines, ScheduledPipelines}
Expand Down Expand Up @@ -104,7 +106,7 @@ object CleanupExistingPipelines extends StarportActivity {
healthyPipelines
.groupBy(_.actualStart)
.toSeq
.sortBy(_._1)(Ordering[DateTime].reverse)
.sortBy(_._1)(Ordering[LocalDateTime].reverse)
.drop(pipelineRecord.retention)
.flatMap(_._2)
.foreach { sp =>
Expand All @@ -116,7 +118,7 @@ object CleanupExistingPipelines extends StarportActivity {
val (toBeKeptFailedMap, toBeDeletedFailedMap) = failedPipelines
.groupBy(_.actualStart)
.toSeq
.sortBy(_._1)(Ordering[DateTime].reverse)
.sortBy(_._1)(Ordering[LocalDateTime].reverse)
.splitAt(pipelineRecord.retention)

val toBeKeptFailed = toBeKeptFailedMap.flatMap(_._2)
Expand All @@ -133,7 +135,7 @@ object CleanupExistingPipelines extends StarportActivity {
.size == 0
}
.foreach { sp =>
val failedPipeline = FailedPipeline(sp.awsId, sp.pipelineId, false, DateTime.now)
val failedPipeline = FailedPipeline(sp.awsId, sp.pipelineId, false, currentTimeUTC().toLocalDateTime)
logger.info(s"insert ${failedPipeline.awsId} to failed pipelines")
db.run(DBIO.seq(FailedPipelines() += failedPipeline)).waitForResult
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,28 @@
package com.krux.starport

import com.github.nscala_time.time.Imports._
import org.joda.time.DateTime
import org.joda.time.format.{DateTimeFormat => JodaDateTimeFormat}
import java.time.format.DateTimeFormatter
import java.time.LocalDateTime
import java.time.temporal.ChronoUnit

import slick.jdbc.PostgresProfile.api._

import com.krux.starport.cli.{CleanupUnmanagedOptionParser, CleanupUnmanagedOptions}
import com.krux.starport.db.table.ScheduledPipelines
import com.krux.starport.util.{AwsDataPipeline, PipelineStatus, PipelineState}


object CleanupUnmanagedPipelines extends StarportActivity {
final val AwsDateTimeFormat = "yyyy-MM-dd'T'HH:mm:ss"

def pipelineIdsToDelete(
excludePrefixes: Seq[String],
pipelineState: PipelineState.State,
cutoffDate: DateTime,
cutoffDate: LocalDateTime,
force: Boolean
): Set[String] = {

logger.info(s"Getting list of old ${pipelineState} unmanaged pipelines from AWS to delete...")
val dateTimeFormatter = JodaDateTimeFormat.forPattern(AwsDateTimeFormat)
val dateTimeFormatter = DateTimeFormatter.ofPattern(AwsDateTimeFormat)

def shouldPipelineBeDeleted(pipelineStatus: Option[PipelineStatus]): Boolean = {
val nst = for {
Expand All @@ -33,7 +35,7 @@ object CleanupUnmanagedPipelines extends StarportActivity {
nst.exists { case (n, s, t) =>
(force && n.startsWith(conf.pipelinePrefix) || !excludePrefixes.exists(n.startsWith)) &&
s == pipelineState &&
dateTimeFormatter.parseDateTime(t) < cutoffDate.withTimeAtStartOfDay
LocalDateTime.parse(t, dateTimeFormatter).isBefore(cutoffDate.truncatedTo(ChronoUnit.DAYS))
}
}

Expand Down
10 changes: 4 additions & 6 deletions starport-core/src/main/scala/com/krux/starport/Starport.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package com.krux.starport

import com.github.nscala_time.time.Imports.{DateTime, DateTimeZone}

import com.krux.hyperion.action.SnsAlarm
import com.krux.hyperion.activity.{JarActivity, MainClass}
import com.krux.hyperion.adt.HString
Expand All @@ -27,7 +25,7 @@ object Starport extends DataPipelineDef with HyperionCli {
val schedulerClass: MainClass = com.krux.starport.StartScheduledPipelines
val cleanupClass: MainClass = com.krux.starport.CleanupExistingPipelines

def currentHour = DateTime.now.withZone(DateTimeZone.UTC).getHourOfDay()
def currentHour = DateTimeFunctions.currentTimeUTC.getHour()

def schedule = Schedule.cron.startTodayAt(currentHour, 0, 0).every(1.hour)

Expand All @@ -38,9 +36,9 @@ object Starport extends DataPipelineDef with HyperionCli {

val ec2 = Ec2Resource()

val scheduledStart = RunnableObject.ScheduledStartTime.format(DateTimeFunctions.DateTimeFormat)
val scheduledEnd = RunnableObject.ScheduledEndTime.format(DateTimeFunctions.DateTimeFormat)
val actualStart = RunnableObject.ActualStartTime.format(DateTimeFunctions.DateTimeFormat)
val scheduledStart = RunnableObject.ScheduledStartTime.format(DateTimeFunctions.DateTimeFormatPattern)
val scheduledEnd = RunnableObject.ScheduledEndTime.format(DateTimeFunctions.DateTimeFormatPattern)
val actualStart = RunnableObject.ActualStartTime.format(DateTimeFunctions.DateTimeFormatPattern)

val schedulerArgs = Seq[HString](
"--start", scheduledStart,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ import slick.jdbc.PostgresProfile.api._

import com.krux.starport.config.StarportSettings
import com.krux.starport.db.table.ScheduledPipelines
import com.krux.starport.db.{DateTimeMapped, WaitForIt}
import com.krux.starport.db.WaitForIt
import com.krux.starport.util.DateTimeFunctions


trait StarportActivity extends DateTimeMapped with WaitForIt with Logging with DateTimeFunctions {
trait StarportActivity extends WaitForIt with Logging with DateTimeFunctions {

// use -Dconf.resource=application.dev.conf for testing
implicit val conf = StarportSettings()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
package com.krux.starport

import java.time.LocalDateTime
import java.util.concurrent.{ForkJoinPool, TimeUnit}

import com.codahale.metrics.{Counter, MetricRegistry}
import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.ExecutionContext.Implicits.global
import com.codahale.metrics.{Counter, MetricRegistry}
import com.github.nscala_time.time.Imports._
import slick.jdbc.PostgresProfile.api._

import com.krux.hyperion.expression.{Duration => HDuration}
import com.krux.starport.cli.{SchedulerOptionParser, SchedulerOptions}
import com.krux.starport.db.record.{Pipeline, ScheduledPipeline, SchedulerMetric}
import com.krux.starport.db.table.{Pipelines, ScheduleFailureCounters, ScheduledPipelines, SchedulerMetrics}
import com.krux.starport.dispatcher.TaskDispatcher
import com.krux.starport.dispatcher.impl.TaskDispatcherImpl
import com.krux.starport.dispatcher.TaskDispatcher
import com.krux.starport.metric.{ConstantValueGauge, MetricSettings, SimpleTimerGauge}
import com.krux.starport.util.{ErrorHandler, S3FileHandler}

Expand All @@ -38,7 +39,7 @@ object StartScheduledPipelines extends StarportActivity {
}
.toMap

def pendingPipelineRecords(scheduledEnd: DateTime): Seq[Pipeline] = {
def pendingPipelineRecords(scheduledEnd: LocalDateTime): Seq[Pipeline] = {
logger.info("Retriving pending pipelines..")

// get all jobs to be scheduled
Expand Down Expand Up @@ -158,7 +159,7 @@ object StartScheduledPipelines extends StarportActivity {
SchedulerMetrics()
.filter(_.startTime === actualStart)
.map(_.endTime)
.update(Option(DateTime.now))
.update(Option(currentTimeUTC().toLocalDateTime))
))
.waitForResult
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.krux.starport.cli

import org.joda.time.DateTime
import java.time.LocalDateTime

import scopt.OptionParser

import com.krux.starport.util.PipelineState
Expand All @@ -24,7 +25,7 @@ object CleanupUnmanagedOptionParser extends Reads {
opt[PipelineState.State]("pipelineState").valueName("<pipelineState>")
.action((x, c) => c.copy(pipelineState = x))

opt[DateTime]("cutoffDate").valueName("<cutoffDate> default value is 2 months before")
opt[LocalDateTime]("cutoffDate").valueName("<cutoffDate> default value is 2 months before")
.action((x, c) => c.copy(cutoffDate = x))

opt[Unit]("dryRun").valueName("<dryRun>")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package com.krux.starport.cli

import org.joda.time.DateTime
import java.time.LocalDateTime
import java.time.temporal.ChronoUnit

import com.krux.starport.util.PipelineState
import com.krux.starport.util.{PipelineState, DateTimeFunctions}

case class CleanupUnmanagedOptions(
excludePrefixes: Seq[String] = Seq(),
pipelineState: PipelineState.State = PipelineState.FINISHED,
cutoffDate: DateTime = DateTime.now.minusMonths(2).withTimeAtStartOfDay,
cutoffDate: LocalDateTime = DateTimeFunctions.currentTimeUTC().minusMonths(2).toLocalDateTime().truncatedTo(ChronoUnit.DAYS),
force: Boolean = false,
dryRun: Boolean = false
)
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package com.krux.starport.cli

import org.joda.time.DateTime
import java.time.{LocalDateTime, ZonedDateTime, ZoneOffset}

import scopt.Read
import scopt.Read.reads

import com.krux.starport.util.PipelineState

trait Reads {

implicit val dateTimeRead: Read[DateTime] = reads(new DateTime(_))
implicit val dateTimeRead: Read[LocalDateTime] = reads { r =>
ZonedDateTime.parse(r).withZoneSameInstant(ZoneOffset.UTC).toLocalDateTime()
}

implicit val pipelineStateRead: scopt.Read[PipelineState.State] = scopt.Read.reads(PipelineState.withName)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.krux.starport.cli

import org.joda.time.DateTime
import java.time.LocalDateTime

import scopt.OptionParser

object SchedulerOptionParser extends Reads {
Expand All @@ -12,13 +13,13 @@ object SchedulerOptionParser extends Reads {
head(programName)
help("help").text("prints this usage text")

opt[DateTime]("start").valueName("<scheduledStart>")
opt[LocalDateTime]("start").valueName("<scheduledStart>")
.action((x, c) => c.copy(scheduledStart = x))

opt[DateTime]("end").valueName("<scheduledEnd>")
opt[LocalDateTime]("end").valueName("<scheduledEnd>")
.action((x, c) => c.copy(scheduledEnd = x))

opt[DateTime]("actual-start").valueName("<actualStart>")
opt[LocalDateTime]("actual-start").valueName("<actualStart>")
.action((x, c) => c.copy(actualStart = x))

}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package com.krux.starport.cli

import org.joda.time.DateTime
import java.time.LocalDateTime

import com.krux.starport.util.DateTimeFunctions


case class SchedulerOptions(
scheduledStart: DateTime = DateTime.now,
scheduledEnd: DateTime = DateTime.now,
actualStart: DateTime = DateTime.now
scheduledStart: LocalDateTime = DateTimeFunctions.currentTimeUTC().toLocalDateTime,
scheduledEnd: LocalDateTime = DateTimeFunctions.currentTimeUTC().toLocalDateTime,
actualStart: LocalDateTime = DateTimeFunctions.currentTimeUTC().toLocalDateTime
)

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package com.krux.starport.db.record

import org.joda.time.DateTime
import java.time.LocalDateTime

case class FailedPipeline(
awsId: String,
pipelineId: Int,
resolved: Boolean,
checkedTime: DateTime
checkedTime: LocalDateTime
)
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.krux.starport.db.record

import org.joda.time.DateTime
import java.time.LocalDateTime


case class Pipeline(
Expand All @@ -10,10 +10,10 @@ case class Pipeline(
`class`: String,
isActive: Boolean,
retention: Int,
start: DateTime,
start: LocalDateTime,
period: String, // TODO make this of type period
end: Option[DateTime],
nextRunTime: Option[DateTime],
end: Option[LocalDateTime],
nextRunTime: Option[LocalDateTime],
backfill: Boolean,
owner: Option[String]
)
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package com.krux.starport.db.record

import org.joda.time.DateTime
import java.time.LocalDateTime


case class ScheduleFailureCounter(
pipelineId: Int,
failureCount: Int,
updatedAt: DateTime
updatedAt: LocalDateTime
)
Loading

0 comments on commit e450896

Please sign in to comment.