diff --git a/numaprom/__init__.py b/numaprom/__init__.py index ed05ff3..c70820c 100644 --- a/numaprom/__init__.py +++ b/numaprom/__init__.py @@ -1,7 +1,7 @@ import logging import os -from numaprom._config import UnifiedConf, MetricConf, ServiceConf, NumapromConf +from numaprom._config import UnifiedConf, MetricConf, AppConf, NumapromConf def get_logger(name): @@ -25,4 +25,4 @@ def get_logger(name): return logger -__all__ = ["UnifiedConf", "MetricConf", "ServiceConf", "NumapromConf", "get_logger"] +__all__ = ["UnifiedConf", "MetricConf", "AppConf", "NumapromConf", "get_logger"] diff --git a/numaprom/_config.py b/numaprom/_config.py index 81e19a6..f030b72 100644 --- a/numaprom/_config.py +++ b/numaprom/_config.py @@ -26,8 +26,8 @@ class MetricConf: @dataclass -class ServiceConf: - service: str = "default" +class AppConf: + app: str = "default" namespace: str = "default" metric_configs: List[MetricConf] = field(default_factory=lambda: [MetricConf()]) unified_configs: List[UnifiedConf] = field(default_factory=list) @@ -35,4 +35,4 @@ class ServiceConf: @dataclass class NumapromConf: - configs: List[ServiceConf] + configs: List[AppConf] diff --git a/numaprom/_constants.py b/numaprom/_constants.py index 5f2aeca..3f8d6d1 100644 --- a/numaprom/_constants.py +++ b/numaprom/_constants.py @@ -3,6 +3,7 @@ NUMAPROM_DIR = os.path.dirname(__file__) ROOT_DIR = os.path.split(NUMAPROM_DIR)[0] TESTS_DIR = os.path.join(ROOT_DIR, "tests") +TESTS_RESOURCES = os.path.join(TESTS_DIR, "resources") DATA_DIR = os.path.join(NUMAPROM_DIR, "data") CONFIG_DIR = os.path.join(NUMAPROM_DIR, "configs") DEFAULT_CONFIG_DIR = os.path.join(NUMAPROM_DIR, "default-configs") @@ -17,3 +18,5 @@ INFERENCE_VTX_KEY = "inference" THRESHOLD_VTX_KEY = "threshold" POSTPROC_VTX_KEY = "postproc" + +CONFIG_PATHS = ["./numaprom/configs", "./numaprom/default-configs"] diff --git a/numaprom/tools.py b/numaprom/tools.py index 81fadc9..5f5f4c8 100644 --- a/numaprom/tools.py +++ b/numaprom/tools.py @@ -14,18 +14,15 @@ from botocore.session import get_session from mlflow.entities.model_registry import ModelVersion from mlflow.exceptions import RestException -from numalogic.config import NumalogicConf, PostprocessFactory +from numalogic.config import PostprocessFactory from numalogic.models.threshold import SigmoidThreshold from numalogic.registry import MLflowRegistry, ArtifactData -from omegaconf import OmegaConf from pynumaflow.function import Messages, Message -from numaprom import get_logger, MetricConf, ServiceConf, NumapromConf, UnifiedConf +from numaprom import get_logger, MetricConf from numaprom._constants import ( DEFAULT_TRACKING_URI, DEFAULT_PROMETHEUS_SERVER, - CONFIG_DIR, - DEFAULT_CONFIG_DIR, ) from numaprom.entities import TrainerPayload, StreamPayload from numaprom.clients.prometheus import Prometheus @@ -155,69 +152,6 @@ def save_model( return version -def get_all_configs(): - schema: NumapromConf = OmegaConf.structured(NumapromConf) - - conf = OmegaConf.load(os.path.join(CONFIG_DIR, "config.yaml")) - given_configs = OmegaConf.merge(schema, conf).configs - - conf = OmegaConf.load(os.path.join(DEFAULT_CONFIG_DIR, "config.yaml")) - default_configs = OmegaConf.merge(schema, conf).configs - - conf = OmegaConf.load(os.path.join(DEFAULT_CONFIG_DIR, "numalogic_config.yaml")) - schema: NumalogicConf = OmegaConf.structured(NumalogicConf) - default_numalogic = OmegaConf.merge(schema, conf) - - return given_configs, default_configs, default_numalogic - - -def get_service_config(metric: str, namespace: str): - given_configs, default_configs, default_numalogic = get_all_configs() - - # search and load from given configs - service_config = list(filter(lambda conf: (conf.namespace == namespace), given_configs)) - - # if not search and load from default configs - if not service_config: - for _conf in default_configs: - if metric in _conf.unified_configs[0].unified_metrics: - service_config = [_conf] - break - - # if not in default configs, initialize Namespace conf with default values - if not service_config: - service_config = OmegaConf.structured(ServiceConf) - else: - service_config = service_config[0] - - # loading and setting default numalogic config - for metric_config in service_config.metric_configs: - if OmegaConf.is_missing(metric_config, "numalogic_conf"): - metric_config.numalogic_conf = default_numalogic - - return service_config - - -def get_metric_config(metric: str, namespace: str) -> Optional[MetricConf]: - service_config = get_service_config(metric, namespace) - metric_config = list( - filter(lambda conf: (conf.metric == metric), service_config.metric_configs) - ) - if not metric_config: - return service_config.metric_configs[0] - return metric_config[0] - - -def get_unified_config(metric: str, namespace: str) -> Optional[UnifiedConf]: - service_config = get_service_config(metric, namespace) - unified_config = list( - filter(lambda conf: (metric in conf.unified_metrics), service_config.unified_configs) - ) - if not unified_config: - return None - return unified_config[0] - - def fetch_data( payload: TrainerPayload, metric_config: MetricConf, labels: dict, return_labels=None ) -> pd.DataFrame: diff --git a/numaprom/udf/inference.py b/numaprom/udf/inference.py index 0984d16..c97c964 100644 --- a/numaprom/udf/inference.py +++ b/numaprom/udf/inference.py @@ -14,9 +14,9 @@ from numaprom.entities import Status, StreamPayload, Header from numaprom.tools import ( load_model, - get_metric_config, msg_forward, ) +from numaprom.watcher import ConfigManager _LOGGER = get_logger(__name__) @@ -73,9 +73,7 @@ def inference(_: str, datum: Datum) -> bytes: return orjson.dumps(payload, option=orjson.OPT_SERIALIZE_NUMPY) # Load config - metric_config = get_metric_config( - metric=payload.composite_keys["name"], namespace=payload.composite_keys["namespace"] - ) + metric_config = ConfigManager().get_metric_config(payload.composite_keys) numalogic_conf = metric_config.numalogic_conf # Load inference model diff --git a/numaprom/udf/postprocess.py b/numaprom/udf/postprocess.py index ebe2ac0..8b44c85 100644 --- a/numaprom/udf/postprocess.py +++ b/numaprom/udf/postprocess.py @@ -12,10 +12,9 @@ from numaprom.clients.redis import get_redis_client from numaprom.tools import ( msgs_forward, - get_unified_config, - get_metric_config, WindowScorer, ) +from numaprom.watcher import ConfigManager _LOGGER = get_logger(__name__) @@ -130,9 +129,7 @@ def __construct_unified_payload( def _publish(final_score: float, payload: StreamPayload) -> List[bytes]: - unified_config = get_unified_config( - metric=payload.composite_keys["name"], namespace=payload.composite_keys["namespace"] - ) + unified_config = ConfigManager().get_unified_config(payload.composite_keys) publisher_json = __construct_publisher_payload(payload, final_score).as_json() _LOGGER.info("%s - Payload sent to publisher: %s", payload.uuid, publisher_json) @@ -181,9 +178,7 @@ def postprocess(_: str, datum: Datum) -> List[bytes]: payload = StreamPayload(**orjson.loads(_in_msg)) # Load config - metric_config = get_metric_config( - metric=payload.composite_keys["name"], namespace=payload.composite_keys["namespace"] - ) + metric_config = ConfigManager().get_metric_config(payload.composite_keys) _LOGGER.debug("%s - Received Payload: %r ", payload.uuid, payload) diff --git a/numaprom/udf/preprocess.py b/numaprom/udf/preprocess.py index 25c6e0d..d662242 100644 --- a/numaprom/udf/preprocess.py +++ b/numaprom/udf/preprocess.py @@ -5,7 +5,8 @@ from numaprom import get_logger from numaprom.entities import Status, StreamPayload, Header -from numaprom.tools import msg_forward, load_model, get_metric_config +from numaprom.tools import msg_forward, load_model +from numaprom.watcher import ConfigManager _LOGGER = get_logger(__name__) @@ -19,9 +20,7 @@ def preprocess(_: str, datum: Datum) -> bytes: _LOGGER.info("%s - Received Payload: %r ", payload.uuid, payload) # Load config - metric_config = get_metric_config( - metric=payload.composite_keys["name"], namespace=payload.composite_keys["namespace"] - ) + metric_config = ConfigManager().get_metric_config(payload.composite_keys) preprocess_cfgs = metric_config.numalogic_conf.preprocess # Load preprocess artifact diff --git a/numaprom/udf/threshold.py b/numaprom/udf/threshold.py index 41c347f..4abf851 100644 --- a/numaprom/udf/threshold.py +++ b/numaprom/udf/threshold.py @@ -11,8 +11,8 @@ conditional_forward, calculate_static_thresh, load_model, - get_metric_config, ) +from numaprom.watcher import ConfigManager _LOGGER = get_logger(__name__) @@ -44,9 +44,8 @@ def threshold(_: str, datum: Datum) -> list[tuple[str, bytes]]: ) # Load config - metric_config = get_metric_config( - metric=payload.composite_keys["name"], namespace=payload.composite_keys["namespace"] - ) + cm = ConfigManager() + metric_config = cm.get_metric_config(payload.composite_keys) thresh_cfg = metric_config.numalogic_conf.threshold # Check if payload needs static inference diff --git a/numaprom/udf/window.py b/numaprom/udf/window.py index dbc1431..29718f8 100644 --- a/numaprom/udf/window.py +++ b/numaprom/udf/window.py @@ -12,7 +12,8 @@ from numaprom import get_logger from numaprom.entities import StreamPayload, Status, Header from numaprom.clients.redis import get_redis_client -from numaprom.tools import msg_forward, create_composite_keys, get_metric_config +from numaprom.tools import msg_forward, create_composite_keys +from numaprom.watcher import ConfigManager _LOGGER = get_logger(__name__) @@ -68,7 +69,9 @@ def window(_: str, datum: Datum) -> Optional[bytes]: _start_time = time.perf_counter() msg = orjson.loads(datum.value) - metric_config = get_metric_config(metric=msg["name"], namespace=msg["labels"]["namespace"]) + metric_config = ConfigManager().get_metric_config( + {"name": msg["name"], "namespace": msg["labels"]["namespace"]} + ) win_size = metric_config.numalogic_conf.model.conf["seq_len"] buff_size = int(os.getenv("BUFF_SIZE", 10 * win_size)) diff --git a/numaprom/udsink/train.py b/numaprom/udsink/train.py index 17d0c4b..43c287d 100644 --- a/numaprom/udsink/train.py +++ b/numaprom/udsink/train.py @@ -15,7 +15,8 @@ from numaprom import get_logger from numaprom.entities import TrainerPayload from numaprom.clients.redis import get_redis_client -from numaprom.tools import get_metric_config, save_model, fetch_data +from numaprom.tools import save_model, fetch_data +from numaprom.watcher import ConfigManager _LOGGER = get_logger(__name__) @@ -103,9 +104,7 @@ def train(datums: List[Datum]) -> Responses: responses.append(Response.as_success(_datum.id)) continue - metric_config = get_metric_config( - metric=payload.composite_keys["name"], namespace=payload.composite_keys["namespace"] - ) + metric_config = ConfigManager().get_metric_config(payload.composite_keys) model_cfg = metric_config.numalogic_conf.model train_df = fetch_data( diff --git a/numaprom/udsink/train_rollout.py b/numaprom/udsink/train_rollout.py index aaa57ea..bb525c0 100644 --- a/numaprom/udsink/train_rollout.py +++ b/numaprom/udsink/train_rollout.py @@ -15,7 +15,8 @@ from numaprom import get_logger from numaprom.entities import TrainerPayload from numaprom.clients.redis import get_redis_client -from numaprom.tools import get_metric_config, save_model, fetch_data +from numaprom.tools import save_model, fetch_data +from numaprom.watcher import ConfigManager _LOGGER = get_logger(__name__) @@ -117,10 +118,7 @@ def train_rollout(datums: Iterator[Datum]) -> Responses: responses.append(Response.as_success(_datum.id)) continue - metric_config = get_metric_config( - metric=payload.composite_keys["name"], namespace=payload.composite_keys["namespace"] - ) - + metric_config = ConfigManager().get_metric_config(payload.composite_keys) model_cfg = metric_config.numalogic_conf.model # ToDo: standardize the label name diff --git a/numaprom/watcher.py b/numaprom/watcher.py new file mode 100644 index 0000000..c66a595 --- /dev/null +++ b/numaprom/watcher.py @@ -0,0 +1,142 @@ +import os +import time +from functools import lru_cache +from typing import Optional + +from omegaconf import OmegaConf +from watchdog.observers import Observer +from numalogic.config import NumalogicConf +from watchdog.events import FileSystemEventHandler + +from numaprom._constants import CONFIG_DIR, DEFAULT_CONFIG_DIR +from numaprom import NumapromConf, get_logger, AppConf, MetricConf, UnifiedConf + +_LOGGER = get_logger(__name__) + + +class ConfigManager: + config = {} + + @staticmethod + def load_configs(): + schema: NumapromConf = OmegaConf.structured(NumapromConf) + + conf = OmegaConf.load(os.path.join(CONFIG_DIR, "config.yaml")) + app_configs = OmegaConf.merge(schema, conf).configs + + conf = OmegaConf.load(os.path.join(DEFAULT_CONFIG_DIR, "config.yaml")) + default_configs = OmegaConf.merge(schema, conf).configs + + conf = OmegaConf.load(os.path.join(DEFAULT_CONFIG_DIR, "numalogic_config.yaml")) + schema: NumalogicConf = OmegaConf.structured(NumalogicConf) + default_numalogic = OmegaConf.merge(schema, conf) + + return app_configs, default_configs, default_numalogic + + @classmethod + def update_configs(cls): + app_configs, default_configs, default_numalogic = cls.load_configs() + + cls.config["app_configs"] = dict() + for _config in app_configs: + cls.config["app_configs"][_config.namespace] = _config + + cls.config["default_configs"] = dict(map(lambda c: (c.namespace, c), default_configs)) + cls.config["default_numalogic"] = default_numalogic + + _LOGGER.info("Successfully updated configs - %s", cls.config) + return cls.config + + @classmethod + @lru_cache(maxsize=100) + def get_app_config(cls, metric: str, namespace: str) -> Optional[AppConf]: + if not cls.config: + cls.update_configs() + + app_config = None + + # search and load from app configs + if namespace in cls.config["app_configs"]: + app_config = cls.config["app_configs"][namespace] + + # if not search and load from default configs + if not app_config: + for key, _conf in cls.config["default_configs"].items(): + if metric in _conf.unified_configs[0].unified_metrics: + app_config = _conf + break + + # if not in default configs, initialize Namespace conf with default values + if not app_config: + app_config = OmegaConf.structured(AppConf) + + # loading and setting default numalogic config + for metric_config in app_config.metric_configs: + if OmegaConf.is_missing(metric_config, "numalogic_conf"): + metric_config.numalogic_conf = cls.config["default_numalogic"] + + return app_config + + @classmethod + def get_metric_config(cls, composite_keys: dict) -> Optional[MetricConf]: + app_config = cls.get_app_config( + metric=composite_keys["name"], namespace=composite_keys["namespace"] + ) + metric_config = list( + filter(lambda conf: (conf.metric == composite_keys["name"]), app_config.metric_configs) + ) + if not metric_config: + return app_config.metric_configs[0] + return metric_config[0] + + @classmethod + def get_unified_config(cls, composite_keys: dict) -> Optional[UnifiedConf]: + app_config = cls.get_app_config( + metric=composite_keys["name"], namespace=composite_keys["namespace"] + ) + unified_config = list( + filter( + lambda conf: (composite_keys["name"] in conf.unified_metrics), + app_config.unified_configs, + ) + ) + if not unified_config: + return None + return unified_config[0] + + +class ConfigHandler(FileSystemEventHandler): + def ___init__(self): + self.config_manger = ConfigManager() + + def on_any_event(self, event): + if event.event_type == "created" or event.event_type == "modified": + _file = os.path.basename(event.src_path) + _dir = os.path.basename(os.path.dirname(event.src_path)) + + _LOGGER.info("Watchdog received %s event - %s/%s", event.event_type, _dir, _file) + self.config_manger.get_app_config.cache_clear() + self.config_manger.update_configs() + + +class Watcher: + def __init__(self, directories=None, handler=FileSystemEventHandler()): + if directories is None: + directories = ["."] + self.observer = Observer() + self.handler = handler + self.directories = directories + + def run(self): + for directory in self.directories: + self.observer.schedule(self.handler, directory, recursive=True) + _LOGGER.info("\nWatcher Running in {}/\n".format(directory)) + + self.observer.start() + try: + while True: + time.sleep(1) + except KeyboardInterrupt: + self.observer.stop() + self.observer.join() + _LOGGER.info("\nWatcher Terminated\n") diff --git a/poetry.lock b/poetry.lock index f7e1c02..e7dc5cf 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.4.2 and should not be changed by hand. +# This file is automatically @generated by Poetry and should not be changed by hand. [[package]] name = "aiohttp" @@ -597,18 +597,18 @@ lua = ["lupa (>=1.13,<2.0)"] [[package]] name = "filelock" -version = "3.10.7" +version = "3.11.0" description = "A platform independent file lock." category = "dev" optional = false python-versions = ">=3.7" files = [ - {file = "filelock-3.10.7-py3-none-any.whl", hash = "sha256:bde48477b15fde2c7e5a0713cbe72721cb5a5ad32ee0b8f419907960b9d75536"}, - {file = "filelock-3.10.7.tar.gz", hash = "sha256:892be14aa8efc01673b5ed6589dbccb95f9a8596f0507e232626155495c18105"}, + {file = "filelock-3.11.0-py3-none-any.whl", hash = "sha256:f08a52314748335c6460fc8fe40cd5638b85001225db78c2aa01c8c0db83b318"}, + {file = "filelock-3.11.0.tar.gz", hash = "sha256:3618c0da67adcc0506b015fd11ef7faf1b493f0b40d87728e19986b536890c37"}, ] [package.extras] -docs = ["furo (>=2022.12.7)", "sphinx (>=6.1.3)", "sphinx-autodoc-typehints (>=1.22,!=1.23.4)"] +docs = ["furo (>=2023.3.27)", "sphinx (>=6.1.3)", "sphinx-autodoc-typehints (>=1.22,!=1.23.4)"] testing = ["covdefaults (>=2.3)", "coverage (>=7.2.2)", "diff-cover (>=7.5)", "pytest (>=7.2.2)", "pytest-cov (>=4)", "pytest-mock (>=3.10)", "pytest-timeout (>=2.1)"] [[package]] @@ -2590,7 +2590,7 @@ files = [ ] [package.dependencies] -greenlet = {version = "!=0.4.17", markers = "python_version >= \"3\" and platform_machine == \"aarch64\" or python_version >= \"3\" and platform_machine == \"ppc64le\" or python_version >= \"3\" and platform_machine == \"x86_64\" or python_version >= \"3\" and platform_machine == \"amd64\" or python_version >= \"3\" and platform_machine == \"AMD64\" or python_version >= \"3\" and platform_machine == \"win32\" or python_version >= \"3\" and platform_machine == \"WIN32\""} +greenlet = {version = "!=0.4.17", markers = "python_version >= \"3\" and (platform_machine == \"aarch64\" or platform_machine == \"ppc64le\" or platform_machine == \"x86_64\" or platform_machine == \"amd64\" or platform_machine == \"AMD64\" or platform_machine == \"win32\" or platform_machine == \"WIN32\")"} [package.extras] aiomysql = ["aiomysql", "greenlet (!=0.4.17)"] @@ -2852,6 +2852,46 @@ files = [ docs = ["Sphinx (>=1.8.1)", "docutils", "pylons-sphinx-themes (>=1.0.9)"] testing = ["coverage (>=5.0)", "pytest", "pytest-cover"] +[[package]] +name = "watchdog" +version = "3.0.0" +description = "Filesystem events monitoring" +category = "main" +optional = false +python-versions = ">=3.7" +files = [ + {file = "watchdog-3.0.0-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:336adfc6f5cc4e037d52db31194f7581ff744b67382eb6021c868322e32eef41"}, + {file = "watchdog-3.0.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:a70a8dcde91be523c35b2bf96196edc5730edb347e374c7de7cd20c43ed95397"}, + {file = "watchdog-3.0.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:adfdeab2da79ea2f76f87eb42a3ab1966a5313e5a69a0213a3cc06ef692b0e96"}, + {file = "watchdog-3.0.0-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:2b57a1e730af3156d13b7fdddfc23dea6487fceca29fc75c5a868beed29177ae"}, + {file = "watchdog-3.0.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:7ade88d0d778b1b222adebcc0927428f883db07017618a5e684fd03b83342bd9"}, + {file = "watchdog-3.0.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:7e447d172af52ad204d19982739aa2346245cc5ba6f579d16dac4bfec226d2e7"}, + {file = "watchdog-3.0.0-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:9fac43a7466eb73e64a9940ac9ed6369baa39b3bf221ae23493a9ec4d0022674"}, + {file = "watchdog-3.0.0-cp38-cp38-macosx_10_9_universal2.whl", hash = "sha256:8ae9cda41fa114e28faf86cb137d751a17ffd0316d1c34ccf2235e8a84365c7f"}, + {file = "watchdog-3.0.0-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:25f70b4aa53bd743729c7475d7ec41093a580528b100e9a8c5b5efe8899592fc"}, + {file = "watchdog-3.0.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:4f94069eb16657d2c6faada4624c39464f65c05606af50bb7902e036e3219be3"}, + {file = "watchdog-3.0.0-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:7c5f84b5194c24dd573fa6472685b2a27cc5a17fe5f7b6fd40345378ca6812e3"}, + {file = "watchdog-3.0.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:3aa7f6a12e831ddfe78cdd4f8996af9cf334fd6346531b16cec61c3b3c0d8da0"}, + {file = "watchdog-3.0.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:233b5817932685d39a7896b1090353fc8efc1ef99c9c054e46c8002561252fb8"}, + {file = "watchdog-3.0.0-pp37-pypy37_pp73-macosx_10_9_x86_64.whl", hash = "sha256:13bbbb462ee42ec3c5723e1205be8ced776f05b100e4737518c67c8325cf6100"}, + {file = "watchdog-3.0.0-pp38-pypy38_pp73-macosx_10_9_x86_64.whl", hash = "sha256:8f3ceecd20d71067c7fd4c9e832d4e22584318983cabc013dbf3f70ea95de346"}, + {file = "watchdog-3.0.0-pp39-pypy39_pp73-macosx_10_9_x86_64.whl", hash = "sha256:c9d8c8ec7efb887333cf71e328e39cffbf771d8f8f95d308ea4125bf5f90ba64"}, + {file = "watchdog-3.0.0-py3-none-manylinux2014_aarch64.whl", hash = "sha256:0e06ab8858a76e1219e68c7573dfeba9dd1c0219476c5a44d5333b01d7e1743a"}, + {file = "watchdog-3.0.0-py3-none-manylinux2014_armv7l.whl", hash = "sha256:d00e6be486affb5781468457b21a6cbe848c33ef43f9ea4a73b4882e5f188a44"}, + {file = "watchdog-3.0.0-py3-none-manylinux2014_i686.whl", hash = "sha256:c07253088265c363d1ddf4b3cdb808d59a0468ecd017770ed716991620b8f77a"}, + {file = "watchdog-3.0.0-py3-none-manylinux2014_ppc64.whl", hash = "sha256:5113334cf8cf0ac8cd45e1f8309a603291b614191c9add34d33075727a967709"}, + {file = "watchdog-3.0.0-py3-none-manylinux2014_ppc64le.whl", hash = "sha256:51f90f73b4697bac9c9a78394c3acbbd331ccd3655c11be1a15ae6fe289a8c83"}, + {file = "watchdog-3.0.0-py3-none-manylinux2014_s390x.whl", hash = "sha256:ba07e92756c97e3aca0912b5cbc4e5ad802f4557212788e72a72a47ff376950d"}, + {file = "watchdog-3.0.0-py3-none-manylinux2014_x86_64.whl", hash = "sha256:d429c2430c93b7903914e4db9a966c7f2b068dd2ebdd2fa9b9ce094c7d459f33"}, + {file = "watchdog-3.0.0-py3-none-win32.whl", hash = "sha256:3ed7c71a9dccfe838c2f0b6314ed0d9b22e77d268c67e015450a29036a81f60f"}, + {file = "watchdog-3.0.0-py3-none-win_amd64.whl", hash = "sha256:4c9956d27be0bb08fc5f30d9d0179a855436e655f046d288e2bcc11adfae893c"}, + {file = "watchdog-3.0.0-py3-none-win_ia64.whl", hash = "sha256:5d9f3a10e02d7371cd929b5d8f11e87d4bad890212ed3901f9b4d68767bee759"}, + {file = "watchdog-3.0.0.tar.gz", hash = "sha256:4d98a320595da7a7c5a18fc48cb633c2e73cda78f93cac2ef42d42bf609a33f9"}, +] + +[package.extras] +watchmedo = ["PyYAML (>=3.10)"] + [[package]] name = "websocket-client" version = "1.5.1" @@ -3009,4 +3049,4 @@ testing = ["big-O", "flake8 (<5)", "jaraco.functools", "jaraco.itertools", "more [metadata] lock-version = "2.0" python-versions = ">=3.9, <3.11" -content-hash = "77395fe0f49fdabc9a559f7f39ebda6e1ae909b3acf857493fdeb9ce326e7fc5" +content-hash = "2a8ab7f8e80dba20dc28d64f8aec7637c9aedcc68c0494de18ca6b0ea743400e" diff --git a/pyproject.toml b/pyproject.toml index 7fa1ed3..c313631 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "numalogic-prometheus" -version = "0.2.7" +version = "0.3.0" description = "ML inference on numaflow using numalogic on Prometheus metrics" authors = ["Numalogic developers"] packages = [{ include = "numaprom" }] @@ -26,6 +26,7 @@ numalogic = {version = "~0.3.7", extras = ["mlflow"]} boto3 = "^1.25.2" orjson = "^3.8.4" omegaconf = "^2.3.0" +watchdog = "^3.0.0" [tool.poetry.group.mlflowserver] optional = true diff --git a/starter.py b/starter.py index f9dd85e..101bc22 100644 --- a/starter.py +++ b/starter.py @@ -3,7 +3,9 @@ from pynumaflow.function import UserDefinedFunctionServicer from pynumaflow.sink import UserDefinedSinkServicer +from numaprom._constants import CONFIG_PATHS from numaprom.factory import HandlerFactory +from numaprom.watcher import Watcher, ConfigHandler if __name__ == "__main__": step_handler = HandlerFactory.get_handler(sys.argv[2]) @@ -17,3 +19,6 @@ raise ValueError(f"sys arg: {server_type} not understood!") server.start() + + w = Watcher(CONFIG_PATHS, ConfigHandler()) + w.run() diff --git a/tests/resources/config.yaml b/tests/resources/configs/config.yaml similarity index 100% rename from tests/resources/config.yaml rename to tests/resources/configs/config.yaml diff --git a/tests/resources/configs/default-config.yaml b/tests/resources/configs/default-config.yaml new file mode 100644 index 0000000..e48c3ec --- /dev/null +++ b/tests/resources/configs/default-config.yaml @@ -0,0 +1,38 @@ +configs: + - namespace: "default-argorollouts" + metric_configs: + - metric: "namespace_app_rollouts_http_request_error_rate" + composite_keys: [ "namespace", "name", "app", "rollouts_pod_template_hash" ] + static_threshold: 3 + static_threshold_wt: 0.7 + - metric: "namespace_app_rollouts_http_request_latency" + composite_keys: [ "namespace", "name", "app", "rollouts_pod_template_hash" ] + static_threshold: 3 + unified_configs: + - unified_metric_name: "namespace_app_rollouts_unified_anomaly" + unified_metrics: [ "namespace_app_rollouts_http_request_error_rate", + "namespace_app_rollouts_http_request_latency" ] + - namespace: "default-argocd" + metric_configs: + - metric: "namespace_app_http_server_requests_errors" + composite_keys: [ "namespace", "name" ] + static_threshold: 3 + - metric: "namespace_app_http_server_requests_error_rate" + composite_keys: [ "namespace", "name" ] + static_threshold: 3 + - metric: "namespace_app_http_server_requests_latency" + composite_keys: [ "namespace", "name" ] + static_threshold: 3 + - metric: "namespace_app_cpu_utilization" + composite_keys: [ "namespace", "name" ] + static_threshold: 80 + - metric: "namespace_app_memory_utilization" + composite_keys: [ "namespace", "name" ] + static_threshold: 80 + unified_configs: + - unified_metric_name: "namespace_app_unified_anomaly" + unified_metrics: [ "namespace_app_http_server_requests_errors", + "namespace_app_http_server_requests_error_rate", + "namespace_app_http_server_requests_latency", + "namespace_app_cpu_utilization", + "namespace_app_memory_utilization" ] diff --git a/tests/resources/numalogic_config.yaml b/tests/resources/configs/numalogic_config.yaml similarity index 100% rename from tests/resources/numalogic_config.yaml rename to tests/resources/configs/numalogic_config.yaml diff --git a/tests/test_tools.py b/tests/test_tools.py index 9768874..dc428f4 100644 --- a/tests/test_tools.py +++ b/tests/test_tools.py @@ -6,16 +6,13 @@ import numpy as np -from numaprom import tools from numaprom._constants import TESTS_DIR from numaprom.entities import StreamPayload from numaprom.tools import ( is_host_reachable, - get_metric_config, - get_service_config, - get_unified_config, WindowScorer, ) +from numaprom.watcher import ConfigManager from tests.tools import mock_configs DATA_DIR = os.path.join(TESTS_DIR, "resources", "data") @@ -27,7 +24,6 @@ def mock_resolver(*_, **__): raise socket.gaierror -@patch.object(tools, "get_all_configs", Mock(return_value=mock_configs())) class TestTools(unittest.TestCase): INFER_OUT = None @@ -38,79 +34,15 @@ def test_is_host_reachable(self): def test_is_host_reachable_err(self): self.assertFalse(is_host_reachable("google.com", max_retries=2, sleep_sec=1)) - def test_get_metric_config(self): - # from given config - metric_config = get_metric_config( - metric="rollout_latency", namespace="sandbox_numalogic_demo1" - ) - self.assertTrue(metric_config) - self.assertEqual(metric_config.metric, "rollout_latency") - - # from given default config - metric_config = get_metric_config( - metric="namespace_app_rollouts_http_request_error_rate", namespace="abc" - ) - self.assertTrue(metric_config) - self.assertEqual(metric_config.metric, "namespace_app_rollouts_http_request_error_rate") - - # default config - metric_config = get_metric_config(metric="random", namespace="abc") - self.assertTrue(metric_config) - self.assertEqual(metric_config.metric, "default") - - def test_get_service_config(self): - # from given config - service_config = get_service_config( - metric="rollout_latency", namespace="sandbox_numalogic_demo1" - ) - self.assertTrue(service_config) - self.assertEqual(service_config.namespace, "sandbox_numalogic_demo1") - - # from given default config - service_config = get_service_config( - metric="namespace_app_rollouts_http_request_error_rate", namespace="abc" - ) - self.assertTrue(service_config) - self.assertEqual(service_config.namespace, "default-argorollouts") - service_config = get_service_config( - metric="namespace_app_http_server_requests_error_rate", namespace="abc" - ) - self.assertTrue(service_config) - self.assertEqual(service_config.namespace, "default-argocd") - - # default config - service_config = get_service_config(metric="random", namespace="abc") - self.assertTrue(service_config) - self.assertEqual(service_config.namespace, "default") - - def test_get_unified_config(self): - # from given config - unified_config = get_unified_config( - metric="rollout_latency", namespace="sandbox_numalogic_demo1" - ) - self.assertTrue(unified_config) - self.assertTrue("rollout_latency" in unified_config.unified_metrics) - - # from given default config - unified_config = get_unified_config( - metric="namespace_app_rollouts_http_request_error_rate", namespace="abc" - ) - self.assertTrue(unified_config) - self.assertTrue( - "namespace_app_rollouts_http_request_error_rate" in unified_config.unified_metrics - ) - - # default config - will not have unified config - unified_config = get_unified_config(metric="random", namespace="abc") - self.assertFalse(unified_config) - -@patch.object(tools, "get_all_configs", Mock(return_value=mock_configs())) +@patch.object(ConfigManager, "load_configs", Mock(return_value=mock_configs())) class TestWindowScorer(unittest.TestCase): def test_get_winscore(self): - metric_conf = get_metric_config( - metric="namespace_app_rollouts_http_request_error_rate", - namespace="sandbox_numalogic_demo2", + metric_conf = ConfigManager().get_metric_config( + { + "name": "namespace_app_rollouts_http_request_error_rate", + "namespace": "sandbox_numalogic_demo2", + } ) stream = np.random.uniform(low=1, high=2, size=(10, 1)) payload = StreamPayload( diff --git a/tests/test_trainer.py b/tests/test_trainer.py index 71e1656..45da48f 100644 --- a/tests/test_trainer.py +++ b/tests/test_trainer.py @@ -8,9 +8,9 @@ from numalogic.registry import MLflowRegistry from pynumaflow.sink import Datum -from numaprom import tools from numaprom._constants import TESTS_DIR from numaprom.clients.prometheus import Prometheus +from numaprom.watcher import ConfigManager from tests.tools import ( mock_argocd_query_metric, mock_rollout_query_metric, @@ -35,7 +35,7 @@ def as_datum(data: Union[str, bytes, dict], msg_id="1") -> Datum: @patch("numaprom.tools.set_aws_session", Mock(return_value=None)) -@patch.object(tools, "get_all_configs", Mock(return_value=mock_configs())) +@patch.object(ConfigManager, "load_configs", Mock(return_value=mock_configs())) class TestTrainer(unittest.TestCase): train_payload = { "uuid": "123124543", diff --git a/tests/test_watcher.py b/tests/test_watcher.py new file mode 100644 index 0000000..a06e7b4 --- /dev/null +++ b/tests/test_watcher.py @@ -0,0 +1,127 @@ +import time +import unittest +from unittest.mock import patch, Mock + +from numaprom.watcher import ConfigManager +from tests.tools import mock_configs + + +@patch.object(ConfigManager, "load_configs", Mock(return_value=mock_configs())) +class TestConfigManager(unittest.TestCase): + @classmethod + def setUpClass(cls) -> None: + cls.cm = ConfigManager() + cls.payload = {"name": "rollout_latency", "namespace": "sandbox_numalogic_demo1"} + cls.argocd_payload = { + "name": "namespace_app_http_server_requests_error_rate", + "namespace": "abc", + } + cls.rollouts_payload = { + "name": "namespace_app_rollouts_http_request_error_rate", + "namespace": "abc", + } + cls.random_payload = {"name": "random", "namespace": "abc"} + + def test_update_configs(self): + config = self.cm.update_configs() + self.assertTrue(len(config), 3) + + def test_load_configs(self): + app_configs, default_configs, default_numalogic = self.cm.load_configs() + print(type(app_configs)) + print(type(default_configs)) + + def test_get_app_config(self): + # from given config + app_config = self.cm.get_app_config( + metric=self.payload["name"], namespace=self.payload["namespace"] + ) + self.assertTrue(app_config) + self.assertEqual(app_config.namespace, "sandbox_numalogic_demo1") + + # from given default config + app_config = self.cm.get_app_config( + metric=self.rollouts_payload["name"], namespace=self.rollouts_payload["namespace"] + ) + self.assertTrue(app_config) + self.assertEqual(app_config.namespace, "default-argorollouts") + + app_config = self.cm.get_app_config( + metric=self.argocd_payload["name"], namespace=self.argocd_payload["namespace"] + ) + self.assertTrue(app_config) + self.assertEqual(app_config.namespace, "default-argocd") + + # default config + service_config = self.cm.get_app_config( + metric=self.random_payload["name"], namespace=self.random_payload["namespace"] + ) + self.assertTrue(service_config) + self.assertEqual(service_config.namespace, "default") + + def test_get_metric_config(self): + # from given config + metric_config = self.cm.get_metric_config(self.payload) + self.assertTrue(metric_config) + self.assertEqual(metric_config.metric, "rollout_latency") + + # from given default config + metric_config = self.cm.get_metric_config(self.rollouts_payload) + self.assertTrue(metric_config) + self.assertEqual(metric_config.metric, "namespace_app_rollouts_http_request_error_rate") + + # default config + metric_config = self.cm.get_metric_config(self.random_payload) + self.assertTrue(metric_config) + self.assertEqual(metric_config.metric, "default") + + def test_get_unified_config(self): + # from given config + unified_config = self.cm.get_unified_config(self.payload) + self.assertTrue(unified_config) + self.assertTrue("rollout_latency" in unified_config.unified_metrics) + + # from given default config + unified_config = self.cm.get_unified_config(self.rollouts_payload) + self.assertTrue(unified_config) + self.assertTrue( + "namespace_app_rollouts_http_request_error_rate" in unified_config.unified_metrics + ) + + # default config - will not have unified config + unified_config = self.cm.get_unified_config(self.random_payload) + self.assertFalse(unified_config) + + def test_get_app_config_time(self): + _start_time = time.perf_counter() + ConfigManager().get_app_config( + metric=self.payload["name"], namespace=self.payload["namespace"] + ) + time1 = time.perf_counter() - _start_time + self.assertTrue(ConfigManager().get_app_config.cache_info().currsize >= 1) + _start_time = time.perf_counter() + ConfigManager().get_app_config( + metric=self.payload["name"], namespace=self.payload["namespace"] + ) + time2 = time.perf_counter() - _start_time + _start_time = time.perf_counter() + self.assertTrue(ConfigManager().get_app_config.cache_info().hits >= 1) + self.assertTrue(time2 <= time1) + + def test_get_metric_config_time(self): + _start_time = time.perf_counter() + ConfigManager().get_metric_config(self.payload) + time1 = time.perf_counter() - _start_time + _start_time = time.perf_counter() + ConfigManager().get_metric_config(self.payload) + time2 = time.perf_counter() - _start_time + self.assertTrue(time2 < time1) + + def test_get_unified_config_time(self): + _start_time = time.perf_counter() + ConfigManager().get_unified_config(self.payload) + time1 = time.perf_counter() - _start_time + _start_time = time.perf_counter() + ConfigManager().get_unified_config(self.payload) + time2 = time.perf_counter() - _start_time + self.assertTrue(time2 < time1) diff --git a/tests/tools.py b/tests/tools.py index 06fefae..b7a77b2 100644 --- a/tests/tools.py +++ b/tests/tools.py @@ -18,7 +18,7 @@ from pynumaflow.function import Datum, Messages from pynumaflow.function._dtypes import DROP -from numaprom._constants import TESTS_DIR, POSTPROC_VTX_KEY, DEFAULT_CONFIG_DIR +from numaprom._constants import TESTS_DIR, POSTPROC_VTX_KEY, DEFAULT_CONFIG_DIR, TESTS_RESOURCES from numaprom.factory import HandlerFactory from numaprom import NumapromConf @@ -255,14 +255,14 @@ def mock_rollout_query_metric2(*_, **__): def mock_configs(): schema: NumapromConf = OmegaConf.structured(NumapromConf) - conf = OmegaConf.load(os.path.join(TESTS_DIR, "resources", "config.yaml")) - given_configs = OmegaConf.merge(schema, conf).configs + conf = OmegaConf.load(os.path.join(TESTS_RESOURCES, "configs", "config.yaml")) + app_configs = OmegaConf.merge(schema, conf).configs - conf = OmegaConf.load(os.path.join(DEFAULT_CONFIG_DIR, "config.yaml")) + conf = OmegaConf.load(os.path.join(TESTS_RESOURCES, "configs", "default-config.yaml")) default_configs = OmegaConf.merge(schema, conf).configs - conf = OmegaConf.load(os.path.join(TESTS_DIR, "resources", "numalogic_config.yaml")) + conf = OmegaConf.load(os.path.join(TESTS_RESOURCES, "configs", "numalogic_config.yaml")) schema: NumalogicConf = OmegaConf.structured(NumalogicConf) default_numalogic = OmegaConf.merge(schema, conf) - return given_configs, default_configs, default_numalogic + return app_configs, default_configs, default_numalogic diff --git a/tests/udf/test_inference.py b/tests/udf/test_inference.py index dbb69e5..c870d20 100644 --- a/tests/udf/test_inference.py +++ b/tests/udf/test_inference.py @@ -6,9 +6,10 @@ from numalogic.registry import MLflowRegistry -from numaprom import tools +from numaprom import watcher from numaprom._constants import TESTS_DIR from numaprom.entities import Status, StreamPayload, Header +from numaprom.watcher import ConfigManager from tests import redis_client from tests.tools import ( get_inference_input, @@ -25,11 +26,11 @@ @patch("numaprom.tools.set_aws_session", Mock(return_value=None)) -@patch.object(tools, "get_all_configs", Mock(return_value=mock_configs())) +@patch.object(ConfigManager, "load_configs", Mock(return_value=mock_configs())) class TestInference(unittest.TestCase): @classmethod @patch("numaprom.tools.set_aws_session", Mock(return_value=None)) - @patch.object(tools, "get_all_configs", Mock(return_value=mock_configs())) + @patch.object(ConfigManager, "load_configs", Mock(return_value=mock_configs())) def setUpClass(cls) -> None: redis_client.flushall() cls.inference_input = get_inference_input(STREAM_DATA_PATH) diff --git a/tests/udf/test_postprocess.py b/tests/udf/test_postprocess.py index ded4b86..40884a6 100644 --- a/tests/udf/test_postprocess.py +++ b/tests/udf/test_postprocess.py @@ -5,9 +5,9 @@ from freezegun import freeze_time from unittest.mock import patch, Mock -from numaprom import tools from numaprom._constants import TESTS_DIR from numaprom.entities import PrometheusPayload, StreamPayload, Header +from numaprom.watcher import ConfigManager from tests import redis_client from tests.tools import ( get_postproc_input, @@ -22,7 +22,7 @@ @patch("numaprom.tools.set_aws_session", Mock(return_value=None)) -@patch.object(tools, "get_all_configs", Mock(return_value=mock_configs())) +@patch.object(ConfigManager, "load_configs", Mock(return_value=mock_configs())) class TestPostProcess(unittest.TestCase): postproc_input = None diff --git a/tests/udf/test_preprocess.py b/tests/udf/test_preprocess.py index 343731a..5d44543 100644 --- a/tests/udf/test_preprocess.py +++ b/tests/udf/test_preprocess.py @@ -6,9 +6,9 @@ from numalogic.registry import MLflowRegistry -from numaprom import tools from numaprom._constants import TESTS_DIR from numaprom.entities import Status, StreamPayload, Header +from numaprom.watcher import ConfigManager from tests.tools import get_prepoc_input, get_datum, return_preproc_clf, mock_configs # Make sure to import this in the end @@ -25,7 +25,7 @@ class TestPreprocess(unittest.TestCase): preproc_input = None @classmethod - @patch.object(tools, "get_all_configs", Mock(return_value=mock_configs())) + @patch.object(ConfigManager, "load_configs", Mock(return_value=mock_configs())) def setUpClass(cls) -> None: redis_client.flushall() cls.preproc_input = get_prepoc_input(STREAM_DATA_PATH) @@ -61,7 +61,7 @@ def test_preprocess_no_clf(self): self.assertIsInstance(payload, StreamPayload) @patch.object(MLflowRegistry, "load", Mock(return_value=return_preproc_clf())) - @patch.object(tools, "get_all_configs", Mock(return_value=mock_configs())) + @patch.object(ConfigManager, "load_configs", Mock(return_value=mock_configs())) def test_preprocess_with_nan(self): preproc_input = get_prepoc_input(STREAM_NAN_DATA_PATH) assert preproc_input.items(), print("input items is empty", preproc_input) diff --git a/tests/udf/test_threshold.py b/tests/udf/test_threshold.py index 814dc6e..98cc94a 100644 --- a/tests/udf/test_threshold.py +++ b/tests/udf/test_threshold.py @@ -6,9 +6,9 @@ from numalogic.registry import MLflowRegistry -from numaprom import tools from numaprom._constants import TESTS_DIR from numaprom.entities import Status, StreamPayload, TrainerPayload, Header +from numaprom.watcher import ConfigManager from tests import redis_client from tests.tools import ( get_threshold_input, @@ -25,7 +25,7 @@ @patch("numaprom.tools.set_aws_session", Mock(return_value=None)) -@patch.object(tools, "get_all_configs", Mock(return_value=mock_configs())) +@patch.object(ConfigManager, "load_configs", Mock(return_value=mock_configs())) class TestThreshold(unittest.TestCase): def setUp(self) -> None: redis_client.flushall() diff --git a/tests/udf/test_window.py b/tests/udf/test_window.py index 19aeeef..21e2870 100644 --- a/tests/udf/test_window.py +++ b/tests/udf/test_window.py @@ -5,9 +5,9 @@ from pynumaflow.function._dtypes import DROP -from numaprom import tools from numaprom._constants import TESTS_DIR from numaprom.entities import StreamPayload +from numaprom.watcher import ConfigManager from tests.tools import get_datum, get_stream_data, mockenv, mock_configs from tests import redis_client, window @@ -15,7 +15,7 @@ STREAM_DATA_PATH = os.path.join(DATA_DIR, "stream.json") -@patch.object(tools, "get_all_configs", Mock(return_value=mock_configs())) +@patch.object(ConfigManager, "load_configs", Mock(return_value=mock_configs())) class TestWindow(unittest.TestCase): @classmethod def setUpClass(cls) -> None: