Skip to content

Commit

Permalink
Switch to semantic convention names
Browse files Browse the repository at this point in the history
  • Loading branch information
danschultzer committed Dec 14, 2024
1 parent b8062a0 commit 519f936
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 72 deletions.
1 change: 1 addition & 0 deletions instrumentation/opentelemetry_ecto/config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ config :opentelemetry_ecto,

config :opentelemetry_ecto, OpentelemetryEcto.TestRepo,
hostname: "localhost",
port: 5432,
username: "postgres",
password: "postgres",
database: "opentelemetry_ecto_test",
Expand Down
77 changes: 28 additions & 49 deletions instrumentation/opentelemetry_ecto/lib/opentelemetry_ecto.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,17 @@ defmodule OpentelemetryEcto do

require OpenTelemetry.Tracer

alias OpenTelemetry.SemConv.Incubating.DBAttributes
alias OpenTelemetry.SemConv.Incubating.{DBAttributes, Metrics.DBMetrics}
alias OpenTelemetry.SemConv.ServerAttributes

@typedoc """
Option that you can pass to `setup/2`.
"""
@typedoc since: "1.3.0"
@type setup_option() ::
{:time_unit, System.time_unit()}
| {:span_prefix, String.t()}
{:span_prefix, String.t()}
| {:additional_attributes, %{String.t() => term()}}
| {:db_statement, :enabled | :disabled | (String.t() -> String.t())}
| {:db_query, :enabled | :disabled | (String.t() -> String.t())}

@doc """
Attaches the `OpentelemetryEcto` handler to your repo events.
Expand All @@ -56,18 +56,16 @@ defmodule OpentelemetryEcto do
You may also supply the following options in the second argument:
* `:time_unit` - a time unit used to convert the values of query phase
timings, defaults to `:microsecond`. See `System.convert_time_unit/3`.
* `:span_prefix` - the first part of the span name.
Defaults to the concatenation of the event name with periods, such as
`"blog.repo.query"`. This will always be followed with a colon and the
source (the table name for SQL adapters). For example: `"blog.repo.query:users"`.
* `:additional_attributes` - additional attributes to include in the span. If there
are conflicts with default provided attributes, the ones provided with
this config will have precedence.
* `:db_statement` - `:disabled` (default), `:enabled`, or a function.
* `:db_query` - `:disabled` (default), `:enabled`, or a function.
Whether or not to include DB statements in the **span attributes** (as the
`#{DBAttributes.db_statement()}` attribute).
`#{DBAttributes.db_query_text()}` attribute).
Optionally provide a function that takes a query string and returns a
sanitized version of it. This is useful for removing sensitive information from the
query string. Unless this option is `:enabled` or a function,
Expand All @@ -84,7 +82,7 @@ defmodule OpentelemetryEcto do
def handle_event(
event,
measurements,
%{query: query, source: source, result: query_result, repo: repo, type: type},
%{query: query, source: source, result: query_result, repo: repo},
config
) do
# Doing all this even if the span isn't sampled so the sampler
Expand All @@ -94,17 +92,7 @@ defmodule OpentelemetryEcto do
end_time = :opentelemetry.timestamp()
start_time = end_time - total_time
measurements = Map.put(measurements, :total_time, total_time)
database = repo.config()[:database]

url =
case repo.config()[:url] do
nil ->
# TODO: add port
URI.to_string(%URI{scheme: "ecto", host: repo.config()[:hostname]})

url ->
url
end
repo_config = Keyword.take(repo.config(), [:database, :hostname, :port])

span_prefix =
case Keyword.fetch(config, :span_prefix) do
Expand All @@ -115,30 +103,21 @@ defmodule OpentelemetryEcto do
span_suffix = if source != nil, do: ":#{source}", else: ""
span_name = span_prefix <> span_suffix

time_unit = Keyword.get(config, :time_unit, :microsecond)
additional_attributes = Keyword.get(config, :additional_attributes, %{})

db_type =
case type do
:ecto_sql_query -> :sql
_ -> type
end

# TODO: need connection information to complete the required attributes
# net.peer.name or net.peer.ip and net.peer.port
base_attributes = %{
:source => source,
:"db.instance" => database,
:"db.type" => db_type,
unquote(DBAttributes.db_name()) => database,
:"db.url" => url
unquote(DBAttributes.db_namespace()) => "#{repo_config[:database]}.#{source}",
unquote(ServerAttributes.server_address()) => repo_config[:hostname],
unquote(ServerAttributes.server_port()) => repo_config[:port]
}

db_statement_config = Keyword.get(config, :db_statement, :disabled)
db_statement_config = Keyword.get(config, :db_query, :disabled)

attributes =
base_attributes
|> add_measurements(measurements, time_unit)
|> add_measurements(measurements)
|> maybe_add_db_statement(db_statement_config, query)
|> maybe_add_db_system(repo.__adapter__())
|> add_additional_attributes(additional_attributes)
Expand Down Expand Up @@ -187,32 +166,32 @@ defmodule OpentelemetryEcto do

defp format_error(_), do: ""

defp add_measurements(attributes, measurements, time_unit) do
measurements
|> Enum.reduce(attributes, fn
{k, v}, acc
when not is_nil(v) and k in [:total_time, :decode_time, :query_time, :queue_time, :idle_time] ->
Map.put(
acc,
String.to_atom("#{k}_#{time_unit}s"),
System.convert_time_unit(v, :native, time_unit)
)

_, acc ->
acc
@measurements [
idle_time: DBMetrics.db_client_connection_create_time(),
total_time: DBMetrics.db_client_operation_duration(),
queue_time: DBMetrics.db_client_connection_wait_time(),
query_time: DBMetrics.db_client_connection_use_time(),
]

defp add_measurements(attributes, measurements) do
Enum.reduce(@measurements, attributes, fn {telemetry_key, attribute_key}, attributes ->
case Map.get(measurements, telemetry_key) do
nil -> attributes
value -> Map.put(attributes, attribute_key, System.convert_time_unit(value, :native, :microsecond) / 1_000_000)
end
end)
end

defp maybe_add_db_statement(attributes, :enabled, query) do
Map.put(attributes, unquote(DBAttributes.db_statement()), query)
Map.put(attributes, unquote(DBAttributes.db_query_text()), query)
end

defp maybe_add_db_statement(attributes, :disabled, _query) do
attributes
end

defp maybe_add_db_statement(attributes, sanitizer, query) when is_function(sanitizer, 1) do
Map.put(attributes, unquote(DBAttributes.db_statement()), sanitizer.(query))
Map.put(attributes, unquote(DBAttributes.db_query_text()), sanitizer.(query))
end

defp maybe_add_db_statement(attributes, _, _query) do
Expand Down
41 changes: 18 additions & 23 deletions instrumentation/opentelemetry_ecto/test/opentelemetry_ecto_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,12 @@ defmodule OpentelemetryEctoTest do

assert %{
"db.system": :postgresql,
"db.instance": "opentelemetry_ecto_test",
"db.type": :sql,
"db.url": "ecto://localhost",
decode_time_microseconds: _,
query_time_microseconds: _,
queue_time_microseconds: _,
source: "users",
total_time_microseconds: _
"db.namespace": "opentelemetry_ecto_test.users",
"server.address": "localhost",
"server.port": 5432,
"db.client.operation.duration": _,
"db.client.connection.wait_time": _,
"db.client.connection.use_time": _,
} = :otel_attributes.map(attributes)
end

Expand All @@ -66,37 +64,37 @@ defmodule OpentelemetryEctoTest do
Repo.all(User)

assert_receive {:span, span(attributes: attributes)}
assert !Map.has_key?(:otel_attributes.map(attributes), :"db.statement")
assert !Map.has_key?(:otel_attributes.map(attributes), :"db.query.text")
end

test "include unsanitized query when enabled" do
attach_handler(db_statement: :enabled)
attach_handler(db_query: :enabled)
Repo.all(User)

assert_receive {:span, span(attributes: attributes)}

assert %{"db.statement": "SELECT u0.\"id\", u0.\"email\" FROM \"users\" AS u0"} =
assert %{"db.query.text": "SELECT u0.\"id\", u0.\"email\" FROM \"users\" AS u0"} =
:otel_attributes.map(attributes)
end

test "include sanitized query with sanitizer function" do
attach_handler(db_statement: fn str -> String.replace(str, "SELECT", "") end)
attach_handler(db_query: fn str -> String.replace(str, "SELECT", "") end)
Repo.all(User)

assert_receive {:span, span(attributes: attributes)}

assert %{"db.statement": " u0.\"id\", u0.\"email\" FROM \"users\" AS u0"} =
assert %{"db.query.text": " u0.\"id\", u0.\"email\" FROM \"users\" AS u0"} =
:otel_attributes.map(attributes)
end

test "include additional_attributes" do
attach_handler(additional_attributes: %{"config.attribute": "special value", "db.instance": "my_instance"})
attach_handler(additional_attributes: %{"config.attribute": "special value", "db.system": "my_system"})

Repo.all(User)

assert_receive {:span, span(attributes: attributes)}

assert %{"config.attribute": "special value", "db.instance": "my_instance"} =
assert %{"config.attribute": "special value", "db.system": "my_system"} =
:otel_attributes.map(attributes)
end

Expand All @@ -113,14 +111,11 @@ defmodule OpentelemetryEctoTest do

assert %{
"db.system": :postgresql,
"db.instance": "opentelemetry_ecto_test",
"db.type": :sql,
"db.url": "ecto://localhost",
decode_time_milliseconds: _,
query_time_milliseconds: _,
queue_time_milliseconds: _,
source: "posts",
total_time_milliseconds: _
"db.namespace": "opentelemetry_ecto_test.posts",
"server.address": "localhost",
"db.client.operation.duration": _,
"db.client.connection.wait_time": _,
"db.client.connection.use_time": _,
} = :otel_attributes.map(attributes)
end

Expand Down

0 comments on commit 519f936

Please sign in to comment.