Skip to content

Proc

Process management module.

ManagedProcess

Bases: AbstractContextManager

Class to manage the specified process with redirected output.

stdout and stderr will both be redirected to .out. Interactive processes (requiring stdin input) are currently unsupported.

Source code in dvc_task/proc/process.py
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
class ManagedProcess(AbstractContextManager):
    """Class to manage the specified process with redirected output.

    stdout and stderr will both be redirected to <name>.out.
    Interactive processes (requiring stdin input) are currently unsupported.
    """

    def __init__(
        self,
        args: Union[str, List[str]],
        env: Optional[Dict[str, str]] = None,
        wdir: Optional[str] = None,
        name: Optional[str] = None,
    ):
        """Construct a MangedProcess.

        Arguments:
            args: Command to be run.
            env: Optional environment variables.
            wdir: If specified, redirected output files will be placed in
                `wdir`. Defaults to current working directory.
            name: Name to use for this process, if not specified a UUID will be
                generated instead.
        """
        self.args: List[str] = (
            shlex.split(args, posix=os.name == "posix")
            if isinstance(args, str)
            else list(args)
        )
        self.env = env
        self.wdir = wdir
        self.name = name or uuid()
        self.returncode: Optional[int] = None
        self._fd_stack = ExitStack()
        self._proc: Optional[subprocess.Popen] = None

    def __enter__(self):
        if self._proc is None:
            self.run()
        return self

    def __exit__(self, *args, **kwargs):
        self.wait()

    def _close_fds(self):
        with self._fd_stack:
            pass

    def _make_path(self, path: str) -> str:
        return os.path.join(self.wdir, path) if self.wdir else path

    @cached_property
    def stdout_path(self) -> str:
        """Return redirected stdout path."""
        return self._make_path(f"{self.name}.out")

    @cached_property
    def info_path(self) -> str:
        """Return process information file path."""
        return self._make_path(f"{self.name}.json")

    @cached_property
    def pidfile_path(self) -> str:
        """Return process pidfile path."""
        return self._make_path(f"{self.name}.pid")

    @property
    def info(self) -> "ProcessInfo":
        """Return process information."""
        return ProcessInfo(
            pid=self.pid,
            stdin=None,
            stdout=self.stdout_path,
            stderr=None,
            returncode=self.returncode,
        )

    @property
    def pid(self) -> int:
        """Return process PID.

        Raises:
            ValueError: Process is not running.
        """
        if self._proc is None:
            raise ValueError
        return self._proc.pid

    def _make_wdir(self):
        if self.wdir:
            makedirs(self.wdir, exist_ok=True)

    def _dump(self):
        self._make_wdir()
        self.info.dump(self.info_path)

        with open(self.pidfile_path, "w", encoding="utf-8") as fobj:
            fobj.write(str(self.pid))

    def run(self):
        """Run this process."""
        self._make_wdir()
        logger.debug(
            "Appending output to '%s'",
            self.stdout_path,
        )
        stdout = self._fd_stack.enter_context(open(self.stdout_path, "ab"))  # noqa: SIM115
        try:
            self._proc = subprocess.Popen(  # noqa: S603
                self.args,
                stdin=subprocess.DEVNULL,
                stdout=stdout,
                stderr=subprocess.STDOUT,
                close_fds=True,
                shell=False,
                env=self.env,
            )
            self._dump()
        except Exception:
            if self._proc is not None:
                self._proc.kill()
            self._close_fds()
            raise

    def wait(self, timeout: Optional[int] = None) -> Optional[int]:
        """Block until a process started with `run` has completed.

        Raises:
            TimeoutExpired if `timeout` was set and the process
            did not terminate after `timeout` seconds.
        """
        if self.returncode is not None or self._proc is None:
            return self.returncode
        try:
            self._proc.wait(timeout=timeout)
        except subprocess.TimeoutExpired as exc:
            raise TimeoutExpired(exc.cmd, exc.timeout) from exc
        except KeyboardInterrupt:
            pass
        self.returncode = self._proc.returncode
        self._close_fds()
        self._dump()
        return self.returncode

    @classmethod
    def spawn(cls, *args, **kwargs) -> Optional[int]:
        """Spawn a ManagedProcess command in the background.

        Returns: The spawned process PID.
        """
        proc = _DaemonProcess(
            target=cls._spawn,
            args=args,
            kwargs=kwargs,
            daemon=True,
        )
        proc.start()
        # Do not terminate the child daemon when the main process exits
        mp.process._children.discard(proc)  # type: ignore[attr-defined]
        return proc.pid

    @classmethod
    def _spawn(cls, *args, **kwargs):
        with cls(*args, **kwargs):
            pass

info: ProcessInfo property

Return process information.

pid: int property

Return process PID.

Raises:

Type Description
ValueError

Process is not running.

__init__(args, env=None, wdir=None, name=None)

Construct a MangedProcess.

Parameters:

Name Type Description Default
args Union[str, List[str]]

Command to be run.

required
env Optional[Dict[str, str]]

Optional environment variables.

None
wdir Optional[str]

If specified, redirected output files will be placed in wdir. Defaults to current working directory.

None
name Optional[str]

Name to use for this process, if not specified a UUID will be generated instead.

None
Source code in dvc_task/proc/process.py
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
def __init__(
    self,
    args: Union[str, List[str]],
    env: Optional[Dict[str, str]] = None,
    wdir: Optional[str] = None,
    name: Optional[str] = None,
):
    """Construct a MangedProcess.

    Arguments:
        args: Command to be run.
        env: Optional environment variables.
        wdir: If specified, redirected output files will be placed in
            `wdir`. Defaults to current working directory.
        name: Name to use for this process, if not specified a UUID will be
            generated instead.
    """
    self.args: List[str] = (
        shlex.split(args, posix=os.name == "posix")
        if isinstance(args, str)
        else list(args)
    )
    self.env = env
    self.wdir = wdir
    self.name = name or uuid()
    self.returncode: Optional[int] = None
    self._fd_stack = ExitStack()
    self._proc: Optional[subprocess.Popen] = None

info_path()

Return process information file path.

Source code in dvc_task/proc/process.py
120
121
122
123
@cached_property
def info_path(self) -> str:
    """Return process information file path."""
    return self._make_path(f"{self.name}.json")

pidfile_path()

Return process pidfile path.

Source code in dvc_task/proc/process.py
125
126
127
128
@cached_property
def pidfile_path(self) -> str:
    """Return process pidfile path."""
    return self._make_path(f"{self.name}.pid")

run()

Run this process.

Source code in dvc_task/proc/process.py
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
def run(self):
    """Run this process."""
    self._make_wdir()
    logger.debug(
        "Appending output to '%s'",
        self.stdout_path,
    )
    stdout = self._fd_stack.enter_context(open(self.stdout_path, "ab"))  # noqa: SIM115
    try:
        self._proc = subprocess.Popen(  # noqa: S603
            self.args,
            stdin=subprocess.DEVNULL,
            stdout=stdout,
            stderr=subprocess.STDOUT,
            close_fds=True,
            shell=False,
            env=self.env,
        )
        self._dump()
    except Exception:
        if self._proc is not None:
            self._proc.kill()
        self._close_fds()
        raise

spawn(*args, **kwargs) classmethod

Spawn a ManagedProcess command in the background.

Returns: The spawned process PID.

Source code in dvc_task/proc/process.py
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
@classmethod
def spawn(cls, *args, **kwargs) -> Optional[int]:
    """Spawn a ManagedProcess command in the background.

    Returns: The spawned process PID.
    """
    proc = _DaemonProcess(
        target=cls._spawn,
        args=args,
        kwargs=kwargs,
        daemon=True,
    )
    proc.start()
    # Do not terminate the child daemon when the main process exits
    mp.process._children.discard(proc)  # type: ignore[attr-defined]
    return proc.pid

stdout_path()

Return redirected stdout path.

Source code in dvc_task/proc/process.py
115
116
117
118
@cached_property
def stdout_path(self) -> str:
    """Return redirected stdout path."""
    return self._make_path(f"{self.name}.out")

wait(timeout=None)

Block until a process started with run has completed.

Source code in dvc_task/proc/process.py
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
def wait(self, timeout: Optional[int] = None) -> Optional[int]:
    """Block until a process started with `run` has completed.

    Raises:
        TimeoutExpired if `timeout` was set and the process
        did not terminate after `timeout` seconds.
    """
    if self.returncode is not None or self._proc is None:
        return self.returncode
    try:
        self._proc.wait(timeout=timeout)
    except subprocess.TimeoutExpired as exc:
        raise TimeoutExpired(exc.cmd, exc.timeout) from exc
    except KeyboardInterrupt:
        pass
    self.returncode = self._proc.returncode
    self._close_fds()
    self._dump()
    return self.returncode

ProcessInfo dataclass

Process information.

Source code in dvc_task/proc/process.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
@dataclass
class ProcessInfo:
    """Process information."""

    pid: int
    stdin: Optional[str]
    stdout: Optional[str]
    stderr: Optional[str]
    returncode: Optional[int]

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "ProcessInfo":
        """Construct ProcessInfo from the specified dictionary."""
        return cls(**data)

    @classmethod
    def load(cls, filename: str) -> "ProcessInfo":
        """Construct the process information from a file."""
        with open(filename, encoding="utf-8") as fobj:
            return cls.from_dict(json.load(fobj))

    def asdict(self) -> Dict[str, Any]:
        """Return this info as a dictionary."""
        return asdict(self)

    def dump(self, filename: str) -> None:
        """Dump the process information into a file."""
        directory, file = os.path.split(filename)
        with tempfile.NamedTemporaryFile(
            mode="w",
            encoding="utf-8",
            dir=directory,
            prefix=f"{file}.",
            suffix=".tmp",
            delete=False,
        ) as tmp:
            json.dump(self.asdict(), tmp)
        os.replace(tmp.name, filename)

asdict()

Return this info as a dictionary.

Source code in dvc_task/proc/process.py
45
46
47
def asdict(self) -> Dict[str, Any]:
    """Return this info as a dictionary."""
    return asdict(self)

dump(filename)

Dump the process information into a file.

Source code in dvc_task/proc/process.py
49
50
51
52
53
54
55
56
57
58
59
60
61
def dump(self, filename: str) -> None:
    """Dump the process information into a file."""
    directory, file = os.path.split(filename)
    with tempfile.NamedTemporaryFile(
        mode="w",
        encoding="utf-8",
        dir=directory,
        prefix=f"{file}.",
        suffix=".tmp",
        delete=False,
    ) as tmp:
        json.dump(self.asdict(), tmp)
    os.replace(tmp.name, filename)

from_dict(data) classmethod

Construct ProcessInfo from the specified dictionary.

Source code in dvc_task/proc/process.py
34
35
36
37
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "ProcessInfo":
    """Construct ProcessInfo from the specified dictionary."""
    return cls(**data)

load(filename) classmethod

Construct the process information from a file.

Source code in dvc_task/proc/process.py
39
40
41
42
43
@classmethod
def load(cls, filename: str) -> "ProcessInfo":
    """Construct the process information from a file."""
    with open(filename, encoding="utf-8") as fobj:
        return cls.from_dict(json.load(fobj))

ProcessManager

Manager for controlling background ManagedProcess(es) via celery.

Spawned process entries are kept in the manager directory until they are explicitly removed (with remove() or cleanup()) so that return value and log information can be accessed after a process has completed.

Source code in dvc_task/proc/manager.py
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
class ProcessManager:
    """Manager for controlling background ManagedProcess(es) via celery.

    Spawned process entries are kept in the manager directory until they
    are explicitly removed (with remove() or cleanup()) so that return
    value and log information can be accessed after a process has completed.
    """

    def __init__(
        self,
        wdir: Optional[str] = None,
    ):
        """Construct a ProcessManager

        Arguments:
            wdir: Directory used for storing process information. Defaults
                to the current working directory.
        """
        self.wdir = wdir or os.curdir

    def __iter__(self) -> Generator[str, None, None]:
        if not os.path.exists(self.wdir):
            return
        yield from os.listdir(self.wdir)

    @reraise(FileNotFoundError, KeyError)
    def __getitem__(self, key: str) -> "ProcessInfo":
        info_path = self._get_info_path(key)
        return ProcessInfo.load(info_path)

    @reraise(FileNotFoundError, KeyError)
    def __setitem__(self, key: str, value: "ProcessInfo"):
        info_path = self._get_info_path(key)
        value.dump(info_path)

    def __delitem__(self, key: str) -> None:
        path = os.path.join(self.wdir, key)
        if os.path.exists(path):
            remove(path)

    def _get_info_path(self, key: str) -> str:
        return os.path.join(self.wdir, key, f"{key}.json")

    def get(self, key: str, default=None) -> "ProcessInfo":
        """Return the specified process."""
        try:
            return self[key]
        except KeyError:
            return default

    def processes(self) -> Generator[Tuple[str, "ProcessInfo"], None, None]:
        """Iterate over managed processes."""
        for name in self:
            try:
                yield name, self[name]
            except KeyError:
                continue

    def run_signature(
        self,
        args: Union[str, List[str]],
        name: Optional[str] = None,
        task: Optional[str] = None,
        env: Optional[Dict[str, str]] = None,
        immutable: bool = False,
    ) -> Signature:
        """Return signature for a task which runs a command in the background.

        Arguments:
            args: Command to run.
            name: Optional name to use for the spawned process.
            task: Optional name of Celery task to use for spawning the process.
                Defaults to 'dvc_task.proc.tasks.run'.
            env: Optional environment to be passed into the process.
            immutable: True if the returned Signature should be immutable.

        Returns:
            Celery signature for the run task.
        """
        name = name or uuid()
        task = task or "dvc_task.proc.tasks.run"
        return signature(
            task,
            args=(args,),
            kwargs={
                "name": name,
                "wdir": os.path.join(self.wdir, name),
                "env": env,
            },
            immutable=immutable,
        )

    def send_signal(self, name: str, sig: int, group: bool = False):  # noqa: C901
        """Send `signal` to the specified named process."""
        try:
            process_info = self[name]
        except KeyError as exc:
            raise ProcessLookupError from exc
        if sys.platform == "win32":
            if sig not in (
                signal.SIGTERM,
                signal.CTRL_C_EVENT,
                signal.CTRL_BREAK_EVENT,
            ):
                raise UnsupportedSignalError(sig)

        def handle_closed_process():
            logging.warning("Process '%s' had already aborted unexpectedly.", name)
            process_info.returncode = -1
            self[name] = process_info

        if process_info.returncode is None:
            try:
                if sys.platform != "win32" and group:
                    pgid = os.getpgid(process_info.pid)
                    os.killpg(pgid, sig)
                else:
                    os.kill(process_info.pid, sig)
            except ProcessLookupError:
                handle_closed_process()
                raise
            except OSError as exc:
                if sys.platform == "win32":
                    if exc.winerror == 87:
                        handle_closed_process()
                        raise ProcessLookupError from exc
                raise
        else:
            raise ProcessLookupError

    def interrupt(self, name: str, group: bool = True):
        """Send interrupt signal to specified named process"""
        if sys.platform == "win32":
            self.send_signal(
                name,
                signal.CTRL_C_EVENT,
                group,
            )
        else:
            self.send_signal(name, signal.SIGINT, group)

    def terminate(self, name: str, group: bool = False):
        """Terminate the specified named process."""
        self.send_signal(name, signal.SIGTERM, group)

    def kill(self, name: str, group: bool = False):
        """Kill the specified named process."""
        if sys.platform == "win32":
            self.send_signal(name, signal.SIGTERM, group)
        else:
            self.send_signal(name, signal.SIGKILL, group)

    def remove(self, name: str, force: bool = False):
        """Remove the specified named process from this manager.

        If the specified process is still running, it will be forcefully killed
        if `force` is True`, otherwise an exception will be raised.

        Raises:
            ProcessNotTerminatedError if the specified process is still
            running and was not forcefully killed.
        """
        try:
            process_info = self[name]
        except KeyError:
            return
        if process_info.returncode is None and not force:
            raise ProcessNotTerminatedError(name)
        try:
            self.kill(name)
        except ProcessLookupError:
            pass
        del self[name]

    def cleanup(self, force: bool = False):
        """Remove stale (terminated) processes from this manager."""
        for name in self:
            try:
                self.remove(name, force)
            except ProcessNotTerminatedError:
                continue

    def follow(
        self,
        name: str,
        encoding: Optional[str] = None,
        sleep_interval: int = 1,
    ) -> Generator[str, None, None]:
        """Iterate over lines in redirected output for a process.

        This will block calling thread when waiting for output (until the
        followed process has exited).

        Arguments:
            name: Process name.
            encoding: Text encoding for redirected output. Defaults to
                `locale.getpreferredencoding()`.
            sleep_interval: Sleep interval for follow iterations (when waiting
                for output).

        Note:
            Yielded strings may not always end in line terminators (all
            available output will yielded if EOF is reached).
        """
        output_path = self[name].stdout
        if output_path is None:
            return
        with open(
            output_path,
            encoding=encoding or locale.getpreferredencoding(),
        ) as fobj:
            while True:
                offset = fobj.tell()
                line = fobj.readline()
                if line:
                    yield line
                else:
                    info = self[name]
                    if info.returncode is not None:
                        return
                    time.sleep(sleep_interval)
                    fobj.seek(offset)

__init__(wdir=None)

Construct a ProcessManager

Parameters:

Name Type Description Default
wdir Optional[str]

Directory used for storing process information. Defaults to the current working directory.

None
Source code in dvc_task/proc/manager.py
31
32
33
34
35
36
37
38
39
40
41
def __init__(
    self,
    wdir: Optional[str] = None,
):
    """Construct a ProcessManager

    Arguments:
        wdir: Directory used for storing process information. Defaults
            to the current working directory.
    """
    self.wdir = wdir or os.curdir

cleanup(force=False)

Remove stale (terminated) processes from this manager.

Source code in dvc_task/proc/manager.py
197
198
199
200
201
202
203
def cleanup(self, force: bool = False):
    """Remove stale (terminated) processes from this manager."""
    for name in self:
        try:
            self.remove(name, force)
        except ProcessNotTerminatedError:
            continue

follow(name, encoding=None, sleep_interval=1)

Iterate over lines in redirected output for a process.

This will block calling thread when waiting for output (until the followed process has exited).

Parameters:

Name Type Description Default
name str

Process name.

required
encoding Optional[str]

Text encoding for redirected output. Defaults to locale.getpreferredencoding().

None
sleep_interval int

Sleep interval for follow iterations (when waiting for output).

1
Note

Yielded strings may not always end in line terminators (all available output will yielded if EOF is reached).

Source code in dvc_task/proc/manager.py
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
def follow(
    self,
    name: str,
    encoding: Optional[str] = None,
    sleep_interval: int = 1,
) -> Generator[str, None, None]:
    """Iterate over lines in redirected output for a process.

    This will block calling thread when waiting for output (until the
    followed process has exited).

    Arguments:
        name: Process name.
        encoding: Text encoding for redirected output. Defaults to
            `locale.getpreferredencoding()`.
        sleep_interval: Sleep interval for follow iterations (when waiting
            for output).

    Note:
        Yielded strings may not always end in line terminators (all
        available output will yielded if EOF is reached).
    """
    output_path = self[name].stdout
    if output_path is None:
        return
    with open(
        output_path,
        encoding=encoding or locale.getpreferredencoding(),
    ) as fobj:
        while True:
            offset = fobj.tell()
            line = fobj.readline()
            if line:
                yield line
            else:
                info = self[name]
                if info.returncode is not None:
                    return
                time.sleep(sleep_interval)
                fobj.seek(offset)

get(key, default=None)

Return the specified process.

Source code in dvc_task/proc/manager.py
66
67
68
69
70
71
def get(self, key: str, default=None) -> "ProcessInfo":
    """Return the specified process."""
    try:
        return self[key]
    except KeyError:
        return default

interrupt(name, group=True)

Send interrupt signal to specified named process

Source code in dvc_task/proc/manager.py
153
154
155
156
157
158
159
160
161
162
def interrupt(self, name: str, group: bool = True):
    """Send interrupt signal to specified named process"""
    if sys.platform == "win32":
        self.send_signal(
            name,
            signal.CTRL_C_EVENT,
            group,
        )
    else:
        self.send_signal(name, signal.SIGINT, group)

kill(name, group=False)

Kill the specified named process.

Source code in dvc_task/proc/manager.py
168
169
170
171
172
173
def kill(self, name: str, group: bool = False):
    """Kill the specified named process."""
    if sys.platform == "win32":
        self.send_signal(name, signal.SIGTERM, group)
    else:
        self.send_signal(name, signal.SIGKILL, group)

processes()

Iterate over managed processes.

Source code in dvc_task/proc/manager.py
73
74
75
76
77
78
79
def processes(self) -> Generator[Tuple[str, "ProcessInfo"], None, None]:
    """Iterate over managed processes."""
    for name in self:
        try:
            yield name, self[name]
        except KeyError:
            continue

remove(name, force=False)

Remove the specified named process from this manager.

If the specified process is still running, it will be forcefully killed if force is True`, otherwise an exception will be raised.

Source code in dvc_task/proc/manager.py
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
def remove(self, name: str, force: bool = False):
    """Remove the specified named process from this manager.

    If the specified process is still running, it will be forcefully killed
    if `force` is True`, otherwise an exception will be raised.

    Raises:
        ProcessNotTerminatedError if the specified process is still
        running and was not forcefully killed.
    """
    try:
        process_info = self[name]
    except KeyError:
        return
    if process_info.returncode is None and not force:
        raise ProcessNotTerminatedError(name)
    try:
        self.kill(name)
    except ProcessLookupError:
        pass
    del self[name]

run_signature(args, name=None, task=None, env=None, immutable=False)

Return signature for a task which runs a command in the background.

Parameters:

Name Type Description Default
args Union[str, List[str]]

Command to run.

required
name Optional[str]

Optional name to use for the spawned process.

None
task Optional[str]

Optional name of Celery task to use for spawning the process. Defaults to 'dvc_task.proc.tasks.run'.

None
env Optional[Dict[str, str]]

Optional environment to be passed into the process.

None
immutable bool

True if the returned Signature should be immutable.

False

Returns:

Type Description
Signature

Celery signature for the run task.

Source code in dvc_task/proc/manager.py
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
def run_signature(
    self,
    args: Union[str, List[str]],
    name: Optional[str] = None,
    task: Optional[str] = None,
    env: Optional[Dict[str, str]] = None,
    immutable: bool = False,
) -> Signature:
    """Return signature for a task which runs a command in the background.

    Arguments:
        args: Command to run.
        name: Optional name to use for the spawned process.
        task: Optional name of Celery task to use for spawning the process.
            Defaults to 'dvc_task.proc.tasks.run'.
        env: Optional environment to be passed into the process.
        immutable: True if the returned Signature should be immutable.

    Returns:
        Celery signature for the run task.
    """
    name = name or uuid()
    task = task or "dvc_task.proc.tasks.run"
    return signature(
        task,
        args=(args,),
        kwargs={
            "name": name,
            "wdir": os.path.join(self.wdir, name),
            "env": env,
        },
        immutable=immutable,
    )

send_signal(name, sig, group=False)

Send signal to the specified named process.

Source code in dvc_task/proc/manager.py
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
def send_signal(self, name: str, sig: int, group: bool = False):  # noqa: C901
    """Send `signal` to the specified named process."""
    try:
        process_info = self[name]
    except KeyError as exc:
        raise ProcessLookupError from exc
    if sys.platform == "win32":
        if sig not in (
            signal.SIGTERM,
            signal.CTRL_C_EVENT,
            signal.CTRL_BREAK_EVENT,
        ):
            raise UnsupportedSignalError(sig)

    def handle_closed_process():
        logging.warning("Process '%s' had already aborted unexpectedly.", name)
        process_info.returncode = -1
        self[name] = process_info

    if process_info.returncode is None:
        try:
            if sys.platform != "win32" and group:
                pgid = os.getpgid(process_info.pid)
                os.killpg(pgid, sig)
            else:
                os.kill(process_info.pid, sig)
        except ProcessLookupError:
            handle_closed_process()
            raise
        except OSError as exc:
            if sys.platform == "win32":
                if exc.winerror == 87:
                    handle_closed_process()
                    raise ProcessLookupError from exc
            raise
    else:
        raise ProcessLookupError

terminate(name, group=False)

Terminate the specified named process.

Source code in dvc_task/proc/manager.py
164
165
166
def terminate(self, name: str, group: bool = False):
    """Terminate the specified named process."""
    self.send_signal(name, signal.SIGTERM, group)