@@ -197,7 +197,7 @@ def get_regular_task(queue=None):
197197    if  not  messages :
198198      return  None 
199199
200-     task  =  get_task_from_message (messages [0 ])
200+     task  =  get_task_from_message (messages [0 ],  queue )
201201    if  task :
202202      return  task 
203203
@@ -296,7 +296,7 @@ def get_postprocess_task():
296296  messages  =  pubsub_puller .get_messages (max_messages = 1 )
297297  if  not  messages :
298298    return  None 
299-   task  =  get_task_from_message (messages [0 ])
299+   task  =  get_task_from_message (messages [0 ],  POSTPROCESS_QUEUE )
300300  if  task :
301301    logs .info ('Pulled from postprocess queue.' )
302302  return  task 
@@ -311,7 +311,7 @@ def get_preprocess_task():
311311  messages  =  pubsub_puller .get_messages (max_messages = 1 )
312312  if  not  messages :
313313    return  None 
314-   task  =  get_task_from_message (messages [0 ])
314+   task  =  get_task_from_message (messages [0 ],  PREPROCESS_QUEUE )
315315  if  task :
316316    logs .info ('Pulled from preprocess queue.' )
317317  return  task 
@@ -377,9 +377,9 @@ def get_task():
377377  return  task 
378378
379379
380- def  construct_payload (command , argument , job ):
380+ def  construct_payload (command , argument , job ,  queue = None ):
381381  """Constructs payload for task, a standard description of tasks.""" 
382-   return  ' ' .join ([command , str (argument ), str (job )])
382+   return  ' ' .join ([command , str (argument ), str (job ),  str ( queue ) ])
383383
384384
385385class  Task :
@@ -392,24 +392,26 @@ def __init__(self,
392392               eta = None ,
393393               is_command_override = False ,
394394               high_end = False ,
395-                extra_info = None ):
395+                extra_info = None ,
396+                queue = None ):
396397    self .command  =  command 
397398    self .argument  =  argument 
398399    self .job  =  job 
399400    self .eta  =  eta 
400401    self .is_command_override  =  is_command_override 
401402    self .high_end  =  high_end 
402403    self .extra_info  =  extra_info 
404+     self .queue  =  queue 
403405
404406  def  __repr__ (self ):
405-     return  f'Task: { self .command } { self .argument } { self .job }  
407+     return  f'Task: { self .command } { self .argument } { self .job }   { self . queue }  
406408
407409  def  attribute (self , _ ):
408410    return  None 
409411
410412  def  payload (self ):
411413    """Get the payload.""" 
412-     return  construct_payload (self .command , self .argument , self .job )
414+     return  construct_payload (self .command , self .argument , self .job ,  self . queue )
413415
414416  def  to_pubsub_message (self ):
415417    """Convert the task to a pubsub message.""" 
@@ -437,6 +439,10 @@ def lease(self):
437439    yield 
438440    track_task_end ()
439441
442+   def  set_queue (self , queue ):
443+     self .queue  =  queue 
444+     return  self 
445+ 
440446
441447class  PubSubTask (Task ):
442448  """A Pub/Sub task.""" 
@@ -503,7 +509,7 @@ def dont_retry(self):
503509    self ._pubsub_message .ack ()
504510
505511
506- def  get_task_from_message (message ) ->  Optional [PubSubTask ]:
512+ def  get_task_from_message (message ,  queue = None ) ->  Optional [PubSubTask ]:
507513  """Returns a task constructed from the first of |messages| if possible.""" 
508514  if  message  is  None :
509515    return  None 
@@ -514,6 +520,7 @@ def get_task_from_message(message) -> Optional[PubSubTask]:
514520    message .ack ()
515521    return  None 
516522
523+   task  =  task .set_queue (queue )
517524  # Check that this task should be run now (past the ETA). Otherwise we defer 
518525  # its execution. 
519526  if  task .defer ():
@@ -528,15 +535,15 @@ def get_utask_mains() -> List[PubSubTask]:
528535  pubsub_puller  =  PubSubPuller (UTASK_MAINS_QUEUE )
529536  messages  =  pubsub_puller .get_messages_time_limited (MAX_UTASKS ,
530537                                                     UTASK_QUEUE_PULL_SECONDS )
531-   return  handle_multiple_utask_main_messages (messages )
538+   return  handle_multiple_utask_main_messages (messages ,  UTASK_MAINS_QUEUE )
532539
533540
534- def  handle_multiple_utask_main_messages (messages ) ->  List [PubSubTask ]:
541+ def  handle_multiple_utask_main_messages (messages ,  queue ) ->  List [PubSubTask ]:
535542  """Merges tasks specified in |messages| into a list for processing on this 
536543  bot.""" 
537544  tasks  =  []
538545  for  message  in  messages :
539-     task  =  get_task_from_message (message )
546+     task  =  get_task_from_message (message ,  queue )
540547    if  task  is  None :
541548      continue 
542549    tasks .append (task )
0 commit comments