prefix += ': '
timeout = timeout or ctx.timeout
- async def tee(reader: asyncio.StreamReader) -> str:
- collected = StringIO()
- async for line in reader:
- message = line.decode('utf-8')
- collected.write(message)
- return collected.getvalue()
-
async def run_with_timeout() -> Tuple[str, str, int]:
process = await asyncio.create_subprocess_exec(
*command,
assert process.stdout
assert process.stderr
try:
- stdout, stderr = await asyncio.gather(tee(process.stdout),
- tee(process.stderr))
- returncode = await asyncio.wait_for(process.wait(), timeout)
+ stdout, stderr = await asyncio.wait_for(
+ process.communicate(),
+ timeout,
+ )
except asyncio.TimeoutError:
+ # try to terminate the process assuming it is still running. It's
+ # possible that even after killing the process it will not
+ # complete, particularly if it is D-state. If that happens the
+ # process.wait call will block, but we're no worse off than before
+ # when the timeout did not work. Additionally, there are other
+ # corner-cases we could try and handle here but we decided to start
+ # simple.
+ process.kill()
+ await process.wait()
logger.info(prefix + f'timeout after {timeout} seconds')
return '', '', 124
else:
- return stdout, stderr, returncode
+ assert process.returncode is not None
+ return (
+ stdout.decode('utf-8'),
+ stderr.decode('utf-8'),
+ process.returncode,
+ )
stdout, stderr, returncode = async_run(run_with_timeout())
log_level = verbosity.success_log_level()