Skip to content

Commit

Permalink
Replace AnalysisReport with Iterator[Result]
Browse files Browse the repository at this point in the history
Drop the old AnalysisReport structure in favor of lightweight Result
that just has severity and markdown content.

Switch execute() methods to yield Results which makes the flow a bit
easier to deal with.

Titles are now generated by analyzer.get_title() instead of being
bundled with the reports themselves, and similarly, object_paths
are part of the task_queue metadata rather than Results.

Also added a progress bar and removed the continuous printing of results
as they become available. We are at a stage where the final report
may be good enough.

Logging may still interfere with the progress bar; other libraries
may solve this, but for now it's probably okay as-is.
  • Loading branch information
rousik committed Nov 28, 2023
1 parent 1064b8a commit c18eaaf
Show file tree
Hide file tree
Showing 8 changed files with 180 additions and 160 deletions.
60 changes: 35 additions & 25 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ opentelemetry-exporter-otlp-proto-grpc = "^1.20.0"
markdown = "^3.5.1"
py-gfm = "^2.0.0"
backoff = "^2.2.1"
progress = "^1.6"


[tool.poetry.group.dev.dependencies]
Expand Down
8 changes: 2 additions & 6 deletions src/pudl_output_differ/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,16 +140,12 @@ def main() -> int:
},
),
)
# TODO(rousik): add support for CloudTraceSpanExporter here, see
# https://cloud.google.com/trace/docs/setup/python-ot
processor = BatchSpanProcessor(OTLPSpanExporter(endpoint=args.trace_backend))
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)

# provider = TracerProvider()
# TODO(rousik): add support for other trace backends.
# processor = BatchSpanProcessor(ConsoleSpanExporter())
# provider.add_span_processor(processor)
# trace.set_tracer_provider(provider)

if not args.cache_dir and any(is_remote(p) for p in [args.left, args.right]):
args.cache_dir = tempfile.mkdtemp()
logger.info(f"Created temporary cache directory {args.cache_dir}")
Expand Down
16 changes: 7 additions & 9 deletions src/pudl_output_differ/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@
from pathlib import Path
import logging
import re
from typing import Counter
from typing import Counter, Iterator
from opentelemetry import trace

import fsspec
from pudl_output_differ.parquet import ParquetAnalyzer, ParquetFile
from pudl_output_differ.sqlite import Database, SQLiteAnalyzer

from pudl_output_differ.types import (
AnalysisReport, Analyzer, KeySetDiff, TaskQueueInterface
Result, Analyzer, KeySetDiff, TaskQueueInterface
)

logger = logging.getLogger(__name__)
Expand All @@ -37,7 +37,8 @@ class DirectoryAnalyzer(Analyzer):
local_cache_root: str | None = None
filename_filter: str = ""


def get_title(self) -> str:
    """Return the markdown heading used for this analyzer's section of the final report."""
    return "## Files"
def get_files(self, root_path: str) -> dict[str, str]:
"""Returns list of files in the output directory.
Expand Down Expand Up @@ -84,7 +85,7 @@ def retrieve_remote(self, full_path: str) -> str:
# fs to determine local path only when needed.
return f.name

def execute(self, task_queue: TaskQueueInterface) -> AnalysisReport:
def execute(self, task_queue: TaskQueueInterface) -> Iterator[Result]:
"""Computes diff between two output directories.
Files on the left and right are compared for presence, children
Expand Down Expand Up @@ -136,8 +137,5 @@ def execute(self, task_queue: TaskQueueInterface) -> AnalysisReport:
for fmt, count in unsupported_formats.items():
logger.warning(f"Unsupported file format {fmt} found {count} times.")

return AnalysisReport(
object_path=self.object_path,
title= "# Files",
markdown=file_diff.markdown(long_format=True),
)
if file_diff.has_diff():
yield Result(markdown=file_diff.markdown(long_format=True))
23 changes: 11 additions & 12 deletions src/pudl_output_differ/parquet.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
"""Module for comparing contents of parquet files."""

from io import StringIO
import logging
from typing import Iterator

from pydantic_settings import BaseSettings, SettingsConfigDict
from pudl_output_differ.types import AnalysisReport, Analyzer, TaskQueueInterface, TypeDef
import pyarrow.parquet as pq
from pydantic_settings import BaseSettings, SettingsConfigDict

from pudl_output_differ.types import (
Analyzer,
Result,
TaskQueueInterface,
TypeDef,
)

logger = logging.getLogger(__name__)

Expand All @@ -29,23 +35,16 @@ class ParquetAnalyzer(Analyzer):
right_path: str
# TODO(rousik): add settings once we know how to tune this

def execute(self, task_queue: TaskQueueInterface) -> AnalysisReport:
md = StringIO()
def execute(self, task_queue: TaskQueueInterface) -> Iterator[Result]:
lmeta = pq.read_metadata(self.left_path)
rmeta = pq.read_metadata(self.right_path)
if not lmeta.schema.equals(rmeta.schema):
md.write("* parquet schemas are different.\n")
yield Result(markdown=" * parquet schemas are different.\n")
# TODO(rousik): add comparison of schema columns

# TODO(rousik): try loading the contents of the parquet files
# for comparison. This will be similar to sqlite and may reuse
# the same pandas machinery.
return AnalysisReport(
object_path=self.object_path,
title=f"## Parquet file {self.name}",
markdown=md.getvalue()
)


# logger = logging.getLogger(__name__)

Expand Down
Loading

0 comments on commit c18eaaf

Please sign in to comment.