Skip to content

Commit

Permalink
Rename ktor tracing to ktor telemetry (#12855)
Browse files Browse the repository at this point in the history
  • Loading branch information
trask authored Dec 17, 2024
1 parent b08d272 commit 9865c17
Show file tree
Hide file tree
Showing 64 changed files with 1,297 additions and 99 deletions.
9 changes: 5 additions & 4 deletions instrumentation/ktor/ktor-1.0/library/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Library Instrumentation for Ktor version 1.x

This package contains libraries to help instrument Ktor. Currently, only server instrumentation is supported.
This package contains libraries to help instrument Ktor.
Currently, only server instrumentation is supported.

## Quickstart

Expand Down Expand Up @@ -29,14 +30,14 @@ implementation("io.opentelemetry.instrumentation:opentelemetry-ktor-1.0:OPENTELE

## Usage

Initialize instrumentation by installing the `KtorServerTracing` feature. You must set the `OpenTelemetry` to use with
the feature.
Initialize instrumentation by installing the `KtorServerTelemetry` feature.
You must set the `OpenTelemetry` to use with the feature.

```kotlin
OpenTelemetry openTelemetry = ...

embeddedServer(Netty, 8080) {
install(KtorServerTracing) {
install(KtorServerTelemetry) {
setOpenTelemetry(openTelemetry)
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.ktor.v1_0

import io.ktor.application.*
import io.ktor.request.*
import io.ktor.response.*
import io.ktor.routing.*
import io.ktor.util.*
import io.ktor.util.pipeline.*
import io.opentelemetry.api.OpenTelemetry
import io.opentelemetry.context.Context
import io.opentelemetry.extension.kotlin.asContextElement
import io.opentelemetry.instrumentation.api.incubator.builder.internal.DefaultHttpServerInstrumenterBuilder
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor
import io.opentelemetry.instrumentation.api.instrumenter.SpanStatusBuilder
import io.opentelemetry.instrumentation.api.instrumenter.SpanStatusExtractor
import io.opentelemetry.instrumentation.api.internal.InstrumenterUtil
import io.opentelemetry.instrumentation.api.semconv.http.HttpServerRoute
import io.opentelemetry.instrumentation.api.semconv.http.HttpServerRouteSource
import kotlinx.coroutines.withContext

class KtorServerTelemetry private constructor(
private val instrumenter: Instrumenter<ApplicationRequest, ApplicationResponse>,
) {

class Configuration {
internal lateinit var builder: DefaultHttpServerInstrumenterBuilder<ApplicationRequest, ApplicationResponse>

internal var spanKindExtractor:
(SpanKindExtractor<ApplicationRequest>) -> SpanKindExtractor<ApplicationRequest> = { a -> a }

fun setOpenTelemetry(openTelemetry: OpenTelemetry) {
this.builder =
DefaultHttpServerInstrumenterBuilder.create(
INSTRUMENTATION_NAME,
openTelemetry,
KtorHttpServerAttributesGetter.INSTANCE
)
}

fun setStatusExtractor(
extractor: (SpanStatusExtractor<in ApplicationRequest, in ApplicationResponse>) -> SpanStatusExtractor<in ApplicationRequest, in ApplicationResponse>
) {
builder.setStatusExtractor { prevExtractor ->
SpanStatusExtractor { spanStatusBuilder: SpanStatusBuilder,
request: ApplicationRequest,
response: ApplicationResponse?,
throwable: Throwable? ->
extractor(prevExtractor).extract(spanStatusBuilder, request, response, throwable)
}
}
}

fun setSpanKindExtractor(extractor: (SpanKindExtractor<ApplicationRequest>) -> SpanKindExtractor<ApplicationRequest>) {
this.spanKindExtractor = extractor
}

fun addAttributesExtractor(extractor: AttributesExtractor<in ApplicationRequest, in ApplicationResponse>) {
builder.addAttributesExtractor(extractor)
}

fun setCapturedRequestHeaders(requestHeaders: List<String>) {
builder.setCapturedRequestHeaders(requestHeaders)
}

fun setCapturedResponseHeaders(responseHeaders: List<String>) {
builder.setCapturedResponseHeaders(responseHeaders)
}

fun setKnownMethods(knownMethods: Set<String>) {
builder.setKnownMethods(knownMethods)
}

internal fun isOpenTelemetryInitialized(): Boolean = this::builder.isInitialized
}

private fun start(call: ApplicationCall): Context? {
val parentContext = Context.current()
if (!instrumenter.shouldStart(parentContext, call.request)) {
return null
}

return instrumenter.start(parentContext, call.request)
}

private fun end(context: Context, call: ApplicationCall, error: Throwable?) {
instrumenter.end(context, call.request, call.response, error)
}

companion object Feature : ApplicationFeature<Application, Configuration, KtorServerTelemetry> {
private const val INSTRUMENTATION_NAME = "io.opentelemetry.ktor-1.0"

private val contextKey = AttributeKey<Context>("OpenTelemetry")
private val errorKey = AttributeKey<Throwable>("OpenTelemetryException")

override val key: AttributeKey<KtorServerTelemetry> = AttributeKey("OpenTelemetry")

override fun install(pipeline: Application, configure: Configuration.() -> Unit): KtorServerTelemetry {
val configuration = Configuration().apply(configure)

if (!configuration.isOpenTelemetryInitialized()) {
throw IllegalArgumentException("OpenTelemetry must be set")
}

val instrumenter = InstrumenterUtil.buildUpstreamInstrumenter(
configuration.builder.instrumenterBuilder(),
ApplicationRequestGetter,
configuration.spanKindExtractor(SpanKindExtractor.alwaysServer())
)

val feature = KtorServerTelemetry(instrumenter)

val startPhase = PipelinePhase("OpenTelemetry")
pipeline.insertPhaseBefore(ApplicationCallPipeline.Monitoring, startPhase)
pipeline.intercept(startPhase) {
val context = feature.start(call)

if (context != null) {
call.attributes.put(contextKey, context)
withContext(context.asContextElement()) {
try {
proceed()
} catch (err: Throwable) {
// Stash error for reporting later since need ktor to finish setting up the response
call.attributes.put(errorKey, err)
throw err
}
}
} else {
proceed()
}
}

val postSendPhase = PipelinePhase("OpenTelemetryPostSend")
pipeline.sendPipeline.insertPhaseAfter(ApplicationSendPipeline.After, postSendPhase)
pipeline.sendPipeline.intercept(postSendPhase) {
val context = call.attributes.getOrNull(contextKey)
if (context != null) {
var error: Throwable? = call.attributes.getOrNull(errorKey)
try {
proceed()
} catch (t: Throwable) {
error = t
throw t
} finally {
feature.end(context, call, error)
}
} else {
proceed()
}
}

pipeline.environment.monitor.subscribe(Routing.RoutingCallStarted) { call ->
val context = call.attributes.getOrNull(contextKey)
if (context != null) {
HttpServerRoute.update(context, HttpServerRouteSource.SERVER, { _, arg -> arg.route.parent.toString() }, call)
}
}

return feature
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import io.opentelemetry.instrumentation.api.semconv.http.HttpServerRoute
import io.opentelemetry.instrumentation.api.semconv.http.HttpServerRouteSource
import kotlinx.coroutines.withContext

@Deprecated("Use KtorServerTelemetry instead", ReplaceWith("KtorServerTelemetry"))
class KtorServerTracing private constructor(
private val instrumenter: Instrumenter<ApplicationRequest, ApplicationResponse>,
) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.ktor.v1_0

import io.ktor.application.*
import io.ktor.http.*
import io.ktor.request.*
import io.ktor.response.*
import io.ktor.routing.*
import io.ktor.server.engine.*
import io.ktor.server.netty.*
import io.opentelemetry.api.trace.Span
import io.opentelemetry.api.trace.SpanKind
import io.opentelemetry.api.trace.StatusCode
import io.opentelemetry.context.Context
import io.opentelemetry.extension.kotlin.asContextElement
import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpServerTest
import io.opentelemetry.instrumentation.testing.junit.http.HttpServerInstrumentationExtension
import io.opentelemetry.instrumentation.testing.junit.http.HttpServerTestOptions
import io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint
import io.opentelemetry.semconv.ServerAttributes
import kotlinx.coroutines.withContext
import org.junit.jupiter.api.extension.RegisterExtension
import java.util.concurrent.ExecutionException
import java.util.concurrent.TimeUnit

class KtorHttpServerOldTest : AbstractHttpServerTest<ApplicationEngine>() {

companion object {
@JvmStatic
@RegisterExtension
val testing = HttpServerInstrumentationExtension.forLibrary()
}

override fun setupServer(): ApplicationEngine {
return embeddedServer(Netty, port = port) {
KtorOldTestUtil.installOpenTelemetry(this, testing.openTelemetry)

routing {
get(ServerEndpoint.SUCCESS.path) {
controller(ServerEndpoint.SUCCESS) {
call.respondText(ServerEndpoint.SUCCESS.body, status = HttpStatusCode.fromValue(ServerEndpoint.SUCCESS.status))
}
}

get(ServerEndpoint.REDIRECT.path) {
controller(ServerEndpoint.REDIRECT) {
call.respondRedirect(ServerEndpoint.REDIRECT.body)
}
}

get(ServerEndpoint.ERROR.path) {
controller(ServerEndpoint.ERROR) {
call.respondText(ServerEndpoint.ERROR.body, status = HttpStatusCode.fromValue(ServerEndpoint.ERROR.status))
}
}

get(ServerEndpoint.EXCEPTION.path) {
controller(ServerEndpoint.EXCEPTION) {
throw IllegalStateException(ServerEndpoint.EXCEPTION.body)
}
}

get("/query") {
controller(ServerEndpoint.QUERY_PARAM) {
call.respondText("some=${call.request.queryParameters["some"]}", status = HttpStatusCode.fromValue(ServerEndpoint.QUERY_PARAM.status))
}
}

get("/path/{id}/param") {
controller(ServerEndpoint.PATH_PARAM) {
call.respondText(
call.parameters["id"]
?: "",
status = HttpStatusCode.fromValue(ServerEndpoint.PATH_PARAM.status),
)
}
}

get("/child") {
controller(ServerEndpoint.INDEXED_CHILD) {
ServerEndpoint.INDEXED_CHILD.collectSpanAttributes { call.request.queryParameters[it] }
call.respondText(ServerEndpoint.INDEXED_CHILD.body, status = HttpStatusCode.fromValue(ServerEndpoint.INDEXED_CHILD.status))
}
}

get("/captureHeaders") {
controller(ServerEndpoint.CAPTURE_HEADERS) {
call.response.header("X-Test-Response", call.request.header("X-Test-Request") ?: "")
call.respondText(ServerEndpoint.CAPTURE_HEADERS.body, status = HttpStatusCode.fromValue(ServerEndpoint.CAPTURE_HEADERS.status))
}
}
}
}.start()
}

override fun stopServer(server: ApplicationEngine) {
server.stop(0, 10, TimeUnit.SECONDS)
}

// Copy in HttpServerTest.controller but make it a suspending function
private suspend fun controller(endpoint: ServerEndpoint, wrapped: suspend () -> Unit) {
assert(Span.current().spanContext.isValid, { "Controller should have a parent span. " })
if (endpoint == ServerEndpoint.NOT_FOUND) {
wrapped()
}
val span = testing.openTelemetry.getTracer("test").spanBuilder("controller").setSpanKind(SpanKind.INTERNAL).startSpan()
try {
withContext(Context.current().with(span).asContextElement()) {
wrapped()
}
span.end()
} catch (e: Exception) {
span.setStatus(StatusCode.ERROR)
span.recordException(if (e is ExecutionException) e.cause ?: e else e)
span.end()
throw e
}
}

override fun configure(options: HttpServerTestOptions) {
options.setTestPathParam(true)

options.setHttpAttributes {
HttpServerTestOptions.DEFAULT_HTTP_ATTRIBUTES - ServerAttributes.SERVER_PORT
}

options.setExpectedHttpRoute { endpoint, method ->
when (endpoint) {
ServerEndpoint.PATH_PARAM -> "/path/{id}/param"
else -> expectedHttpRoute(endpoint, method)
}
}
// ktor does not have a controller lifecycle so the server span ends immediately when the
// response is sent, which is before the controller span finishes.
options.setVerifyServerSpanEndTime(false)

options.setResponseCodeOnNonStandardHttpMethod(404)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.ktor.v1_0

import io.ktor.application.*
import io.opentelemetry.api.OpenTelemetry
import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpServerTest

class KtorOldTestUtil {
companion object {
fun installOpenTelemetry(application: Application, openTelemetry: OpenTelemetry) {
application.install(KtorServerTracing) {
setOpenTelemetry(openTelemetry)
setCapturedRequestHeaders(listOf(AbstractHttpServerTest.TEST_REQUEST_HEADER))
setCapturedResponseHeaders(listOf(AbstractHttpServerTest.TEST_RESPONSE_HEADER))
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class KtorServerSpanKindExtractorTest : AbstractHttpServerUsingTest<ApplicationE

override fun setupServer(): ApplicationEngine {
return embeddedServer(Netty, port = port) {
install(KtorServerTracing) {
install(KtorServerTelemetry) {
setOpenTelemetry(testing.openTelemetry)
setSpanKindExtractor {
SpanKindExtractor { req ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpServerTes
class KtorTestUtil {
companion object {
fun installOpenTelemetry(application: Application, openTelemetry: OpenTelemetry) {
application.install(KtorServerTracing) {
application.install(KtorServerTelemetry) {
setOpenTelemetry(openTelemetry)
setCapturedRequestHeaders(listOf(AbstractHttpServerTest.TEST_REQUEST_HEADER))
setCapturedResponseHeaders(listOf(AbstractHttpServerTest.TEST_RESPONSE_HEADER))
Expand Down
Loading

0 comments on commit 9865c17

Please sign in to comment.