[POC] cluster module #681

Open
adebardo opened this issue Feb 6, 2025 · 0 comments

adebardo commented Feb 6, 2025

Context

The goal of this ticket is to set up a cluster in xDEM to enable distributed computing.
This ticket covers only a multiprocessing cluster, but the interface is designed so that a Dask backend can be added later if needed.

Implementation

The cluster is deliberately minimal.
We propose creating a cluster.py module at the root of the source code to hold the internal proof-of-concept (PoC) code:

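import multiprocessing


class ClusterGenerator:
    """Factory: instantiating it returns a cluster object, not a ClusterGenerator."""

    def __new__(cls, name, nb_workers=2):
        if name == "basic":
            # Sequential execution in the current process
            return BasicCluster()
        # Any other name selects the multiprocessing cluster
        return MpCluster(conf={"nb_workers": nb_workers})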

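class AbstractCluster:
    """Common interface shared by all cluster backends."""

    def __init__(self):
        self.pool = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Always release resources when leaving the "with" block
        self.close()

    def close(self):
        pass

    def launch_task(self, fun, args=None, kwargs=None):
        # None defaults avoid sharing mutable default arguments between calls
        pass

    def get_res(self, future):
        return future

    def return_wrapper(self):
        pass

    def tile_retriever(self, res):
        pass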

class BasicCluster(AbstractCluster):
    def launch_task(self, fun, args=None, kwargs=None):
        # Run the task synchronously; the "future" is the result itself
        return fun(*(args or ()), **(kwargs or {}))
        
class MpCluster(AbstractCluster):
    def __init__(self, conf=None):
        super().__init__()
        nb_workers = 1
        if conf is not None:
            nb_workers = conf.get("nb_workers", 1)
        # The "forkserver" start method is not available on Windows
        ctx_in_main = multiprocessing.get_context("forkserver")
        self.pool = ctx_in_main.Pool(processes=nb_workers, maxtasksperchild=10)

    def close(self):
        self.pool.terminate()
        self.pool.join()

    def launch_task(self, fun, args=None, kwargs=None):
        return self.pool.apply_async(fun, args=args or (), kwds=kwargs or {})

    def get_res(self, future):
        # Block until the result is ready (timeout in seconds)
        return future.get(timeout=5000)
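
To illustrate how calling code might use this interface, here is a minimal sketch. It assumes only the launch_task / get_res / context-manager methods proposed above. The names map_tiles and tile_mean are hypothetical, and the small BasicCluster is repeated only so the snippet runs on its own.

```python
class BasicCluster:
    """In-process cluster mirroring the BasicCluster proposed in this issue."""

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def close(self):
        pass

    def launch_task(self, fun, args=None, kwargs=None):
        # Runs immediately; the returned "future" is the result itself
        return fun(*(args or ()), **(kwargs or {}))

    def get_res(self, future):
        return future


def tile_mean(tile):
    """Hypothetical per-tile task: mean of a list of values."""
    return sum(tile) / len(tile)


def map_tiles(cluster, fun, tiles):
    """Submit one task per tile, then collect results in submission order."""
    futures = [cluster.launch_task(fun, args=[tile]) for tile in tiles]
    return [cluster.get_res(future) for future in futures]


if __name__ == "__main__":
    tiles = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
    with BasicCluster() as cluster:
        print(map_tiles(cluster, tile_mean, tiles))  # prints [2.0, 5.0, 8.0]
```

Because map_tiles only touches launch_task and get_res, the same call should work with the multiprocessing cluster. In that case the task must be a picklable module-level function, and with the "forkserver" start method the entry point needs the `if __name__ == "__main__":` guard shown here.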

Tests

The tests can be modeled on the existing cluster tests in CARS: https://github.com/CNES/cars/blob/master/tests/orchestrator/cluster/test_cluster.py
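
As a rough idea of what backend-agnostic checks could look like, here is a sketch. It is not taken from the CARS test file. ThreadCluster is a hypothetical stand-in built on concurrent.futures so the snippet runs without spawning processes, and the check_* helpers are illustrative names.

```python
from concurrent.futures import ThreadPoolExecutor


class ThreadCluster:
    """Hypothetical stand-in exposing the same methods as the proposed clusters."""

    def __init__(self, nb_workers=2):
        self.pool = ThreadPoolExecutor(max_workers=nb_workers)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def close(self):
        self.pool.shutdown(wait=True)

    def launch_task(self, fun, args=None, kwargs=None):
        return self.pool.submit(fun, *(args or ()), **(kwargs or {}))

    def get_res(self, future):
        return future.result(timeout=60)


def check_round_trip(cluster_factory):
    """Positional and keyword arguments reach the task; results come back."""
    with cluster_factory() as cluster:
        future = cluster.launch_task(divmod, args=[17, 5])
        assert cluster.get_res(future) == (3, 2)
        future = cluster.launch_task(int, args=["ff"], kwargs={"base": 16})
        assert cluster.get_res(future) == 255
    return True


def check_error_propagation(cluster_factory):
    """A task error surfaces either at launch or when the result is fetched."""
    with cluster_factory() as cluster:
        try:
            future = cluster.launch_task(divmod, args=[1, 0])
            cluster.get_res(future)
        except ZeroDivisionError:
            return True
    return False
```

Each helper takes a factory rather than an instance, so the same checks could be pointed at BasicCluster or at `lambda: MpCluster(conf={"nb_workers": 2})` once cluster.py exists.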
