Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 19 additions & 12 deletions src/clusterfuzz/_internal/base/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ def get_regular_task(queue=None):
if not messages:
return None

task = get_task_from_message(messages[0])
task = get_task_from_message(messages[0], queue)
if task:
return task

Expand Down Expand Up @@ -296,7 +296,7 @@ def get_postprocess_task():
messages = pubsub_puller.get_messages(max_messages=1)
if not messages:
return None
task = get_task_from_message(messages[0])
task = get_task_from_message(messages[0], POSTPROCESS_QUEUE)
if task:
logs.info('Pulled from postprocess queue.')
return task
Expand All @@ -311,7 +311,7 @@ def get_preprocess_task():
messages = pubsub_puller.get_messages(max_messages=1)
if not messages:
return None
task = get_task_from_message(messages[0])
task = get_task_from_message(messages[0], PREPROCESS_QUEUE)
if task:
logs.info('Pulled from preprocess queue.')
return task
Expand Down Expand Up @@ -377,9 +377,9 @@ def get_task():
return task


def construct_payload(command, argument, job):
def construct_payload(command, argument, job, queue=None):
"""Constructs payload for task, a standard description of tasks."""
return ' '.join([command, str(argument), str(job)])
return ' '.join([command, str(argument), str(job), str(queue)])


class Task:
Expand All @@ -392,24 +392,26 @@ def __init__(self,
eta=None,
is_command_override=False,
high_end=False,
extra_info=None):
extra_info=None,
queue=None):
self.command = command
self.argument = argument
self.job = job
self.eta = eta
self.is_command_override = is_command_override
self.high_end = high_end
self.extra_info = extra_info
self.queue = queue

def __repr__(self):
return f'Task: {self.command} {self.argument} {self.job}'
return f'Task: {self.command} {self.argument} {self.job} {self.queue}'

def attribute(self, _):
return None

def payload(self):
"""Get the payload."""
return construct_payload(self.command, self.argument, self.job)
return construct_payload(self.command, self.argument, self.job, self.queue)

def to_pubsub_message(self):
"""Convert the task to a pubsub message."""
Expand Down Expand Up @@ -437,6 +439,10 @@ def lease(self):
yield
track_task_end()

def set_queue(self, queue):
self.queue = queue
return self


class PubSubTask(Task):
"""A Pub/Sub task."""
Expand Down Expand Up @@ -503,7 +509,7 @@ def dont_retry(self):
self._pubsub_message.ack()


def get_task_from_message(message) -> Optional[PubSubTask]:
def get_task_from_message(message, queue=None) -> Optional[PubSubTask]:
"""Returns a task constructed from the first of |messages| if possible."""
if message is None:
return None
Expand All @@ -514,6 +520,7 @@ def get_task_from_message(message) -> Optional[PubSubTask]:
message.ack()
return None

task = task.set_queue(queue)
# Check that this task should be run now (past the ETA). Otherwise we defer
# its execution.
if task.defer():
Expand All @@ -528,15 +535,15 @@ def get_utask_mains() -> List[PubSubTask]:
pubsub_puller = PubSubPuller(UTASK_MAINS_QUEUE)
messages = pubsub_puller.get_messages_time_limited(MAX_UTASKS,
UTASK_QUEUE_PULL_SECONDS)
return handle_multiple_utask_main_messages(messages)
return handle_multiple_utask_main_messages(messages, UTASK_MAINS_QUEUE)


def handle_multiple_utask_main_messages(messages) -> List[PubSubTask]:
def handle_multiple_utask_main_messages(messages, queue) -> List[PubSubTask]:
"""Merges tasks specified in |messages| into a list for processing on this
bot."""
tasks = []
for message in messages:
task = get_task_from_message(message)
task = get_task_from_message(message, queue)
if task is None:
continue
tasks.append(task)
Expand Down
12 changes: 8 additions & 4 deletions src/clusterfuzz/_internal/tests/appengine/common/tasks_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ def test_high_end(self):
self.assertEqual('test', task.command)
self.assertEqual('high', task.argument)
self.assertEqual('job', task.job)
self.assertEqual('test high job', task.payload())
self.assertEqual('high-end-jobs-linux', task.queue)
self.assertEqual('test high job high-end-jobs-linux', task.payload())

def test_regular(self):
"""Test regular tasks."""
Expand All @@ -98,7 +99,8 @@ def test_regular(self):
self.assertEqual('test', task.command)
self.assertEqual('normal', task.argument)
self.assertEqual('job', task.job)
self.assertEqual('test normal job', task.payload())
self.assertEqual('jobs-linux', task.queue)
self.assertEqual('test normal job jobs-linux', task.payload())

def test_preemptible(self):
"""Test preemptible bot tasks."""
Expand All @@ -124,7 +126,8 @@ def test_defer(self):
self.assertEqual('test', task.command)
self.assertEqual('normal4', task.argument)
self.assertEqual('job', task.job)
self.assertEqual('test normal4 job', task.payload())
self.assertEqual('jobs-linux', task.queue)
self.assertEqual('test normal4 job jobs-linux', task.payload())

self.assertEqual(3, mock_modify.call_count)
mock_modify.assert_has_calls([
Expand All @@ -142,7 +145,8 @@ def test_command_override(self):
self.assertEqual('test', task.command)
self.assertEqual('override', task.argument)
self.assertEqual('job', task.job)
self.assertEqual('test override job', task.payload())
self.assertEqual(None, task.queue)
self.assertEqual('test override job None', task.payload())


class LeaseTaskTest(unittest.TestCase):
Expand Down
18 changes: 18 additions & 0 deletions src/clusterfuzz/_internal/tests/core/base/tasks/tasks_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ def test_no_message(self):

def test_success(self):
mock_task = mock.Mock(defer=mock.Mock(return_value=False))
mock_task.set_queue.return_value = mock_task
with mock.patch(
'clusterfuzz._internal.base.tasks.initialize_task',
return_value=mock_task):
Expand All @@ -278,3 +279,20 @@ def test_defer(self):
'clusterfuzz._internal.base.tasks.initialize_task',
return_value=mock_task):
self.assertEqual(tasks.get_task_from_message(mock.Mock()), None)

def test_set_queue(self):
"""Tests the set_queue method of a task."""
mock_queue = mock.Mock()
mock_task = mock.Mock()

mock_task.configure_mock(
queue=mock_queue,
set_queue=mock.Mock(return_value=mock_task),
defer=mock.Mock(return_value=False))

with mock.patch(
'clusterfuzz._internal.base.tasks.initialize_task',
return_value=mock_task):
task = tasks.get_task_from_message(mock.Mock())

self.assertEqual(task.queue, mock_queue)
Loading