@@ -679,6 +679,7 @@ async def _register_subscription(
679
679
groups ,
680
680
publish_callback ,
681
681
unsubscribed_callback ,
682
+ initial_payload ,
682
683
notification_queue_limit = None ,
683
684
):
684
685
"""Register a new subscription when client subscribes.
@@ -697,6 +698,7 @@ async def _register_subscription(
697
698
subscription groups current subscription belongs to.
698
699
unsubscribed_callback: Called to notify when a client
699
700
unsubscribes.
701
+ initial_payload: Initial payload.
700
702
notification_queue_limit: LImit for the subscribtion
701
703
notification queue. Default is used if not set.
702
704
@@ -720,13 +722,25 @@ async def _register_subscription(
720
722
queue_size = self .subscription_notification_queue_limit
721
723
notification_queue = asyncio .Queue (maxsize = queue_size )
722
724
725
+ # Enqueue the initial payload.
726
+ if initial_payload is not self .SKIP :
727
+ notification_queue .put_nowait (Serializer .serialize (initial_payload ))
728
+
723
729
# Start an endless task which listens the `notification_queue`
724
730
# and invokes subscription "resolver" on new notifications.
725
731
async def notifier ():
726
732
"""Watch the notification queue and notify clients."""
727
733
728
734
# Assert we run in a proper thread.
729
735
self ._assert_thread ()
736
+
737
+ # Dirty hack to partially workaround the race between:
738
+ # 1) call to `result.subscribe` in `_on_gql_start`; and
739
+ # 2) call to `trigger.on_next` below in this function.
740
+ # The first call must be earlier. Otherwise, first one or more notifications
741
+ # may be lost.
742
+ await asyncio .sleep (1 )
743
+
730
744
while True :
731
745
serialized_payload = await notification_queue .get ()
732
746
0 commit comments