
Rename some straggler assets #3294

Merged (11 commits, Jan 31, 2024)
6 changes: 3 additions & 3 deletions devtools/debug-ferc1-etl.ipynb
@@ -113,11 +113,11 @@
"source": [
"from dagster import build_op_context\n",
"\n",
"from pudl.extract.ferc1 import raw_xbrl_metadata_json\n",
"from pudl.transform.ferc1 import clean_xbrl_metadata_json\n",
"from pudl.extract.ferc1 import raw_ferc1_xbrl__metadata_json\n",
"from pudl.transform.ferc1 import _core_ferc1_xbrl__metadata_json\n",
"\n",
"context = build_op_context()\n",
"xbrl_metadata_json_dict = clean_xbrl_metadata_json(raw_xbrl_metadata_json(context))"
"xbrl_metadata_json_dict = _core_ferc1_xbrl__metadata_json(raw_ferc1_xbrl__metadata_json(context))"
]
},
{
2 changes: 1 addition & 1 deletion docs/dev/naming_conventions.rst
@@ -98,7 +98,7 @@ Naming convention: ``core_{source}__{asset_type}_{asset_name}``
typically contain measurements of processes like net generation or CO2 emissions.
Examples:

* ``core_ferc714__hourly_demand_pa``,
* ``core_ferc714__hourly_demand_by_planning_area``,
* ``core_ferc1__yearly_plant_in_service``.

Output layer
5 changes: 3 additions & 2 deletions docs/release_notes.rst
@@ -260,7 +260,8 @@ Data Coverage
The newly accessible tables include:

* :ref:`core_ferc714__respondent_id` (linking FERC-714 respondents to EIA utilities)
* :ref:`core_ferc714__hourly_demand_pa` (hourly electricity demand by planning area)
* :ref:`core_ferc714__hourly_demand_by_planning_area` (hourly electricity demand by
planning area)
* :ref:`out_ferc714__respondents_with_fips` (annual respondents with county FIPS IDs)
* :ref:`out_ferc714__summarized_demand` (annual demand for FERC-714 respondents)

@@ -366,7 +367,7 @@ Analysis
(:ref:`out_eia861__compiled_geometry_balancing_authorities` and
:ref:`out_eia861__compiled_geometry_utilities`), and the estimated total hourly
electricity demand for each US state in
:ref:`out_ferc714__hourly_predicted_state_demand`. See :issue:`1973`
:ref:`out_ferc714__hourly_estimated_state_demand`. See :issue:`1973`
and :pr:`2550`.

Deprecations
New file: Alembic migration (62 additions)
@@ -0,0 +1,62 @@
"""Rename a couple of ferc714 tables and add out_eia__yearly_generators_by_ownership to database

Revision ID: d4bef486cb7a
Revises: 997d2c7bc7de
Create Date: 2024-01-24 10:51:57.643199

"""
import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import sqlite

# revision identifiers, used by Alembic.
revision = 'd4bef486cb7a'
down_revision = '997d2c7bc7de'
branch_labels = None
depends_on = None


def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('out_ferc714__hourly_estimated_state_demand',
sa.Column('state_id_fips', sa.Text(), nullable=False, comment='Two digit state FIPS code.'),
sa.Column('utc_datetime', sqlite.DATETIME(), nullable=False),
sa.Column('demand_mwh', sa.Float(), nullable=True),
sa.Column('scaled_demand_mwh', sa.Float(), nullable=True, comment='Estimated electricity demand scaled by the total sales within a state.'),
sa.PrimaryKeyConstraint('state_id_fips', 'utc_datetime', name=op.f('pk_out_ferc714__hourly_estimated_state_demand'))
)
op.create_table('core_ferc714__hourly_demand_by_planning_area',
sa.Column('respondent_id_ferc714', sa.Integer(), nullable=False, comment='FERC Form 714 respondent ID. Note that this ID does not correspond to FERC respondent IDs from other forms.'),
sa.Column('report_date', sa.Date(), nullable=False, comment='Date reported.'),
sa.Column('utc_datetime', sqlite.DATETIME(), nullable=False),
sa.Column('timezone', sa.Enum('America/New_York', 'America/Chicago', 'America/Denver', 'America/Los_Angeles', 'America/Anchorage', 'Pacific/Honolulu'), nullable=True, comment='IANA timezone name'),
sa.Column('demand_mwh', sa.Float(), nullable=True),
sa.ForeignKeyConstraint(['respondent_id_ferc714'], ['core_ferc714__respondent_id.respondent_id_ferc714'], name=op.f('fk_core_ferc714__hourly_demand_by_planning_area_respondent_id_ferc714_core_ferc714__respondent_id')),
sa.PrimaryKeyConstraint('respondent_id_ferc714', 'utc_datetime', name=op.f('pk_core_ferc714__hourly_demand_by_planning_area'))
)
op.drop_table('core_ferc714__hourly_demand_pa')
op.drop_table('out_ferc714__hourly_predicted_state_demand')
# ### end Alembic commands ###


def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('out_ferc714__hourly_predicted_state_demand',
sa.Column('state_id_fips', sa.TEXT(), nullable=False),
sa.Column('utc_datetime', sa.DATETIME(), nullable=False),
sa.Column('demand_mwh', sa.FLOAT(), nullable=True),
sa.Column('scaled_demand_mwh', sa.FLOAT(), nullable=True),
sa.PrimaryKeyConstraint('state_id_fips', 'utc_datetime', name='pk_out_ferc714__hourly_predicted_state_demand')
)
op.create_table('core_ferc714__hourly_demand_pa',
sa.Column('respondent_id_ferc714', sa.INTEGER(), nullable=False),
sa.Column('report_date', sa.DATE(), nullable=False),
sa.Column('utc_datetime', sa.DATETIME(), nullable=False),
sa.Column('timezone', sa.VARCHAR(length=19), nullable=True),
sa.Column('demand_mwh', sa.FLOAT(), nullable=True),
sa.ForeignKeyConstraint(['respondent_id_ferc714'], ['core_ferc714__respondent_id.respondent_id_ferc714'], name='fk_core_ferc714__hourly_demand_pa_respondent_id_ferc714_core_ferc714__respondent_id'),
sa.PrimaryKeyConstraint('respondent_id_ferc714', 'utc_datetime', name='pk_core_ferc714__hourly_demand_pa')
)
op.drop_table('core_ferc714__hourly_demand_by_planning_area')
op.drop_table('out_ferc714__hourly_estimated_state_demand')
# ### end Alembic commands ###
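
For reviewers trying this locally: the usual way to apply the migration is `alembic upgrade head` from the repo root. A minimal programmatic sketch, assuming an `alembic.ini` at the project root:

```python
# Sketch only: apply or roll back this migration via Alembic's command API.
# The config path is an assumption about the project layout.
from alembic import command
from alembic.config import Config

cfg = Config("alembic.ini")
command.upgrade(cfg, "d4bef486cb7a")  # runs upgrade() above
# command.downgrade(cfg, "997d2c7bc7de")  # runs downgrade() back to the prior revision
```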
1 change: 1 addition & 0 deletions src/pudl/analysis/plant_parts_eia.py
@@ -367,6 +367,7 @@


@asset(
io_manager_key="pudl_sqlite_io_manager",
compute_kind="Python",
)
def out_eia__yearly_generators_by_ownership(
30 changes: 16 additions & 14 deletions src/pudl/analysis/state_demand.py
@@ -264,12 +264,12 @@ def load_ventyx_hourly_state_demand(path: str) -> pd.DataFrame:
},
)
def load_hourly_demand_matrix_ferc714(
core_ferc714__hourly_demand_pa: pd.DataFrame,
core_ferc714__hourly_demand_by_planning_area: pd.DataFrame,
) -> tuple[pd.DataFrame, pd.DataFrame]:
"""Read and format FERC 714 hourly demand into matrix form.

Args:
core_ferc714__hourly_demand_pa: FERC 714 hourly demand time series by planning area.
core_ferc714__hourly_demand_by_planning_area: FERC 714 hourly demand time series by planning area.

Returns:
Hourly demand as a matrix with a `datetime` row index
@@ -280,22 +280,24 @@
of each `respondent_id_ferc714` and reporting `year` (int).
"""
# Convert UTC to local time (ignoring daylight savings)
core_ferc714__hourly_demand_pa["utc_offset"] = core_ferc714__hourly_demand_pa[
"timezone"
].map(STANDARD_UTC_OFFSETS)
core_ferc714__hourly_demand_pa["datetime"] = utc_to_local(
core_ferc714__hourly_demand_pa["utc_datetime"],
core_ferc714__hourly_demand_pa["utc_offset"],
core_ferc714__hourly_demand_by_planning_area[
"utc_offset"
] = core_ferc714__hourly_demand_by_planning_area["timezone"].map(
STANDARD_UTC_OFFSETS
)
core_ferc714__hourly_demand_by_planning_area["datetime"] = utc_to_local(
core_ferc714__hourly_demand_by_planning_area["utc_datetime"],
core_ferc714__hourly_demand_by_planning_area["utc_offset"],
)
# Pivot to demand matrix: timestamps x respondents
matrix = core_ferc714__hourly_demand_pa.pivot(
matrix = core_ferc714__hourly_demand_by_planning_area.pivot(
index="datetime", columns="respondent_id_ferc714", values="demand_mwh"
)
# List timezone by year for each respondent
core_ferc714__hourly_demand_pa["year"] = core_ferc714__hourly_demand_pa[
"report_date"
].dt.year
utc_offset = core_ferc714__hourly_demand_pa.groupby(
core_ferc714__hourly_demand_by_planning_area[
"year"
] = core_ferc714__hourly_demand_by_planning_area["report_date"].dt.year
utc_offset = core_ferc714__hourly_demand_by_planning_area.groupby(
["respondent_id_ferc714", "year"], as_index=False
)["utc_offset"].first()
return matrix, utc_offset
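
As a sanity check on the rename, here is a toy version of the pivot that load_hourly_demand_matrix_ferc714 performs (respondent IDs and demand values below are hypothetical):

```python
# Toy illustration of the pivot above: long-format hourly demand becomes a
# timestamps x respondents matrix.
import pandas as pd

demand = pd.DataFrame(
    {
        "datetime": pd.to_datetime(["2020-01-01 00:00", "2020-01-01 01:00"] * 2),
        "respondent_id_ferc714": [101, 101, 202, 202],
        "demand_mwh": [50.0, 55.0, 120.0, 118.0],
    }
)
matrix = demand.pivot(
    index="datetime", columns="respondent_id_ferc714", values="demand_mwh"
)
print(matrix)
# respondent_id_ferc714    101    202
# datetime
# 2020-01-01 00:00:00     50.0  120.0
# 2020-01-01 01:00:00     55.0  118.0
```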
@@ -579,7 +581,7 @@ def total_state_sales_eia861(
),
},
)
def out_ferc714__hourly_predicted_state_demand(
def out_ferc714__hourly_estimated_state_demand(
context,
_out_ferc714__hourly_imputed_demand: pd.DataFrame,
core_censusdp1__entity_county: pd.DataFrame,
2 changes: 1 addition & 1 deletion src/pudl/convert/censusdp1tract_to_sqlite.py
@@ -34,7 +34,7 @@
},
required_resource_keys={"datastore"},
)
def censusdp1tract_to_sqlite(context):
def raw_census__dp1(context):
"""Use GDAL's ogr2ogr utility to convert the Census DP1 GeoDB to an SQLite DB.

The Census DP1 GeoDB is read from the datastore, where it is stored as a
2 changes: 1 addition & 1 deletion src/pudl/etl/epacems_assets.py
@@ -163,7 +163,7 @@ def core_epacems__hourly_emissions(
),
}
)
def emissions_unit_ids_epacems(
def _core_epacems__emissions_unit_ids(
core_epacems__hourly_emissions: dd.DataFrame,
) -> pd.DataFrame:
"""Make unique annual plant_id_eia and emissions_unit_id_epa.
8 changes: 4 additions & 4 deletions src/pudl/etl/glue_assets.py
@@ -266,7 +266,7 @@ def correct_epa_eia_plant_id_mapping(df: pd.DataFrame) -> pd.DataFrame:
def core_epa__assn_eia_epacamd_subplant_ids(
_core_epa__assn_eia_epacamd_unique: pd.DataFrame,
core_eia860__scd_generators: pd.DataFrame,
emissions_unit_ids_epacems: pd.DataFrame,
_core_epacems__emissions_unit_ids: pd.DataFrame,
core_eia860__assn_boiler_generator: pd.DataFrame,
) -> pd.DataFrame:
"""Groups units and generators into unique subplant groups.
@@ -301,7 +301,7 @@ def core_epa__assn_eia_epacamd_subplant_ids(
augement_crosswalk_with_generators_eia860(
_core_epa__assn_eia_epacamd_unique, core_eia860__scd_generators
)
.pipe(augement_crosswalk_with_epacamd_ids, emissions_unit_ids_epacems)
.pipe(augement_crosswalk_with_epacamd_ids, _core_epacems__emissions_unit_ids)
.pipe(augement_crosswalk_with_bga_eia860, core_eia860__assn_boiler_generator)
)
# use graph analysis to identify subplants
@@ -374,13 +374,13 @@ def augement_crosswalk_with_generators_eia860(


def augement_crosswalk_with_epacamd_ids(
crosswalk_clean: pd.DataFrame, emissions_unit_ids_epacems: pd.DataFrame
crosswalk_clean: pd.DataFrame, _core_epacems__emissions_unit_ids: pd.DataFrame
) -> pd.DataFrame:
"""Merge all EPA CAMD IDs into the crosswalk."""
return crosswalk_clean.assign(
emissions_unit_id_epa=lambda x: x.emissions_unit_id_epa.fillna(x.generator_id)
).merge(
emissions_unit_ids_epacems[
_core_epacems__emissions_unit_ids[
["plant_id_eia", "emissions_unit_id_epa"]
].drop_duplicates(),
how="outer",
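The renamed asset flows into the fillna-then-outer-merge above; a toy sketch of that pattern (the data and the merge keys are assumptions, since the merge call is truncated here):

```python
# Toy sketch: backfill missing EPA unit IDs from generator IDs, then
# outer-merge so unmapped CAMD units survive in the crosswalk.
import pandas as pd

crosswalk = pd.DataFrame(
    {
        "plant_id_eia": [1, 1, 2],
        "generator_id": ["G1", "G2", "G3"],
        "emissions_unit_id_epa": ["U1", None, None],
    }
)
unit_ids = pd.DataFrame(
    {"plant_id_eia": [1, 2, 2], "emissions_unit_id_epa": ["U1", "G3", "U9"]}
)
merged = crosswalk.assign(
    emissions_unit_id_epa=lambda x: x.emissions_unit_id_epa.fillna(x.generator_id)
).merge(
    unit_ids.drop_duplicates(),
    how="outer",
    on=["plant_id_eia", "emissions_unit_id_epa"],  # assumed merge keys
)
print(merged)  # the unmapped (2, "U9") unit appears with generator_id NaN
```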
17 changes: 9 additions & 8 deletions src/pudl/extract/eia860.py
@@ -85,15 +85,15 @@ def get_dtypes(page, **partition):
)


eia860_raw_dfs = excel.raw_df_factory(Extractor, name="eia860")
raw_eia860__all_dfs = excel.raw_df_factory(Extractor, name="eia860")


# TODO (bendnorman): Figure out type hint for context keyword and multi_asset return
@multi_asset(
outs={table_name: AssetOut() for table_name in sorted(raw_table_names)},
required_resource_keys={"datastore", "dataset_settings"},
)
def extract_eia860(context, eia860_raw_dfs):
def extract_eia860(context, raw_eia860__all_dfs):
"""Extract raw EIA data from excel sheets into dataframes.

Args:
@@ -111,17 +111,18 @@ def extract_eia860(context, eia860_raw_dfs):
eia860m_raw_dfs = pudl.extract.eia860m.Extractor(ds).extract(
year_month=eia860m_date
)
eia860_raw_dfs = pudl.extract.eia860m.append_eia860m(
eia860_raw_dfs=eia860_raw_dfs, eia860m_raw_dfs=eia860m_raw_dfs
raw_eia860__all_dfs = pudl.extract.eia860m.append_eia860m(
eia860_raw_dfs=raw_eia860__all_dfs, eia860m_raw_dfs=eia860m_raw_dfs
)

# create descriptive table_names
eia860_raw_dfs = {
"raw_eia860__" + table_name: df for table_name, df in eia860_raw_dfs.items()
raw_eia860__all_dfs = {
"raw_eia860__" + table_name: df
for table_name, df in raw_eia860__all_dfs.items()
}
eia860_raw_dfs = dict(sorted(eia860_raw_dfs.items()))
raw_eia860__all_dfs = dict(sorted(raw_eia860__all_dfs.items()))

return (
Output(output_name=table_name, value=df)
for table_name, df in eia860_raw_dfs.items()
for table_name, df in raw_eia860__all_dfs.items()
)
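
All four extractors touched by this PR follow the same shape; a stripped-down sketch (hypothetical dataset and table names) of how the prefixed dict keys must line up with the declared multi_asset outs:

```python
# Stripped-down sketch of the extractor pattern: each dict key must match a
# declared output name, so the "raw_<dataset>__" prefix is applied before
# yielding the per-table Outputs.
import pandas as pd
from dagster import AssetOut, Output, multi_asset

table_names = ("raw_demo__generators", "raw_demo__plants")


@multi_asset(outs={name: AssetOut() for name in sorted(table_names)})
def extract_demo():
    raw_dfs = {"generators": pd.DataFrame(), "plants": pd.DataFrame()}
    raw_dfs = {f"raw_demo__{name}": df for name, df in raw_dfs.items()}
    return (
        Output(output_name=name, value=df) for name, df in sorted(raw_dfs.items())
    )
```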
12 changes: 6 additions & 6 deletions src/pudl/extract/eia861.py
@@ -69,7 +69,7 @@ def get_dtypes(page, **partition):
}


eia861_raw_dfs = excel.raw_df_factory(Extractor, name="eia861")
raw_eia861__all_dfs = excel.raw_df_factory(Extractor, name="eia861")


@multi_asset(
@@ -102,7 +102,7 @@ def get_dtypes(page, **partition):
},
required_resource_keys={"datastore", "dataset_settings"},
)
def extract_eia861(context, eia861_raw_dfs):
def extract_eia861(context, raw_eia861__all_dfs):
"""Extract raw EIA-861 data from Excel sheets into dataframes.

Args:
@@ -111,13 +111,13 @@ def extract_eia861(context, eia861_raw_dfs):
Returns:
A tuple of extracted EIA-861 dataframes.
"""
eia861_raw_dfs = {
raw_eia861__all_dfs = {
"raw_eia861__" + table_name.replace("_eia861", ""): df
for table_name, df in eia861_raw_dfs.items()
for table_name, df in raw_eia861__all_dfs.items()
}
eia861_raw_dfs = dict(sorted(eia861_raw_dfs.items()))
raw_eia861__all_dfs = dict(sorted(raw_eia861__all_dfs.items()))

return (
Output(output_name=table_name, value=df)
for table_name, df in eia861_raw_dfs.items()
for table_name, df in raw_eia861__all_dfs.items()
)
13 changes: 7 additions & 6 deletions src/pudl/extract/eia923.py
@@ -104,15 +104,15 @@ def get_dtypes(page, **partition):
)


eia923_raw_dfs = excel.raw_df_factory(Extractor, name="eia923")
raw_eia923__all_dfs = excel.raw_df_factory(Extractor, name="eia923")


# TODO (bendnorman): Figure out type hint for context keyword and multi_asset return
@multi_asset(
outs={table_name: AssetOut() for table_name in sorted(eia_raw_table_names)},
required_resource_keys={"datastore", "dataset_settings"},
)
def extract_eia923(context, eia923_raw_dfs):
def extract_eia923(context, raw_eia923__all_dfs):
"""Extract raw EIA-923 data from excel sheets into dataframes.

Args:
@@ -122,13 +122,14 @@ def extract_eia923(context, eia923_raw_dfs):
A tuple of extracted EIA dataframes.
"""
# create descriptive table_names
eia923_raw_dfs = {
"raw_eia923__" + table_name: df for table_name, df in eia923_raw_dfs.items()
raw_eia923__all_dfs = {
"raw_eia923__" + table_name: df
for table_name, df in raw_eia923__all_dfs.items()
}

eia923_raw_dfs = dict(sorted(eia923_raw_dfs.items()))
raw_eia923__all_dfs = dict(sorted(raw_eia923__all_dfs.items()))

return (
Output(output_name=table_name, value=df)
for table_name, df in eia923_raw_dfs.items()
for table_name, df in raw_eia923__all_dfs.items()
)
2 changes: 1 addition & 1 deletion src/pudl/extract/excel.py
@@ -531,4 +531,4 @@ def raw_dfs() -> dict[str, pd.DataFrame]:
# page in the spreadsheet based dataset using DynamicOut.collect()
return concat_pages(dfs.collect())

return graph_asset(name=f"{name}_raw_dfs")(raw_dfs)
return graph_asset(name=f"raw_{name}__all_dfs")(raw_dfs)
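
The new f-string matters because Dagster resolves dependencies by name: a downstream function's parameter must exactly match the upstream asset's name, which is why every `{name}_raw_dfs` consumer above was renamed to `raw_{name}__all_dfs`. A minimal sketch of that contract, with a hypothetical dataset:

```python
# Minimal sketch: Dagster wires the upstream asset to the downstream one by
# matching the parameter name to the upstream asset's name, so renaming the
# factory's graph asset forces the matching parameter rename.
from dagster import asset


@asset
def raw_demo__all_dfs() -> dict:
    return {"plants": "..."}


@asset
def extract_demo(raw_demo__all_dfs: dict) -> dict:
    return {f"raw_demo__{k}": v for k, v in raw_demo__all_dfs.items()}
```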
4 changes: 3 additions & 1 deletion src/pudl/extract/ferc1.py
@@ -394,7 +394,9 @@ def create_raw_ferc1_assets() -> list[SourceAsset]:


@asset
def raw_xbrl_metadata_json(context) -> dict[str, dict[str, list[dict[str, Any]]]]:
def raw_ferc1_xbrl__metadata_json(
context,
) -> dict[str, dict[str, list[dict[str, Any]]]]:
"""Extract the FERC 1 XBRL Taxonomy metadata we've stored as JSON.

Returns:
13 changes: 7 additions & 6 deletions src/pudl/extract/phmsagas.py
@@ -71,15 +71,15 @@ def process_renamed(self, newdata: pd.DataFrame, page: str, **partition):
"raw_phmsagas__yearly_miles_of_pipe_by_decade_installed",
)

phmsagas_raw_dfs = excel.raw_df_factory(Extractor, name="phmsagas")
raw_phmsagas__all_dfs = excel.raw_df_factory(Extractor, name="phmsagas")


# TODO (bendnorman): Figure out type hint for context keyword and multi_asset return
@multi_asset(
outs={table_name: AssetOut() for table_name in sorted(raw_table_names)},
required_resource_keys={"datastore", "dataset_settings"},
)
def extract_phmsagas(context, phmsagas_raw_dfs):
def extract_phmsagas(context, raw_phmsagas__all_dfs):
"""Extract raw PHMSA gas data from excel sheets into dataframes.

Args:
@@ -89,12 +89,13 @@ def extract_phmsagas(context, phmsagas_raw_dfs):
A tuple of extracted PHMSA gas dataframes.
"""
# create descriptive table_names
phmsagas_raw_dfs = {
"raw_phmsagas__" + table_name: df for table_name, df in phmsagas_raw_dfs.items()
raw_phmsagas__all_dfs = {
"raw_phmsagas__" + table_name: df
for table_name, df in raw_phmsagas__all_dfs.items()
}
phmsagas_raw_dfs = dict(sorted(phmsagas_raw_dfs.items()))
raw_phmsagas__all_dfs = dict(sorted(raw_phmsagas__all_dfs.items()))

return (
Output(output_name=table_name, value=df)
for table_name, df in phmsagas_raw_dfs.items()
for table_name, df in raw_phmsagas__all_dfs.items()
)