Skip to content

Commit b092a85

Browse files
authored
Whole queue iteration is wrapped in transaction.atomic() (#10)
1 parent 03fa2fa commit b092a85

File tree

7 files changed

+67
-47
lines changed

7 files changed

+67
-47
lines changed

CHANGES.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
# Changelog
22

3+
* *0.3.0* (2025-03-19)
4+
* The whole queue iteration now is wrapped in a transaction atomic
5+
36
* *0.2.0* (2025-03-17)
47
* Extend strict mode to prohibit event (!) handlers to talk to the database
58

queuebie/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
"""Simple message queue for commands and events (CQRS)"""
22

3-
__version__ = "0.2.0"
3+
__version__ = "0.3.0"
44

55
from queuebie.registry import MessageRegistry
66

queuebie/runner.py

Lines changed: 28 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,18 @@ def handle_message(messages: Message | list[Message]) -> None:
2121
# Run auto-registry
2222
message_registry.autodiscover()
2323

24-
while queue:
25-
message = queue.pop(0)
26-
if isinstance(message, Command):
27-
handler_list = message_registry.command_dict.get(message.module_path(), [])
28-
block_db_access = False
29-
else:
30-
handler_list = message_registry.event_dict.get(message.module_path(), [])
31-
block_db_access = True if get_queuebie_strict_mode() else False
24+
with transaction.atomic():
25+
while queue:
26+
message = queue.pop(0)
27+
if isinstance(message, Command):
28+
handler_list = message_registry.command_dict.get(message.module_path(), [])
29+
block_db_access = False
30+
else:
31+
handler_list = message_registry.event_dict.get(message.module_path(), [])
32+
block_db_access = True if get_queuebie_strict_mode() else False
3233

33-
new_messages = _process_message(handler_list=handler_list, message=message, block_db_access=block_db_access)
34-
queue.extend(new_messages)
34+
new_messages = _process_message(handler_list=handler_list, message=message, block_db_access=block_db_access)
35+
queue.extend(new_messages)
3536

3637

3738
def _process_message(*, handler_list: list, message: [Command, Event], block_db_access: bool) -> list[Message]:
@@ -41,24 +42,22 @@ def _process_message(*, handler_list: list, message: [Command, Event], block_db_
4142
logger = get_logger()
4243
messages = []
4344

44-
# TODO: should the whole chain be atomic and not just the handler?
45-
with transaction.atomic():
46-
for handler in handler_list:
47-
try:
48-
logger.debug(
49-
f"Handling command '{message.module_path()}' ({message.uuid}) with handler '{handler['name']}'."
50-
)
51-
module = importlib.import_module(handler["module"])
52-
handler_function = getattr(module, handler["name"])
53-
with BlockDatabaseAccess() if block_db_access else nullcontext():
54-
handler_messages = handler_function(context=message) or []
55-
handler_messages = handler_messages if isinstance(handler_messages, list) else [handler_messages]
56-
if len(handler_messages) > 0:
57-
messages.extend(handler_messages)
58-
uuid_list = [f"{m!s}" for m in handler_messages]
59-
logger.debug(f"New messages: {uuid_list!s}")
60-
except Exception as e:
61-
logger.debug(f"Exception handling command {message.module_path()}: {e!s}")
62-
raise e from e
45+
for handler in handler_list:
46+
try:
47+
logger.debug(
48+
f"Handling command '{message.module_path()}' ({message.uuid}) with handler '{handler['name']}'."
49+
)
50+
module = importlib.import_module(handler["module"])
51+
handler_function = getattr(module, handler["name"])
52+
with BlockDatabaseAccess() if block_db_access else nullcontext():
53+
handler_messages = handler_function(context=message) or []
54+
handler_messages = handler_messages if isinstance(handler_messages, list) else [handler_messages]
55+
if len(handler_messages) > 0:
56+
messages.extend(handler_messages)
57+
uuid_list = [f"{m!s}" for m in handler_messages]
58+
logger.debug(f"New messages: {uuid_list!s}")
59+
except Exception as e:
60+
logger.debug(f"Exception handling command {message.module_path()}: {e!s}")
61+
raise e from e
6362

6463
return messages

testapp/handlers/commands/testapp.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,13 @@
55
from queuebie import message_registry
66
from queuebie.logger import get_logger
77
from queuebie.messages import Event
8-
from testapp.messages.commands.my_commands import CriticalCommand, DoSomething, PersistSomething
8+
from testapp.messages.commands.my_commands import (
9+
CreateUser,
10+
CriticalCommand,
11+
DoSomething,
12+
PersistSomething,
13+
RaiseRuntimeError,
14+
)
915
from testapp.messages.events.my_events import SomethingHappened, SomethingHappenedThatWantsToBePersisted
1016

1117

@@ -29,9 +35,11 @@ def handle_critical_command(*, context: CriticalCommand) -> None:
2935
raise RuntimeError("Handler is broken.") # noqa: TRY003
3036

3137

32-
def create_user(*args, **kwargs):
33-
return User.objects.create_user(username="username")
38+
@message_registry.register_command(command=CreateUser)
39+
def create_user(context: CreateUser):
40+
User.objects.create_user(username=context.username)
3441

3542

36-
def raise_exception(*args, **kwargs):
37-
raise RuntimeError("Something is broken.") # noqa: TRY003
43+
@message_registry.register_command(command=RaiseRuntimeError)
44+
def raise_exception(context: RaiseRuntimeError):
45+
raise RuntimeError(context.error_msg)

testapp/messages/commands/my_commands.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,3 +21,13 @@ class SameNameCommand(Command):
2121
@dataclass(kw_only=True)
2222
class PersistSomething(Command):
2323
any_var: str
24+
25+
26+
@dataclass(kw_only=True)
27+
class CreateUser(Command):
28+
username: str
29+
30+
31+
@dataclass(kw_only=True)
32+
class RaiseRuntimeError(Command):
33+
error_msg: str

tests/test_registry.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ def test_message_autodiscover_regular():
124124
message_registry.autodiscover()
125125

126126
# Assert one command registered
127-
assert len(message_registry.command_dict) == 3 # noqa: PLR2004
127+
assert len(message_registry.command_dict) == 5 # noqa: PLR2004
128128
assert DoSomething.module_path() in message_registry.command_dict.keys()
129129
assert CriticalCommand.module_path() in message_registry.command_dict.keys()
130130

tests/test_runner.py

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,15 @@
88
from queuebie import MessageRegistry
99
from queuebie.database_blocker import DatabaseAccessDeniedError
1010
from queuebie.exceptions import InvalidMessageTypeError
11-
from queuebie.runner import _process_message, handle_message
12-
from testapp.messages.commands.my_commands import CriticalCommand, DoSomething, PersistSomething, SameNameCommand
11+
from queuebie.runner import handle_message
12+
from testapp.messages.commands.my_commands import (
13+
CreateUser,
14+
CriticalCommand,
15+
DoSomething,
16+
PersistSomething,
17+
RaiseRuntimeError,
18+
SameNameCommand,
19+
)
1320
from testapp.messages.events.my_events import (
1421
SomethingHappened,
1522
SomethingHappenedThatWantsToBePersistedViaEvent,
@@ -125,15 +132,8 @@ def dummy_func(*args, **kwargs):
125132

126133
@pytest.mark.django_db
127134
@mock.patch("queuebie.registry.get_queuebie_strict_mode", return_value=False)
128-
def test_process_message_atomic_works(mocked_handle_command, *args):
129-
handler_list = [
130-
{"module": "testapp.handlers.commands.testapp", "name": "create_user"},
131-
{"module": "testapp.handlers.commands.testapp", "name": "raise_exception"},
132-
]
133-
134-
message = DoSomething(my_var=1)
135-
135+
def test_handle_message_atomic_works(*args):
136136
with pytest.raises(RuntimeError, match="Something is broken."):
137-
_process_message(handler_list=handler_list, message=message, block_db_access=False)
137+
handle_message([CreateUser(username="username"), RaiseRuntimeError(error_msg="Something is broken.")])
138138

139139
assert User.objects.filter(username="username").exists() is False

0 commit comments

Comments
 (0)