Source code for distributed.worker_client

from contextlib import contextmanager
import warnings

from .threadpoolexecutor import secede, rejoin
from .worker import thread_state, get_client, get_worker


[docs]@contextmanager def worker_client(timeout=3, separate_thread=True): """ Get client for this thread This context manager is intended to be called within functions that we run on workers. When run as a context manager it delivers a client ``Client`` object that can submit other tasks directly from that worker. Parameters ---------- timeout: Number Timeout after which to err separate_thread: bool, optional Whether to run this function outside of the normal thread pool defaults to True Examples -------- >>> def func(x): ... with worker_client() as c: # connect from worker back to scheduler ... a = c.submit(inc, x) # this task can submit more tasks ... b = c.submit(dec, x) ... result = c.gather([a, b]) # and gather results ... return result >>> future = client.submit(func, 1) # submit func(1) on cluster See Also -------- get_worker get_client secede """ worker = get_worker() client = get_client(timeout=timeout) if separate_thread: secede() # have this thread secede from the thread pool worker.loop.add_callback(worker.transition, thread_state.key, "long-running") yield client if separate_thread: rejoin()
def local_client(*args, **kwargs): warnings.warn("local_client has moved to worker_client") return worker_client(*args, **kwargs)