Skip to content

Commit

Permalink
Configurable number of self-describing context entities (close #74)
Browse files Browse the repository at this point in the history
  • Loading branch information
istreeter authored and colmsnowplow committed Apr 17, 2024
1 parent f46eaf4 commit abb99a0
Show file tree
Hide file tree
Showing 60 changed files with 1,923 additions and 618 deletions.
3 changes: 1 addition & 2 deletions .github/workflows/publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@ jobs:
uses: actions/setup-java@v1
with:
java-version: ${{ matrix.java }}

- name: Run tests
run: sbt clean test
run: sbt clean scalafmtCheckAll test

docker-release:
if: startsWith(github.ref, 'refs/tags/')
Expand Down
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,13 @@ Aside from "output" configuration, all fields in the configuration file are opti
}
}
// Configures the limits on how many self-describing entities can be attached to each event.
// The actual number will be randomly chosen for each event.
"contexts": {
"minPerEvent": 0
"maxPerEvent": 10
}
// HTTP output only - set weights for the distributions of request methods.
// Setting a fequency to 0 results in that method not being produced at all
// Note that head requests are currently not implemented and will result in an exception.
Expand Down
5 changes: 2 additions & 3 deletions config/config.example.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@
"at": "2022-02-01T01:01:01z"
}
"output": {
"file": {
"uri": "file:///tmp"
}
"type": "File"
"path": "file:///tmp"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,13 @@ object CollectorPayload {
eventPerPayloadMin: Int,
eventPerPayloadMax: Int,
now: Instant,
frequencies: EventFrequencies
frequencies: EventFrequencies,
contexts: Context.ContextsConfig
): Gen[CollectorPayload] =
genWithBody(
eventPerPayloadMin,
eventPerPayloadMax,
Body.genDup(natProb, synProb, natTotal, synTotal, now, frequencies),
Body.genDup(natProb, synProb, natTotal, synTotal, now, frequencies, contexts),
now
)

Expand All @@ -143,9 +144,10 @@ object CollectorPayload {
eventPerPayloadMin: Int,
eventPerPayloadMax: Int,
now: Instant,
frequencies: EventFrequencies
frequencies: EventFrequencies,
contexts: Context.ContextsConfig
): Gen[CollectorPayload] =
genWithBody(eventPerPayloadMin, eventPerPayloadMax, Body.gen(now, frequencies), now)
genWithBody(eventPerPayloadMin, eventPerPayloadMax, Body.gen(now, frequencies, contexts), now)

val IgluUri: SchemaKey =
SchemaKey("com.snowplowanalytics.snowplow", "CollectorPayload", "thrift", SchemaVer.Full(1, 0, 0))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,13 @@
*/
package com.snowplowanalytics.snowplow.eventgen.enrich

import com.snowplowanalytics.iglu.core.SelfDescribingData
import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
import com.snowplowanalytics.snowplow.analytics.scalasdk.SnowplowEvent.{Contexts, UnstructEvent}
import com.snowplowanalytics.snowplow.analytics.scalasdk.SnowplowEvent.UnstructEvent
import com.snowplowanalytics.snowplow.eventgen.collector.CollectorPayload
import com.snowplowanalytics.snowplow.eventgen.protocol.Body
import com.snowplowanalytics.snowplow.eventgen.protocol.{Body, Context}
import com.snowplowanalytics.snowplow.eventgen.protocol.common.Web
import com.snowplowanalytics.snowplow.eventgen.protocol.event._
import com.snowplowanalytics.snowplow.eventgen.protocol.enrichment.Enrichments
import io.circe.Json
import org.scalacheck.Gen

import java.time.Instant
Expand All @@ -44,21 +42,25 @@ object SdkEvent {
case _ => None
}

private def eventFromColPayload(p: CollectorPayload, fallbackEid: UUID, enrichments: Option[Enrichments]): List[Event] =
private def eventFromColPayload(
p: CollectorPayload,
fallbackEid: UUID,
enrichments: Option[Enrichments]
): List[Event] =
p.payload.map { el =>
val evnt = Some(el.e match {
case EventType.Struct => "struct"
case EventType.Unstruct => "unstruct"
case EventType.PageView => "page_view"
case EventType.PagePing => "page_ping"
case EventType.Transaction => "transaction"
case EventType.Struct => "struct"
case EventType.Unstruct => "unstruct"
case EventType.PageView => "page_view"
case EventType.PagePing => "page_ping"
case EventType.Transaction => "transaction"
case EventType.TransactionItem => "transaction_item"
})

val (ue, eName, ueVendor, ueFormat, ueVersion) = el.event match {
case UnstructEventWrapper(event, _) =>
val sk = event.schema
(event.toUnstructEvent, Some(sk.name), Some(sk.vendor), Some(sk.format), Some(sk.version.asString))
(UnstructEvent(Some(event)), Some(sk.name), Some(sk.vendor), Some(sk.format), Some(sk.version.asString))
case _ =>
(UnstructEvent(data = None), evnt, None, None, None)
}
Expand All @@ -83,15 +85,15 @@ object SdkEvent {
case _ => None
}

val defaultEnrichment = enrichments.flatMap(_.defaultEnrichment)
val ipEnrichment = enrichments.flatMap(_.ipEnrichment)
val urlEnrichment = enrichments.flatMap(_.urlEnrichment)
val refererEnrichment = enrichments.flatMap(_.refererEnrichment)
val defaultEnrichment = enrichments.flatMap(_.defaultEnrichment)
val ipEnrichment = enrichments.flatMap(_.ipEnrichment)
val urlEnrichment = enrichments.flatMap(_.urlEnrichment)
val refererEnrichment = enrichments.flatMap(_.refererEnrichment)
val campaignAttributionEnrichment = enrichments.flatMap(_.campaignAttributionEnrichment)
val currencyConversionEnrichment = enrichments.flatMap(_.currencyConversionEnrichment)
val crossDomainEnrichment = enrichments.flatMap(_.crossDomainEnrichment)
val eventFingerprintEnrichment = enrichments.flatMap(_.eventFingerprintEnrichment)
val deprecatedFields = enrichments.flatMap(_.deprecatedFields)
val currencyConversionEnrichment = enrichments.flatMap(_.currencyConversionEnrichment)
val crossDomainEnrichment = enrichments.flatMap(_.crossDomainEnrichment)
val eventFingerprintEnrichment = enrichments.flatMap(_.eventFingerprintEnrichment)
val deprecatedFields = enrichments.flatMap(_.deprecatedFields)

Event(
app_id = el.app.aid,
Expand Down Expand Up @@ -151,7 +153,7 @@ object SdkEvent {
mkt_term = campaignAttributionEnrichment.flatMap(_.mkt_term),
mkt_content = campaignAttributionEnrichment.flatMap(_.mkt_content),
mkt_campaign = campaignAttributionEnrichment.flatMap(_.mkt_campaign),
contexts = el.context.map(_.contexts).getOrElse(Contexts(List.empty[SelfDescribingData[Json]])),
contexts = el.contexts.forSdkEvent,
se_category = structEventOpt.flatMap(_.se_ca),
se_action = structEventOpt.flatMap(_.se_ac),
se_label = structEventOpt.flatMap(_.se_la),
Expand Down Expand Up @@ -221,7 +223,7 @@ object SdkEvent {
dvce_sent_tstamp = el.dt.flatMap(_.dtm),
refr_domain_userid = crossDomainEnrichment.flatMap(_.refr_domain_userid),
refr_dvce_tstamp = crossDomainEnrichment.flatMap(_.refr_dvce_tstamp),
derived_contexts = Contexts(List.empty[SelfDescribingData[Json]]),
derived_contexts = el.derivedContexts.forSdkEvent,
domain_sessionid = el.u.flatMap(_.sid.map(_.toString)),
derived_tstamp = defaultEnrichment.flatMap(_.derived_tstamp),
event_vendor = ueVendor,
Expand All @@ -239,9 +241,12 @@ object SdkEvent {
eventPerPayloadMax: Int,
now: Instant,
frequencies: EventFrequencies,
contexts: Context.ContextsConfig,
generateEnrichments: Boolean = false
): Gen[List[Event]] =
genPair(eventPerPayloadMin, eventPerPayloadMax, now, frequencies, generateEnrichments).map(_._2)
genPair(eventPerPayloadMin, eventPerPayloadMax, now, frequencies, contexts, generateEnrichments).map(
_._2
)

def genPairDup(
natProb: Float,
Expand All @@ -252,6 +257,7 @@ object SdkEvent {
eventPerPayloadMax: Int,
now: Instant,
frequencies: EventFrequencies,
contexts: Context.ContextsConfig,
generateEnrichments: Boolean
): Gen[(CollectorPayload, List[Event])] =
for {
Expand All @@ -263,23 +269,25 @@ object SdkEvent {
eventPerPayloadMin,
eventPerPayloadMax,
now,
frequencies
frequencies,
contexts
)
enrichments <- if (generateEnrichments) Enrichments.gen.map(Some(_)) else Gen.const(None)
eid <- Gen.uuid
eid <- Gen.uuid
} yield (cp, eventFromColPayload(cp, eid, enrichments))

def genPair(
eventPerPayloadMin: Int,
eventPerPayloadMax: Int,
now: Instant,
frequencies: EventFrequencies,
contexts: Context.ContextsConfig,
generateEnrichments: Boolean
): Gen[(CollectorPayload, List[Event])] =
for {
cp <- CollectorPayload.gen(eventPerPayloadMin, eventPerPayloadMax, now, frequencies)
cp <- CollectorPayload.gen(eventPerPayloadMin, eventPerPayloadMax, now, frequencies, contexts)
enrichments <- if (generateEnrichments) Enrichments.gen.map(Some(_)) else Gen.const(None)
eid <- Gen.uuid
eid <- Gen.uuid
} yield (cp, eventFromColPayload(cp, eid, enrichments))

}
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ package object primitives {

def genStringOpt(prefix: String, len: Int): Gen[Option[String]] = Gen.option(genString(prefix, len))

def strGen(len: Int, g: Gen[Char]): Gen[String] = Gen.chooseNum(1, len).flatMap { x =>
Gen.stringOfN(x, g)
def strGen(minLength: Int, maxLength: Int): Gen[String] = Gen.chooseNum(minLength, maxLength).flatMap { x =>
Gen.stringOfN(x, Gen.alphaNumChar)
}

def genInstant(now: Instant): Gen[Instant] = Gen.chooseNum(0, 10000000).map(m => now.minusMillis(m.toLong))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
*/
package com.snowplowanalytics.snowplow.eventgen.protocol

import com.snowplowanalytics.snowplow.eventgen.protocol.Context.ContextsWrapper
import com.snowplowanalytics.snowplow.eventgen.protocol.Context.{ContextsWrapper, DerivedContextsWrapper}
import com.snowplowanalytics.snowplow.eventgen.protocol.event._
import com.snowplowanalytics.snowplow.eventgen.protocol.common._
import io.circe.Json
Expand All @@ -35,7 +35,8 @@ final case class Body(
et: EventTransaction,
u: Option[User],
event: BodyEvent,
context: Option[ContextsWrapper]
contexts: ContextsWrapper,
derivedContexts: DerivedContextsWrapper
) extends Protocol {
override def toProto: List[BasicNameValuePair] =
asKV("e", Some(e)) ++
Expand All @@ -46,7 +47,7 @@ final case class Body(
dev.fold(List.empty[BasicNameValuePair])(_.toProto) ++
tv.toProto ++
u.fold(List.empty[BasicNameValuePair])(_.toProto) ++
context.fold(List.empty[BasicNameValuePair])(_.toProto)
contexts.toProto

def toPayloadElement: Json =
toProto.foldLeft(Map.empty[String, String])((acc, kv) => acc ++ Map(kv.getName -> kv.getValue)).asJson
Expand All @@ -62,17 +63,23 @@ object Body {
natTotal: Int,
synTotal: Int,
now: Instant,
frequencies: EventFrequencies
frequencies: EventFrequencies,
contexts: Context.ContextsConfig
): Gen[Body] =
genWithEt(EventTransaction.genDup(synProb, synTotal), now, frequencies).withPerturb(in =>
genWithEt(EventTransaction.genDup(synProb, synTotal), now, frequencies, contexts).withPerturb(in =>
if (natProb == 0f | natTotal == 0)
in
else if (dupRng.nextInt(10000) < (natProb * 10000))
Seed(dupRng.nextInt(natTotal).toLong)
else
in
);
private def genWithEt(etGen: Gen[EventTransaction], now: Instant, frequencies: EventFrequencies) =
private def genWithEt(
etGen: Gen[EventTransaction],
now: Instant,
frequencies: EventFrequencies,
contexts: Context.ContextsConfig
) =
for {
e <- EventType.gen(frequencies)
app <- Application.gen
Expand All @@ -82,17 +89,19 @@ object Body {
tv <- TrackerVersion.gen
u <- User.genOpt
event <- e match {
case EventType.Struct => StructEvent.gen
case EventType.Unstruct => UnstructEventWrapper.gen(frequencies.unstructEventFrequencies)
case EventType.PageView => PageView.gen
case EventType.PagePing => PagePing.gen
case EventType.Transaction => TransactionEvent.gen
case EventType.TransactionItem => TransactionItemEvent.gen
case EventType.Struct => StructEvent.gen
case EventType.Unstruct => UnstructEventWrapper.gen(now, frequencies.unstructEventFrequencies)
case EventType.PageView => PageView.gen
case EventType.PagePing => PagePing.gen
case EventType.Transaction => TransactionEvent.gen
case EventType.TransactionItem => TransactionItemEvent.gen
}
context <- Context.ContextsWrapper.genOps
} yield Body(e, app, dt, dev, tv, et, u, event, context)
contexts <- Context.ContextsWrapper.gen(now, contexts)
derivedContexts <- Context.DerivedContextsWrapper.gen(now)
} yield Body(e, app, dt, dev, tv, et, u, event, contexts, derivedContexts)

def gen(now: Instant, frequencies: EventFrequencies): Gen[Body] = genWithEt(EventTransaction.gen, now, frequencies)
def gen(now: Instant, frequencies: EventFrequencies, contexts: Context.ContextsConfig): Gen[Body] =
genWithEt(EventTransaction.gen, now, frequencies, contexts)

def encodeValue(value: String) = URLEncoder.encode(value, StandardCharsets.UTF_8.toString)
}
Loading

0 comments on commit abb99a0

Please sign in to comment.