1717from octue .cloud .pub_sub .message_handler import OrderedMessageHandler
1818from octue .cloud .service_id import convert_service_id_to_pub_sub_form , create_service_sruid , validate_service_sruid
1919from octue .compatibility import warn_if_incompatible
20+ from octue .utils .decoders import OctueJSONDecoder
2021from octue .utils .encoders import OctueJSONEncoder
2122from octue .utils .exceptions import convert_exception_to_primitives
2223from octue .utils .objects import get_nested_attribute
@@ -286,14 +287,12 @@ def ask(
286287 )
287288 answer_subscription .create (allow_existing = False )
288289
289- serialised_input_manifest = None
290-
291290 if input_manifest is not None :
292291 input_manifest .use_signed_urls_for_datasets ()
293- serialised_input_manifest = input_manifest .serialise ()
292+ input_manifest = input_manifest .to_primitive ()
294293
295294 self ._send_message (
296- {"input_values" : input_values , "input_manifest" : serialised_input_manifest , "children" : children },
295+ {"input_values" : input_values , "input_manifest" : input_manifest , "children" : children },
297296 topic = question_topic ,
298297 question_uuid = question_uuid ,
299298 forward_logs = subscribe_to_logs ,
@@ -474,6 +473,8 @@ def _parse_question(self, question):
474473 :param dict|Message question:
475474 :return (dict, str, bool, str|None, bool):
476475 """
476+ logger .info ("%r received a question." , self )
477+
477478 try :
478479 # Parse question directly from Pub/Sub or Dataflow.
479480 data = json .loads (question .data .decode ())
@@ -486,7 +487,9 @@ def _parse_question(self, question):
486487 # Parse question from Google Cloud Run.
487488 data = json .loads (base64 .b64decode (question ["data" ]).decode ("utf-8" ).strip ())
488489
489- logger .info ("%r received a question." , self )
490+ # Keep backwards compatibility with questions from Octue services running `octue<0.41.1`.
491+ if isinstance (data ["input_manifest" ], str ):
492+ data ["input_manifest" ] = json .loads (data ["input_manifest" ], cls = OctueJSONDecoder )
490493
491494 question_uuid = get_nested_attribute (question , "attributes.question_uuid" )
492495 forward_logs = bool (int (get_nested_attribute (question , "attributes.forward_logs" )))
@@ -504,4 +507,5 @@ def _parse_question(self, question):
504507 except AttributeError :
505508 allow_save_diagnostics_data_on_crash = False
506509
510+ logger .info ("%r parsed the question successfully." , self )
507511 return data , question_uuid , forward_logs , parent_sdk_version , allow_save_diagnostics_data_on_crash
0 commit comments