Skip to content
This repository was archived by the owner on Feb 14, 2024. It is now read-only.

AlexIoannides/kfp-local

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

10 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

THIS PROJECT IS ARCHIVED

Following the Kubeflow project releasing local execution features as part of Kubeflow Pipelines version 2.5.0.


Local Pipelines for Local People

A local executor for Kubeflow pipelines tasks composed entirely out of custom Python components.

Installing

The kfp-local package targets MacOS and Linux and can be installed using,

python -m pip install git+https://github.com/AlexIoannides/kfp-local.git@main

Windows Support

There is currently no support for running kfp-local on Windows.

Compile a Pipeline

For example, as defined in tests/resources/make_pipeline.py,

python -m tests.make_pipeline

This will produce a compiled pipeline contained in pipeline.json

Run Tasks from Compiled Pipeline Locally

Pipelines can be executed either from the command line using the kfpl command, or via Python using the kfp_local.pipeline.run_pipeline function. See the docstring for information on the latter and the CLI help for the former (kfpl --help).

Example Pipeline Execution using Nox for Task Isolation

To run stages from the pipeline defined in pipeline.json use the kfpl CLI - e.g.,

kfpl stage-0 stage-1 stage-2 stage-3 --pipeline pipeline.json --nox

This will use Nox for dependency isolation and print to stdout. The output should look like,

nox > Running session run_pipeline_task
nox > Creating virtual environment (virtualenv) using python3.10 in .nox/run_pipeline_task
nox > python -m pip install kfp==2.4.0
nox > sed -i .bak 's/\/gcs\///' .nox/run_pipeline_task/lib/python3.10/site-packages/kfp/dsl/types/artifact_types.py
nox > sh -c '
if ! [ -x "$(command -v pip)" ]; then
    python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip
fi

PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location '"'"'kfp==2.4.0'"'"' '"'"'--no-deps'"'"' '"'"'typing-extensions>=3.7.4,<5; python_version<"3.9"'"'"'  &&  python3 -m pip install --quiet --no-warn-script-location '"'"'numpy'"'"' && "$0" "$@"
' sh -ec 'program_path=$(mktemp -d)

printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main                         --component_module_path                         "$program_path/ephemeral_component.py"                         "$@"
' '
import kfp
from kfp import dsl
from kfp.dsl import *
from typing import *

def stage_0(config: Dict[str, Any], messages: List[str], run_id: str = "42") -> int:
    """Stage 0."""
    from numpy import random

    print(f"RUN_ID = {run_id}")
    for n, msg in enumerate(messages):
        print(f"|- message-{n}: {msg}")
    return random.randint(config["seed_low"], config["seed_high"])

' --executor_input '{"inputs": {"parameterValues": {"config": {"seed_high": 42.0, "seed_low": 0.0}, "messages": ["foo", "bar"], "run_id": "001"}, "artifacts": {}}, "outputs": {"artifacts": {}, "outputFile": "object-storage-bucket/stage-0/output_metadata.json"}}' --function_to_execute stage_0
[KFP Executor 2023-11-07 13:21:47,199 INFO]: Looking for component `stage_0` in --component_module_path `/var/folders/xk/fqw466sd3l7fztkl08x8hnk80000gn/T/tmp.TrHWwseBIp/ephemeral_component.py`
[KFP Executor 2023-11-07 13:21:47,199 INFO]: Loading KFP component "stage_0" from /var/folders/xk/fqw466sd3l7fztkl08x8hnk80000gn/T/tmp.TrHWwseBIp/ephemeral_component.py (directory "/var/folders/xk/fqw466sd3l7fztkl08x8hnk80000gn/T/tmp.TrHWwseBIp" and module name "ephemeral_component")
[KFP Executor 2023-11-07 13:21:47,200 INFO]: Got executor_input:
{
    "inputs": {
        "parameterValues": {
            "config": {
                "seed_high": 42.0,
                "seed_low": 0.0
            },
            "messages": [
                "foo",
                "bar"
            ],
            "run_id": "001"
        },
        "artifacts": {}
    },
    "outputs": {
        "artifacts": {},
        "outputFile": "object-storage-bucket/stage-0/output_metadata.json"
    }
}
RUN_ID = 001
|- message-0: foo
|- message-1: bar
[KFP Executor 2023-11-07 13:21:48,207 INFO]: Wrote executor output file to object-storage-bucket/stage-0/output_metadata.json.
nox > Session run_pipeline_task was successful.
nox > Running session run_pipeline_task
nox > Creating virtual environment (virtualenv) using python3.10 in .nox/run_pipeline_task
nox > python -m pip install kfp==2.4.0
nox > sed -i .bak 's/\/gcs\///' .nox/run_pipeline_task/lib/python3.10/site-packages/kfp/dsl/types/artifact_types.py
nox > sh -c '
if ! [ -x "$(command -v pip)" ]; then
    python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip
fi

PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location '"'"'kfp==2.4.0'"'"' '"'"'--no-deps'"'"' '"'"'typing-extensions>=3.7.4,<5; python_version<"3.9"'"'"'  &&  python3 -m pip install --quiet --no-warn-script-location '"'"'numpy'"'"' && "$0" "$@"
' sh -ec 'program_path=$(mktemp -d)

printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main                         --component_module_path                         "$program_path/ephemeral_component.py"                         "$@"
' '
import kfp
from kfp import dsl
from kfp.dsl import *
from typing import *

def stage_1(n: int, data: dsl.Output[dsl.Dataset], seed: int) -> None:
    """Stage 1."""
    from numpy import random

    random.seed(seed)
    x = random.standard_normal(n)
    with open(data.path, "w") as file:
        x.tofile(file)

' --executor_input '{"inputs": {"parameterValues": {"n": 1000, "seed": 23}, "artifacts": {}}, "outputs": {"artifacts": {"data": {"artifacts": [{"name": "data", "uri": "gs://object-storage-bucket/data"}]}}, "outputFile": "object-storage-bucket/stage-1/output_metadata.json"}}' --function_to_execute stage_1
[KFP Executor 2023-11-07 13:21:55,481 INFO]: Looking for component `stage_1` in --component_module_path `/var/folders/xk/fqw466sd3l7fztkl08x8hnk80000gn/T/tmp.YtqXEolyXe/ephemeral_component.py`
[KFP Executor 2023-11-07 13:21:55,482 INFO]: Loading KFP component "stage_1" from /var/folders/xk/fqw466sd3l7fztkl08x8hnk80000gn/T/tmp.YtqXEolyXe/ephemeral_component.py (directory "/var/folders/xk/fqw466sd3l7fztkl08x8hnk80000gn/T/tmp.YtqXEolyXe" and module name "ephemeral_component")
[KFP Executor 2023-11-07 13:21:55,482 INFO]: Got executor_input:
{
    "inputs": {
        "parameterValues": {
            "n": 1000,
            "seed": 23
        },
        "artifacts": {}
    },
    "outputs": {
        "artifacts": {
            "data": {
                "artifacts": [
                    {
                        "name": "data",
                        "uri": "gs://object-storage-bucket/data"
                    }
                ]
            }
        },
        "outputFile": "object-storage-bucket/stage-1/output_metadata.json"
    }
}
[KFP Executor 2023-11-07 13:21:56,218 INFO]: Wrote executor output file to object-storage-bucket/stage-1/output_metadata.json.
nox > Session run_pipeline_task was successful.
nox > Running session run_pipeline_task
nox > Creating virtual environment (virtualenv) using python3.10 in .nox/run_pipeline_task
nox > python -m pip install kfp==2.4.0
nox > sed -i .bak 's/\/gcs\///' .nox/run_pipeline_task/lib/python3.10/site-packages/kfp/dsl/types/artifact_types.py
nox > sh -c '
if ! [ -x "$(command -v pip)" ]; then
    python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip
fi

PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location '"'"'kfp==2.4.0'"'"' '"'"'--no-deps'"'"' '"'"'typing-extensions>=3.7.4,<5; python_version<"3.9"'"'"'  &&  python3 -m pip install --quiet --no-warn-script-location '"'"'numpy'"'"' && "$0" "$@"
' sh -ec 'program_path=$(mktemp -d)

printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main                         --component_module_path                         "$program_path/ephemeral_component.py"                         "$@"
' '
import kfp
from kfp import dsl
from kfp.dsl import *
from typing import *

def stage_2(data: dsl.Input[dsl.Dataset]) -> Dict[str, Any]:
    """Stage 2."""
    import numpy as np

    x = np.fromfile(data.path)
    return {"average": x.mean(), "std": x.std()}

' --executor_input '{"inputs": {"parameterValues": {}, "artifacts": {"data": {"name": "data", "artifacts": [{"uri": "gs://object-storage-bucket/data"}]}}}, "outputs": {"artifacts": {}, "outputFile": "object-storage-bucket/stage-2/output_metadata.json"}}' --function_to_execute stage_2
[KFP Executor 2023-11-07 13:22:03,723 INFO]: Looking for component `stage_2` in --component_module_path `/var/folders/xk/fqw466sd3l7fztkl08x8hnk80000gn/T/tmp.zX0TENC8H2/ephemeral_component.py`
[KFP Executor 2023-11-07 13:22:03,723 INFO]: Loading KFP component "stage_2" from /var/folders/xk/fqw466sd3l7fztkl08x8hnk80000gn/T/tmp.zX0TENC8H2/ephemeral_component.py (directory "/var/folders/xk/fqw466sd3l7fztkl08x8hnk80000gn/T/tmp.zX0TENC8H2" and module name "ephemeral_component")
[KFP Executor 2023-11-07 13:22:03,724 INFO]: Got executor_input:
{
    "inputs": {
        "parameterValues": {},
        "artifacts": {
            "data": {
                "name": "data",
                "artifacts": [
                    {
                        "uri": "gs://object-storage-bucket/data"
                    }
                ]
            }
        }
    },
    "outputs": {
        "artifacts": {},
        "outputFile": "object-storage-bucket/stage-2/output_metadata.json"
    }
}
[KFP Executor 2023-11-07 13:22:04,564 INFO]: Wrote executor output file to object-storage-bucket/stage-2/output_metadata.json.
nox > Session run_pipeline_task was successful.
nox > Running session run_pipeline_task
nox > Creating virtual environment (virtualenv) using python3.10 in .nox/run_pipeline_task
nox > python -m pip install kfp==2.4.0
nox > sed -i .bak 's/\/gcs\///' .nox/run_pipeline_task/lib/python3.10/site-packages/kfp/dsl/types/artifact_types.py
nox > sh -c '
if ! [ -x "$(command -v pip)" ]; then
    python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip
fi

PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location '"'"'kfp==2.4.0'"'"' '"'"'--no-deps'"'"' '"'"'typing-extensions>=3.7.4,<5; python_version<"3.9"'"'"'  &&  python3 -m pip install --quiet --no-warn-script-location '"'"'numpy'"'"' && "$0" "$@"
' sh -ec 'program_path=$(mktemp -d)

printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main                         --component_module_path                         "$program_path/ephemeral_component.py"                         "$@"
' '
import kfp
from kfp import dsl
from kfp.dsl import *
from typing import *

def stage_3(aggs: Dict[str, float]) -> None:
    """Stage 3."""
    print(f"x_average={aggs['"'"'average'"'"']}")
    print(f"x_std={aggs['"'"'std'"'"']}")

' --executor_input '{"inputs": {"parameterValues": {"aggs": {"average": -0.061227051591934534, "std": 0.9617933551902902}}, "artifacts": {}}, "outputs": {"artifacts": {}, "outputFile": "object-storage-bucket/stage-3/output_metadata.json"}}' --function_to_execute stage_3
[KFP Executor 2023-11-07 13:22:12,126 INFO]: Looking for component `stage_3` in --component_module_path `/var/folders/xk/fqw466sd3l7fztkl08x8hnk80000gn/T/tmp.ZQ68zUMrVG/ephemeral_component.py`
[KFP Executor 2023-11-07 13:22:12,126 INFO]: Loading KFP component "stage_3" from /var/folders/xk/fqw466sd3l7fztkl08x8hnk80000gn/T/tmp.ZQ68zUMrVG/ephemeral_component.py (directory "/var/folders/xk/fqw466sd3l7fztkl08x8hnk80000gn/T/tmp.ZQ68zUMrVG" and module name "ephemeral_component")
[KFP Executor 2023-11-07 13:22:12,127 INFO]: Got executor_input:
{
    "inputs": {
        "parameterValues": {
            "aggs": {
                "average": -0.061227051591934534,
                "std": 0.9617933551902902
            }
        },
        "artifacts": {}
    },
    "outputs": {
        "artifacts": {},
        "outputFile": "object-storage-bucket/stage-3/output_metadata.json"
    }
}
x_average=-0.061227051591934534
x_std=0.9617933551902902
[KFP Executor 2023-11-07 13:22:12,127 INFO]: Wrote executor output file to object-storage-bucket/stage-3/output_metadata.json.
nox > Session run_pipeline_task was successful.

About

Local pipelines for local people

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages