@@ -103,6 +103,8 @@ class WorkerTemplate:
103103 shared_extra_conf : Callable [[str ], Dict [str , Any ]] = lambda _worker_name : {}
104104 worker_extra_conf : str = ""
105105
106+ stream_writers : Set [str ] = field (default_factory = set )
107+
106108 # True if and only if multiple of this worker type are allowed.
107109 sharding_allowed : bool = True
108110
@@ -226,9 +228,7 @@ class WorkerTemplate:
226228 ),
227229 "event_persister" : WorkerTemplate (
228230 listener_resources = {"replication" },
229- shared_extra_conf = lambda worker_name : {
230- "stream_writers" : {"events" : [worker_name ]}
231- },
231+ stream_writers = {"events" },
232232 ),
233233 "background_worker" : WorkerTemplate (
234234 # This worker cannot be sharded. Therefore, there should only ever be one
@@ -257,17 +257,13 @@ class WorkerTemplate:
257257 "^/_matrix/client/(r0|v3|unstable)/.*/tags" ,
258258 "^/_matrix/client/(r0|v3|unstable)/.*/account_data" ,
259259 },
260- shared_extra_conf = lambda worker_name : {
261- "stream_writers" : {"account_data" : [worker_name ]}
262- },
260+ stream_writers = {"account_data" },
263261 sharding_allowed = False ,
264262 ),
265263 "presence" : WorkerTemplate (
266264 listener_resources = {"client" , "replication" },
267265 endpoint_patterns = {"^/_matrix/client/(api/v1|r0|v3|unstable)/presence/" },
268- shared_extra_conf = lambda worker_name : {
269- "stream_writers" : {"presence" : [worker_name ]}
270- },
266+ stream_writers = {"presence" },
271267 sharding_allowed = False ,
272268 ),
273269 "receipts" : WorkerTemplate (
@@ -276,25 +272,19 @@ class WorkerTemplate:
276272 "^/_matrix/client/(r0|v3|unstable)/rooms/.*/receipt" ,
277273 "^/_matrix/client/(r0|v3|unstable)/rooms/.*/read_markers" ,
278274 },
279- shared_extra_conf = lambda worker_name : {
280- "stream_writers" : {"receipts" : [worker_name ]}
281- },
275+ stream_writers = {"receipts" },
282276 sharding_allowed = False ,
283277 ),
284278 "to_device" : WorkerTemplate (
285279 listener_resources = {"client" , "replication" },
286280 endpoint_patterns = {"^/_matrix/client/(r0|v3|unstable)/sendToDevice/" },
287- shared_extra_conf = lambda worker_name : {
288- "stream_writers" : {"to_device" : [worker_name ]}
289- },
281+ stream_writers = {"to_device" },
290282 sharding_allowed = False ,
291283 ),
292284 "typing" : WorkerTemplate (
293285 listener_resources = {"client" , "replication" },
294286 endpoint_patterns = {"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/typing" },
295- shared_extra_conf = lambda worker_name : {
296- "stream_writers" : {"typing" : [worker_name ]}
297- },
287+ stream_writers = {"typing" },
298288 sharding_allowed = False ,
299289 ),
300290}
@@ -447,6 +437,8 @@ def merge_worker_template_configs(
447437 # (This is unused, but in principle sharding this hybrid worker type
448438 # would be allowed if both constituent types are shardable)
449439 sharding_allowed = left .sharding_allowed and right .sharding_allowed ,
440+ # include stream writers from both
441+ stream_writers = left .stream_writers | right .stream_writers ,
450442 )
451443
452444
@@ -462,7 +454,10 @@ def instantiate_worker_template(
462454 Returns: worker configuration dictionary
463455 """
464456 worker_config_dict = dataclasses .asdict (template )
465- worker_config_dict ["shared_extra_conf" ] = template .shared_extra_conf (worker_name )
457+ stream_writers_dict = {
458+ writer : worker_name for writer in template .stream_writers
459+ }
460+ worker_config_dict ["shared_extra_conf" ] = merged (template .shared_extra_conf (worker_name ), stream_writers_dict )
466461 worker_config_dict ["endpoint_patterns" ] = sorted (template .endpoint_patterns )
467462 worker_config_dict ["listener_resources" ] = sorted (template .listener_resources )
468463 return worker_config_dict
0 commit comments