Skip to content

Manager

Serverless process manager.

ProcessManager

Manager for controlling background ManagedProcess(es) via celery.

Spawned process entries are kept in the manager directory until they are explicitly removed (with remove() or cleanup()) so that return value and log information can be accessed after a process has completed.

Source code in dvc_task/proc/manager.py
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
class ProcessManager:
    """Manager for controlling background ManagedProcess(es) via celery.

    Spawned process entries are kept in the manager directory until they
    are explicitly removed (with remove() or cleanup()) so that return
    value and log information can be accessed after a process has completed.

    Each entry lives in ``<wdir>/<name>/`` and its metadata is stored in
    ``<wdir>/<name>/<name>.json``.
    """

    def __init__(
        self,
        wdir: Optional[str] = None,
    ):
        """Construct a ProcessManager

        Arguments:
            wdir: Directory used for storing process information. Defaults
                to the current working directory.
        """
        self.wdir = wdir or os.curdir

    def __iter__(self) -> Generator[str, None, None]:
        """Yield the name of every entry found in the manager directory.

        Nothing is yielded when the manager directory does not exist.
        """
        if not os.path.exists(self.wdir):
            return
        yield from os.listdir(self.wdir)

    # NOTE(review): `reraise` is defined outside this listing; it is used
    # here to surface a missing info file as KeyError -- confirm.
    @reraise(FileNotFoundError, KeyError)
    def __getitem__(self, key: str) -> "ProcessInfo":
        """Load and return the stored info for the process named `key`."""
        info_path = self._get_info_path(key)
        return ProcessInfo.load(info_path)

    @reraise(FileNotFoundError, KeyError)
    def __setitem__(self, key: str, value: "ProcessInfo"):
        """Write `value` to the info file of the process named `key`."""
        info_path = self._get_info_path(key)
        value.dump(info_path)

    def __delitem__(self, key: str) -> None:
        """Delete the entry directory for `key`; a missing entry is ignored."""
        path = os.path.join(self.wdir, key)
        if os.path.exists(path):
            # `remove` here is the module-level helper, not self.remove().
            remove(path)

    def _get_info_path(self, key: str) -> str:
        """Return the path of the JSON info file for the process `key`."""
        return os.path.join(self.wdir, key, f"{key}.json")

    def get(self, key: str, default=None) -> Optional["ProcessInfo"]:
        """Return the specified process.

        Arguments:
            key: Process name.
            default: Value returned when no info exists for `key`.

        Returns:
            The stored process info, or `default` if the lookup raises
            KeyError.
        """
        try:
            return self[key]
        except KeyError:
            return default

    def processes(self) -> Generator[Tuple[str, "ProcessInfo"], None, None]:
        """Iterate over managed processes.

        Yields:
            `(name, info)` pairs. Directory entries whose info cannot be
            loaded (lookup raises KeyError) are skipped.
        """
        for name in self:
            # Guard only the lookup: a KeyError thrown into the generator by
            # the consumer must not be mistaken for a missing entry.
            try:
                info = self[name]
            except KeyError:
                continue
            yield name, info

    def run_signature(
        self,
        args: Union[str, List[str]],
        name: Optional[str] = None,
        task: Optional[str] = None,
        env: Optional[Dict[str, str]] = None,
        immutable: bool = False,
    ) -> Signature:
        """Return signature for a task which runs a command in the background.

        Arguments:
            args: Command to run.
            name: Optional name to use for the spawned process.
            task: Optional name of Celery task to use for spawning the process.
                Defaults to 'dvc_task.proc.tasks.run'.
            env: Optional environment to be passed into the process.
            immutable: True if the returned Signature should be immutable.

        Returns:
            Celery signature for the run task.
        """
        name = name or uuid()
        task = task or "dvc_task.proc.tasks.run"
        return signature(
            task,
            args=(args,),
            kwargs={
                "name": name,
                # Every process gets its own sub-directory of the manager dir.
                "wdir": os.path.join(self.wdir, name),
                "env": env,
            },
            immutable=immutable,
        )

    def send_signal(self, name: str, sig: int, group: bool = False):  # noqa: C901
        """Send signal `sig` to the specified named process.

        Arguments:
            name: Process name.
            sig: Signal number to send.
            group: If True (and not on Windows), signal the whole process
                group of the process instead of just the process.

        Raises:
            ProcessLookupError: if the process is unknown, has already
                recorded a return code, or no longer exists.
            UnsupportedSignalError: on Windows, if `sig` is not one of
                SIGTERM, CTRL_C_EVENT or CTRL_BREAK_EVENT.
        """
        try:
            process_info = self[name]
        except KeyError as exc:
            raise ProcessLookupError from exc
        if sys.platform == "win32":
            if sig not in (
                signal.SIGTERM,
                signal.CTRL_C_EVENT,
                signal.CTRL_BREAK_EVENT,
            ):
                raise UnsupportedSignalError(sig)

        def handle_closed_process():
            # The process vanished without recording a return code; persist
            # -1 so later calls see it as finished.
            logging.warning("Process '%s' had already aborted unexpectedly.", name)
            process_info.returncode = -1
            self[name] = process_info

        if process_info.returncode is None:
            try:
                if sys.platform != "win32" and group:
                    pgid = os.getpgid(process_info.pid)
                    os.killpg(pgid, sig)
                else:
                    os.kill(process_info.pid, sig)
            except ProcessLookupError:
                handle_closed_process()
                raise
            except OSError as exc:
                if sys.platform == "win32":
                    # winerror 87 is treated here as "process already gone".
                    if exc.winerror == 87:
                        handle_closed_process()
                        raise ProcessLookupError from exc
                raise
        else:
            raise ProcessLookupError

    def interrupt(self, name: str, group: bool = True):
        """Send interrupt signal to specified named process.

        Uses CTRL_C_EVENT on Windows and SIGINT elsewhere.
        """
        if sys.platform == "win32":
            self.send_signal(
                name,
                signal.CTRL_C_EVENT,
                group,
            )
        else:
            self.send_signal(name, signal.SIGINT, group)

    def terminate(self, name: str, group: bool = False):
        """Terminate the specified named process (sends SIGTERM)."""
        self.send_signal(name, signal.SIGTERM, group)

    def kill(self, name: str, group: bool = False):
        """Kill the specified named process.

        Uses SIGTERM on Windows and SIGKILL elsewhere.
        """
        if sys.platform == "win32":
            self.send_signal(name, signal.SIGTERM, group)
        else:
            self.send_signal(name, signal.SIGKILL, group)

    def remove(self, name: str, force: bool = False):
        """Remove the specified named process from this manager.

        If the specified process is still running, it will be forcefully killed
        if `force` is True, otherwise an exception will be raised. Unknown
        names are silently ignored.

        Raises:
            ProcessNotTerminatedError: if the specified process is still
                running and was not forcefully killed.
        """
        try:
            process_info = self[name]
        except KeyError:
            return
        if process_info.returncode is None:
            if not force:
                raise ProcessNotTerminatedError(name)
            # Only a process without a recorded return code needs killing; a
            # finished one would just make send_signal raise
            # ProcessLookupError.
            try:
                self.kill(name)
            except ProcessLookupError:
                pass
        del self[name]

    def cleanup(self, force: bool = False):
        """Remove stale (terminated) processes from this manager.

        Arguments:
            force: Passed through to remove(); when True, running processes
                are killed and removed as well.
        """
        for name in self:
            try:
                self.remove(name, force)
            except ProcessNotTerminatedError:
                continue

    def follow(
        self,
        name: str,
        encoding: Optional[str] = None,
        sleep_interval: int = 1,
    ) -> Generator[str, None, None]:
        """Iterate over lines in redirected output for a process.

        This will block calling thread when waiting for output (until the
        followed process has exited).

        Arguments:
            name: Process name.
            encoding: Text encoding for redirected output. Defaults to
                `locale.getpreferredencoding()`.
            sleep_interval: Sleep interval for follow iterations (when waiting
                for output).

        Note:
            Yielded strings may not always end in line terminators (all
            available output will be yielded if EOF is reached).
        """
        output_path = self[name].stdout
        if output_path is None:
            return
        with open(
            output_path,
            encoding=encoding or locale.getpreferredencoding(),
        ) as fobj:
            while True:
                offset = fobj.tell()
                line = fobj.readline()
                if line:
                    yield line
                    continue
                if self[name].returncode is not None:
                    # Output may have been appended between the empty read
                    # above and this exit check; drain it before stopping so
                    # the tail of the log is not lost.
                    fobj.seek(offset)
                    yield from iter(fobj.readline, "")
                    return
                time.sleep(sleep_interval)
                # Re-position at the last known offset before retrying.
                fobj.seek(offset)

__init__(wdir=None)

Construct a ProcessManager

Parameters:

Name Type Description Default
wdir Optional[str]

Directory used for storing process information. Defaults to the current working directory.

None
Source code in dvc_task/proc/manager.py
31
32
33
34
35
36
37
38
39
40
41
def __init__(
    self,
    wdir: Optional[str] = None,
):
    """Create a manager rooted at `wdir`.

    Arguments:
        wdir: Directory in which process information is stored. When
            omitted (or empty), `os.curdir` is used.
    """
    if not wdir:
        wdir = os.curdir
    self.wdir = wdir

cleanup(force=False)

Remove stale (terminated) processes from this manager.

Source code in dvc_task/proc/manager.py
197
198
199
200
201
202
203
def cleanup(self, force: bool = False):
    """Remove every entry of this manager that remove() accepts.

    Arguments:
        force: Forwarded to remove() for each entry.

    Entries for which remove() raises ProcessNotTerminatedError are left
    in place.
    """
    for entry in self:
        try:
            self.remove(entry, force)
        except ProcessNotTerminatedError:
            # Still running and not forced: keep it and move on.
            pass

follow(name, encoding=None, sleep_interval=1)

Iterate over lines in redirected output for a process.

This will block calling thread when waiting for output (until the followed process has exited).

Parameters:

Name Type Description Default
name str

Process name.

required
encoding Optional[str]

Text encoding for redirected output. Defaults to locale.getpreferredencoding().

None
sleep_interval int

Sleep interval for follow iterations (when waiting for output).

1
Note

Yielded strings may not always end in line terminators (all available output will be yielded if EOF is reached).

Source code in dvc_task/proc/manager.py
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
def follow(
    self,
    name: str,
    encoding: Optional[str] = None,
    sleep_interval: int = 1,
) -> Generator[str, None, None]:
    """Iterate over lines in redirected output for a process.

    This will block calling thread when waiting for output (until the
    followed process has exited).

    Arguments:
        name: Process name.
        encoding: Text encoding for redirected output. Defaults to
            `locale.getpreferredencoding()`.
        sleep_interval: Sleep interval for follow iterations (when waiting
            for output).

    Note:
        Yielded strings may not always end in line terminators (all
        available output will be yielded if EOF is reached).
    """
    output_path = self[name].stdout
    if output_path is None:
        return
    with open(
        output_path,
        encoding=encoding or locale.getpreferredencoding(),
    ) as fobj:
        while True:
            offset = fobj.tell()
            line = fobj.readline()
            if line:
                yield line
                continue
            if self[name].returncode is not None:
                # Output may have been appended between the empty read above
                # and this exit check; drain it before stopping so the tail
                # of the log is not lost.
                fobj.seek(offset)
                yield from iter(fobj.readline, "")
                return
            time.sleep(sleep_interval)
            # Re-position at the last known offset before retrying.
            fobj.seek(offset)

get(key, default=None)

Return the specified process.

Source code in dvc_task/proc/manager.py
66
67
68
69
70
71
def get(self, key: str, default=None) -> Optional["ProcessInfo"]:
    """Return the specified process.

    Arguments:
        key: Process name.
        default: Value returned when no info exists for `key`.

    Returns:
        The stored process info, or `default` if the lookup raises KeyError.
    """
    try:
        return self[key]
    except KeyError:
        return default

interrupt(name, group=True)

Send interrupt signal to specified named process

Source code in dvc_task/proc/manager.py
153
154
155
156
157
158
159
160
161
162
def interrupt(self, name: str, group: bool = True):
    """Deliver an interrupt to the named process via send_signal().

    The signal is CTRL_C_EVENT on Windows and SIGINT on every other
    platform; `group` is forwarded unchanged.
    """
    # The conditional expression keeps CTRL_C_EVENT from being looked up on
    # platforms where the signal module does not define it.
    sig = signal.CTRL_C_EVENT if sys.platform == "win32" else signal.SIGINT
    self.send_signal(name, sig, group)

kill(name, group=False)

Kill the specified named process.

Source code in dvc_task/proc/manager.py
168
169
170
171
172
173
def kill(self, name: str, group: bool = False):
    """Forcefully stop the named process via send_signal().

    SIGTERM is sent on Windows and SIGKILL on every other platform;
    `group` is forwarded unchanged.
    """
    # The conditional expression keeps SIGKILL from being looked up on
    # Windows.
    sig = signal.SIGTERM if sys.platform == "win32" else signal.SIGKILL
    self.send_signal(name, sig, group)

processes()

Iterate over managed processes.

Source code in dvc_task/proc/manager.py
73
74
75
76
77
78
79
def processes(self) -> Generator[Tuple[str, "ProcessInfo"], None, None]:
    """Iterate over managed processes.

    Yields:
        `(name, info)` pairs. Names whose info lookup raises KeyError are
        skipped.
    """
    for name in self:
        # Guard only the lookup: a KeyError thrown into the generator by the
        # consumer must not be mistaken for a missing entry.
        try:
            info = self[name]
        except KeyError:
            continue
        yield name, info

remove(name, force=False)

Remove the specified named process from this manager.

If the specified process is still running, it will be forcefully killed if force is True, otherwise an exception will be raised.

Source code in dvc_task/proc/manager.py
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
def remove(self, name: str, force: bool = False):
    """Remove the specified named process from this manager.

    If the specified process is still running, it will be forcefully killed
    if `force` is True, otherwise an exception will be raised. Unknown names
    are silently ignored.

    Raises:
        ProcessNotTerminatedError: if the specified process is still
            running and was not forcefully killed.
    """
    try:
        process_info = self[name]
    except KeyError:
        return
    if process_info.returncode is None:
        if not force:
            raise ProcessNotTerminatedError(name)
        # Only a process without a recorded return code needs killing; a
        # finished one has nothing left to signal.
        try:
            self.kill(name)
        except ProcessLookupError:
            # Already gone by the time we tried to kill it.
            pass
    del self[name]

run_signature(args, name=None, task=None, env=None, immutable=False)

Return signature for a task which runs a command in the background.

Parameters:

Name Type Description Default
args Union[str, List[str]]

Command to run.

required
name Optional[str]

Optional name to use for the spawned process.

None
task Optional[str]

Optional name of Celery task to use for spawning the process. Defaults to 'dvc_task.proc.tasks.run'.

None
env Optional[Dict[str, str]]

Optional environment to be passed into the process.

None
immutable bool

True if the returned Signature should be immutable.

False

Returns:

Type Description
Signature

Celery signature for the run task.

Source code in dvc_task/proc/manager.py
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
def run_signature(
    self,
    args: Union[str, List[str]],
    name: Optional[str] = None,
    task: Optional[str] = None,
    env: Optional[Dict[str, str]] = None,
    immutable: bool = False,
) -> Signature:
    """Build the Celery signature that runs a command in the background.

    Arguments:
        args: Command to run.
        name: Name for the spawned process; a fresh `uuid()` value is used
            when omitted.
        task: Name of the Celery task that spawns the process. Defaults to
            'dvc_task.proc.tasks.run'.
        env: Optional environment to be passed into the process.
        immutable: True if the returned Signature should be immutable.

    Returns:
        Celery signature for the run task.
    """
    proc_name = name if name else uuid()
    task_name = task if task else "dvc_task.proc.tasks.run"
    task_kwargs = {
        "name": proc_name,
        # Each process is given its own sub-directory of the manager dir.
        "wdir": os.path.join(self.wdir, proc_name),
        "env": env,
    }
    return signature(
        task_name,
        args=(args,),
        kwargs=task_kwargs,
        immutable=immutable,
    )

send_signal(name, sig, group=False)

Send signal to the specified named process.

Source code in dvc_task/proc/manager.py
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
def send_signal(self, name: str, sig: int, group: bool = False):  # noqa: C901
    """Send signal `sig` to the specified named process.

    Arguments:
        name: Process name.
        sig: Signal number to send.
        group: If True (and not on Windows), signal the whole process group
            of the process instead of just the process.

    Raises:
        ProcessLookupError: if the process is unknown, has already recorded
            a return code, or no longer exists.
        UnsupportedSignalError: on Windows, if `sig` is not one of SIGTERM,
            CTRL_C_EVENT or CTRL_BREAK_EVENT.
    """
    try:
        process_info = self[name]
    except KeyError as exc:
        raise ProcessLookupError from exc

    # The platform check comes first so the CTRL_* constants are only
    # looked up on Windows.
    if sys.platform == "win32" and sig not in (
        signal.SIGTERM,
        signal.CTRL_C_EVENT,
        signal.CTRL_BREAK_EVENT,
    ):
        raise UnsupportedSignalError(sig)

    if process_info.returncode is not None:
        # A recorded return code means there is nothing left to signal.
        raise ProcessLookupError

    def mark_closed():
        # The process vanished without recording a return code; persist -1
        # so later calls see it as finished.
        logging.warning("Process '%s' had already aborted unexpectedly.", name)
        process_info.returncode = -1
        self[name] = process_info

    try:
        if group and sys.platform != "win32":
            os.killpg(os.getpgid(process_info.pid), sig)
        else:
            os.kill(process_info.pid, sig)
    except ProcessLookupError:
        mark_closed()
        raise
    except OSError as exc:
        # winerror 87 on Windows is treated here as "process already gone".
        if sys.platform == "win32" and exc.winerror == 87:
            mark_closed()
            raise ProcessLookupError from exc
        raise

terminate(name, group=False)

Terminate the specified named process.

Source code in dvc_task/proc/manager.py
164
165
166
def terminate(self, name: str, group: bool = False):
    """Ask the named process to terminate by sending it SIGTERM.

    `group` is forwarded unchanged to send_signal().
    """
    sig = signal.SIGTERM
    self.send_signal(name, sig, group)