Skip to content

Worker

DVC Task worker factories.

TemporaryWorker

Temporary worker that automatically shuts down when queue is empty.

Source code in dvc_task/worker/temporary.py (lines 17–129):
class TemporaryWorker:
    """Celery worker wrapper that stops the worker once its queue is idle.

    ``start`` launches the worker in the current process and, alongside it, a
    daemon monitor thread that periodically inspects the worker and broadcasts
    a shutdown when it finds no remaining work.
    """

    def __init__(
        self,
        app: Celery,
        timeout: int = 60,
        **kwargs,
    ):
        """Create a temporary worker wrapper.

        Arguments:
            app: Celery application the worker will run.
            timeout: Seconds the monitor sleeps between queue checks. The
                worker is shut down if the queue is found empty after a
                sleep.

        Any remaining keyword arguments are stored and later translated into
        celery worker command-line options.
        """
        self.app, self.timeout, self.config = app, timeout, kwargs

    def ping(self, name: str, timeout: float = 1.0) -> Optional[List[Dict[str, Any]]]:
        """Ping the worker called *name* and return whatever replies arrive."""
        targets = [default_nodename(name)]
        return self._ping(destination=targets, timeout=timeout)

    def _ping(
        self, *, destination: Optional[List[str]] = None, timeout: float = 1.0
    ) -> Optional[List[Dict[str, Any]]]:
        # Broadcast a ping through the app's remote-control API.
        control = self.app.control
        return control.ping(destination=destination, timeout=timeout)

    def start(self, name: str, fsapp_clean: bool = False) -> None:
        """Launch a Celery worker called *name* unless one already responds.

        The worker's main loop runs in the calling process and blocks until
        the worker exits. A daemon thread running ``self.monitor`` is started
        first so it can shut the worker down once the queue goes idle.

        Arguments:
            name: Celery worker name (passed on as ``--hostname``).
            fsapp_clean: When true and the app is an FSApp, clean up the FSApp
                broker after the worker exits. Ignored for other app types.
        """
        if os.name == "nt":
            # see https://github.com/celery/billiard/issues/247
            os.environ["FORKED_BY_MULTIPROCESSING"] = "1"

        if self.ping(name):
            # Some worker already answers to this name; leave it alone.
            return

        watcher = threading.Thread(target=self.monitor, args=(name,), daemon=True)
        watcher.start()

        options = {**self.config, "hostname": name}
        self.app.worker_main(argv=["worker", *self._parse_config(options)])

        if fsapp_clean and isinstance(self.app, FSApp):  # type: ignore[unreachable]
            logger.info("cleaning up FSApp broker.")
            self.app.clean()
        logger.info("done")

    @staticmethod
    def _parse_config(config: Mapping[str, Any]) -> List[str]:
        """Translate a worker config mapping into celery CLI arguments.

        Valued options are emitted as ``--opt-name=value`` only when the value
        is truthy. Boolean switches are emitted as bare ``--opt-name`` when
        truthy, and ``task_events`` maps to ``-E``.
        """

        def _flag(key: str) -> str:
            # snake_case config key -> kebab-case long option
            return "--" + key.replace("_", "-")

        valued = ("hostname", "pool", "concurrency", "prefetch_multiplier")
        switches = ("without_heartbeat", "without_mingle", "without_gossip")

        args = [f"--loglevel={config.get('loglevel', 'info')}"]
        for key, value in zip(valued, map(config.get, valued)):
            if value:
                args.append(f"{_flag(key)}={value}")
        args.extend(_flag(key) for key in switches if config.get(key))
        if config.get("task_events"):
            args.append("-E")
        return args

    def monitor(self, name: str) -> None:
        """Poll the worker and shut it down once it has no pending work."""
        nodename = default_nodename(name)

        def _pending(nodes):
            # Yield per-node task lists for active, scheduled and reserved
            # tasks, skipping any inspect result that came back as None.
            replies = (nodes.active(), nodes.scheduled(), nodes.reserved())
            for reply in replies:
                if reply is not None:
                    yield from reply.values()
            # An FSApp can additionally report messages still queued in its
            # broker.
            if isinstance(self.app, FSApp):
                yield from self.app.iter_queued()

        logger.debug("monitor: watching celery worker '%s'", nodename)
        busy = True
        while busy:
            time.sleep(self.timeout)
            nodes = self.app.control.inspect(  # type: ignore[call-arg]
                destination=[nodename],
                limit=1,
            )
            # Keep going only if inspection succeeded and found some work.
            busy = nodes is not None and any(_pending(nodes))
        logger.info("monitor: shutting down due to empty queue.")
        logger.debug("monitor: sending shutdown to '%s'.", nodename)
        self.app.control.shutdown(destination=[nodename])
        logger.debug("monitor: done")

__init__(app, timeout=60, **kwargs)

Construct a worker.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| app | Celery | Celery application instance. | required |
| timeout | int | Queue timeout in seconds. The worker is shut down if the queue remains empty after the timeout. | 60 |

Additional keyword arguments will be passed as celery worker configuration.

Source code in dvc_task/worker/temporary.py (lines 20–38):
def __init__(
    self,
    app: Celery,
    timeout: int = 60,
    **kwargs,
):
    """Create a temporary worker wrapper.

    Arguments:
        app: Celery application the worker will run.
        timeout: Seconds between queue checks. The worker is shut down if
            the queue is found empty after a check interval.

    Any remaining keyword arguments are stored and later translated into
    celery worker command-line options.
    """
    self.app, self.timeout, self.config = app, timeout, kwargs

monitor(name)

Monitor the worker and stop it when the queue is empty.

Source code in dvc_task/worker/temporary.py (lines 101–129):
def monitor(self, name: str) -> None:
    """Poll the worker and shut it down once it has no pending work."""
    nodename = default_nodename(name)

    def _pending(nodes):
        # Yield per-node task lists for active, scheduled and reserved tasks,
        # skipping any inspect result that came back as None.
        replies = (nodes.active(), nodes.scheduled(), nodes.reserved())
        for reply in replies:
            if reply is not None:
                yield from reply.values()
        # An FSApp can additionally report messages still queued in its broker.
        if isinstance(self.app, FSApp):
            yield from self.app.iter_queued()

    logger.debug("monitor: watching celery worker '%s'", nodename)
    busy = True
    while busy:
        time.sleep(self.timeout)
        nodes = self.app.control.inspect(  # type: ignore[call-arg]
            destination=[nodename],
            limit=1,
        )
        # Keep going only if inspection succeeded and found some work.
        busy = nodes is not None and any(_pending(nodes))
    logger.info("monitor: shutting down due to empty queue.")
    logger.debug("monitor: sending shutdown to '%s'.", nodename)
    self.app.control.shutdown(destination=[nodename])
    logger.debug("monitor: done")

ping(name, timeout=1.0)

Ping the specified worker.

Source code in dvc_task/worker/temporary.py (lines 40–42):
def ping(self, name: str, timeout: float = 1.0) -> Optional[List[Dict[str, Any]]]:
    """Ping the worker called *name* and return whatever replies arrive."""
    targets = [default_nodename(name)]
    return self._ping(destination=targets, timeout=timeout)

start(name, fsapp_clean=False)

Start the worker if it does not already exist.

Runs the Celery worker main thread in the current process.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| name | str | Celery worker name. | required |
| fsapp_clean | bool | Automatically clean up the FSApp broker on shutdown. Has no effect unless app is an FSApp instance. | False |
Source code in dvc_task/worker/temporary.py (lines 49–78):
def start(self, name: str, fsapp_clean: bool = False) -> None:
    """Launch a Celery worker called *name* unless one already responds.

    The worker's main loop runs in the calling process and blocks until the
    worker exits. A daemon thread running ``self.monitor`` is started first so
    it can shut the worker down once the queue goes idle.

    Arguments:
        name: Celery worker name (passed on as ``--hostname``).
        fsapp_clean: When true and the app is an FSApp, clean up the FSApp
            broker after the worker exits. Ignored for other app types.
    """
    if os.name == "nt":
        # see https://github.com/celery/billiard/issues/247
        os.environ["FORKED_BY_MULTIPROCESSING"] = "1"

    if self.ping(name):
        # Some worker already answers to this name; leave it alone.
        return

    watcher = threading.Thread(target=self.monitor, args=(name,), daemon=True)
    watcher.start()

    options = {**self.config, "hostname": name}
    self.app.worker_main(argv=["worker", *self._parse_config(options)])

    if fsapp_clean and isinstance(self.app, FSApp):  # type: ignore[unreachable]
        logger.info("cleaning up FSApp broker.")
        self.app.clean()
    logger.info("done")