diff --git a/python/run-tests.py b/python/run-tests.py index 83b7d86c11da..a20bb58ae3ef 100755 --- a/python/run-tests.py +++ b/python/run-tests.py @@ -17,11 +17,14 @@ # limitations under the License. # +import asyncio import logging from argparse import ArgumentParser import os import importlib +import io import platform +import pty import re import shutil import subprocess @@ -74,6 +77,103 @@ def get_valid_filename(s): raise RuntimeError("Cannot find assembly build directory, please build Spark first.") +class TestRunner: + def __init__(self, cmd, env, test_output): + self.cmd = cmd + self.env = env + self.test_output = test_output + self.pdb_mode = False + self.loop = asyncio.new_event_loop() + self.master_fd = None + self.write_task = None + + def run(self): + """ + Run a command in subprocess, with stdin, stdout, stderr hooked. + In normaly case, all the outputs from subprocess will be redirected to + the test_output file. + When `(Pdb)` is detected, the subprocess will be in interactive mode, + and the output will be redirected to the console. + """ + self.master_fd, slave_fd = pty.openpty() + + # Start child connected to the PTY + p = subprocess.Popen( + self.cmd, + env=self.env, + stdin=slave_fd, + stdout=slave_fd, + stderr=slave_fd, + ) + os.close(slave_fd) + + self.loop.run_until_complete(self.handle_inout()) + return p.wait() + + async def handle_inout(self): + await self.read_from_child() + + def output_line(self, line): + if self.pdb_mode: + sys.stdout.write(line.decode("utf-8", "replace")) + sys.stdout.flush() + else: + if isinstance(self.test_output, io.TextIOBase): + self.test_output.write(line.decode("utf-8", "replace")) + else: + self.test_output.write(line) + + def process_buffer(self, buffer, force_flush=False): + # Process all full lines first + while (nl := buffer.find(b"\n")) != -1: + self.output_line(buffer[:nl + 1]) + buffer = buffer[nl + 1:] + # Process the remaining buffer + if b"(Pdb)" in buffer: + self.pdb_mode = True + self.output_line(buffer) + return b"" + elif force_flush: + self.output_line(buffer) + return b"" + else: + return buffer + + # Reader: forward child output to our stdout + async def read_from_child(self): + buffer = b"" + while True: + try: + data = await self.loop.run_in_executor(None, os.read, self.master_fd, 1024) + except OSError: + break + if not data: + break + buffer += data + buffer = self.process_buffer(buffer) + if self.pdb_mode and self.write_task is None: + self.write_task = self.loop.create_task(self.write_to_child()) + buffer = self.process_buffer(buffer, force_flush=True) + self.test_output.flush() + + if self.write_task is not None: + self.write_task.cancel() + try: + await self.write_task + except asyncio.CancelledError: + pass + self.write_task = None + self.pdb_mode = False + + # Writer: forward our stdin to child tty + async def write_to_child(self): + while True: + data = await self.loop.run_in_executor(None, sys.stdin.buffer.read, 1) + if not data: + break + os.write(self.master_fd, data) + + def run_individual_python_test(target_dir, test_name, pyspark_python, keep_test_output): """ Runs an individual test. This function is called by the multi-process runner of all tests. @@ -144,9 +244,11 @@ def run_individual_python_test(target_dir, test_name, pyspark_python, keep_test_ "Starting test(%s): %s (temp output: %s)", pyspark_python, test_name, per_test_output.name) start_time = time.time() try: - retcode = subprocess.Popen( + retcode = TestRunner( [os.path.join(SPARK_HOME, "bin/pyspark")] + test_name.split(), - stderr=per_test_output, stdout=per_test_output, env=env).wait() + env, + per_test_output + ).run() if not keep_test_output: # There exists a race condition in Python and it causes flakiness in MacOS # https://github.com/python/cpython/issues/73885