Skip to content

Commit a8a073b

Browse files
authored
feat: Add query_step to ContextCommandPipeline
1 parent 08ac394 commit a8a073b

File tree

4 files changed

+235
-198
lines changed

4 files changed

+235
-198
lines changed

cq/_core/pipetools.py

Lines changed: 40 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,19 @@
1+
from typing import TYPE_CHECKING, Any, Callable, overload
2+
13
import injection
24

5+
from cq import Dispatcher
36
from cq._core.dispatcher.lazy import LazyDispatcher
4-
from cq._core.dispatcher.pipe import ContextPipeline
5-
from cq._core.message import AnyCommandBus, Command
7+
from cq._core.dispatcher.pipe import ContextPipeline, PipeConverterMethod
8+
from cq._core.message import AnyCommandBus, Command, Query, QueryBus
69
from cq._core.scope import CQScope
710
from cq.middlewares.scope import InjectionScopeMiddleware
811

912

1013
class ContextCommandPipeline[I: Command](ContextPipeline[I]):
11-
__slots__ = ()
14+
__slots__ = ("__query_dispatcher",)
15+
16+
__query_dispatcher: Dispatcher[Query, Any]
1217

1318
def __init__(
1419
self,
@@ -17,15 +22,45 @@ def __init__(
1722
injection_module: injection.Module | None = None,
1823
threadsafe: bool | None = None,
1924
) -> None:
20-
dispatcher = LazyDispatcher(
25+
command_dispatcher = LazyDispatcher(
2126
AnyCommandBus,
2227
injection_module=injection_module,
2328
threadsafe=threadsafe,
2429
)
25-
super().__init__(dispatcher)
30+
super().__init__(command_dispatcher)
31+
32+
self.__query_dispatcher = LazyDispatcher(
33+
QueryBus,
34+
injection_module=injection_module,
35+
threadsafe=threadsafe,
36+
)
37+
2638
transaction_scope_middleware = InjectionScopeMiddleware(
2739
CQScope.TRANSACTION,
2840
exist_ok=True,
2941
threadsafe=threadsafe,
3042
)
3143
self.add_middlewares(transaction_scope_middleware)
44+
45+
if TYPE_CHECKING: # pragma: no cover
46+
47+
@overload
48+
def query_step[T: Query](
49+
self,
50+
wrapped: PipeConverterMethod[T, Any],
51+
/,
52+
) -> PipeConverterMethod[T, Any]: ...
53+
54+
@overload
55+
def query_step[T: Query](
56+
self,
57+
wrapped: None = ...,
58+
/,
59+
) -> Callable[[PipeConverterMethod[T, Any]], PipeConverterMethod[T, Any]]: ...
60+
61+
def query_step[T: Query](
62+
self,
63+
wrapped: PipeConverterMethod[T, Any] | None = None,
64+
/,
65+
) -> Any:
66+
return self.step(wrapped, dispatcher=self.__query_dispatcher)

docs/guides/pipeline.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,12 @@ In CQRS, the saga pattern is typically used to orchestrate multiple commands. Ho
99
A pipeline executes a sequence of commands, where each step transforms the result of the previous command into the next command.
1010

1111
```python
12-
from cq import ContextCommandPipeline, ContextPipeline
12+
from cq import ContextCommandPipeline
1313

1414
class PaymentContext:
1515
transaction_id: int
1616

17-
pipeline: ContextPipeline[ValidateCartCommand] = ContextCommandPipeline()
17+
pipeline: ContextCommandPipeline[ValidateCartCommand] = ContextCommandPipeline()
1818

1919
@pipeline.step
2020
async def _(self, result: CartValidatedResult) -> CreateTransactionCommand:
@@ -36,6 +36,8 @@ The pipeline class acts as a context, allowing you to store intermediate values
3636

3737
Each step is a method decorated with `@pipeline.step`. It receives the result of the previous command handler and returns the next command to dispatch.
3838

39+
You can also use `@pipeline.query_step` to dispatch queries instead of commands.
40+
3941
The last step is optional. If defined, it must return `None`.
4042

4143
### Dispatching a pipeline

tests/test_context_command_pipeline.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from cq import ContextCommandPipeline, ContextPipeline, command_handler
1+
from cq import ContextCommandPipeline, command_handler, query_handler
22
from tests.helpers.history import HistoryMiddleware
33

44

@@ -11,7 +11,7 @@ class Command1: ...
1111

1212
class Command2: ...
1313

14-
class Command3: ...
14+
class Query: ...
1515

1616
class Foo: ...
1717

@@ -29,27 +29,27 @@ class CommandHandler2:
2929
async def handle(self, command: Command2) -> Bar:
3030
return Bar()
3131

32-
@command_handler
33-
class CommandHandler3:
34-
async def handle(self, command: Command3) -> Baz:
32+
@query_handler
33+
class QueryHandler:
34+
async def handle(self, query: Query) -> Baz:
3535
return Baz()
3636

3737
class Context:
3838
foo: Foo
3939
bar: Bar
4040
baz: Baz
4141

42-
pipeline: ContextPipeline[Command1] = ContextCommandPipeline()
42+
pipeline: ContextCommandPipeline[Command1] = ContextCommandPipeline()
4343

4444
@pipeline.step
4545
async def _(self, foo: Foo) -> Command2:
4646
self.foo = foo
4747
return Command2()
4848

49-
@pipeline.step
50-
async def _(self, bar: Bar) -> Command3:
49+
@pipeline.query_step
50+
async def _(self, bar: Bar) -> Query:
5151
self.bar = bar
52-
return Command3()
52+
return Query()
5353

5454
@pipeline.step
5555
async def _(self, baz: Baz) -> None:

0 commit comments

Comments
 (0)