diff --git a/gprofiler/main.py b/gprofiler/main.py index 01880e52d..aba1900b5 100644 --- a/gprofiler/main.py +++ b/gprofiler/main.py @@ -50,9 +50,9 @@ grab_gprofiler_mutex, is_root, reset_umask, - resource_path, - run_process, ) +from gprofiler.utils.fs import resource_path +from gprofiler.utils.process import run_process from gprofiler.utils.proxy import get_https_proxy logger: logging.LoggerAdapter diff --git a/gprofiler/metadata/application_identifiers.py b/gprofiler/metadata/application_identifiers.py index ed683eed2..ff873df81 100644 --- a/gprofiler/metadata/application_identifiers.py +++ b/gprofiler/metadata/application_identifiers.py @@ -16,7 +16,7 @@ from gprofiler.log import get_logger_adapter from gprofiler.metadata.enrichment import EnrichmentOptions from gprofiler.profilers.java import jattach_path -from gprofiler.utils import run_process +from gprofiler.utils.process import run_process _logger = get_logger_adapter(__name__) diff --git a/gprofiler/metadata/system_metadata.py b/gprofiler/metadata/system_metadata.py index dc1b86b0e..37e936c86 100644 --- a/gprofiler/metadata/system_metadata.py +++ b/gprofiler/metadata/system_metadata.py @@ -18,7 +18,8 @@ from granulate_utils.linux.ns import run_in_ns from gprofiler.log import get_logger_adapter -from gprofiler.utils import is_pyinstaller, run_process +from gprofiler.utils import is_pyinstaller +from gprofiler.utils.process import run_process UNKNOWN_VALUE = "unknown" diff --git a/gprofiler/metadata/versions.py b/gprofiler/metadata/versions.py index 907ea53e7..5365f3066 100644 --- a/gprofiler/metadata/versions.py +++ b/gprofiler/metadata/versions.py @@ -8,7 +8,7 @@ from granulate_utils.linux.ns import get_process_nspid, run_in_ns from psutil import Process -from gprofiler.utils import run_process +from gprofiler.utils.process import run_process def get_exe_version( diff --git a/gprofiler/profilers/dotnet.py b/gprofiler/profilers/dotnet.py index 50980c49b..f2e6cb0c0 100644 --- a/gprofiler/profilers/dotnet.py 
+++ b/gprofiler/profilers/dotnet.py @@ -18,9 +18,10 @@ from gprofiler.metadata.application_metadata import ApplicationMetadata from gprofiler.profilers.profiler_base import ProcessProfilerBase from gprofiler.profilers.registry import register_profiler -from gprofiler.utils import pgrep_maps, random_prefix, removed_path, resource_path, run_process -from gprofiler.utils.process import is_process_basename_matching, process_comm +from gprofiler.utils.fs import resource_path +from gprofiler.utils.process import pgrep_maps, is_process_basename_matching, process_comm, run_process from gprofiler.utils.speedscope import load_speedscope_as_collapsed +from gprofiler.utils.fs import removed_path, random_prefix logger = get_logger_adapter(__name__) diff --git a/gprofiler/profilers/java.py b/gprofiler/profilers/java.py index 91ddff163..5d821f788 100644 --- a/gprofiler/profilers/java.py +++ b/gprofiler/profilers/java.py @@ -55,17 +55,11 @@ from gprofiler.utils import ( GPROFILER_DIRECTORY_NAME, TEMPORARY_STORAGE_PATH, - pgrep_maps, - remove_path, - remove_prefix, - resource_path, - run_process, - touch_path, wait_event, ) -from gprofiler.utils.fs import is_rw_exec_dir, safe_copy +from gprofiler.utils.fs import is_rw_exec_dir, safe_copy, remove_prefix, touch_path, remove_path, resource_path from gprofiler.utils.perf import can_i_use_perf_events -from gprofiler.utils.process import is_process_basename_matching, process_comm, search_proc_maps +from gprofiler.utils.process import pgrep_maps, is_process_basename_matching, process_comm, search_proc_maps, run_process logger = get_logger_adapter(__name__) diff --git a/gprofiler/profilers/node.py b/gprofiler/profilers/node.py index b36ab3a40..c212de9e7 100644 --- a/gprofiler/profilers/node.py +++ b/gprofiler/profilers/node.py @@ -22,7 +22,9 @@ from gprofiler.log import get_logger_adapter from gprofiler.metadata.versions import get_exe_version -from gprofiler.utils import TEMPORARY_STORAGE_PATH, add_permission_dir, pgrep_exe, resource_path 
+from gprofiler.utils import TEMPORARY_STORAGE_PATH, add_permission_dir +from gprofiler.utils.fs import resource_path +from gprofiler.utils.process import pgrep_exe logger = get_logger_adapter(__name__) diff --git a/gprofiler/profilers/perf.py b/gprofiler/profilers/perf.py index 66831f1fa..4b97ed7d8 100644 --- a/gprofiler/profilers/perf.py +++ b/gprofiler/profilers/perf.py @@ -23,9 +23,10 @@ from gprofiler.profilers.node import clean_up_node_maps, generate_map_for_node_processes, get_node_processes from gprofiler.profilers.profiler_base import ProfilerBase from gprofiler.profilers.registry import ProfilerArgument, register_profiler -from gprofiler.utils import run_process, start_process, wait_event, wait_for_file_by_prefix +from gprofiler.utils import wait_event from gprofiler.utils.perf import perf_path -from gprofiler.utils.process import is_process_basename_matching +from gprofiler.utils.process import is_process_basename_matching, start_process, run_process +from gprofiler.utils.fs import wait_for_file_by_prefix logger = get_logger_adapter(__name__) diff --git a/gprofiler/profilers/php.py b/gprofiler/profilers/php.py index 11097f35d..ebbf67d1c 100644 --- a/gprofiler/profilers/php.py +++ b/gprofiler/profilers/php.py @@ -19,7 +19,9 @@ from gprofiler.log import get_logger_adapter from gprofiler.profilers.profiler_base import ProfilerBase from gprofiler.profilers.registry import ProfilerArgument, register_profiler -from gprofiler.utils import random_prefix, resource_path, start_process, wait_event +from gprofiler.utils import wait_event +from gprofiler.utils.process import start_process +from gprofiler.utils.fs import random_prefix, resource_path logger = get_logger_adapter(__name__) # Currently tracing only php-fpm, TODO: support mod_php in apache. 
diff --git a/gprofiler/profilers/python.py b/gprofiler/profilers/python.py
index 7ea32d5e3..8df4ab08b 100644
--- a/gprofiler/profilers/python.py
+++ b/gprofiler/profilers/python.py
@@ -43,8 +43,8 @@ if not is_windows():
     from gprofiler.profilers.python_ebpf import PythonEbpfProfiler, PythonEbpfError
-from gprofiler.utils import pgrep_maps, random_prefix, removed_path, resource_path, run_process
-from gprofiler.utils.process import is_process_basename_matching, process_comm, search_proc_maps
+from gprofiler.utils.fs import removed_path, random_prefix, resource_path
+from gprofiler.utils.process import pgrep_maps, is_process_basename_matching, process_comm, search_proc_maps, run_process
 
 logger = get_logger_adapter(__name__)
diff --git a/gprofiler/profilers/python_ebpf.py b/gprofiler/profilers/python_ebpf.py
index 9113f3559..82fc31993 100644
--- a/gprofiler/profilers/python_ebpf.py
+++ b/gprofiler/profilers/python_ebpf.py
@@ -21,15 +21,9 @@
 from gprofiler.metadata import application_identifiers
 from gprofiler.profilers import python
 from gprofiler.profilers.profiler_base import ProfilerBase
-from gprofiler.utils import (
-    poll_process,
-    random_prefix,
-    resource_path,
-    run_process,
-    start_process,
-    wait_event,
-    wait_for_file_by_prefix,
-)
+from gprofiler.utils import wait_event
+from gprofiler.utils.process import start_process, poll_process, run_process
+from gprofiler.utils.fs import wait_for_file_by_prefix, random_prefix, resource_path
 
 logger = get_logger_adapter(__name__)
diff --git a/gprofiler/profilers/ruby.py b/gprofiler/profilers/ruby.py
index abcb27584..79ebb1383 100644
--- a/gprofiler/profilers/ruby.py
+++ b/gprofiler/profilers/ruby.py
@@ -21,8 +21,8 @@
 from gprofiler.metadata.application_metadata import ApplicationMetadata
 from gprofiler.profilers.profiler_base import SpawningProcessProfilerBase
 from gprofiler.profilers.registry import register_profiler
-from gprofiler.utils import pgrep_maps, random_prefix, removed_path, resource_path, run_process
-from gprofiler.utils.process import is_process_basename_matching, process_comm, search_proc_maps +from gprofiler.utils.fs import removed_path, random_prefix, resource_path +from gprofiler.utils.process import pgrep_maps, is_process_basename_matching, process_comm, search_proc_maps, run_process logger = get_logger_adapter(__name__) diff --git a/gprofiler/utils/__init__.py b/gprofiler/utils/__init__.py index c43bad0a5..b13f0dc1d 100644 --- a/gprofiler/utils/__init__.py +++ b/gprofiler/utils/__init__.py @@ -16,15 +16,13 @@ import subprocess import sys import time -from contextlib import contextmanager from functools import lru_cache from pathlib import Path from subprocess import CompletedProcess, Popen, TimeoutExpired from tempfile import TemporaryDirectory from threading import Event -from typing import Any, Callable, Iterator, List, Optional, Tuple, Union, cast +from typing import Any, Callable, Iterator, List, Optional, Union, cast -import importlib_resources import psutil from granulate_utils.exceptions import CouldNotAcquireMutex from granulate_utils.linux.mutex import try_acquire_mutex @@ -49,17 +47,6 @@ gprofiler_mutex: Optional[socket.socket] = None -@lru_cache(maxsize=None) -def resource_path(relative_path: str = "") -> str: - *relative_directory, basename = relative_path.split("/") - package = ".".join(["gprofiler", "resources"] + relative_directory) - try: - with importlib_resources.path(package, basename) as path: - return str(path) - except ImportError as e: - raise Exception(f"Resource {relative_path!r} not found!") from e - - @lru_cache(maxsize=None) def is_root() -> bool: return os.geteuid() == 0 @@ -101,47 +88,6 @@ def wrapper() -> Any: return wrapper -def start_process( - cmd: Union[str, List[str]], via_staticx: bool, term_on_parent_death: bool = True, **kwargs: Any -) -> Popen: - cmd_text = " ".join(cmd) if isinstance(cmd, list) else cmd - logger.debug(f"Running command: ({cmd_text})") - if isinstance(cmd, str): - cmd = [cmd] - - env = 
kwargs.pop("env", None) - staticx_dir = get_staticx_dir() - # are we running under staticx? - if staticx_dir is not None: - # if so, if "via_staticx" was requested, then run the binary with the staticx ld.so - # because it's supposed to be run with it. - if via_staticx: - # staticx_dir (from STATICX_BUNDLE_DIR) is where staticx has extracted all of the - # libraries it had collected earlier. - # see https://github.com/JonathonReinhart/staticx#run-time-information - cmd = [f"{staticx_dir}/.staticx.interp", "--library-path", staticx_dir] + cmd - else: - # explicitly remove our directory from LD_LIBRARY_PATH - env = env if env is not None else os.environ.copy() - env.update({"LD_LIBRARY_PATH": ""}) - - cur_preexec_fn = kwargs.pop("preexec_fn", os.setpgrp) - - if term_on_parent_death: - cur_preexec_fn = wrap_callbacks([set_child_termination_on_parent_death, cur_preexec_fn]) - - popen = Popen( - cmd, - stdout=kwargs.pop("stdout", subprocess.PIPE), - stderr=kwargs.pop("stderr", subprocess.PIPE), - stdin=subprocess.PIPE, - preexec_fn=cur_preexec_fn, - env=env, - **kwargs, - ) - return popen - - def wait_event(timeout: float, stop_event: Event, condition: Callable[[], bool], interval: float = 0.1) -> None: end_time = time.monotonic() + timeout while True: @@ -155,172 +101,6 @@ def wait_event(timeout: float, stop_event: Event, condition: Callable[[], bool], raise TimeoutError() -def poll_process(process: Popen, timeout: float, stop_event: Event) -> None: - try: - wait_event(timeout, stop_event, lambda: process.poll() is not None) - except StopEventSetException: - process.kill() - raise - - -def wait_for_file_by_prefix(prefix: str, timeout: float, stop_event: Event) -> Path: - glob_pattern = f"{prefix}*" - wait_event(timeout, stop_event, lambda: len(glob.glob(glob_pattern)) > 0) - - output_files = glob.glob(glob_pattern) - # All the snapshot samples should be in one file - if len(output_files) != 1: - # this can happen if: - # * the profiler generating those files is 
erroneous - # * the profiler received many signals (and it generated files based on signals) - # * errors in gProfiler led to previous output fails remain not removed - # in any case, we remove all old files, and assume the last one (after sorting by timestamp) - # is the one we want. - logger.warning( - f"One output file expected, but found {len(output_files)}." - f" Removing all and using the last one. {output_files}" - ) - # timestamp format guarantees alphabetical order == chronological order. - output_files.sort() - for f in output_files[:-1]: - os.unlink(f) - output_files = output_files[-1:] - - return Path(output_files[0]) - - -def _reap_process(process: Popen, kill_signal: signal.Signals) -> Tuple[int, str, str]: - # kill the process and read its output so far - process.send_signal(kill_signal) - process.wait() - logger.debug(f"({process.args!r}) was killed by us with signal {kill_signal} due to timeout or stop request") - stdout, stderr = process.communicate() - returncode = process.poll() - assert returncode is not None # only None if child has not terminated - return returncode, stdout, stderr - - -def run_process( - cmd: Union[str, List[str]], - stop_event: Event = None, - suppress_log: bool = False, - via_staticx: bool = False, - check: bool = True, - timeout: int = None, - kill_signal: signal.Signals = signal.SIGKILL, - communicate: bool = True, - stdin: bytes = None, - **kwargs: Any, -) -> "CompletedProcess[bytes]": - stdout = None - stderr = None - reraise_exc: Optional[BaseException] = None - with start_process(cmd, via_staticx, **kwargs) as process: - try: - communicate_kwargs = dict(input=stdin) if stdin is not None else {} - if stop_event is None: - assert timeout is None, f"expected no timeout, got {timeout!r}" - if communicate: - # wait for stderr & stdout to be closed - stdout, stderr = process.communicate(timeout=timeout, **communicate_kwargs) - else: - # just wait for the process to exit - process.wait() - else: - end_time = 
(time.monotonic() + timeout) if timeout is not None else None - while True: - try: - if communicate: - stdout, stderr = process.communicate(timeout=1, **communicate_kwargs) - else: - process.wait(timeout=1) - break - except TimeoutExpired: - if stop_event.is_set(): - raise ProcessStoppedException from None - if end_time is not None and time.monotonic() > end_time: - assert timeout is not None - raise - except TimeoutExpired: - returncode, stdout, stderr = _reap_process(process, kill_signal) - assert timeout is not None - reraise_exc = CalledProcessTimeoutError(timeout, returncode, cmd, stdout, stderr) - except BaseException as e: # noqa - returncode, stdout, stderr = _reap_process(process, kill_signal) - reraise_exc = e - retcode = process.poll() - assert retcode is not None # only None if child has not terminated - - result: CompletedProcess[bytes] = CompletedProcess(process.args, retcode, stdout, stderr) - - logger.debug(f"({process.args!r}) exit code: {result.returncode}") - if not suppress_log: - if result.stdout: - logger.debug(f"({process.args!r}) stdout: {result.stdout.decode()!r}") - if result.stderr: - logger.debug(f"({process.args!r}) stderr: {result.stderr.decode()!r}") - if reraise_exc is not None: - raise reraise_exc - elif check and retcode != 0: - raise CalledProcessError(retcode, process.args, output=stdout, stderr=stderr) - return result - - -def pgrep_exe(match: str) -> List[Process]: - pattern = re.compile(match) - procs = [] - for process in psutil.process_iter(): - try: - # kernel threads should be child of process with pid 2 - if process.pid != 2 and process.ppid() != 2 and pattern.match(process_exe(process)): - procs.append(process) - except psutil.NoSuchProcess: # process might have died meanwhile - continue - return procs - - -def pgrep_maps(match: str) -> List[Process]: - # this is much faster than iterating over processes' maps with psutil. 
- result = run_process( - f"grep -lP '{match}' /proc/*/maps", - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - shell=True, - suppress_log=True, - check=False, - ) - # 0 - found - # 1 - not found - # 2 - error (which we might get for a missing /proc/pid/maps file of a process which just exited) - # so this ensures grep wasn't killed by a signal - assert result.returncode in ( - 0, - 1, - 2, - ), f"unexpected 'grep' exit code: {result.returncode}, stdout {result.stdout!r} stderr {result.stderr!r}" - - error_lines = [] - for line in result.stderr.splitlines(): - if not ( - line.startswith(b"grep: /proc/") - and (line.endswith(b"/maps: No such file or directory") or line.endswith(b"/maps: No such process")) - ): - error_lines.append(line) - if error_lines: - logger.error(f"Unexpected 'grep' error output (first 10 lines): {error_lines[:10]}") - - processes: List[Process] = [] - for line in result.stdout.splitlines(): - assert line.startswith(b"/proc/") and line.endswith(b"/maps"), f"unexpected 'grep' line: {line!r}" - pid = int(line[len(b"/proc/") : -len(b"/maps")]) - try: - processes.append(Process(pid)) - except psutil.NoSuchProcess: - continue # process might have died meanwhile - - return processes - - def get_iso8601_format_time_from_epoch_time(time: float) -> str: return get_iso8601_format_time(datetime.datetime.utcfromtimestamp(time)) @@ -329,35 +109,6 @@ def get_iso8601_format_time(time: datetime.datetime) -> str: return time.replace(microsecond=0).isoformat() -def remove_prefix(s: str, prefix: str) -> str: - # like str.removeprefix of Python 3.9, but this also ensures the prefix exists. 
-    assert s.startswith(prefix), f"{s} doesn't start with {prefix}"
-    return s[len(prefix) :]
-
-
-def touch_path(path: str, mode: int) -> None:
-    Path(path).touch()
-    # chmod() afterwards (can't use 'mode' in touch(), because it's affected by umask)
-    os.chmod(path, mode)
-
-
-def remove_path(path: Union[str, Path], missing_ok: bool = False) -> None:
-    # backporting missing_ok, available only from 3.8
-    try:
-        Path(path).unlink()
-    except FileNotFoundError:
-        if not missing_ok:
-            raise
-
-
-@contextmanager
-def removed_path(path: str) -> Iterator[None]:
-    try:
-        yield
-    finally:
-        remove_path(path, missing_ok=True)
-
-
 _INSTALLED_PROGRAMS_CACHE: List[str] = []
 
 
@@ -433,10 +184,6 @@ def limit_frequency(
     return requested
 
 
-def random_prefix() -> str:
-    return "".join(random.choice(string.ascii_letters) for _ in range(16))
-
-
 PERF_EVENT_MLOCK_KB = "/proc/sys/kernel/perf_event_mlock_kb"
diff --git a/gprofiler/utils/fs.py b/gprofiler/utils/fs.py
index fe8d8db40..0d6f01a9b 100644
--- a/gprofiler/utils/fs.py
+++ b/gprofiler/utils/fs.py
@@ -8,8 +8,47 @@
 import shutil
 from pathlib import Path
 from secrets import token_hex
+import glob
+import os
+import random
+import string
+from threading import Event
+from typing import Union, Iterator
+from contextlib import contextmanager
+from gprofiler.log import get_logger_adapter
+from gprofiler.utils import wait_event
+from gprofiler.utils.process import run_process
+from functools import lru_cache
+import importlib_resources
+
+logger = get_logger_adapter(__name__)
 
-from gprofiler.utils import remove_path, run_process
+
+@lru_cache(maxsize=None)
+def resource_path(relative_path: str = "") -> str:
+    *relative_directory, basename = relative_path.split("/")
+    package = ".".join(["gprofiler", "resources"] + relative_directory)
+    try:
+        with importlib_resources.path(package, basename) as path:
+            return str(path)
+    except ImportError as e:
+        raise Exception(f"Resource {relative_path!r} not found!") from e
+
+def remove_path(path: Union[str, Path], missing_ok: bool = False) -> None:
+    # backporting missing_ok, available only from 3.8
+    try:
+        Path(path).unlink()
+    except FileNotFoundError:
+        if not missing_ok:
+            raise
+
+
+@contextmanager
+def removed_path(path: str) -> Iterator[None]:
+    try:
+        yield
+    finally:
+        remove_path(path, missing_ok=True)
 
 
 def safe_copy(src: str, dst: str) -> None:
@@ -51,3 +90,45 @@
     test_script.unlink()
 
     return True
+
+
+def wait_for_file_by_prefix(prefix: str, timeout: float, stop_event: Event) -> Path:
+    glob_pattern = f"{prefix}*"
+    wait_event(timeout, stop_event, lambda: len(glob.glob(glob_pattern)) > 0)
+
+    output_files = glob.glob(glob_pattern)
+    # All the snapshot samples should be in one file
+    if len(output_files) != 1:
+        # this can happen if:
+        # * the profiler generating those files is erroneous
+        # * the profiler received many signals (and it generated files based on signals)
+        # * errors in gProfiler led to previous output fails remain not removed
+        # in any case, we remove all old files, and assume the last one (after sorting by timestamp)
+        # is the one we want.
+        logger.warning(
+            f"One output file expected, but found {len(output_files)}."
+            f" Removing all and using the last one. {output_files}"
+        )
+        # timestamp format guarantees alphabetical order == chronological order.
+        output_files.sort()
+        for f in output_files[:-1]:
+            os.unlink(f)
+        output_files = output_files[-1:]
+
+    return Path(output_files[0])
+
+
+def remove_prefix(s: str, prefix: str) -> str:
+    # like str.removeprefix of Python 3.9, but this also ensures the prefix exists.
+    assert s.startswith(prefix), f"{s} doesn't start with {prefix}"
+    return s[len(prefix) :]
+
+
+def touch_path(path: str, mode: int) -> None:
+    Path(path).touch()
+    # chmod() afterwards (can't use 'mode' in touch(), because it's affected by umask)
+    os.chmod(path, mode)
+
+
+def random_prefix() -> str:
+    return "".join(random.choice(string.ascii_letters) for _ in range(16))
diff --git a/gprofiler/utils/perf.py b/gprofiler/utils/perf.py
index 7bd1ba9e3..08380e904 100644
--- a/gprofiler/utils/perf.py
+++ b/gprofiler/utils/perf.py
@@ -5,7 +5,8 @@
 from gprofiler.exceptions import CalledProcessError
 from gprofiler.log import get_logger_adapter
-from gprofiler.utils import resource_path, run_process
+from gprofiler.utils.fs import resource_path
+from gprofiler.utils.process import run_process
 
 logger = get_logger_adapter(__name__)
diff --git a/gprofiler/utils/process.py b/gprofiler/utils/process.py
index d80cbdd64..060fe8596 100644
--- a/gprofiler/utils/process.py
+++ b/gprofiler/utils/process.py
@@ -5,11 +5,22 @@
 import os
 import re
+import signal
+import subprocess
+import time
 from functools import lru_cache
-from typing import Match, Optional
+from typing import Any, Match, Optional, Union, List, Tuple
+from subprocess import CompletedProcess, Popen, TimeoutExpired
+from threading import Event
 
+import psutil
 from granulate_utils.linux.process import process_exe, read_proc_file
 from psutil import Process
+from gprofiler.exceptions import CalledProcessError, CalledProcessTimeoutError, ProcessStoppedException, StopEventSetException
+from gprofiler.log import get_logger_adapter
+from gprofiler.utils import get_staticx_dir, wait_event, wrap_callbacks, set_child_termination_on_parent_death  # TODO(review): confirm wrap_callbacks / set_child_termination_on_parent_death live in gprofiler.utils
+
+logger = get_logger_adapter(__name__)
 
 
 def search_proc_maps(process: Process, pattern: str) -> Optional[Match[str]]:
@@ -34,3 +45,180 @@
         return True
 
     return False
+
+def start_process(
+    cmd: Union[str, List[str]], via_staticx: bool, term_on_parent_death: bool = True, **kwargs: Any
+) -> Popen:
+    cmd_text = " ".join(cmd) if isinstance(cmd, list) else cmd
+    logger.debug(f"Running command: ({cmd_text})")
+    if isinstance(cmd, str):
+        cmd = [cmd]
+
+    env = kwargs.pop("env", None)
+    staticx_dir = get_staticx_dir()
+    # are we running
under staticx? + if staticx_dir is not None: + # if so, if "via_staticx" was requested, then run the binary with the staticx ld.so + # because it's supposed to be run with it. + if via_staticx: + # staticx_dir (from STATICX_BUNDLE_DIR) is where staticx has extracted all of the + # libraries it had collected earlier. + # see https://github.com/JonathonReinhart/staticx#run-time-information + cmd = [f"{staticx_dir}/.staticx.interp", "--library-path", staticx_dir] + cmd + else: + # explicitly remove our directory from LD_LIBRARY_PATH + env = env if env is not None else os.environ.copy() + env.update({"LD_LIBRARY_PATH": ""}) + + cur_preexec_fn = kwargs.pop("preexec_fn", os.setpgrp) + + if term_on_parent_death: + cur_preexec_fn = wrap_callbacks([set_child_termination_on_parent_death, cur_preexec_fn]) + + popen = Popen( + cmd, + stdout=kwargs.pop("stdout", subprocess.PIPE), + stderr=kwargs.pop("stderr", subprocess.PIPE), + stdin=subprocess.PIPE, + preexec_fn=cur_preexec_fn, + env=env, + **kwargs, + ) + return popen + +def poll_process(process: Popen, timeout: float, stop_event: Event) -> None: + try: + wait_event(timeout, stop_event, lambda: process.poll() is not None) + except StopEventSetException: + process.kill() + raise + +def _reap_process(process: Popen, kill_signal: signal.Signals) -> Tuple[int, str, str]: + # kill the process and read its output so far + process.send_signal(kill_signal) + process.wait() + logger.debug(f"({process.args!r}) was killed by us with signal {kill_signal} due to timeout or stop request") + stdout, stderr = process.communicate() + returncode = process.poll() + assert returncode is not None # only None if child has not terminated + return returncode, stdout, stderr + +def run_process( + cmd: Union[str, List[str]], + stop_event: Event = None, + suppress_log: bool = False, + via_staticx: bool = False, + check: bool = True, + timeout: int = None, + kill_signal: signal.Signals = signal.SIGKILL, + communicate: bool = True, + stdin: bytes = 
None, + **kwargs: Any, +) -> "CompletedProcess[bytes]": + stdout = None + stderr = None + reraise_exc: Optional[BaseException] = None + with start_process(cmd, via_staticx, **kwargs) as process: + try: + communicate_kwargs = dict(input=stdin) if stdin is not None else {} + if stop_event is None: + assert timeout is None, f"expected no timeout, got {timeout!r}" + if communicate: + # wait for stderr & stdout to be closed + stdout, stderr = process.communicate(timeout=timeout, **communicate_kwargs) + else: + # just wait for the process to exit + process.wait() + else: + end_time = (time.monotonic() + timeout) if timeout is not None else None + while True: + try: + if communicate: + stdout, stderr = process.communicate(timeout=1, **communicate_kwargs) + else: + process.wait(timeout=1) + break + except TimeoutExpired: + if stop_event.is_set(): + raise ProcessStoppedException from None + if end_time is not None and time.monotonic() > end_time: + assert timeout is not None + raise + except TimeoutExpired: + returncode, stdout, stderr = _reap_process(process, kill_signal) + assert timeout is not None + reraise_exc = CalledProcessTimeoutError(timeout, returncode, cmd, stdout, stderr) + except BaseException as e: # noqa + returncode, stdout, stderr = _reap_process(process, kill_signal) + reraise_exc = e + retcode = process.poll() + assert retcode is not None # only None if child has not terminated + + result: CompletedProcess[bytes] = CompletedProcess(process.args, retcode, stdout, stderr) + + logger.debug(f"({process.args!r}) exit code: {result.returncode}") + if not suppress_log: + if result.stdout: + logger.debug(f"({process.args!r}) stdout: {result.stdout.decode()!r}") + if result.stderr: + logger.debug(f"({process.args!r}) stderr: {result.stderr.decode()!r}") + if reraise_exc is not None: + raise reraise_exc + elif check and retcode != 0: + raise CalledProcessError(retcode, process.args, output=stdout, stderr=stderr) + return result + + +def pgrep_exe(match: str) -> 
List[Process]: + pattern = re.compile(match) + procs = [] + for process in psutil.process_iter(): + try: + # kernel threads should be child of process with pid 2 + if process.pid != 2 and process.ppid() != 2 and pattern.match(process_exe(process)): + procs.append(process) + except psutil.NoSuchProcess: # process might have died meanwhile + continue + return procs + + +def pgrep_maps(match: str) -> List[Process]: + # this is much faster than iterating over processes' maps with psutil. + result = run_process( + f"grep -lP '{match}' /proc/*/maps", + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + shell=True, + suppress_log=True, + check=False, + ) + # 0 - found + # 1 - not found + # 2 - error (which we might get for a missing /proc/pid/maps file of a process which just exited) + # so this ensures grep wasn't killed by a signal + assert result.returncode in ( + 0, + 1, + 2, + ), f"unexpected 'grep' exit code: {result.returncode}, stdout {result.stdout!r} stderr {result.stderr!r}" + + error_lines = [] + for line in result.stderr.splitlines(): + if not ( + line.startswith(b"grep: /proc/") + and (line.endswith(b"/maps: No such file or directory") or line.endswith(b"/maps: No such process")) + ): + error_lines.append(line) + if error_lines: + logger.error(f"Unexpected 'grep' error output (first 10 lines): {error_lines[:10]}") + + processes: List[Process] = [] + for line in result.stdout.splitlines(): + assert line.startswith(b"/proc/") and line.endswith(b"/maps"), f"unexpected 'grep' line: {line!r}" + pid = int(line[len(b"/proc/") : -len(b"/maps")]) + try: + processes.append(Process(pid)) + except psutil.NoSuchProcess: + continue # process might have died meanwhile + + return processes \ No newline at end of file diff --git a/tests/test_java.py b/tests/test_java.py index 31a8eb144..51a8d536b 100644 --- a/tests/test_java.py +++ b/tests/test_java.py @@ -35,7 +35,7 @@ frequency_to_ap_interval, get_java_version, ) -from gprofiler.utils import remove_prefix +from 
gprofiler.utils.fs import remove_prefix
 from tests.conftest import AssertInCollapsed
 from tests.type_utils import cast_away_optional
 from tests.utils import (
diff --git a/tests/utils.py b/tests/utils.py
index b6ac605d0..c98ec01a6 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -25,7 +25,8 @@
     JavaProfiler,
 )
 from gprofiler.profilers.profiler_base import ProfilerInterface
-from gprofiler.utils import remove_path, wait_event
+from gprofiler.utils import wait_event
+from gprofiler.utils.fs import remove_path
 
 RUNTIME_PROFILERS = [
     ("java", "ap"),