From fdaf09c4d6780059c677b0078dc2d9b255fede7f Mon Sep 17 00:00:00 2001 From: Marc Hartmayer Date: Tue, 9 Dec 2025 19:26:59 +0000 Subject: [PATCH 1/6] mitogen/parent: Fix typo Signed-off-by: Marc Hartmayer --- mitogen/parent.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mitogen/parent.py b/mitogen/parent.py index e91504ce1..9da5d58a6 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -1416,7 +1416,7 @@ def __repr__(self): # r: read side of core_src FD. # w: write side of core_src FD. - # Final os.close(STDOUT_FILENO) to avoid --py-debug build corrupting stream with + # Final os.close(STDERR_FILENO) to avoid --py-debug build corrupting stream with # "[1234 refs]" during exit. @staticmethod def _first_stage(): From 2d9e90acf914fe838adf062222dba748a31b7035 Mon Sep 17 00:00:00 2001 From: Marc Hartmayer Date: Mon, 15 Dec 2025 10:00:17 +0000 Subject: [PATCH 2/6] parent_test: Refactor `wait_for_child` Signed-off-by: Marc Hartmayer --- tests/parent_test.py | 22 ++-------------------- tests/testlib.py | 17 +++++++++++++++++ 2 files changed, 19 insertions(+), 20 deletions(-) diff --git a/tests/parent_test.py b/tests/parent_test.py index 558a89b6c..dfacffb1e 100644 --- a/tests/parent_test.py +++ b/tests/parent_test.py @@ -2,7 +2,6 @@ import fcntl import os import signal -import sys import time import unittest @@ -21,23 +20,6 @@ from io import FileIO as file -def wait_for_child(pid, timeout=1.0): - deadline = mitogen.core.now() + timeout - while timeout < mitogen.core.now(): - try: - target_pid, status = os.waitpid(pid, os.WNOHANG) - if target_pid == pid: - return - except OSError: - e = sys.exc_info()[1] - if e.args[0] == errno.ECHILD: - return - - time.sleep(0.05) - - assert False, "wait_for_child() timed out" - - @mitogen.core.takes_econtext def call_func_in_sibling(ctx, econtext, sync_sender): recv = ctx.call_async(time.sleep, 99999) @@ -106,7 +88,7 @@ def test_connect_timeout(self): self.assertRaises(mitogen.core.TimeoutError, lambda: conn.connect(context=mitogen.core.Context(None, 1234)) ) - wait_for_child(conn.proc.pid) + testlib.wait_for_child(conn.proc.pid) e = self.assertRaises(OSError, lambda: os.kill(conn.proc.pid, 0) ) @@ -165,7 +147,7 @@ def test_context_shutdown(self): local = self.router.local() pid = local.call(os.getpid) local.shutdown(wait=True) - wait_for_child(pid) + testlib.wait_for_child(pid) self.assertRaises(OSError, lambda: os.kill(pid, 0)) diff --git a/tests/testlib.py b/tests/testlib.py index e91040006..868aaa843 100644 --- a/tests/testlib.py +++ b/tests/testlib.py @@ -86,6 +86,23 @@ mitogen.core.LOG.propagate = True +def wait_for_child(pid, timeout=1.0): + deadline = mitogen.core.now() + timeout + while timeout < mitogen.core.now(): + try: + target_pid, status = os.waitpid(pid, os.WNOHANG) + if target_pid == pid: + return + except OSError: + e = sys.exc_info()[1] + if e.args[0] == errno.ECHILD: + return + + time.sleep(0.05) + + assert False, "wait_for_child() timed out" + + def base_executable(executable=None): '''Return the path of the Python executable used to create the virtualenv. ''' From 0ab5b425d8258d27afd17655f7d6cca11550b9aa Mon Sep 17 00:00:00 2001 From: Marc Hartmayer Date: Tue, 9 Dec 2025 09:58:40 +0000 Subject: [PATCH 3/6] first_stage_test: Refactor the test Use testlib.subprocess instead of subprocess and make the test description a docstring that can be used by the test runner. Signed-off-by: Marc Hartmayer --- tests/first_stage_test.py | 38 ++++++++++++++++++++------------------ 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/tests/first_stage_test.py b/tests/first_stage_test.py index 2576ec14d..28c60a91c 100644 --- a/tests/first_stage_test.py +++ b/tests/first_stage_test.py @@ -1,5 +1,3 @@ -import subprocess - import mitogen.core import mitogen.parent from mitogen.core import b @@ -17,28 +15,32 @@ class CommandLineTest(testlib.RouterMixin, testlib.TestCase): # * 3.x starting 2.7 def test_valid_syntax(self): - options = mitogen.parent.Options(max_message_size=123) - conn = mitogen.parent.Connection(options, self.router) - conn.context = mitogen.core.Context(None, 123) - args = conn.get_boot_command() + """Test valid syntax + + The boot command should write an ECO marker to stdout, read the + preamble from stdin, then execute it. - # The boot command should write an ECO marker to stdout, read the - # preamble from stdin, then execute it. + This test attaches /dev/zero to stdin to create a specific failure - # This test attaches /dev/zero to stdin to create a specific failure - # 1. Fork child reads bytes of NUL (`b'\0'`) - # 2. Fork child crashes (trying to decompress the junk data) - # 3. Fork child's file descriptors (write pipes) are closed by the OS - # 4. Fork parent does `dup(, )` and `exec()` - # 5. Python reads `b''` (i.e. EOF) from stdin (a closed pipe) - # 6. Python runs `''` (a valid script) and exits with success + 1. Fork child reads bytes of NUL (`b'\0'`) + 2. Fork child crashes (trying to decompress the junk data) + 3. Fork child's file descriptors (write pipes) are closed by the OS + 4. Fork parent does `dup(, )` and `exec()` + 5. Python reads `b''` (i.e. EOF) from stdin (a closed pipe) + 6. Python runs `''` (a valid script) and exits with success + """ + + options = mitogen.parent.Options(max_message_size=123) + conn = mitogen.parent.Connection(options, self.router) + conn.context = mitogen.core.Context(None, 123) fp = open("/dev/zero", "r") try: - proc = subprocess.Popen(args, + proc = testlib.subprocess.Popen( + args=conn.get_boot_command(), stdin=fp, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, + stdout=testlib.subprocess.PIPE, + stderr=testlib.subprocess.PIPE, ) stdout, stderr = proc.communicate() self.assertEqual(0, proc.returncode) From f7ca6af62d9f8968b696a565a77d4b1349494cdd Mon Sep 17 00:00:00 2001 From: Marc Hartmayer Date: Wed, 17 Dec 2025 08:40:51 +0000 Subject: [PATCH 4/6] first_stage_test: Open /dev/zero in binary mode Signed-off-by: Marc Hartmayer --- tests/first_stage_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/first_stage_test.py b/tests/first_stage_test.py index 28c60a91c..3f8a3f50e 100644 --- a/tests/first_stage_test.py +++ b/tests/first_stage_test.py @@ -34,7 +34,7 @@ def test_valid_syntax(self): options = mitogen.parent.Options(max_message_size=123) conn = mitogen.parent.Connection(options, self.router) conn.context = mitogen.core.Context(None, 123) - fp = open("/dev/zero", "r") + fp = open("/dev/zero", "rb") try: proc = testlib.subprocess.Popen( args=conn.get_boot_command(), From f5195edf08a9dca39db03c79f6d87231c05a718f Mon Sep 17 00:00:00 2001 From: Marc Hartmayer Date: Tue, 9 Dec 2025 09:59:51 +0000 Subject: [PATCH 5/6] first_stage_test: Add more tests + test_non_blocking_stdin + test_blocking_stdin + test_premature_eof + test_broker_connect_eof_error + test_broker_connect_timeout_because_blocking_read(self): Signed-off-by: Marc Hartmayer --- tests/first_stage_test.py | 313 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 313 insertions(+) diff --git a/tests/first_stage_test.py b/tests/first_stage_test.py index 3f8a3f50e..354f7479b 100644 --- a/tests/first_stage_test.py +++ b/tests/first_stage_test.py @@ -1,3 +1,7 @@ +import errno +import operator +import os + import mitogen.core import mitogen.parent from mitogen.core import b @@ -5,6 +9,269 @@ import testlib +def create_child_using_pipes(args, blocking, preexec_fn=None): + """ + Create a child process whose stdin/stdout/stderr is connected to a pipe. + + :param list args: + Program argument vector. + :param bool blocking: + If :data:`True`, the sockets use blocking IO, otherwise non-blocking. + :param function preexec_fn: + If not :data:`None`, a function to run within the post-fork child + before executing the target program. + :returns: + :class:`PopenProcess` instance. + """ + + parent_rfp, child_wfp = mitogen.core.pipe(blocking) + child_rfp, parent_wfp = mitogen.core.pipe(blocking) + stderr_r, stderr = mitogen.core.pipe(blocking=blocking) + mitogen.core.set_cloexec(stderr_r.fileno()) + try: + proc = testlib.subprocess.Popen( + args=args, + stdin=child_rfp, + stdout=child_wfp, + stderr=stderr, + close_fds=True, + preexec_fn=preexec_fn, + ) + except Exception: + parent_rfp.close() + parent_wfp.close() + stderr_r.close() + raise + finally: + child_rfp.close() + child_wfp.close() + stderr.close() + + return mitogen.parent.PopenProcess( + proc=proc, + stdin=parent_wfp, + stdout=parent_rfp, + stderr=stderr_r, + ) + + +def create_child_using_sockets(args, blocking, size=None, preexec_fn=None): + """ + Create a child process whose stdin/stdout is connected to a socket and stderr to a pipe. + + :param list args: + Program argument vector. + :param bool blocking: + If :data:`True`, the sockets use blocking IO, otherwise non-blocking. + :param int size: + If not :data:`None`, use the value as the socket buffer size. + :param function preexec_fn: + If not :data:`None`, a function to run within the post-fork child + before executing the target program. + :returns: + :class:`PopenProcess` instance. + """ + + parent_rw_fp, child_rw_fp = mitogen.parent.create_socketpair(size=size, blocking=blocking) + stderr_r, stderr = mitogen.core.pipe(blocking=blocking) + mitogen.core.set_cloexec(stderr_r.fileno()) + try: + proc = testlib.subprocess.Popen( + args=args, + stdin=child_rw_fp, + stdout=child_rw_fp, + stderr=stderr, + close_fds=True, + preexec_fn=preexec_fn, + ) + except Exception: + parent_rw_fp.close() + stderr_r.close() + raise + finally: + child_rw_fp.close() + stderr.close() + + return mitogen.parent.PopenProcess( + proc=proc, + stdin=parent_rw_fp, + stdout=parent_rw_fp, + stderr=stderr_r, + ) + + +class DummyConnectionBlocking(mitogen.parent.Connection): + """Dummy blocking IO connection""" + + create_child = staticmethod(create_child_using_sockets) + name_prefix = "dummy_blocking" + + #: Dictionary of extra kwargs passed to :attr:`create_child`. + #: Use a size smaller than the conn.get_preamble() size so multiple + #: read-calls are needed in the first stage. + create_child_args = {"blocking": True, "size": 4096} + + +class DummyConnectionNonBlocking(mitogen.parent.Connection): + """Dummy non-blocking IO connection""" + + create_child = staticmethod(create_child_using_sockets) + name_prefix = "dummy_non_blocking" + + #: Dictionary of extra kwargs passed to :attr:`create_child`. + #: Use a size smaller than the conn.get_preamble() size so multiple + #: read-calls are needed in the first stage. + create_child_args = {"blocking": False, "size": 4096} + + +class DummyConnectionEOFRead(mitogen.parent.Connection): + """Dummy connection that triggers an EOF-read(STDIN) in the first_stage""" + + name_prefix = "dummy_eof_read" + + #: Dictionary of extra kwargs passed to :attr:`create_child`. + create_child_args = {"blocking": True} + + @staticmethod + def create_child(*a, **kw): + proc = create_child_using_pipes(*a, **kw) + # Close the pipe -> results in an EOF-read(STDIN) in the first_stage + proc.stdin.close() + # Whatever the parent writes to the child, drop it. + proc.stdin = open("/dev/null", "wb") + return proc + + +class DummyConnectionEndlessBlockingRead(mitogen.parent.Connection): + """Dummy connection that triggers a non-returning read(STDIN) call in the + first_stage. + + """ + + name_prefix = "dummy_endless_blocking_read" + + #: Dictionary of extra kwargs passed to :attr:`create_child`. + create_child_args = {"blocking": True} + + @staticmethod + def create_child(*a, **kw): + proc = create_child_using_pipes(*a, **kw) + # Keep the pipe open by having a reference to it, otherwise it would be + # automatically closed by the garbage collector. + proc._mitogen_test_orig_stdin = proc.stdin + # Whatever the parent writes to the child, drop it -> read from STDOUT + # blocks forever in the fork child as no data could be read. + proc.stdin = open("/dev/null", "wb") + return proc + + +class ConnectionTest(testlib.RouterMixin, testlib.TestCase): + def test_non_blocking_stdin(self): + """Test that first stage works with non-blocking STDIN + + The boot command should read the preamble from STDIN, write all ECO + markers to STDOUT, and then execute the preamble. + + This test writes the complete preamble to non-blocking STDIN. + + 1. Fork child reads from non-blocking STDIN + 2. Fork child writes all data as expected by the protocol. + 3. A context call works as expected. + + """ + with testlib.LogCapturer() as _: + ctx = self.router._connect(DummyConnectionNonBlocking, connect_timeout=0.5) + self.assertEqual(3, ctx.call(operator.add, 1, 2)) + + def test_blocking_stdin(self): + """Test that first stage works with blocking STDIN + + The boot command should read the preamble from STDIN, write all ECO + markers to STDOUT, and then execute the preamble. + + This test writes the complete preamble to blocking STDIN. + + 1. Fork child reads from blocking STDIN + 2. Fork child writes all data as expected by the protocol. + 3. A context call works as expected. + + """ + with testlib.LogCapturer() as _: + ctx = self.router._connect(DummyConnectionBlocking, connect_timeout=0.5) + self.assertEqual(3, ctx.call(operator.add, 1, 2)) + + def test_broker_connect_eof_error(self): + """Test that broker takes care about EOF errors in the first stage + + The boot command should write an ECO marker to stdout, try to read the + preamble from STDIN. This read returns with an EOF and the process exits. + + This test writes closes the pipe for STDIN of the fork child to enforce an EOF read call. + 1. Fork child reads from STDIN and reads an EOF and breaks the read-loop + 2. Decompressing the received data results in an error + 3. The child process exits + 4. The streams get disconnected -> mitogen.parent.EofError is raised + + """ + + with testlib.LogCapturer() as _: + e = self.assertRaises(mitogen.parent.EofError, + self.router._connect, DummyConnectionEOFRead, connect_timeout=0.5) + self.assertIn("Error -5 while decompressing data", str(e)) + + # Test that a TimeoutError is raised by the broker and all resources + # are cleaned up. + options = mitogen.parent.Options( + old_router=self.router, + max_message_size=self.router.max_message_size, + connect_timeout=0.5, + ) + conn = DummyConnectionEOFRead(options, router=self.router) + e = self.assertRaises(mitogen.parent.EofError, + conn.connect, context=mitogen.core.Context(None, 1234)) + self.assertIn("Error -5 while decompressing data", str(e)) + # Ensure the child process is reaped if the connection times out. + testlib.wait_for_child(conn.proc.pid) + e = self.assertRaises(OSError, + os.kill, conn.proc.pid, 0) + self.assertEqual(e.args[0], errno.ESRCH) + + def test_broker_connect_timeout_because_endless_blocking_read(self): + """Test that broker takes care about connection timeouts + + The boot command should write an ECO marker to stdout, try to read the + preamble from STDIN. This read blocks forever as the parent does write + all the data to /dev/null instead of the pipe. The broker should then + raise a TimeoutError as the child needs too much time. + + This test writes no data to STDIN of the fork child to enforce a blocking read call. + 1. Fork child tries to read from STDIN, but blocks forever. + 2. Parent connection timeout timer pops up and the parent cleans up + everything from the child (e.g. kills the child process). + 3. TimeoutError is raised in the connect call + + """ + with testlib.LogCapturer() as _: + # Ensure the child process is reaped if the connection times out. + options = mitogen.parent.Options( + old_router=self.router, + max_message_size=self.router.max_message_size, + connect_timeout=0.5, + ) + + conn = DummyConnectionEndlessBlockingRead(options, router=self.router) + try: + self.assertRaises(mitogen.core.TimeoutError, + lambda: conn.connect(context=mitogen.core.Context(None, 1234)) + ) + testlib.wait_for_child(conn.proc.pid) + e = self.assertRaises(OSError, + os.kill, conn.proc.pid, 0) + self.assertEqual(e.args[0], errno.ESRCH) + finally: + conn.proc._mitogen_test_orig_stdin.close() + + class CommandLineTest(testlib.RouterMixin, testlib.TestCase): # Ensure this version of Python produces a command line that is sufficient # to bootstrap this version of Python. @@ -52,3 +319,49 @@ def test_valid_syntax(self): ) finally: fp.close() + + def test_premature_eof(self): + """The boot command should write an ECO marker to stdout, read the + preamble from stdin, then execute it. + + This test writes some data to STDIN and closes it then to create an + EOF situation. + 1. Fork child tries to read from STDIN, but stops as EOF is received. + 2. Fork child crashes (trying to decompress the junk data) + 3. Fork child's file descriptors (write pipes) are closed by the OS + 4. Fork parent does `dup(, )` and `exec()` + 5. Python reads `b''` (i.e. EOF) from stdin (a closed pipe) + 6. Python runs `''` (a valid script) and exits with success""" + + options = mitogen.parent.Options(max_message_size=123) + conn = mitogen.parent.Connection(options, self.router) + conn.context = mitogen.core.Context(None, 123) + proc = testlib.subprocess.Popen( + args=conn.get_boot_command(), + stdout=testlib.subprocess.PIPE, + stderr=testlib.subprocess.PIPE, + stdin=testlib.subprocess.PIPE, + ) + + # Do not send all of the data from the preamble + proc.stdin.write(conn.get_preamble()[:-128]) + proc.stdin.close() + try: + returncode = proc.wait(timeout=1) + except testlib.subprocess.TimeoutExpired: + proc.kill() + proc.wait(timeout=3) + self.fail("First stage did not handle EOF on STDIN") + try: + self.assertEqual(0, returncode) + self.assertEqual( + proc.stdout.read(), + mitogen.parent.BootstrapProtocol.EC0_MARKER + b("\n"), + ) + self.assertIn( + b("Error -5 while decompressing data"), + proc.stderr.read(), + ) + finally: + proc.stdout.close() + proc.stderr.close() From 8807cd53beb12ff9a022a9fa355e989e31fcc871 Mon Sep 17 00:00:00 2001 From: Marc Hartmayer Date: Tue, 9 Dec 2025 09:57:48 +0000 Subject: [PATCH 6/6] mitogen: first_stage: Break the while loop in case of EOF The current implementation can cause an infinite loop, leading to a process that hangs and consumes 100% CPU. This occurs because the EOF condition is not handled properly, resulting in repeated select(...) and read(...) calls. The fix is to properly handle the EOF condition and break out of the loop when it occurs. -SSH command size: 822 +SSH command size: 838 Preamble (mitogen.core + econtext) size: 18226 (17.80KiB) -mitogen.parent 99062 96.7KiB 51235 50.0KiB 51.7% 12936 12.6KiB 13.1% +mitogen.parent 99240 96.9KiB 51244 50.0KiB 51.6% 12956 12.7KiB 13.1% Fixes: https://github.com/mitogen-hq/mitogen/issues/1348 Signed-off-by: Marc Hartmayer --- docs/changelog.rst | 1 + mitogen/parent.py | 7 ++++++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/docs/changelog.rst b/docs/changelog.rst index af756f371..6d35b7b26 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -27,6 +27,7 @@ In progress (unreleased) * :gh:issue:`1354` docs: Document Ansible 13 (ansible-core 2.20) support * :gh:issue:`1354` :mod:`mitogen`: Clarify error message when a module request would be refused by allow or deny listing +* :gh:issue:`1348` :mod:`mitogen`: Fix hanging process with 100% CPU usage v0.3.35 (2025-12-01) diff --git a/mitogen/parent.py b/mitogen/parent.py index 9da5d58a6..97681653d 100644 --- a/mitogen/parent.py +++ b/mitogen/parent.py @@ -1436,11 +1436,16 @@ def _first_stage(): os.environ['ARGV0']=sys.executable os.execl(sys.executable,sys.executable+'(mitogen:%s)'%sys.argv[2]) os.write(1,'MITO000\n'.encode()) + # Size of the compressed core source to be read + n=int(sys.argv[3]) # Read `len(compressed preamble)` bytes sent by our Mitogen parent. # `select()` handles non-blocking stdin (e.g. sudo + log_output). # `C` accumulates compressed bytes. C=''.encode() - while int(sys.argv[3])-len(C)and select.select([0],[],[]):C+=os.read(0,int(sys.argv[3])-len(C)) + # data chunk + V='V' + # Stop looping if no more data is needed or EOF is detected (empty bytes). + while n-len(C) and V:select.select([0],[],[]);V=os.read(0,n-len(C));C+=V # Raises `zlib.error` if compressed preamble is truncated or invalid C=zlib.decompress(C) f=os.fdopen(W,'wb',0)