Task queue and other cleanups (#2)
This change introduces several cleanups:
- create a separate model for object paths, encapsulating their features and behaviors
- add support for running the diff tool via a poetry command
- move the task queue into the `task_queue.py` module and rework its internals to run tasks on n worker threads, so we can manage task ordering, choose priorities, or enforce constraints (a minimal sketch follows below)
- for now, permit only one task per sqlite database, to avoid collisions when accessing the underlying file
- add some extra tests
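
Since `task_queue.py` itself is not shown in this diff, here is a minimal sketch of the approach the bullets above describe: n runner threads pulling from a shared queue, with at most one in-flight task per sqlite database. All names here (`Task`, `db_key`, the thread count) are illustrative assumptions, not the module's actual API.

```python
import queue
import threading
from typing import Callable, Optional


class Task:
    """A unit of work; db_key names the sqlite file it touches (None = unconstrained)."""

    def __init__(self, fn: Callable[[], None], db_key: Optional[str] = None):
        self.fn = fn
        self.db_key = db_key


class TaskQueue:
    """Runs tasks on n worker threads, allowing one task per sqlite database at a time."""

    def __init__(self, num_threads: int = 4):
        self._tasks: queue.Queue[Task] = queue.Queue()
        self._busy: set[str] = set()  # db_keys with an in-flight task
        self._lock = threading.Lock()
        self._workers = [
            threading.Thread(target=self._run_loop, daemon=True)
            for _ in range(num_threads)
        ]

    def put(self, task: Task) -> None:
        self._tasks.put(task)

    def run(self) -> None:
        for worker in self._workers:
            worker.start()

    def wait(self) -> None:
        # Blocks until every queued task (including re-queued ones) has finished.
        self._tasks.join()

    def _run_loop(self) -> None:
        while True:
            task = self._tasks.get()
            if task.db_key is not None:
                with self._lock:
                    if task.db_key in self._busy:
                        # This DB is in use; push the task back. A real
                        # implementation would park it instead of spinning.
                        self._tasks.put(task)
                        self._tasks.task_done()
                        continue
                    self._busy.add(task.db_key)
            try:
                task.fn()
            finally:
                if task.db_key is not None:
                    with self._lock:
                        self._busy.discard(task.db_key)
                self._tasks.task_done()
```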
rousik authored Nov 26, 2023
1 parent e685c73 commit a86d8ab
Showing 10 changed files with 453 additions and 224 deletions.
9 changes: 8 additions & 1 deletion Dockerfile
@@ -1,10 +1,17 @@
FROM python:3.10-buster
RUN pip install poetry==1.6

# TODO(rousik): Presumably, when we run this tool, we will want to
# either mount remote paths as "local" directories, or cache the remote
# files locally using plentiful storage.

# --cache-dir argument can be used to control where things are cached.


WORKDIR /app
COPY pyproject.toml poetry.lock /app/
COPY README.md /app
COPY ./src /app/src
RUN poetry config virtualenvs.create false
RUN poetry install --only main
ENTRYPOINT ["poetry", "run", "python", "-m", "pudl_output_differ.main"]
ENTRYPOINT ["poetry", "run", "diff"]
3 changes: 3 additions & 0 deletions pyproject.toml
@@ -6,6 +6,9 @@ authors = ["Jan Rous <[email protected]>"]
readme = "README.md"
packages = [{include = "pudl_output_differ", from = "src"}]

[tool.poetry.scripts]
diff = "pudl_output_differ.cli:main"

[tool.poetry.dependencies]
python = "^3.10"
pydantic = "^2.3.0"
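
For context, the `[tool.poetry.scripts]` stanza above makes Poetry generate a console-script wrapper that imports the module on the left of the colon and calls the attribute on the right. Its effective behavior is roughly the following (the wrapper itself is auto-generated; this is only an illustration):

```python
# Effective behavior of the generated `diff` script: call main() and use
# its return value as the process exit status.
from pudl_output_differ.cli import main

if __name__ == "__main__":
    raise SystemExit(main())
```

After `poetry install`, the tool is available as `poetry run diff`, which is what the new Dockerfile ENTRYPOINT relies on.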
31 changes: 7 additions & 24 deletions src/pudl_output_differ/cli.py
@@ -18,14 +18,16 @@
import markdown

from pudl_output_differ.files import DirectoryAnalyzer, is_remote
from pudl_output_differ.types import TaskQueue
from pudl_output_differ.task_queue import TaskQueue
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry import trace
from mdx_gfm import GithubFlavoredMarkdownExtension

from pudl_output_differ.types import ObjectPath

logger = logging.getLogger(__name__)
tracer = trace.get_tracer(__name__)

@@ -161,37 +163,18 @@ def main() -> int:

task_queue.put(
DirectoryAnalyzer(
object_path=[],
object_path=ObjectPath(),
left_path=lpath,
right_path=rpath,
local_cache_root=args.cache_dir,
filename_filter=args.filename_filter,
)
)
reports = 0
nonempty_reports = 0
for analysis in task_queue.iter_analyses(
catch_exceptions=args.catch_exceptions
):
reports += 1
# TODO(rousik): it would be good if AnalysisReport contained metadata
# identifying the analyzer that produced it. Perhaps we could use
# wrapper that will contain both the analysis, as well as the analyzer
# metadata, e.g.:
# - object_path
# - instance that produced it (config)
# - possible runtime exception information (so that we can distinguish)
# Analysis itself could have severity (ERROR, WARNING) to indicate
# whether the problem is serious or not.
if analysis.markdown:
nonempty_reports += 1
print(analysis.title)
print(analysis.markdown)
print()
logger.info(f"Total {reports} reports, with {nonempty_reports} nonempty.")
task_queue.run()
task_queue.wait()

if args.html_report:
md = task_queue.to_markdown(catch_exceptions=True)
md = task_queue.to_markdown()
with open(args.html_report, "w") as f:
f.write(MARKDOWN_CSS_STYLE)
f.write('<article class="markdown-body">')
26 changes: 14 additions & 12 deletions src/pudl_output_differ/files.py
@@ -9,7 +9,7 @@
from pudl_output_differ.sqlite import Database, SQLiteAnalyzer

from pudl_output_differ.types import (
AnalysisReport, Analyzer, KeySetDiff, TaskQueue
AnalysisReport, Analyzer, KeySetDiff, TaskQueueInterface
)

logger = logging.getLogger(__name__)
@@ -82,17 +82,19 @@ def retrieve_remote(self, full_path: str) -> str:
# fs to determine local path only when needed.
return f.name

# TODO(rousik): passing parents this way is a bit clunky, but acceptable.
@tracer.start_as_current_span(name="DirectoryAnalyzer")
def execute(self, task_queue: TaskQueue) -> AnalysisReport:
def execute(self, task_queue: TaskQueueInterface) -> AnalysisReport:
"""Computes diff between two output directories.
Files on the left and right are compared for presence; supported
file types are handed off to deeper-layer analyzers as child tasks.
"""
sp = trace.get_current_span()
sp.set_attribute("left_path", self.left_path)
sp.set_attribute("right_path", self.right_path)
trace.get_current_span().set_attributes(
{
"left_path": self.left_path,
"right_path": self.right_path,
}
)

lfs = self.get_files(self.left_path)
rfs = self.get_files(self.right_path)

@@ -106,16 +108,16 @@ def execute(self, task_queue: TaskQueue) -> AnalysisReport:

task_queue.put(
SQLiteAnalyzer(
object_path = [Database(name=shared_file)],
db_name = shared_file,
object_path=self.object_path.extend(Database(name=shared_file)),
db_name=shared_file,
left_db_path=left_path,
right_db_path=right_path,
)
)
# TODO(rousik): other file formats are: json, parquet, yml.

return AnalysisReport(
object_path = [],
title = "# Files",
markdown = file_diff.markdown(long_format=True),
object_path=self.object_path,
title= "# Files",
markdown=file_diff.markdown(long_format=True),
)
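
`ObjectPath` itself is not shown in this diff; from its usage (`ObjectPath()` as an empty root in cli.py, `.extend(...)` when enqueueing child analyzers here and in sqlite.py), a plausible shape is an immutable sequence of typed parts. A sketch under those assumptions, not the actual `types.py` code:

```python
from pydantic import BaseModel


class TypeDef(BaseModel):
    """Base class for typed path components (Database, Table, Partition, ...)."""


class ObjectPath(BaseModel):
    """Immutable path of typed parts identifying the object being diffed."""

    parts: tuple[TypeDef, ...] = ()

    def extend(self, part: TypeDef) -> "ObjectPath":
        # Return a new path rather than mutating in place, so sibling
        # analyzers can safely share a common prefix.
        return ObjectPath(parts=self.parts + (part,))

    def __str__(self) -> str:
        return "/".join(str(p) for p in self.parts)
```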
112 changes: 56 additions & 56 deletions src/pudl_output_differ/parquet.py
@@ -1,65 +1,65 @@
"""Module for comparing contents of parquet files."""
# """Module for comparing contents of parquet files."""

import logging
from pudl_output_differ.sqlite import RowCountDiff
from pudl_output_differ.types import DiffEvaluatorBase, DiffTreeNode, KeySetDiff, TaskQueue
import pyarrow.parquet as pq
# import logging
# from pudl_output_differ.sqlite import RowCountDiff
# from pudl_output_differ.types import DiffEvaluatorBase, DiffTreeNode, KeySetDiff, TaskQueue
# import pyarrow.parquet as pq


logger = logging.getLogger(__name__)
# logger = logging.getLogger(__name__)


class ParquetEvaluator(DiffEvaluatorBase):
left_path: str
right_path: str
# class ParquetEvaluator(DiffEvaluatorBase):
# left_path: str
# right_path: str


def get_columns(self, schema: pq.ParquetSchema) -> list[str]:
"""Return list containing column_name::column_type."""
ret = []
for i in range(len(schema.names)):
ret.append(
schema.column(i).name + "::" + schema.column(i).logical_type.type
)
return ret
# def get_columns(self, schema: pq.ParquetSchema) -> list[str]:
# """Return list containing column_name::column_type."""
# ret = []
# for i in range(len(schema.names)):
# ret.append(
# schema.column(i).name + "::" + schema.column(i).logical_type.type
# )
# return ret

def execute(self, task_queue: TaskQueue) -> list[DiffTreeNode]:
"""Compare two parquet files."""
diffs = []
# lfs, lpath = fsspec.core.url_to_fs(self.left_path)
# rfs, rpath = fsspec.open(self.right_path)
# def execute(self, task_queue: TaskQueue) -> list[DiffTreeNode]:
# """Compare two parquet files."""
# diffs = []
# # lfs, lpath = fsspec.core.url_to_fs(self.left_path)
# # rfs, rpath = fsspec.open(self.right_path)

lmeta = pq.read_metadata(self.left_path)
rmeta = pq.read_metadata(self.right_path)
if not lmeta.schema.equals(rmeta.schema):
logger.info("Parquet schemas are different.")
diffs.append(
self.parent_node.add_child(
DiffTreeNode(
name="ParquetSchema",
diff=KeySetDiff.from_sets(
set(self.get_columns(lmeta.schema)),
set(self.get_columns(rmeta.schema)),
entity="columns",
)
)
)
)
# Now, go on to compare the metadata more broadly.
if not lmeta.equals(rmeta):
logger.info("Parquet metadata are different.")
logger.info(f"Left metadata: {lmeta}")
logger.info(f"Right metadata: {rmeta}")
if lmeta.num_rows != rmeta.num_rows:
logger.info("Number of rows are different.")
diffs.append(
self.parent_node.add_child(
DiffTreeNode(
name="ParquetNumRows",
diff=RowCountDiff(
left_rows=lmeta.num_rows,
right_rows=rmeta.num_rows)
)
)
)
return diffs
# lmeta = pq.read_metadata(self.left_path)
# rmeta = pq.read_metadata(self.right_path)
# if not lmeta.schema.equals(rmeta.schema):
# logger.info("Parquet schemas are different.")
# diffs.append(
# self.parent_node.add_child(
# DiffTreeNode(
# name="ParquetSchema",
# diff=KeySetDiff.from_sets(
# set(self.get_columns(lmeta.schema)),
# set(self.get_columns(rmeta.schema)),
# entity="columns",
# )
# )
# )
# )
# # Now, go on to compare the metadata more broadly.
# if not lmeta.equals(rmeta):
# logger.info("Parquet metadata are different.")
# logger.info(f"Left metadata: {lmeta}")
# logger.info(f"Right metadata: {rmeta}")
# if lmeta.num_rows != rmeta.num_rows:
# logger.info("Number of rows are different.")
# diffs.append(
# self.parent_node.add_child(
# DiffTreeNode(
# name="ParquetNumRows",
# diff=RowCountDiff(
# left_rows=lmeta.num_rows,
# right_rows=rmeta.num_rows)
# )
# )
# )
# return diffs
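
The parquet evaluator above was commented out wholesale rather than ported to the new Analyzer/TaskQueue API. The metadata-level checks it performed can be reproduced standalone with pyarrow; a minimal sketch (paths are placeholders):

```python
import pyarrow.parquet as pq


def compare_parquet_metadata(left_path: str, right_path: str) -> None:
    """Print schema and row-count differences between two parquet files."""
    lmeta = pq.read_metadata(left_path)
    rmeta = pq.read_metadata(right_path)
    if not lmeta.schema.equals(rmeta.schema):
        print("schemas differ:")
        print(lmeta.schema)
        print(rmeta.schema)
    if lmeta.num_rows != rmeta.num_rows:
        print(f"row counts differ: {lmeta.num_rows} vs {rmeta.num_rows}")
```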
29 changes: 18 additions & 11 deletions src/pudl_output_differ/sqlite.py
@@ -12,7 +12,7 @@
AnalysisReport,
Analyzer,
KeySetDiff,
TaskQueue,
TaskQueueInterface,
TypeDef,
)

@@ -45,19 +45,28 @@ class SQLiteSettings(BaseSettings):

class Database(TypeDef):
"""Represents a database."""

name: str

def __str__(self):
return f"Database({self.name})"


class Table(TypeDef):
"""Represents a table in a database."""

name: str

def __str__(self):
return f"Table({self.name})"


class Partition(TypeDef):
"""Represents partition of a table."""
pk: str

def __str__(self):
return f"Partition(key:{self.pk})"


class SQLiteAnalyzer(Analyzer):
db_name: str
@@ -77,11 +86,10 @@ def should_process_table(self, table_name) -> bool:
return True
return table_name in self.settings.sqlite_tables_only

@tracer.start_as_current_span(name="SQLiteAnalyzer.execute")
def execute(self, task_queue: TaskQueue) -> AnalysisReport:
def execute(self, task_queue: TaskQueueInterface) -> AnalysisReport:
"""Analyze tables and their schemas."""
sp = trace.get_current_span()
sp.set_attribute("db_name", self.db_name)
trace.get_current_span().set_attribute("db_name", self.db_name)

ldb = create_engine(f"sqlite:///{self.left_db_path}")
rdb = create_engine(f"sqlite:///{self.right_db_path}")

@@ -101,7 +109,7 @@ def execute(self, task_queue: TaskQueue) -> AnalysisReport:

task_queue.put(
TableAnalyzer(
object_path=self.extend_path(Table(name=table_name)),
object_path=self.object_path.extend(Table(name=table_name)),
left_db_path=self.left_db_path,
right_db_path=self.right_db_path,
db_name=self.db_name,
@@ -171,7 +179,7 @@ def get_partitions(self, conn: Connection) -> dict[str, int]:

@tracer.start_as_current_span("split_to_partitioned_tasks")
def split_to_partitioned_tasks(
self, task_queue: TaskQueue, lconn: Connection, rconn: Connection
self, task_queue: TaskQueueInterface, lconn: Connection, rconn: Connection
) -> AnalysisReport:
"""Splits table analysis into partitioned tasks.
@@ -209,7 +217,7 @@ def split_to_partitioned_tasks(
for partition_key in partition_diff.shared:
task_queue.put(
TableAnalyzer(
object_path=self.extend_path(Partition(pk=partition_key)),
object_path=self.object_path.extend(Partition(pk=partition_key)),
left_db_path=self.left_db_path,
right_db_path=self.right_db_path,
db_name=self.db_name,
@@ -233,8 +241,7 @@ def split_to_partitioned_tasks(
markdown=md.getvalue(),
)

@tracer.start_as_current_span(name="TableAnalyzer.execute")
def execute(self, task_queue: TaskQueue) -> AnalysisReport:
def execute(self, task_queue: TaskQueueInterface) -> AnalysisReport:
"""Analyze tables and their schemas."""
sp = trace.get_current_span()
sp.set_attribute("db_name", self.db_name)
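
Combined with the `ObjectPath` sketch earlier, the `__str__` methods on `Database`, `Table`, and `Partition` give human-readable report paths. A hypothetical rendering (the real output of `types.py` may differ):

```python
# Hypothetical rendering, assuming the ObjectPath sketch above.
path = (
    ObjectPath()
    .extend(Database(name="pudl"))
    .extend(Table(name="plants"))
    .extend(Partition(pk="2021"))
)
print(path)  # Database(pudl)/Table(plants)/Partition(key:2021)
```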
