From 900f926217cdaee41a9621999d86544c25ca9cb9 Mon Sep 17 00:00:00 2001 From: zschira Date: Tue, 28 Jan 2025 18:24:17 -0500 Subject: [PATCH] Add sec10k metadata directly in PUDL --- src/pudl/analysis/pudl_models.py | 18 +----- src/pudl/io_managers.py | 14 +---- src/pudl/metadata/classes.py | 48 +-------------- src/pudl/metadata/fields.py | 101 +++++++++++++++++++++++++++++++ 4 files changed, 108 insertions(+), 73 deletions(-) diff --git a/src/pudl/analysis/pudl_models.py b/src/pudl/analysis/pudl_models.py index 117fcc5d38..388d61848c 100644 --- a/src/pudl/analysis/pudl_models.py +++ b/src/pudl/analysis/pudl_models.py @@ -3,12 +3,10 @@ import os import pandas as pd -import pyarrow as pa from dagster import AssetsDefinition, asset -from deltalake import DeltaTable -def get_model_tables() -> list[str]: +def _get_model_tables() -> list[str]: """Return all tables produced by PUDL models or empty list if env variable not set.""" pudl_models_tables = [] if os.getenv("USE_PUDL_MODELS"): @@ -35,21 +33,11 @@ def pudl_models_asset_factory(table_name: str) -> AssetsDefinition: group_name="pudl_models", ) def _asset() -> pd.DataFrame: - return DeltaTable(_get_table_uri(table_name)).to_pandas() + return pd.read_parquet(_get_table_uri(table_name)) return _asset def get_pudl_models_assets() -> list[AssetsDefinition]: """Generate a collection of assets for all PUDL model tables.""" - return [pudl_models_asset_factory(table) for table in get_model_tables()] - - -def get_model_table_schemas() -> list[str, str, pa.Schema]: - """Return pyarrow schemas for all PUDL models tables.""" - dts = [DeltaTable(_get_table_uri(table_name)) for table_name in get_model_tables()] - - return [ - (dt.metadata().name, dt.metadata().description, dt.schema().to_pyarrow()) - for dt in dts - ] + return [pudl_models_asset_factory(table) for table in _get_model_tables()] diff --git a/src/pudl/io_managers.py b/src/pudl/io_managers.py index 266ff747b1..3ffca3fe26 100644 --- a/src/pudl/io_managers.py +++ b/src/pudl/io_managers.py @@ -25,7 +25,6 @@ from upath import UPath import pudl -from pudl.analysis.pudl_models import get_model_tables from pudl.metadata.classes import PUDL_PACKAGE, Package, Resource from pudl.workspace.setup import PudlPaths @@ -322,22 +321,13 @@ def load_input(self, context: InputContext) -> pd.DataFrame: class PudlParquetIOManager(IOManager): """IOManager that writes pudl tables to pyarrow parquet files.""" - def _get_table_resource(self, table_name: str) -> Resource: - """Return resource class for table.""" - if table_name not in get_model_tables(): - res = Resource.from_id(table_name) - else: - # For tables coming from PUDL modelling repo just use already parsed resource metadata - [res] = [r for r in PUDL_PACKAGE.resources if r.name == table_name] - return res - def handle_output(self, context: OutputContext, df: Any) -> None: """Writes pudl dataframe to parquet file.""" assert isinstance(df, pd.DataFrame), "Only panda dataframes are supported." table_name = get_table_name_from_context(context) parquet_path = PudlPaths().parquet_path(table_name) parquet_path.parent.mkdir(parents=True, exist_ok=True) - res = self._get_table_resource(table_name) + res = Resource.from_id(table_name) df = res.enforce_schema(df) schema = res.to_pyarrow() @@ -355,7 +345,7 @@ def load_input(self, context: InputContext) -> pd.DataFrame: """Loads pudl table from parquet file.""" table_name = get_table_name_from_context(context) parquet_path = PudlPaths().parquet_path(table_name) - res = self._get_table_resource(table_name) + res = Resource.from_id(table_name) df = pq.read_table(source=parquet_path, schema=res.to_pyarrow()).to_pandas() return res.enforce_schema(df) diff --git a/src/pudl/metadata/classes.py b/src/pudl/metadata/classes.py index 62ec0b214c..2c06243a84 100644 --- a/src/pudl/metadata/classes.py +++ b/src/pudl/metadata/classes.py @@ -36,7 +36,6 @@ ) import pudl.logging_helpers -from pudl.analysis.pudl_models import get_model_table_schemas from pudl.metadata.codes import CODE_METADATA from pudl.metadata.constants import ( CONSTRAINT_DTYPES, @@ -573,24 +572,6 @@ class Field(PudlMeta): harvest: FieldHarvest = FieldHarvest() encoder: Encoder | None = None - @classmethod - def from_pyarrow_field(cls, field: pa.Field) -> "Field": - """Construct from pyarrow field.""" - # Reverse map from frictionless -> pyarrow to pyarrow -> frictionless - type_map = { - value: key for value, key in FIELD_DTYPES_PYARROW.items() if key != "year" - } | { - pa.bool8(): "boolean", - pa.int32(): "integer", - pa.int64(): "integer", - pa.date32(): "date", - } - return cls( - name=field.name, - type=type_map[field.type], - description=field.metadata[b"description"].decode(), - ) - @field_validator("constraints") @classmethod def _check_constraints(cls, value, info: ValidationInfo): # noqa: C901 @@ -812,15 +793,6 @@ class Schema(PudlMeta): "missing_values", "primary_key", "foreign_keys", fn=_check_unique ) - @classmethod - def from_pyarrow_schema(cls, schema: pa.Schema) -> "Schema": - """Construct from a pyarrow schema.""" - return cls( - fields=[ - Field.from_pyarrow_field(schema.field(name)) for name in schema.names - ] - ) - @field_validator("fields") @classmethod def _check_field_names_unique(cls, fields: list[Field]): @@ -1314,6 +1286,7 @@ class Resource(PudlMeta): "pudl", "nrelatb", "vcerare", + "sec10k", ] | None ) = None @@ -1342,6 +1315,7 @@ class Resource(PudlMeta): "service_territories", "nrelatb", "vcerare", + "pudl_models", ] | None ) = None @@ -1477,18 +1451,6 @@ def from_id(cls, x: str) -> "Resource": """Construct from PUDL identifier (`resource.name`).""" return cls(**cls.dict_from_id(x)) - @classmethod - def from_pyarrow_schema( - cls, name: str, description: str, schema: pa.Schema - ) -> "Resource": - """Construct from a pyarrow schema.""" - return cls( - name=name, - description=description, - schema=Schema.from_pyarrow_schema(schema), - create_database_schema=False, - ) - def get_field(self, name: str) -> Field: """Return field with the given name if it's part of the Resources.""" names = [field.name for field in self.schema.fields] @@ -2015,12 +1977,6 @@ def from_resource_ids( if len(names) > i: resources += [Resource.dict_from_id(x) for x in names[i:]] - resources += [ - Resource.from_pyarrow_schema(name, description, schema).model_dump( - by_alias=True - ) - for name, description, schema in get_model_table_schemas() - ] if excluded_etl_groups: resources = [ resource diff --git a/src/pudl/metadata/fields.py b/src/pudl/metadata/fields.py index 6f1554b40d..627d748cd9 100644 --- a/src/pudl/metadata/fields.py +++ b/src/pudl/metadata/fields.py @@ -331,6 +331,14 @@ "or charging rent to host cell antennas on transmission towers." ), }, + "block": { + "type": "string", + "description": "Title of block of data.", + }, + "block_count": { + "type": "integer", + "description": "Some blocks are repeated, `block_count` defines the index of the data block.", + }, "boiler_fuel_code_1": { "type": "string", "description": "The code representing the most predominant type of energy that fuels the boiler.", @@ -659,6 +667,10 @@ "type": "boolean", "description": "Indicates whether the generator uses carbon capture technology.", }, + "central_index_key": { + "type": "string", + "description": "Identifier of the company in SEC database.", + }, "chlorine_equipment_cost": { "description": ( "Actual installed cost for the existing chlorine discharge " @@ -738,6 +750,14 @@ "description": "Average monthly coincident peak (CP) demand (for requirements purchases, and any transactions involving demand charges). Monthly CP demand is the metered demand during the hour (60-minute integration) in which the supplier's system reaches its monthly peak. In megawatts.", "unit": "MW", }, + "company_name": { + "type": "string", + "description": "Name of company submitting SEC 10k filing.", + }, + "company_name_raw": { + "type": "string", + "description": "Uncleaned name of company.", + }, "compliance_year_nox": { "type": "integer", "description": "Year boiler was or is expected to be in compliance with federal, state and/or local regulations for nitrogen oxide emissions.", @@ -998,6 +1018,14 @@ "description": "Code identifying a dataset available within PUDL.", "constraints": {"enum": list(SOURCES)}, }, + "date_filed": { + "type": "datetime", + "description": "Date filing was submitted.", + }, + "date_of_name_change": { + "type": "datetime", + "description": "Date of last name change of the company.", + }, "datetime_utc": { "type": "datetime", "description": "Date and time converted to Coordinated Universal Time (UTC).", @@ -1481,6 +1509,10 @@ ), "unit": "MWh", }, + "exhibit_21_version": { + "type": "string", + "description": "Version of exhibit 21 submitted (if applicable).", + }, "expense_type": {"type": "string", "description": "The type of expense."}, "ferc1_generator_agg_id": { "type": "integer", @@ -1611,6 +1643,18 @@ "type": "number", "description": "Total number of flue gas desulfurization unit scrubber trains.", }, + "former_conformed_name": { + "type": "string", + "description": "Former name of the company.", + }, + "filer_count": { + "type": "integer", + "description": "Index company information as some filings contain information for multiple companies.", + }, + "files_10k": { + "type": "boolean", + "description": "Indicates whether the company files a 10-K.", + }, "firing_rate_using_coal_tons_per_hour": { "type": "number", "unit": "tons_per_hour", @@ -1687,6 +1731,10 @@ "type": "integer", "description": "Four-digit year that applies to a particular forecasted value.", }, + "form_type": { + "type": "string", + "description": "Specific version of SEC 10k filed.", + }, "fraction_owned": { "type": "number", "description": "Proportion of generator ownership attributable to this utility.", @@ -2329,6 +2377,7 @@ "description": "Original reported energy interchange between adjacent balancing authorities.", "unit": "MWh", }, + "irs_number": {"type": "string", "description": "ID of the company with the IRS."}, "is_epacems_state": { "type": "boolean", "description": ( @@ -2344,6 +2393,10 @@ "type": "string", "description": "The code of the plant's ISO or RTO. NA if not reported in that year.", }, + "key": { + "type": "string", + "description": "Key within block.", + }, "kwh_per_customer": {"type": "number", "description": "kWh per customer."}, "label": { "type": "string", @@ -2456,6 +2509,14 @@ ), "unit": "MW", }, + "location": { + "type": "string", + "description": "Location of subsidiary company.", + }, + "location_of_inc": { + "type": "string", + "description": "Cleaned location of incorporation of the company.", + }, "longitude": { "type": "number", "description": "Longitude of the plant's location, in degrees.", @@ -3289,6 +3350,10 @@ "description": "Whether each generator record is for one owner or represents a total of all ownerships.", "constraints": {"enum": ["owned", "total"]}, }, + "ownership_percentage": { + "type": "string", + "description": "Percentage of subsidiary company owned by parent.", + }, "ownership_code": { "type": "string", "description": "Identifies the ownership for each generator.", @@ -3297,6 +3362,10 @@ "type": "boolean", "description": "Whether a plant part record has a duplicate record with different ownership status.", }, + "parent_company_cik": { + "type": "string", + "description": "CIK of the company's parent company.", + }, "particulate_control_id_eia": { "type": "string", "description": "Particulate matter control identification number. This ID is not a unique identifier.", @@ -3912,6 +3981,14 @@ "description": "Estimated electricity demand scaled by the total sales within a state.", "unit": "MWh", }, + "sec_company_id": { + "type": "string", + "description": "Assigned identifier for the company.", + }, + "sec10k_filename": { + "type": "string", + "description": "Name of filing as provided by SEC data portal.", + }, "secondary_transportation_mode_code": { "type": "string", "description": "Transportation mode for the second longest distance transported.", @@ -4202,6 +4279,10 @@ "constraints": {"enum": RELIABILITY_STANDARDS}, # TODO: Might want to make this column more specific to outages: ex: outage calculation standard. }, + "standard_industrial_classification": { + "type": "string", + "description": "The company's type of business.", + }, "standard_nox_rate": { "type": "number", "description": "Numeric value for the unit of measurement specified for nitrogen oxide.", @@ -4257,6 +4338,10 @@ "pattern": r"^\d{2}$", }, }, + "state_of_incorporation": { + "type": "string", + "description": "Two letter state code where company is incorporated.", + }, "steam_load_1000_lbs": { "type": "number", "description": "Total steam pressure produced by a unit during the reported hour.", @@ -4299,6 +4384,10 @@ # TODO: Disambiguate as this means different things in different tables. "description": "Physical street address.", }, + "street_address_2": { + "type": "string", + "description": "Secondary street address.", + }, "subcritical_tech": { "type": "boolean", "description": "Indicates whether the generator uses subcritical technology", @@ -4328,6 +4417,10 @@ "type": "integer", "description": "Sub-plant ID links EPA CEMS emissions units to EIA units.", }, + "subsidiary": { + "type": "string", + "description": "Name of subsidiary company.", + }, "sulfur_content_pct": { "type": "number", "description": "Sulfur content percentage by weight to the nearest 0.01 percent.", @@ -4811,6 +4904,10 @@ "type": "date", "description": "The record in the changelog is valid until this date. The record is valid from the report_date up until but not including the valid_until_date.", }, + "value": { + "type": "string", + "description": "String value of data point.", + }, "variable_peak_pricing": { "type": "boolean", "description": ( @@ -4944,6 +5041,10 @@ "type": "integer", "description": "Year the data was reported in, used for partitioning EPA CEMS.", }, + "year_quarter": { + "type": "string", + "description": "Year quarter filing applies to.", + }, "zip_code": { "type": "string", "description": "Five digit US Zip Code.",