Skip to content

Commit

Permalink
feat: Add a parallel JSON log by default (#1892)
Browse files Browse the repository at this point in the history
* feat: Add a parallel ECS formatted JSON log by default

For ease of programmatic parsing, and uploading to Kibana with filebeat
  • Loading branch information
favilo authored Jan 2, 2025
1 parent 0cc6bd5 commit 1c412a9
Show file tree
Hide file tree
Showing 6 changed files with 264 additions and 12 deletions.
26 changes: 21 additions & 5 deletions docs/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,13 @@ Logging in Rally is configured in ``~/.rally/logging.json``. For more informatio
* The Python reference documentation on the `logging configuration schema <https://docs.python.org/3/library/logging.config.html#logging-config-dictschema>`_ explains the file format.
* The `logging handler documentation <https://docs.python.org/3/library/logging.handlers.html>`_ describes how to customize where log output is written to.

By default, Rally will log all output to ``~/.rally/logs/rally.log``. The default timestamp is UTC, but users can opt for the local time by setting ``"timezone": "localtime"`` in the logging configuration file.
By default, Rally will log all output to ``~/.rally/logs/rally.log`` in plain text format and ``~/.rally/logs/rally.json`` in ECS JSON format.
The default timestamp for ``rally.log`` is UTC, but users can opt for the local time by setting ``"timezone": "localtime"`` in the logging configuration file.
The ``rally.json`` file is formatted to the ECS format for ease of ingestion with filebeat. See the `ECS Reference <https://www.elastic.co/guide/en/ecs/current/ecs-using-ecs.html>`_ for more information.

There are a number of default options for the ``json`` logger that can be overridden in ``~/.rally/logging.json``.
First, ``exclude_fields`` will exclude ``log.original`` from the ECS defaults, since it can be quite noisy and superfluous.
Second, ``mutators`` is by default set to ``["esrally.log.rename_actor_fields", "esrally.log.rename_async_fields"]``, which will rename ``actorAddress`` and ``taskName`` to ``rally.thespian.address`` and ``python.asyncio.task`` respectively.

The log file will not be rotated automatically as this is problematic due to Rally's multi-process architecture. Set up an external tool like `logrotate <https://linux.die.net/man/8/logrotate>`_ to achieve that. See the following example as a starting point for your own ``logrotate`` configuration and make sure to replace the path ``/home/user/.rally/logs/rally.log`` with the proper one::

Expand All @@ -207,7 +213,7 @@ The log file will not be rotated automatically as this is problematic due to Ral
Example
~~~~~~~

With the following configuration Rally will log all output to standard error::
With the following configuration Rally will log all output to standard error, and format the timestamps in the local timezone::

{
"version": 1,
Expand Down Expand Up @@ -290,7 +296,10 @@ Example
~~~~~~~

With the following configuration Rally will log to ``~/.rally/logs/rally.log`` and ``~/.rally/logs/rally.json``, the
latter being a JSON file::
latter being a JSON file.

The ``mutators`` property is optional and defaults to ``["esrally.log.rename_actor_fields", "esrally.log.rename_async_fields"]``.
Similarly, the ``exclude_fields`` property is optional and defaults to ``["log.original"]``::

{
"version": 1,
Expand All @@ -301,8 +310,15 @@ latter being a JSON file::
"()": "esrally.log.configure_utc_formatter"
},
"json": {
"datefmt": "%Y-%m-%d %H:%M:%S",
"class": "pythonjsonlogger.jsonlogger.JsonFormatter"
"format": "%(message)s",
"exclude_fields": [
"log.original"
],
"mutators": [
"esrally.log.rename_actor_fields",
"esrally.log.rename_async_fields"
],
"()": "esrally.log.configure_ecs_formatter"
}
},
"handlers": {
Expand Down
60 changes: 58 additions & 2 deletions esrally/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@
import logging.config
import os
import time
import typing

import ecs_logging

from esrally import paths
from esrally.utils import io
from esrally.utils import collections, io


# pylint: disable=unused-argument
def configure_utc_formatter(*args, **kwargs):
def configure_utc_formatter(*args: typing.Any, **kwargs: typing.Any) -> logging.Formatter:
"""
Logging formatter that renders timestamps UTC, or in the local system time zone when the user requests it.
"""
Expand All @@ -40,6 +43,59 @@ def configure_utc_formatter(*args, **kwargs):
return formatter


# Signature of a log mutator: a callable that is handed the log record together with the
# dict built for it and returns nothing (RallyEcsFormatter.apply_mutators ignores the result).
MutatorType = typing.Callable[[logging.LogRecord, typing.Dict[str, typing.Any]], None]


class RallyEcsFormatter(ecs_logging.StdlibFormatter):
    """
    ECS formatter that runs a list of mutator callables over every log dict it produces.
    """

    def __init__(
        self,
        *args: typing.Any,
        mutators: typing.Optional[typing.List[MutatorType]] = None,
        **kwargs: typing.Any,
    ):
        """
        :param args: Positional arguments forwarded unchanged to ``ecs_logging.StdlibFormatter``.
        :param mutators: Callables invoked as ``mutator(record, log_dict)`` for each record. ``None`` or an
                         empty list means that no mutator is applied.
        :param kwargs: Keyword arguments forwarded unchanged to ``ecs_logging.StdlibFormatter``.
        """
        super().__init__(*args, **kwargs)
        self.mutators = mutators if mutators else []

    def format_to_ecs(self, record: logging.LogRecord) -> typing.Dict[str, typing.Any]:
        """
        Builds the log dict via the parent class and then lets every mutator modify it in place.

        :param record: The log record to format.
        :return: The (possibly mutated) log dict.
        """
        ecs_document = super().format_to_ecs(record)
        self.apply_mutators(record, ecs_document)
        return ecs_document

    def apply_mutators(self, record: logging.LogRecord, log_dict: typing.Dict[str, typing.Any]) -> None:
        """
        Calls each configured mutator, in order, with ``record`` and ``log_dict``.

        :param record: The log record that ``log_dict`` was built from.
        :param log_dict: The log dict; mutators are expected to change it in place as their return value is ignored.
        """
        for mutate in self.mutators:
            mutate(record, log_dict)


def rename_actor_fields(record: logging.LogRecord, log_dict: typing.Dict[str, typing.Any]) -> None:
    """
    Moves a truthy top-level ``actorAddress`` entry of ``log_dict`` to ``rally.thespian.address``.

    :param record: The log record (not used by this mutator).
    :param log_dict: The log dict, modified in place. It is left untouched when ``actorAddress`` is
                     missing or falsy.
    """
    if not log_dict.get("actorAddress"):
        return
    thespian_fields = {"address": log_dict.pop("actorAddress")}
    collections.deep_update(log_dict, {"rally": {"thespian": thespian_fields}})


# Special case for asyncio fields as they are not part of the standard ECS log dict
def rename_async_fields(record: logging.LogRecord, log_dict: typing.Dict[str, typing.Any]) -> None:
    """
    Copies ``record.taskName`` into ``log_dict`` under ``python.asyncio.task``.

    :param record: The log record. Nothing happens when it has no ``taskName`` attribute or the
                   attribute is ``None``.
    :param log_dict: The log dict, modified in place.
    """
    task_name = getattr(record, "taskName", None)
    if task_name is None:
        return
    collections.deep_update(log_dict, {"python": {"asyncio": {"task": task_name}}})


def configure_ecs_formatter(*args: typing.Any, **kwargs: typing.Any) -> ecs_logging.StdlibFormatter:
    """
    Factory for the ECS logging formatter.

    The ``format`` keyword argument is handed to the formatter as ``fmt``. The ``mutators`` keyword argument
    may contain callables or dotted names; names are resolved with ``logging.config.BaseConfigurator.resolve``.
    When ``mutators`` is absent, ``rename_actor_fields`` and ``rename_async_fields`` are used.

    :param args: Positional arguments forwarded to ``RallyEcsFormatter``.
    :param kwargs: Keyword arguments; everything except ``format`` and ``mutators`` is forwarded as is.
    :return: The configured ``RallyEcsFormatter`` instance.
    """
    message_format = kwargs.pop("format", None)
    requested_mutators = kwargs.pop("mutators", [rename_actor_fields, rename_async_fields])

    resolver = logging.config.BaseConfigurator({})
    resolved_mutators = []
    for entry in requested_mutators:
        # Entries that are not already callable are treated as dotted import paths.
        resolved_mutators.append(entry if callable(entry) else resolver.resolve(entry))

    return RallyEcsFormatter(*args, fmt=message_format, mutators=resolved_mutators, **kwargs)


def log_config_path():
"""
:return: The absolute path to Rally's log configuration file.
Expand Down
41 changes: 36 additions & 5 deletions esrally/resources/logging.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,17 @@
"datefmt": "%Y-%m-%d %H:%M:%S",
"()": "esrally.log.configure_utc_formatter"
},
"json": {
"format": "%(message)s %(taskName)s",
"exclude_fields": [
"log.original"
],
"mutators": [
"esrally.log.rename_actor_fields",
"esrally.log.rename_async_fields"
],
"()": "esrally.log.configure_ecs_formatter"
},
"profile": {
"format": "%(asctime)s,%(msecs)d PID:%(process)d %(name)s %(levelname)s %(message)s",
"datefmt": "%Y-%m-%d %H:%M:%S",
Expand All @@ -23,33 +34,53 @@
"filename": "${LOG_PATH}/rally.log",
"encoding": "UTF-8",
"formatter": "normal",
"filters": ["isActorLog"]
"filters": [
"isActorLog"
]
},
"rally_profile_handler": {
"()": "esrally.log.configure_profile_file_handler",
"filename": "${LOG_PATH}/profile.log",
"delay": true,
"encoding": "UTF-8",
"formatter": "profile"
},
"rally_json_handler": {
"()": "esrally.log.configure_file_handler",
"filename": "${LOG_PATH}/rally.json",
"encoding": "UTF-8",
"formatter": "json",
"filters": [
"isActorLog"
]
}
},
"root": {
"handlers": ["rally_log_handler"],
"handlers": [
"rally_log_handler",
"rally_json_handler"
],
"level": "INFO"
},
"loggers": {
"elasticsearch": {
"handlers": ["rally_log_handler"],
"handlers": [
"rally_log_handler"
],
"level": "WARNING",
"propagate": false
},
"rally.profile": {
"handlers": ["rally_profile_handler"],
"handlers": [
"rally_profile_handler"
],
"level": "INFO",
"propagate": false
},
"elastic_transport": {
"handlers": ["rally_log_handler"],
"handlers": [
"rally_log_handler"
],
"level": "WARNING",
"propagate": false
}
Expand Down
33 changes: 33 additions & 0 deletions esrally/utils/collections.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import copy
from typing import Any, Generator, Mapping


Expand All @@ -37,3 +38,35 @@ def merge_dicts(d1: Mapping[str, Any], d2: Mapping[str, Any]) -> Generator[Any,
yield k, d1[k]
else:
yield k, d2[k]


def deep_update(orig_dict: Mapping[str, Any], *updates: Mapping[str, Any]) -> None:
    """
    Recursively applies one or more dicts on top of ``orig_dict``, modifying it in place.

    Where both the existing value and the new value are mappings, the update descends into them.
    In every other case the new value replaces the existing one (as a deep copy), so lists are
    overwritten rather than merged. This differs from `merge_dicts`.

    For example:
    ```python
    d1 = {"foo": [1, 2, 3]}
    d2 = {"foo": [3, 4, 5]}
    merged = dict(merge_dicts(d1, d2))
    updated = copy.deepcopy(d1)
    deep_update(updated, d2)
    assert merged == {"foo": [1, 2, 3, 4, 5]}
    assert updated == {"foo": [3, 4, 5]}
    ```

    :param orig_dict: The dict to update in place. May be empty.
    :param updates: The dicts to apply, in order. May be empty; ``None`` entries are skipped.
    """
    for patch in updates:
        if patch is None:
            continue
        for key, new_value in patch.items():
            current = orig_dict.get(key)
            if isinstance(current, Mapping) and isinstance(new_value, Mapping):
                # Both sides are mappings: merge key by key instead of replacing the whole subtree.
                deep_update(current, new_value)
                continue
            # Deep copy so that later changes to the update do not leak into orig_dict.
            orig_dict[key] = copy.deepcopy(new_value)
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ dependencies = [
"typing-extensions==4.12.2",
# License: BSD-2-Clause license
"python-json-logger==2.0.7",
# License: Apache 2.0
"ecs-logging==2.2.0",
]

[project.optional-dependencies]
Expand Down
114 changes: 114 additions & 0 deletions tests/utils/collections_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,117 @@ def test_can_merge_nested_booleans_in_dicts(self):
"other": [1, 2, 3],
}
}


class TestDeepUpdate:
    """Tests for ``collections.deep_update``, which updates its first argument in place."""

    # Named distinctly from test_can_update_empty_dict below so that neither definition shadows the other.
    def test_can_update_empty_dict_with_empty_dict(self):
        d1 = {}
        d2 = {}

        collections.deep_update(d1, d2)

        assert d1 == d2
        assert d1 is not d2

    def test_can_update_with_no_update(self):
        d1 = {"foo": "bar"}

        collections.deep_update(d1)

        assert d1 is not None
        assert d1["foo"] == "bar"

    def test_can_update_with_empty_dict(self):
        d1 = {"foo": "bar"}
        d2 = {}

        collections.deep_update(d1, d2)

        assert d1 is not None
        assert d1 is not d2
        assert d1["foo"] == "bar"

    def test_can_update_empty_dict(self):
        d1 = {}
        d2 = {"foo": "bar"}

        collections.deep_update(d1, d2)

        assert d1 == d2
        assert d1 is not d2
        assert d1["foo"] == "bar"

    def test_can_update_non_empty_dict(self):
        d1 = {"foo": "bar"}
        d2 = {"foo": "baz"}

        collections.deep_update(d1, d2)

        assert d1 == d2
        assert d1 is not d2
        assert d1["foo"] == "baz"

    def test_can_update_nested_dict(self):
        d1 = {"foo": {"bar": "baz"}}
        d2 = {"foo": {"bar": "qux"}}

        collections.deep_update(d1, d2)

        assert d1 == d2
        assert d1 is not d2
        assert d1["foo"]["bar"] == "qux"

    def test_can_update_nested_list(self):
        # Lists are replaced wholesale, not merged.
        d1 = {"foo": [1, 2, 3]}
        d2 = {"foo": [3, 4, 5]}

        collections.deep_update(d1, d2)

        assert d1 == d2
        assert d1 is not d2
        assert d1["foo"] == [3, 4, 5]

    @pytest.mark.parametrize("seed", range(20))
    def test_can_update_randomized_empty_and_non_empty_dict(self, seed):
        random.seed(seed)

        # Exactly one of d1 / d2 is the populated dict; the result must be the same either way.
        dct = {"params": {"car-params": {"data_paths": "/mnt/local_ssd"}}}
        d1: Mapping[Any, Any] = random.choice([{}, dct])
        d2: Mapping[Any, Any] = dct if not d1 else {}

        collections.deep_update(d1, d2)

        assert d1 == dct

    def test_can_update_nested_dicts(self):
        d1 = {
            "params": {
                "car": "4gheap",
                "car-params": {"additional_cluster_settings": {"indices.queries.cache.size": "5%", "transport.tcp.compress": True}},
                "unique-param": "foobar",
            }
        }

        d2 = {"params": {"car-params": {"data_paths": "/mnt/local_ssd"}}}
        collections.deep_update(d1, d2)

        assert d1 == {
            "params": {
                "car-params": {
                    "additional_cluster_settings": {"indices.queries.cache.size": "5%", "transport.tcp.compress": True},
                    "data_paths": "/mnt/local_ssd",
                },
                "car": "4gheap",
                "unique-param": "foobar",
            }
        }

    def test_nested_updates_are_deep_copied(self):
        d1 = {"foo": {"bar": "baz"}}
        d2 = {"foo": {"bar": [1, 2, 3]}}
        collections.deep_update(d1, d2)

        assert d1 is not d2  # type: ignore[comparison-overlap]
        assert d1["foo"] is not d2["foo"]  # type: ignore[comparison-overlap]
        assert d1["foo"]["bar"] is not d2["foo"]["bar"]  # type: ignore[comparison-overlap]
        assert d1["foo"]["bar"] == [1, 2, 3]  # type: ignore[comparison-overlap]

0 comments on commit 1c412a9

Please sign in to comment.