From 948ef62a80ee3da2f2436e804ceba02df81d96db Mon Sep 17 00:00:00 2001 From: dhavalts1989 Date: Mon, 7 Apr 2025 15:40:51 +0530 Subject: [PATCH] Tests the set_queue() method of a task Resolves core/../../tasks_test.py and appengine/../tasks_test.py --- .../_internal/base/tasks/__init__.py | 31 ++++++++++++------- .../tests/appengine/common/tasks_test.py | 12 ++++--- .../tests/core/base/tasks/tasks_test.py | 18 +++++++++++ 3 files changed, 45 insertions(+), 16 deletions(-) diff --git a/src/clusterfuzz/_internal/base/tasks/__init__.py b/src/clusterfuzz/_internal/base/tasks/__init__.py index 2f7b5200655..2f1a9892c4f 100644 --- a/src/clusterfuzz/_internal/base/tasks/__init__.py +++ b/src/clusterfuzz/_internal/base/tasks/__init__.py @@ -197,7 +197,7 @@ def get_regular_task(queue=None): if not messages: return None - task = get_task_from_message(messages[0]) + task = get_task_from_message(messages[0], queue) if task: return task @@ -296,7 +296,7 @@ def get_postprocess_task(): messages = pubsub_puller.get_messages(max_messages=1) if not messages: return None - task = get_task_from_message(messages[0]) + task = get_task_from_message(messages[0], POSTPROCESS_QUEUE) if task: logs.info('Pulled from postprocess queue.') return task @@ -311,7 +311,7 @@ def get_preprocess_task(): messages = pubsub_puller.get_messages(max_messages=1) if not messages: return None - task = get_task_from_message(messages[0]) + task = get_task_from_message(messages[0], PREPROCESS_QUEUE) if task: logs.info('Pulled from preprocess queue.') return task @@ -377,9 +377,9 @@ def get_task(): return task -def construct_payload(command, argument, job): +def construct_payload(command, argument, job, queue=None): """Constructs payload for task, a standard description of tasks.""" - return ' '.join([command, str(argument), str(job)]) + return ' '.join([command, str(argument), str(job), str(queue)]) class Task: @@ -392,7 +392,8 @@ def __init__(self, eta=None, is_command_override=False, high_end=False, - extra_info=None): + extra_info=None, + queue=None): self.command = command self.argument = argument self.job = job @@ -400,16 +401,17 @@ def __init__(self, self.is_command_override = is_command_override self.high_end = high_end self.extra_info = extra_info + self.queue = queue def __repr__(self): - return f'Task: {self.command} {self.argument} {self.job}' + return f'Task: {self.command} {self.argument} {self.job} {self.queue}' def attribute(self, _): return None def payload(self): """Get the payload.""" - return construct_payload(self.command, self.argument, self.job) + return construct_payload(self.command, self.argument, self.job, self.queue) def to_pubsub_message(self): """Convert the task to a pubsub message.""" @@ -437,6 +439,10 @@ def lease(self): yield track_task_end() + def set_queue(self, queue): + self.queue = queue + return self + class PubSubTask(Task): """A Pub/Sub task.""" @@ -503,7 +509,7 @@ def dont_retry(self): self._pubsub_message.ack() -def get_task_from_message(message) -> Optional[PubSubTask]: +def get_task_from_message(message, queue=None) -> Optional[PubSubTask]: """Returns a task constructed from the first of |messages| if possible.""" if message is None: return None @@ -514,6 +520,7 @@ def get_task_from_message(message) -> Optional[PubSubTask]: message.ack() return None + task = task.set_queue(queue) # Check that this task should be run now (past the ETA). Otherwise we defer # its execution. if task.defer(): @@ -528,15 +535,15 @@ def get_utask_mains() -> List[PubSubTask]: pubsub_puller = PubSubPuller(UTASK_MAINS_QUEUE) messages = pubsub_puller.get_messages_time_limited(MAX_UTASKS, UTASK_QUEUE_PULL_SECONDS) - return handle_multiple_utask_main_messages(messages) + return handle_multiple_utask_main_messages(messages, UTASK_MAINS_QUEUE) -def handle_multiple_utask_main_messages(messages) -> List[PubSubTask]: +def handle_multiple_utask_main_messages(messages, queue) -> List[PubSubTask]: """Merges tasks specified in |messages| into a list for processing on this bot.""" tasks = [] for message in messages: - task = get_task_from_message(message) + task = get_task_from_message(message, queue) if task is None: continue tasks.append(task) diff --git a/src/clusterfuzz/_internal/tests/appengine/common/tasks_test.py b/src/clusterfuzz/_internal/tests/appengine/common/tasks_test.py index 2a99dc9dfb1..c4dddaac8c0 100644 --- a/src/clusterfuzz/_internal/tests/appengine/common/tasks_test.py +++ b/src/clusterfuzz/_internal/tests/appengine/common/tasks_test.py @@ -85,7 +85,8 @@ def test_high_end(self): self.assertEqual('test', task.command) self.assertEqual('high', task.argument) self.assertEqual('job', task.job) - self.assertEqual('test high job', task.payload()) + self.assertEqual('high-end-jobs-linux', task.queue) + self.assertEqual('test high job high-end-jobs-linux', task.payload()) def test_regular(self): """Test regular tasks.""" @@ -98,7 +99,8 @@ def test_regular(self): self.assertEqual('test', task.command) self.assertEqual('normal', task.argument) self.assertEqual('job', task.job) - self.assertEqual('test normal job', task.payload()) + self.assertEqual('jobs-linux', task.queue) + self.assertEqual('test normal job jobs-linux', task.payload()) def test_preemptible(self): """Test preemptible bot tasks.""" @@ -124,7 +126,8 @@ def test_defer(self): self.assertEqual('test', task.command) self.assertEqual('normal4', task.argument) self.assertEqual('job', task.job) - self.assertEqual('test normal4 job', task.payload()) + self.assertEqual('jobs-linux', task.queue) + self.assertEqual('test normal4 job jobs-linux', task.payload()) self.assertEqual(3, mock_modify.call_count) mock_modify.assert_has_calls([ @@ -142,7 +145,8 @@ def test_command_override(self): self.assertEqual('test', task.command) self.assertEqual('override', task.argument) self.assertEqual('job', task.job) - self.assertEqual('test override job', task.payload()) + self.assertEqual(None, task.queue) + self.assertEqual('test override job None', task.payload()) class LeaseTaskTest(unittest.TestCase): diff --git a/src/clusterfuzz/_internal/tests/core/base/tasks/tasks_test.py b/src/clusterfuzz/_internal/tests/core/base/tasks/tasks_test.py index 2ecb75ffa2f..b66f1764c2d 100644 --- a/src/clusterfuzz/_internal/tests/core/base/tasks/tasks_test.py +++ b/src/clusterfuzz/_internal/tests/core/base/tasks/tasks_test.py @@ -259,6 +259,7 @@ def test_no_message(self): def test_success(self): mock_task = mock.Mock(defer=mock.Mock(return_value=False)) + mock_task.set_queue.return_value = mock_task with mock.patch( 'clusterfuzz._internal.base.tasks.initialize_task', return_value=mock_task): @@ -278,3 +279,20 @@ def test_defer(self): 'clusterfuzz._internal.base.tasks.initialize_task', return_value=mock_task): self.assertEqual(tasks.get_task_from_message(mock.Mock()), None) + + def test_set_queue(self): + """Tests the set_queue method of a task.""" + mock_queue = mock.Mock() + mock_task = mock.Mock() + + mock_task.configure_mock( + queue=mock_queue, + set_queue=mock.Mock(return_value=mock_task), + defer=mock.Mock(return_value=False)) + + with mock.patch( + 'clusterfuzz._internal.base.tasks.initialize_task', + return_value=mock_task): + task = tasks.get_task_from_message(mock.Mock()) + + self.assertEqual(task.queue, mock_queue)