Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DB rp empirical #4

Merged
merged 8 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions exploration/04_add_return_periods_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# ---
# jupyter:
#   jupytext:
#     cell_metadata_filter: -all
#     custom_cell_magics: kql
#     text_representation:
#       extension: .py
#       format_name: percent
#       format_version: '1.3'
#     jupytext_version: 1.11.2
#   kernelspec:
#     display_name: hdx-floodscan
#     language: python
#     name: hdx-floodscan
# ---

# %%
# %matplotlib inline
# %load_ext autoreload
# %autoreload 2

import numpy as np

# %%
from src.utils import pg
from src.utils import return_periods as rp

# %%
# Pull the last 90 days of SFED values and the historical yearly maxima
# (through 2023) at admin level 2 from the prod database.
df_current = pg.fs_last_90_days(mode="prod", admin_level=2, band="SFED")
df_yr_max = pg.fs_year_max(mode="prod", admin_level=2, band="SFED")

# %%
# Attach empirical return periods to the current data, interpolated from
# the yearly-maxima record per (iso3, pcode) stratum.
df_w_rps = rp.fs_add_rp(
    df=df_current, df_maxima=df_yr_max, by=["iso3", "pcode"]
)

# %% [markdown]
# we can have a metadata tab + table that explains why the admins below are
# not included

# %%
rp.extract_nan_strata(df_current, by=["iso3", "pcode"])

# %%
# Largest finite RP — inf values come from clean_rps capping RPs above the
# upper bound, so exclude them before taking the max.
max_rp = df_w_rps[df_w_rps["RP"] != np.inf]["RP"].max()
print(max_rp)
2 changes: 2 additions & 0 deletions requirements.txt
zackarno marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,5 @@ netCDF4==1.7.2
scipy==1.14.1
lmoments3==1.0.7
pyarrow==17.0.0
SQLAlchemy==2.0.33
psycopg2==2.9.10
67 changes: 67 additions & 0 deletions src/utils/pg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import os
from urllib.parse import quote_plus

import pandas as pd
from dotenv import load_dotenv
from sqlalchemy import create_engine, text

load_dotenv()

AZURE_DB_PW_PROD = os.getenv("AZURE_DB_PW_PROD")
AZURE_DB_PW_DEV = os.getenv("AZURE_DB_PW_DEV")


def get_engine(mode):
    """Create a SQLAlchemy engine for the chd-rasterstats Azure Postgres DB.

    Parameters
    ----------
    mode : str
        "prod" uses AZURE_DB_PW_PROD; anything else uses AZURE_DB_PW_DEV.
        The value is also part of the server hostname
        (chd-rasterstats-{mode}).

    Returns
    -------
    sqlalchemy.engine.Engine

    Raises
    ------
    ValueError
        If the password for the selected mode is not set in the environment
        (otherwise the literal string "None" would end up in the URL).
    """
    pw = AZURE_DB_PW_PROD if mode == "prod" else AZURE_DB_PW_DEV
    if not pw:
        raise ValueError(
            f"No database password configured for mode '{mode}'; "
            "check AZURE_DB_PW_PROD / AZURE_DB_PW_DEV in the environment."
        )
    # URL-encode the password so special characters (@, :, /, %) cannot
    # corrupt the connection URL.
    url = (
        f"postgresql+psycopg2://chdadmin:{quote_plus(pw)}"
        f"@chd-rasterstats-{mode}.postgres.database.azure.com/postgres"
    )  # noqa: E501 E231
    return create_engine(url)


def fs_year_max(mode, admin_level, band="SFED"):
    """Return the yearly maximum FloodScan value per (iso3, pcode).

    Parameters
    ----------
    mode : str
        Database mode passed to get_engine ("prod" or "dev").
    admin_level : int
        Admin level to filter on (adm_level column).
    band : str, optional
        FloodScan band, by default "SFED".

    Returns
    -------
    pandas.DataFrame
        Columns: iso3, pcode, year_date (year-truncated), value (max of
        mean). Only data through 2023-12-31 is included.
    """
    engine = get_engine(mode)

    # Bind admin_level/band as driver parameters (psycopg2 %(name)s style)
    # instead of f-string interpolation, avoiding SQL injection and
    # quoting bugs.
    query_yr_max = """
    SELECT iso3, pcode, DATE_TRUNC('year', valid_date) AS year_date, MAX(mean) AS value
    FROM floodscan
    WHERE adm_level = %(admin_level)s
        AND band = %(band)s
        AND valid_date <= '2023-12-31'
    GROUP BY iso3, pcode, year_date
    """
    return pd.read_sql(
        sql=query_yr_max,
        con=engine,
        params={"admin_level": admin_level, "band": band},
    )


def fs_last_90_days(mode, admin_level, band="SFED"):
    """Return daily FloodScan values for the last 90 days per admin unit.

    Parameters
    ----------
    mode : str
        Database mode passed to get_engine ("prod" or "dev").
    admin_level : int
        Admin level to filter on (adm_level column).
    band : str, optional
        FloodScan band, by default "SFED".

    Returns
    -------
    pandas.DataFrame
        Columns: iso3, pcode, valid_date, value (the daily mean).
    """
    engine = get_engine(mode)

    # Bind values as driver parameters rather than f-string interpolation
    # (consistent with fs_year_max; avoids SQL injection/quoting bugs).
    query_last_90_days = """
    SELECT iso3, pcode, valid_date, mean AS value
    FROM floodscan
    WHERE adm_level = %(admin_level)s
        AND band = %(band)s
        AND valid_date >= NOW() - INTERVAL '90 days'
    """
    return pd.read_sql(
        sql=query_last_90_days,
        con=engine,
        params={"admin_level": admin_level, "band": band},
    )


# this one is experimental - may mess around and see how it works on prod
zackarno marked this conversation as resolved.
Show resolved Hide resolved
def create_yr_max_view(mode, admin_level, band):
    """Create or replace the ``floodscan_yearly_max`` Postgres view.

    The view holds the yearly maximum of ``mean`` per (iso3, pcode) for the
    given admin level and band, limited to data through 2023-12-31.

    NOTE: admin_level and band are interpolated into the DDL because bind
    parameters are not supported inside CREATE VIEW — only call this with
    trusted values.

    Parameters
    ----------
    mode : str
        Database mode passed to get_engine ("prod" or "dev").
    admin_level : int
        Admin level the view is filtered on.
    band : str
        FloodScan band the view is filtered on.
    """
    engine = get_engine(mode)
    query = f"""
    CREATE OR REPLACE VIEW floodscan_yearly_max AS
    SELECT iso3, pcode, year_date, MAX(mean) AS value
    FROM (
        SELECT iso3, pcode, adm_level, valid_date, band, mean,
               DATE_TRUNC('year', valid_date) AS year_date
        FROM floodscan
        WHERE adm_level = {admin_level}
        AND band = '{band}'
        AND valid_date <= '2023-12-31'
    ) AS filtered_floodscan
    GROUP BY iso3, pcode, year_date
    """  # noqa E231 E202
    # SQLAlchemy 2.0 (pinned in requirements) no longer accepts plain
    # strings in Connection.execute — wrap in text(). engine.begin()
    # commits on success; engine.connect() alone would roll the DDL back.
    with engine.begin() as conn:
        conn.execute(text(query))
103 changes: 103 additions & 0 deletions src/utils/return_periods.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,111 @@
import numpy as np
import pandas as pd
from lmoments3 import distr, lmom_ratios
from scipy.interpolate import interp1d
from scipy.stats import norm, pearson3, skew

# Empirical RP Functions


def fs_add_rp(df, df_maxima, by):
    """Attach empirical return periods to current FloodScan values.

    Strata (``by`` combinations) containing NaN values in either frame are
    dropped, empirical return periods are computed from the yearly maxima,
    and each current value is mapped to an RP by per-stratum linear
    interpolation.

    Parameters
    ----------
    df : pandas.DataFrame
        Current values; must contain the ``by`` columns and "value".
    df_maxima : pandas.DataFrame
        Yearly maxima; must contain the ``by`` columns and "value".
    by : list of str
        Grouping columns, e.g. ["iso3", "pcode"].

    Returns
    -------
    pandas.DataFrame
        ``df`` restricted to NaN-free strata, with an added float "RP"
        column (rounded, values above 10 set to inf by clean_rps).
    """
    df_nans, df_maxima_nans = [
        extract_nan_strata(x, by=by) for x in [df, df_maxima]
    ]

    # .copy() so the .loc assignments below operate on independent frames
    # rather than views of the inputs (avoids SettingWithCopyWarning).
    df_filt = df[
        ~df[by].apply(tuple, axis=1).isin(df_nans.apply(tuple, axis=1))
    ].copy()
    df_maxima_filt = df_maxima[
        ~df_maxima[by]
        .apply(tuple, axis=1)
        .isin(df_maxima_nans.apply(tuple, axis=1))
    ].copy()

    # level_2 is the leftover within-group row index from the groupby/apply;
    # assumes len(by) == 2 (e.g. iso3, pcode).
    df_rps = (
        df_maxima_filt.groupby(by, group_keys=True)
        .apply(empirical_rp, include_groups=False)
        .reset_index()
        .drop(columns=["level_2"])
    )

    interp_funcs = interpolation_functions_by(
        df=df_rps, rp="RP", value="value", by=by
    )

    df_filt.loc[:, "RP"] = df_filt.apply(
        lambda row: apply_interp(row, interp_funcs, by=by),
        axis=1,
    ).astype(float)

    df_filt.loc[:, "RP"] = clean_rps(df_filt["RP"], decimals=3, upper=10)

    return df_filt


def extract_nan_strata(df, by=None):
    """Return the unique ``by`` combinations whose "value" contains NaN.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain the ``by`` columns and a "value" column.
    by : list of str, optional
        Grouping columns. Default is ["iso3", "pcode"].

    Returns
    -------
    pandas.DataFrame
        De-duplicated ``by`` combinations with at least one NaN value.
    """
    # Resolve the default inside the function to avoid a shared mutable
    # default argument.
    if by is None:
        by = ["iso3", "pcode"]
    return df.loc[df["value"].isna(), by].drop_duplicates()


def interpolation_functions_by(df, rp, value, by=["iso3", "pcode"]):
"""
Generate interpolation functions for each group in the DataFrame.

Parameters:
df (pandas.DataFrame): The input DataFrame containing the data.
rp (str): The column name in the DataFrame representing the
return period values.
value (str): The column name in the DataFrame representing the values
to interpolate.
by (list of str, optional): The list of column names to group by.
Default is ["iso3", "pcode"].

Returns:
dict: A dictionary where keys are tuples of group values and values are
interpolation functions.
"""
interp_funcs = (
df.groupby(by)
.apply(
lambda group: interp1d(
zackarno marked this conversation as resolved.
Show resolved Hide resolved
group[value],
group[rp],
bounds_error=False,
fill_value=(1, np.nan),
),
include_groups=False,
)
.to_dict()
)

return interp_funcs


def apply_interp(row, interp_dict, by=None):
    """Evaluate the row's group interpolation function on its "value".

    Parameters
    ----------
    row : pandas.Series or mapping
        Must contain the ``by`` columns and a "value" entry.
    interp_dict : dict
        Mapping of group tuples to interpolation callables, as produced
        by interpolation_functions_by.
    by : list of str, optional
        Grouping columns used to build the lookup key.
        Default is ["iso3", "pcode"].

    Returns
    -------
    float
        Interpolated return period, or NaN if the row's stratum has no
        interpolation function.
    """
    # Resolve the default inside the function to avoid a shared mutable
    # default argument.
    if by is None:
        by = ["iso3", "pcode"]
    key = tuple(row[col] for col in by)
    # Keep the try narrow: only a missing stratum should yield NaN; a
    # missing "value" column is a caller bug and should raise.
    try:
        interp_func = interp_dict[key]
    except KeyError:
        return np.nan
    return interp_func(row["value"])


def clean_rps(x, decimals=3, upper=10):
    """Round return periods and cap extreme values at infinity.

    Parameters
    ----------
    x : pandas.Series
        Return period values.
    decimals : int, optional
        Number of decimal places to round to, by default 3.
    upper : float, optional
        Values strictly greater than this (after rounding) are set to
        np.inf, by default 10.

    Returns
    -------
    pandas.Series
        Rounded/capped copy of ``x`` (the input is not modified).
    """
    # Bug fix: previously hard-coded .round(3), silently ignoring the
    # ``decimals`` parameter.
    x_round = x.round(decimals)
    x_round[x_round > upper] = np.inf
    return x_round


def empirical_rp(group):
    """Compute empirical return periods via the Weibull plotting position.

    RANK 1 is the largest value; RP = (n + 1) / rank, so the maximum of n
    observations gets RP n + 1 and the minimum gets RP (n + 1) / n.

    Parameters
    ----------
    group : pandas.DataFrame
        Must contain a NaN-free "value" column.

    Returns
    -------
    pandas.DataFrame
        Copy of ``group`` with added "RANK" and "RP" columns; the input
        frame is not modified.

    Raises
    ------
    ValueError
        If "value" contains NaN (ranks would be undefined).
    """
    if group["value"].isna().any():
        raise ValueError("The value column contains NaN values")

    # Work on a copy so we don't mutate the caller's (or groupby's) frame.
    group = group.copy()
    group["RANK"] = group["value"].rank(ascending=False)
    group["RP"] = (len(group) + 1) / group["RANK"]
    return group


# LP3 Functions


def lp3_params(x, est_method="lmoments"):
x = np.asarray(x)
Expand Down