16
16
import time
17
17
18
18
# custom packages
19
+ from . import functions
20
+ from . import const as Const
21
+ from .common import Request
19
22
20
23
# typing not natively supported on MicroPython
21
24
from .typing import dict_keys , List , Optional , Union
@@ -41,6 +44,157 @@ def __init__(self, itf, addr_list: List[int]) -> None:
41
44
for reg_type in self ._changeable_register_types :
42
45
self ._changed_registers [reg_type ] = dict ()
43
46
47
+ def process (self ) -> bool :
48
+ """
49
+ Process the Modbus requests.
50
+
51
+ :returns: Result of processing, True on success, False otherwise
52
+ :rtype: bool
53
+ """
54
+ reg_type = None
55
+ req_type = None
56
+
57
+ request = self ._itf .get_request (unit_addr_list = self ._addr_list ,
58
+ timeout = 0 )
59
+ if request is None :
60
+ return False
61
+
62
+ if request .function == Const .READ_COILS :
63
+ # Coils (setter+getter) [0, 1]
64
+ # function 01 - read single register
65
+ reg_type = 'COILS'
66
+ req_type = 'READ'
67
+ elif request .function == Const .READ_DISCRETE_INPUTS :
68
+ # Ists (only getter) [0, 1]
69
+ # function 02 - read input status (discrete inputs/digital input)
70
+ reg_type = 'ISTS'
71
+ req_type = 'READ'
72
+ elif request .function == Const .READ_HOLDING_REGISTERS :
73
+ # Hregs (setter+getter) [0, 65535]
74
+ # function 03 - read holding register
75
+ reg_type = 'HREGS'
76
+ req_type = 'READ'
77
+ elif request .function == Const .READ_INPUT_REGISTER :
78
+ # Iregs (only getter) [0, 65535]
79
+ # function 04 - read input registers
80
+ reg_type = 'IREGS'
81
+ req_type = 'READ'
82
+ elif (request .function == Const .WRITE_SINGLE_COIL or
83
+ request .function == Const .WRITE_MULTIPLE_COILS ):
84
+ # Coils (setter+getter) [0, 1]
85
+ # function 05 - write single coil
86
+ # function 15 - write multiple coil
87
+ reg_type = 'COILS'
88
+ req_type = 'WRITE'
89
+ elif (request .function == Const .WRITE_SINGLE_REGISTER or
90
+ request .function == Const .WRITE_MULTIPLE_REGISTERS ):
91
+ # Hregs (setter+getter) [0, 65535]
92
+ # function 06 - write holding register
93
+ # function 16 - write multiple holding register
94
+ reg_type = 'HREGS'
95
+ req_type = 'WRITE'
96
+ else :
97
+ request .send_exception (Const .ILLEGAL_FUNCTION )
98
+
99
+ if reg_type :
100
+ if req_type == 'READ' :
101
+ self ._process_read_access (request = request , reg_type = reg_type )
102
+ elif req_type == 'WRITE' :
103
+ self ._process_write_access (request = request , reg_type = reg_type )
104
+
105
+ return True
106
+
107
+ def _create_response (self ,
108
+ request : Request ,
109
+ reg_type : str ) -> Union [bool , int ,
110
+ List [bool ], List [int ]]:
111
+ """
112
+ Create a response.
113
+
114
+ :param request: The request
115
+ :type request: Request
116
+ :param reg_type: The register type
117
+ :type reg_type: str
118
+
119
+ :returns: Values of this register
120
+ :rtype: Union[bool, int, List[int], List[bool]]
121
+ """
122
+ data = []
123
+ if type (self ._register_dict [reg_type ][request .register_addr ]) is list :
124
+ data = self ._register_dict [reg_type ][request .register_addr ]
125
+ else :
126
+ data = [self ._register_dict [reg_type ][request .register_addr ]]
127
+
128
+ return data [:request .quantity ]
129
+
130
+ def _process_read_access (self , request : Request , reg_type : str ) -> None :
131
+ """
132
+ Process read access to register
133
+
134
+ :param request: The request
135
+ :type request: Request
136
+ :param reg_type: The register type
137
+ :type reg_type: str
138
+ """
139
+ if request .register_addr in self ._register_dict [reg_type ]:
140
+ vals = self ._create_response (request = request , reg_type = reg_type )
141
+ request .send_response (vals )
142
+ else :
143
+ request .send_exception (Const .ILLEGAL_DATA_ADDRESS )
144
+
145
+ def _process_write_access (self , request : Request , reg_type : str ) -> None :
146
+ """
147
+ Process write access to register
148
+
149
+ :param request: The request
150
+ :type request: Request
151
+ :param reg_type: The register type
152
+ :type reg_type: str
153
+ """
154
+ address = request .register_addr
155
+ val = 0
156
+ valid_register = False
157
+
158
+ if address in self ._register_dict [reg_type ]:
159
+ if reg_type == 'COILS' :
160
+ valid_register = True
161
+
162
+ if request .function == Const .WRITE_SINGLE_COIL :
163
+ val = request .data [0 ]
164
+ if 0x00 < val < 0xFF :
165
+ valid_register = False
166
+ request .send_exception (Const .ILLEGAL_DATA_VALUE )
167
+ else :
168
+ val = (val == 0xFF )
169
+ elif request .function == Const .WRITE_MULTIPLE_COILS :
170
+ tmp = int .from_bytes (request .data , "big" )
171
+ val = [
172
+ bool (tmp & (1 << n )) for n in range (request .quantity )
173
+ ]
174
+
175
+ if valid_register :
176
+ self .set_coil (address = address , value = val )
177
+ elif reg_type == 'HREGS' :
178
+ valid_register = True
179
+ val = list (functions .to_short (byte_array = request .data ,
180
+ signed = False ))
181
+
182
+ if request .function == Const .WRITE_SINGLE_REGISTER :
183
+ self .set_hreg (address = address , value = val [0 ])
184
+ elif request .function == Const .WRITE_MULTIPLE_REGISTERS :
185
+ self .set_hreg (address = address , value = val )
186
+ else :
187
+ # nothing except holding registers or coils can be set
188
+ request .send_exception (Const .ILLEGAL_FUNCTION )
189
+
190
+ if valid_register :
191
+ request .send_response ()
192
+ self ._set_changed_register (reg_type = reg_type ,
193
+ address = address ,
194
+ value = val )
195
+ else :
196
+ request .send_exception (Const .ILLEGAL_DATA_ADDRESS )
197
+
44
198
def add_coil (self ,
45
199
address : int ,
46
200
value : Union [bool , List [bool ]] = False ) -> None :
0 commit comments