diff --git a/exploration/04_add_return_periods_example.py b/exploration/04_add_return_periods_example.py
new file mode 100644
index 0000000..9426366
--- /dev/null
+++ b/exploration/04_add_return_periods_example.py
@@ -0,0 +1,48 @@
+# ---
+# jupyter:
+#   jupytext:
+#     cell_metadata_filter: -all
+#     custom_cell_magics: kql
+#     text_representation:
+#       extension: .py
+#       format_name: percent
+#       format_version: '1.3'
+#       jupytext_version: 1.11.2
+#   kernelspec:
+#     display_name: hdx-floodscan
+#     language: python
+#     name: hdx-floodscan
+# ---
+
+# %%
+# %matplotlib inline
+# %load_ext autoreload
+# %autoreload 2
+
+import numpy as np
+
+# %%
+from src.utils import pg
+from src.utils import return_periods as rp
+
+# %%
+df_current = pg.fs_last_90_days(mode="prod", admin_level=2, band="SFED")
+df_yr_max = pg.fs_year_max(mode="prod", admin_level=2, band="SFED")
+
+# %%
+df_w_rps = rp.fs_add_rp(
+    df=df_current, df_maxima=df_yr_max, by=["iso3", "pcode"]
+)
+
+# %% [markdown]
+# we can have a metadata tab + table that explains why the admin below are not
+# included
+
+# %%
+rp.extract_nan_strata(df_current, by=["iso3", "pcode"])
+
+# %%
+
+# max finite RP: exclude the np.inf sentinel that clean_rps uses for capped RPs
+max_rp = df_w_rps[df_w_rps["RP"] != np.inf]["RP"].max()
+print(max_rp)
diff --git a/requirements.txt b/requirements.txt
index eb0c5b7..f9fde1b 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -13,3 +13,5 @@ netCDF4==1.7.2
 scipy==1.14.1
 lmoments3==1.0.7
 pyarrow==17.0.0
+SQLAlchemy==2.0.33
+psycopg2==2.9.10
diff --git a/src/utils/pg.py b/src/utils/pg.py
new file mode 100644
index 0000000..2601860
--- /dev/null
+++ b/src/utils/pg.py
@@ -0,0 +1,75 @@
+import os
+
+import pandas as pd
+from dotenv import load_dotenv
+from sqlalchemy import create_engine, text
+
+load_dotenv()
+
+AZURE_DB_PW_PROD = os.getenv("AZURE_DB_PW_PROD")
+AZURE_DB_PW_DEV = os.getenv("AZURE_DB_PW_DEV")
+
+
+def get_engine(mode):
+    """Return a SQLAlchemy engine for the prod or dev Azure Postgres DB."""
+    if mode == "prod":
+        pw = AZURE_DB_PW_PROD
+    else:
+        pw = AZURE_DB_PW_DEV
+
+    url = f"postgresql+psycopg2://chdadmin:{pw}@chd-rasterstats-{mode}.postgres.database.azure.com/postgres"  # noqa: E501 E231
+    return create_engine(url)
+
+
+def fs_year_max(mode, admin_level, band="SFED"):
+    """Yearly max of FloodScan `mean` per iso3/pcode, through 2023-12-31."""
+    engine = get_engine(mode)
+
+    # NOTE(review): admin_level/band are interpolated into the SQL string;
+    # they are internal constants, not user input. Parameterize if that changes.
+    query_yr_max = f"""
+    SELECT iso3, pcode, DATE_TRUNC('year', valid_date) AS year_date, MAX(mean) AS value
+    FROM floodscan
+    WHERE adm_level = {admin_level}
+    AND band = '{band}'
+    AND valid_date <= '2023-12-31'
+    GROUP BY iso3, pcode, year_date
+    """
+    return pd.read_sql(sql=query_yr_max, con=engine)
+
+
+def fs_last_90_days(mode, admin_level, band="SFED"):
+    """Daily FloodScan `mean` per iso3/pcode for the trailing 90 days."""
+    engine = get_engine(mode)
+
+    query_last_90_days = f"""
+    SELECT iso3, pcode, valid_date, mean AS value
+    FROM floodscan
+    WHERE adm_level = {admin_level}
+    AND band = '{band}'
+    AND valid_date >= NOW() - INTERVAL '90 days'
+    """
+    return pd.read_sql(sql=query_last_90_days, con=engine)
+
+
+# this one is experimental - may mess around and see how it works on prod
+def create_yr_max_view(mode, admin_level, band):
+    engine = get_engine(mode)
+    with engine.connect() as conn:
+        query = f"""
+        CREATE OR REPLACE VIEW floodscan_yearly_max AS
+        SELECT iso3, pcode, year_date, MAX(mean) AS value
+        FROM (
+            SELECT iso3, pcode, adm_level, valid_date, band, mean,
+                DATE_TRUNC('year', valid_date) AS year_date
+            FROM floodscan
+            WHERE adm_level = {admin_level}
+            AND band = '{band}'
+            AND valid_date <= '2023-12-31'
+        ) AS filtered_floodscan
+        GROUP BY iso3, pcode, year_date
+        """  # noqa E231 E202
+        # SQLAlchemy 2.x: raw strings are not executable; wrap in text(),
+        # and commit explicitly (2.x connections do not autocommit DDL).
+        conn.execute(text(query))
+        conn.commit()
diff --git a/src/utils/return_periods.py b/src/utils/return_periods.py
index 9d65bdb..8f10c8b 100644
--- a/src/utils/return_periods.py
+++ b/src/utils/return_periods.py
@@ -1,8 +1,113 @@
 import numpy as np
 import pandas as pd
 from lmoments3 import distr, lmom_ratios
+from scipy.interpolate import interp1d
 from scipy.stats import norm, pearson3, skew
 
+# Empirical RP Functions
+
+
+def fs_add_rp(df, df_maxima, by):
+    """Interpolate empirical return periods onto df from df_maxima."""
+    df_nans, df_maxima_nans = [
+        extract_nan_strata(x, by=by) for x in [df, df_maxima]
+    ]
+
+    df_filt = df[
+        ~df[by].apply(tuple, axis=1).isin(df_nans.apply(tuple, axis=1))
+    ].copy()
+    df_maxima_filt = df_maxima[
+        ~df_maxima[by]
+        .apply(tuple, axis=1)
+        .isin(df_maxima_nans.apply(tuple, axis=1))
+    ].copy()
+
+    df_rps = (
+        df_maxima_filt.groupby(by, group_keys=True)
+        .apply(empirical_rp, include_groups=False)
+        .reset_index()
+        .drop(columns=["level_2"])  # NOTE: assumes len(by) == 2
+    )
+
+    interp_funcs = interpolation_functions_by(
+        df=df_rps, rp="RP", value="value", by=by
+    )
+
+    df_filt.loc[:, "RP"] = df_filt.apply(
+        lambda row: apply_interp(row, interp_funcs, by=by),
+        axis=1,
+    ).astype(float)
+
+    df_filt.loc[:, "RP"] = clean_rps(df_filt["RP"], decimals=3, upper=10)
+
+    return df_filt
+
+
+def extract_nan_strata(df, by=["iso3", "pcode"]):
+    return df[df["value"].isna()][by].drop_duplicates()
+
+
+def interpolation_functions_by(df, rp, value, by=["iso3", "pcode"]):
+    """
+    Generate interpolation functions for each group in the DataFrame.
+
+    Parameters:
+        df (pandas.DataFrame): The input DataFrame containing the data.
+        rp (str): The column name in the DataFrame representing the
+            return period values.
+        value (str): The column name in the DataFrame representing the values
+            to interpolate.
+        by (list of str, optional): The list of column names to group by.
+            Default is ["iso3", "pcode"].
+
+    Returns:
+        dict: A dictionary where keys are tuples of group values and values are
+            interpolation functions.
+    """
+    interp_funcs = (
+        df.groupby(by)
+        .apply(
+            lambda group: interp1d(
+                group[value],
+                group[rp],
+                bounds_error=False,
+                fill_value=(1, np.nan),
+            ),
+            include_groups=False,
+        )
+        .to_dict()
+    )
+
+    return interp_funcs
+
+
+def apply_interp(row, interp_dict, by=["iso3", "pcode"]):
+    try:
+        key = tuple(row[col] for col in by)
+        interp_func = interp_dict[key]
+        return interp_func(row["value"])
+    except KeyError:
+        return np.nan
+
+
+def clean_rps(x, decimals=3, upper=10):
+    """Round RPs to `decimals` places and cap values above `upper` at inf."""
+    x_round = x.round(decimals)  # fix: was hardcoded round(3), ignoring param
+    x_round[x_round > upper] = np.inf
+    return x_round
+
+
+def empirical_rp(group):
+    if group["value"].isna().any():
+        raise ValueError("The value column contains NaN values")
+
+    group["RANK"] = group["value"].rank(ascending=False)
+    group["RP"] = (len(group) + 1) / group["RANK"]
+    return group
+
+
+# LP3 Functions
+
 
 def lp3_params(x, est_method="lmoments"):
     x = np.asarray(x)