|
11 | 11 | # =================================================================== |
12 | 12 | # ZeroMQ Communication Wrapper |
13 | 13 | # =================================================================== |
14 | | -# single shared ZMQ context for the entire process. |
15 | | -_zmq_context = zmq.Context() |
| 14 | +# lazy-initialized shared ZMQ context for the entire process. |
| 15 | +# using None until first ZMQ port is created, so file-only workflows |
| 16 | +# never spawn ZMQ I/O threads at import time. |
| 17 | +_zmq_context = None |
| 18 | + |
| 19 | +def _get_zmq_context(): |
| 20 | + """Return the process-level shared ZMQ context, creating it on first call.""" |
| 21 | + global _zmq_context |
| 22 | + if _zmq_context is None or _zmq_context.closed: |
| 23 | + _zmq_context = zmq.Context() |
| 24 | + return _zmq_context |
16 | 25 |
|
17 | 26 | class ZeroMQPort: |
18 | | - def __init__(self, port_type, address, zmq_socket_type, context): |
| 27 | + def __init__(self, port_type, address, zmq_socket_type, context=None): |
19 | 28 | """ |
20 | 29 | port_type: "bind" or "connect" |
21 | 30 | address: ZeroMQ address (e.g., "tcp://*:5555") |
22 | 31 | zmq_socket_type: zmq.REQ, zmq.REP, zmq.PUB, zmq.SUB etc. |
23 | | - context: shared zmq.Context() for the process |
| 32 | + context: optional zmq.Context() for the process; defaults to the shared _zmq_context. |
24 | 33 | """ |
| 34 | + if context is None: |
| 35 | + context = _get_zmq_context() |
25 | 36 | self.socket = context.socket(zmq_socket_type) |
26 | 37 | self.port_type = port_type # "bind" or "connect" |
27 | 38 | self.address = address |
@@ -79,7 +90,7 @@ def init_zmq_port(mod, port_name, port_type, address, socket_type_str): |
79 | 90 | try: |
80 | 91 | # Map socket type string to actual ZMQ constant (e.g., zmq.REQ, zmq.REP) |
81 | 92 | zmq_socket_type = getattr(zmq, socket_type_str.upper()) |
82 | | - mod.zmq_ports[port_name] = ZeroMQPort(port_type, address, zmq_socket_type, _zmq_context) |
| 93 | + mod.zmq_ports[port_name] = ZeroMQPort(port_type, address, zmq_socket_type, _get_zmq_context()) |
83 | 94 | logger.info(f"Initialized ZMQ port: {port_name} ({socket_type_str}) on {address}") |
84 | 95 | except AttributeError: |
85 | 96 | logger.error(f"Error: Invalid ZMQ socket type string '{socket_type_str}'.") |
@@ -108,11 +119,15 @@ def terminate_zmq(mod): |
108 | 119 | logger.error(f"Error while terminating ZMQ port {port.address}: {e}") |
109 | 120 | mod.zmq_ports.clear() |
110 | 121 |
|
111 | | - # terminate the single shared context exactly once. |
112 | | - try: |
113 | | - _zmq_context.term() |
114 | | - except Exception as e: |
115 | | - logger.error(f"Error while terminating shared ZMQ context: {e}") |
| 122 | + # terminate the single shared context exactly once, then reset so it |
| 123 | + # can be safely recreated if init_zmq_port is called again later. |
| 124 | + global _zmq_context |
| 125 | + if _zmq_context is not None and not _zmq_context.closed: |
| 126 | + try: |
| 127 | + _zmq_context.term() |
| 128 | + except Exception as e: |
| 129 | + logger.error(f"Error while terminating shared ZMQ context: {e}") |
| 130 | + _zmq_context = None |
116 | 131 |
|
117 | 132 | mod._cleanup_in_progress = False |
118 | 133 |
|
|
0 commit comments