Skip to content

Commit dd443ed

Browse files
Sam Daniel Thangarajanmegsri
authored andcommitted
Common core.py and session.py file added to handle and process protocol messages
1 parent cc38387 commit dd443ed

File tree

20 files changed

+204
-315
lines changed

20 files changed

+204
-315
lines changed

src/nasdaq_protocols/itch/__init__.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
from nasdaq_protocols import soup
33

44
from .core import (
5-
ItchMessageId,
65
Message
76
)
87
from .session import (
@@ -15,7 +14,6 @@
1514

1615

1716
__all__ = [
18-
'ItchMessageId',
1917
'Message',
2018
'OnItchMessageCoro',
2119
'OnItchCloseCoro',

src/nasdaq_protocols/itch/core.py

Lines changed: 4 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,41 +1,17 @@
11
import attrs
2-
from nasdaq_protocols.common import Serializable, Byte, CommonMessage, logable
2+
from nasdaq_protocols.soup_app import SoupAppMessage
3+
from nasdaq_protocols.common import logable
34

45

56
__all__ = [
6-
'ItchMessageId',
77
'Message'
88
]
99
APP_NAME = 'ITCH'
1010

1111

12-
@attrs.define(auto_attribs=True, hash=True)
13-
class ItchMessageId(Serializable):
14-
indicator: int
15-
16-
@classmethod
17-
def from_bytes(cls, bytes_: bytes) -> tuple[int, 'ItchMessageId']:
18-
return 1, ItchMessageId(Byte.from_bytes(bytes_)[1])
19-
20-
def to_bytes(self) -> tuple[int, bytes]:
21-
return Byte.to_bytes(self.indicator)
22-
23-
def __str__(self):
24-
return f'indicator={self.indicator}'
25-
26-
2712
@attrs.define
2813
@logable
29-
class Message(CommonMessage, msg_id_cls=ItchMessageId, app_name=APP_NAME):
30-
def __init_subclass__(cls, *args, **kwargs):
31-
cls.log.debug('itch.core.Message subclassing %s, params = %s', cls.__name__, str(kwargs))
32-
33-
if 'app_name' not in kwargs:
34-
kwargs['app_name'] = APP_NAME
35-
36-
kwargs['msg_id_cls'] = ItchMessageId
37-
38-
if 'indicator' in kwargs:
39-
kwargs['msg_id'] = ItchMessageId(kwargs['indicator'])
14+
class Message(SoupAppMessage, app_name=APP_NAME):
4015

16+
def __init_subclass__(cls, **kwargs):
4117
super().__init_subclass__(**kwargs)
Lines changed: 13 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -1,87 +1,40 @@
1-
import asyncio
2-
from typing import Callable, Type, Awaitable
1+
from typing import Callable, Awaitable, Type
32

43
import attrs
5-
from nasdaq_protocols.common import DispatchableMessageQueue, logable
64
from nasdaq_protocols import soup
5+
from nasdaq_protocols.soup_app.session import BaseClientSession, SessionId
76
from .core import Message
87

9-
108
__all__ = [
119
'OnItchMessageCoro',
1210
'OnItchCloseCoro',
1311
'ItchSessionId',
1412
'ClientSession'
1513
]
14+
1615
OnItchMessageCoro = Callable[[Type[Message]], Awaitable[None]]
1716
OnItchCloseCoro = Callable[[], Awaitable[None]]
1817

1918

2019
@attrs.define(auto_attribs=True)
21-
class ItchSessionId:
20+
class ItchSessionId(SessionId):
2221
soup_session_id: soup.SoupSessionId = None
23-
24-
def __str__(self):
25-
if self.soup_session_id:
26-
return f'itch-{self.soup_session_id}'
27-
return 'itch-nosoup'
22+
protocol_name: str = "itch"
2823

2924

3025
@attrs.define(auto_attribs=True)
31-
@logable
32-
class ClientSession:
33-
soup_session: soup.SoupClientSession
34-
on_msg_coro: OnItchMessageCoro = None
35-
on_close_coro: OnItchCloseCoro = None
36-
closed: bool = False
37-
_session_id: ItchSessionId = None
38-
_close_event: asyncio.Event = None
39-
_message_queue: DispatchableMessageQueue = None
40-
41-
def __attrs_post_init__(self):
42-
self._session_id = ItchSessionId(self.soup_session.session_id)
43-
self._message_queue = DispatchableMessageQueue(self._session_id, self.on_msg_coro)
44-
self.soup_session.set_handlers(on_msg_coro=self._on_soup_message, on_close_coro=self._on_soup_close)
45-
self.soup_session.start_dispatching()
46-
47-
async def receive_message(self):
48-
"""
49-
Asynchronously receive a message from the itch session.
50-
51-
This method blocks until a message is received by the session.
52-
"""
53-
return await self._message_queue.get()
54-
55-
async def close(self):
56-
"""
57-
Asynchronously close the itch session.
58-
"""
59-
if self._close_event or self.closed:
60-
self.log.debug('%s> closing in progress..', self._session_id)
61-
return
62-
self._close_event = asyncio.Event()
63-
self.soup_session.initiate_close()
64-
await self._close_event.wait()
65-
self.log.debug('%s> closed.', self._session_id)
26+
class ClientSession(BaseClientSession):
27+
"""ITCH protocol client session implementation."""
6628

67-
async def _on_soup_message(self, message: soup.SoupMessage):
68-
if isinstance(message, soup.SequencedData):
69-
self.log.debug('%s> incoming sequenced bytes_', self._session_id)
70-
await self._message_queue.put(
71-
self.decode(message.data)[1]
72-
)
29+
def _create_session_id(self):
30+
return ItchSessionId(self.soup_session.session_id)
7331

74-
async def _on_soup_close(self):
75-
await self._message_queue.stop()
76-
if self.on_close_coro is not None:
77-
await self.on_close_coro()
78-
if self._close_event:
79-
self._close_event.set()
80-
self.closed = True
32+
def send_message(self, msg: Message):
33+
raise NotImplementedError("ITCH protocol does not support sending messages")
8134

8235
@classmethod
83-
def decode(cls, bytes_: bytes):
36+
def decode(cls, bytes_: bytes): # pylint: disable=W0221
8437
"""
85-
Decode the given bytes into an itch message.
38+
Decode the given bytes into an ITCH message.
8639
"""
8740
return Message.from_bytes(bytes_)

src/nasdaq_protocols/ouch/__init__.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
from nasdaq_protocols import soup
33

44
from .core import (
5-
OuchMessageId,
65
Message
76
)
87
from .session import (
@@ -15,7 +14,6 @@
1514

1615

1716
__all__ = [
18-
'OuchMessageId',
1917
'Message',
2018
'OnOuchMessageCoro',
2119
'OnOuchCloseCoro',

src/nasdaq_protocols/ouch/core.py

Lines changed: 4 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,49 +1,17 @@
1-
from abc import ABC
2-
31
import attrs
4-
from nasdaq_protocols.common import Serializable, Byte, CommonMessage, logable
2+
from nasdaq_protocols.soup_app import SoupAppMessage
3+
from nasdaq_protocols.common import logable
54

65

76
__all__ = [
8-
'OuchMessageId',
97
'Message'
108
]
119
APP_NAME = 'OUCH'
1210

1311

14-
@attrs.define(auto_attribs=True, hash=True)
15-
class OuchMessageId(Serializable, ABC):
16-
indicator: int
17-
direction: str = 'outgoing'
18-
19-
def to_bytes(self) -> tuple[int, bytes]:
20-
return Byte.to_bytes(self.indicator)
21-
22-
@classmethod
23-
def from_bytes(cls, bytes_: bytes) -> tuple[int, 'OuchMessageId']:
24-
return 1, OuchMessageId(Byte.from_bytes(bytes_)[1])
25-
26-
def __str__(self):
27-
return f'indicator={self.indicator}, direction={self.direction}'
28-
29-
3012
@attrs.define
3113
@logable
32-
class Message(CommonMessage, msg_id_cls=OuchMessageId, app_name=APP_NAME):
33-
34-
IncomingMsgClasses = []
35-
OutgoingMsgsClasses = []
36-
37-
def __init_subclass__(cls, *args, **kwargs):
38-
cls.log.debug('ouch.core.Message subclassing %s, params = %s', cls.__name__, str(kwargs))
39-
40-
if 'app_name' not in kwargs:
41-
kwargs['app_name'] = APP_NAME
42-
43-
kwargs['msg_id_cls'] = OuchMessageId
14+
class Message(SoupAppMessage, app_name=APP_NAME):
4415

45-
if all(k in kwargs for k in ['direction', 'indicator']):
46-
kwargs['msg_id'] = OuchMessageId(kwargs['indicator'], kwargs['direction'])
47-
container = cls.IncomingMsgClasses if kwargs['direction'] == 'incoming' else cls.OutgoingMsgsClasses
48-
container.append(cls)
16+
def __init_subclass__(cls, **kwargs):
4917
super().__init_subclass__(**kwargs)
Lines changed: 9 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -1,90 +1,35 @@
1-
import asyncio
2-
from typing import Callable, Type, Awaitable
1+
from typing import Callable, Awaitable, Type
32

43
import attrs
5-
from nasdaq_protocols.common import DispatchableMessageQueue, logable
64
from nasdaq_protocols import soup
5+
from nasdaq_protocols.soup_app.session import BaseClientSession, SessionId
76
from .core import Message
87

9-
108
__all__ = [
119
'OnOuchMessageCoro',
1210
'OnOuchCloseCoro',
1311
'OuchSessionId',
1412
'ClientSession'
1513
]
14+
1615
OnOuchMessageCoro = Callable[[Type[Message]], Awaitable[None]]
1716
OnOuchCloseCoro = Callable[[], Awaitable[None]]
1817

1918

2019
@attrs.define(auto_attribs=True)
21-
class OuchSessionId:
20+
class OuchSessionId(SessionId):
2221
soup_session_id: soup.SoupSessionId = None
23-
24-
def __str__(self):
25-
if self.soup_session_id:
26-
return f'ouch-{self.soup_session_id}'
27-
return 'ouch-nosoup'
22+
protocol_name: str = "ouch"
2823

2924

3025
@attrs.define(auto_attribs=True)
31-
@logable
32-
class ClientSession:
33-
soup_session: soup.SoupClientSession
34-
on_msg_coro: OnOuchMessageCoro = None
35-
on_close_coro: OnOuchCloseCoro = None
36-
closed: bool = False
37-
_session_id: OuchSessionId = None
38-
_close_event: asyncio.Event = None
39-
_message_queue: DispatchableMessageQueue = None
40-
41-
def __attrs_post_init__(self):
42-
self._session_id = OuchSessionId(self.soup_session.session_id)
43-
self._message_queue = DispatchableMessageQueue(self._session_id, self.on_msg_coro)
44-
self.soup_session.set_handlers(on_msg_coro=self._on_soup_message, on_close_coro=self._on_soup_close)
45-
self.soup_session.start_dispatching()
46-
47-
async def receive_message(self):
48-
"""
49-
Asynchronously receive a message from the ouch session.
50-
51-
This method blocks until a message is received by the session.
52-
"""
53-
return await self._message_queue.get()
54-
55-
def send_message(self, msg: Message):
56-
"""
57-
Send a message to the Ouch Server.
58-
"""
59-
self.soup_session.send_unseq_data(msg.to_bytes()[1])
60-
61-
async def close(self):
62-
"""
63-
Asynchronously close the ouch session.
64-
"""
65-
if self._close_event or self.closed:
66-
self.log.debug('%s> closing in progress..', self._session_id)
67-
return
68-
self._close_event = asyncio.Event()
69-
self.soup_session.initiate_close()
70-
await self._close_event.wait()
71-
self.log.debug('%s> closed.', self._session_id)
72-
73-
async def _on_soup_message(self, message: soup.SoupMessage):
74-
if isinstance(message, soup.SequencedData):
75-
self.log.debug('%s> incoming sequenced bytes_', self._session_id)
76-
await self._message_queue.put(self.decode(message.data)[1])
26+
class ClientSession(BaseClientSession):
7727

78-
async def _on_soup_close(self):
79-
await self._message_queue.stop()
80-
if self.on_close_coro is not None:
81-
await self.on_close_coro()
82-
if self._close_event:
83-
self._close_event.set()
84-
self.closed = True
28+
def _create_session_id(self):
29+
return OuchSessionId(self.soup_session.session_id)
8530

8631
def decode(self, bytes_: bytes):
8732
"""
88-
Decode the given bytes into an itch message.
33+
Decode the given bytes into an OUCH message.
8934
"""
9035
return Message.from_bytes(bytes_)

src/nasdaq_protocols/soup/_reader.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,5 @@ def deserialize(self) -> Any:
2121

2222
_, msg = SoupMessage.from_bytes(self._buffer[:siz + 2])
2323
self._buffer = self._buffer[siz + 2:]
24-
buff_len -= (siz+2)
2524

2625
return msg, msg.is_logout(), msg.is_heartbeat()
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
from .core import (
2+
SoupAppMessageId,
3+
SoupAppMessage
4+
)
5+
6+
__all__ = [
7+
'SoupAppMessageId',
8+
'SoupAppMessage'
9+
]
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
import attrs
2+
from nasdaq_protocols.common import Serializable, Byte, CommonMessage, logable
3+
4+
5+
__all__ = [
6+
'SoupAppMessageId',
7+
'SoupAppMessage'
8+
]
9+
10+
11+
@attrs.define(auto_attribs=True, hash=True)
12+
class SoupAppMessageId(Serializable):
13+
indicator: int
14+
direction: str = 'outgoing'
15+
16+
@classmethod
17+
def from_bytes(cls, bytes_: bytes) -> tuple[int, 'SoupAppMessageId']:
18+
return 1, cls(Byte.from_bytes(bytes_)[1])
19+
20+
def to_bytes(self) -> tuple[int, bytes]:
21+
return Byte.to_bytes(self.indicator)
22+
23+
def __str__(self):
24+
return f'indicator={self.indicator}, direction={self.direction}'
25+
26+
27+
@attrs.define
28+
@logable
29+
class SoupAppMessage(CommonMessage):
30+
IncomingMsgClasses = []
31+
OutgoingMsgsClasses = []
32+
33+
def __init_subclass__(cls, *args, **kwargs):
34+
cls.log.debug('%s subclassing %s, params = %s', cls.__mro__[1].__name__, cls.__name__, str(kwargs))
35+
36+
app_name = kwargs.get('app_name')
37+
kwargs['app_name'] = app_name
38+
kwargs['msg_id_cls'] = SoupAppMessageId
39+
40+
if 'indicator' in kwargs and 'direction' in kwargs:
41+
kwargs['msg_id'] = SoupAppMessageId(kwargs['indicator'], kwargs['direction'])
42+
43+
super().__init_subclass__(**kwargs)

0 commit comments

Comments
 (0)