Make better use of available cpu on larger VMs #57

Merged · 2 commits · Nov 18, 2024
Changes from all commits
15 changes: 12 additions & 3 deletions config/config.azure.reference.hocon
@@ -42,7 +42,9 @@
# -- name to use for the events table.
"table": "events"

# -- name to use for the snowflake channel.
# -- Prefix to use for the snowflake channels.
# -- The full name will be suffixed with a number, e.g. `snowplow-1`
# -- The prefix must be unique per loader VM
"channel": "snowplow"

# -- Timeouts used for JDBC operations
@@ -75,10 +77,17 @@
# - Events are emitted to Snowflake for a maximum of this duration, even if the `maxBytes` size has not been reached
"maxDelay": "1 second"

# - How many batches can we send simultaneously over the network to Snowflake.
"uploadConcurrency": 1
# - Controls how many batches we can send simultaneously over the network to Snowflake.
# -- E.g. If there are 4 available processors, and uploadParallelismFactor = 2.5, then we send up to 10 batches in parallel
# -- Adjusting this value can cause the app to use more or less of the available CPU.
"uploadParallelismFactor": 2.5
}

# -- Controls how the app splits the workload into concurrent batches which can be run in parallel.
# -- E.g. If there are 4 available processors, and cpuParallelismFactor = 0.75, then we process 3 batches concurrently.
# -- Adjusting this value can cause the app to use more or less of the available CPU.
"cpuParallelismFactor": 0.75

# Retry configuration for Snowflake operation failures
"retries": {

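Both factors are applied the same way: the loader multiplies them by the number of available processors and rounds up (see `multiplyByCpuAndRoundUp` in Environment.scala further down this diff). A minimal Scala sketch of that arithmetic, using the default values shown above:

```scala
import scala.math.BigDecimal.RoundingMode

object ParallelismMath {
  // Mirrors multiplyByCpuAndRoundUp from Environment.scala in this PR.
  def parallelism(factor: BigDecimal, availableProcessors: Int): Int =
    (BigDecimal(availableProcessors) * factor).setScale(0, RoundingMode.UP).toInt

  // On a 4-core VM with the defaults above:
  val uploadParallelism = parallelism(BigDecimal(2.5), 4)  // 10 concurrent uploads
  val cpuParallelism    = parallelism(BigDecimal(0.75), 4) // 3 concurrent processing batches
}
```

The same two settings appear in the kinesis and pubsub reference configs below.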
15 changes: 12 additions & 3 deletions config/config.kinesis.reference.hocon
@@ -56,7 +56,9 @@
# -- name to use for the events table.
"table": "events"

# -- name to use for the snowflake channel.
# -- Prefix to use for the snowflake channels.
# -- The full name will be suffixed with a number, e.g. `snowplow-1`
# -- The prefix must be unique per loader VM
"channel": "snowplow"

# -- Timeouts used for JDBC operations
@@ -92,10 +94,17 @@
# - Events are emitted to Snowflake for a maximum of this duration, even if the `maxBytes` size has not been reached
"maxDelay": "1 second"

# - How many batches can we send simultaneously over the network to Snowflake.
"uploadConcurrency": 1
# - Controls how many batches we can send simultaneously over the network to Snowflake.
# -- E.g. If there are 4 available processors, and uploadParallelismFactor = 2.5, then we send up to 10 batches in parallel
# -- Adjusting this value can cause the app to use more or less of the available CPU.
"uploadParallelismFactor": 2.5
}

# -- Controls how the app splits the workload into concurrent batches which can be run in parallel.
# -- E.g. If there are 4 available processors, and cpuParallelismFactor = 0.75, then we process 3 batches concurrently.
# -- Adjusting this value can cause the app to use more or less of the available CPU.
"cpuParallelismFactor": 0.75

# Retry configuration for Snowflake operation failures
"retries": {

15 changes: 12 additions & 3 deletions config/config.pubsub.reference.hocon
@@ -52,7 +52,9 @@
# -- name to use for the events table.
"table": "events"

# -- name to use for the snowflake channel.
# -- Prefix to use for the snowflake channels.
# -- The full name will be suffixed with a number, e.g. `snowplow-1`
# -- The prefix must be unique per loader VM
"channel": "snowplow"

# -- Timeouts used for JDBC operations
@@ -81,10 +83,17 @@
# - Events are emitted to Snowflake for a maximum of this duration, even if the `maxBytes` size has not been reached
"maxDelay": "1 second"

# - How many batches can we send simultaneously over the network to Snowflake.
"uploadConcurrency": 1
# - Controls how many batches we can send simultaneously over the network to Snowflake.
# -- E.g. If there are 4 available processors, and uploadParallelismFactor = 2.5, then we send up to 10 batches in parallel
# -- Adjusting this value can cause the app to use more or less of the available CPU.
"uploadParallelismFactor": 2.5
}

# -- Controls how the app splits the workload into concurrent batches which can be run in parallel.
# -- E.g. If there are 4 available processors, and cpuParallelismFactor = 0.75, then we process 3 batches concurrently.
# -- Adjusting this value can cause the app to use more or less of the available CPU.
"cpuParallelismFactor": 0.75

# Retry configuration for Snowflake operation failures
"retries": {

3 changes: 2 additions & 1 deletion modules/core/src/main/resources/reference.conf
@@ -21,8 +21,9 @@
"batching": {
"maxBytes": 16000000
"maxDelay": "1 second"
"uploadConcurrency": 3
"uploadParallelismFactor": 2.5
}
"cpuParallelismFactor": 0.75

"retries": {
"setupErrors": {
@@ -29,6 +29,7 @@ case class Config[+Source, +Sink](
input: Source,
output: Config.Output[Sink],
batching: Config.Batching,
cpuParallelismFactor: BigDecimal,
retries: Config.Retries,
skipSchemas: List[SchemaCriterion],
telemetry: Telemetry.Config,
@@ -69,7 +70,7 @@ object Config {
case class Batching(
maxBytes: Long,
maxDelay: FiniteDuration,
uploadConcurrency: Int
uploadParallelismFactor: BigDecimal
)

case class Metrics(
@@ -19,16 +19,27 @@ import com.snowplowanalytics.snowplow.snowflake.processing.{Channel, TableManage
import com.snowplowanalytics.snowplow.sources.SourceAndAck
import org.http4s.client.Client

/**
* Resources and runtime-derived configuration needed for processing events
*
* @param cpuParallelism
* The processing Pipe involves several steps, some of which are cpu-intensive. We run
* cpu-intensive steps in parallel, so that on big instances we can take advantage of all cores.
* For each of those cpu-intensive steps, `cpuParallelism` controls the parallelism of that step.
*
* Other params are self-explanatory
*/
case class Environment[F[_]](
appInfo: AppInfo,
source: SourceAndAck[F],
badSink: Sink[F],
httpClient: Client[F],
tableManager: TableManager[F],
channel: Channel.Provider[F],
channels: Vector[Channel.Provider[F]],
metrics: Metrics[F],
appHealth: AppHealth.Interface[F, Alert, RuntimeService],
batching: Config.Batching,
cpuParallelism: Int,
schemasToSkip: List[SchemaCriterion],
badRowMaxSize: Int
)
@@ -52,19 +63,47 @@ object Environment {
badSink <- toSink(config.output.bad.sink).onError(_ => Resource.eval(appHealth.beUnhealthyForRuntimeService(RuntimeService.BadSink)))
metrics <- Resource.eval(Metrics.build(config.monitoring.metrics))
tableManager <- Resource.eval(TableManager.make(config.output.good, appHealth, config.retries))
channelOpener <- Channel.opener(config.output.good, config.batching, config.retries, appHealth)
channelProvider <- Channel.provider(channelOpener, config.retries, appHealth)
cpuParallelism = chooseCpuParallelism(config)
uploadParallelism = chooseUploadParallelism(config)
channelProviders <- Vector.range(0, uploadParallelism).traverse { index =>
for {
channelOpener <- Channel.opener(config.output.good, config.retries, appHealth, index)
channelProvider <- Channel.provider(channelOpener, config.retries, appHealth)
} yield channelProvider
}
} yield Environment(
appInfo = appInfo,
source = sourceAndAck,
badSink = badSink,
httpClient = httpClient,
tableManager = tableManager,
channel = channelProvider,
metrics = metrics,
appHealth = appHealth,
batching = config.batching,
schemasToSkip = config.skipSchemas,
badRowMaxSize = config.output.bad.maxRecordSize
appInfo = appInfo,
source = sourceAndAck,
badSink = badSink,
httpClient = httpClient,
tableManager = tableManager,
channels = channelProviders,
metrics = metrics,
appHealth = appHealth,
batching = config.batching,
cpuParallelism = cpuParallelism,
schemasToSkip = config.skipSchemas,
badRowMaxSize = config.output.bad.maxRecordSize
)

/**
* See the description of `cpuParallelism` on the [[Environment]] class
*
* For bigger instances (more cores) we want more parallelism, so that cpu-intensive steps can
* take advantage of all the cores.
*/
private def chooseCpuParallelism(config: Config[Any, Any]): Int =
multiplyByCpuAndRoundUp(config.cpuParallelismFactor)

/**
* For bigger instances (more cores) we produce batches more quickly, and so need higher upload
* parallelism so that uploading does not become a bottleneck
*/
private def chooseUploadParallelism(config: Config[Any, Any]): Int =
multiplyByCpuAndRoundUp(config.batching.uploadParallelismFactor)

private def multiplyByCpuAndRoundUp(factor: BigDecimal): Int =
(Runtime.getRuntime.availableProcessors * factor)
.setScale(0, BigDecimal.RoundingMode.UP)
.toInt
}
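The processing pipeline itself is not touched by this diff, so the following is only a sketch of how `cpuParallelism` is intended to be used: bounding the number of cpu-intensive transformations that run at once. The types and function names are hypothetical; only the `parEvalMap(cpuParallelism)` fan-out reflects the behaviour described in the doc comment above.

```scala
import cats.effect.IO
import fs2.Stream

// Hypothetical batch types, for illustration only.
final case class RawBatch(payloads: List[Array[Byte]])
final case class TransformedBatch(rows: List[String])

// Stand-in for a cpu-intensive step (parsing and transforming events).
def transform(batch: RawBatch): IO[TransformedBatch] =
  IO(TransformedBatch(batch.payloads.map(bytes => new String(bytes, "UTF-8"))))

// At most `cpuParallelism` transformations are in flight at any time, so a
// larger VM (more cores, hence a higher cpuParallelism) does proportionally more work.
def process(cpuParallelism: Int, in: Stream[IO, RawBatch]): Stream[IO, TransformedBatch] =
  in.parEvalMap(cpuParallelism)(transform)
```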
@@ -11,9 +11,12 @@
package com.snowplowanalytics.snowplow.snowflake

import cats.effect.{ExitCode, IO, Resource}
import cats.effect.metrics.CpuStarvationWarningMetrics
import io.circe.Decoder
import com.monovore.decline.effect.CommandIOApp
import com.monovore.decline.Opts
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

import scala.concurrent.duration.DurationInt

@@ -28,6 +31,11 @@ abstract class LoaderApp[SourceConfig: Decoder, SinkConfig: Decoder](
override def runtimeConfig =
super.runtimeConfig.copy(cpuStarvationCheckInterval = 10.seconds)

private implicit val logger: Logger[IO] = Slf4jLogger.getLogger[IO]

override def onCpuStarvationWarn(metrics: CpuStarvationWarningMetrics): IO[Unit] =
Logger[IO].debug(s"Cats Effect measured responsiveness in excess of ${metrics.starvationInterval * metrics.starvationThreshold}")

type SinkProvider = SinkConfig => Resource[IO, Sink[IO]]
type SourceProvider = SourceConfig => IO[SourceAndAck[IO]]

@@ -102,14 +102,14 @@ object Channel {

def opener[F[_]: Async](
config: Config.Snowflake,
batchingConfig: Config.Batching,
retriesConfig: Config.Retries,
appHealth: AppHealth.Interface[F, Alert, RuntimeService]
appHealth: AppHealth.Interface[F, Alert, RuntimeService],
index: Int
): Resource[F, Opener[F]] =
for {
client <- createClient(config, batchingConfig, retriesConfig, appHealth)
client <- createClient(config, retriesConfig, appHealth)
} yield new Opener[F] {
def open: F[CloseableChannel[F]] = createChannel[F](config, client).map(impl[F])
def open: F[CloseableChannel[F]] = createChannel[F](config, client, index).map(impl[F])
}

def provider[F[_]: Async](
@@ -177,23 +177,25 @@

private def createChannel[F[_]: Async](
config: Config.Snowflake,
client: SnowflakeStreamingIngestClient
client: SnowflakeStreamingIngestClient,
index: Int
): F[SnowflakeStreamingIngestChannel] = {
val channelName = s"${config.channel}-$index"
val request = OpenChannelRequest
.builder(config.channel)
.builder(channelName)
.setDBName(config.database)
.setSchemaName(config.schema)
.setTableName(config.table)
.setOnErrorOption(OpenChannelRequest.OnErrorOption.CONTINUE)
.setDefaultTimezone(ZoneOffset.UTC)
.build

Logger[F].info(s"Opening channel ${config.channel}") *>
Logger[F].info(s"Opening channel ${channelName}") *>
Async[F].blocking(client.openChannel(request)) <*
Logger[F].info(s"Successfully opened channel ${config.channel}")
Logger[F].info(s"Successfully opened channel ${channelName}")
}
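Combined with the `Vector.range(0, uploadParallelism)` loop in Environment.scala above, each loader VM now opens one channel per upload slot, which is why the configured `channel` value has become a prefix that must be unique per VM. For example, with the default prefix and an illustrative upload parallelism of 3:

```scala
val channelPrefix     = "snowplow" // the "channel" setting from the reference configs
val uploadParallelism = 3          // illustrative; derived from uploadParallelismFactor at runtime
val channelNames      = Vector.range(0, uploadParallelism).map(index => s"$channelPrefix-$index")
// Vector("snowplow-0", "snowplow-1", "snowplow-2")
```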

private def channelProperties(config: Config.Snowflake, batchingConfig: Config.Batching): Properties = {
private def channelProperties(config: Config.Snowflake): Properties = {
val props = new Properties()
props.setProperty("user", config.user)
props.setProperty("private_key", config.privateKey)
@@ -211,14 +213,13 @@
props.setProperty(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_PERCENTAGE, "0")
props.setProperty(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_BYTES, "0")
props.setProperty(ParameterProvider.MAX_CHANNEL_SIZE_IN_BYTES, Long.MaxValue.toString)
props.setProperty(ParameterProvider.IO_TIME_CPU_RATIO, batchingConfig.uploadConcurrency.toString)
props.setProperty(ParameterProvider.IO_TIME_CPU_RATIO, "0")

props
}

private def createClient[F[_]: Async](
config: Config.Snowflake,
batchingConfig: Config.Batching,
retriesConfig: Config.Retries,
appHealth: AppHealth.Interface[F, Alert, RuntimeService]
): Resource[F, SnowflakeStreamingIngestClient] = {
@@ -228,7 +229,7 @@
Sync[F].blocking {
SnowflakeStreamingIngestClientFactory
.builder("Snowplow_Streaming")
.setProperties(channelProperties(config, batchingConfig))
.setProperties(channelProperties(config))
// .setParameterOverrides(Map.empty.asJava) // Not needed, as all params can also be set with Properties
.build
} <*