diff --git a/README.md b/README.md index 54c7642..c03a329 100644 --- a/README.md +++ b/README.md @@ -88,3 +88,67 @@ logger = logging.getLogger("my-logger") logger.addHandler(handler) logger.error(...) ``` + +Adding extra callable tags +-------------------------- + +Having a prior definition of: +```python +import logging +import logging_loki +from multiprocessing import Queue +from myapp.tracing import tracer + +get_context = lambda: tracer.active_span.context +add_trace_id = lambda: hex(get_context().trace_id)[ + 2:] if tracer is not None and tracer.active_span is not None else None +add_span_id = lambda: hex(get_context().span_id)[2:] if tracer is not None and tracer.active_span else None +``` + +If you want to add extra span IDs or trace IDs do the following: + +```python +handler = logging_loki.LokiQueueHandler( + Queue(-1), + url="https://my-loki-instance/loki/api/v1/push", + tags={"application": "my-app", 'span_id': add_span_id, 'trace_id': add_trace_id}, + auth=("username", "password"), + version="1" +) +logger = logging.getLogger("my-logger") +logger.addHandler(handler) +logger.error(...) +``` + +Basically if your callable returns a non-None value, it will be added as a tag. No casting to string will be made. + +You can use also the blocking approach of: + +```python +handler = logging_loki.LokiHandler( + url="https://my-loki-instance/loki/api/v1/push", + tags={"application": "my-app", "trace_id": add_trace_id, "span_id": add_span_id}, + auth=("username", "password"), + version="1", +) + +logger = logging.getLogger("my-logger") +logger.addHandler(handler) +logger.error( + "Something happened", + extra={"tags": {"service": "my-service"}}, +) +``` + +Note that Loki version "0" will not support callable tags. + + +Supplying extra tags +-------------------- + +If you want to supply extra tags, you can do it twofold: + +```python +logger.error('Something happened', extra={'test': 4}) +logger.error('Something happened', extra={'tags': {'test': 4}}) +``` \ No newline at end of file diff --git a/logging_loki/emitter.py b/logging_loki/emitter.py index 949ceea..deab402 100644 --- a/logging_loki/emitter.py +++ b/logging_loki/emitter.py @@ -20,6 +20,9 @@ BasicAuth = Optional[Tuple[str, str]] +KEYS_TO_SKIP = {'severity', 'logger', 'msg', 'message', 'tags', 'lineno'} + + class LokiEmitter(abc.ABC): """Base Loki emitter class.""" @@ -30,6 +33,17 @@ class LokiEmitter(abc.ABC): label_replace_with = const.label_replace_with session_class = requests.Session + @staticmethod + def get_entry_labels(record: logging.LogRecord, line: int) -> dict: + labels = {} + for key, value in record.__dict__.items(): + if key in KEYS_TO_SKIP: + continue + if value: + labels[key] = value + labels['line_no'] = line + return labels + def __init__(self, url: str, tags: Optional[dict] = None, auth: BasicAuth = None): """ Create new Loki emitter. @@ -89,7 +103,6 @@ def format_label(self, label: str) -> str: def build_tags(self, record: logging.LogRecord) -> Dict[str, Any]: """Return tags that must be send to Loki with a log record.""" tags = dict(self.tags) if isinstance(self.tags, ConvertingDict) else self.tags - tags = copy.deepcopy(tags) tags[self.level_tag] = record.levelname.lower() tags[self.logger_tag] = record.name @@ -99,9 +112,8 @@ def build_tags(self, record: logging.LogRecord) -> Dict[str, Any]: for tag_name, tag_value in extra_tags.items(): cleared_name = self.format_label(tag_name) - if cleared_name: + if cleared_name and tag_value: tags[cleared_name] = tag_value - return tags @@ -138,6 +150,6 @@ def build_payload(self, record: logging.LogRecord, line) -> dict: ts = str(int(time.time() * ns)) stream = { "stream": labels, - "values": [[ts, line]], + "values": [[ts, line, LokiEmitter.get_entry_labels(record, line)]], } return {"streams": [stream]} diff --git a/logging_loki/handlers.py b/logging_loki/handlers.py index 74a55cb..c6f376e 100644 --- a/logging_loki/handlers.py +++ b/logging_loki/handlers.py @@ -1,11 +1,11 @@ # -*- coding: utf-8 -*- - +import copy import logging import warnings from logging.handlers import QueueHandler from logging.handlers import QueueListener from queue import Queue -from typing import Dict +from typing import Dict, Callable, Any, Union from typing import Optional from typing import Type @@ -13,18 +13,48 @@ from logging_loki import emitter -class LokiQueueHandler(QueueHandler): +class TagMixin: + """ + A mixin class to support callable tags. + + This is to be inherited from as a first class, eg + >>> class Handler(TagMixin, logging.Handler): + >>> pass + """ + + def __init__(self, tags=None): + self.tags = tags or {} + + def prepare(self, record): + # This is invoked in the same thread in which logging is invoked + # assume the second class has a proper solution for prepare() + try: + record = self.__class__.__bases__[1].prepare(self, record) + except AttributeError: # logging.Handler has no prepare + pass + record.tags = getattr(record, 'tags', {}) + for key, value in (self.tags | record.tags).items(): + if callable(value): + value = value() + if value is None: + continue + record.__dict__[key] = value + return record + + +class LokiQueueHandler(TagMixin, QueueHandler): """This handler automatically creates listener and `LokiHandler` to handle logs queue.""" def __init__(self, queue: Queue, **kwargs): """Create new logger handler with the specified queue and kwargs for the `LokiHandler`.""" - super().__init__(queue) + QueueHandler.__init__(self, queue) + TagMixin.__init__(self, kwargs.get("tags")) self.handler = LokiHandler(**kwargs) # noqa: WPS110 self.listener = QueueListener(self.queue, self.handler) self.listener.start() -class LokiHandler(logging.Handler): +class LokiHandler(TagMixin, logging.Handler): """ Log handler that sends log records to Loki. @@ -39,7 +69,7 @@ class LokiHandler(logging.Handler): def __init__( self, url: str, - tags: Optional[dict] = None, + tags: Optional[Dict[str, Union[Any, Callable]]] = None, auth: Optional[emitter.BasicAuth] = None, version: Optional[str] = None, ): @@ -53,7 +83,8 @@ def __init__( version: Version of Loki emitter to use. """ - super().__init__() + logging.Handler.__init__(self) + TagMixin.__init__(self, tags) if version is None and const.emitter_ver == "0": msg = ( @@ -64,10 +95,16 @@ def __init__( ) warnings.warn(" ".join(msg), DeprecationWarning) + my_tags = tags or {} + version = version or const.emitter_ver - if version not in self.emitters: - raise ValueError("Unknown emitter version: {0}".format(version)) - self.emitter = self.emitters[version](url, tags, auth) + if version == '0' and any(callable(value) for value in my_tags.values()): + raise ValueError('Loki V0 handler does not support callable tags!') + + try: + self.emitter = self.emitters[version](url, tags, auth) + except KeyError as exc: + raise ValueError("Unknown emitter version: {0}".format(version)) from exc def handleError(self, record): # noqa: N802 """Close emitter and let default handler take actions on error.""" @@ -76,8 +113,9 @@ def handleError(self, record): # noqa: N802 def emit(self, record: logging.LogRecord): """Send log record to Loki.""" + record = self.prepare(record) # noinspection PyBroadException try: - self.emitter(record, self.format(record)) + self.emitter(record, record.lineno) except Exception: self.handleError(record) diff --git a/tests/test_emitter_v1.py b/tests/test_emitter_v1.py index b5656e1..7bf564b 100644 --- a/tests/test_emitter_v1.py +++ b/tests/test_emitter_v1.py @@ -176,3 +176,4 @@ def test_can_build_tags_from_converting_dict(emitter_v1): logger = logging.getLogger(logger_name) emitter: LokiEmitterV1 = logger.handlers[0].handler.emitter emitter.build_tags(create_record()) + payload = emitter.build_payload(create_record(), 10) diff --git a/tests/test_real_logs.py b/tests/test_real_logs.py new file mode 100644 index 0000000..9709bd6 --- /dev/null +++ b/tests/test_real_logs.py @@ -0,0 +1,54 @@ +import logging +import time + +import logging_loki +from logging_loki.emitter import LokiEmitterV1 + + +def test_callable_tags(): + class MyEmitter(LokiEmitterV1): + + def build_payload(self, record, line) -> dict: + labels = self.build_tags(record) + ns = 1e9 + ts = str(int(time.time() * ns)) + stream = { + "stream": labels, + "values": [[ts, line, self.get_entry_labels(record, line)]], + } + return {"streams": [stream]} + + def __call__(self, record, line_no): + payload = self.build_payload(record, line_no) + stream = payload['streams'][0]['values'][0][2] + assert 'application' in stream + assert stream['value'] == 5 + assert stream['device'] == 'test' + assert stream['levelname'] == 'WARNING' + + # Register a mock emitter + logging_loki.LokiHandler.emitters['mock_emitter'] = MyEmitter + + handler = logging_loki.LokiHandler( + url="https://example.com/loki/api/v1/push", + tags={"application": "my-app", 'value': lambda: 5}, + auth=("username", "password"), + version="mock_emitter" + ) + logger = logging.getLogger("my-logger") + logger.addHandler(handler) + logger.warning('Error occurred', extra={'tags': {'device': 'test'}}) + logger.warning('Error occurred', extra={'device': 'test'}) + + +def test_not_support_v0(): + try: + logging_loki.LokiHandler( + url="https://example.com/loki/api/v1/push", + tags={"application": "my-app", 'value': lambda: 5}, + auth=("username", "password"), + version="0") + except ValueError: + pass + else: + assert False, 'V0 supports callable labels'