Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update OpenTelemetryEcto attributes to conform with semantic conventions 1.27 #430

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
207 changes: 121 additions & 86 deletions instrumentation/opentelemetry_ecto/lib/opentelemetry_ecto.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,16 @@ defmodule OpentelemetryEcto do

require OpenTelemetry.Tracer

alias OpenTelemetry.SemConv.Incubating.DBAttributes
alias OpenTelemetry.SemConv.Incubating.{DBAttributes, Metrics.DBMetrics}
alias OpenTelemetry.SemConv.{ErrorAttributes, ServerAttributes}

@typedoc """
Option that you can pass to `setup/2`.
"""
@typedoc since: "1.3.0"
@type setup_option() ::
{:time_unit, System.time_unit()}
| {:span_prefix, String.t()}
| {:additional_attributes, %{String.t() => term()}}
| {:db_statement, :enabled | :disabled | (String.t() -> String.t())}
{:additional_attributes, %{String.t() => term()}}
| {:db_query, :enabled | :disabled | (String.t() -> String.t())}

@doc """
Attaches the `OpentelemetryEcto` handler to your repo events.
Expand All @@ -56,18 +55,12 @@ defmodule OpentelemetryEcto do

You may also supply the following options in the second argument:

* `:time_unit` - a time unit used to convert the values of query phase
timings, defaults to `:microsecond`. See `System.convert_time_unit/3`.
* `:span_prefix` - the first part of the span name.
Defaults to the concatenation of the event name with periods, such as
`"blog.repo.query"`. This will always be followed with a colon and the
source (the table name for SQL adapters). For example: `"blog.repo.query:users"`.
* `:additional_attributes` - additional attributes to include in the span. If there
are conflicts with default provided attributes, the ones provided with
this config will have precedence.
* `:db_statement` - `:disabled` (default), `:enabled`, or a function.
* `:db_query` - `:disabled` (default), `:enabled`, or a function.
Whether or not to include DB statements in the **span attributes** (as the
`#{DBAttributes.db_statement()}` attribute).
`#{DBAttributes.db_query_text()}` attribute).
Optionally provide a function that takes a query string and returns a
sanitized version of it. This is useful for removing sensitive information from the
query string. Unless this option is `:enabled` or a function,
Expand All @@ -82,9 +75,9 @@ defmodule OpentelemetryEcto do

@doc false
def handle_event(
event,
_event,
measurements,
%{query: query, source: source, result: query_result, repo: repo, type: type},
%{query: query, source: source, result: query_result, repo: repo},
config
) do
# Doing all this even if the span isn't sampled so the sampler
Expand All @@ -94,55 +87,30 @@ defmodule OpentelemetryEcto do
end_time = :opentelemetry.timestamp()
start_time = end_time - total_time
measurements = Map.put(measurements, :total_time, total_time)
database = repo.config()[:database]
repo_config = Keyword.take(repo.config(), [:database, :hostname, :port])

url =
case repo.config()[:url] do
nil ->
# TODO: add port
URI.to_string(%URI{scheme: "ecto", host: repo.config()[:hostname]})

url ->
url
end

span_prefix =
case Keyword.fetch(config, :span_prefix) do
{:ok, prefix} -> prefix
:error -> Enum.join(event, ".")
end

span_suffix = if source != nil, do: ":#{source}", else: ""
span_name = span_prefix <> span_suffix

time_unit = Keyword.get(config, :time_unit, :microsecond)
additional_attributes = Keyword.get(config, :additional_attributes, %{})

db_type =
case type do
:ecto_sql_query -> :sql
_ -> type
end
db_statement_config = Keyword.get(config, :db_query, :disabled)

# TODO: need connection information to complete the required attributes
# net.peer.name or net.peer.ip and net.peer.port
base_attributes = %{
:source => source,
:"db.instance" => database,
:"db.type" => db_type,
unquote(DBAttributes.db_name()) => database,
:"db.url" => url
}

db_statement_config = Keyword.get(config, :db_statement, :disabled)

attributes =
base_attributes
|> add_measurements(measurements, time_unit)
|> maybe_add_db_statement(db_statement_config, query)
|> maybe_add_db_system(repo.__adapter__())
%{
unquote(DBAttributes.db_system()) => db_system(repo.__adapter__()),
unquote(DBAttributes.db_namespace()) => repo_config[:database],
unquote(ServerAttributes.server_address()) => repo_config[:hostname]
}
|> maybe_add_db_collection_name(source)
|> maybe_add_server_port(repo_config)
|> maybe_add_db_operation_name(repo.__adapter__(), query)
|> maybe_add_error_type(repo.__adapter__(), query_result)
|> maybe_add_db_query_text(db_statement_config, query)
|> add_measurements(measurements)
|> add_additional_attributes(additional_attributes)

span_name = span_name(attributes)

parent_context =
case OpentelemetryProcessPropagator.fetch_ctx(self()) do
:undefined ->
Expand Down Expand Up @@ -187,59 +155,126 @@ defmodule OpentelemetryEcto do

defp format_error(_), do: ""

defp add_measurements(attributes, measurements, time_unit) do
measurements
|> Enum.reduce(attributes, fn
{k, v}, acc
when not is_nil(v) and k in [:total_time, :decode_time, :query_time, :queue_time, :idle_time] ->
Map.put(
acc,
String.to_atom("#{k}_#{time_unit}s"),
System.convert_time_unit(v, :native, time_unit)
)

_, acc ->
acc
end)
@db_systems [
{Ecto.Adapters.Postgres, DBAttributes.db_system_values().postgresql},
{Ecto.Adapters.MyXQL, DBAttributes.db_system_values().mysql},
{Ecto.Adapters.SQLite3, DBAttributes.db_system_values().sqlite},
{Ecto.Adapters.Tds, DBAttributes.db_system_values().mssql}
]

for {adapter, system} <- @db_systems do
defp db_system(unquote(adapter)), do: unquote(system)
end

defp maybe_add_db_statement(attributes, :enabled, query) do
Map.put(attributes, unquote(DBAttributes.db_statement()), query)
# NOTE: This is the catch-all clause where we use other_sql as the db.system value, but it may not be a SQL based database.
defp db_system(_), do: unquote(DBAttributes.db_system_values().other_sql)

defp maybe_add_db_collection_name(attributes, nil), do: attributes

defp maybe_add_db_collection_name(attributes, source) do
Map.put(attributes, unquote(DBAttributes.db_collection_name()), source)
end

defp maybe_add_db_statement(attributes, :disabled, _query) do
attributes
defp maybe_add_server_port(attributes, repo_config) do
case Keyword.has_key?(repo_config, :port) do
false -> attributes
true -> Map.put(attributes, unquote(ServerAttributes.server_port()), repo_config[:port])
end
end

defp maybe_add_db_statement(attributes, sanitizer, query) when is_function(sanitizer, 1) do
Map.put(attributes, unquote(DBAttributes.db_statement()), sanitizer.(query))
defp maybe_add_db_operation_name(attributes, adapter, query) do
case get_db_operation_name(adapter, query) do
nil -> attributes
operation_name -> Map.put(attributes, unquote(DBAttributes.db_operation_name()), operation_name)
end
end

defp maybe_add_db_statement(attributes, _, _query) do
attributes
# NOTE: Postgres does set a `:command` attribute on the result, but since there is no command for the
# error struct we will parse it all the same here.
defp get_db_operation_name(Ecto.Adapters.Postgres, query), do: sql_command(query)
defp get_db_operation_name(Ecto.Adapters.MyXQL, query), do: sql_command(query)
defp get_db_operation_name(Ecto.Adapters.SQLite3, query), do: sql_command(query)
defp get_db_operation_name(Ecto.Adapters.Tds, query), do: sql_command(query)
defp get_db_operation_name(_, _), do: nil

defp sql_command(query) when is_binary(query) do
query
|> String.split(" ", trim: true)
|> sql_command()
end

@sql_commands ~w(select insert update delete begin commit)

defp sql_command([raw_command | _rest]) do
case String.downcase(raw_command) do
command when command in @sql_commands -> raw_command
_ -> nil
end
end

defp maybe_add_error_type(attributes, _adapter, {:ok, _}), do: attributes

defp maybe_add_error_type(attributes, adapter, {:error, error}) do
case get_error_type(adapter, error) do
nil -> attributes
error_type -> Map.put(attributes, unquote(ErrorAttributes.error_type()), error_type)
end
end

defp maybe_add_db_system(attributes, Ecto.Adapters.Postgres) do
Map.put(attributes, unquote(DBAttributes.db_system()), :postgresql)
defp get_error_type(Ecto.Adapters.Postgres, %{postgres: %{code: code}}), do: code
defp get_error_type(Ecto.Adapters.MyXQL, %{postgres: %{name: name}}), do: name
# NOTE: Exqlite.Error does not have an error type
# TODO: Normalize error type from the error message?
defp get_error_type(Ecto.Adapters.SQLite3, _), do: nil
defp get_error_type(Ecto.Adapters.Tds, %{mssql: %{number: number}}), do: number
defp get_error_type(_adapter, _), do: nil

@measurements [
idle_time: DBMetrics.db_client_connection_create_time(),
total_time: DBMetrics.db_client_operation_duration(),
queue_time: DBMetrics.db_client_connection_wait_time(),
query_time: DBMetrics.db_client_connection_use_time()
]

defp add_measurements(attributes, measurements) do
Enum.reduce(@measurements, attributes, fn {telemetry_key, attribute_key}, attributes ->
case Map.get(measurements, telemetry_key) do
nil -> attributes
value -> Map.put(attributes, attribute_key, System.convert_time_unit(value, :native, :microsecond) / 1_000_000)
end
end)
end

defp maybe_add_db_system(attributes, Ecto.Adapters.MyXQL) do
Map.put(attributes, unquote(DBAttributes.db_system()), :mysql)
defp maybe_add_db_query_text(attributes, :enabled, query) do
Map.put(attributes, unquote(DBAttributes.db_query_text()), query)
end

defp maybe_add_db_system(attributes, Ecto.Adapters.SQLite3) do
Map.put(attributes, unquote(DBAttributes.db_system()), :sqlite)
defp maybe_add_db_query_text(attributes, :disabled, _query) do
attributes
end

defp maybe_add_db_system(attributes, Ecto.Adapters.Tds) do
Map.put(attributes, unquote(DBAttributes.db_system()), :mssql)
defp maybe_add_db_query_text(attributes, sanitizer, query) when is_function(sanitizer, 1) do
Map.put(attributes, unquote(DBAttributes.db_query_text()), sanitizer.(query))
end

defp maybe_add_db_system(attributes, _) do
defp maybe_add_db_query_text(attributes, _, _query) do
attributes
end

defp add_additional_attributes(attributes, additional_attributes) do
Map.merge(attributes, additional_attributes)
end

# SHOULD be `{db.operation.name} {target}` if there is a (low-cardinality) {db.operation.name} available.
defp span_name(%{unquote(DBAttributes.db_operation_name()) => db_operation_name} = attributes),
do: "#{db_operation_name} #{target(attributes)}"

# If there is no (low-cardinality) `db.operation.name` available, database span names SHOULD be `{target}`.
defp span_name(attributes), do: target(attributes)

# `db.collection.name` SHOULD be used for data manipulation operations or operations on database collections.
defp target(%{unquote(DBAttributes.db_collection_name()) => db_collection_name}), do: db_collection_name

# `db.namespace` SHOULD be used for operations on a specific database namespace.
defp target(%{unquote(DBAttributes.db_namespace()) => db_namespace}), do: db_namespace
end
Loading
Loading