Skip to content

Process

Managed process module.

ManagedProcess

Bases: AbstractContextManager

Class to manage the specified process with redirected output.

stdout and stderr will both be redirected to `<name>.out`. Interactive processes (requiring stdin input) are currently unsupported.

Source code in dvc_task/proc/process.py
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
class ManagedProcess(AbstractContextManager):
    """Class to manage the specified process with redirected output.

    stdout and stderr will both be redirected to `<name>.out`.
    Interactive processes (requiring stdin input) are currently unsupported.

    When used as a context manager, the process is started on entry (unless
    it has already been started) and waited for on exit.
    """

    def __init__(
        self,
        args: Union[str, List[str]],
        env: Optional[Dict[str, str]] = None,
        wdir: Optional[str] = None,
        name: Optional[str] = None,
    ):
        """Construct a ManagedProcess.

        Arguments:
            args: Command to be run. A string is split into an argument list
                with `shlex.split`; any other value is copied into a list.
            env: Optional environment variables, handed to the child process
                unchanged.
            wdir: If specified, redirected output files will be placed in
                `wdir`. Defaults to current working directory.
            name: Name to use for this process, if not specified a UUID will be
                generated instead.
        """
        self.args: List[str] = (
            shlex.split(args, posix=os.name == "posix")
            if isinstance(args, str)
            else list(args)
        )
        self.env = env
        self.wdir = wdir
        self.name = name or uuid()
        # Stays None until `wait` has observed the finished process.
        self.returncode: Optional[int] = None
        # Owns the file objects opened for output redirection.
        self._fd_stack = ExitStack()
        self._proc: Optional[subprocess.Popen] = None

    def __enter__(self):
        """Start the process if it has not been started yet; return self."""
        if self._proc is None:
            self.run()
        return self

    def __exit__(self, *args, **kwargs):
        """Wait for the process to finish; exceptions are not suppressed."""
        self.wait()

    def _close_fds(self):
        """Close every file object registered on the internal exit stack."""
        self._fd_stack.close()

    def _make_path(self, path: str) -> str:
        """Return `path` placed inside `wdir`, or unchanged without a `wdir`."""
        return os.path.join(self.wdir, path) if self.wdir else path

    @cached_property
    def stdout_path(self) -> str:
        """Return redirected stdout path."""
        return self._make_path(f"{self.name}.out")

    @cached_property
    def info_path(self) -> str:
        """Return process information file path."""
        return self._make_path(f"{self.name}.json")

    @cached_property
    def pidfile_path(self) -> str:
        """Return process pidfile path."""
        return self._make_path(f"{self.name}.pid")

    @property
    def info(self) -> "ProcessInfo":
        """Return process information.

        Raises:
            ValueError: Process is not running (propagated from `pid`).
        """
        return ProcessInfo(
            pid=self.pid,
            stdin=None,
            stdout=self.stdout_path,
            stderr=None,
            returncode=self.returncode,
        )

    @property
    def pid(self) -> int:
        """Return process PID.

        Raises:
            ValueError: Process is not running.
        """
        if self._proc is None:
            raise ValueError("Process is not running")
        return self._proc.pid

    def _make_wdir(self):
        """Create `wdir` (when one was given) if it does not exist yet."""
        if self.wdir:
            makedirs(self.wdir, exist_ok=True)

    def _dump(self):
        """Write the process information file and the pidfile."""
        self._make_wdir()
        self.info.dump(self.info_path)

        with open(self.pidfile_path, "w", encoding="utf-8") as fobj:
            fobj.write(str(self.pid))

    def run(self):
        """Run this process.

        Opens the redirected output file in append mode, starts the command
        with stdin attached to the null device and stderr merged into stdout,
        then records the process information and pidfile.

        Raises:
            Exception: Any error raised while starting the process or while
                recording its state is re-raised after cleanup.
        """
        self._make_wdir()
        logger.debug(
            "Appending output to '%s'",
            self.stdout_path,
        )
        stdout = self._fd_stack.enter_context(open(self.stdout_path, "ab"))  # noqa: SIM115
        try:
            self._proc = subprocess.Popen(  # noqa: S603
                self.args,
                stdin=subprocess.DEVNULL,
                stdout=stdout,
                stderr=subprocess.STDOUT,
                close_fds=True,
                shell=False,
                env=self.env,
            )
            self._dump()
        except Exception:
            if self._proc is not None:
                self._proc.kill()
                # Reap the killed child so it does not linger as a zombie.
                self._proc.wait()
            self._close_fds()
            raise

    def wait(self, timeout: Optional[int] = None) -> Optional[int]:
        """Block until a process started with `run` has completed.

        Arguments:
            timeout: Optional number of seconds to wait, forwarded to
                `subprocess.Popen.wait`.

        Returns:
            The return code of the process, or None when the process was
            never started.

        Raises:
            TimeoutExpired: If `timeout` was set and the process
                did not terminate after `timeout` seconds.
        """
        if self.returncode is not None or self._proc is None:
            return self.returncode
        try:
            self._proc.wait(timeout=timeout)
        except subprocess.TimeoutExpired as exc:
            raise TimeoutExpired(exc.cmd, exc.timeout) from exc
        except KeyboardInterrupt:
            # NOTE(review): the interrupt is swallowed on purpose so that the
            # state below is still recorded; the return code may still be
            # None at this point - confirm this is the intended contract.
            pass
        self.returncode = self._proc.returncode
        self._close_fds()
        self._dump()
        return self.returncode

    @classmethod
    def spawn(cls, *args, **kwargs) -> Optional[int]:
        """Spawn a ManagedProcess command in the background.

        Positional and keyword arguments are forwarded to the class
        constructor inside the background process.

        Returns: The spawned process PID.
        """
        proc = _DaemonProcess(
            target=cls._spawn,
            args=args,
            kwargs=kwargs,
            daemon=True,
        )
        proc.start()
        # Do not terminate the child daemon when the main process exits
        mp.process._children.discard(proc)  # type: ignore[attr-defined]
        return proc.pid

    @classmethod
    def _spawn(cls, *args, **kwargs):
        """Run a ManagedProcess to completion (target of `spawn`)."""
        with cls(*args, **kwargs):
            pass

info: ProcessInfo property

Return process information.

pid: int property

Return process PID.

Raises:

Type Description
ValueError

Process is not running.

__init__(args, env=None, wdir=None, name=None)

Construct a ManagedProcess.

Parameters:

Name Type Description Default
args Union[str, List[str]]

Command to be run.

required
env Optional[Dict[str, str]]

Optional environment variables.

None
wdir Optional[str]

If specified, redirected output files will be placed in wdir. Defaults to current working directory.

None
name Optional[str]

Name to use for this process, if not specified a UUID will be generated instead.

None
Source code in dvc_task/proc/process.py
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
def __init__(
    self,
    args: Union[str, List[str]],
    env: Optional[Dict[str, str]] = None,
    wdir: Optional[str] = None,
    name: Optional[str] = None,
):
    """Construct a ManagedProcess.

    Arguments:
        args: Command to be run, either as a single string (split into
            arguments with `shlex.split`) or as a sequence of arguments.
        env: Optional environment variables.
        wdir: If specified, redirected output files will be placed in
            `wdir`. Defaults to current working directory.
        name: Name to use for this process, if not specified a UUID will be
            generated instead.
    """
    if isinstance(args, str):
        # POSIX splitting rules are only applied on POSIX platforms.
        command = shlex.split(args, posix=os.name == "posix")
    else:
        command = list(args)
    self.args: List[str] = command

    self.name = name if name else uuid()
    self.wdir = wdir
    self.env = env

    # Runtime state: nothing has been started yet.
    self._proc: Optional[subprocess.Popen] = None
    self._fd_stack = ExitStack()
    self.returncode: Optional[int] = None

info_path()

Return process information file path.

Source code in dvc_task/proc/process.py
120
121
122
123
@cached_property
def info_path(self) -> str:
    """Path of the JSON file holding this process's information."""
    filename = f"{self.name}.json"
    return self._make_path(filename)

pidfile_path()

Return process pidfile path.

Source code in dvc_task/proc/process.py
125
126
127
128
@cached_property
def pidfile_path(self) -> str:
    """Path of the file in which this process's PID is stored."""
    filename = f"{self.name}.pid"
    return self._make_path(filename)

run()

Run this process.

Source code in dvc_task/proc/process.py
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
def run(self):
    """Run this process.

    Opens the redirected output file in append mode, starts the command with
    stdin attached to the null device and stderr merged into stdout, then
    records the process state via `_dump`.

    Raises:
        Exception: Any error raised while starting the process or while
            recording its state is re-raised after cleanup.
    """
    self._make_wdir()
    logger.debug(
        "Appending output to '%s'",
        self.stdout_path,
    )
    # The output file is registered on the exit stack so `_close_fds` can
    # release it later.
    stdout = self._fd_stack.enter_context(open(self.stdout_path, "ab"))  # noqa: SIM115
    try:
        self._proc = subprocess.Popen(  # noqa: S603
            self.args,
            stdin=subprocess.DEVNULL,
            stdout=stdout,
            stderr=subprocess.STDOUT,
            close_fds=True,
            shell=False,
            env=self.env,
        )
        self._dump()
    except Exception:
        if self._proc is not None:
            self._proc.kill()
            # Reap the killed child so it does not linger as a zombie.
            self._proc.wait()
        self._close_fds()
        raise

spawn(*args, **kwargs) classmethod

Spawn a ManagedProcess command in the background.

Returns: The spawned process PID.

Source code in dvc_task/proc/process.py
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
@classmethod
def spawn(cls, *args, **kwargs) -> Optional[int]:
    """Start a ManagedProcess command in a background daemon process.

    All positional and keyword arguments are handed to `cls._spawn` in the
    background process.

    Returns: The spawned process PID.
    """
    daemon = _DaemonProcess(target=cls._spawn, args=args, kwargs=kwargs, daemon=True)
    daemon.start()
    # Drop the daemon from multiprocessing's bookkeeping so that it is not
    # terminated when the main process exits.
    mp.process._children.discard(daemon)  # type: ignore[attr-defined]
    return daemon.pid

stdout_path()

Return redirected stdout path.

Source code in dvc_task/proc/process.py
115
116
117
118
@cached_property
def stdout_path(self) -> str:
    """Path of the file that receives the redirected stdout."""
    filename = f"{self.name}.out"
    return self._make_path(filename)

wait(timeout=None)

Block until a process started with run has completed.

Source code in dvc_task/proc/process.py
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
def wait(self, timeout: Optional[int] = None) -> Optional[int]:
    """Block until a process started with `run` has completed.

    Returns immediately with the stored return code when the process was
    never started or a return code has already been recorded.

    Raises:
        TimeoutExpired: If `timeout` was set and the process
            did not terminate after `timeout` seconds.
    """
    proc = self._proc
    if proc is None or self.returncode is not None:
        return self.returncode

    try:
        proc.wait(timeout=timeout)
    except KeyboardInterrupt:
        # Swallowed: the bookkeeping below still runs after an interrupt.
        pass
    except subprocess.TimeoutExpired as exc:
        raise TimeoutExpired(exc.cmd, exc.timeout) from exc

    self.returncode = proc.returncode
    # Release the redirected output file, then persist the final state.
    self._close_fds()
    self._dump()
    return self.returncode

ProcessInfo dataclass

Process information.

Source code in dvc_task/proc/process.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
@dataclass
class ProcessInfo:
    """Process information.

    A plain record describing a process; it can be converted to and from a
    dictionary and stored in or loaded from a JSON file.
    """

    # PID of the process.
    pid: int
    # Redirection targets for the standard streams (None when not set).
    stdin: Optional[str]
    stdout: Optional[str]
    stderr: Optional[str]
    # Return code of the process, or None when none has been recorded.
    returncode: Optional[int]

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "ProcessInfo":
        """Construct ProcessInfo from the specified dictionary.

        Arguments:
            data: Mapping whose keys are the field names of this class.
        """
        return cls(**data)

    @classmethod
    def load(cls, filename: str) -> "ProcessInfo":
        """Construct the process information from a file.

        Arguments:
            filename: Path of a UTF-8 encoded JSON file, as written by `dump`.
        """
        with open(filename, encoding="utf-8") as fobj:
            return cls.from_dict(json.load(fobj))

    def asdict(self) -> Dict[str, Any]:
        """Return this info as a dictionary."""
        return asdict(self)

    def dump(self, filename: str) -> None:
        """Dump the process information into a file.

        The JSON document is first written to a temporary file created next
        to `filename` and then moved over `filename` with `os.replace`, so an
        existing file is only ever replaced by a completely written one. If
        writing or replacing fails, the temporary file is removed and the
        error is re-raised.

        Arguments:
            filename: Destination path of the JSON file.
        """
        directory, file = os.path.split(filename)
        tmp = tempfile.NamedTemporaryFile(  # noqa: SIM115
            mode="w",
            encoding="utf-8",
            dir=directory,
            prefix=f"{file}.",
            suffix=".tmp",
            delete=False,
        )
        try:
            # The temporary file is closed before it is moved or removed.
            with tmp:
                json.dump(self.asdict(), tmp)
            os.replace(tmp.name, filename)
        except BaseException:
            # Do not leave a stray temporary file behind on failure.
            try:
                os.unlink(tmp.name)
            except OSError:
                pass
            raise

asdict()

Return this info as a dictionary.

Source code in dvc_task/proc/process.py
45
46
47
def asdict(self) -> Dict[str, Any]:
    """Convert this info into a dictionary."""
    # Inside the class body this resolves to the module-level `asdict`
    # helper (presumably `dataclasses.asdict`), not to this method.
    as_mapping: Dict[str, Any] = asdict(self)
    return as_mapping

dump(filename)

Dump the process information into a file.

Source code in dvc_task/proc/process.py
49
50
51
52
53
54
55
56
57
58
59
60
61
def dump(self, filename: str) -> None:
    """Dump the process information into a file.

    The JSON document is first written to a temporary file created next to
    `filename` and then moved over `filename` with `os.replace`, so an
    existing file is only ever replaced by a completely written one. If
    writing or replacing fails, the temporary file is removed and the error
    is re-raised.

    Arguments:
        filename: Destination path of the JSON file.
    """
    directory, file = os.path.split(filename)
    tmp = tempfile.NamedTemporaryFile(  # noqa: SIM115
        mode="w",
        encoding="utf-8",
        dir=directory,
        prefix=f"{file}.",
        suffix=".tmp",
        delete=False,
    )
    try:
        # The temporary file is closed before it is moved or removed.
        with tmp:
            json.dump(self.asdict(), tmp)
        os.replace(tmp.name, filename)
    except BaseException:
        # Do not leave a stray temporary file behind on failure.
        try:
            os.unlink(tmp.name)
        except OSError:
            pass
        raise

from_dict(data) classmethod

Construct ProcessInfo from the specified dictionary.

Source code in dvc_task/proc/process.py
34
35
36
37
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "ProcessInfo":
    """Build a ProcessInfo from a dictionary of keyword arguments."""
    instance = cls(**data)
    return instance

load(filename) classmethod

Construct the process information from a file.

Source code in dvc_task/proc/process.py
39
40
41
42
43
@classmethod
def load(cls, filename: str) -> "ProcessInfo":
    """Read a UTF-8 JSON file and build the process information from it."""
    with open(filename, encoding="utf-8") as stream:
        payload = json.load(stream)
    return cls.from_dict(payload)