Bump common-streams to 0.10.0 (#66)
common-streams 0.10.0 brings significant changes to the Kinesis and
PubSub sources:

- PubSub source completely re-written to be a wrapper around UnaryPull
  snowplow-incubator/common-streams#101
- Kinesis source is more efficient when the stream is re-sharded
  snowplow-incubator/common-streams#102
- Kinesis source better tuned for larger deployments
  snowplow-incubator/common-streams#99

And improvements to latency metrics:
- Sources should report stream latency of stuck events
  snowplow-incubator/common-streams#104
spenes committed Feb 14, 2025
1 parent 1f47b05 commit 6232329
Showing 10 changed files with 105 additions and 140 deletions.
5 changes: 5 additions & 0 deletions config/config.kinesis.reference.hocon
@@ -43,6 +43,11 @@
# -- Allows bigger instances to more quickly acquire the shard-leases they need to combat latency
"maxLeasesToStealAtOneTimeFactor": 2.0

# -- Configures how to back off and retry in case of DynamoDB provisioned throughput limits
"checkpointThrottledBackoffPolicy": {
"minBackoff": "100 millis"
"maxBackoff": "1 second"
}
}

"output": {
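For context, here is a minimal sketch of the retry behaviour the new checkpointThrottledBackoffPolicy block above configures: a bounded backoff between minBackoff and maxBackoff whenever DynamoDB throttles a checkpoint. The exponential doubling and the object/method names below are illustrative assumptions; only the two bounds come from the config.

import scala.concurrent.duration._

// Illustrative sketch only: bounded exponential backoff between the configured
// minBackoff and maxBackoff. The doubling strategy is an assumption, not the
// actual common-streams implementation.
object CheckpointBackoffSketch {
  def delayFor(
    attempt: Int,                            // 0-based retry attempt
    minBackoff: FiniteDuration = 100.millis, // "minBackoff" above
    maxBackoff: FiniteDuration = 1.second    // "maxBackoff" above
  ): FiniteDuration = {
    val doubled = minBackoff * (1L << math.min(attempt, 30)) // 100ms, 200ms, 400ms, ...
    doubled.min(maxBackoff)                                  // never exceeds maxBackoff
  }
}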
31 changes: 15 additions & 16 deletions config/config.pubsub.reference.hocon
@@ -12,22 +12,21 @@
# -- The number of threads is equal to this factor multiplied by the number of available CPU cores
"parallelPullFactor": 0.5

# -- How many bytes can be buffered by the loader app before blocking the pubsub client library
# -- from fetching more events.
# -- This is a balance between memory usage vs how efficiently the app can operate. The default value works well.
"bufferMaxBytes": 1000000

# -- For how long the pubsub client library will continue to re-extend the ack deadline of an unprocessed event.
"maxAckExtensionPeriod": "1 hour"

# -- Sets min/max boundaries on the value by which an ack deadline is extended.
# -- The actual value used is guided by runtime statistics collected by the pubsub client library.
"minDurationPerAckExtension": "60 seconds"
"maxDurationPerAckExtension": "600 seconds"

# -- The maximum number of streaming pulls we allow on a single GRPC transport channel before opening another channel.
# -- This advanced setting is only relevant on extremely large VMs, or with a high value of `parallelPullCount`.
"maxPullsPerTransportChannel": 16
# -- Pubsub ack deadlines are extended for this duration when needed.
"durationPerAckExtension": "60 seconds"

# -- Controls when ack deadlines are re-extended, for a message that is close to exceeding its ack deadline.
# -- For example, if `durationPerAckExtension` is `60 seconds` and `minRemainingAckDeadline` is `0.1` then the Source
# -- will wait until there is `6 seconds` left of the remaining deadline before re-extending the message deadline.
"minRemainingAckDeadline": 0.1

# -- How many pubsub messages to pull from the server in a single request.
"maxMessagesPerPull": 1000

# -- Adds an artificial delay between consecutive requests to pubsub for more messages.
# -- Under some circumstances, this was found to slightly alleviate a problem in which pubsub might re-deliver
# -- the same messages multiple times.
"debounceRequests": "100 millis"
}

"output": {
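To make the re-extension rule above concrete: with durationPerAckExtension = 60 seconds and minRemainingAckDeadline = 0.1, the threshold is 60 s × 0.1 = 6 s of remaining deadline. A minimal sketch of that arithmetic follows; the class and method names are assumptions for illustration, not the actual common-streams internals.

import scala.concurrent.duration._

// Illustrative only: when fewer than (durationPerAckExtension * minRemainingAckDeadline)
// of a message's ack deadline remain, the deadline is re-extended.
final case class AckExtensionSketch(
  durationPerAckExtension: FiniteDuration, // e.g. 60.seconds
  minRemainingAckDeadline: Double          // e.g. 0.1
) {
  def reExtendThreshold: FiniteDuration =
    Duration.fromNanos((durationPerAckExtension.toNanos * minRemainingAckDeadline).toLong)

  def shouldReExtend(remainingDeadline: FiniteDuration): Boolean =
    remainingDeadline <= reExtendThreshold
}

// AckExtensionSketch(60.seconds, 0.1).reExtendThreshold == 6.seconds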
Environment.scala
@@ -61,7 +61,7 @@ object Environment {
_ <- HealthProbe.resource(config.monitoring.healthProbe.port, appHealth)
_ <- Webhook.resource(config.monitoring.webhook, appInfo, httpClient, appHealth)
badSink <- toSink(config.output.bad.sink).onError(_ => Resource.eval(appHealth.beUnhealthyForRuntimeService(RuntimeService.BadSink)))
metrics <- Resource.eval(Metrics.build(config.monitoring.metrics))
metrics <- Resource.eval(Metrics.build(config.monitoring.metrics, sourceAndAck))
tableManager <- Resource.eval(TableManager.make(config.output.good, appHealth, config.retries))
cpuParallelism = chooseCpuParallelism(config)
uploadParallelism = chooseUploadParallelism(config)
Metrics.scala
@@ -10,51 +10,62 @@

package com.snowplowanalytics.snowplow.snowflake

import cats.Functor
import cats.effect.Async
import cats.effect.kernel.Ref
import cats.implicits._
import fs2.Stream

import com.snowplowanalytics.snowplow.sources.SourceAndAck
import com.snowplowanalytics.snowplow.runtime.{Metrics => CommonMetrics}

import scala.concurrent.duration.{Duration, FiniteDuration}

trait Metrics[F[_]] {
def addGood(count: Int): F[Unit]
def addBad(count: Int): F[Unit]
def setLatencyMillis(latencyMillis: Long): F[Unit]
def setLatency(latency: FiniteDuration): F[Unit]

def report: Stream[F, Nothing]
}

object Metrics {

def build[F[_]: Async](config: Config.Metrics): F[Metrics[F]] =
Ref[F].of(State.empty).map(impl(config, _))
def build[F[_]: Async](config: Config.Metrics, sourceAndAck: SourceAndAck[F]): F[Metrics[F]] =
Ref.ofEffect(State.initialize(sourceAndAck)).map(impl(config, _, sourceAndAck))

private case class State(
good: Int,
bad: Int,
latencyMillis: Long
latency: FiniteDuration
) extends CommonMetrics.State {
def toKVMetrics: List[CommonMetrics.KVMetric] =
List(
KVMetric.CountGood(good),
KVMetric.CountBad(bad),
KVMetric.LatencyMillis(latencyMillis)
KVMetric.Latency(latency)
)
}

private object State {
def empty: State = State(0, 0, 0L)
def initialize[F[_]: Functor](sourceAndAck: SourceAndAck[F]): F[State] =
sourceAndAck.currentStreamLatency.map { latency =>
State(0, 0, latency.getOrElse(Duration.Zero))
}
}

private def impl[F[_]: Async](config: Config.Metrics, ref: Ref[F, State]): Metrics[F] =
new CommonMetrics[F, State](ref, State.empty, config.statsd) with Metrics[F] {
private def impl[F[_]: Async](
config: Config.Metrics,
ref: Ref[F, State],
sourceAndAck: SourceAndAck[F]
): Metrics[F] =
new CommonMetrics[F, State](ref, State.initialize(sourceAndAck), config.statsd) with Metrics[F] {
def addGood(count: Int): F[Unit] =
ref.update(s => s.copy(good = s.good + count))
def addBad(count: Int): F[Unit] =
ref.update(s => s.copy(bad = s.bad + count))
def setLatencyMillis(latencyMillis: Long): F[Unit] =
ref.update(s => s.copy(latencyMillis = s.latencyMillis.max(latencyMillis)))
def setLatency(latency: FiniteDuration): F[Unit] =
ref.update(s => s.copy(latency = s.latency.max(latency)))
}

private object KVMetric {
@@ -71,9 +82,9 @@
val metricType = CommonMetrics.MetricType.Count
}

final case class LatencyMillis(v: Long) extends CommonMetrics.KVMetric {
final case class Latency(v: FiniteDuration) extends CommonMetrics.KVMetric {
val key = "latency_millis"
val value = v.toString
val value = v.toMillis.toString
val metricType = CommonMetrics.MetricType.Gauge
}

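A short sketch of the latency-gauge semantics introduced above: the state is seeded from the source's currentStreamLatency at startup (so a stream that is already behind is visible immediately), setLatency only ever raises the value within a reporting period, and the gauge is emitted in milliseconds under the latency_millis key. The standalone type below is an illustrative stand-in for that behaviour, not the CommonMetrics API.

import scala.concurrent.duration._

// Illustrative stand-in for the State/KVMetric pair above (not the real classes):
// seeded from the stream's current latency, only raised by later observations,
// and rendered as milliseconds for statsd.
final case class LatencyGaugeSketch(latency: FiniteDuration) {
  def observe(l: FiniteDuration): LatencyGaugeSketch = copy(latency = latency.max(l))
  def statsdValue: String = latency.toMillis.toString // emitted under "latency_millis"
}

// Seeded 90s behind, then observing 30s and 120s:
// LatencyGaugeSketch(90.seconds).observe(30.seconds).observe(120.seconds).statsdValue == "120000"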
Processing.scala
@@ -30,7 +30,7 @@ import com.snowplowanalytics.snowplow.badrows.{BadRow, Payload => BadPayload, Pr
import com.snowplowanalytics.snowplow.badrows.Payload.{RawPayload => BadRowRawPayload}
import com.snowplowanalytics.snowplow.sources.{EventProcessingConfig, EventProcessor, TokenedEvents}
import com.snowplowanalytics.snowplow.sinks.ListOfList
import com.snowplowanalytics.snowplow.snowflake.{Environment, Metrics, RuntimeService}
import com.snowplowanalytics.snowplow.snowflake.{Environment, RuntimeService}
import com.snowplowanalytics.snowplow.runtime.syntax.foldable._
import com.snowplowanalytics.snowplow.runtime.processing.BatchUp
import com.snowplowanalytics.snowplow.loaders.transform.{BadRowsSerializer, Transform}
@@ -40,7 +40,7 @@ object Processing {
private implicit def logger[F[_]: Sync] = Slf4jLogger.getLogger[F]

def stream[F[_]: Async](env: Environment[F]): Stream[F, Nothing] = {
val eventProcessingConfig = EventProcessingConfig(EventProcessingConfig.NoWindowing)
val eventProcessingConfig = EventProcessingConfig(EventProcessingConfig.NoWindowing, env.metrics.setLatency)
Stream.eval(env.tableManager.initializeEventsTable()) *>
Stream.eval(env.channels.head.opened.use_) *>
env.source.stream(eventProcessingConfig, eventProcessor(env))
@@ -120,31 +120,16 @@
private def eventProcessor[F[_]: Async](env: Environment[F]): EventProcessor[F] = { in =>
val badProcessor = BadRowProcessor(env.appInfo.name, env.appInfo.version)

in.through(setLatency(env.metrics))
.through(parseAndTransform(env, badProcessor))
in.through(parseAndTransform(env, badProcessor))
.through(BatchUp.withTimeout(env.batching.maxBytes, env.batching.maxDelay))
.through(writeToSnowflake(env, badProcessor))
.through(sendFailedEvents(env, badProcessor))
.through(sendMetrics(env))
.through(emitTokens)
}

private def setLatency[F[_]: Sync](metrics: Metrics[F]): Pipe[F, TokenedEvents, TokenedEvents] =
_.evalTap {
_.earliestSourceTstamp match {
case Some(t) =>
for {
now <- Sync[F].realTime
latencyMillis = now.toMillis - t.toEpochMilli
_ <- metrics.setLatencyMillis(latencyMillis)
} yield ()
case None =>
Applicative[F].unit
}
}

private def parseAndTransform[F[_]: Async](env: Environment[F], badProcessor: BadRowProcessor): Pipe[F, TokenedEvents, TransformedBatch] =
_.parEvalMap(env.cpuParallelism) { case TokenedEvents(chunk, token, _) =>
_.parEvalMap(env.cpuParallelism) { case TokenedEvents(chunk, token) =>
for {
numBytes <- Sync[F].delay(Foldable[Chunk].sumBytes(chunk))
(badRows, events) <- Foldable[Chunk].traverseSeparateUnordered(chunk) { bytes =>
MockEnvironment.scala
@@ -37,7 +37,7 @@ object MockEnvironment {
case class WroteRowsToSnowflake(rowCount: Int) extends Action
case class AddedGoodCountMetric(count: Int) extends Action
case class AddedBadCountMetric(count: Int) extends Action
case class SetLatencyMetric(millis: Long) extends Action
case class SetLatencyMetric(latency: FiniteDuration) extends Action
case class BecameUnhealthy(service: RuntimeService) extends Action
case class BecameHealthy(service: RuntimeService) extends Action
}
@@ -104,7 +104,7 @@

private def testSourceAndAck(inputs: List[TokenedEvents], state: Ref[IO, Vector[Action]]): SourceAndAck[IO] =
new SourceAndAck[IO] {
def stream(config: EventProcessingConfig, processor: EventProcessor[IO]): Stream[IO, Nothing] =
def stream(config: EventProcessingConfig[IO], processor: EventProcessor[IO]): Stream[IO, Nothing] =
Stream
.emits(inputs)
.through(processor)
@@ -116,6 +116,9 @@

override def isHealthy(maxAllowedProcessingLatency: FiniteDuration): IO[SourceAndAck.HealthStatus] =
IO.pure(SourceAndAck.Healthy)

def currentStreamLatency: IO[Option[FiniteDuration]] =
IO.pure(None)
}

private def testBadSink(mockedResponse: Response[Unit], state: Ref[IO, Vector[Action]]): Sink[IO] =
@@ -177,8 +180,8 @@
def addGood(count: Int): IO[Unit] =
ref.update(_ :+ AddedGoodCountMetric(count))

def setLatencyMillis(latencyMillis: Long): IO[Unit] =
ref.update(_ :+ SetLatencyMetric(latencyMillis))
def setLatency(latency: FiniteDuration): IO[Unit] =
ref.update(_ :+ SetLatencyMetric(latency))

def report: Stream[IO, Nothing] = Stream.never[IO]
}
ProcessingSpec.scala
@@ -12,7 +12,6 @@ package com.snowplowanalytics.snowplow.snowflake.processing

import cats.effect.IO
import cats.effect.testing.specs2.CatsEffect
import cats.effect.testkit.TestControl
import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
import com.snowplowanalytics.snowplow.snowflake.{MockEnvironment, RuntimeService}
import com.snowplowanalytics.snowplow.snowflake.MockEnvironment.{Action, Mocks, Response}
@@ -23,8 +22,6 @@ import org.specs2.Specification

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.time.Instant
import scala.concurrent.duration.DurationLong

class ProcessingSpec extends Specification with CatsEffect {
import ProcessingSpec._
@@ -38,10 +35,9 @@
Emit BadRows when the Channel reports a problem with the data $e5
Abort processing and don't ack events when the Channel reports a runtime error $e6
Reset the Channel when the Channel reports the channel has become invalid $e7
Set the latency metric based off the message timestamp $e8
Mark app as unhealthy when sinking badrows fails $e9
Mark app as unhealthy when writing to the Channel fails with runtime exception $e10
Mark app as unhealthy when writing to the Channel fails SDK internal error exception $e11
Mark app as unhealthy when sinking badrows fails $e8
Mark app as unhealthy when writing to the Channel fails with runtime exception $e9
Mark app as unhealthy when writing to the Channel fails SDK internal error exception $e10
"""

def e1 =
@@ -83,7 +79,7 @@
bads <- inputEvents(count = 3, badlyFormatted)
goods <- inputEvents(count = 3, good)
} yield bads.zip(goods).map { case (bad, good) =>
TokenedEvents(bad.events ++ good.events, good.ack, None)
TokenedEvents(bad.events ++ good.events, good.ack)
}
runTest(toInputs) { case (inputs, control) =>
for {
@@ -227,39 +223,6 @@
}

def e8 = {
val messageTime = Instant.parse("2023-10-24T10:00:00.000Z")
val processTime = Instant.parse("2023-10-24T10:00:42.123Z")

val toInputs = inputEvents(count = 2, good).map {
_.map {
_.copy(earliestSourceTstamp = Some(messageTime))
}
}

val io = runTest(toInputs) { case (inputs, control) =>
for {
_ <- IO.sleep(processTime.toEpochMilli.millis)
_ <- Processing.stream(control.environment).compile.drain
state <- control.state.get
} yield state should beEqualTo(
Vector(
Action.InitEventsTable,
Action.OpenedChannel,
Action.SetLatencyMetric(42123),
Action.SetLatencyMetric(42123),
Action.WroteRowsToSnowflake(4),
Action.AddedGoodCountMetric(4),
Action.AddedBadCountMetric(0),
Action.Checkpointed(List(inputs(0).ack, inputs(1).ack))
)
)
}

TestControl.executeEmbed(io)

}

def e9 = {
val mocks = Mocks.default.copy(
badSinkResponse = Response.ExceptionThrown(new RuntimeException("Some error when sinking bad data"))
)
@@ -278,7 +241,7 @@
}
}

def e10 = {
def e9 = {
val mocks = Mocks.default.copy(
channelResponses = List(Response.ExceptionThrown(new RuntimeException("Some error when writing to the Channel")))
)
@@ -297,7 +260,7 @@
}
}

def e11 = {
def e10 = {
val mocks = Mocks.default.copy(
channelResponses = List(
Response.Success(
Expand Down Expand Up @@ -361,13 +324,13 @@ object ProcessingSpec {
val serialized = Chunk(event1, event2).map { e =>
ByteBuffer.wrap(e.toTsv.getBytes(StandardCharsets.UTF_8))
}
TokenedEvents(serialized, ack, None)
TokenedEvents(serialized, ack)
}

def badlyFormatted: IO[TokenedEvents] =
IO.unique.map { token =>
val serialized = Chunk("nonsense1", "nonsense2").map(s => ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8)))
TokenedEvents(serialized, token, None)
TokenedEvents(serialized, token)
}

// Helper to create a SFException, and remove the stacktrace so we don't clutter our test logs.