Skip to content

Commit 937d045

Browse files
committed
Provide a run() method to ctx to run subprocesses.
Fixes #4 Signed-off-by: Pedro Algarvio <[email protected]>
1 parent 35cce4a commit 937d045

File tree

3 files changed

+220
-0
lines changed

3 files changed

+220
-0
lines changed

changelog/4.improvement.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Provide a `run()` method to `ctx` to run subprocesses.

src/ptscripts/parser.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import sys
1212
import typing
1313
from functools import partial
14+
from subprocess import CompletedProcess
1415
from types import FunctionType
1516
from types import GenericAlias
1617
from typing import Any
@@ -23,6 +24,7 @@
2324
from rich.theme import Theme
2425

2526
from ptscripts import logs
27+
from ptscripts import process
2628

2729
if TYPE_CHECKING:
2830
from argparse import ArgumentParser
@@ -128,6 +130,23 @@ def exit(self, status=0, message=None):
128130
self.console.print(message, style=style)
129131
self.parser.exit(status)
130132

133+
def run(
134+
self,
135+
*cmdline,
136+
check=True,
137+
no_output_timeout_secs: int | None = None,
138+
capture: bool = False,
139+
) -> CompletedProcess[str]:
140+
"""
141+
Run a subprocess.
142+
"""
143+
return process.run(
144+
*cmdline,
145+
check=check,
146+
no_output_timeout_secs=no_output_timeout_secs,
147+
capture=capture,
148+
)
149+
131150

132151
class Parser:
133152
"""
@@ -187,6 +206,18 @@ def __new__(cls):
187206
default=False,
188207
help="Show debug messages",
189208
)
209+
run_options = instance.parser.add_argument_group(
210+
"Run Subprocess Options", description="These options apply to ctx.run() calls"
211+
)
212+
run_options.add_argument(
213+
"--no-output-timeout-secs",
214+
"--nots",
215+
default=None,
216+
type=int,
217+
help="Timeout if no output has been seen for the provided seconds.",
218+
metavar="SECONDS",
219+
dest="no_output_timeout_secs",
220+
)
190221

191222
instance.subparsers = instance.parser.add_subparsers(
192223
title="Commands", dest="command", required=True

src/ptscripts/process.py

Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
from __future__ import annotations
2+
3+
import asyncio.events
4+
import asyncio.streams
5+
import asyncio.subprocess
6+
import logging
7+
import os
8+
import subprocess
9+
import sys
10+
from datetime import datetime
11+
from datetime import timedelta
12+
from typing import cast
13+
from typing import TYPE_CHECKING
14+
15+
from . import logs
16+
17+
log = logging.getLogger(__name__)
18+
19+
20+
class Process(asyncio.subprocess.Process): # noqa: D101
21+
def __init__(self, *args, no_output_timeout_secs: int | timedelta | None = None, **kwargs):
22+
super().__init__(*args, **kwargs)
23+
no_output_timeout_task = None
24+
if isinstance(no_output_timeout_secs, int):
25+
assert no_output_timeout_secs >= 1
26+
no_output_timeout_secs = timedelta(seconds=no_output_timeout_secs)
27+
elif isinstance(no_output_timeout_secs, timedelta):
28+
assert no_output_timeout_secs >= timedelta(seconds=1)
29+
if no_output_timeout_secs is not None:
30+
no_output_timeout_task = self._loop.create_task( # type: ignore[attr-defined]
31+
self._check_no_output_timeout()
32+
)
33+
self._no_output_timeout_secs = no_output_timeout_secs
34+
self._no_output_timeout_task = no_output_timeout_task
35+
36+
async def _check_no_output_timeout(self):
37+
self._protocol._last_write = datetime.utcnow() # type: ignore[attr-defined]
38+
try:
39+
while self.returncode is None:
40+
await asyncio.sleep(1)
41+
last_write = self._protocol._last_write # type: ignore[attr-defined]
42+
if TYPE_CHECKING:
43+
assert self._no_output_timeout_secs
44+
if last_write + self._no_output_timeout_secs < datetime.utcnow():
45+
try:
46+
self.terminate()
47+
log.warning(
48+
"No output on has been seen for over %s second(s). "
49+
"Terminating process.",
50+
self._no_output_timeout_secs.seconds,
51+
)
52+
except ProcessLookupError:
53+
pass
54+
break
55+
except asyncio.CancelledError:
56+
pass
57+
58+
async def _cancel_no_output_timeout_task(self):
59+
task = self._no_output_timeout_task
60+
if task is None:
61+
return
62+
self._no_output_timeout_task = None
63+
if task.done():
64+
return
65+
if not task.cancelled():
66+
task.cancel()
67+
await task
68+
69+
async def wait(self):
70+
"""
71+
Wait until the process exit and return the process return code.
72+
"""
73+
retcode = await super().wait()
74+
await self._cancel_no_output_timeout_task()
75+
return retcode
76+
77+
78+
class SubprocessStreamProtocol(asyncio.subprocess.SubprocessStreamProtocol): # noqa: D101
79+
def __init__(self, *args, capture=False, **kwargs):
80+
super().__init__(*args, **kwargs)
81+
self._capture = capture
82+
self._last_write = None
83+
84+
def pipe_data_received(self, fd, data): # noqa: D102
85+
self._last_write = datetime.utcnow()
86+
if self._capture:
87+
super().pipe_data_received(fd, data)
88+
return
89+
data = data.decode("utf-8")
90+
if logs.include_timestamps() or "CI" in os.environ:
91+
if not data.strip():
92+
return
93+
if fd == 1:
94+
log.stdout(data) # type: ignore[attr-defined]
95+
else:
96+
log.stderr(data) # type: ignore[attr-defined]
97+
else:
98+
if fd == 1:
99+
sys.stdout.write(data)
100+
sys.stdout.flush()
101+
else:
102+
sys.stderr.write(data)
103+
sys.stderr.flush()
104+
105+
106+
async def _create_subprocess_exec(
107+
program,
108+
*args,
109+
stdin=None,
110+
stdout=None,
111+
stderr=None,
112+
limit=asyncio.streams._DEFAULT_LIMIT, # type: ignore[attr-defined]
113+
no_output_timeout_secs: int | None = None,
114+
capture: bool = False,
115+
**kwds,
116+
):
117+
def protocol_factory():
118+
return SubprocessStreamProtocol(limit=limit, loop=loop, capture=capture)
119+
120+
loop = asyncio.events.get_running_loop()
121+
transport, protocol = await loop.subprocess_exec(
122+
protocol_factory,
123+
program,
124+
*args,
125+
stdin=stdin,
126+
stdout=stdout,
127+
stderr=stderr,
128+
**kwds,
129+
)
130+
return Process(transport, protocol, loop, no_output_timeout_secs=no_output_timeout_secs)
131+
132+
133+
async def _subprocess_run(
134+
f,
135+
cmdline,
136+
check=True,
137+
no_output_timeout_secs: int | None = None,
138+
capture: bool = False,
139+
):
140+
stdout = subprocess.PIPE
141+
stderr = subprocess.PIPE
142+
proc = await _create_subprocess_exec(
143+
*cmdline,
144+
stdout=stdout,
145+
stderr=stderr,
146+
stdin=sys.stdin,
147+
limit=1,
148+
no_output_timeout_secs=no_output_timeout_secs,
149+
capture=capture,
150+
)
151+
stdout, stderr = await proc.communicate()
152+
result = subprocess.CompletedProcess(
153+
args=cmdline,
154+
stdout=stdout,
155+
stderr=stderr,
156+
returncode=proc.returncode,
157+
)
158+
f.set_result(result)
159+
160+
161+
def run(
162+
*cmdline,
163+
check=True,
164+
no_output_timeout_secs: int | None = None,
165+
capture: bool = False,
166+
) -> subprocess.CompletedProcess[str]:
167+
"""
168+
Run a command.
169+
"""
170+
loop = asyncio.new_event_loop()
171+
future = loop.create_future()
172+
try:
173+
loop.run_until_complete(
174+
_subprocess_run(
175+
future,
176+
cmdline,
177+
check,
178+
no_output_timeout_secs=no_output_timeout_secs,
179+
capture=capture,
180+
)
181+
)
182+
result = future.result()
183+
if check is True:
184+
result.check_returncode()
185+
return cast(subprocess.CompletedProcess[str], result)
186+
finally:
187+
loop.run_until_complete(loop.shutdown_asyncgens())
188+
loop.close()

0 commit comments

Comments
 (0)