Skip to content

Commit 255d20b

Browse files
authored
fix: add the error to the global result queue (#315)
Signed-off-by: Vigith Maurice <vigith@gmail.com>
1 parent 7a26f15 commit 255d20b

File tree

4 files changed

+12
-4
lines changed

4 files changed

+12
-4
lines changed

packages/pynumaflow/pynumaflow/mapper/_servicer/_async_servicer.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,8 +93,10 @@ async def _process_inputs(
9393
# send an EOF to result queue to indicate that all tasks have completed
9494
await result_queue.put(STREAM_EOF)
9595

96-
except BaseException:
96+
except BaseException as e:
9797
_LOGGER.critical("MapFn Error, re-raising the error", exc_info=True)
98+
# Surface the error to the consumer; MapFn will handle and exit
99+
await result_queue.put(e)
98100

99101
async def _invoke_map(self, req: map_pb2.MapRequest, result_queue: NonBlockingIterator):
100102
"""

packages/pynumaflow/pynumaflow/mapper/_servicer/_sync_servicer.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,10 @@ def _process_requests(
9292
self.executor.shutdown(wait=True)
9393
# Indicate to the result queue that no more messages left to process
9494
result_queue.put(STREAM_EOF)
95-
except BaseException:
95+
except BaseException as e:
9696
_LOGGER.critical("MapFn Error, re-raising the error", exc_info=True)
97+
# Surface the error to the consumer; MapFn will handle and exit
98+
result_queue.put(e)
9799

98100
def _invoke_map(
99101
self,

packages/pynumaflow/pynumaflow/sourcetransformer/servicer/_async_servicer.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,10 @@ async def _process_inputs(
9696
# send an EOF to result queue to indicate that all tasks have completed
9797
await result_queue.put(STREAM_EOF)
9898

99-
except BaseException:
99+
except BaseException as e:
100100
_LOGGER.critical("SourceTransformFnError Error, re-raising the error", exc_info=True)
101+
# Surface the error to the consumer; SourceTransformFn will handle and exit
102+
await result_queue.put(e)
101103

102104
async def _invoke_transform(
103105
self, request: transform_pb2.SourceTransformRequest, result_queue: NonBlockingIterator

packages/pynumaflow/pynumaflow/sourcetransformer/servicer/_servicer.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,8 +113,10 @@ def _process_requests(
113113
self.executor.shutdown(wait=True)
114114
# Indicate to the result queue that no more messages left to process
115115
result_queue.put(STREAM_EOF)
116-
except BaseException:
116+
except BaseException as e:
117117
_LOGGER.critical("SourceTransformFnError, re-raising the error", exc_info=True)
118+
# Surface the error to the consumer; SourceTransformFn will handle and exit
119+
result_queue.put(e)
118120

119121
def _invoke_transformer(
120122
self, context, request: transform_pb2.SourceTransformRequest, result_queue: SyncIterator

0 commit comments

Comments
 (0)