Skip to content

Commit 1f1970c

Browse files
committed
WIP
1 parent 63c88b6 commit 1f1970c

File tree

7 files changed

+85
-36
lines changed

7 files changed

+85
-36
lines changed

benchmark.py

Lines changed: 32 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
# pylint: disable=C0103
2-
32
"""
43
Benchmark runner for logprep (logprep-ng and non-ng).
54
@@ -286,6 +285,33 @@ def opensearch_count_processed(opensearch_url: str, processed_index: str) -> int
286285
return int(resp.json()["count"])
287286

288287

288+
def opensearch_debug_snapshot(opensearch_url: str) -> None:
289+
"""
290+
Print a small OpenSearch state snapshot for debugging.
291+
Never raises (best-effort).
292+
"""
293+
try:
294+
r = requests.get(f"{opensearch_url}/_cat/indices?v", timeout=10)
295+
print("\n--- _cat/indices ---")
296+
print(r.text)
297+
except Exception as e:
298+
print(f"\n--- _cat/indices (failed) ---\n{e}")
299+
300+
try:
301+
r = requests.get(f"{opensearch_url}/_cat/count?v", timeout=10)
302+
print("\n--- _cat/count ---")
303+
print(r.text)
304+
except Exception as e:
305+
print(f"\n--- _cat/count (failed) ---\n{e}")
306+
307+
try:
308+
r = requests.get(f"{opensearch_url}/_cat/aliases?v", timeout=10)
309+
print("\n--- _cat/aliases ---")
310+
print(r.text)
311+
except Exception as e:
312+
print(f"\n--- _cat/aliases (failed) ---\n{e}")
313+
314+
289315
def reset_prometheus_dir(path: str) -> None:
290316
"""
291317
Recreate PROMETHEUS_MULTIPROC_DIR.
@@ -596,6 +622,9 @@ def benchmark_run(
596622

597623
time.sleep(sleep_after_logprep_start_s)
598624

625+
print("\n=== OpenSearch snapshot (before measurement) ===")
626+
opensearch_debug_snapshot(opensearch_url)
627+
599628
baseline = opensearch_count_processed(opensearch_url, processed_index)
600629
startup_s = time.time() - t_startup
601630

@@ -610,27 +639,10 @@ def benchmark_run(
610639
# ensure near-real-time writes are visible to _count before measuring
611640
opensearch_refresh(opensearch_url, processed_index)
612641

613-
after = opensearch_count_processed(opensearch_url, processed_index)
614-
615-
def opensearch_debug_snapshot(opensearch_url: str) -> None:
616-
# welche Indizes existieren überhaupt?
617-
r = requests.get(f"{opensearch_url}/_cat/indices?v", timeout=10)
618-
print("\n--- _cat/indices ---")
619-
print(r.text)
620-
621-
# wie viele docs pro index? (sehr schnell, super aufschlussreich)
622-
r = requests.get(f"{opensearch_url}/_cat/count?v", timeout=10)
623-
print("\n--- _cat/count ---")
624-
print(r.text)
625-
626-
# optional: aliases / data streams
627-
r = requests.get(f"{opensearch_url}/_cat/aliases?v", timeout=10)
628-
print("\n--- _cat/aliases ---")
629-
print(r.text)
630-
631-
# im benchmark_run nach dem kill + refresh:
642+
print("\n=== OpenSearch snapshot (after run / after refresh) ===")
632643
opensearch_debug_snapshot(opensearch_url)
633644

645+
after = opensearch_count_processed(opensearch_url, processed_index)
634646
processed = max(0, after - baseline)
635647

636648
return RunResult(

logprep/ng/abc/input.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,7 @@ def acknowledge(self) -> None:
317317
self.event_backlog.unregister(state_type=EventStateType.ACKED)
318318

319319
for event in self.event_backlog.get(state_type=EventStateType.DELIVERED):
320-
event.state.next_state()
320+
event.state.current_state = EventStateType.ACKED
321321

322322
@property
323323
def _add_hmac(self) -> bool:
@@ -488,7 +488,7 @@ async def get_next(self, timeout: float) -> LogEvent | None:
488488
)
489489

490490
self.event_backlog.register(events=[log_event])
491-
log_event.state.next_state()
491+
log_event.state.current_state = EventStateType.RECEIVED
492492

493493
return log_event
494494

logprep/ng/connector/confluent_kafka/output.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
from logprep.metrics.metrics import GaugeMetric, Metric
3737
from logprep.ng.abc.event import Event
3838
from logprep.ng.abc.output import FatalOutputError, Output
39+
from logprep.ng.event.event_state import EventStateType
3940
from logprep.util.validators import keys_in_validator
4041

4142
DEFAULTS = {
@@ -281,7 +282,7 @@ def describe(self) -> str:
281282
f"{self.config.kafka_config.get('bootstrap.servers')}"
282283
)
283284

284-
def store(self, event: Event) -> None:
285+
def store(self, event: Event) -> None: # type: ignore # TODO: fix mypy issue
285286
"""Store a document in the producer topic.
286287
287288
Parameters
@@ -303,7 +304,8 @@ def store_custom(self, event: Event, target: str) -> None:
303304
target : str
304305
Topic to store event data in.
305306
"""
306-
event.state.next_state()
307+
event.state.current_state = EventStateType.STORING_IN_OUTPUT
308+
307309
document = event.data
308310
self.metrics.number_of_processed_events += 1
309311
try:
@@ -358,12 +360,12 @@ def setup(self) -> None:
358360
def on_delivery(self, event: Event, err: KafkaException, msg: Message) -> None:
359361
"""Callback for delivery reports."""
360362
if err is not None:
361-
event.state.next_state(success=False)
363+
event.state.current_state = EventStateType.FAILED
362364
event.errors.append(err)
363365
logger.error("Message delivery failed: %s", err)
364366
self.metrics.number_of_errors += 1
365367
return
366-
event.state.next_state(success=True)
368+
event.state.current_state = EventStateType.DELIVERED
367369
logger.debug(
368370
"Message delivered to '%s' partition %s, offset %s",
369371
msg.topic(),

logprep/ng/connector/opensearch/output.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
from logprep.metrics.metrics import Metric
5252
from logprep.ng.abc.event import Event
5353
from logprep.ng.abc.output import Output
54+
from logprep.ng.event.event_state import EventStateType
5455

5556
logger = logging.getLogger("OpenSearchOutput")
5657

@@ -281,7 +282,7 @@ async def store_custom(self, event: Event, target: str) -> None:
281282
target : str
282283
Index to store the document in.
283284
"""
284-
event.state.next_state()
285+
event.state.current_state = EventStateType.STORING_IN_OUTPUT
285286
document = event.data
286287
document["_index"] = target
287288
document["_op_type"] = document.get("_op_type", self.config.default_op_type)
@@ -396,7 +397,7 @@ async def _bulk(self, client: AsyncOpenSearch, events: list["Event"]) -> None:
396397
except OpenSearchException as e:
397398
# whole bulk request failed → mark all events failed
398399
for ev in batch:
399-
ev.state.next_state(success=False)
400+
ev.state.current_state = EventStateType.FAILED
400401
ev.errors.append(BulkError("Bulk request failed", exception=str(e)))
401402
continue
402403

@@ -417,7 +418,7 @@ async def _bulk(self, client: AsyncOpenSearch, events: list["Event"]) -> None:
417418

418419
ok = isinstance(status, int) and 200 <= status < 300 and not error_obj
419420
if ok:
420-
ev.state.next_state(success=True)
421+
ev.state.current_state = EventStateType.STORED_IN_OUTPUT
421422
continue
422423

423424
# normalize error into your BulkError shape
@@ -427,7 +428,7 @@ async def _bulk(self, client: AsyncOpenSearch, events: list["Event"]) -> None:
427428
else:
428429
message = str(error_obj) if error_obj else "Failed to index document"
429430

430-
ev.state.next_state(success=False)
431+
ev.state.current_state = EventStateType.FAILED
431432
ev.errors.append(
432433
BulkError(
433434
message,

logprep/ng/event/event_state.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,26 @@ class EventStateType(StrEnum):
1919
PROCESSED = "processed"
2020
"""The event has been processed by all pipeline processors."""
2121

22+
STORING_IN_OUTPUT = "storing_in_output"
23+
"""The event is storing in the output connector."""
24+
2225
STORED_IN_OUTPUT = "stored_in_output"
2326
"""The event was successfully stored in the output connector."""
2427

2528
FAILED = "failed"
2629
"""The event failed during processing or output storage."""
2730

31+
STORING_IN_ERROR = "storing_in_error"
32+
"""The event is storing in the error output (e.g. error queue or
33+
fallback output)."""
34+
2835
STORED_IN_ERROR = "stored_in_error"
2936
"""The event was stored in the error output (e.g. error queue or
3037
fallback output)."""
3138

39+
DELIVERING = "delivering"
40+
"""The event is delivering to the target system or final destination."""
41+
3242
DELIVERED = "delivered"
3343
"""The event was delivered to the target system or final destination."""
3444

logprep/ng/pipeline.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import logging
44

55
from logprep.ng.abc.processor import Processor
6+
from logprep.ng.event.event_state import EventStateType
67
from logprep.ng.event.log_event import LogEvent
78

89
logger = logging.getLogger("Pipeline")
@@ -12,15 +13,15 @@ def _process_event(event: LogEvent | None, processors: list[Processor]) -> LogEv
1213
"""process all processors for one event"""
1314
if event is None or not event.data:
1415
raise ValueError("no event given")
15-
event.state.next_state()
16+
event.state.current_state = EventStateType.PROCESSING
1617
for processor in processors:
1718
if not event.data:
1819
break
1920
processor.process(event)
2021
if not event.errors:
21-
event.state.next_state(success=True)
22+
event.state.current_state = EventStateType.PROCESSED
2223
else:
23-
event.state.next_state(success=False)
24+
event.state.current_state = EventStateType.FAILED
2425
logger.error("event failed: %s with errors: %s", event, event.errors)
2526
return event
2627

logprep/ng/sender.py

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,11 +68,34 @@ async def _send_and_flush_processed_events(self, batch_events: list[LogEvent]) -
6868
if not processed:
6969
return
7070

71-
# send in parallel (minimal change vs. serial list comprehension)
72-
await asyncio.gather(*(self._send_processed(event) for event in processed))
71+
# send in parallel
72+
try:
73+
results = await asyncio.gather(
74+
*(self._send_processed(event) for event in processed),
75+
return_exceptions=True,
76+
)
77+
for r in results:
78+
if isinstance(r, Exception):
79+
logger.exception("Error while sending processed event", exc_info=r)
80+
81+
finally:
82+
for output in self._outputs.values():
83+
try:
84+
await output.flush()
85+
except Exception as e:
86+
logger.exception("Error while flushing output %s", output.name, exc_info=e)
7387

7488
# flush once per output after sending
75-
await asyncio.gather(*(output.flush() for output in self._outputs.values()))
89+
try:
90+
results = await asyncio.gather(
91+
*(output.flush() for output in self._outputs.values()),
92+
return_exceptions=True,
93+
)
94+
for r in results:
95+
if isinstance(r, Exception):
96+
logger.exception("Error during final output flush", exc_info=r)
97+
except Exception as e:
98+
logger.exception("Unexpected error during final output flush", exc_info=e)
7699

77100
async def _send_extra_data(self, event: LogEvent) -> None:
78101
extra_data_events = typing.cast(list[ExtraDataEvent], event.extra_data)

0 commit comments

Comments
 (0)