Add sec10k metadata directly in PUDL #4035

Merged · 23 commits · Feb 5, 2025
18 changes: 3 additions & 15 deletions src/pudl/analysis/pudl_models.py
Member:
It feels odd that this is under the analysis subpackage given that the analysis isn't done here. It seems more like a straightforward ETL of some tables, albeit tables that we ourselves have produced elsewhere. Would it be more legible if it followed the same pattern as for other datasets, adding an sec10k module under the extract and transform subpackages?

Member Author:
I agree the organization feels weird, but I don't think that would quite make sense either. We are doing some basic transformations in PUDL right now, but ideally those would all move back upstream, so that all we actually do in PUDL is read in the parquet files so they can be distributed. In that case the extract step would be creating core/out tables, which doesn't feel right. Would it make sense to add this to the etl directory?

Member:
Maybe this is just a new class of externally processed table? If it doesn't clearly fit into one of the other categories right now, then I guess we can figure out where to put it when other tables like this show up.

The etl directory feels like a weird junk drawer to me. I still like the idea of organizing the subpackages by data source, given that different data sources have ended up having slightly different ETL journeys that don't necessarily fit neatly into E, T, and L (and the fact that L has been almost totally taken over by Dagster infra).

@@ -3,12 +3,10 @@
 import os
 
 import pandas as pd
-import pyarrow as pa
 from dagster import AssetsDefinition, asset
-from deltalake import DeltaTable
 
 
-def get_model_tables() -> list[str]:
+def _get_model_tables() -> list[str]:
     """Return all tables produced by PUDL models or empty list if env variable not set."""
     pudl_models_tables = []
     if os.getenv("USE_PUDL_MODELS"):
@@ -35,21 +33,11 @@ def pudl_models_asset_factory(table_name: str) -> AssetsDefinition:
         group_name="pudl_models",
     )
     def _asset() -> pd.DataFrame:
-        return DeltaTable(_get_table_uri(table_name)).to_pandas()
+        return pd.read_parquet(_get_table_uri(table_name))
 
     return _asset
 
 
 def get_pudl_models_assets() -> list[AssetsDefinition]:
     """Generate a collection of assets for all PUDL model tables."""
-    return [pudl_models_asset_factory(table) for table in get_model_tables()]
-
-
-def get_model_table_schemas() -> list[str, str, pa.Schema]:
-    """Return pyarrow schemas for all PUDL models tables."""
-    dts = [DeltaTable(_get_table_uri(table_name)) for table_name in get_model_tables()]
-
-    return [
-        (dt.metadata().name, dt.metadata().description, dt.schema().to_pyarrow())
-        for dt in dts
-    ]
+    return [pudl_models_asset_factory(table) for table in _get_model_tables()]
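For context on the pattern this module now reduces to: each externally produced table becomes a Dagster asset that simply reads a parquet file. A minimal, self-contained sketch of that pattern — the `_get_table_uri` body, the `PUDL_MODELS_PATH` variable, and the example table name are assumptions for illustration, not the actual PUDL implementation:

```python
import os

import pandas as pd
from dagster import AssetsDefinition, Definitions, asset


def _get_table_uri(table_name: str) -> str:
    # Assumed helper: the real implementation presumably builds a path or
    # remote URI from configuration; here we just join an env-var base path.
    return os.path.join(os.environ["PUDL_MODELS_PATH"], f"{table_name}.parquet")


def pudl_models_asset_factory(table_name: str) -> AssetsDefinition:
    """Create an asset that loads one externally produced parquet table."""

    @asset(name=table_name, group_name="pudl_models")
    def _asset() -> pd.DataFrame:
        return pd.read_parquet(_get_table_uri(table_name))

    return _asset


# Hypothetical table name, for illustration only.
defs = Definitions(assets=[pudl_models_asset_factory("core_sec10k__filings")])
```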
14 changes: 2 additions & 12 deletions src/pudl/io_managers.py
@@ -25,7 +25,6 @@
 from upath import UPath
 
 import pudl
-from pudl.analysis.pudl_models import get_model_tables
 from pudl.metadata.classes import PUDL_PACKAGE, Package, Resource
 from pudl.workspace.setup import PudlPaths
 
@@ -322,22 +321,13 @@ def load_input(self, context: InputContext) -> pd.DataFrame:
 class PudlParquetIOManager(IOManager):
     """IOManager that writes pudl tables to pyarrow parquet files."""
 
-    def _get_table_resource(self, table_name: str) -> Resource:
-        """Return resource class for table."""
-        if table_name not in get_model_tables():
-            res = Resource.from_id(table_name)
-        else:
-            # For tables coming from PUDL modelling repo just use already parsed resource metadata
-            [res] = [r for r in PUDL_PACKAGE.resources if r.name == table_name]
-        return res
-
     def handle_output(self, context: OutputContext, df: Any) -> None:
         """Writes pudl dataframe to parquet file."""
         assert isinstance(df, pd.DataFrame), "Only panda dataframes are supported."
         table_name = get_table_name_from_context(context)
         parquet_path = PudlPaths().parquet_path(table_name)
         parquet_path.parent.mkdir(parents=True, exist_ok=True)
-        res = self._get_table_resource(table_name)
+        res = Resource.from_id(table_name)
 
         df = res.enforce_schema(df)
         schema = res.to_pyarrow()
@@ -355,7 +345,7 @@ def load_input(self, context: InputContext) -> pd.DataFrame:
"""Loads pudl table from parquet file."""
table_name = get_table_name_from_context(context)
parquet_path = PudlPaths().parquet_path(table_name)
res = self._get_table_resource(table_name)
res = Resource.from_id(table_name)
df = pq.read_table(source=parquet_path, schema=res.to_pyarrow()).to_pandas()
return res.enforce_schema(df)

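The net effect in both methods is that `Resource.from_id` is once again the single path to table metadata, with the parquet round-trip driven entirely by the resource's pyarrow schema. A minimal sketch of that round-trip, with a hand-built schema standing in for `res.to_pyarrow()` and made-up field names:

```python
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# Stand-in for res.to_pyarrow(): in PUDL this schema is derived from the
# resource metadata; the fields here are invented for illustration.
schema = pa.schema(
    [
        pa.field("utility_id", pa.int64(), nullable=False),
        pa.field("utility_name", pa.string()),
    ]
)

df = pd.DataFrame({"utility_id": [1, 2], "utility_name": ["Acme", "Burns"]})

# handle_output: write the dataframe as parquet with the schema enforced.
pq.write_table(
    pa.Table.from_pandas(df, schema=schema, preserve_index=False), "demo.parquet"
)

# load_input: read it back against the same schema, as the IOManager does.
round_tripped = pq.read_table(source="demo.parquet", schema=schema).to_pandas()
print(round_tripped.dtypes)
```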
48 changes: 2 additions & 46 deletions src/pudl/metadata/classes.py
@@ -36,7 +36,6 @@
 )
 
 import pudl.logging_helpers
-from pudl.analysis.pudl_models import get_model_table_schemas
 from pudl.metadata.codes import CODE_METADATA
 from pudl.metadata.constants import (
     CONSTRAINT_DTYPES,
@@ -573,24 +572,6 @@ class Field(PudlMeta):
     harvest: FieldHarvest = FieldHarvest()
     encoder: Encoder | None = None
 
-    @classmethod
-    def from_pyarrow_field(cls, field: pa.Field) -> "Field":
-        """Construct from pyarrow field."""
-        # Reverse map from frictionless -> pyarrow to pyarrow -> frictionless
-        type_map = {
-            value: key for value, key in FIELD_DTYPES_PYARROW.items() if key != "year"
-        } | {
-            pa.bool8(): "boolean",
-            pa.int32(): "integer",
-            pa.int64(): "integer",
-            pa.date32(): "date",
-        }
-        return cls(
-            name=field.name,
-            type=type_map[field.type],
-            description=field.metadata[b"description"].decode(),
-        )
-
     @field_validator("constraints")
     @classmethod
     def _check_constraints(cls, value, info: ValidationInfo):  # noqa: C901
@@ -812,15 +793,6 @@ class Schema(PudlMeta):
"missing_values", "primary_key", "foreign_keys", fn=_check_unique
)

@classmethod
def from_pyarrow_schema(cls, schema: pa.Schema) -> "Schema":
"""Construct from a pyarrow schema."""
return cls(
fields=[
Field.from_pyarrow_field(schema.field(name)) for name in schema.names
]
)

@field_validator("fields")
@classmethod
def _check_field_names_unique(cls, fields: list[Field]):
@@ -1314,6 +1286,7 @@ class Resource(PudlMeta):
"pudl",
"nrelatb",
"vcerare",
"sec10k",
]
| None
) = None
@@ -1342,6 +1315,7 @@
"service_territories",
"nrelatb",
"vcerare",
"pudl_models",
]
| None
) = None
@@ -1477,18 +1451,6 @@ def from_id(cls, x: str) -> "Resource":
"""Construct from PUDL identifier (`resource.name`)."""
return cls(**cls.dict_from_id(x))

@classmethod
def from_pyarrow_schema(
cls, name: str, description: str, schema: pa.Schema
) -> "Resource":
"""Construct from a pyarrow schema."""
return cls(
name=name,
description=description,
schema=Schema.from_pyarrow_schema(schema),
create_database_schema=False,
)

def get_field(self, name: str) -> Field:
"""Return field with the given name if it's part of the Resources."""
names = [field.name for field in self.schema.fields]
@@ -2015,12 +1977,6 @@ def from_resource_ids(
         if len(names) > i:
             resources += [Resource.dict_from_id(x) for x in names[i:]]
 
-        resources += [
-            Resource.from_pyarrow_schema(name, description, schema).model_dump(
-                by_alias=True
-            )
-            for name, description, schema in get_model_table_schemas()
-        ]
         if excluded_etl_groups:
             resources = [
                 resource
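With the pyarrow-derived resources gone, sec10k tables are expected to carry static metadata like every other dataset, using the `sec10k` field namespace and `pudl_models` ETL group added above. A hypothetical sketch of what such an entry might look like — table name and fields invented, following the usual resource-metadata dict pattern:

```python
# Hypothetical resource entry; not the actual sec10k schema.
RESOURCE_METADATA = {
    "core_sec10k__filings": {
        "description": "Metadata for SEC 10-K filings processed upstream of PUDL.",
        "schema": {
            "fields": ["filename", "cik", "company_name", "report_date"],
            "primary_key": ["filename"],
        },
        "field_namespace": "sec10k",
        "etl_group": "pudl_models",
        "sources": ["sec10k"],
    },
}
```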