Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DRAFT] BlobUploader utilities to enable handling of large data in instrumentation #3122

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions opentelemetry-instrumentation/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ dependencies = [
"packaging >= 18.0",
]

[project.optional-dependencies]
gcs = [
"google-cloud-storage==2.19.0"
]
magic = [
"python-magic==0.4.27"
]

[project.scripts]
opentelemetry-bootstrap = "opentelemetry.instrumentation.bootstrap:run"
opentelemetry-instrument = "opentelemetry.instrumentation.auto_instrumentation:run"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
"""Exposes API methods to callers from the package name."""

from opentelemetry.instrumentation._blobupload.api.blob import Blob
from opentelemetry.instrumentation._blobupload.api.blob_uploader import (
BlobUploader,
)
from opentelemetry.instrumentation._blobupload.api.constants import (
NOT_UPLOADED,
)
from opentelemetry.instrumentation._blobupload.api.content_type import (
detect_content_type,
)
from opentelemetry.instrumentation._blobupload.api.labels import (
generate_labels_for_event,
generate_labels_for_span,
generate_labels_for_span_event,
)
from opentelemetry.instrumentation._blobupload.api.provider import (
BlobUploaderProvider,
get_blob_uploader,
set_blob_uploader_provider,
)

# Public names re-exported by this package.
# Bug fix: __all__ entries must be strings — with the objects themselves,
# "from ... import *" raises TypeError ("attribute name must be string").
__all__ = [
    "Blob",
    "BlobUploader",
    "NOT_UPLOADED",
    "detect_content_type",
    "generate_labels_for_event",
    "generate_labels_for_span",
    "generate_labels_for_span_event",
    "BlobUploaderProvider",
    "get_blob_uploader",
    "set_blob_uploader_provider",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import base64
from typing import Dict, Optional


class Blob(object):
    """Represents an opaque binary object and associated metadata.

    This object conceptually has the following properties:

      - raw_bytes: the actual data (payload) of the Blob
      - content_type: metadata about the content type (e.g. "image/jpeg")
      - labels: key/value data that can be used to identify and contextualize
        the object such as {"trace_id": "...", "span_id": "...", "filename": ...}
    """

    def __init__(
        self,
        raw_bytes: bytes,
        content_type: Optional[str] = None,
        labels: Optional[Dict[str, str]] = None,
    ):
        """Initialize the blob with an explicit set of properties.

        Args:
            raw_bytes: the required payload
            content_type: the MIME type describing the type of data in the payload
            labels: additional key/value data about the Blob
        """
        # Bug fix: the original assigned the undefined name '_raw_bytes',
        # which raised NameError on every construction.
        self._raw_bytes = raw_bytes
        self._content_type = content_type
        # Copy so later mutation of the caller's dict cannot alter this Blob.
        self._labels = dict(labels) if labels else {}

    # Bug fix: this was '@staticmethod' but declared a 'cls' first parameter,
    # so callers' 'uri' argument was bound to 'cls'; '@classmethod' restores
    # the intended Blob.from_data_uri(uri, labels=...) calling convention.
    @classmethod
    def from_data_uri(cls, uri: str, labels: Optional[dict] = None) -> "Blob":
        """Instantiate a blob from a 'data:...' URI.

        Args:
            uri: A URI in the 'data:' format. Supports a subset of 'data:' URIs
                that encode the data with the 'base64' extension and that include
                a content type. Should work with any normal 'image/jpeg', 'image/png',
                'application/pdf', 'audio/aac', and many others. DOES NOT SUPPORT
                encoding data as percent-encoded text (no "base64").
            labels: Additional key/value data to include in the constructed Blob.

        Returns:
            A Blob whose payload is the decoded base64 content.

        Raises:
            ValueError: if 'uri' is not a base64-encoded 'data:' URI.
        """
        if not uri.startswith("data:"):
            raise ValueError(
                'Invalid "uri"; expected "data:" prefix. Found: "{}"'.format(
                    uri
                )
            )
        if ";base64," not in uri:
            raise ValueError(
                'Invalid "uri"; expected ";base64," section. Found: "{}"'.format(
                    uri
                )
            )
        data_prefix_len = len("data:")
        after_data_prefix = uri[data_prefix_len:]
        if ";" not in after_data_prefix:
            raise ValueError(
                'Invalid "uri"; expected ";" in URI. Found: "{}"'.format(uri)
            )
        content_type, remaining = after_data_prefix.split(";", 1)
        # Skip over optional intermediate parameters (e.g. "name=foo;") until
        # the base64 payload marker; the ";base64," check above guarantees
        # this loop terminates.
        while not remaining.startswith("base64,"):
            _, remaining = remaining.split(";", 1)
        base64_len = len("base64,")
        base64_encoded_content = remaining[base64_len:]
        # Prefer the standard alphabet; fall back to the URL-safe variant
        # ('-' and '_' instead of '+' and '/') if standard decoding fails.
        try:
            raw_bytes = base64.standard_b64decode(base64_encoded_content)
        except ValueError:
            raw_bytes = base64.urlsafe_b64decode(base64_encoded_content)
        return cls(raw_bytes, content_type=content_type, labels=labels)

    @property
    def raw_bytes(self) -> bytes:
        """Returns the raw bytes (payload) of this Blob."""
        return self._raw_bytes

    @property
    def content_type(self) -> Optional[str]:
        """Returns the content type (or None) of this Blob."""
        return self._content_type

    @property
    def labels(self) -> Dict[str, str]:
        """Returns a copy of the key/value metadata of this Blob.

        A copy is returned so callers cannot mutate internal state
        (the original returned an undefined '_frozendict', a NameError).
        """
        return dict(self._labels)
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
"""Defines an interface for performing asynchronous blob uploading."""

import abc

from opentelemetry.instrumentation._blobupload.api.blob import Blob
from opentelemetry.instrumentation._blobupload.api.constants import (
NOT_UPLOADED,
)


class BlobUploader(abc.ABC):
    """Pure abstract base class representing a component that does blob uploading."""

    @abc.abstractmethod
    def upload_async(self, blob: Blob) -> str:
        # Implementations are expected to return the destination URI of the
        # uploaded blob, or the NOT_UPLOADED sentinel when no upload occurs.
        # NOTE(review): presumably the upload may complete in the background
        # after this returns (hence "async") — confirm with implementations.
        return NOT_UPLOADED
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
"""Defines contexts that are used by the '_blobupload' package."""

# Special constant used to indicate that a BlobUploader did not upload.
NOT_UPLOADED = '/dev/null'
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
"""Provides utilities for automatic content-type detection."""


# The optional 'magic' dependency may be missing; this stand-in mirrors the
# single 'magic' entry point we rely on so callers never need to care.
class _FallBackModule(object):
    """Minimal object shaped like the portion of 'magic' we need."""

    def from_buffer(self, raw_bytes, mime=True):
        """Fallback, subpar implementation of 'from_buffer'.

        Always reports a generic binary content type.
        """
        return "application/octet-stream"


# Bind '_module' to the real 'magic' module when available, otherwise to the
# fallback above; either way it exposes a compatible 'from_buffer'.
try:
    import magic as _module
except ImportError:
    _module = _FallBackModule()


def detect_content_type(raw_bytes: bytes) -> str:
    """Attempts to infer the content type of the specified data."""
    return _module.from_buffer(raw_bytes, mime=True)
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""Provides utilities for providing basic identifying labels for blobs."""


def generate_labels_for_span(trace_id: str, span_id: str) -> dict:
    """Returns metadata for a span."""
    labels = {"otel_type": "span"}
    labels["trace_id"] = trace_id
    labels["span_id"] = span_id
    return labels


def generate_labels_for_event(
    trace_id: str, span_id: str, event_name: str
) -> dict:
    """Returns metadata for an event."""
    # Builds the span labels and event overrides in a single literal; the
    # resulting dict (including key order) matches the incremental version.
    return {
        "otel_type": "event",
        "trace_id": trace_id,
        "span_id": span_id,
        "event_name": event_name,
    }


def generate_labels_for_span_event(
    trace_id: str, span_id: str, event_name: str, event_index: int
) -> dict:
    """Returns metadata for a span event."""
    # Builds the event labels and span-event overrides in a single literal;
    # the resulting dict (including key order) matches the layered version.
    return {
        "otel_type": "span_event",
        "trace_id": trace_id,
        "span_id": span_id,
        "event_name": event_name,
        "event_index": event_index,
    }
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import abc
import logging
from typing import Optional

from opentelemetry.instrumentation._blobupload.api.blob import Blob
from opentelemetry.instrumentation._blobupload.api.blob_uploader import (
    BlobUploader,
)
from opentelemetry.instrumentation._blobupload.api.constants import (
    NOT_UPLOADED,
)

_logger = logging.getLogger(__name__)


class _NoOpBlobUploader(BlobUploader):
    """Implementation of BlobUploader that does nothing.

    Serves as the safe default returned when no real provider is configured.
    """

    def upload_async(self, blob: Blob) -> str:
        # NOTE(review): NOT_UPLOADED is not among this module's visible
        # imports, so as written this line raises NameError when called —
        # confirm the constant is imported from '.api.constants'.
        return NOT_UPLOADED


class BlobUploaderProvider(abc.ABC):
    """Pure abstract base for configuring how to provide a BlobUploader."""

    def get_blob_uploader(self, use_case: Optional[str]) -> BlobUploader:
        """Returns a BlobUploader for the specified use case.

        Args:
            use_case: An optional use case that describes what the uploader is for. This could
                name a particular package, class, or instrumentation. It is intended to allow
                users to differentiate upload behavior based on the target instrumentation.

        Returns:
            A BlobUploader that is appropriate for the use case.
        """
        del use_case  # Unused by this default (no-op) implementation.
        return _NoOpBlobUploader()


class _DefaultBlobUploaderProvider(BlobUploaderProvider):
    """Default provider used when none has been configured."""

    def get_blob_uploader(self, use_case: Optional[str]) -> BlobUploader:
        """Warns that no provider is configured and returns a no-op uploader.

        Args:
            use_case: optional description of what the uploader is for.

        Returns:
            A no-op BlobUploader.
        """
        # Idiom fix: lazy %-style logging args defer string formatting until
        # the record is actually emitted (the original eagerly .format()ed).
        _logger.warning(
            "No BlobUploaderProvider configured; returning a no-op for use case %s",
            use_case or "(None)",
        )
        return _NoOpBlobUploader()


# Module-level singleton holding the active provider; replaced via
# 'set_blob_uploader_provider' below.
_blob_uploader_provider = _DefaultBlobUploaderProvider()


def set_blob_uploader_provider(provider: BlobUploaderProvider):
    """Allows configuring the behavior of 'get_blob_uploader'."""
    global _blob_uploader_provider
    _blob_uploader_provider = provider


def get_blob_uploader(use_case: Optional[str] = None) -> BlobUploader:
    """Returns a BlobUploader for 'use_case' from the configured provider."""
    return _blob_uploader_provider.get_blob_uploader(use_case)
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from opentelemetry.instrumentation._blobupload.backend.google.gcs._gcs_impl import GcsBlobUploader
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import io
import uuid
from typing import Optional

from google.cloud.storage import Blob as GcsBlob
from google.cloud.storage import Client as GcsClient

from opentelemetry.instrumentation._blobupload.api import Blob
from opentelemetry.instrumentation._blobupload.api import BlobUploader
from opentelemetry.instrumentation._blobupload.utils import SimpleBlobUploader
from opentelemetry.instrumentation._blobupload.utils import blob_uploader_from_simple_blob_uploader


def _path_segment_from_labels(labels):
"""Returns a path segment based on blob label metadata.

This aims to return paths like:

'traces/12345/spans/56789'
'traces/12345/spans/56789/events/0'
'traces/12345/spans/56789/events/some.event.name'

...depending on the particular type of signal source.

"""
segments = []
target_segments = [
('traces', 'trace_id', 'unknown'),
('spans', 'span_id', 'unknown'),
('events', 'event_index', None),
]
for segment_prefix, label_key, default_val in target_segments:
label_value = labels.get(label_key) or default_val
if label_value:
segments.append(segment_prefix)
segments.append(label_value)
if ((labels.get('otel_type') in ['event', 'span_event']) and
('events' not in segments)):
event_name = labels.get('event_name') or 'unknown'
segments.append('events')
segments.append(event_name)
return '/'.join(segments)



class _SimpleGcsBlobUploader(SimpleBlobUploader):
    """SimpleBlobUploader implementation that writes blobs to a GCS bucket."""

    def __init__(self, prefix: str, client: Optional[GcsClient] = None):
        """Initializes the uploader.

        Args:
            prefix: destination under which uploads are written; must be a
                non-empty 'gs://...' URI.
            client: optional GCS client; a default client is constructed
                when omitted.

        Raises:
            ValueError: if 'prefix' is empty or not a 'gs://' URI.
        """
        if not prefix:
            raise ValueError('Must supply a non-empty prefix.')
        if not prefix.startswith('gs://'):
            raise ValueError('Invalid prefix; must start with "gs://"; found: "{}".'.format(prefix))
        # Normalize so URI concatenation below yields exactly one separator.
        if not prefix.endswith('/'):
            prefix = '{}/'.format(prefix)
        self._prefix = prefix
        self._client = client or GcsClient()

    def generate_destination_uri(self, blob: Blob) -> str:
        """Returns a unique 'gs://...' URI derived from the blob's labels."""
        origin_path = _path_segment_from_labels(blob.labels)
        upload_id = uuid.uuid4().hex
        return '{}{}/uploads/{}'.format(self._prefix, origin_path, upload_id)

    def upload_sync(self, uri: str, blob: Blob):
        """Uploads the blob's payload and label metadata to 'uri'."""
        gcs_blob = GcsBlob.from_string(uri, client=self._client)
        # Bug fix: metadata must be assigned *before* the upload so it is
        # sent with the object; the original assigned it after
        # 'upload_from_file', which never reaches the server without an
        # explicit patch() call.
        metadata = dict(gcs_blob.metadata or {})
        # GCS custom metadata values must be strings; label values such as
        # 'event_index' may be ints.
        metadata.update({k: str(v) for k, v in blob.labels.items()})
        gcs_blob.metadata = metadata
        gcs_blob.upload_from_file(
            io.BytesIO(blob.raw_bytes),
            content_type=blob.content_type)


class GcsBlobUploader(BlobUploader):
    """BlobUploader that writes blobs to Google Cloud Storage.

    Thin facade: composes a _SimpleGcsBlobUploader (GCS specifics) with
    'blob_uploader_from_simple_blob_uploader' (async upload machinery).
    """

    def __init__(self, prefix: str, client:Optional[GcsClient]=None):
        # Argument validation (non-empty 'gs://' prefix) happens inside
        # _SimpleGcsBlobUploader, which may raise ValueError here.
        simple_uploader = _SimpleGcsBlobUploader(prefix, client)
        self._delegate = blob_uploader_from_simple_blob_uploader(simple_uploader)

    def upload_async(self, blob: Blob) -> str:
        # Forwards to the wrapped delegate; returns the destination URI.
        return self._delegate.upload_async(blob)
Loading
Loading