Add Nebius AI Cloud support #443

Open · wants to merge 2 commits into main
10 changes: 10 additions & 0 deletions dask_cloudprovider/cloudprovider.yaml
@@ -149,3 +149,13 @@ cloudprovider:
external_network_id: null # The ID of the external network used for assigning floating IPs. List available external networks using: `openstack network list --external`
create_floating_ip: false # Specifies whether to assign a floating IP to each instance, enabling external access. Set to `True` if external connectivity is needed.
docker_image: "daskdev/dask:latest" # docker image to use

nebius:
token: null # IAM token for interacting with Nebius AI Cloud
project_id: null # The project ID; you can find it in the Nebius AI Cloud console
bootstrap: true # It is assumed that the OS image does not have Docker and needs bootstrapping. Set this to false if using a custom image with Docker already installed.
image_family: "ubuntu22.04-driverless" # Should be "ubuntu22.04-driverless" or "ubuntu22.04-cuda12". See https://docs.nebius.com/compute/storage/manage#parameters-boot
docker_image: "daskdev/dask:latest" # docker image to use
server_platform: "cpu-d3" # See all platforms at https://docs.nebius.com/compute/virtual-machines/types
server_preset: "4vcpu-16gb" # See all presets at https://docs.nebius.com/compute/virtual-machines/types
disk_size: 64 # Specifies the size of the VM host OS disk in gigabytes.
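
For reference, a minimal sketch of setting these new config keys programmatically instead of editing cloudprovider.yaml. The token and project id values are placeholders; the other values are the defaults introduced in this diff.

import dask
from dask_cloudprovider.nebius import NebiusCluster

# Placeholder credentials: the IAM token typically comes from
# `nebius iam get-access-token`, the project id from the Nebius console.
dask.config.set(
    {
        "cloudprovider.nebius.token": "<IAM_TOKEN>",
        "cloudprovider.nebius.project_id": "<PROJECT_ID>",
        "cloudprovider.nebius.server_platform": "cpu-d3",
        "cloudprovider.nebius.server_preset": "4vcpu-16gb",
        "cloudprovider.nebius.disk_size": 64,
    }
)

cluster = NebiusCluster(n_workers=1)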
1 change: 1 addition & 0 deletions dask_cloudprovider/nebius/__init__.py
@@ -0,0 +1 @@
from .instances import NebiusCluster
287 changes: 287 additions & 0 deletions dask_cloudprovider/nebius/instances.py
@@ -0,0 +1,287 @@
import dask

from dask_cloudprovider.generic.vmcluster import (
VMCluster,
VMInterface,
SchedulerMixin,
WorkerMixin,
)

try:
from nebius.api.nebius.common.v1 import ResourceMetadata
from nebius.api.nebius.vpc.v1 import SubnetServiceClient, ListSubnetsRequest
from nebius.sdk import SDK
from nebius.api.nebius.compute.v1 import (
InstanceServiceClient,
CreateInstanceRequest,
DiskServiceClient,
CreateDiskRequest,
DiskSpec,
SourceImageFamily,
InstanceSpec,
AttachedDiskSpec,
ExistingDisk,
ResourcesSpec,
NetworkInterfaceSpec,
IPAddress,
PublicIPAddress,
GetInstanceRequest,
DeleteInstanceRequest,
DeleteDiskRequest,
)
except ImportError as e:
msg = (
"Dask Cloud Provider Nebius requirements are not installed.\n\n"
"Please pip install as follows:\n\n"
' pip install "dask-cloudprovider[nebius]" --upgrade # or python -m pip install'
)
raise ImportError(msg) from e


class NebiusInstance(VMInterface):
def __init__(
self,
cluster: str,
config,
env_vars: dict = None,
bootstrap=None,
extra_bootstrap=None,
docker_image: str = None,
image_family: str = None,
project_id: str = None,
server_platform: str = None,
server_preset: str = None,
disk_size: int = None,
*args,
**kwargs,
):
super().__init__(*args, **kwargs)
self.cluster = cluster
self.config = config
self.extra_bootstrap = extra_bootstrap
self.env_vars = env_vars
self.bootstrap = bootstrap
self.image_family = image_family
self.project_id = project_id
self.docker_image = docker_image
self.server_platform = server_platform
self.server_preset = server_preset
self.sdk = SDK(credentials=self.config.get("token"))
self.disk_size = disk_size
self.instance_id = None
self.disk_id = None

async def create_vm(self, user_data=None):
service = DiskServiceClient(self.sdk)
operation = await service.create(
CreateDiskRequest(
metadata=ResourceMetadata(
parent_id=self.project_id,
name=self.name + "-disk",
),
spec=DiskSpec(
source_image_family=SourceImageFamily(
image_family=self.image_family
),
size_gibibytes=self.disk_size,
type=DiskSpec.DiskType.NETWORK_SSD,
),
)
)
await operation.wait()
self.disk_id = operation.resource_id

service = SubnetServiceClient(self.sdk)
sub_net = await service.list(ListSubnetsRequest(parent_id=self.project_id))
subnet_id = sub_net.items[0].metadata.id

service = InstanceServiceClient(self.sdk)
operation = await service.create(
CreateInstanceRequest(
metadata=ResourceMetadata(
parent_id=self.project_id,
name=self.name,
),
spec=InstanceSpec(
boot_disk=AttachedDiskSpec(
attach_mode=AttachedDiskSpec.AttachMode(2),
existing_disk=ExistingDisk(id=self.disk_id),
),
cloud_init_user_data=self.cluster.render_process_cloud_init(self),
resources=ResourcesSpec(
platform=self.server_platform, preset=self.server_preset
),
network_interfaces=[
NetworkInterfaceSpec(
subnet_id=subnet_id,
ip_address=IPAddress(),
name="network-interface-0",
public_ip_address=PublicIPAddress(),
)
],
),
)
)
self.instance_id = operation.resource_id

self.cluster._log(f"Creating Nebius instance {self.name}")
await operation.wait()
service = InstanceServiceClient(self.sdk)
operation = await service.get(
GetInstanceRequest(
id=self.instance_id,
)
)
internal_ip = operation.status.network_interfaces[0].ip_address.address.split(
"/"
)[0]
external_ip = operation.status.network_interfaces[
0
].public_ip_address.address.split("/")[0]
self.cluster._log(
f"Created Nebius instance {self.name} with internal IP {internal_ip} and external IP {external_ip}"
)
return internal_ip, external_ip

async def destroy_vm(self):
if self.instance_id:
service = InstanceServiceClient(self.sdk)
operation = await service.delete(
DeleteInstanceRequest(
id=self.instance_id,
)
)
await operation.wait()

if self.disk_id:
service = DiskServiceClient(self.sdk)
await service.delete(
DeleteDiskRequest(
id=self.disk_id,
)
)
self.cluster._log(
f"Terminated instance {self.name} ({self.instance_id}) and deleted disk {self.disk_id}"
)
self.instance_id = None
self.disk_id = None


class NebiusScheduler(SchedulerMixin, NebiusInstance):
"""Scheduler running on a Nebius server."""


class NebiusWorker(WorkerMixin, NebiusInstance):
"""Worker running on a Nebius server."""


class NebiusCluster(VMCluster):
"""Cluster running on Nebius AI Cloud instances.

VMs in Nebius AI Cloud are referred to as instances. This cluster manager constructs a Dask cluster
running on VMs.

When configuring your cluster you may find it useful to install the ``nebius`` tool for querying the
Nebius API for available options.

https://docs.nebius.com/cli/quickstart

Parameters
----------
image_family: str
The image family to use for the host OS. This should be an Ubuntu variant.
You can find the list of available images here: https://docs.nebius.com/compute/storage/manage#parameters-boot.
project_id: str
The Nebius AI Cloud project id. You can find it in the Nebius AI Cloud console.
server_platform: str
The platform to use for the instances. A list of all platforms and presets is available at https://docs.nebius.com/compute/virtual-machines/types/.
server_preset: str
The preset to use for the instances. A list of all platforms and presets is available at https://docs.nebius.com/compute/virtual-machines/types/.
n_workers: int
Number of workers to initialise the cluster with. Defaults to ``0``.
worker_module: str
The Python module to run for the worker. Defaults to ``distributed.cli.dask_worker``
worker_options: dict
Params to be passed to the worker class.
See :class:`distributed.worker.Worker` for default worker class.
If you set ``worker_module`` then refer to the docstring for the custom worker class.
scheduler_options: dict
Params to be passed to the scheduler class.
See :class:`distributed.scheduler.Scheduler`.
env_vars: dict
Environment variables to be passed to the worker.
extra_bootstrap: list[str] (optional)
Extra commands to be run during the bootstrap phase.

Examples
--------

>>> from dask_cloudprovider.nebius import NebiusCluster
>>> cluster = NebiusCluster(n_workers=1)

>>> from dask.distributed import Client
>>> client = Client(cluster)

>>> import dask.array as da
>>> arr = da.random.random((1000, 1000), chunks=(100, 100))
>>> arr.mean().compute()

>>> client.close()
>>> cluster.close()

"""

def __init__(
self,
bootstrap: bool = None,
image_family: str = None,
project_id: str = None,
disk_size: int = None,
server_platform: str = None,
server_preset: str = None,
docker_image: str = None,
debug: bool = False,
**kwargs,
):
self.config = dask.config.get("cloudprovider.nebius", {})

self.scheduler_class = NebiusScheduler
self.worker_class = NebiusWorker

self.image_family = dask.config.get(
"cloudprovider.nebius.image_family", override_with=image_family
)
self.docker_image = dask.config.get(
"cloudprovider.nebius.docker_image", override_with=docker_image
)
self.project_id = dask.config.get(
"cloudprovider.nebius.project_id", override_with=project_id
)
self.server_platform = dask.config.get(
"cloudprovider.nebius.server_platform", override_with=server_platform
)
self.server_preset = dask.config.get(
"cloudprovider.nebius.server_preset", override_with=server_preset
)
self.bootstrap = dask.config.get(
"cloudprovider.nebius.bootstrap", override_with=bootstrap
)
self.disk_size = dask.config.get(
"cloudprovider.nebius.disk_size", override_with=disk_size
)
self.debug = debug

self.options = {
"bootstrap": self.bootstrap,
"cluster": self,
"config": self.config,
"docker_image": self.docker_image,
"image_family": self.image_family,
"project_id": self.project_id,
"server_platform": self.server_platform,
"server_preset": self.server_preset,
"disk_size": self.disk_size,
}
self.scheduler_options = {**self.options}
self.worker_options = {**self.options}
super().__init__(debug=debug, **kwargs)
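
As a usage note, every keyword argument above falls back to the matching cloudprovider.nebius config value through dask.config.get(..., override_with=...), so explicit arguments take precedence over cloudprovider.yaml. A minimal sketch with placeholder values (it assumes cloudprovider.nebius.token is already configured):

from dask_cloudprovider.nebius import NebiusCluster

# Explicit kwargs win over the cloudprovider.nebius defaults shown above.
cluster = NebiusCluster(
    project_id="<PROJECT_ID>",              # placeholder
    image_family="ubuntu22.04-driverless",  # default from cloudprovider.yaml
    server_platform="cpu-d3",
    server_preset="4vcpu-16gb",
    disk_size=64,
    n_workers=2,
)
cluster.scale(4)  # standard VMCluster scaling applies
cluster.close()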
77 changes: 77 additions & 0 deletions dask_cloudprovider/nebius/tests/test_nebius.py
@@ -0,0 +1,77 @@
import pytest

import dask

nebius = pytest.importorskip("nebius")

from dask_cloudprovider.nebius.instances import NebiusCluster
from dask.distributed import Client
from distributed.core import Status


async def skip_without_credentials(config):
if config.get("token") is None or config.get("project_id") is None:
pytest.skip(
"""
You must configure a Nebius AI Cloud IAM token and project id to run this test.

Either set this in your config

# cloudprovider.yaml
cloudprovider:
nebius:
token: "yourtoken"
project_id: "yourprojectid"

Or set them as environment variables

export DASK_CLOUDPROVIDER__NEBIUS__TOKEN=$(nebius iam get-access-token)
export DASK_CLOUDPROVIDER__NEBIUS__PROJECT_ID=project_id

"""
)


@pytest.fixture
async def config():
return dask.config.get("cloudprovider.nebius", {})


@pytest.fixture
@pytest.mark.external
async def cluster(config):
await skip_without_credentials(config)
async with NebiusCluster(asynchronous=True, debug=True) as cluster:
yield cluster


@pytest.mark.asyncio
@pytest.mark.external
async def test_init():
cluster = NebiusCluster(asynchronous=True, debug=True)
assert cluster.status == Status.created


@pytest.mark.asyncio
@pytest.mark.external
async def test_create_cluster(cluster):
assert cluster.status == Status.running

cluster.scale(1)
await cluster
assert len(cluster.workers) == 1

async with Client(cluster, asynchronous=True) as client:

def inc(x):
return x + 1

assert await client.submit(inc, 10).result() == 11


@pytest.mark.asyncio
async def test_get_cloud_init():
cloud_init = NebiusCluster.get_cloud_init(
docker_args="--privileged",
)
assert " --privileged " in cloud_init
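
A hypothetical way to run the external tests above from Python, assuming the credentials are exported as in the skip message (env var names taken from skip_without_credentials):

import pytest

# Runs only the tests marked `external`; requires
# DASK_CLOUDPROVIDER__NEBIUS__TOKEN and DASK_CLOUDPROVIDER__NEBIUS__PROJECT_ID
# to be set in the environment.
pytest.main(["-m", "external", "dask_cloudprovider/nebius/tests/test_nebius.py"])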
1 change: 1 addition & 0 deletions setup.py
@@ -17,6 +17,7 @@
"hetzner": ["hcloud>=1.10.0"],
"ibm": ["ibm_code_engine_sdk>=3.1.0"],
"openstack": ["openstacksdk>=3.3.0"],
"nebius": ["nebius @ git+https://github.com/nebius/pysdk"],
Member:
Is there any road to getting this properly published on PyPI? Just pointing to the tip of main on GitHub is an uncomfortable place to be.

Author:
I will fix it as soon as Nebius publishes the library on PyPI.

}
extras_require["all"] = set(pkg for pkgs in extras_require.values() for pkg in pkgs)
