A couple of diagnostic features.
1. Added support for Prometheus metrics for runtime diagnostics.
2. Slow-task tracking: tasks running longer than 5 minutes are logged once, and all slow tasks are listed in the final report (a sketch of the idea follows the changed-file summary below).
rousik committed Dec 4, 2023
1 parent 54493a9 commit 75df56f
Showing 7 changed files with 307 additions and 147 deletions.
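The slow-task tracking half of this change lives in src/pudl_output_differ/task_queue.py, whose diff did not render on this page. Below is a minimal sketch of the behavior described in the commit message; all names are hypothetical, and only the 5-minute threshold and log-once semantics come from the message itself.

import logging
import time

logger = logging.getLogger(__name__)

SLOW_TASK_THRESHOLD_SECONDS = 5 * 60  # tasks running over 5 minutes count as slow


class SlowTaskTracker:
    """Logs each slow task once and remembers it for the final report."""

    def __init__(self) -> None:
        self.slow_tasks: dict[str, float] = {}

    def check(self, task_name: str, started_at: float) -> None:
        """Call periodically with the task's time.monotonic() start time."""
        runtime = time.monotonic() - started_at
        if runtime <= SLOW_TASK_THRESHOLD_SECONDS:
            return
        if task_name not in self.slow_tasks:
            # Log only on the first crossing of the threshold.
            logger.warning("Task %s has been running for %.0fs.", task_name, runtime)
        self.slow_tasks[task_name] = runtime  # keep the latest runtime for the report

    def report(self) -> str:
        """Summarizes all slow tasks for inclusion in the final report."""
        return "\n".join(
            f" * {name}: {runtime:.0f}s"
            for name, runtime in sorted(self.slow_tasks.items())
        )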
44 changes: 43 additions & 1 deletion poetry.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions pyproject.toml
@@ -30,6 +30,8 @@ py-gfm = "^2.0.0"
backoff = "^2.2.1"
progress = "^1.6"
opentelemetry-exporter-gcp-trace = "^1.6.0"
prometheus-client = "^0.19.0"
psutil = "^5.9.6"


[tool.poetry.group.dev.dependencies]
91 changes: 62 additions & 29 deletions src/pudl_output_differ/cli.py
@@ -11,22 +11,25 @@
import argparse
import atexit
import logging
import os
import shutil
import sys
import tempfile

import fsspec
import markdown
import psutil
from mdx_gfm import GithubFlavoredMarkdownExtension
from opentelemetry import trace
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from prometheus_client import Gauge, start_http_server

from pudl_output_differ.files import DirectoryAnalyzer, is_remote
from pudl_output_differ.task_queue import TaskQueue
from pudl_output_differ.task_queue import TaskQueue, TaskQueueSettings
from pudl_output_differ.types import ObjectPath

logger = logging.getLogger(__name__)
@@ -96,19 +99,6 @@ def parse_command_line(argv) -> argparse.Namespace:
default="",
help="""If set, write html markdown report into this file.""",
)

parser.add_argument(
"--github-repo",
type=str,
default="",
help="Name of the github repository where comments should be posted.",
)
parser.add_argument(
"--github-pr",
type=int,
default=0,
help="If supplied, diff will be published as a comment to the github PR.",
)
parser.add_argument(
"--gcp-cloud-trace",
type=bool,
@@ -128,6 +118,25 @@
# default="INFO",
help="Controls the severity of logging.",
)
parser.add_argument(
"--prometheus-port",
type=int,
default=9101,
help="Port on which to start prometheus metrics server."
)
# parser.add_argument(
# "--github-repo",
# type=str,
# default="",
# help="Name of the github repository where comments should be posted.",
# )
# parser.add_argument(
# "--github-pr",
# type=int,
# default=0,
# help="If supplied, diff will be published as a comment to the github PR.",
# )

arguments = parser.parse_args(argv[1:])
return arguments
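With the new --prometheus-port flag in place, prometheus_client's start_http_server() (wired up in main() below) serves the metrics over plain HTTP. A quick way to check it from Python, assuming a local run on the default port:

import urllib.request

# Assumes the differ is running locally with --prometheus-port 9101 (the default).
body = urllib.request.urlopen("http://localhost:9101/metrics").read().decode()
print("\n".join(body.splitlines()[:10]))  # first few lines of the exposition format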

@@ -143,14 +152,14 @@ def setup_tracing(args: argparse.Namespace) -> None:
)
if args.otel_trace_backend:
logger.info(f"Publishing traces to OTEL backend {args.otel_trace_backend}")
processor = BatchSpanProcessor(OTLPSpanExporter(endpoint=args.trace_backend))
processor = BatchSpanProcessor(OTLPSpanExporter(endpoint=args.otel_trace_backend))
provider.add_span_processor(processor)

if args.gcp_cloud_trace:
logger.info("Publishing traces to Google Cloud Trace service.")
provider.add_span_processor(
BatchSpanProcessor(CloudTraceSpanExporter())
)
)
trace.set_tracer_provider(provider)


@@ -169,20 +178,44 @@ def main() -> int:
lpath = args.left
rpath = args.right

with tracer.start_as_current_span(name="main"):
task_queue = TaskQueue(max_workers=args.max_workers)

task_queue.put(
DirectoryAnalyzer(
object_path=ObjectPath(),
left_path=lpath,
right_path=rpath,
local_cache_root=args.cache_dir,
filename_filter=args.filename_filter,
)
if args.prometheus_port:
start_http_server(args.prometheus_port)
Gauge("cpu_usage", "Usage of the CPU in percent.").set_function(
lambda: psutil.cpu_percent(interval=1)
)
Gauge("memory_usage", "Usage of the memory in percent.").set_function(
lambda: psutil.virtual_memory().percent
)
proc_self = psutil.Process(os.getpid())
Gauge("process_memory_rss", "RSS of the Python process").set_function(
lambda: proc_self.memory_info().rss
)
task_queue.run()
task_queue.wait()
Gauge("process_memory_vms", "VMS of the Python process").set_function(
lambda: proc_self.memory_info().vms
)
# TODO(rousik): proc_self.cpu_times() can also be helpful to get total CPU burn.
# Note that the above approach may not be optimal, we might choose to use
# with proc_self.oneshot(): to avoid repeated calls, and perhaps run the
# updater in a background thread that can sleep a lot.


task_queue = TaskQueue(
settings=TaskQueueSettings(
max_workers=args.max_workers,
)
)

task_queue.put(
DirectoryAnalyzer(
object_path=ObjectPath(),
left_path=lpath,
right_path=rpath,
local_cache_root=args.cache_dir,
filename_filter=args.filename_filter,
)
)
task_queue.run()
task_queue.wait()

if args.html_report:
md = task_queue.to_markdown()
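The TODO in main() above sketches a cheaper alternative to the three set_function() lambdas: sample everything in a single pass under proc_self.oneshot() from a background thread that sleeps between samples. A minimal sketch of that approach, assuming the same metric names are kept; the 15-second interval is an arbitrary choice:

import os
import threading
import time

import psutil
from prometheus_client import Gauge

CPU_USAGE = Gauge("cpu_usage", "Usage of the CPU in percent.")
PROCESS_RSS = Gauge("process_memory_rss", "RSS of the Python process.")
PROCESS_VMS = Gauge("process_memory_vms", "VMS of the Python process.")


def update_metrics_forever(interval_seconds: float = 15.0) -> None:
    proc_self = psutil.Process(os.getpid())
    while True:
        # oneshot() caches the underlying process reads, so both
        # memory_info() fields come from a single sampling pass.
        with proc_self.oneshot():
            mem = proc_self.memory_info()
            PROCESS_RSS.set(mem.rss)
            PROCESS_VMS.set(mem.vms)
        # interval=None reports CPU usage since the previous call,
        # so the sampling thread never blocks here.
        CPU_USAGE.set(psutil.cpu_percent(interval=None))
        time.sleep(interval_seconds)


threading.Thread(target=update_metrics_forever, daemon=True).start()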
13 changes: 6 additions & 7 deletions src/pudl_output_differ/files.py
@@ -1,18 +1,16 @@
"""Generic utilities for diffing PUDL_OUTPUT directories."""

from pathlib import Path
import logging
import re
from pathlib import Path
from typing import Counter, Iterator
from opentelemetry import trace

import fsspec
from opentelemetry import trace

from pudl_output_differ.parquet import ParquetAnalyzer, ParquetFile
from pudl_output_differ.sqlite import Database, SQLiteAnalyzer

from pudl_output_differ.types import (
Result, Analyzer, KeySetDiff, TaskQueueInterface
)
from pudl_output_differ.types import Analyzer, KeySetDiff, Result, TaskQueueInterface

logger = logging.getLogger(__name__)
tracer = trace.get_tracer(__name__)
@@ -38,7 +36,8 @@ class DirectoryAnalyzer(Analyzer):
filename_filter: str = ""

def get_title(self) -> str:
return "## Files"
return "Files"

def get_files(self, root_path: str) -> dict[str, str]:
"""Returns list of files in the output directory.
82 changes: 46 additions & 36 deletions src/pudl_output_differ/sqlite.py
@@ -7,6 +7,7 @@
import backoff
import pandas as pd
from opentelemetry import trace
from prometheus_client import Summary
from pydantic_settings import BaseSettings, SettingsConfigDict
from sqlalchemy import Connection, Engine, create_engine, inspect, text
from sqlalchemy.exc import OperationalError
@@ -23,12 +24,13 @@
logger = logging.getLogger(__name__)
tracer = trace.get_tracer(__name__)

SQLITE_CONNECT_SECONDS = Summary("sqlite_connect_seconds", "Time spent connecting sqlite databases.", ["db_name", "table_name"])

REPORT_YEAR_PARTITION = "substr(report_date, 1, 4)"


# TODO(rousik): for the sake of unit-testing, we should be passing
# these as `settings` to the individual analyzers.
class SQLiteSettings(BaseSettings):
"""Default configuration for this module."""

@@ -46,6 +48,10 @@ class SQLiteSettings(BaseSettings):
# rows for the matching PKs.
single_pass_pk_comparison: bool = False

# If this is set to True, records and columns outside of PK columns will be
# compared. By default we only look for PK columns.
enable_full_row_comparison: bool = False


class Database(TypeDef):
"""Represents a database."""
@@ -138,7 +144,7 @@ class TableAnalyzer(Analyzer):

def get_title(self) -> str:
"""Returns the title of the analysis."""
title = f"## Table {self.db_name}/{self.table_name}"
title = f"Table {self.db_name}/{self.table_name}"
if self.partition_key:
title += f" (partition {self.get_partition_func()}=`{self.partition_key}`)"
return title
@@ -247,7 +253,7 @@ def split_to_partitioned_tasks(

if md.tell() > 0:
yield Result(markdown=md.getvalue())

@backoff.on_exception(backoff.expo, OperationalError, max_tries=4)
def retry_connect(self, engine: Engine) -> Connection:
"""Connects to the database, retrying on OperationalError."""
@@ -266,9 +272,6 @@ def execute(self, task_queue: TaskQueueInterface) -> Iterator[Result]:
l_db_engine = create_engine(f"sqlite:///{self.left_db_path}")
r_db_engine = create_engine(f"sqlite:///{self.right_db_path}")

lconn = self.retry_connect(l_db_engine)
rconn = self.retry_connect(r_db_engine)

# TODO(rousik): test for schema discrepancies here.
l_pk = self.get_pk_columns(l_db_engine)
r_pk = self.get_pk_columns(r_db_engine)
@@ -278,11 +281,17 @@ def execute(self, task_queue: TaskQueueInterface) -> Iterator[Result]:
f"Primary key columns for {self.table_name} do not match."
)

with SQLITE_CONNECT_SECONDS.labels(self.db_name, self.table_name).time():
lconn = self.retry_connect(l_db_engine)
rconn = self.retry_connect(r_db_engine)

if not self.partition_key and self.is_partitioned_table():
for res in self.split_to_partitioned_tasks(task_queue, lconn, rconn):
yield res

if not l_pk:
if not self.settings.enable_full_row_comparison:
return
for res in self.compare_raw_tables(lconn, rconn):
yield res
else:
@@ -446,34 +455,35 @@ def compare_pk_tables(
)
yield Result(markdown=f" * removed {rows_removed} rows {pct_change}\n")

with tracer.start_as_current_span("compare_overlapping_rows") as sp:
sp.set_attribute("num_rows", len(overlap_index))
if self.settings.single_pass_pk_comparison:
ldf = ldf.loc[overlap_index]
rdf = rdf.loc[overlap_index]
else:
with tracer.start_as_current_span("load_overlapping_rows"):
ldf = self.get_records(ldb, columns=cols_intact, index_columns=pk_cols)
if self.settings.enable_full_row_comparison:
with tracer.start_as_current_span("compare_overlapping_rows") as sp:
sp.set_attribute("num_rows", len(overlap_index))
if self.settings.single_pass_pk_comparison:
ldf = ldf.loc[overlap_index]

rdf = self.get_records(rdb, columns=cols_intact, index_columns=pk_cols)
rdf = rdf.loc[overlap_index]

diff_rows = ldf.compare(rdf, result_names=("left", "right"))
rows_changed = len(diff_rows)
if rows_changed:
pct_change = float(rows_changed) * 100 / orig_row_count
yield Result(markdown=f" * changed {rows_changed} rows ({pct_change:.2f}% change)\n")

# calculate number of rows that have changes in a particular column
changes_per_col = (~diff_rows.T.isna()).groupby(level=0).any().T.sum()
changes_per_col = changes_per_col.to_frame().reset_index()
changes_per_col.columns = ["column_name", "num_rows"]

# TODO(rousik): assign column names: column_name, rows_changed
# TODO(rousik): This could be severity DIAGNOSTIC.

cc = StringIO()
cc.write("\nNumber of changes found per column:\n\n")
cc.write(changes_per_col.to_markdown())
yield Result(severity=ReportSeverity.DIAGNOSTIC, markdown=cc.getvalue())
else:
with tracer.start_as_current_span("load_overlapping_rows"):
ldf = self.get_records(ldb, columns=cols_intact, index_columns=pk_cols)
ldf = ldf.loc[overlap_index]

rdf = self.get_records(rdb, columns=cols_intact, index_columns=pk_cols)
rdf = rdf.loc[overlap_index]

diff_rows = ldf.compare(rdf, result_names=("left", "right"))
rows_changed = len(diff_rows)
if rows_changed:
pct_change = float(rows_changed) * 100 / orig_row_count
yield Result(markdown=f" * changed {rows_changed} rows ({pct_change:.2f}% change)\n")

# calculate number of rows that have changes in a particular column
changes_per_col = (~diff_rows.T.isna()).groupby(level=0).any().T.sum()
changes_per_col = changes_per_col.to_frame().reset_index()
changes_per_col.columns = ["column_name", "num_rows"]

# TODO(rousik): assign column names: column_name, rows_changed
# TODO(rousik): This could be severity DIAGNOSTIC.

cc = StringIO()
cc.write("\nNumber of changes found per column:\n\n")
cc.write(changes_per_col.to_markdown())
yield Result(severity=ReportSeverity.DIAGNOSTIC, markdown=cc.getvalue())
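The changes_per_col expression above is fairly dense; here is a toy example (data invented for illustration) of what DataFrame.compare() returns and how the transpose/groupby pipeline counts, for each column, the number of rows that changed:

import pandas as pd

left = pd.DataFrame(
    {"id": [1, 2, 3], "a": [10, 20, 30], "b": ["x", "y", "z"]}
).set_index("id")
right = pd.DataFrame(
    {"id": [1, 2, 3], "a": [10, 25, 30], "b": ["x", "y", "q"]}
).set_index("id")

# compare() keeps only cells that differ; the columns become a
# (column, side) MultiIndex, here with sides "left" and "right".
diff_rows = left.compare(right, result_names=("left", "right"))

# Transpose so the (column, side) MultiIndex is on the rows, mark non-NaN
# cells (i.e. actual differences), collapse the two sides per column with
# any(), then transpose back and sum over rows.
changes_per_col = (~diff_rows.T.isna()).groupby(level=0).any().T.sum()
print(changes_per_col)  # a -> 1, b -> 1 (one changed row in each column)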