Skip to content

Commit

Permalink
Add sec10k metadata directly in PUDL
Browse files Browse the repository at this point in the history
  • Loading branch information
zschira committed Jan 28, 2025
1 parent c0eb11d commit 900f926
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 73 deletions.
18 changes: 3 additions & 15 deletions src/pudl/analysis/pudl_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@
import os

import pandas as pd
import pyarrow as pa
from dagster import AssetsDefinition, asset
from deltalake import DeltaTable


def get_model_tables() -> list[str]:
def _get_model_tables() -> list[str]:
"""Return all tables produced by PUDL models or empty list if env variable not set."""
pudl_models_tables = []
if os.getenv("USE_PUDL_MODELS"):
Expand All @@ -35,21 +33,11 @@ def pudl_models_asset_factory(table_name: str) -> AssetsDefinition:
group_name="pudl_models",
)
def _asset() -> pd.DataFrame:
return DeltaTable(_get_table_uri(table_name)).to_pandas()
return pd.read_parquet(_get_table_uri(table_name))

return _asset


def get_pudl_models_assets() -> list[AssetsDefinition]:
"""Generate a collection of assets for all PUDL model tables."""
return [pudl_models_asset_factory(table) for table in get_model_tables()]


def get_model_table_schemas() -> list[tuple[str, str, pa.Schema]]:
    """Return the name, description and pyarrow schema of all PUDL models tables.

    Returns:
        One ``(name, description, schema)`` tuple per table name yielded by
        ``get_model_tables()``. The name and description are read from the
        Delta table's metadata and the schema is converted to a pyarrow schema.
    """
    # Open every table first so a bad table URI fails before any metadata is read.
    dts = [DeltaTable(_get_table_uri(table_name)) for table_name in get_model_tables()]

    return [
        (dt.metadata().name, dt.metadata().description, dt.schema().to_pyarrow())
        for dt in dts
    ]
return [pudl_models_asset_factory(table) for table in _get_model_tables()]
14 changes: 2 additions & 12 deletions src/pudl/io_managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
from upath import UPath

import pudl
from pudl.analysis.pudl_models import get_model_tables
from pudl.metadata.classes import PUDL_PACKAGE, Package, Resource
from pudl.workspace.setup import PudlPaths

Expand Down Expand Up @@ -322,22 +321,13 @@ def load_input(self, context: InputContext) -> pd.DataFrame:
class PudlParquetIOManager(IOManager):
"""IOManager that writes pudl tables to pyarrow parquet files."""

def _get_table_resource(self, table_name: str) -> Resource:
    """Look up the Resource metadata describing ``table_name``.

    Tables listed by ``get_model_tables()`` are taken from the already parsed
    ``PUDL_PACKAGE`` resources; every other table is built with
    ``Resource.from_id``.
    """
    if table_name in get_model_tables():
        # Tables coming from the PUDL modelling repo reuse the already parsed
        # resource metadata. The single-element unpacking raises ValueError
        # unless exactly one resource carries this name.
        (resource,) = [
            candidate
            for candidate in PUDL_PACKAGE.resources
            if candidate.name == table_name
        ]
        return resource
    return Resource.from_id(table_name)

def handle_output(self, context: OutputContext, df: Any) -> None:
"""Writes pudl dataframe to parquet file."""
assert isinstance(df, pd.DataFrame), "Only panda dataframes are supported."
table_name = get_table_name_from_context(context)
parquet_path = PudlPaths().parquet_path(table_name)
parquet_path.parent.mkdir(parents=True, exist_ok=True)
res = self._get_table_resource(table_name)
res = Resource.from_id(table_name)

df = res.enforce_schema(df)
schema = res.to_pyarrow()
Expand All @@ -355,7 +345,7 @@ def load_input(self, context: InputContext) -> pd.DataFrame:
"""Loads pudl table from parquet file."""
table_name = get_table_name_from_context(context)
parquet_path = PudlPaths().parquet_path(table_name)
res = self._get_table_resource(table_name)
res = Resource.from_id(table_name)
df = pq.read_table(source=parquet_path, schema=res.to_pyarrow()).to_pandas()
return res.enforce_schema(df)

Expand Down
48 changes: 2 additions & 46 deletions src/pudl/metadata/classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
)

import pudl.logging_helpers
from pudl.analysis.pudl_models import get_model_table_schemas
from pudl.metadata.codes import CODE_METADATA
from pudl.metadata.constants import (
CONSTRAINT_DTYPES,
Expand Down Expand Up @@ -573,24 +572,6 @@ class Field(PudlMeta):
harvest: FieldHarvest = FieldHarvest()
encoder: Encoder | None = None

@classmethod
def from_pyarrow_field(cls, field: pa.Field) -> "Field":
    """Construct from pyarrow field.

    Args:
        field: The pyarrow field to convert. Its metadata must contain a
            ``b"description"`` entry, which becomes the field description.

    Returns:
        A new instance carrying the pyarrow field's name, the frictionless
        type corresponding to its pyarrow type, and its description.

    Raises:
        KeyError: If the pyarrow type has no frictionless equivalent in the
            lookup table, or the metadata lacks a ``b"description"`` key.
    """
    # FIELD_DTYPES_PYARROW maps frictionless name -> pyarrow type; invert it so
    # it can be indexed by pyarrow type. "year" is skipped so it never becomes
    # the reverse-lookup result (presumably it shares a pyarrow type with
    # another entry -- confirm against FIELD_DTYPES_PYARROW).
    type_map = {
        pa_type: name
        for name, pa_type in FIELD_DTYPES_PYARROW.items()
        if name != "year"
    } | {
        # Explicit entries take precedence over the inverted mapping.
        pa.bool8(): "boolean",
        pa.int32(): "integer",
        pa.int64(): "integer",
        pa.date32(): "date",
    }
    return cls(
        name=field.name,
        type=type_map[field.type],
        description=field.metadata[b"description"].decode(),
    )

@field_validator("constraints")
@classmethod
def _check_constraints(cls, value, info: ValidationInfo): # noqa: C901
Expand Down Expand Up @@ -812,15 +793,6 @@ class Schema(PudlMeta):
"missing_values", "primary_key", "foreign_keys", fn=_check_unique
)

@classmethod
def from_pyarrow_schema(cls, schema: pa.Schema) -> "Schema":
    """Build an instance whose fields mirror the columns of a pyarrow schema.

    Each column name in ``schema.names`` is looked up with ``schema.field``
    and converted through ``Field.from_pyarrow_field``, preserving order.
    """
    converted = []
    for column in schema.names:
        converted.append(Field.from_pyarrow_field(schema.field(column)))
    return cls(fields=converted)

@field_validator("fields")
@classmethod
def _check_field_names_unique(cls, fields: list[Field]):
Expand Down Expand Up @@ -1314,6 +1286,7 @@ class Resource(PudlMeta):
"pudl",
"nrelatb",
"vcerare",
"sec10k",
]
| None
) = None
Expand Down Expand Up @@ -1342,6 +1315,7 @@ class Resource(PudlMeta):
"service_territories",
"nrelatb",
"vcerare",
"pudl_models",
]
| None
) = None
Expand Down Expand Up @@ -1477,18 +1451,6 @@ def from_id(cls, x: str) -> "Resource":
"""Construct from PUDL identifier (`resource.name`)."""
return cls(**cls.dict_from_id(x))

@classmethod
def from_pyarrow_schema(
    cls, name: str, description: str, schema: pa.Schema
) -> "Resource":
    """Build an instance from a name, a description and a pyarrow schema.

    The pyarrow schema is converted with ``Schema.from_pyarrow_schema`` and
    the result is created with ``create_database_schema`` set to ``False``.
    """
    resource_kwargs = {
        "name": name,
        "description": description,
        "schema": Schema.from_pyarrow_schema(schema),
        "create_database_schema": False,
    }
    return cls(**resource_kwargs)

def get_field(self, name: str) -> Field:
"""Return field with the given name if it's part of the Resources."""
names = [field.name for field in self.schema.fields]
Expand Down Expand Up @@ -2015,12 +1977,6 @@ def from_resource_ids(
if len(names) > i:
resources += [Resource.dict_from_id(x) for x in names[i:]]

resources += [
Resource.from_pyarrow_schema(name, description, schema).model_dump(
by_alias=True
)
for name, description, schema in get_model_table_schemas()
]
if excluded_etl_groups:
resources = [
resource
Expand Down
Loading

0 comments on commit 900f926

Please sign in to comment.