Skip to content

Commit

Permalink
Add e2e_latency_millis metric (#69)
Browse files Browse the repository at this point in the history
This PR adds a new metric `e2e_latency_millis` which measures the
difference between an event's `collector_tstamp` and when it was
written to Snowflake.

The metric is emitted to statsd only on minutes in which some
events are written to Snowflake.

The metric measures the worst-case latency for a batch, i.e. latency
for the earliest seen `collector_tstamp`.
  • Loading branch information
spenes committed Feb 19, 2025
1 parent 7da7dfc commit c36c754
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ trait Metrics[F[_]] {
def addGood(count: Int): F[Unit]
def addBad(count: Int): F[Unit]
def setLatency(latency: FiniteDuration): F[Unit]
def setE2ELatency(e2eLatency: FiniteDuration): F[Unit]

def report: Stream[F, Nothing]
}
Expand All @@ -37,20 +38,21 @@ object Metrics {
private case class State(
good: Int,
bad: Int,
latency: FiniteDuration
latency: FiniteDuration,
e2eLatency: Option[FiniteDuration]
) extends CommonMetrics.State {
def toKVMetrics: List[CommonMetrics.KVMetric] =
List(
KVMetric.CountGood(good),
KVMetric.CountBad(bad),
KVMetric.Latency(latency)
)
) ++ e2eLatency.map(KVMetric.E2ELatency(_))
}

private object State {
def initialize[F[_]: Functor](sourceAndAck: SourceAndAck[F]): F[State] =
sourceAndAck.currentStreamLatency.map { latency =>
State(0, 0, latency.getOrElse(Duration.Zero))
State(0, 0, latency.getOrElse(Duration.Zero), None)
}
}

Expand All @@ -66,6 +68,11 @@ object Metrics {
ref.update(s => s.copy(bad = s.bad + count))
def setLatency(latency: FiniteDuration): F[Unit] =
ref.update(s => s.copy(latency = s.latency.max(latency)))
def setE2ELatency(e2eLatency: FiniteDuration): F[Unit] =
ref.update { state =>
val newLatency = state.e2eLatency.fold(e2eLatency)(_.max(e2eLatency))
state.copy(e2eLatency = Some(newLatency))
}
}

private object KVMetric {
Expand All @@ -88,5 +95,11 @@ object Metrics {
val metricType = CommonMetrics.MetricType.Gauge
}

final case class E2ELatency(v: FiniteDuration) extends CommonMetrics.KVMetric {
val key = "e2e_latency_millis"
val value = v.toMillis.toString
val metricType = CommonMetrics.MetricType.Gauge
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import net.snowflake.ingest.utils.{ErrorCode, SFException}
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

import scala.concurrent.duration.DurationLong
import java.nio.charset.StandardCharsets
import java.time.OffsetDateTime
import java.time.Instant

import com.snowplowanalytics.iglu.schemaddl.parquet.Caster
import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
Expand Down Expand Up @@ -53,7 +55,8 @@ object Processing {
transformFailures: List[BadRow],
countBytes: Long,
countItems: Int,
token: Unique.Token
token: Unique.Token,
earliestCollectorTstamp: Option[Instant]
)

type EventWithTransform = (Event, Map[String, AnyRef])
Expand All @@ -78,7 +81,8 @@ object Processing {
origBatchBytes: Long,
origBatchCount: Int,
badAccumulated: ListOfList[BadRow],
tokens: Vector[Unique.Token]
tokens: Vector[Unique.Token],
earliestCollectorTstamp: Option[Instant]
)

/**
Expand Down Expand Up @@ -122,11 +126,25 @@ object Processing {
in.through(parseAndTransform(env, badProcessor))
.through(BatchUp.withTimeout(env.batching.maxBytes, env.batching.maxDelay))
.through(writeToSnowflake(env, badProcessor))
.through(setE2ELatencyMetric(env))
.through(sendFailedEvents(env, badProcessor))
.through(sendMetrics(env))
.through(emitTokens)
}

private def setE2ELatencyMetric[F[_]: Sync](env: Environment[F]): Pipe[F, BatchAfterTransform, BatchAfterTransform] =
_.evalTap {
_.earliestCollectorTstamp match {
case Some(t) =>
for {
now <- Sync[F].realTime
e2eLatency = now - t.toEpochMilli.millis
_ <- env.metrics.setE2ELatency(e2eLatency)
} yield ()
case None => Sync[F].unit
}
}

private def parseAndTransform[F[_]: Async](env: Environment[F], badProcessor: BadRowProcessor): Pipe[F, TokenedEvents, TransformedBatch] =
_.parEvalMap(env.cpuParallelism) { case TokenedEvents(chunk, token) =>
for {
Expand All @@ -142,7 +160,8 @@ object Processing {
now <- Sync[F].realTimeInstant
loadTstamp = SnowflakeCaster.timestampValue(now)
(transformBad, transformed) <- transformBatch(badProcessor, loadTstamp, events, env.schemasToSkip)
} yield TransformedBatch(transformed, transformBad, badRows, numBytes, chunk.size, token)
earliestCollectorTstamp = events.view.map(_.collector_tstamp).minOption
} yield TransformedBatch(transformed, transformBad, badRows, numBytes, chunk.size, token, earliestCollectorTstamp)
}

private def transformBatch[F[_]: Sync](
Expand Down Expand Up @@ -361,7 +380,8 @@ object Processing {
origBatchBytes = b.origBatchBytes + a.countBytes,
origBatchCount = b.origBatchCount + a.countItems,
badAccumulated = b.badAccumulated.prepend(a.parseFailures).prepend(a.transformFailures),
tokens = b.tokens :+ a.token
tokens = b.tokens :+ a.token,
chooseEarliestTstamp(a.earliestCollectorTstamp, b.earliestCollectorTstamp)
)

def single(a: TransformedBatch): BatchAfterTransform =
Expand All @@ -370,11 +390,20 @@ object Processing {
a.countBytes,
a.countItems,
ListOfList.ofLists(a.parseFailures, a.transformFailures),
Vector(a.token)
Vector(a.token),
a.earliestCollectorTstamp
)

def weightOf(a: TransformedBatch): Long =
a.countBytes
}

private def chooseEarliestTstamp(o1: Option[Instant], o2: Option[Instant]): Option[Instant] =
(o1, o2)
.mapN { case (t1, t2) =>
if (t1.isBefore(t2)) t1 else t2
}
.orElse(o1)
.orElse(o2)

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ object MockEnvironment {
case class AddedGoodCountMetric(count: Int) extends Action
case class AddedBadCountMetric(count: Int) extends Action
case class SetLatencyMetric(latency: FiniteDuration) extends Action
case class SetE2ELatencyMetric(e2eLatency: FiniteDuration) extends Action
case class BecameUnhealthy(service: RuntimeService) extends Action
case class BecameHealthy(service: RuntimeService) extends Action
}
Expand Down Expand Up @@ -183,6 +184,9 @@ object MockEnvironment {
def setLatency(latency: FiniteDuration): IO[Unit] =
ref.update(_ :+ SetLatencyMetric(latency))

def setE2ELatency(e2eLatency: FiniteDuration): IO[Unit] =
ref.update(_ :+ SetE2ELatencyMetric(e2eLatency))

def report: Stream[IO, Nothing] = Stream.never[IO]
}

Expand Down
Loading

0 comments on commit c36c754

Please sign in to comment.