From 89b7208b3868cbaed1e1dc00d778af4d649d5558 Mon Sep 17 00:00:00 2001 From: Miles Zimmerman Date: Tue, 16 May 2023 14:34:22 +0000 Subject: [PATCH 01/15] feat: fly.io cloudprovider --- dask_cloudprovider/__init__.py | 6 + dask_cloudprovider/cloudprovider.yaml | 8 + dask_cloudprovider/fly/__init__.py | 1 + dask_cloudprovider/fly/machine.py | 260 +++++++++++++++++++ dask_cloudprovider/fly/tests/test_droplet.py | 76 ++++++ setup.py | 1 + 6 files changed, 352 insertions(+) create mode 100644 dask_cloudprovider/fly/__init__.py create mode 100644 dask_cloudprovider/fly/machine.py create mode 100644 dask_cloudprovider/fly/tests/test_droplet.py diff --git a/dask_cloudprovider/__init__.py b/dask_cloudprovider/__init__.py index ced23cb2..06ec6f8c 100755 --- a/dask_cloudprovider/__init__.py +++ b/dask_cloudprovider/__init__.py @@ -42,3 +42,9 @@ def __getattr__(name): "DigitalOcean cluster managers must be imported from the digitalocean subpackage. " f"Please import dask_cloudprovider.digitalocean.{name}" ) + + if name in ["FlyCluster"]: + raise ImportError( + "Fly.io cluster managers must be imported from the fly subpackage. " + f"Please import dask_cloudprovider.fly.{name}" + ) \ No newline at end of file diff --git a/dask_cloudprovider/cloudprovider.yaml b/dask_cloudprovider/cloudprovider.yaml index 3e7c5ce7..4819bee8 100755 --- a/dask_cloudprovider/cloudprovider.yaml +++ b/dask_cloudprovider/cloudprovider.yaml @@ -116,3 +116,11 @@ cloudprovider: image: "ubuntu-20.04" # Operating System image to use docker_image: "daskdev/dask:latest" # docker image to use bootstrap: true # It is assumed that the OS image does not have Docker and needs bootstrapping. Set this to false if using a custom image with Docker already installed. 
+ + fly: + token: null # API token for interacting with the Fly API + region: "lax" # Region to launch Droplets in + size: "shared-cpu-1x" # Droplet size to launch, default is 1GB RAM, 1 vCPU + docker_image: "daskdev/dask:latest" # Operating System image to use + memory_mb: 1024 # Memory in MB to use for the scheduler and workers + cpus: 1 # Number of CPUs to use for the scheduler and workers \ No newline at end of file diff --git a/dask_cloudprovider/fly/__init__.py b/dask_cloudprovider/fly/__init__.py new file mode 100644 index 00000000..b55c832a --- /dev/null +++ b/dask_cloudprovider/fly/__init__.py @@ -0,0 +1 @@ +from .fly import FlyCluster diff --git a/dask_cloudprovider/fly/machine.py b/dask_cloudprovider/fly/machine.py new file mode 100644 index 00000000..fa79734e --- /dev/null +++ b/dask_cloudprovider/fly/machine.py @@ -0,0 +1,260 @@ +import asyncio + +import dask +from dask_cloudprovider.generic.vmcluster import ( + VMCluster, + VMInterface, + SchedulerMixin, + WorkerMixin, +) + +try: + import fly_python_sdk +except ImportError as e: + msg = ( + "Dask Cloud Provider Fly.io requirements are not installed.\n\n" + "Please pip install as follows:\n\n" + ' pip install "dask-cloudprovider[fly]" --upgrade # or python -m pip install' + ) + raise ImportError(msg) from e + + +class FlyMachine(VMInterface): + def __init__( + self, + cluster: str, + config, + *args, + region: str = None, + # size: str = None, + docker_image = None, + env_vars = None, + extra_bootstrap = None, + machine_id = None, + **kwargs, + ): + super().__init__(*args, **kwargs) + self.machine = None + self.cluster = cluster + self.config = config + self.region = region + self.size = self.config.get("size", "shared-cpu-1x") + self.cpus = self.config.get("cpus", 1) + self.memory_mb = self.config.get("memory_mb", 1024) + self.image = None + self.gpu_instance = False + self.bootstrap = True + self.extra_bootstrap = extra_bootstrap + self.docker_image = docker_image + self.env_vars = env_vars + + async 
def create_vm(self): + config = fly_python_sdk.models.machines.FlyMachineConfig( + env=self.env_vars, + init=fly_python_sdk.models.machines.FlyMachineConfigInit( + cmd=[self.command], + ), + image=self.image, + metadata=None, + restart=None, + services=[ + fly_python_sdk.models.machines.FlyMachineConfigServices( + ports=[ + fly_python_sdk.models.machines.FlyMachineConfigServicesPorts( + port=8786, + handlers=["http", "tls"] + ), + ], + protocol="tcp", + internal_port=8786, + ), + fly_python_sdk.models.machines.FlyMachineConfigServices( + ports=[ + fly_python_sdk.models.machines.FlyMachineConfigServicesPorts( + port=8787, + handlers=["http", "tls"] + ), + ], + protocol="tcp", + internal_port=8787, + ), + ], + guest=fly_python_sdk.models.machines.FlyMachineConfigGuest( + cpu_kind="shared", + cpus=self.cpus, + memory_mb=self.memory_mb, + ), + size=self.size, + metrics=None, + processes=[ + fly_python_sdk.models.machines.FlyMachineConfigProcess( + name="app", + cmd=[self.command], + env=self.env_vars, + ) + ] + ) + self.machine = fly_python_sdk.create_machine( + app_name=self.config.get("app_name"), + token=self.config.get("token"), + name=self.name, + region=self.region, + image=self.image, + size_slug=self.size, + backups=False, + user_data=self.cluster.render_process_cloud_init(self), + ) + self.cluster._log(f"Created machine {self.name}") + return self.machine.private_ip, None + + async def destroy_vm(self): + self.destroy_machine( + app_name=self.config.get("app_name"), + machine_id=self.machine.id, + ) + self.cluster._log(f"Terminated droplet {self.name}") + + +class FlyMachineScheduler(SchedulerMixin, FlyMachine): + """Scheduler running on a Fly.io Machine.""" + + +class FlyMachineWorker(WorkerMixin, FlyMachine): + """Worker running on a Fly.io Machine.""" + + +class FlyMachineCluster(VMCluster): + """Cluster running on Fly.io Machines. + + VMs in Fly.io (FLY) are referred to as machines. This cluster manager constructs a Dask cluster + running on VMs. 
+ + When configuring your cluster you may find it useful to install the ``flyctl`` tool for querying the + CLY API for available options. + + https://fly.io/docs/hands-on/install-flyctl/ + + Parameters + ---------- + region: str + The DO region to launch you cluster in. A full list can be obtained with ``doctl compute region list``. + size: str + The VM size slug. You can get a full list with ``doctl compute size list``. + The default is ``s-1vcpu-1gb`` which is 1GB RAM and 1 vCPU + image: str + The image ID to use for the host OS. This should be a Ubuntu variant. + You can list available images with ``doctl compute image list --public | grep ubuntu.*x64``. + worker_module: str + The Dask worker module to start on worker VMs. + n_workers: int + Number of workers to initialise the cluster with. Defaults to ``0``. + worker_module: str + The Python module to run for the worker. Defaults to ``distributed.cli.dask_worker`` + worker_options: dict + Params to be passed to the worker class. + See :class:`distributed.worker.Worker` for default worker class. + If you set ``worker_module`` then refer to the docstring for the custom worker class. + scheduler_options: dict + Params to be passed to the scheduler class. + See :class:`distributed.scheduler.Scheduler`. + docker_image: string (optional) + The Docker image to run on all instances. + + This image must have a valid Python environment and have ``dask`` installed in order for the + ``dask-scheduler`` and ``dask-worker`` commands to be available. It is recommended the Python + environment matches your local environment where ``EC2Cluster`` is being created from. + + For GPU instance types the Docker image much have NVIDIA drivers and ``dask-cuda`` installed. + + By default the ``daskdev/dask:latest`` image will be used. + docker_args: string (optional) + Extra command line arguments to pass to Docker. + extra_bootstrap: list[str] (optional) + Extra commands to be run during the bootstrap phase. 
+ env_vars: dict (optional) + Environment variables to be passed to the worker. + silence_logs: bool + Whether or not we should silence logging when setting up the cluster. + asynchronous: bool + If this is intended to be used directly within an event loop with + async/await + security : Security or bool, optional + Configures communication security in this cluster. Can be a security + object, or True. If True, temporary self-signed credentials will + be created automatically. Default is ``True``. + debug: bool, optional + More information will be printed when constructing clusters to enable debugging. + + Examples + -------- + + Create the cluster. + + >>> from dask_cloudprovider.digitalocean import DropletCluster + >>> cluster = DropletCluster(n_workers=1) + Creating scheduler instance + Created droplet dask-38b817c1-scheduler + Waiting for scheduler to run + Scheduler is running + Creating worker instance + Created droplet dask-38b817c1-worker-dc95260d + + Connect a client. + + >>> from dask.distributed import Client + >>> client = Client(cluster) + + Do some work. + + >>> import dask.array as da + >>> arr = da.random.random((1000, 1000), chunks=(100, 100)) + >>> arr.mean().compute() + 0.5001550986751964 + + Close the cluster + + >>> client.close() + >>> cluster.close() + Terminated droplet dask-38b817c1-worker-dc95260d + Terminated droplet dask-38b817c1-scheduler + + You can also do this all in one go with context managers to ensure the cluster is + created and cleaned up. + + >>> with DropletCluster(n_workers=1) as cluster: + ... with Client(cluster) as client: + ... 
print(da.random.random((1000, 1000), chunks=(100, 100)).mean().compute()) + Creating scheduler instance + Created droplet dask-48efe585-scheduler + Waiting for scheduler to run + Scheduler is running + Creating worker instance + Created droplet dask-48efe585-worker-5181aaf1 + 0.5000558682356162 + Terminated droplet dask-48efe585-worker-5181aaf1 + Terminated droplet dask-48efe585-scheduler + + """ + + def __init__( + self, + region: str = None, + size: str = None, + image: str = None, + debug: bool = False, + **kwargs, + ): + self.config = dask.config.get("cloudprovider.fly", {}) + self.scheduler_class = FlyMachineScheduler + self.worker_class = FlyMachineWorker + self.debug = debug + self.options = { + "cluster": self, + "config": self.config, + "region": region if region is not None else self.config.get("region"), + "size": size if size is not None else self.config.get("size"), + "image": image if image is not None else self.config.get("image"), + } + self.scheduler_options = {**self.options} + self.worker_options = {**self.options} + super().__init__(debug=debug, **kwargs) diff --git a/dask_cloudprovider/fly/tests/test_droplet.py b/dask_cloudprovider/fly/tests/test_droplet.py new file mode 100644 index 00000000..8be129eb --- /dev/null +++ b/dask_cloudprovider/fly/tests/test_droplet.py @@ -0,0 +1,76 @@ +import pytest + +import dask + +digitalocean = pytest.importorskip("digitalocean") + +from dask_cloudprovider.digitalocean.droplet import DropletCluster +from dask.distributed import Client +from distributed.core import Status + + +async def skip_without_credentials(config): + if config.get("token") is None: + pytest.skip( + """ + You must configure a Digital Ocean API token to run this test. 
+ + Either set this in your config + + # cloudprovider.yaml + cloudprovider: + digitalocean: + token: "yourtoken" + + Or by setting it as an environment variable + + export DASK_CLOUDPROVIDER__DIGITALOCEAN__TOKEN="yourtoken" + + """ + ) + + +@pytest.fixture +async def config(): + return dask.config.get("cloudprovider.digitalocean", {}) + + +@pytest.fixture +@pytest.mark.external +async def cluster(config): + await skip_without_credentials(config) + async with DropletCluster(asynchronous=True) as cluster: + yield cluster + + +@pytest.mark.asyncio +@pytest.mark.external +async def test_init(): + cluster = DropletCluster(asynchronous=True) + assert cluster.status == Status.created + + +@pytest.mark.asyncio +@pytest.mark.timeout(600) +@pytest.mark.external +async def test_create_cluster(cluster): + assert cluster.status == Status.running + + cluster.scale(1) + await cluster + assert len(cluster.workers) == 1 + + async with Client(cluster, asynchronous=True) as client: + + def inc(x): + return x + 1 + + assert await client.submit(inc, 10).result() == 11 + + +@pytest.mark.asyncio +async def test_get_cloud_init(): + cloud_init = DropletCluster.get_cloud_init( + docker_args="--privileged", + ) + assert " --privileged " in cloud_init diff --git a/setup.py b/setup.py index 7d4e96ca..3dd9855e 100644 --- a/setup.py +++ b/setup.py @@ -15,6 +15,7 @@ "digitalocean": ["python-digitalocean>=1.15.0"], "gcp": ["google-api-python-client>=1.12.5", "google-auth>=1.23.0"], "hetzner": ["hcloud>=1.10.0"], + "fly": ["fly-python-sdk>=0.1.0"], } extras_require["all"] = set(pkg for pkgs in extras_require.values() for pkg in pkgs) From b800713edd163b41e95cff16544e31387058ddab Mon Sep 17 00:00:00 2001 From: Miles Zimmerman Date: Wed, 17 May 2023 02:23:35 -0700 Subject: [PATCH 02/15] customize the sdk for this project --- dask_cloudprovider/fly/__init__.py | 2 +- dask_cloudprovider/fly/machine.py | 163 ++++---- dask_cloudprovider/fly/sdk/__init__.py | 8 + dask_cloudprovider/fly/sdk/constants.py | 
56 +++ dask_cloudprovider/fly/sdk/exceptions.py | 40 ++ dask_cloudprovider/fly/sdk/fly.py | 348 ++++++++++++++++++ dask_cloudprovider/fly/sdk/models/__init__.py | 4 + .../fly/sdk/models/apps/__init__.py | 23 ++ .../fly/sdk/models/machines/__init__.py | 149 ++++++++ setup.py | 5 +- 10 files changed, 731 insertions(+), 67 deletions(-) create mode 100644 dask_cloudprovider/fly/sdk/__init__.py create mode 100644 dask_cloudprovider/fly/sdk/constants.py create mode 100644 dask_cloudprovider/fly/sdk/exceptions.py create mode 100644 dask_cloudprovider/fly/sdk/fly.py create mode 100644 dask_cloudprovider/fly/sdk/models/__init__.py create mode 100644 dask_cloudprovider/fly/sdk/models/apps/__init__.py create mode 100644 dask_cloudprovider/fly/sdk/models/machines/__init__.py diff --git a/dask_cloudprovider/fly/__init__.py b/dask_cloudprovider/fly/__init__.py index b55c832a..147733f1 100644 --- a/dask_cloudprovider/fly/__init__.py +++ b/dask_cloudprovider/fly/__init__.py @@ -1 +1 @@ -from .fly import FlyCluster +from . 
import FlyMachine, FlyMachineWorker, FlyMachineScheduler, FlyMachineCluster diff --git a/dask_cloudprovider/fly/machine.py b/dask_cloudprovider/fly/machine.py index fa79734e..16519ed3 100644 --- a/dask_cloudprovider/fly/machine.py +++ b/dask_cloudprovider/fly/machine.py @@ -1,5 +1,4 @@ -import asyncio - +import uuid import dask from dask_cloudprovider.generic.vmcluster import ( VMCluster, @@ -9,7 +8,15 @@ ) try: - import fly_python_sdk + from .sdk.models.machines import ( + FlyMachineConfig, + FlyMachineConfigInit, + FlyMachineConfigServices, + FlyMachineRequestConfigServicesPort, + FlyMachineConfigGuest, + FlyMachineConfigProcess + ) + from .sdk.fly import Fly except ImportError as e: msg = ( "Dask Cloud Provider Fly.io requirements are not installed.\n\n" @@ -25,42 +32,60 @@ def __init__( cluster: str, config, *args, - region: str = None, - # size: str = None, - docker_image = None, - env_vars = None, + region: str = "sjc", + vm_size: str = "shared-cpu-1x", + memory_mb = 1024, + cpus = 1, + image = "daskdev/dask:latest", + env_vars = {}, extra_bootstrap = None, - machine_id = None, + metadata = {}, + restart = {}, **kwargs, ): super().__init__(*args, **kwargs) self.machine = None self.cluster = cluster self.config = config - self.region = region - self.size = self.config.get("size", "shared-cpu-1x") - self.cpus = self.config.get("cpus", 1) - self.memory_mb = self.config.get("memory_mb", 1024) - self.image = None + self.region = self.config.get("region", region or "sjc") + self.vm_size = self.config.get("vm_size", vm_size or "shared-cpu-1x") + self.cpus = self.config.get("cpus", cpus or 1) + self.memory_mb = self.config.get("memory_mb", memory_mb or 1024) + self.image = self.config.get("image", image or "daskdev/dask:latest") self.gpu_instance = False self.bootstrap = True self.extra_bootstrap = extra_bootstrap - self.docker_image = docker_image - self.env_vars = env_vars + self.env_vars = self.config.get("env_vars", env_vars or {}) + self.metadata = 
self.config.get("metadata", metadata or {}) + self.restart = restart or {} + self.app_name = self.cluster.app_name + # We need the token + self.api_token = config.get("token") + if self.api_token is None: + raise ValueError("Fly.io API token must be provided") + self.fly = Fly(api_token=self.api_token) async def create_vm(self): - config = fly_python_sdk.models.machines.FlyMachineConfig( + machine_config = FlyMachineConfig( env=self.env_vars, - init=fly_python_sdk.models.machines.FlyMachineConfigInit( + init=FlyMachineConfigInit( cmd=[self.command], ), image=self.image, - metadata=None, - restart=None, + metadata=self.metadata, + restart=self.restart, services=[ - fly_python_sdk.models.machines.FlyMachineConfigServices( + FlyMachineConfigServices( ports=[ - fly_python_sdk.models.machines.FlyMachineConfigServicesPorts( + FlyMachineRequestConfigServicesPort( + port=80, + handlers=["http"] + ), + FlyMachineRequestConfigServicesPort( + port=443, + handlers=["http", "tls"] + ), + FlyMachineRequestConfigServicesPort( port=8786, handlers=["http", "tls"] ), @@ -68,9 +93,9 @@ async def create_vm(self): protocol="tcp", internal_port=8786, ), - fly_python_sdk.models.machines.FlyMachineConfigServices( + FlyMachineConfigServices( ports=[ - fly_python_sdk.models.machines.FlyMachineConfigServicesPorts( + FlyMachineRequestConfigServicesPort( port=8787, handlers=["http", "tls"] ), @@ -79,7 +104,7 @@ async def create_vm(self): internal_port=8787, ), ], - guest=fly_python_sdk.models.machines.FlyMachineConfigGuest( + guest=FlyMachineConfigGuest( cpu_kind="shared", cpus=self.cpus, memory_mb=self.memory_mb, @@ -87,32 +112,28 @@ async def create_vm(self): size=self.size, metrics=None, processes=[ - fly_python_sdk.models.machines.FlyMachineConfigProcess( + FlyMachineConfigProcess( name="app", cmd=[self.command], env=self.env_vars, ) ] ) - self.machine = fly_python_sdk.create_machine( - app_name=self.config.get("app_name"), - token=self.config.get("token"), - name=self.name, - 
region=self.region, - image=self.image, - size_slug=self.size, - backups=False, - user_data=self.cluster.render_process_cloud_init(self), + self.machine = self.fly.create_machine( + app_name=self.config.get("app_name"), # The name of the new Fly.io app. + config=machine_config, # A FlyMachineConfig object containing creation details. + name=self.name, # The name of the machine. + region=self.region, # The deployment region for the machine. ) self.cluster._log(f"Created machine {self.name}") return self.machine.private_ip, None async def destroy_vm(self): - self.destroy_machine( + self.fly.destroy_machine( app_name=self.config.get("app_name"), machine_id=self.machine.id, ) - self.cluster._log(f"Terminated droplet {self.name}") + self.cluster._log(f"Terminated machine {self.name}") class FlyMachineScheduler(SchedulerMixin, FlyMachine): @@ -137,13 +158,18 @@ class FlyMachineCluster(VMCluster): Parameters ---------- region: str - The DO region to launch you cluster in. A full list can be obtained with ``doctl compute region list``. - size: str - The VM size slug. You can get a full list with ``doctl compute size list``. - The default is ``s-1vcpu-1gb`` which is 1GB RAM and 1 vCPU + The FLY region to launch your cluster in. A full list can be obtained with ``flyctl platform regions``. + vm_size: str + The VM size slug. You can get a full list with ``flyctl platform sizes``. + The default is ``shared-cpu-1x`` which is 256GB RAM and 1 vCPU image: str - The image ID to use for the host OS. This should be a Ubuntu variant. - You can list available images with ``doctl compute image list --public | grep ubuntu.*x64``. + The Docker image to run on all instances. + + This image must have a valid Python environment and have ``dask`` installed in order for the + ``dask-scheduler`` and ``dask-worker`` commands to be available. It is recommended the Python + environment matches your local environment where ``FlyMachineCluster`` is being created from. 
+ + By default the ``daskdev/dask:latest`` image will be used. worker_module: str The Dask worker module to start on worker VMs. n_workers: int @@ -157,18 +183,6 @@ class FlyMachineCluster(VMCluster): scheduler_options: dict Params to be passed to the scheduler class. See :class:`distributed.scheduler.Scheduler`. - docker_image: string (optional) - The Docker image to run on all instances. - - This image must have a valid Python environment and have ``dask`` installed in order for the - ``dask-scheduler`` and ``dask-worker`` commands to be available. It is recommended the Python - environment matches your local environment where ``EC2Cluster`` is being created from. - - For GPU instance types the Docker image much have NVIDIA drivers and ``dask-cuda`` installed. - - By default the ``daskdev/dask:latest`` image will be used. - docker_args: string (optional) - Extra command line arguments to pass to Docker. extra_bootstrap: list[str] (optional) Extra commands to be run during the bootstrap phase. env_vars: dict (optional) @@ -190,14 +204,14 @@ class FlyMachineCluster(VMCluster): Create the cluster. - >>> from dask_cloudprovider.digitalocean import DropletCluster - >>> cluster = DropletCluster(n_workers=1) + >>> from dask_cloudprovider.fly import FlyMachineCluster + >>> cluster = FlyMachineCluster(n_workers=1) Creating scheduler instance - Created droplet dask-38b817c1-scheduler + Created machine dask-38b817c1-scheduler Waiting for scheduler to run Scheduler is running Creating worker instance - Created droplet dask-38b817c1-worker-dc95260d + Created machine dask-38b817c1-worker-dc95260d Connect a client. 
@@ -215,24 +229,24 @@ class FlyMachineCluster(VMCluster): >>> client.close() >>> cluster.close() - Terminated droplet dask-38b817c1-worker-dc95260d - Terminated droplet dask-38b817c1-scheduler + Terminated machine dask-38b817c1-worker-dc95260d + Terminated machine dask-38b817c1-scheduler You can also do this all in one go with context managers to ensure the cluster is created and cleaned up. - >>> with DropletCluster(n_workers=1) as cluster: + >>> with FlyMachineCluster(n_workers=1) as cluster: ... with Client(cluster) as client: ... print(da.random.random((1000, 1000), chunks=(100, 100)).mean().compute()) Creating scheduler instance - Created droplet dask-48efe585-scheduler + Created machine dask-48efe585-scheduler Waiting for scheduler to run Scheduler is running Creating worker instance - Created droplet dask-48efe585-worker-5181aaf1 + Created machine dask-48efe585-worker-5181aaf1 0.5000558682356162 - Terminated droplet dask-48efe585-worker-5181aaf1 - Terminated droplet dask-48efe585-scheduler + Terminated machine dask-48efe585-worker-5181aaf1 + Terminated machine dask-48efe585-scheduler """ @@ -257,4 +271,23 @@ def __init__( } self.scheduler_options = {**self.options} self.worker_options = {**self.options} + self.app_name = f'dask-{str(uuid.uuid4())[:8]}' + self.api_token = self.config.get("token") + self.app = None super().__init__(debug=debug, **kwargs) + + def create_app(self): + """Create a Fly.io app.""" + if self.app is None: + self._log("Not creating app as it already exists") + return + self.app = self.fly.create_app(name=self.app_name) + self._log(f"Created app {self.app_name}") + + def delete_app(self): + """Delete a Fly.io app.""" + if self.app is None: + self._log("Not deleting app as it does not exist") + return + self.fly.delete_app(name=self.app_name) + self._log(f"Deleted app {self.app_name}") \ No newline at end of file diff --git a/dask_cloudprovider/fly/sdk/__init__.py b/dask_cloudprovider/fly/sdk/__init__.py new file mode 100644 index 
00000000..610b6f50 --- /dev/null +++ b/dask_cloudprovider/fly/sdk/__init__.py @@ -0,0 +1,8 @@ +"""A Python SDK for interacting with the Fly.io API.""" + +__version__ = "0.1" + +from . import constants, exceptions +from . import fly +from . import models +from . import sdk \ No newline at end of file diff --git a/dask_cloudprovider/fly/sdk/constants.py b/dask_cloudprovider/fly/sdk/constants.py new file mode 100644 index 00000000..9facf696 --- /dev/null +++ b/dask_cloudprovider/fly/sdk/constants.py @@ -0,0 +1,56 @@ +# @see https://github.com/bwhli/fly-python-sdk/blob/main/fly_python_sdk/constants.py + +FLY_REGIONS = [ + "ams", # Amsterdam, Netherlands + "arn", # Stockholm, Sweden + "bog", # Bogotá, Colombia + "bos", # Boston, Massachusetts (US) + "cdg", # Paris, France + "den", # Denver, Colorado (US) + "dfw", # Dallas, Texas (US) + "ewr", # Secaucus, NJ (US) + "fra", # Frankfurt, Germany + "gdl", # Guadalajara, Mexico + "gig", # Rio de Janeiro, Brazil + "gru", # São Paulo + "hkg", # Hong Kong, Hong Kong + "iad", # Ashburn, Virginia (US) + "jnb", # Johannesburg, South Africa + "lax", # Los Angeles, California (US + "lhr", # London, United Kingdom + "maa", # Chennai (Madras), India + "mad", # Madrid, Spain + "mia", # Miami, Florida (US) + "nrt", # Tokyo, Japan + "ord", # Chicago, Illinois (US) + "otp", # Bucharest, Romania + "qro", # Querétaro, Mexico + "scl", # Santiago, Chile + "sea", # Seattle, Washington (US) + "sin", # Singapore, Singapore + "sjc", # San Jose, California (US) + "syd", # Sydney, Australia + "waw", # Warsaw, Poland + "yul", # Montreal, Canada + "yyz", # Toronto, Canada +] + +FLY_APP_VM_SIZES = [ + "shared-cpu-1x", + "dedicated-cpu-1x", + "dedicated-cpu-2x", + "dedicated-cpu-4x", + "dedicated-cpu-8x", +] + +FLY_MACHINE_VM_SIZES = [ + "shared-cpu-1x", + "shared-cpu-2x", + "shared-cpu-4x", + "shared-cpu-8x", + "performance-1x", + "performance-2x", + "performance-4x", + "performance-8x", + "performance-16x", +] \ No newline at end of file diff --git 
a/dask_cloudprovider/fly/sdk/exceptions.py b/dask_cloudprovider/fly/sdk/exceptions.py new file mode 100644 index 00000000..b62ce60a --- /dev/null +++ b/dask_cloudprovider/fly/sdk/exceptions.py @@ -0,0 +1,40 @@ +# @see https://github.com/bwhli/fly-python-sdk/blob/main/fly_python_sdk/exceptions.py + +class AppInterfaceError(Exception): + def __init__(self, message): + self.message = message + + def __str__(self): + return self.message + + +class MissingApiHostnameError(Exception): + def __init__(self, message): + self.message = message + + def __str__(self): + return self.message + + +class MissingApiTokenError(Exception): + def __init__(self, message): + self.message = message + + def __str__(self): + return self.message + + +class MissingMachineIdsError(Exception): + def __init__(self, message): + self.message = message + + def __str__(self): + return self.message + + +class MachineInterfaceError(Exception): + def __init__(self, message): + self.message = message + + def __str__(self): + return self.message diff --git a/dask_cloudprovider/fly/sdk/fly.py b/dask_cloudprovider/fly/sdk/fly.py new file mode 100644 index 00000000..972cc97e --- /dev/null +++ b/dask_cloudprovider/fly/sdk/fly.py @@ -0,0 +1,348 @@ +# @see https://github.com/bwhli/fly-python-sdk/blob/main/fly_python_sdk/fly.py + +import os +from typing import Union + +import httpx +from pydantic import BaseModel + +from sdk.exceptions import ( + AppInterfaceError, + MachineInterfaceError, + MissingMachineIdsError, +) +from sdk.models.apps import FlyAppCreateRequest, FlyAppDetailsResponse, FlyAppDeleteRequest, FlyAppDeleteResponse +from sdk.models.machines import FlyMachineConfig, FlyMachineDetails + +class Fly: + """ + A class for interacting with the Fly.io platform. 
+ """ + + def __init__(self, api_token: str) -> None: + self.api_token = api_token + self.api_version = 1 + + ######## + # Apps # + ######## + + async def create_app( + self, + app_name: str, + org_slug: str, + ) -> None: + """Creates a new app on Fly.io. + + Args: + app_name: The name of the new Fly.io app. + org_slug: The slug of the organization to create the app within. If None, the personal organization will be used. + """ + path = "apps" + app_details = FlyAppCreateRequest(app_name=app_name, org_slug=org_slug) + r = await self._make_api_post_request(path, app_details.dict()) + + # Raise an exception if HTTP status code is not 201. + if r.status_code != 201: + raise AppInterfaceError( + message=f"Unable to create {app_name} in {org_slug}!" + ) + + return FlyMachineDetails(**r.json()) + + async def get_app( + self, + app_name: str, + ) -> FlyAppDetailsResponse: + """Returns information about a Fly.io application. + + Args: + app_name: The name of the new Fly.io app. + """ + path = f"apps/{app_name}" + r = await self._make_api_get_request(path) + + # Raise an exception if HTTP status code is not 200. + if r.status_code != 200: + raise AppInterfaceError(message=f"Unable to get {app_name}!") + + return FlyAppDetailsResponse(**r.json()) + + async def delete_app( + self, + app_name: str, + org_slug: str = None, + force: bool = False, + ) -> None: + """Deletes a Fly.io application. + + Args: + app_name: The name of the new Fly.io app. + org_slug: The slug of the organization. If None, the personal organization will be used. + force: If True, the app will be deleted even if it has machines. + """ + path = f"apps/{app_name}" + app_details = FlyAppDeleteRequest( + app_name=app_name, org_slug=org_slug, force=force + ) + r = await self._make_api_delete_request(path, app_details.dict()) + + # Raise an exception if HTTP status code is not 200. 
+ if r.status_code != 200: + raise AppInterfaceError(message=f"Unable to delete {app_name}!") + + return FlyAppDeleteResponse(**r.json()) + + ############ + # Machines # + ############ + + async def list_machines( + self, + app_name: str, + ids_only: bool = False, + ) -> Union[list[FlyMachineDetails], list[str]]: + """Returns a list of machines that belong to a Fly.io application. + + Args: + ids_only: If True, only machine IDs will be returned. Defaults to False. + """ + path = f"apps/{app_name}/machines" + r = await self._make_api_get_request(path) + + # Raise an exception if HTTP status code is not 200. + if r.status_code != 200: + raise AppInterfaceError(message=f"Unable to get machines in {app_name}!") + + # Create a FlyMachineDetails object for each machine. + machines = [FlyMachineDetails(**machine) for machine in r.json()] + + # Filter and return a list of ids if ids_only is True. + if ids_only is True: + return [machine.id for machine in machines] + + return machines + + async def create_machine( + self, + app_name: str, + config: FlyMachineConfig, + name: str = None, + region: str = None, + ): + """Creates a Fly.io machine. + + Args: + app_name: The name of the new Fly.io app. + config: A FlyMachineConfig object containing creation details. + name: The name of the machine. + region: The deployment region for the machine. + """ + path = f"apps/{app_name}/machines" + + # Create Pydantic model for machine creation requests. + class _FlyMachineCreateRequest(BaseModel): + name: Union[str, None] = None + region: Union[str, None] = None + config: FlyMachineConfig + + # Create FlyMachineCreateRequest object + machine_create_request = _FlyMachineCreateRequest( + name=name, + region=region, + config=config, + ) + + r = await self._make_api_post_request( + path, + payload=machine_create_request.dict(exclude_defaults=True), + ) + + # Raise an exception if HTTP status code is not 200. 
+ if r.status_code != 200: + raise MachineInterfaceError( + message=f"{r.status_code}: Unable to create machine!" + ) + + return FlyMachineDetails(**r.json()) + + async def delete_machine( + self, + app_name: str, + machine_id: str, + ) -> None: + """Deletes a Fly.io machine. + + Args: + app_name: The name of the new Fly.io app. + machine_id: The id string for a Fly.io machine. + """ + path = f"apps/{app_name}/machines/{machine_id}" + r = await self._make_api_delete_request(path) + + # Raise an exception if HTTP status code is not 200. + if r.status_code != 200: + raise MachineInterfaceError( + message=f"Unable to delete {machine_id} in {app_name}!" + ) + + return + + async def delete_machines( + self, + app_name: str, + machine_ids: list[str] = [], + delete_all: bool = False, + ) -> None: + """Deletes multiple Fly.io machines. + + Args: + app_name: The name of the new Fly.io app. + machine_ids: An array of machine IDs to delete. + delete_all: Delete all machines in the app if True. + """ + # If delete_all is True, override provided machine_ids. + if delete_all is True: + machine_ids = self.list_machines(app_name, ids_only=True) + + # Raise an exception if there are no machine IDs to delete. + if len(machine_ids) == 0: + raise MissingMachineIdsError( + "Please provide at least one machine ID to delete." + ) + + # Stop machines. + for machine_id in machine_ids: + self.stop_machine(app_name, machine_id) + + # Delete machines. + for machine_id in machine_ids: + self.delete_machine(app_name, machine_id) + + return + + async def get_machine( + self, + app_name: str, + machine_id: str, + ) -> FlyMachineDetails: + """Returns information about a Fly.io machine. + + Args: + app_name: The name of the new Fly.io app. + machine_id: The id string for a Fly.io machine. + """ + path = f"apps/{app_name}/machines/{machine_id}" + r = await self._make_api_get_request(path) + + # Raise an exception if HTTP status code is not 200. 
+ if r.status_code != 200: + raise MachineInterfaceError( + message=f"Unable to delete {machine_id} in {app_name}!" + ) + + return FlyMachineDetails(**r.json()) + + async def start_machine( + self, + app_name: str, + machine_id: str, + ) -> None: + """Starts a Fly.io machine. + + Args: + app_name: The name of the new Fly.io app. + machine_id: The id string for a Fly.io machine. + """ + path = f"apps/{app_name}/machines/{machine_id}/start" + r = await self._make_api_post_request(path) + + # Raise an exception if HTTP status code is not 200. + if r.status_code != 200: + raise MachineInterfaceError( + message=f"Unable to start {machine_id} in {app_name}!" + ) + + return + + async def stop_machine( + self, + app_name: str, + machine_id: str, + ) -> None: + """Stop a Fly.io machine. + + Args: + app_name: The name of the new Fly.io app. + machine_id: The id string for a Fly.io machine. + """ + path = f"apps/{app_name}/machines/{machine_id}/stop" + r = await self._make_api_post_request(path) + + # Raise an exception if HTTP status code is not 200. + if r.status_code != 200: + raise MachineInterfaceError( + message=f"Unable to stop {machine_id} in {app_name}!" 
+ ) + + return + + ############# + # Utilities # + ############# + + async def _make_api_delete_request( + self, + path: str, + ) -> httpx.Response: + """An internal function for making DELETE requests to the Fly.io API.""" + api_hostname = self._get_api_hostname() + url = f"{api_hostname}/v{self.api_version}/{path}" + async with httpx.AsyncClient() as client: + r = await client.delete(url, headers=self._generate_headers()) + r.raise_for_status() + return r + + async def _make_api_get_request( + self, + path: str, + ) -> httpx.Response: + """An internal function for making GET requests to the Fly.io API.""" + api_hostname = self._get_api_hostname() + url = f"{api_hostname}/v{self.api_version}/{path}" + async with httpx.AsyncClient() as client: + r = await client.get(url, headers=self._generate_headers()) + r.raise_for_status() + return r + + async def _make_api_post_request( + self, + path: str, + payload: dict = {}, + ) -> httpx.Response: + """An internal function for making POST requests to the Fly.io API.""" + api_hostname = self._get_api_hostname() + url = f"{api_hostname}/v{self.api_version}/{path}" + async with httpx.AsyncClient() as client: + r = await client.post(url, headers=self._generate_headers(), json=payload) + r.raise_for_status() + return r + + def _generate_headers(self) -> dict: + """Returns a dictionary containing headers for requests to the Fly.io API.""" + headers = { + "Authorization": f"Bearer {self.api_token}", + "Content-Type": "application/json", + } + return headers + + def _get_api_hostname(self) -> str: + """Returns the hostname that will be used to connect to the Fly.io API. + + Returns: + The hostname that will be used to connect to the Fly.io API. + If the FLY_API_HOSTNAME environment variable is not set, + the hostname returned will default to https://api.machines.dev. 
+ """ + api_hostname = os.getenv("FLY_API_HOSTNAME", "https://api.machines.dev") + return api_hostname diff --git a/dask_cloudprovider/fly/sdk/models/__init__.py b/dask_cloudprovider/fly/sdk/models/__init__.py new file mode 100644 index 00000000..f345b025 --- /dev/null +++ b/dask_cloudprovider/fly/sdk/models/__init__.py @@ -0,0 +1,4 @@ +from . import ( + apps, + machines, +) \ No newline at end of file diff --git a/dask_cloudprovider/fly/sdk/models/apps/__init__.py b/dask_cloudprovider/fly/sdk/models/apps/__init__.py new file mode 100644 index 00000000..1b01d5ba --- /dev/null +++ b/dask_cloudprovider/fly/sdk/models/apps/__init__.py @@ -0,0 +1,23 @@ +from pydantic import BaseModel +from typing import Union + + +class FlyAppCreateRequest(BaseModel): + app_name: str + org_slug: str + +class FlyAppDetailsResponse(BaseModel): + name: str + status: str + organization: dict + +class FlyAppDeleteRequest(BaseModel): + app_name: str + org_slug: Union[str, None] = None + force: bool = False + +class FlyAppDeleteResponse(BaseModel): + app_name: str + org_slug: str + status: str + organization: dict diff --git a/dask_cloudprovider/fly/sdk/models/machines/__init__.py b/dask_cloudprovider/fly/sdk/models/machines/__init__.py new file mode 100644 index 00000000..655b3494 --- /dev/null +++ b/dask_cloudprovider/fly/sdk/models/machines/__init__.py @@ -0,0 +1,149 @@ +# @see https://github.com/bwhli/fly-python-sdk/blob/main/fly_python_sdk/models/machines/__init__.py + +import logging +from datetime import datetime +from ipaddress import IPv6Address +from typing import Union + +from pydantic import BaseModel, validator + +# FlyMachineConfig.checks + + +class FlyMachineConfigCheck(BaseModel): + """Model for FlyMachineConfig.checks""" + + port: int + type: str + interval: str + timeout: str + method: str + path: str + + +# FlyMachineConfig.guest + + +class FlyMachineConfigGuest(BaseModel): + """Model for FlyMachineConfig.guest""" + + cpu_kind: str + cpus: int + memory_mb: int + + +# 
FlyMachineConfig.init + + +class FlyMachineConfigInit(BaseModel): + """Model for FlyMachineConfig.init""" + + exec: Union[str,None] + entrypoint: Union[str,None] + cmd: Union[str,None] + tty: bool + + +# FlyMachineConfig.mounts + + +class FlyMachineConfigMount(BaseModel): + volume: str + path: str + + +# FlyMachineConfig.processes + + +class FlyMachineConfigProcess(BaseModel): + name: str + entrypoint: list[str] + cmd: list[str] + env: dict[str, str] + user: Union[str,None] = None + + +# FlyMachineConfig.services.port + + +class FlyMachineRequestConfigServicesPort(BaseModel): + """Model for FlyMachineConfig.services.port""" + + port: int + handlers: list[str] + + @validator("port") + def validate_port(cls, port: int) -> int: + assert port >= 0 and port <= 65536 + return port + + @validator("handlers") + def validate_handlers(cls, handlers: list[str]) -> list[str]: + logging.debug(handlers) + # Only run validation if there is 1 or more handlers. + if len(handlers) > 0: + # Convert handlers to lowercase. 
+ handlers = [handler.casefold() for handler in handlers] + assert ( + all(handler in ["http", "tcp", "tls", "udp"] for handler in handlers) + is True + ) + return handlers + + +# FlyMachineConfig.services + + +class FlyMachineConfigServices(BaseModel): + """Model for FlyMachineConfig.services""" + + ports: list[FlyMachineRequestConfigServicesPort] + protocol: str + internal_port: int + + @validator("internal_port") + def validate_internal_port(cls, internal_port: int) -> int: + assert internal_port >= 0 and internal_port <= 65536 + return internal_port + + @validator("protocol") + def validate_protocol(cls, protocol: str) -> str: + assert protocol in ["http", "tcp", "udp"] + return protocol + + +class FlyMachineConfig(BaseModel): + env: Union[dict[str, str], None] = None + init: Union[FlyMachineConfigInit, None] = None + image: str + metadata: Union[dict[str, str], None] = None + restart: Union[dict[str, str], None] = None + services: Union[list[FlyMachineConfigServices], None] = None + guest: Union[FlyMachineConfigGuest, None] = None + size: str = None + metrics: Union[None, Union[dict[str, str], dict[str, int]]] = None + processes: Union[list[FlyMachineConfigProcess], None] = None + schedule: Union[str, None] = None + mounts: Union[list[FlyMachineConfigMount], None] = None + checks: Union[dict[str, FlyMachineConfigCheck], None] = None + auto_destroy: bool = False + + +class FlyMachineImageRef(BaseModel): + registry: str + repository: str + tag: str + digest: str + + +class FlyMachineDetails(BaseModel): + id: str + name: str + state: str + region: str + instance_id: str + private_ip: IPv6Address + config: FlyMachineConfig + image_ref: FlyMachineImageRef + created_at: datetime + updated_at: datetime diff --git a/setup.py b/setup.py index 3dd9855e..0abf74e0 100644 --- a/setup.py +++ b/setup.py @@ -15,7 +15,10 @@ "digitalocean": ["python-digitalocean>=1.15.0"], "gcp": ["google-api-python-client>=1.12.5", "google-auth>=1.23.0"], "hetzner": ["hcloud>=1.10.0"], - 
"fly": ["fly-python-sdk>=0.1.0"], + "fly": [ + "httpx>=0.24.0", + "pydantic>=1.10.7" + ], } extras_require["all"] = set(pkg for pkgs in extras_require.values() for pkg in pkgs) From a9ef03bd3d5939b64067e9a12b16d1704f38b3f8 Mon Sep 17 00:00:00 2001 From: Miles Zimmerman Date: Wed, 17 May 2023 02:26:11 -0700 Subject: [PATCH 03/15] preliminary test --- .../{test_droplet.py => test_machine.py} | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) rename dask_cloudprovider/fly/tests/{test_droplet.py => test_machine.py} (63%) diff --git a/dask_cloudprovider/fly/tests/test_droplet.py b/dask_cloudprovider/fly/tests/test_machine.py similarity index 63% rename from dask_cloudprovider/fly/tests/test_droplet.py rename to dask_cloudprovider/fly/tests/test_machine.py index 8be129eb..4b9e8518 100644 --- a/dask_cloudprovider/fly/tests/test_droplet.py +++ b/dask_cloudprovider/fly/tests/test_machine.py @@ -2,9 +2,9 @@ import dask -digitalocean = pytest.importorskip("digitalocean") +# sdk = pytest.importorskip(".sdk") -from dask_cloudprovider.digitalocean.droplet import DropletCluster +from dask_cloudprovider.fly.machine import FlyMachineCluster from dask.distributed import Client from distributed.core import Status @@ -13,18 +13,18 @@ async def skip_without_credentials(config): if config.get("token") is None: pytest.skip( """ - You must configure a Digital Ocean API token to run this test. + You must configure a Fly.io API token to run this test. 
Either set this in your config # cloudprovider.yaml cloudprovider: - digitalocean: + fly: token: "yourtoken" Or by setting it as an environment variable - export DASK_CLOUDPROVIDER__DIGITALOCEAN__TOKEN="yourtoken" + export DASK_CLOUDPROVIDER__FLY__TOKEN="yourtoken" """ ) @@ -32,21 +32,21 @@ async def skip_without_credentials(config): @pytest.fixture async def config(): - return dask.config.get("cloudprovider.digitalocean", {}) + return dask.config.get("cloudprovider.fly", {}) @pytest.fixture @pytest.mark.external async def cluster(config): await skip_without_credentials(config) - async with DropletCluster(asynchronous=True) as cluster: + async with FlyMachineCluster(asynchronous=True) as cluster: yield cluster @pytest.mark.asyncio @pytest.mark.external async def test_init(): - cluster = DropletCluster(asynchronous=True) + cluster = FlyMachineCluster(asynchronous=True) assert cluster.status == Status.created @@ -68,9 +68,9 @@ def inc(x): assert await client.submit(inc, 10).result() == 11 -@pytest.mark.asyncio -async def test_get_cloud_init(): - cloud_init = DropletCluster.get_cloud_init( - docker_args="--privileged", - ) - assert " --privileged " in cloud_init +# @pytest.mark.asyncio +# async def test_get_cloud_init(): +# cloud_init = FlyMachineCluster.get_cloud_init( +# docker_args="--privileged", +# ) +# assert " --privileged " in cloud_init From 0d551a1f2754fc082551bbe2e157768a47dae0ae Mon Sep 17 00:00:00 2001 From: Miles Zimmerman Date: Wed, 17 May 2023 02:28:20 -0700 Subject: [PATCH 04/15] trigger actions workflow From 765370e07567961857c8bb96c9494def940df68a Mon Sep 17 00:00:00 2001 From: Miles Zimmerman Date: Wed, 17 May 2023 02:35:12 -0700 Subject: [PATCH 05/15] pre-commit hooks --- dask_cloudprovider/__init__.py | 2 +- dask_cloudprovider/fly/machine.py | 44 ++++++++----------- dask_cloudprovider/fly/sdk/__init__.py | 2 +- dask_cloudprovider/fly/sdk/constants.py | 2 +- dask_cloudprovider/fly/sdk/exceptions.py | 1 + dask_cloudprovider/fly/sdk/fly.py | 11 
++++- dask_cloudprovider/fly/sdk/models/__init__.py | 2 +- .../fly/sdk/models/apps/__init__.py | 3 ++ .../fly/sdk/models/machines/__init__.py | 8 ++-- setup.py | 5 +-- 10 files changed, 41 insertions(+), 39 deletions(-) diff --git a/dask_cloudprovider/__init__.py b/dask_cloudprovider/__init__.py index 06ec6f8c..c6d40996 100755 --- a/dask_cloudprovider/__init__.py +++ b/dask_cloudprovider/__init__.py @@ -47,4 +47,4 @@ def __getattr__(name): raise ImportError( "Fly.io cluster managers must be imported from the fly subpackage. " f"Please import dask_cloudprovider.fly.{name}" - ) \ No newline at end of file + ) diff --git a/dask_cloudprovider/fly/machine.py b/dask_cloudprovider/fly/machine.py index 16519ed3..6eb9ab51 100644 --- a/dask_cloudprovider/fly/machine.py +++ b/dask_cloudprovider/fly/machine.py @@ -14,7 +14,7 @@ FlyMachineConfigServices, FlyMachineRequestConfigServicesPort, FlyMachineConfigGuest, - FlyMachineConfigProcess + FlyMachineConfigProcess, ) from .sdk.fly import Fly except ImportError as e: @@ -34,13 +34,13 @@ def __init__( *args, region: str = "sjc", vm_size: str = "shared-cpu-1x", - memory_mb = 1024, - cpus = 1, - image = "daskdev/dask:latest", - env_vars = {}, - extra_bootstrap = None, - metadata = {}, - restart = {}, + memory_mb=1024, + cpus=1, + image="daskdev/dask:latest", + env_vars={}, + extra_bootstrap=None, + metadata={}, + restart={}, **kwargs, ): super().__init__(*args, **kwargs) @@ -77,17 +77,12 @@ async def create_vm(self): services=[ FlyMachineConfigServices( ports=[ + FlyMachineRequestConfigServicesPort(port=80, handlers=["http"]), FlyMachineRequestConfigServicesPort( - port=80, - handlers=["http"] + port=443, handlers=["http", "tls"] ), FlyMachineRequestConfigServicesPort( - port=443, - handlers=["http", "tls"] - ), - FlyMachineRequestConfigServicesPort( - port=8786, - handlers=["http", "tls"] + port=8786, handlers=["http", "tls"] ), ], protocol="tcp", @@ -96,8 +91,7 @@ async def create_vm(self): FlyMachineConfigServices( ports=[ 
FlyMachineRequestConfigServicesPort( - port=8787, - handlers=["http", "tls"] + port=8787, handlers=["http", "tls"] ), ], protocol="tcp", @@ -117,13 +111,13 @@ async def create_vm(self): cmd=[self.command], env=self.env_vars, ) - ] + ], ) self.machine = self.fly.create_machine( - app_name=self.config.get("app_name"), # The name of the new Fly.io app. - config=machine_config, # A FlyMachineConfig object containing creation details. - name=self.name, # The name of the machine. - region=self.region, # The deployment region for the machine. + app_name=self.config.get("app_name"), # The name of the new Fly.io app. + config=machine_config, # A FlyMachineConfig object containing creation details. + name=self.name, # The name of the machine. + region=self.region, # The deployment region for the machine. ) self.cluster._log(f"Created machine {self.name}") return self.machine.private_ip, None @@ -271,7 +265,7 @@ def __init__( } self.scheduler_options = {**self.options} self.worker_options = {**self.options} - self.app_name = f'dask-{str(uuid.uuid4())[:8]}' + self.app_name = f"dask-{str(uuid.uuid4())[:8]}" self.api_token = self.config.get("token") self.app = None super().__init__(debug=debug, **kwargs) @@ -290,4 +284,4 @@ def delete_app(self): self._log("Not deleting app as it does not exist") return self.fly.delete_app(name=self.app_name) - self._log(f"Deleted app {self.app_name}") \ No newline at end of file + self._log(f"Deleted app {self.app_name}") diff --git a/dask_cloudprovider/fly/sdk/__init__.py b/dask_cloudprovider/fly/sdk/__init__.py index 610b6f50..d5c204e6 100644 --- a/dask_cloudprovider/fly/sdk/__init__.py +++ b/dask_cloudprovider/fly/sdk/__init__.py @@ -5,4 +5,4 @@ from . import constants, exceptions from . import fly from . import models -from . import sdk \ No newline at end of file +from . 
import sdk diff --git a/dask_cloudprovider/fly/sdk/constants.py b/dask_cloudprovider/fly/sdk/constants.py index 9facf696..eced3377 100644 --- a/dask_cloudprovider/fly/sdk/constants.py +++ b/dask_cloudprovider/fly/sdk/constants.py @@ -53,4 +53,4 @@ "performance-4x", "performance-8x", "performance-16x", -] \ No newline at end of file +] diff --git a/dask_cloudprovider/fly/sdk/exceptions.py b/dask_cloudprovider/fly/sdk/exceptions.py index b62ce60a..a937f442 100644 --- a/dask_cloudprovider/fly/sdk/exceptions.py +++ b/dask_cloudprovider/fly/sdk/exceptions.py @@ -1,5 +1,6 @@ # @see https://github.com/bwhli/fly-python-sdk/blob/main/fly_python_sdk/exceptions.py + class AppInterfaceError(Exception): def __init__(self, message): self.message = message diff --git a/dask_cloudprovider/fly/sdk/fly.py b/dask_cloudprovider/fly/sdk/fly.py index 972cc97e..5ab8abbb 100644 --- a/dask_cloudprovider/fly/sdk/fly.py +++ b/dask_cloudprovider/fly/sdk/fly.py @@ -11,9 +11,15 @@ MachineInterfaceError, MissingMachineIdsError, ) -from sdk.models.apps import FlyAppCreateRequest, FlyAppDetailsResponse, FlyAppDeleteRequest, FlyAppDeleteResponse +from sdk.models.apps import ( + FlyAppCreateRequest, + FlyAppDetailsResponse, + FlyAppDeleteRequest, + FlyAppDeleteResponse, +) from sdk.models.machines import FlyMachineConfig, FlyMachineDetails + class Fly: """ A class for interacting with the Fly.io platform. @@ -36,7 +42,8 @@ async def create_app( Args: app_name: The name of the new Fly.io app. - org_slug: The slug of the organization to create the app within. If None, the personal organization will be used. + org_slug: The slug of the organization to create the app within. + If None, the personal organization will be used. 
""" path = "apps" app_details = FlyAppCreateRequest(app_name=app_name, org_slug=org_slug) diff --git a/dask_cloudprovider/fly/sdk/models/__init__.py b/dask_cloudprovider/fly/sdk/models/__init__.py index f345b025..19087351 100644 --- a/dask_cloudprovider/fly/sdk/models/__init__.py +++ b/dask_cloudprovider/fly/sdk/models/__init__.py @@ -1,4 +1,4 @@ from . import ( apps, machines, -) \ No newline at end of file +) diff --git a/dask_cloudprovider/fly/sdk/models/apps/__init__.py b/dask_cloudprovider/fly/sdk/models/apps/__init__.py index 1b01d5ba..1253cdb9 100644 --- a/dask_cloudprovider/fly/sdk/models/apps/__init__.py +++ b/dask_cloudprovider/fly/sdk/models/apps/__init__.py @@ -6,16 +6,19 @@ class FlyAppCreateRequest(BaseModel): app_name: str org_slug: str + class FlyAppDetailsResponse(BaseModel): name: str status: str organization: dict + class FlyAppDeleteRequest(BaseModel): app_name: str org_slug: Union[str, None] = None force: bool = False + class FlyAppDeleteResponse(BaseModel): app_name: str org_slug: str diff --git a/dask_cloudprovider/fly/sdk/models/machines/__init__.py b/dask_cloudprovider/fly/sdk/models/machines/__init__.py index 655b3494..2f3cbf58 100644 --- a/dask_cloudprovider/fly/sdk/models/machines/__init__.py +++ b/dask_cloudprovider/fly/sdk/models/machines/__init__.py @@ -38,9 +38,9 @@ class FlyMachineConfigGuest(BaseModel): class FlyMachineConfigInit(BaseModel): """Model for FlyMachineConfig.init""" - exec: Union[str,None] - entrypoint: Union[str,None] - cmd: Union[str,None] + exec: Union[str, None] + entrypoint: Union[str, None] + cmd: Union[str, None] tty: bool @@ -60,7 +60,7 @@ class FlyMachineConfigProcess(BaseModel): entrypoint: list[str] cmd: list[str] env: dict[str, str] - user: Union[str,None] = None + user: Union[str, None] = None # FlyMachineConfig.services.port diff --git a/setup.py b/setup.py index 0abf74e0..37ef3df6 100644 --- a/setup.py +++ b/setup.py @@ -15,10 +15,7 @@ "digitalocean": ["python-digitalocean>=1.15.0"], "gcp": 
["google-api-python-client>=1.12.5", "google-auth>=1.23.0"], "hetzner": ["hcloud>=1.10.0"], - "fly": [ - "httpx>=0.24.0", - "pydantic>=1.10.7" - ], + "fly": ["httpx>=0.24.0", "pydantic>=1.10.7"], } extras_require["all"] = set(pkg for pkgs in extras_require.values() for pkg in pkgs) From 9e23f17f664bbb6e136d71e5ae75d77de3682866 Mon Sep 17 00:00:00 2001 From: Miles Zimmerman Date: Wed, 17 May 2023 19:05:31 -0700 Subject: [PATCH 06/15] checkpointing --- dask_cloudprovider/__init__.py | 2 +- dask_cloudprovider/cloudprovider.yaml | 8 +- dask_cloudprovider/fly/__init__.py | 2 +- dask_cloudprovider/fly/machine.py | 293 ++++++++++++++---- dask_cloudprovider/fly/sdk/__init__.py | 3 +- dask_cloudprovider/fly/sdk/fly.py | 30 +- .../fly/sdk/models/apps/__init__.py | 7 +- .../fly/sdk/models/machines/__init__.py | 10 +- dask_cloudprovider/utils/socket.py | 19 ++ 9 files changed, 284 insertions(+), 90 deletions(-) diff --git a/dask_cloudprovider/__init__.py b/dask_cloudprovider/__init__.py index c6d40996..19f8b39f 100755 --- a/dask_cloudprovider/__init__.py +++ b/dask_cloudprovider/__init__.py @@ -43,7 +43,7 @@ def __getattr__(name): f"Please import dask_cloudprovider.digitalocean.{name}" ) - if name in ["FlyCluster"]: + if name in ["FlyMachineCluster"]: raise ImportError( "Fly.io cluster managers must be imported from the fly subpackage. 
" f"Please import dask_cloudprovider.fly.{name}" diff --git a/dask_cloudprovider/cloudprovider.yaml b/dask_cloudprovider/cloudprovider.yaml index 4819bee8..13b02197 100755 --- a/dask_cloudprovider/cloudprovider.yaml +++ b/dask_cloudprovider/cloudprovider.yaml @@ -120,7 +120,9 @@ cloudprovider: fly: token: null # API token for interacting with the Fly API region: "lax" # Region to launch Droplets in - size: "shared-cpu-1x" # Droplet size to launch, default is 1GB RAM, 1 vCPU - docker_image: "daskdev/dask:latest" # Operating System image to use + vm_size: "shared-cpu-1x" # Droplet size to launch, default is 1GB RAM, 1 vCPU + image: "ghcr.io/dask/dask:latest-py3.10" # Operating System image to use memory_mb: 1024 # Memory in MB to use for the scheduler and workers - cpus: 1 # Number of CPUs to use for the scheduler and workers \ No newline at end of file + cpus: 1 # Number of CPUs to use for the scheduler and workers + app_name: null # Name of Fly app to use. If it is blank, a random name will be generated. + org_slug: null # Organization slug to use. If it is blank, the personal organization will be used. \ No newline at end of file diff --git a/dask_cloudprovider/fly/__init__.py b/dask_cloudprovider/fly/__init__.py index 147733f1..1fbcc454 100644 --- a/dask_cloudprovider/fly/__init__.py +++ b/dask_cloudprovider/fly/__init__.py @@ -1 +1 @@ -from . 
import FlyMachine, FlyMachineWorker, FlyMachineScheduler, FlyMachineCluster +from .machine import FlyMachine, FlyMachineWorker, FlyMachineScheduler, FlyMachineCluster diff --git a/dask_cloudprovider/fly/machine.py b/dask_cloudprovider/fly/machine.py index 6eb9ab51..1dbc1b1a 100644 --- a/dask_cloudprovider/fly/machine.py +++ b/dask_cloudprovider/fly/machine.py @@ -1,21 +1,21 @@ import uuid import dask +import asyncio +import warnings from dask_cloudprovider.generic.vmcluster import ( VMCluster, VMInterface, SchedulerMixin, WorkerMixin, ) +from distributed.core import Status +from distributed.worker import Worker as _Worker +from distributed.scheduler import Scheduler as _Scheduler +from distributed.utils import cli_keywords +from dask_cloudprovider.utils.socket import async_socket_open try: - from .sdk.models.machines import ( - FlyMachineConfig, - FlyMachineConfigInit, - FlyMachineConfigServices, - FlyMachineRequestConfigServicesPort, - FlyMachineConfigGuest, - FlyMachineConfigProcess, - ) + from .sdk.models import machines from .sdk.fly import Fly except ImportError as e: msg = ( @@ -26,71 +26,86 @@ raise ImportError(msg) from e +# logger = logging.getLogger(__name__) + class FlyMachine(VMInterface): def __init__( self, cluster: str, config, *args, - region: str = "sjc", - vm_size: str = "shared-cpu-1x", - memory_mb=1024, - cpus=1, - image="daskdev/dask:latest", - env_vars={}, - extra_bootstrap=None, - metadata={}, - restart={}, + region: str = None, + vm_size: str = None, + memory_mb: int = None, + cpus: int = None, + image: str = None, + env_vars = None, + extra_bootstrap = None, + metadata = None, + restart = None, **kwargs, ): + print("machine args: ") + print(args) + print("machine kwargs: ") + print(kwargs) super().__init__(*args, **kwargs) self.machine = None self.cluster = cluster self.config = config - self.region = self.config.get("region", region or "sjc") - self.vm_size = self.config.get("vm_size", vm_size or "shared-cpu-1x") - self.cpus = 
self.config.get("cpus", cpus or 1) - self.memory_mb = self.config.get("memory_mb", memory_mb or 1024) - self.image = self.config.get("image", image or "daskdev/dask:latest") + self.region = region + self.vm_size = vm_size + self.cpus = 1 + self.memory_mb = 1024 + self.image = image self.gpu_instance = False self.bootstrap = True self.extra_bootstrap = extra_bootstrap - self.env_vars = self.config.get("env_vars", env_vars or {}) - self.metadata = self.config.get("metadata", metadata or {}) - self.restart = restart or {} + self.env_vars = env_vars + self.metadata = metadata + self.restart = restart self.app_name = self.cluster.app_name + self.set_env = 'DASK_INTERNAL__INHERIT_CONFIG="{}"'.format( + dask.config.serialize(dask.config.global_config) + ) # We need the token - self.api_token = config.get("token") + self.api_token = self.cluster.api_token if self.api_token is None: raise ValueError("Fly.io API token must be provided") - self.fly = Fly(api_token=self.api_token) + # set extra images + if "EXTRA_PIP_PACKAGES" in self.env_vars: + self.env_vars["EXTRA_PIP_PACKAGES"] += "dask[distributed]" + else: + self.env_vars["EXTRA_PIP_PACKAGES"] = " dask[distributed]" async def create_vm(self): - machine_config = FlyMachineConfig( + machine_config = machines.FlyMachineConfig( env=self.env_vars, - init=FlyMachineConfigInit( - cmd=[self.command], - ), + # init=machines.FlyMachineConfigInit( + # cmd=self.command, + # ), image=self.image, metadata=self.metadata, restart=self.restart, services=[ - FlyMachineConfigServices( + machines.FlyMachineConfigServices( ports=[ - FlyMachineRequestConfigServicesPort(port=80, handlers=["http"]), - FlyMachineRequestConfigServicesPort( + machines.FlyMachineRequestConfigServicesPort( + port=80, handlers=["http"] + ), + machines.FlyMachineRequestConfigServicesPort( port=443, handlers=["http", "tls"] ), - FlyMachineRequestConfigServicesPort( + machines.FlyMachineRequestConfigServicesPort( port=8786, handlers=["http", "tls"] ), ], protocol="tcp", 
internal_port=8786, ), - FlyMachineConfigServices( + machines.FlyMachineConfigServices( ports=[ - FlyMachineRequestConfigServicesPort( + machines.FlyMachineRequestConfigServicesPort( port=8787, handlers=["http", "tls"] ), ], @@ -98,45 +113,152 @@ async def create_vm(self): internal_port=8787, ), ], - guest=FlyMachineConfigGuest( + guest=machines.FlyMachineConfigGuest( cpu_kind="shared", cpus=self.cpus, memory_mb=self.memory_mb, ), - size=self.size, + size=self.vm_size, metrics=None, processes=[ - FlyMachineConfigProcess( + machines.FlyMachineConfigProcess( name="app", cmd=[self.command], env=self.env_vars, + user="root", + # entrypoint=["tini", "-g", "--", "/usr/bin/prepare.sh"] + # entrypoint=["/bin/bash", "/usr/bin/prepare.sh"] + entrypoint=["/bin/bash", "-c"] ) ], ) - self.machine = self.fly.create_machine( - app_name=self.config.get("app_name"), # The name of the new Fly.io app. + await self.wait_for_app() + self.machine = await self.cluster._fly().create_machine( + app_name=self.cluster.app_name, # The name of the new Fly.io app. config=machine_config, # A FlyMachineConfig object containing creation details. name=self.name, # The name of the machine. region=self.region, # The deployment region for the machine. 
) self.cluster._log(f"Created machine {self.name}") - return self.machine.private_ip, None - + self.address = f'tcp://[{self.machine.private_ip}]:8786' + # self.external_address = f'tls://{self.cluster.app_name}.fly.dev:443' + self.host = f'{self.machine.id}.vm.{self.cluster.app_name}.internal' + self.internal_ip = self.machine.private_ip + self.port = 8786 + # self.address = f'tcp://{self.host}:{self.port}' + # self.cluster._log("FlyMachine.create_vm END") + return self.address, self.external_address + async def destroy_vm(self): - self.fly.destroy_machine( - app_name=self.config.get("app_name"), + if self.machine is None: + self.cluster._log("Not Terminating Machine: Machine does not exist") + return + await self.cluster._fly().delete_machine( + app_name=self.cluster.app_name, machine_id=self.machine.id, + force=True, ) self.cluster._log(f"Terminated machine {self.name}") + async def wait_for_scheduler(self): + self.cluster._log(f"Waiting for scheduler to run at {self.host}:{self.port}") + while not asyncio.create_task(async_socket_open(self.host, self.port)): + await asyncio.sleep(1) + # self.cluster._log("Scheduler is running") + + async def wait_for_app(self): + self.cluster._log("FlyMachine.wait_for_app") + while self.cluster.app_name is None or self.cluster.app is None: + self.cluster._log("Waiting for app to be created...") + await asyncio.sleep(1) + # self.cluster._log("FlyMachine.wait_for_app END") class FlyMachineScheduler(SchedulerMixin, FlyMachine): """Scheduler running on a Fly.io Machine.""" + def __init__( + self, + *args, + scheduler_options: dict = {}, + **kwargs, + ): + print("scheduler args: ") + print(args) + print("scheduler kwargs: ") + print(kwargs) + super().__init__(*args, **kwargs) + self.name = f"dask-{self.cluster.uuid}-scheduler" + self.port = scheduler_options.get("port", 8786) + self.command = " ".join( + [ + self.set_env, + "dask", + "scheduler" + ] + + cli_keywords(scheduler_options, cls=_Scheduler) + ) + + async def start(self): + 
self.cluster._log(f"Starting scheduler on {self.name}") + if self.cluster.app is None: + await self.cluster.create_app() + await self.cluster.wait_for_app() + await self.start_scheduler() + self.status = Status.running + # self.cluster._log("FlyMachineScheduler.start END") + + async def start_scheduler(self): + self.cluster._log("Creating scheduler instance") + address, external_address = await self.create_vm() + await self.wait_for_scheduler() + self.cluster._log(f"Scheduler running at {address}") + self.cluster.scheduler_internal_address = address + self.cluster.scheduler + # self.cluster.scheduler_external_address = external_address + self.cluster.scheduler_port = self.port + # self.cluster._log("FlyMachineScheduler.start_scheduler END") + class FlyMachineWorker(WorkerMixin, FlyMachine): """Worker running on a Fly.io Machine.""" + def __init__( + self, + scheduler: str, + cluster: str, + *args, + worker_class: str = "FlyMachineScheduler", + worker_options: dict = {}, + **kwargs, + ): + print("worker args: ") + print(args) + print("worker kwargs: ") + print(kwargs) + super().__init__(scheduler=scheduler, cluster=cluster, **kwargs) + self.scheduler = scheduler + self.cluster = cluster + self.worker_class = worker_class + self.name = f"dask-{self.cluster.uuid}-worker-{str(uuid.uuid4())[:8]}" + self.command = " ".join( + [ + self.set_env, + "dask", + "worker", + self.scheduler, + ] + + cli_keywords(worker_options, cls=_Worker), + ) + + async def start(self): + self.cluster._log("FlyMachineWorker.start") + await super().start() + await self.start_worker() + + async def start_worker(self): + self.cluster._log("Creating worker instance") + self.address, self.external_address = await self.create_vm() + # self.address = self.internal_ip class FlyMachineCluster(VMCluster): """Cluster running on Fly.io Machines. @@ -163,7 +285,7 @@ class FlyMachineCluster(VMCluster): ``dask-scheduler`` and ``dask-worker`` commands to be available. 
It is recommended the Python environment matches your local environment where ``FlyMachineCluster`` is being created from. - By default the ``daskdev/dask:latest`` image will be used. + By default the ``ghcr.io/dask/dask:latest`` image will be used. worker_module: str The Dask worker module to start on worker VMs. n_workers: int @@ -247,41 +369,82 @@ class FlyMachineCluster(VMCluster): def __init__( self, region: str = None, - size: str = None, + vm_size: str = None, image: str = None, + token: str = None, + memory_mb: int = None, + cpus: int = None, debug: bool = False, + app_name: str = None, **kwargs, ): self.config = dask.config.get("cloudprovider.fly", {}) self.scheduler_class = FlyMachineScheduler self.worker_class = FlyMachineWorker self.debug = debug + self.app = None + self.app_name = app_name + self._client = None self.options = { "cluster": self, "config": self.config, "region": region if region is not None else self.config.get("region"), - "size": size if size is not None else self.config.get("size"), + "vm_size": vm_size if vm_size is not None else self.config.get("vm_size"), "image": image if image is not None else self.config.get("image"), + "token": token if token is not None else self.config.get("token"), + "memory_mb": memory_mb if memory_mb is not None else self.config.get("memory_mb"), + "cpus": cpus if cpus is not None else self.config.get("cpus"), + "app_name": self.app_name, + "protocol": self.config.get("protocol", "tcp"), + "security": self.config.get("security", False), + "host": "fly-local-6pn", + "security": self.config.get("security", False), + } + self.scheduler_options = { + **self.options, } - self.scheduler_options = {**self.options} self.worker_options = {**self.options} - self.app_name = f"dask-{str(uuid.uuid4())[:8]}" - self.api_token = self.config.get("token") - self.app = None - super().__init__(debug=debug, **kwargs) + self.api_token = self.options["token"] + super().__init__( + debug=debug, + # 
worker_options=self.worker_options, + # scheduler_options=self.scheduler_options, + security=self.options["security"], + **kwargs + ) - def create_app(self): + def _fly(self): + if self._client is None: + self._client = Fly(api_token=self.api_token) + return self._client + + async def create_app(self): """Create a Fly.io app.""" - if self.app is None: - self._log("Not creating app as it already exists") + if self.app_name is not None: + warnings.warn("Not creating app as it already exists") return - self.app = self.fly.create_app(name=self.app_name) - self._log(f"Created app {self.app_name}") - - def delete_app(self): + app_name = f"dask-{str(uuid.uuid4())[:8]}" + try: + warnings.warn(f"trying to create app {app_name}") + self.app = await self._fly().create_app(app_name=app_name) + warnings.warn(f"Created app {app_name}") + self.app_name = app_name + except Exception as e: + warnings.warn(f"Failed to create app {app_name}") + self.app = "failed" + # self.app_name = "failed" + raise e + + async def delete_app(self): """Delete a Fly.io app.""" - if self.app is None: - self._log("Not deleting app as it does not exist") + if self.app_name is None: + warnings.warn("Not deleting app as it does not exist") return - self.fly.delete_app(name=self.app_name) - self._log(f"Deleted app {self.app_name}") + await self._fly().delete_app(app_name=self.app_name) + warnings.warn(f"Deleted app {self.app_name}") + + async def wait_for_app(self): + """Wait for the Fly.io app to be ready.""" + while self.app is None or self.app_name is None: + warnings.warn("Waiting for app to be created") + await asyncio.sleep(1) \ No newline at end of file diff --git a/dask_cloudprovider/fly/sdk/__init__.py b/dask_cloudprovider/fly/sdk/__init__.py index d5c204e6..ace5a033 100644 --- a/dask_cloudprovider/fly/sdk/__init__.py +++ b/dask_cloudprovider/fly/sdk/__init__.py @@ -2,7 +2,8 @@ __version__ = "0.1" -from . import constants, exceptions +from . import constants +from . import exceptions from . 
import fly from . import models from . import sdk diff --git a/dask_cloudprovider/fly/sdk/fly.py b/dask_cloudprovider/fly/sdk/fly.py index 5ab8abbb..2a156102 100644 --- a/dask_cloudprovider/fly/sdk/fly.py +++ b/dask_cloudprovider/fly/sdk/fly.py @@ -6,18 +6,19 @@ import httpx from pydantic import BaseModel -from sdk.exceptions import ( +from .exceptions import ( AppInterfaceError, MachineInterfaceError, MissingMachineIdsError, ) -from sdk.models.apps import ( +from .models.apps import ( FlyAppCreateRequest, + FlyAppCreateResponse, FlyAppDetailsResponse, FlyAppDeleteRequest, FlyAppDeleteResponse, ) -from sdk.models.machines import FlyMachineConfig, FlyMachineDetails +from .models.machines import FlyMachineConfig, FlyMachineDetails class Fly: @@ -36,8 +37,8 @@ def __init__(self, api_token: str) -> None: async def create_app( self, app_name: str, - org_slug: str, - ) -> None: + org_slug: str = "personal", + ) -> FlyAppCreateResponse: """Creates a new app on Fly.io. Args: @@ -51,11 +52,11 @@ async def create_app( # Raise an exception if HTTP status code is not 201. if r.status_code != 201: + error_msg = r.json().get("error", {}).get("message", "Unknown error!") raise AppInterfaceError( - message=f"Unable to create {app_name} in {org_slug}!" + message=f"Unable to create {app_name} in {org_slug}! Error: {error_msg}" ) - - return FlyMachineDetails(**r.json()) + return FlyAppCreateResponse(app_name=app_name, org_slug=org_slug) async def get_app( self, @@ -78,7 +79,7 @@ async def get_app( async def delete_app( self, app_name: str, - org_slug: str = None, + org_slug: str = "personal", force: bool = False, ) -> None: """Deletes a Fly.io application. @@ -177,14 +178,16 @@ async def delete_machine( self, app_name: str, machine_id: str, + force: bool = False, ) -> None: """Deletes a Fly.io machine. Args: app_name: The name of the new Fly.io app. machine_id: The id string for a Fly.io machine. + force: If True, the machine will be deleted even if it is running. 
""" - path = f"apps/{app_name}/machines/{machine_id}" + path = f"apps/{app_name}/machines/{machine_id}?force={str(force).lower()}" r = await self._make_api_delete_request(path) # Raise an exception if HTTP status code is not 200. @@ -200,6 +203,7 @@ async def delete_machines( app_name: str, machine_ids: list[str] = [], delete_all: bool = False, + force: bool = False, ) -> None: """Deletes multiple Fly.io machines. @@ -207,6 +211,7 @@ async def delete_machines( app_name: The name of the new Fly.io app. machine_ids: An array of machine IDs to delete. delete_all: Delete all machines in the app if True. + force: Delete even if running """ # If delete_all is True, override provided machine_ids. if delete_all is True: @@ -224,7 +229,7 @@ async def delete_machines( # Delete machines. for machine_id in machine_ids: - self.delete_machine(app_name, machine_id) + self.delete_machine(app_name, machine_id, force=force) return @@ -330,8 +335,9 @@ async def _make_api_post_request( """An internal function for making POST requests to the Fly.io API.""" api_hostname = self._get_api_hostname() url = f"{api_hostname}/v{self.api_version}/{path}" + headers = self._generate_headers() async with httpx.AsyncClient() as client: - r = await client.post(url, headers=self._generate_headers(), json=payload) + r = await client.post(url, headers=headers, json=payload) r.raise_for_status() return r diff --git a/dask_cloudprovider/fly/sdk/models/apps/__init__.py b/dask_cloudprovider/fly/sdk/models/apps/__init__.py index 1253cdb9..5cb056d6 100644 --- a/dask_cloudprovider/fly/sdk/models/apps/__init__.py +++ b/dask_cloudprovider/fly/sdk/models/apps/__init__.py @@ -4,8 +4,11 @@ class FlyAppCreateRequest(BaseModel): app_name: str - org_slug: str + org_slug: Union[str, None] = None +class FlyAppCreateResponse(BaseModel): + app_name: str + org_slug: Union[str, None] = None class FlyAppDetailsResponse(BaseModel): name: str @@ -21,6 +24,6 @@ class FlyAppDeleteRequest(BaseModel): class 
FlyAppDeleteResponse(BaseModel): app_name: str - org_slug: str + org_slug: Union[str, None] = None status: str organization: dict diff --git a/dask_cloudprovider/fly/sdk/models/machines/__init__.py b/dask_cloudprovider/fly/sdk/models/machines/__init__.py index 2f3cbf58..44197fb6 100644 --- a/dask_cloudprovider/fly/sdk/models/machines/__init__.py +++ b/dask_cloudprovider/fly/sdk/models/machines/__init__.py @@ -41,7 +41,7 @@ class FlyMachineConfigInit(BaseModel): exec: Union[str, None] entrypoint: Union[str, None] cmd: Union[str, None] - tty: bool + tty: bool = False # FlyMachineConfig.mounts @@ -56,10 +56,10 @@ class FlyMachineConfigMount(BaseModel): class FlyMachineConfigProcess(BaseModel): - name: str - entrypoint: list[str] - cmd: list[str] - env: dict[str, str] + name: str = "app" + entrypoint: Union[list[str], None] = None + cmd: Union[list[str], None] = None + env: Union[dict[str, str], None] = None user: Union[str, None] = None diff --git a/dask_cloudprovider/utils/socket.py b/dask_cloudprovider/utils/socket.py index 9acd1315..49e5d6ac 100644 --- a/dask_cloudprovider/utils/socket.py +++ b/dask_cloudprovider/utils/socket.py @@ -1,4 +1,5 @@ import socket +import asyncio def is_socket_open(ip, port): @@ -9,3 +10,21 @@ def is_socket_open(ip, port): return True except Exception: return False + +def is_ipv6_socket_open(ip, port): + connection = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) + try: + connection.connect((ip, int(port))) + connection.shutdown(2) + return True + except Exception: + return False + +async def async_socket_open(address, port): + loop = asyncio.get_event_loop() + try: + res = await loop.sock_connect(address, port) + res.close() + return True + except Exception: + return False \ No newline at end of file From 4baba11aa394b0dbfa0139e351135dc8b5a663d8 Mon Sep 17 00:00:00 2001 From: Miles Zimmerman Date: Thu, 18 May 2023 17:17:45 -0700 Subject: [PATCH 07/15] working --- dask_cloudprovider/fly/machine.py | 143 +++++++----------- 
dask_cloudprovider/fly/sdk/__init__.py | 1 - dask_cloudprovider/fly/sdk/fly.py | 20 ++- .../fly/sdk/models/apps/__init__.py | 3 - 4 files changed, 66 insertions(+), 101 deletions(-) diff --git a/dask_cloudprovider/fly/machine.py b/dask_cloudprovider/fly/machine.py index 1dbc1b1a..65049314 100644 --- a/dask_cloudprovider/fly/machine.py +++ b/dask_cloudprovider/fly/machine.py @@ -1,6 +1,7 @@ import uuid import dask import asyncio +import json import warnings from dask_cloudprovider.generic.vmcluster import ( VMCluster, @@ -25,9 +26,6 @@ ) raise ImportError(msg) from e - -# logger = logging.getLogger(__name__) - class FlyMachine(VMInterface): def __init__( self, @@ -45,18 +43,14 @@ def __init__( restart = None, **kwargs, ): - print("machine args: ") - print(args) - print("machine kwargs: ") - print(kwargs) super().__init__(*args, **kwargs) self.machine = None self.cluster = cluster self.config = config self.region = region self.vm_size = vm_size - self.cpus = 1 - self.memory_mb = 1024 + self.cpus = cpus + self.memory_mb = memory_mb self.image = image self.gpu_instance = False self.bootstrap = True @@ -65,25 +59,14 @@ def __init__( self.metadata = metadata self.restart = restart self.app_name = self.cluster.app_name - self.set_env = 'DASK_INTERNAL__INHERIT_CONFIG="{}"'.format( - dask.config.serialize(dask.config.global_config) - ) - # We need the token + self.env_vars['DASK_INTERNAL__INHERIT_CONFIG'] = dask.config.serialize(dask.config.global_config) self.api_token = self.cluster.api_token if self.api_token is None: raise ValueError("Fly.io API token must be provided") - # set extra images - if "EXTRA_PIP_PACKAGES" in self.env_vars: - self.env_vars["EXTRA_PIP_PACKAGES"] += "dask[distributed]" - else: - self.env_vars["EXTRA_PIP_PACKAGES"] = " dask[distributed]" async def create_vm(self): machine_config = machines.FlyMachineConfig( env=self.env_vars, - # init=machines.FlyMachineConfigInit( - # cmd=self.command, - # ), image=self.image, metadata=self.metadata, 
restart=self.restart, @@ -97,7 +80,8 @@ async def create_vm(self): port=443, handlers=["http", "tls"] ), machines.FlyMachineRequestConfigServicesPort( - port=8786, handlers=["http", "tls"] + port=8786, + handlers=["http", "tls"] ), ], protocol="tcp", @@ -106,7 +90,8 @@ async def create_vm(self): machines.FlyMachineConfigServices( ports=[ machines.FlyMachineRequestConfigServicesPort( - port=8787, handlers=["http", "tls"] + port=8787, + handlers=["http", "tls"] ), ], protocol="tcp", @@ -118,17 +103,12 @@ async def create_vm(self): cpus=self.cpus, memory_mb=self.memory_mb, ), - size=self.vm_size, metrics=None, processes=[ machines.FlyMachineConfigProcess( name="app", - cmd=[self.command], + cmd=self.command, env=self.env_vars, - user="root", - # entrypoint=["tini", "-g", "--", "/usr/bin/prepare.sh"] - # entrypoint=["/bin/bash", "/usr/bin/prepare.sh"] - entrypoint=["/bin/bash", "-c"] ) ], ) @@ -139,14 +119,13 @@ async def create_vm(self): name=self.name, # The name of the machine. region=self.region, # The deployment region for the machine. 
) - self.cluster._log(f"Created machine {self.name}") - self.address = f'tcp://[{self.machine.private_ip}]:8786' - # self.external_address = f'tls://{self.cluster.app_name}.fly.dev:443' + self.cluster._log(f"Created machine name={self.name} id={self.machine.id}") self.host = f'{self.machine.id}.vm.{self.cluster.app_name}.internal' self.internal_ip = self.machine.private_ip self.port = 8786 # self.address = f'tcp://{self.host}:{self.port}' - # self.cluster._log("FlyMachine.create_vm END") + self.address = f'{self.cluster.protocol}://[{self.machine.private_ip}]:8786' + # self.external_address = f'tls://{self.cluster.app_name}.fly.dev:443' return self.address, self.external_address async def destroy_vm(self): @@ -162,16 +141,16 @@ async def destroy_vm(self): async def wait_for_scheduler(self): self.cluster._log(f"Waiting for scheduler to run at {self.host}:{self.port}") - while not asyncio.create_task(async_socket_open(self.host, self.port)): + while not asyncio.create_task(async_socket_open(self.internal_ip, self.port)): await asyncio.sleep(1) - # self.cluster._log("Scheduler is running") + self.cluster._log("Scheduler is running") + return True async def wait_for_app(self): - self.cluster._log("FlyMachine.wait_for_app") + self.cluster._log("Waiting for app to be created...") while self.cluster.app_name is None or self.cluster.app is None: - self.cluster._log("Waiting for app to be created...") await asyncio.sleep(1) - # self.cluster._log("FlyMachine.wait_for_app END") + return True class FlyMachineScheduler(SchedulerMixin, FlyMachine): """Scheduler running on a Fly.io Machine.""" @@ -182,21 +161,14 @@ def __init__( scheduler_options: dict = {}, **kwargs, ): - print("scheduler args: ") - print(args) - print("scheduler kwargs: ") - print(kwargs) super().__init__(*args, **kwargs) self.name = f"dask-{self.cluster.uuid}-scheduler" self.port = scheduler_options.get("port", 8786) - self.command = " ".join( - [ - self.set_env, - "dask", - "scheduler" - ] - + 
cli_keywords(scheduler_options, cls=_Scheduler) - ) + self.command = [ + "python", + "-m", + "distributed.cli.dask_scheduler", + ]+ cli_keywords(scheduler_options, cls=_Scheduler) async def start(self): self.cluster._log(f"Starting scheduler on {self.name}") @@ -205,7 +177,6 @@ async def start(self): await self.cluster.wait_for_app() await self.start_scheduler() self.status = Status.running - # self.cluster._log("FlyMachineScheduler.start END") async def start_scheduler(self): self.cluster._log("Creating scheduler instance") @@ -213,10 +184,13 @@ async def start_scheduler(self): await self.wait_for_scheduler() self.cluster._log(f"Scheduler running at {address}") self.cluster.scheduler_internal_address = address - self.cluster.scheduler - # self.cluster.scheduler_external_address = external_address + self.cluster.scheduler_external_address = external_address self.cluster.scheduler_port = self.port - # self.cluster._log("FlyMachineScheduler.start_scheduler END") + + async def close(self): + await super().close() + if self.cluster.app_name is not None: + await self.cluster.delete_app() class FlyMachineWorker(WorkerMixin, FlyMachine): @@ -224,41 +198,41 @@ class FlyMachineWorker(WorkerMixin, FlyMachine): def __init__( self, - scheduler: str, - cluster: str, *args, - worker_class: str = "FlyMachineScheduler", + worker_module: str = None, + worker_class: str = None, worker_options: dict = {}, **kwargs, ): - print("worker args: ") - print(args) - print("worker kwargs: ") - print(kwargs) - super().__init__(scheduler=scheduler, cluster=cluster, **kwargs) - self.scheduler = scheduler - self.cluster = cluster - self.worker_class = worker_class - self.name = f"dask-{self.cluster.uuid}-worker-{str(uuid.uuid4())[:8]}" - self.command = " ".join( - [ - self.set_env, - "dask", - "worker", + super().__init__(*args, **kwargs) + if worker_module is not None: + self.worker_module = worker_module + self.command = [ + "python", + "-m", + self.worker_module, self.scheduler, + "--name", + 
str(self.name), + ] + cli_keywords(worker_options, cls=_Worker, cmd=self.worker_module) + if worker_class is not None: + self.worker_class = worker_class + self.command = [ + "python", + "-m", + "distributed.cli.dask_spec", + self.scheduler, + "--spec", + json.dumps( + { + "cls": self.worker_class, + "opts": { + **worker_options, + "name": self.name, + }, + } + ), ] - + cli_keywords(worker_options, cls=_Worker), - ) - - async def start(self): - self.cluster._log("FlyMachineWorker.start") - await super().start() - await self.start_worker() - - async def start_worker(self): - self.cluster._log("Creating worker instance") - self.address, self.external_address = await self.create_vm() - # self.address = self.internal_ip class FlyMachineCluster(VMCluster): """Cluster running on Fly.io Machines. @@ -407,8 +381,6 @@ def __init__( self.api_token = self.options["token"] super().__init__( debug=debug, - # worker_options=self.worker_options, - # scheduler_options=self.scheduler_options, security=self.options["security"], **kwargs ) @@ -432,7 +404,6 @@ async def create_app(self): except Exception as e: warnings.warn(f"Failed to create app {app_name}") self.app = "failed" - # self.app_name = "failed" raise e async def delete_app(self): diff --git a/dask_cloudprovider/fly/sdk/__init__.py b/dask_cloudprovider/fly/sdk/__init__.py index ace5a033..bd7c8862 100644 --- a/dask_cloudprovider/fly/sdk/__init__.py +++ b/dask_cloudprovider/fly/sdk/__init__.py @@ -6,4 +6,3 @@ from . import exceptions from . import fly from . import models -from . import sdk diff --git a/dask_cloudprovider/fly/sdk/fly.py b/dask_cloudprovider/fly/sdk/fly.py index 2a156102..d8db797f 100644 --- a/dask_cloudprovider/fly/sdk/fly.py +++ b/dask_cloudprovider/fly/sdk/fly.py @@ -79,27 +79,25 @@ async def get_app( async def delete_app( self, app_name: str, - org_slug: str = "personal", force: bool = False, ) -> None: """Deletes a Fly.io application. Args: app_name: The name of the new Fly.io app. 
- org_slug: The slug of the organization. If None, the personal organization will be used. - force: If True, the app will be deleted even if it has machines. + force: If True, the app will be deleted even if it has active machines. """ - path = f"apps/{app_name}" - app_details = FlyAppDeleteRequest( - app_name=app_name, org_slug=org_slug, force=force - ) - r = await self._make_api_delete_request(path, app_details.dict()) + path = f"apps/{app_name}?force={str(force).lower()}" + r = await self._make_api_delete_request(path) # Raise an exception if HTTP status code is not 200. - if r.status_code != 200: - raise AppInterfaceError(message=f"Unable to delete {app_name}!") + if r.status_code != 202: + raise AppInterfaceError(message=f"Unable to delete {app_name}! status_code={r.status_code}") - return FlyAppDeleteResponse(**r.json()) + return FlyAppDeleteResponse( + status=r.status_code, + app_name=app_name, + ) ############ # Machines # diff --git a/dask_cloudprovider/fly/sdk/models/apps/__init__.py b/dask_cloudprovider/fly/sdk/models/apps/__init__.py index 5cb056d6..7b365e4a 100644 --- a/dask_cloudprovider/fly/sdk/models/apps/__init__.py +++ b/dask_cloudprovider/fly/sdk/models/apps/__init__.py @@ -19,11 +19,8 @@ class FlyAppDetailsResponse(BaseModel): class FlyAppDeleteRequest(BaseModel): app_name: str org_slug: Union[str, None] = None - force: bool = False class FlyAppDeleteResponse(BaseModel): app_name: str - org_slug: Union[str, None] = None status: str - organization: dict From ac1b2152131057fd06a027dac7c9111fab13610b Mon Sep 17 00:00:00 2001 From: Miles Zimmerman Date: Sun, 21 May 2023 08:23:10 -0700 Subject: [PATCH 08/15] tidy up some loose ends --- dask_cloudprovider/cloudprovider.yaml | 4 +- dask_cloudprovider/fly/machine.py | 133 ++++++++++++++------------ dask_cloudprovider/fly/sdk/fly.py | 7 +- 3 files changed, 75 insertions(+), 69 deletions(-) diff --git a/dask_cloudprovider/cloudprovider.yaml b/dask_cloudprovider/cloudprovider.yaml index 13b02197..8b6044de 
100755 --- a/dask_cloudprovider/cloudprovider.yaml +++ b/dask_cloudprovider/cloudprovider.yaml @@ -121,8 +121,8 @@ cloudprovider: token: null # API token for interacting with the Fly API region: "lax" # Region to launch Droplets in vm_size: "shared-cpu-1x" # Droplet size to launch, default is 1GB RAM, 1 vCPU - image: "ghcr.io/dask/dask:latest-py3.10" # Operating System image to use + image: "daskdev/dask:latest-py3.10" # Operating System image to use memory_mb: 1024 # Memory in MB to use for the scheduler and workers cpus: 1 # Number of CPUs to use for the scheduler and workers app_name: null # Name of Fly app to use. If it is blank, a random name will be generated. - org_slug: null # Organization slug to use. If it is blank, the personal organization will be used. \ No newline at end of file + org_slug: "personal" # Organization slug to use. If it is blank, the personal organization will be used. \ No newline at end of file diff --git a/dask_cloudprovider/fly/machine.py b/dask_cloudprovider/fly/machine.py index 65049314..1b07ffe2 100644 --- a/dask_cloudprovider/fly/machine.py +++ b/dask_cloudprovider/fly/machine.py @@ -61,8 +61,9 @@ def __init__( self.app_name = self.cluster.app_name self.env_vars['DASK_INTERNAL__INHERIT_CONFIG'] = dask.config.serialize(dask.config.global_config) self.api_token = self.cluster.api_token + self._client = None if self.api_token is None: - raise ValueError("Fly.io API token must be provided") + raise ValueError("[fly.io] API token must be provided") async def create_vm(self): machine_config = machines.FlyMachineConfig( @@ -112,14 +113,14 @@ async def create_vm(self): ) ], ) - await self.wait_for_app() - self.machine = await self.cluster._fly().create_machine( + # await self.wait_for_app() + self.machine = await self._fly().create_machine( app_name=self.cluster.app_name, # The name of the new Fly.io app. config=machine_config, # A FlyMachineConfig object containing creation details. name=self.name, # The name of the machine. 
region=self.region, # The deployment region for the machine. ) - self.cluster._log(f"Created machine name={self.name} id={self.machine.id}") + self.cluster._log(f"[fly.io] Created machine name={self.name} id={self.machine.id}") self.host = f'{self.machine.id}.vm.{self.cluster.app_name}.internal' self.internal_ip = self.machine.private_ip self.port = 8786 @@ -130,14 +131,14 @@ async def create_vm(self): async def destroy_vm(self): if self.machine is None: - self.cluster._log("Not Terminating Machine: Machine does not exist") + self.cluster._log("[fly.io] Not Terminating Machine: Machine does not exist") return - await self.cluster._fly().delete_machine( + await self._fly().delete_machine( app_name=self.cluster.app_name, machine_id=self.machine.id, force=True, ) - self.cluster._log(f"Terminated machine {self.name}") + self.cluster._log(f"[fly.io] Terminated machine {self.name}") async def wait_for_scheduler(self): self.cluster._log(f"Waiting for scheduler to run at {self.host}:{self.port}") @@ -147,11 +148,16 @@ async def wait_for_scheduler(self): return True async def wait_for_app(self): - self.cluster._log("Waiting for app to be created...") - while self.cluster.app_name is None or self.cluster.app is None: + self.cluster._log("[fly.io] Waiting for app to be created...") + while self.cluster.app_name is None or self.app is None: await asyncio.sleep(1) return True + def _fly(self): + if self._client is None: + self._client = Fly(api_token=self.api_token) + return self._client + class FlyMachineScheduler(SchedulerMixin, FlyMachine): """Scheduler running on a Fly.io Machine.""" @@ -172,9 +178,8 @@ def __init__( async def start(self): self.cluster._log(f"Starting scheduler on {self.name}") - if self.cluster.app is None: - await self.cluster.create_app() - await self.cluster.wait_for_app() + if self.cluster.app_name is None: + await self.create_app() await self.start_scheduler() self.status = Status.running @@ -190,7 +195,37 @@ async def start_scheduler(self): async def 
close(self): await super().close() if self.cluster.app_name is not None: - await self.cluster.delete_app() + await self.delete_app() + + async def create_app(self): + """Create a Fly.io app.""" + if self.cluster.app_name is not None: + self.cluster._log("[fly.io] Not creating app as it already exists") + return + app_name = f"dask-{str(uuid.uuid4())[:8]}" + try: + self.cluster._log(f"[fly.io] Trying to create app {app_name}") + self.app = await self._fly().create_app(app_name=app_name) + self.cluster._log(f"[fly.io] Created app {app_name}") + self.cluster.app_name = app_name + except Exception as e: + self.cluster._log(f"[fly.io] Failed to create app {app_name}") + self.app = "failed" + raise e + + async def delete_app(self): + """Delete a Fly.io app.""" + if self.cluster.app_name is None: + self.cluster._log("[fly.io] Not deleting app as it does not exist") + return + await self._fly().delete_app(app_name=self.cluster.app_name) + self.cluster._log(f"[fly.io] Deleted app {self.cluster.app_name}") + + async def wait_for_app(self): + """Wait for the Fly.io app to be ready.""" + while self.app is None or self.cluster.app_name is None: + self.cluster._log("[fly.io] Waiting for app to be created") + await asyncio.sleep(1) class FlyMachineWorker(WorkerMixin, FlyMachine): @@ -286,7 +321,7 @@ class FlyMachineCluster(VMCluster): Configures communication security in this cluster. Can be a security object, or True. If True, temporary self-signed credentials will be created automatically. Default is ``True``. - debug: bool, optional + debug : bool, optional More information will be printed when constructing clusters to enable debugging. 
Examples @@ -296,12 +331,16 @@ class FlyMachineCluster(VMCluster): >>> from dask_cloudprovider.fly import FlyMachineCluster >>> cluster = FlyMachineCluster(n_workers=1) + Starting scheduler on dask-e058d78e-scheduler + [fly.io] Trying to create app dask-122f0e5f + [fly.io] Created app dask-122f0e5f Creating scheduler instance - Created machine dask-38b817c1-scheduler - Waiting for scheduler to run + [fly.io] Created machine name=dask-e058d78e-scheduler id=6e82d4e6a02d58 + Waiting for scheduler to run at 6e82d4e6a02d58.vm.dask-122f0e5f.internal:8786 Scheduler is running + Scheduler running at tcp://[fdaa:1:53b:a7b:112:2bed:ccd1:2]:8786 Creating worker instance - Created machine dask-38b817c1-worker-dc95260d + [fly.io] Created machine name=dask-e058d78e-worker-7b24cb61 id=32873e0a095985 Connect a client. @@ -319,8 +358,9 @@ class FlyMachineCluster(VMCluster): >>> client.close() >>> cluster.close() - Terminated machine dask-38b817c1-worker-dc95260d - Terminated machine dask-38b817c1-scheduler + [fly.io] Terminated machine dask-e058d78e-worker-7b24cb61 + [fly.io] Terminated machine dask-e058d78e-scheduler + [fly.io] Deleted app dask-122f0e5f You can also do this all in one go with context managers to ensure the cluster is created and cleaned up. @@ -328,15 +368,20 @@ class FlyMachineCluster(VMCluster): >>> with FlyMachineCluster(n_workers=1) as cluster: ... with Client(cluster) as client: ... 
print(da.random.random((1000, 1000), chunks=(100, 100)).mean().compute()) + Starting scheduler on dask-e058d78e-scheduler + [fly.io] Trying to create app dask-122f0e5f + [fly.io] Created app dask-122f0e5f Creating scheduler instance - Created machine dask-48efe585-scheduler - Waiting for scheduler to run + [fly.io] Created machine name=dask-e058d78e-scheduler id=6e82d4e6a02d58 + Waiting for scheduler to run at 6e82d4e6a02d58.vm.dask-122f0e5f.internal:8786 Scheduler is running + Scheduler running at tcp://[fdaa:1:53b:a7b:112:2bed:ccd1:2]:8786 Creating worker instance - Created machine dask-48efe585-worker-5181aaf1 + [fly.io] Created machine name=dask-e058d78e-worker-7b24cb61 id=32873e0a095985 0.5000558682356162 - Terminated machine dask-48efe585-worker-5181aaf1 - Terminated machine dask-48efe585-scheduler + [fly.io] Terminated machine dask-e058d78e-worker-7b24cb61 + [fly.io] Terminated machine dask-e058d78e-scheduler + [fly.io] Deleted app dask-122f0e5f """ @@ -356,7 +401,6 @@ def __init__( self.scheduler_class = FlyMachineScheduler self.worker_class = FlyMachineWorker self.debug = debug - self.app = None self.app_name = app_name self._client = None self.options = { @@ -374,9 +418,7 @@ def __init__( "host": "fly-local-6pn", "security": self.config.get("security", False), } - self.scheduler_options = { - **self.options, - } + self.scheduler_options = {**self.options} self.worker_options = {**self.options} self.api_token = self.options["token"] super().__init__( @@ -384,38 +426,3 @@ def __init__( security=self.options["security"], **kwargs ) - - def _fly(self): - if self._client is None: - self._client = Fly(api_token=self.api_token) - return self._client - - async def create_app(self): - """Create a Fly.io app.""" - if self.app_name is not None: - warnings.warn("Not creating app as it already exists") - return - app_name = f"dask-{str(uuid.uuid4())[:8]}" - try: - warnings.warn(f"trying to create app {app_name}") - self.app = await 
self._fly().create_app(app_name=app_name) - warnings.warn(f"Created app {app_name}") - self.app_name = app_name - except Exception as e: - warnings.warn(f"Failed to create app {app_name}") - self.app = "failed" - raise e - - async def delete_app(self): - """Delete a Fly.io app.""" - if self.app_name is None: - warnings.warn("Not deleting app as it does not exist") - return - await self._fly().delete_app(app_name=self.app_name) - warnings.warn(f"Deleted app {self.app_name}") - - async def wait_for_app(self): - """Wait for the Fly.io app to be ready.""" - while self.app is None or self.app_name is None: - warnings.warn("Waiting for app to be created") - await asyncio.sleep(1) \ No newline at end of file diff --git a/dask_cloudprovider/fly/sdk/fly.py b/dask_cloudprovider/fly/sdk/fly.py index d8db797f..f966cfc4 100644 --- a/dask_cloudprovider/fly/sdk/fly.py +++ b/dask_cloudprovider/fly/sdk/fly.py @@ -15,7 +15,6 @@ FlyAppCreateRequest, FlyAppCreateResponse, FlyAppDetailsResponse, - FlyAppDeleteRequest, FlyAppDeleteResponse, ) from .models.machines import FlyMachineConfig, FlyMachineDetails @@ -308,7 +307,7 @@ async def _make_api_delete_request( """An internal function for making DELETE requests to the Fly.io API.""" api_hostname = self._get_api_hostname() url = f"{api_hostname}/v{self.api_version}/{path}" - async with httpx.AsyncClient() as client: + async with httpx.AsyncClient(verify=False) as client: r = await client.delete(url, headers=self._generate_headers()) r.raise_for_status() return r @@ -320,7 +319,7 @@ async def _make_api_get_request( """An internal function for making GET requests to the Fly.io API.""" api_hostname = self._get_api_hostname() url = f"{api_hostname}/v{self.api_version}/{path}" - async with httpx.AsyncClient() as client: + async with httpx.AsyncClient(verify=False) as client: r = await client.get(url, headers=self._generate_headers()) r.raise_for_status() return r @@ -334,7 +333,7 @@ async def _make_api_post_request( api_hostname = 
self._get_api_hostname() url = f"{api_hostname}/v{self.api_version}/{path}" headers = self._generate_headers() - async with httpx.AsyncClient() as client: + async with httpx.AsyncClient(verify=False) as client: r = await client.post(url, headers=headers, json=payload) r.raise_for_status() return r From f8273e6bbf2dbf6b862270ef340ada467f510833 Mon Sep 17 00:00:00 2001 From: Miles Zimmerman Date: Sun, 21 May 2023 08:31:40 -0700 Subject: [PATCH 09/15] adding docs --- dask_cloudprovider/fly/machine.py | 4 ++++ doc/source/fly.rst | 37 +++++++++++++++++++++++++++++++ doc/source/index.rst | 1 + doc/source/installation.rst | 1 + 4 files changed, 43 insertions(+) create mode 100644 doc/source/fly.rst diff --git a/dask_cloudprovider/fly/machine.py b/dask_cloudprovider/fly/machine.py index 1b07ffe2..2f6e4a56 100644 --- a/dask_cloudprovider/fly/machine.py +++ b/dask_cloudprovider/fly/machine.py @@ -275,6 +275,10 @@ class FlyMachineCluster(VMCluster): VMs in Fly.io (FLY) are referred to as machines. This cluster manager constructs a Dask cluster running on VMs. + _Note: By default, the cluster will instantiate a new Fly.io app. The app will be deleted when + the cluster is closed. If you want to use an existing app, you can pass the app name to the + ``app_name`` parameter._ + When configuring your cluster you may find it useful to install the ``flyctl`` tool for querying the CLY API for available options. diff --git a/doc/source/fly.rst b/doc/source/fly.rst new file mode 100644 index 00000000..263a9297 --- /dev/null +++ b/doc/source/fly.rst @@ -0,0 +1,37 @@ +Fly.io +============ + +.. currentmodule:: dask_cloudprovider.fly + +.. autosummary:: + FlyMachineCluster + +Overview +-------- + +Authentication +^^^^^^^^^^^^^^ + +To authenticate with Fly you must first generate a +`personal access token `_. + +Then you must put this in your Dask configuration at ``cloudprovider.fly.token``. 
This can be done by +adding the token to your YAML configuration or exporting an environment variable. + +.. code-block:: yaml + + # ~/.config/dask/cloudprovider.yaml + + cloudprovider: + fly: + token: "yourtoken" + +.. code-block:: console + + $ export DASK_CLOUDPROVIDER__FLY__TOKEN="yourtoken" + +FlyMachine +---------- + +.. autoclass:: FlyMachineCluster + :members: \ No newline at end of file diff --git a/doc/source/index.rst b/doc/source/index.rst index c2e57968..20d19166 100644 --- a/doc/source/index.rst +++ b/doc/source/index.rst @@ -67,6 +67,7 @@ this code. aws.rst digitalocean.rst + fly.rst gcp.rst azure.rst hetzner.rst diff --git a/doc/source/installation.rst b/doc/source/installation.rst index b953cbd6..6bf29317 100644 --- a/doc/source/installation.rst +++ b/doc/source/installation.rst @@ -16,6 +16,7 @@ You can also restrict your install to just a specific cloud provider by giving t $ pip install dask-cloudprovider[azure]  # or $ pip install dask-cloudprovider[azureml]  # or $ pip install dask-cloudprovider[digitalocean]  # or + $ pip install dask-cloudprovider[fly]  # or $ pip install dask-cloudprovider[gcp] Conda From 927b7f2072d2a6b52cd8b6336de65fc64fb060ba Mon Sep 17 00:00:00 2001 From: Miles Zimmerman Date: Sun, 21 May 2023 08:42:49 -0700 Subject: [PATCH 10/15] fix pre-commit lint errors --- dask_cloudprovider/fly/__init__.py | 7 ++- dask_cloudprovider/fly/machine.py | 54 ++++++++++--------- dask_cloudprovider/fly/sdk/fly.py | 4 +- .../fly/sdk/models/apps/__init__.py | 2 + dask_cloudprovider/utils/socket.py | 4 +- 5 files changed, 43 insertions(+), 28 deletions(-) diff --git a/dask_cloudprovider/fly/__init__.py b/dask_cloudprovider/fly/__init__.py index 1fbcc454..944de53a 100644 --- a/dask_cloudprovider/fly/__init__.py +++ b/dask_cloudprovider/fly/__init__.py @@ -1 +1,6 @@ -from .machine import FlyMachine, FlyMachineWorker, FlyMachineScheduler, FlyMachineCluster +from .machine import ( + FlyMachine, + FlyMachineWorker, + FlyMachineScheduler, + 
FlyMachineCluster, +) diff --git a/dask_cloudprovider/fly/machine.py b/dask_cloudprovider/fly/machine.py index 2f6e4a56..a78c30ad 100644 --- a/dask_cloudprovider/fly/machine.py +++ b/dask_cloudprovider/fly/machine.py @@ -26,6 +26,7 @@ ) raise ImportError(msg) from e + class FlyMachine(VMInterface): def __init__( self, @@ -37,10 +38,10 @@ def __init__( memory_mb: int = None, cpus: int = None, image: str = None, - env_vars = None, - extra_bootstrap = None, - metadata = None, - restart = None, + env_vars=None, + extra_bootstrap=None, + metadata=None, + restart=None, **kwargs, ): super().__init__(*args, **kwargs) @@ -59,7 +60,9 @@ def __init__( self.metadata = metadata self.restart = restart self.app_name = self.cluster.app_name - self.env_vars['DASK_INTERNAL__INHERIT_CONFIG'] = dask.config.serialize(dask.config.global_config) + self.env_vars["DASK_INTERNAL__INHERIT_CONFIG"] = dask.config.serialize( + dask.config.global_config + ) self.api_token = self.cluster.api_token self._client = None if self.api_token is None: @@ -81,8 +84,7 @@ async def create_vm(self): port=443, handlers=["http", "tls"] ), machines.FlyMachineRequestConfigServicesPort( - port=8786, - handlers=["http", "tls"] + port=8786, handlers=["http", "tls"] ), ], protocol="tcp", @@ -91,8 +93,7 @@ async def create_vm(self): machines.FlyMachineConfigServices( ports=[ machines.FlyMachineRequestConfigServicesPort( - port=8787, - handlers=["http", "tls"] + port=8787, handlers=["http", "tls"] ), ], protocol="tcp", @@ -113,25 +114,28 @@ async def create_vm(self): ) ], ) - # await self.wait_for_app() self.machine = await self._fly().create_machine( app_name=self.cluster.app_name, # The name of the new Fly.io app. config=machine_config, # A FlyMachineConfig object containing creation details. name=self.name, # The name of the machine. region=self.region, # The deployment region for the machine. 
) - self.cluster._log(f"[fly.io] Created machine name={self.name} id={self.machine.id}") - self.host = f'{self.machine.id}.vm.{self.cluster.app_name}.internal' + self.cluster._log( + f"[fly.io] Created machine name={self.name} id={self.machine.id}" + ) + self.host = f"{self.machine.id}.vm.{self.cluster.app_name}.internal" self.internal_ip = self.machine.private_ip self.port = 8786 # self.address = f'tcp://{self.host}:{self.port}' - self.address = f'{self.cluster.protocol}://[{self.machine.private_ip}]:8786' + self.address = f"{self.cluster.protocol}://[{self.machine.private_ip}]:8786" # self.external_address = f'tls://{self.cluster.app_name}.fly.dev:443' return self.address, self.external_address - + async def destroy_vm(self): if self.machine is None: - self.cluster._log("[fly.io] Not Terminating Machine: Machine does not exist") + self.cluster._log( + "[fly.io] Not Terminating Machine: Machine does not exist" + ) return await self._fly().delete_machine( app_name=self.cluster.app_name, @@ -146,7 +150,7 @@ async def wait_for_scheduler(self): await asyncio.sleep(1) self.cluster._log("Scheduler is running") return True - + async def wait_for_app(self): self.cluster._log("[fly.io] Waiting for app to be created...") while self.cluster.app_name is None or self.app is None: @@ -158,6 +162,7 @@ def _fly(self): self._client = Fly(api_token=self.api_token) return self._client + class FlyMachineScheduler(SchedulerMixin, FlyMachine): """Scheduler running on a Fly.io Machine.""" @@ -174,7 +179,7 @@ def __init__( "python", "-m", "distributed.cli.dask_scheduler", - ]+ cli_keywords(scheduler_options, cls=_Scheduler) + ] + cli_keywords(scheduler_options, cls=_Scheduler) async def start(self): self.cluster._log(f"Starting scheduler on {self.name}") @@ -182,7 +187,7 @@ async def start(self): await self.create_app() await self.start_scheduler() self.status = Status.running - + async def start_scheduler(self): self.cluster._log("Creating scheduler instance") address, external_address 
= await self.create_vm() @@ -191,7 +196,7 @@ async def start_scheduler(self): self.cluster.scheduler_internal_address = address self.cluster.scheduler_external_address = external_address self.cluster.scheduler_port = self.port - + async def close(self): await super().close() if self.cluster.app_name is not None: @@ -269,6 +274,7 @@ def __init__( ), ] + class FlyMachineCluster(VMCluster): """Cluster running on Fly.io Machines. @@ -414,7 +420,9 @@ def __init__( "vm_size": vm_size if vm_size is not None else self.config.get("vm_size"), "image": image if image is not None else self.config.get("image"), "token": token if token is not None else self.config.get("token"), - "memory_mb": memory_mb if memory_mb is not None else self.config.get("memory_mb"), + "memory_mb": memory_mb + if memory_mb is not None + else self.config.get("memory_mb"), "cpus": cpus if cpus is not None else self.config.get("cpus"), "app_name": self.app_name, "protocol": self.config.get("protocol", "tcp"), @@ -425,8 +433,4 @@ def __init__( self.scheduler_options = {**self.options} self.worker_options = {**self.options} self.api_token = self.options["token"] - super().__init__( - debug=debug, - security=self.options["security"], - **kwargs - ) + super().__init__(debug=debug, security=self.options["security"], **kwargs) diff --git a/dask_cloudprovider/fly/sdk/fly.py b/dask_cloudprovider/fly/sdk/fly.py index f966cfc4..d3efd713 100644 --- a/dask_cloudprovider/fly/sdk/fly.py +++ b/dask_cloudprovider/fly/sdk/fly.py @@ -91,7 +91,9 @@ async def delete_app( # Raise an exception if HTTP status code is not 200. if r.status_code != 202: - raise AppInterfaceError(message=f"Unable to delete {app_name}! status_code={r.status_code}") + raise AppInterfaceError( + message=f"Unable to delete {app_name}! 
status_code={r.status_code}" + ) return FlyAppDeleteResponse( status=r.status_code, diff --git a/dask_cloudprovider/fly/sdk/models/apps/__init__.py b/dask_cloudprovider/fly/sdk/models/apps/__init__.py index 7b365e4a..9a6ac3e3 100644 --- a/dask_cloudprovider/fly/sdk/models/apps/__init__.py +++ b/dask_cloudprovider/fly/sdk/models/apps/__init__.py @@ -6,10 +6,12 @@ class FlyAppCreateRequest(BaseModel): app_name: str org_slug: Union[str, None] = None + class FlyAppCreateResponse(BaseModel): app_name: str org_slug: Union[str, None] = None + class FlyAppDetailsResponse(BaseModel): name: str status: str diff --git a/dask_cloudprovider/utils/socket.py b/dask_cloudprovider/utils/socket.py index 49e5d6ac..d6f6f963 100644 --- a/dask_cloudprovider/utils/socket.py +++ b/dask_cloudprovider/utils/socket.py @@ -11,6 +11,7 @@ def is_socket_open(ip, port): except Exception: return False + def is_ipv6_socket_open(ip, port): connection = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) try: @@ -20,6 +21,7 @@ def is_ipv6_socket_open(ip, port): except Exception: return False + async def async_socket_open(address, port): loop = asyncio.get_event_loop() try: @@ -27,4 +29,4 @@ async def async_socket_open(address, port): res.close() return True except Exception: - return False \ No newline at end of file + return False From a4caed8d9ac6af1f5416167cb7db778b258f3af3 Mon Sep 17 00:00:00 2001 From: Miles Zimmerman Date: Thu, 26 Oct 2023 18:08:18 -0700 Subject: [PATCH 11/15] fix pydantic error --- dask_cloudprovider/fly/sdk/models/apps/__init__.py | 2 +- dask_cloudprovider/fly/sdk/models/machines/__init__.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/dask_cloudprovider/fly/sdk/models/apps/__init__.py b/dask_cloudprovider/fly/sdk/models/apps/__init__.py index 9a6ac3e3..034584a0 100644 --- a/dask_cloudprovider/fly/sdk/models/apps/__init__.py +++ b/dask_cloudprovider/fly/sdk/models/apps/__init__.py @@ -25,4 +25,4 @@ class FlyAppDeleteRequest(BaseModel): class 
FlyAppDeleteResponse(BaseModel): app_name: str - status: str + status: int diff --git a/dask_cloudprovider/fly/sdk/models/machines/__init__.py b/dask_cloudprovider/fly/sdk/models/machines/__init__.py index 44197fb6..ad89b181 100644 --- a/dask_cloudprovider/fly/sdk/models/machines/__init__.py +++ b/dask_cloudprovider/fly/sdk/models/machines/__init__.py @@ -38,9 +38,9 @@ class FlyMachineConfigGuest(BaseModel): class FlyMachineConfigInit(BaseModel): """Model for FlyMachineConfig.init""" - exec: Union[str, None] - entrypoint: Union[str, None] - cmd: Union[str, None] + exec: Union[str, None] = None + entrypoint: Union[str, None] = None + cmd: Union[str, None] = None tty: bool = False From 0463a2fda92dca9c31cf3206aad5d3a2cdccf6b0 Mon Sep 17 00:00:00 2001 From: Miles Zimmerman Date: Fri, 27 Oct 2023 13:58:59 -0700 Subject: [PATCH 12/15] fix docs --- ci/scripts/test_imports.sh | 1 + dask_cloudprovider/cloudprovider.yaml | 2 +- dask_cloudprovider/fly/machine.py | 7 ++++--- doc/source/fly.rst | 4 ++-- 4 files changed, 8 insertions(+), 6 deletions(-) diff --git a/ci/scripts/test_imports.sh b/ci/scripts/test_imports.sh index c50a56bf..80ac3b8f 100644 --- a/ci/scripts/test_imports.sh +++ b/ci/scripts/test_imports.sh @@ -19,3 +19,4 @@ test_import "aws" "import dask_cloudprovider.aws" test_import "azure" "import dask_cloudprovider.azure" test_import "digitalocean" "import dask_cloudprovider.digitalocean" test_import "gcp" "import dask_cloudprovider.gcp" +test_import "fly" "import dask_cloudprovider.fly" diff --git a/dask_cloudprovider/cloudprovider.yaml b/dask_cloudprovider/cloudprovider.yaml index 121ee64b..1d2c09ab 100755 --- a/dask_cloudprovider/cloudprovider.yaml +++ b/dask_cloudprovider/cloudprovider.yaml @@ -122,7 +122,7 @@ cloudprovider: token: null # API token for interacting with the Fly API region: "lax" # Region to launch Droplets in vm_size: "shared-cpu-1x" # Droplet size to launch, default is 1GB RAM, 1 vCPU - image: "daskdev/dask:latest-py3.10" # Operating System 
image to use + image: "daskdev/dask:latest" # Operating System image to use memory_mb: 1024 # Memory in MB to use for the scheduler and workers cpus: 1 # Number of CPUs to use for the scheduler and workers app_name: null # Name of Fly app to use. If it is blank, a random name will be generated. diff --git a/dask_cloudprovider/fly/machine.py b/dask_cloudprovider/fly/machine.py index a78c30ad..dd5e3983 100644 --- a/dask_cloudprovider/fly/machine.py +++ b/dask_cloudprovider/fly/machine.py @@ -281,9 +281,10 @@ class FlyMachineCluster(VMCluster): VMs in Fly.io (FLY) are referred to as machines. This cluster manager constructs a Dask cluster running on VMs. - _Note: By default, the cluster will instantiate a new Fly.io app. The app will be deleted when - the cluster is closed. If you want to use an existing app, you can pass the app name to the - ``app_name`` parameter._ + .. note:: + By default, the cluster will instantiate a new Fly.io app. The app will be deleted when + the cluster is closed. If you want to use an existing app, you can pass the app name to the + ``app_name`` parameter. When configuring your cluster you may find it useful to install the ``flyctl`` tool for querying the CLY API for available options. diff --git a/doc/source/fly.rst b/doc/source/fly.rst index 263a9297..c856bf3a 100644 --- a/doc/source/fly.rst +++ b/doc/source/fly.rst @@ -1,5 +1,5 @@ Fly.io -============ +====== .. currentmodule:: dask_cloudprovider.fly @@ -34,4 +34,4 @@ FlyMachine ---------- .. 
autoclass:: FlyMachineCluster - :members: \ No newline at end of file + :members: From dc83835c9bf0fe32185ba857700d3567e3f93a4f Mon Sep 17 00:00:00 2001 From: Miles Zimmerman Date: Sun, 29 Oct 2023 04:18:47 -0700 Subject: [PATCH 13/15] fix ipv6 port --- dask_cloudprovider/fly/machine.py | 29 ++++++++++--------- .../fly/sdk/models/machines/__init__.py | 4 +-- 2 files changed, 17 insertions(+), 16 deletions(-) diff --git a/dask_cloudprovider/fly/machine.py b/dask_cloudprovider/fly/machine.py index dd5e3983..75822c08 100644 --- a/dask_cloudprovider/fly/machine.py +++ b/dask_cloudprovider/fly/machine.py @@ -78,13 +78,7 @@ async def create_vm(self): machines.FlyMachineConfigServices( ports=[ machines.FlyMachineRequestConfigServicesPort( - port=80, handlers=["http"] - ), - machines.FlyMachineRequestConfigServicesPort( - port=443, handlers=["http", "tls"] - ), - machines.FlyMachineRequestConfigServicesPort( - port=8786, handlers=["http", "tls"] + port=8786 ), ], protocol="tcp", @@ -92,6 +86,12 @@ async def create_vm(self): ), machines.FlyMachineConfigServices( ports=[ + machines.FlyMachineRequestConfigServicesPort( + port=80, handlers=["http"] + ), + machines.FlyMachineRequestConfigServicesPort( + port=443, handlers=["http", "tls"] + ), machines.FlyMachineRequestConfigServicesPort( port=8787, handlers=["http", "tls"] ), @@ -120,15 +120,14 @@ async def create_vm(self): name=self.name, # The name of the machine. region=self.region, # The deployment region for the machine. 
) - self.cluster._log( - f"[fly.io] Created machine name={self.name} id={self.machine.id}" - ) self.host = f"{self.machine.id}.vm.{self.cluster.app_name}.internal" self.internal_ip = self.machine.private_ip self.port = 8786 - # self.address = f'tcp://{self.host}:{self.port}' - self.address = f"{self.cluster.protocol}://[{self.machine.private_ip}]:8786" - # self.external_address = f'tls://{self.cluster.app_name}.fly.dev:443' + self.address = f"{self.cluster.protocol}://[{self.machine.private_ip}]:{self.port}" + # self.external_address = f"{self.cluster.protocol}://{self.host}:{self.port}" + self.cluster._log( + f"[fly.io] Created machine name={self.name} id={self.machine.id} internal_ip={self.internal_ip} address={self.address} external_address={self.external_address}" + ) return self.address, self.external_address async def destroy_vm(self): @@ -179,6 +178,8 @@ def __init__( "python", "-m", "distributed.cli.dask_scheduler", + "--host", + "fly-local-6pn", ] + cli_keywords(scheduler_options, cls=_Scheduler) async def start(self): @@ -269,6 +270,7 @@ def __init__( "opts": { **worker_options, "name": self.name, + "host": "fly-local-6pn", }, } ), @@ -429,7 +431,6 @@ def __init__( "protocol": self.config.get("protocol", "tcp"), "security": self.config.get("security", False), "host": "fly-local-6pn", - "security": self.config.get("security", False), } self.scheduler_options = {**self.options} self.worker_options = {**self.options} diff --git a/dask_cloudprovider/fly/sdk/models/machines/__init__.py b/dask_cloudprovider/fly/sdk/models/machines/__init__.py index ad89b181..18e2cdc9 100644 --- a/dask_cloudprovider/fly/sdk/models/machines/__init__.py +++ b/dask_cloudprovider/fly/sdk/models/machines/__init__.py @@ -70,7 +70,7 @@ class FlyMachineRequestConfigServicesPort(BaseModel): """Model for FlyMachineConfig.services.port""" port: int - handlers: list[str] + handlers: list[str] = [] @validator("port") def validate_port(cls, port: int) -> int: @@ -97,7 +97,7 @@ def 
validate_handlers(cls, handlers: list[str]) -> list[str]: class FlyMachineConfigServices(BaseModel): """Model for FlyMachineConfig.services""" - ports: list[FlyMachineRequestConfigServicesPort] + ports: list[FlyMachineRequestConfigServicesPort] = [] protocol: str internal_port: int From b9c6e4aa39be24339095cfcf748cf8f82e1c3d7f Mon Sep 17 00:00:00 2001 From: Miles Zimmerman Date: Fri, 3 Nov 2023 20:34:58 -0700 Subject: [PATCH 14/15] fix SSLWantReadError during shutdown; address code review comments --- dask_cloudprovider/__init__.py | 6 ------ dask_cloudprovider/cloudprovider.yaml | 2 +- dask_cloudprovider/fly/machine.py | 17 +++++++++++++---- dask_cloudprovider/fly/sdk/fly.py | 6 +++--- 4 files changed, 17 insertions(+), 14 deletions(-) diff --git a/dask_cloudprovider/__init__.py b/dask_cloudprovider/__init__.py index 19f8b39f..ced23cb2 100755 --- a/dask_cloudprovider/__init__.py +++ b/dask_cloudprovider/__init__.py @@ -42,9 +42,3 @@ def __getattr__(name): "DigitalOcean cluster managers must be imported from the digitalocean subpackage. " f"Please import dask_cloudprovider.digitalocean.{name}" ) - - if name in ["FlyMachineCluster"]: - raise ImportError( - "Fly.io cluster managers must be imported from the fly subpackage.
" - f"Please import dask_cloudprovider.fly.{name}" - ) diff --git a/dask_cloudprovider/cloudprovider.yaml b/dask_cloudprovider/cloudprovider.yaml index 1d2c09ab..d4b7bf2d 100755 --- a/dask_cloudprovider/cloudprovider.yaml +++ b/dask_cloudprovider/cloudprovider.yaml @@ -122,7 +122,7 @@ cloudprovider: token: null # API token for interacting with the Fly API region: "lax" # Region to launch Droplets in vm_size: "shared-cpu-1x" # Droplet size to launch, default is 1GB RAM, 1 vCPU - image: "daskdev/dask:latest" # Operating System image to use + image: "ghcr.io/dask/dask:latest" # Docker image to use memory_mb: 1024 # Memory in MB to use for the scheduler and workers cpus: 1 # Number of CPUs to use for the scheduler and workers app_name: null # Name of Fly app to use. If it is blank, a random name will be generated. diff --git a/dask_cloudprovider/fly/machine.py b/dask_cloudprovider/fly/machine.py index 75822c08..64cb474c 100644 --- a/dask_cloudprovider/fly/machine.py +++ b/dask_cloudprovider/fly/machine.py @@ -125,9 +125,18 @@ async def create_vm(self): self.port = 8786 self.address = f"{self.cluster.protocol}://[{self.machine.private_ip}]:{self.port}" # self.external_address = f"{self.cluster.protocol}://{self.host}:{self.port}" - self.cluster._log( - f"[fly.io] Created machine name={self.name} id={self.machine.id} internal_ip={self.internal_ip} address={self.address} external_address={self.external_address}" + log_attributes = { + 'name': self.name, + 'machine': self.machine.id, + 'internal_ip': self.internal_ip, + 'address': self.address, + } + if self.external_address is not None: + log_attributes['external_address'] = self.external_address + logline = "[fly.io] Created machine " + " ".join( + [f"{k}={v}" for k, v in log_attributes.items()] ) + self.cluster._log(logline) return self.address, self.external_address async def destroy_vm(self): @@ -144,7 +153,7 @@ async def destroy_vm(self): self.cluster._log(f"[fly.io] Terminated machine {self.name}") async
def wait_for_scheduler(self): - self.cluster._log(f"Waiting for scheduler to run at {self.host}:{self.port}") + self.cluster._log(f"Waiting for scheduler to run at {self.address}") while not asyncio.create_task(async_socket_open(self.internal_ip, self.port)): await asyncio.sleep(1) self.cluster._log("Scheduler is running") @@ -333,7 +342,7 @@ class FlyMachineCluster(VMCluster): security : Security or bool, optional Configures communication security in this cluster. Can be a security object, or True. If True, temporary self-signed credentials will - be created automatically. Default is ``True``. + be created automatically. Default is ``False``. debug : bool, optional More information will be printed when constructing clusters to enable debugging. diff --git a/dask_cloudprovider/fly/sdk/fly.py b/dask_cloudprovider/fly/sdk/fly.py index d3efd713..e1cd9df4 100644 --- a/dask_cloudprovider/fly/sdk/fly.py +++ b/dask_cloudprovider/fly/sdk/fly.py @@ -309,7 +309,7 @@ async def _make_api_delete_request( """An internal function for making DELETE requests to the Fly.io API.""" api_hostname = self._get_api_hostname() url = f"{api_hostname}/v{self.api_version}/{path}" - async with httpx.AsyncClient(verify=False) as client: + async with httpx.AsyncClient(verify=False, timeout=None) as client: r = await client.delete(url, headers=self._generate_headers()) r.raise_for_status() return r @@ -321,7 +321,7 @@ async def _make_api_get_request( """An internal function for making GET requests to the Fly.io API.""" api_hostname = self._get_api_hostname() url = f"{api_hostname}/v{self.api_version}/{path}" - async with httpx.AsyncClient(verify=False) as client: + async with httpx.AsyncClient(verify=False, timeout=None) as client: r = await client.get(url, headers=self._generate_headers()) r.raise_for_status() return r @@ -335,7 +335,7 @@ async def _make_api_post_request( api_hostname = self._get_api_hostname() url = f"{api_hostname}/v{self.api_version}/{path}" headers = self._generate_headers() 
- async with httpx.AsyncClient(verify=False) as client: + async with httpx.AsyncClient(verify=False, timeout=None) as client: r = await client.post(url, headers=headers, json=payload) r.raise_for_status() return r From 1ad441992c1eee18826dfb0d55c5fc660a99eb49 Mon Sep 17 00:00:00 2001 From: Miles Zimmerman Date: Fri, 3 Nov 2023 20:48:29 -0700 Subject: [PATCH 15/15] fix lint issues --- dask_cloudprovider/fly/machine.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/dask_cloudprovider/fly/machine.py b/dask_cloudprovider/fly/machine.py index 64cb474c..96a4c374 100644 --- a/dask_cloudprovider/fly/machine.py +++ b/dask_cloudprovider/fly/machine.py @@ -77,9 +77,7 @@ async def create_vm(self): services=[ machines.FlyMachineConfigServices( ports=[ - machines.FlyMachineRequestConfigServicesPort( - port=8786 - ), + machines.FlyMachineRequestConfigServicesPort(port=8786), ], protocol="tcp", internal_port=8786, @@ -123,16 +121,18 @@ async def create_vm(self): self.host = f"{self.machine.id}.vm.{self.cluster.app_name}.internal" self.internal_ip = self.machine.private_ip self.port = 8786 - self.address = f"{self.cluster.protocol}://[{self.machine.private_ip}]:{self.port}" + self.address = ( + f"{self.cluster.protocol}://[{self.machine.private_ip}]:{self.port}" + ) # self.external_address = f"{self.cluster.protocol}://{self.host}:{self.port}" log_attributes = { - 'name': self.name, - 'machine': self.machine.id, - 'internal_ip': self.internal_ip, - 'address': self.address, + "name": self.name, + "machine": self.machine.id, + "internal_ip": self.internal_ip, + "address": self.address, } if self.external_address is not None: - log_attributes['external_address'] = self.external_address + log_attributes["external_address"] = self.external_address logline = "[fly.io] Created machine " + " ".join( [f"{k}={v}" for k, v in log_attributes.items()] )