Skip to content

Commit

Permalink
Move OpenTelemetry context manager to main client (#2490)
Browse files Browse the repository at this point in the history
That way, the deserialization will be part of the span.
  • Loading branch information
pquentin authored Mar 26, 2024
1 parent 209ea6c commit b9aa69e
Show file tree
Hide file tree
Showing 5 changed files with 260 additions and 24 deletions.
32 changes: 30 additions & 2 deletions elasticsearch/_async/client/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,13 @@
ListApiResponse,
NodeConfig,
ObjectApiResponse,
OpenTelemetrySpan,
SniffOptions,
TextApiResponse,
)
from elastic_transport.client_utils import DEFAULT, DefaultType

from ..._otel import OpenTelemetry
from ..._version import __versionstr__
from ...compat import warn_stacklevel
from ...exceptions import (
Expand Down Expand Up @@ -244,6 +246,7 @@ def __init__(self, _transport: AsyncTransport) -> None:
self._retry_on_timeout: Union[DefaultType, bool] = DEFAULT
self._retry_on_status: Union[DefaultType, Collection[int]] = DEFAULT
self._verified_elasticsearch = False
self._otel = OpenTelemetry()

@property
def transport(self) -> AsyncTransport:
Expand All @@ -259,6 +262,32 @@ async def perform_request(
body: Optional[Any] = None,
endpoint_id: Optional[str] = None,
path_parts: Optional[Mapping[str, Any]] = None,
) -> ApiResponse[Any]:
with self._otel.span(
method,
endpoint_id=endpoint_id,
path_parts=path_parts or {},
) as otel_span:
response = await self._perform_request(
method,
path,
params=params,
headers=headers,
body=body,
otel_span=otel_span,
)
otel_span.set_elastic_cloud_metadata(response.meta.headers)
return response

async def _perform_request(
self,
method: str,
path: str,
*,
params: Optional[Mapping[str, Any]] = None,
headers: Optional[Mapping[str, str]] = None,
body: Optional[Any] = None,
otel_span: OpenTelemetrySpan,
) -> ApiResponse[Any]:
if headers:
request_headers = self._headers.copy()
Expand Down Expand Up @@ -294,8 +323,7 @@ def mimetype_header_to_compat(header: str) -> None:
retry_on_status=self._retry_on_status,
retry_on_timeout=self._retry_on_timeout,
client_meta=self._client_meta,
endpoint_id=endpoint_id,
path_parts=path_parts,
otel_span=otel_span,
)

# HEAD with a 404 is returned as a normal response
Expand Down
92 changes: 92 additions & 0 deletions elasticsearch/_otel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

import contextlib
import os
from typing import TYPE_CHECKING, Generator, Mapping

if TYPE_CHECKING:
from typing import Literal

try:
from opentelemetry import trace

_tracer: trace.Tracer | None = trace.get_tracer("elasticsearch-api")
except ModuleNotFoundError:
_tracer = None

from elastic_transport import OpenTelemetrySpan

# Valid values for the enabled config are 'true' and 'false'. Default is 'true'.
ENABLED_ENV_VAR = "OTEL_PYTHON_INSTRUMENTATION_ELASTICSEARCH_ENABLED"
# Describes how to handle search queries in the request body when assigned to
# a span attribute.
# Valid values are 'omit' and 'raw'.
# Default is 'omit' as 'raw' has security implications.
BODY_STRATEGY_ENV_VAR = "OTEL_PYTHON_INSTRUMENTATION_ELASTICSEARCH_CAPTURE_SEARCH_QUERY"
DEFAULT_BODY_STRATEGY = "omit"


class OpenTelemetry:
def __init__(
self,
enabled: bool | None = None,
tracer: trace.Tracer | None = None,
# TODO import Literal at the top-level when dropping Python 3.7
body_strategy: 'Literal["omit", "raw"]' | None = None,
):
if enabled is None:
enabled = os.environ.get(ENABLED_ENV_VAR, "false") != "false"
self.tracer = tracer or _tracer
self.enabled = enabled and self.tracer is not None

if body_strategy is not None:
self.body_strategy = body_strategy
else:
self.body_strategy = os.environ.get(
BODY_STRATEGY_ENV_VAR, DEFAULT_BODY_STRATEGY
) # type: ignore[assignment]
assert self.body_strategy in ("omit", "raw")

@contextlib.contextmanager
def span(
self,
method: str,
*,
endpoint_id: str | None,
path_parts: Mapping[str, str],
) -> Generator[OpenTelemetrySpan, None, None]:
if not self.enabled or self.tracer is None:
yield OpenTelemetrySpan(None)
return

span_name = endpoint_id or method
with self.tracer.start_as_current_span(span_name) as otel_span:
otel_span.set_attribute("http.request.method", method)
otel_span.set_attribute("db.system", "elasticsearch")
if endpoint_id is not None:
otel_span.set_attribute("db.operation", endpoint_id)
for key, value in path_parts.items():
otel_span.set_attribute(f"db.elasticsearch.path_parts.{key}", value)

yield OpenTelemetrySpan(
otel_span,
endpoint_id=endpoint_id,
body_strategy=self.body_strategy,
)
32 changes: 30 additions & 2 deletions elasticsearch/_sync/client/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,14 @@
ListApiResponse,
NodeConfig,
ObjectApiResponse,
OpenTelemetrySpan,
SniffOptions,
TextApiResponse,
Transport,
)
from elastic_transport.client_utils import DEFAULT, DefaultType

from ..._otel import OpenTelemetry
from ..._version import __versionstr__
from ...compat import warn_stacklevel
from ...exceptions import (
Expand Down Expand Up @@ -244,6 +246,7 @@ def __init__(self, _transport: Transport) -> None:
self._retry_on_timeout: Union[DefaultType, bool] = DEFAULT
self._retry_on_status: Union[DefaultType, Collection[int]] = DEFAULT
self._verified_elasticsearch = False
self._otel = OpenTelemetry()

@property
def transport(self) -> Transport:
Expand All @@ -259,6 +262,32 @@ def perform_request(
body: Optional[Any] = None,
endpoint_id: Optional[str] = None,
path_parts: Optional[Mapping[str, Any]] = None,
) -> ApiResponse[Any]:
with self._otel.span(
method,
endpoint_id=endpoint_id,
path_parts=path_parts or {},
) as otel_span:
response = self._perform_request(
method,
path,
params=params,
headers=headers,
body=body,
otel_span=otel_span,
)
otel_span.set_elastic_cloud_metadata(response.meta.headers)
return response

def _perform_request(
self,
method: str,
path: str,
*,
params: Optional[Mapping[str, Any]] = None,
headers: Optional[Mapping[str, str]] = None,
body: Optional[Any] = None,
otel_span: OpenTelemetrySpan,
) -> ApiResponse[Any]:
if headers:
request_headers = self._headers.copy()
Expand Down Expand Up @@ -294,8 +323,7 @@ def mimetype_header_to_compat(header: str) -> None:
retry_on_status=self._retry_on_status,
retry_on_timeout=self._retry_on_timeout,
client_meta=self._client_meta,
endpoint_id=endpoint_id,
path_parts=path_parts,
otel_span=otel_span,
)

# HEAD with a 404 is returned as a normal response
Expand Down
31 changes: 11 additions & 20 deletions test_elasticsearch/test_client/test_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
# under the License.

import pytest
from elastic_transport import OpenTelemetrySpan
from elastic_transport.client_utils import DEFAULT

from elasticsearch import AsyncElasticsearch, Elasticsearch
Expand Down Expand Up @@ -137,13 +138,12 @@ def test_options_passed_to_perform_request(self):
assert call.pop("retry_on_timeout") is DEFAULT
assert call.pop("retry_on_status") is DEFAULT
assert call.pop("client_meta") is DEFAULT
assert isinstance(call.pop("otel_span"), OpenTelemetrySpan)
assert call == {
"headers": {
"accept": "application/vnd.elasticsearch+json; compatible-with=8",
},
"body": None,
"endpoint_id": "indices.get",
"path_parts": {"index": "test"},
}

# Can be overwritten with .options()
Expand All @@ -157,13 +157,12 @@ def test_options_passed_to_perform_request(self):
calls = client.transport.calls
call = calls[("GET", "/test")][1]
assert call.pop("client_meta") is DEFAULT
assert isinstance(call.pop("otel_span"), OpenTelemetrySpan)
assert call == {
"headers": {
"accept": "application/vnd.elasticsearch+json; compatible-with=8",
},
"body": None,
"endpoint_id": "indices.get",
"path_parts": {"index": "test"},
"request_timeout": 1,
"max_retries": 2,
"retry_on_status": (404,),
Expand All @@ -184,13 +183,12 @@ def test_options_passed_to_perform_request(self):
calls = client.transport.calls
call = calls[("GET", "/test")][0]
assert call.pop("client_meta") is DEFAULT
assert isinstance(call.pop("otel_span"), OpenTelemetrySpan)
assert call == {
"headers": {
"accept": "application/vnd.elasticsearch+json; compatible-with=8",
},
"body": None,
"endpoint_id": "indices.get",
"path_parts": {"index": "test"},
"request_timeout": 1,
"max_retries": 2,
"retry_on_status": (404,),
Expand All @@ -213,13 +211,12 @@ async def test_options_passed_to_async_perform_request(self):
assert call.pop("retry_on_timeout") is DEFAULT
assert call.pop("retry_on_status") is DEFAULT
assert call.pop("client_meta") is DEFAULT
assert isinstance(call.pop("otel_span"), OpenTelemetrySpan)
assert call == {
"headers": {
"accept": "application/vnd.elasticsearch+json; compatible-with=8",
},
"body": None,
"endpoint_id": "indices.get",
"path_parts": {"index": "test"},
}

# Can be overwritten with .options()
Expand All @@ -233,13 +230,12 @@ async def test_options_passed_to_async_perform_request(self):
calls = client.transport.calls
call = calls[("GET", "/test")][1]
assert call.pop("client_meta") is DEFAULT
assert isinstance(call.pop("otel_span"), OpenTelemetrySpan)
assert call == {
"headers": {
"accept": "application/vnd.elasticsearch+json; compatible-with=8",
},
"body": None,
"endpoint_id": "indices.get",
"path_parts": {"index": "test"},
"request_timeout": 1,
"max_retries": 2,
"retry_on_status": (404,),
Expand All @@ -260,13 +256,12 @@ async def test_options_passed_to_async_perform_request(self):
calls = client.transport.calls
call = calls[("GET", "/test")][0]
assert call.pop("client_meta") is DEFAULT
assert isinstance(call.pop("otel_span"), OpenTelemetrySpan)
assert call == {
"headers": {
"accept": "application/vnd.elasticsearch+json; compatible-with=8",
},
"body": None,
"endpoint_id": "indices.get",
"path_parts": {"index": "test"},
"request_timeout": 1,
"max_retries": 2,
"retry_on_status": (404,),
Expand Down Expand Up @@ -397,13 +392,12 @@ def test_options_timeout_parameters(self):
calls = client.transport.calls
call = calls[("GET", "/test")][0]
assert call.pop("client_meta") is DEFAULT
assert isinstance(call.pop("otel_span"), OpenTelemetrySpan)
assert call == {
"headers": {
"accept": "application/vnd.elasticsearch+json; compatible-with=8",
},
"body": None,
"endpoint_id": "indices.get",
"path_parts": {"index": "test"},
"request_timeout": 1,
"max_retries": 2,
"retry_on_status": (404,),
Expand All @@ -428,13 +422,12 @@ def test_options_timeout_parameters(self):
calls = client.transport.calls
call = calls[("GET", "/test")][0]
assert call.pop("client_meta") is DEFAULT
assert isinstance(call.pop("otel_span"), OpenTelemetrySpan)
assert call == {
"headers": {
"accept": "application/vnd.elasticsearch+json; compatible-with=8",
},
"body": None,
"endpoint_id": "indices.get",
"path_parts": {"index": "test"},
"request_timeout": 2,
"max_retries": 3,
"retry_on_status": (400,),
Expand All @@ -454,13 +447,12 @@ def test_options_timeout_parameters(self):
assert call.pop("retry_on_timeout") is DEFAULT
assert call.pop("retry_on_status") is DEFAULT
assert call.pop("client_meta") is DEFAULT
assert isinstance(call.pop("otel_span"), OpenTelemetrySpan)
assert call == {
"headers": {
"accept": "application/vnd.elasticsearch+json; compatible-with=8",
},
"body": None,
"endpoint_id": "indices.get",
"path_parts": {"index": "test"},
}

client = Elasticsearch(
Expand All @@ -477,13 +469,12 @@ def test_options_timeout_parameters(self):
calls = client.transport.calls
call = calls[("GET", "/test")][0]
assert call.pop("client_meta") is DEFAULT
assert isinstance(call.pop("otel_span"), OpenTelemetrySpan)
assert call == {
"headers": {
"accept": "application/vnd.elasticsearch+json; compatible-with=8",
},
"body": None,
"endpoint_id": "indices.get",
"path_parts": {"index": "test"},
"request_timeout": 1,
"max_retries": 2,
"retry_on_status": (404,),
Expand Down
Loading

0 comments on commit b9aa69e

Please sign in to comment.