44import functools
55import json
66import logging
7+ import threading
78import uuid
89
910import google .api_core .exceptions
2526
2627
2728logger = logging .getLogger (__name__ )
29+ send_message_lock = threading .Lock ()
2830
2931DEFAULT_NAMESPACE = "default"
3032ANSWERS_NAMESPACE = "answers"
@@ -99,14 +101,15 @@ def received_messages(self):
99101 return self ._message_handler .received_messages
100102 return None
101103
102- def serve (self , timeout = None , delete_topic_and_subscription_on_exit = False , allow_existing = False ):
104+ def serve (self , timeout = None , delete_topic_and_subscription_on_exit = False , allow_existing = False , detach = False ):
103105 """Start the service as a child, waiting to accept questions from any other Octue service using Google Pub/Sub
104106 on the same Google Cloud project. Questions are accepted, processed, and answered asynchronously.
105107
106108 :param float|None timeout: time in seconds after which to shut down the child
107109 :param bool delete_topic_and_subscription_on_exit: if `True`, delete the child's topic and subscription on exiting serving mode
108110 :param bool allow_existing: if `True`, allow starting a service for which the topic and/or subscription already exists (indicating an existing service) - this connects this service to the existing service's topic and subscription
109- :return None:
111+ :param bool detach: if `True`, detach from the subscription future. The future and subscriber are returned so they can still be cancelled and closed manually. Note that the topic and subscription are not automatically deleted on exit if this option is chosen.
112+ :return (google.cloud.pubsub_v1.subscriber.futures.StreamingPullFuture, google.cloud.pubsub_v1.SubscriberClient):
110113 """
111114 logger .info ("Starting %r." , self )
112115
@@ -119,39 +122,47 @@ def serve(self, timeout=None, delete_topic_and_subscription_on_exit=False, allow
119122 expiration_time = None ,
120123 )
121124
122- subscriber = pubsub_v1 .SubscriberClient ()
123-
124125 try :
125126 topic .create (allow_existing = allow_existing )
126127 subscription .create (allow_existing = allow_existing )
128+ except google .api_core .exceptions .AlreadyExists :
129+ raise octue .exceptions .ServiceAlreadyExists (f"A service with the ID { self .id !r} already exists." )
130+
131+ subscriber = pubsub_v1 .SubscriberClient ()
132+
133+ try :
127134 future = subscriber .subscribe (subscription = subscription .path , callback = self .answer )
128135
129136 logger .info (
130137 "You can now ask this service questions at %r using the `octue.resources.Child` class." ,
131138 self .id ,
132139 )
133140
134- try :
135- future . result ( timeout = timeout )
136- except ( TimeoutError , concurrent . futures . TimeoutError , KeyboardInterrupt ) :
137- future . cancel ()
138-
139- except google . api_core . exceptions . AlreadyExists :
140- raise octue . exceptions . ServiceAlreadyExists ( f"A service with the ID { self . id !r } already exists." )
141+ # If not detaching, keep answering questions until the subscriber times out (or forever if there's no
142+ # timeout).
143+ if not detach :
144+ try :
145+ future . result ( timeout = timeout )
146+ except ( TimeoutError , concurrent . futures . TimeoutError , KeyboardInterrupt ) :
147+ future . cancel ( )
141148
142149 finally :
143- if delete_topic_and_subscription_on_exit :
144- try :
145- if subscription .creation_triggered_locally :
146- subscription .delete ()
150+ # If not detaching, delete the topic and subscription if required and close the subscriber.
151+ if not detach :
152+ if delete_topic_and_subscription_on_exit :
153+ try :
154+ if subscription .creation_triggered_locally :
155+ subscription .delete ()
156+
157+ if topic .creation_triggered_locally :
158+ topic .delete ()
147159
148- if topic . creation_triggered_locally :
149- topic . delete ( )
160+ except Exception :
161+ logger . error ( "Deletion of topic and/or subscription %r failed." , topic . name )
150162
151- except Exception :
152- logger .error ("Deletion of topic and/or subscription %r failed." , topic .name )
163+ subscriber .close ()
153164
154- subscriber . close ()
165+ return future , subscriber
155166
156167 def answer (self , question , answer_topic = None , heartbeat_interval = 120 , timeout = 30 ):
157168 """Answer a question from a parent - i.e. run the child's app on the given data and return the output values.
@@ -214,7 +225,6 @@ def answer(self, question, answer_topic=None, heartbeat_interval=120, timeout=30
214225 "type" : "result" ,
215226 "output_values" : analysis .output_values ,
216227 "output_manifest" : serialised_output_manifest ,
217- "message_number" : topic .messages_published ,
218228 },
219229 topic = topic ,
220230 timeout = timeout ,
@@ -377,7 +387,6 @@ def send_exception(self, topic, timeout=30):
377387 "exception_type" : exception ["type" ],
378388 "exception_message" : exception_message ,
379389 "traceback" : exception ["traceback" ],
380- "message_number" : topic .messages_published ,
381390 },
382391 topic = topic ,
383392 timeout = timeout ,
@@ -386,28 +395,36 @@ def send_exception(self, topic, timeout=30):
386395 def _send_message (self , message , topic , timeout = 30 , ** attributes ):
387396 """Send a JSON-serialised message to the given topic with optional message attributes.
388397
389- :param any message: any JSON-serialisable python primitive to send as a message
398+ :param dict message: JSON-serialisable data to send as a message
390399 :param octue.cloud.pub_sub.topic.Topic topic: the Pub/Sub topic to send the message to
391400 :param int|float timeout: the timeout for sending the message in seconds
392401 :param attributes: key-value pairs to attach to the message - the values must be strings or bytes
393402 :return None:
394403 """
395- attributes ["octue_sdk_version" ] = self ._local_sdk_version
396- converted_attributes = {}
397-
398- for key , value in attributes .items ():
399- if isinstance (value , bool ):
400- value = str (int (value ))
401- converted_attributes [key ] = value
402-
403- self .publisher .publish (
404- topic = topic .path ,
405- data = json .dumps (message , cls = OctueJSONEncoder ).encode (),
406- retry = retry .Retry (deadline = timeout ),
407- ** converted_attributes ,
408- )
404+ with send_message_lock :
405+ attributes ["octue_sdk_version" ] = self ._local_sdk_version
406+
407+ # This would be better placed in the Pub/Sub message's attributes but has been left in `message` for
408+ # inter-service backwards compatibility.
409+ message ["message_number" ] = topic .messages_published
410+ converted_attributes = {}
411+
412+ for key , value in attributes .items ():
413+ if isinstance (value , bool ):
414+ value = str (int (value ))
415+ elif isinstance (value , (int , float )):
416+ value = str (value )
417+
418+ converted_attributes [key ] = value
419+
420+ self .publisher .publish (
421+ topic = topic .path ,
422+ data = json .dumps (message , cls = OctueJSONEncoder ).encode (),
423+ retry = retry .Retry (deadline = timeout ),
424+ ** converted_attributes ,
425+ )
409426
410- topic .messages_published += 1
427+ topic .messages_published += 1
411428
412429 def _send_delivery_acknowledgment (self , topic , timeout = 30 ):
413430 """Send an acknowledgement of question receipt to the parent.
@@ -416,18 +433,17 @@ def _send_delivery_acknowledgment(self, topic, timeout=30):
416433 :param float timeout: time in seconds after which to give up sending
417434 :return None:
418435 """
419- logger .info ("%r acknowledged receipt of question." , self )
420-
421436 self ._send_message (
422437 {
423438 "type" : "delivery_acknowledgement" ,
424439 "delivery_time" : str (datetime .datetime .now ()),
425- "message_number" : topic .messages_published ,
426440 },
427441 topic = topic ,
428442 timeout = timeout ,
429443 )
430444
445+ logger .info ("%r acknowledged receipt of question." , self )
446+
431447 def _send_heartbeat (self , topic , timeout = 30 ):
432448 """Send a heartbeat to the parent, indicating that the service is alive.
433449
@@ -439,13 +455,12 @@ def _send_heartbeat(self, topic, timeout=30):
439455 {
440456 "type" : "heartbeat" ,
441457 "time" : str (datetime .datetime .now ()),
442- "message_number" : topic .messages_published ,
443458 },
444459 topic = topic ,
445460 timeout = timeout ,
446461 )
447462
448- logger .debug ("Heartbeat sent." )
463+ logger .debug ("Heartbeat sent by %r." , self )
449464
450465 def _send_monitor_message (self , data , topic , timeout = 30 ):
451466 """Send a monitor message to the parent.
@@ -455,18 +470,17 @@ def _send_monitor_message(self, data, topic, timeout=30):
455470 :param float timeout: time in seconds to retry sending the message
456471 :return None:
457472 """
458- logger .debug ("%r sending monitor message." , self )
459-
460473 self ._send_message (
461474 {
462475 "type" : "monitor_message" ,
463476 "data" : json .dumps (data , cls = OctueJSONEncoder ),
464- "message_number" : topic .messages_published ,
465477 },
466478 topic = topic ,
467479 timeout = timeout ,
468480 )
469481
482+ logger .debug ("Monitor message sent by %r." , self )
483+
470484 def _parse_question (self , question ):
471485 """Parse a question in the Google Cloud Pub/Sub or Google Cloud Run format.
472486
0 commit comments