106 changes: 104 additions & 2 deletions python/run-tests.py
@@ -17,11 +17,14 @@
# limitations under the License.
#

import asyncio
import logging
from argparse import ArgumentParser
import os
import importlib
import io
import platform
import pty
import re
import shutil
import subprocess
@@ -74,6 +77,103 @@ def get_valid_filename(s):
raise RuntimeError("Cannot find assembly build directory, please build Spark first.")


class TestRunner:
def __init__(self, cmd, env, test_output):
self.cmd = cmd
self.env = env
self.test_output = test_output
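        # pdb_mode becomes True once a `(Pdb)` prompt is detected in the child's output.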
self.pdb_mode = False
self.loop = asyncio.new_event_loop()
self.master_fd = None
self.write_task = None

def run(self):
"""
        Run the command in a subprocess with stdin, stdout, and stderr hooked
        up to a pseudo-terminal.
        Normally, all output from the subprocess is redirected to the
        test_output file.
        When a `(Pdb)` prompt is detected, the subprocess enters interactive
        mode and its output is redirected to the console instead.
"""
self.master_fd, slave_fd = pty.openpty()

# Start child connected to the PTY
p = subprocess.Popen(
self.cmd,
env=self.env,
stdin=slave_fd,
stdout=slave_fd,
stderr=slave_fd,
)
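        # The child holds its own copies of the slave end; closing ours lets reads
        # on the master end fail once the child exits.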
os.close(slave_fd)

self.loop.run_until_complete(self.handle_inout())
return p.wait()

async def handle_inout(self):
await self.read_from_child()

def output_line(self, line):
if self.pdb_mode:
sys.stdout.write(line.decode("utf-8", "replace"))
sys.stdout.flush()
else:
if isinstance(self.test_output, io.TextIOBase):
self.test_output.write(line.decode("utf-8", "replace"))
else:
self.test_output.write(line)

def process_buffer(self, buffer, force_flush=False):
# Process all full lines first
while (nl := buffer.find(b"\n")) != -1:
self.output_line(buffer[:nl + 1])
buffer = buffer[nl + 1:]
# Process the remaining buffer
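        # A trailing partial line containing the pdb prompt means the debugger
        # is waiting for input on an unterminated line.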
if b"(Pdb)" in buffer:
self.pdb_mode = True
self.output_line(buffer)
return b""
elif force_flush:
self.output_line(buffer)
return b""
else:
return buffer

# Reader: forward child output to our stdout
async def read_from_child(self):
buffer = b""
while True:
try:
data = await self.loop.run_in_executor(None, os.read, self.master_fd, 1024)
except OSError:
break
if not data:
break
buffer += data
buffer = self.process_buffer(buffer)
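            # Once the pdb prompt is seen, start forwarding local stdin so the
            # user can drive the debugger interactively.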
if self.pdb_mode and self.write_task is None:
self.write_task = self.loop.create_task(self.write_to_child())
buffer = self.process_buffer(buffer, force_flush=True)
self.test_output.flush()

if self.write_task is not None:
self.write_task.cancel()
try:
await self.write_task
except asyncio.CancelledError:
pass
self.write_task = None
self.pdb_mode = False

# Writer: forward our stdin to child tty
async def write_to_child(self):
while True:
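            # Relay stdin to the child's tty one byte at a time through the PTY master.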
data = await self.loop.run_in_executor(None, sys.stdin.buffer.read, 1)
if not data:
break
os.write(self.master_fd, data)


def run_individual_python_test(target_dir, test_name, pyspark_python, keep_test_output):
"""
Runs an individual test. This function is called by the multi-process runner of all tests.
@@ -144,9 +244,11 @@ def run_individual_python_test(target_dir, test_name, pyspark_python, keep_test_
"Starting test(%s): %s (temp output: %s)", pyspark_python, test_name, per_test_output.name)
start_time = time.time()
try:
-        retcode = subprocess.Popen(
+        retcode = TestRunner(
            [os.path.join(SPARK_HOME, "bin/pyspark")] + test_name.split(),
-            stderr=per_test_output, stdout=per_test_output, env=env).wait()
+            env,
+            per_test_output
+        ).run()
if not keep_test_output:
# There exists a race condition in Python and it causes flakiness in MacOS
# https://github.com/python/cpython/issues/73885