Skip to content
45 changes: 40 additions & 5 deletions src/clusterfuzz/_internal/base/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,16 @@ def default_queue_suffix():
logs.info(f'QUEUE_OVERRIDE is [{queue_override}]. '
f'Platform is {environment.platform()}')
if queue_override:
return queue_suffix_for_platform(queue_override)
platform = queue_override
else:
platform = environment.platform()

platform_suffix = queue_suffix_for_platform(platform)
base_os_version = environment.get_value('BASE_OS_VERSION')
if base_os_version and 'LINUX' in platform.upper():
platform_suffix = f'{platform_suffix}-{base_os_version}'

return queue_suffix_for_platform(environment.platform())
return platform_suffix


def regular_queue(prefix=JOBS_PREFIX):
Expand Down Expand Up @@ -296,7 +303,13 @@ def get_postprocess_task():
# wasting our precious non-linux bots on generic postprocess tasks.
if not environment.platform().lower() == 'linux':
return None
pubsub_puller = PubSubPuller(POSTPROCESS_QUEUE)

queue_name = POSTPROCESS_QUEUE
base_os_version = environment.get_value('BASE_OS_VERSION')
if base_os_version:
queue_name = f'{queue_name}-{base_os_version}'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it could be an static configuration.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good point. In this case, the dynamic approach is needed so the bot can be aware of its own OS environment. The BASE_OS_VERSION is set at the infrastructure level (in the Docker image), allowing each worker to correctly identify which filtered queue it should pull from.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. Couldn't we set the POSTPROCESS_QUEUE = -<base_os_version> on the start bot? Then we can just use the POSTPROCESS_QUEUE instead of creating it everytime?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. I considered that, but I prefer keeping the logic inside the function for clarity.

This way, it's explicit that the queue name is conditional, and we avoid modifying a global constant at startup, which could be confusing to debug later. The performance cost is negligible, so I think this approach is cleaner and easier to maintain.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I disagree this approach improves the clarity, and I think that following the pattern of having a constant would be easier to debug, but it's your call

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see your point. For me, it's a trade-off between having the logic in one place at startup vs. having it be explicit where the value is used.

I personally prefer keeping the conditional logic self-contained in the function, as I think it's clearer for future readers.

Thanks for the discussion, I appreciate the feedback.


pubsub_puller = PubSubPuller(queue_name)
logs.info('Pulling from postprocess queue')
messages = pubsub_puller.get_messages(max_messages=1)
if not messages:
Expand All @@ -312,7 +325,12 @@ def allow_all_tasks():


def get_preprocess_task():
pubsub_puller = PubSubPuller(PREPROCESS_QUEUE)
queue_name = PREPROCESS_QUEUE
base_os_version = environment.get_value('BASE_OS_VERSION')
if base_os_version:
queue_name = f'{queue_name}-{base_os_version}'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same


pubsub_puller = PubSubPuller(queue_name)
messages = pubsub_puller.get_messages(max_messages=1)
if not messages:
return None
Expand Down Expand Up @@ -587,7 +605,12 @@ def get_task_from_message(message, queue=None, can_defer=True,
def get_utask_mains() -> List[PubSubTask]:
"""Returns a list of tasks for preprocessing many utasks on this bot and then
running the uworker_mains in the same batch job."""
pubsub_puller = PubSubPuller(UTASK_MAIN_QUEUE)
queue_name = UTASK_MAIN_QUEUE
base_os_version = environment.get_value('BASE_OS_VERSION')
if base_os_version:
queue_name = f'{queue_name}-{base_os_version}'

pubsub_puller = PubSubPuller(queue_name)
messages = pubsub_puller.get_messages_time_limited(MAX_UTASKS,
UTASK_QUEUE_PULL_SECONDS)
return handle_multiple_utask_main_messages(messages, UTASK_MAIN_QUEUE)
Expand Down Expand Up @@ -758,6 +781,18 @@ def add_task(command,
if not job:
raise Error(f'Job {job_type} not found.')

# Determine base_os_version.
base_os_version = job.base_os_version
if job.is_external():
oss_fuzz_project = data_types.OssFuzzProject.get_by_id(job.project)
if oss_fuzz_project and oss_fuzz_project.base_os_version:
base_os_version = oss_fuzz_project.base_os_version

if base_os_version:
if extra_info is None:
extra_info = {}
extra_info['base_os_version'] = base_os_version

if job.is_external():
external_tasks.add_external_task(command, argument, job)
return
Expand Down
155 changes: 154 additions & 1 deletion src/clusterfuzz/_internal/tests/core/base/tasks/tasks_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for tasks."""

import unittest
from unittest import mock

from clusterfuzz._internal.base import tasks
from clusterfuzz._internal.datastore import data_types
from clusterfuzz._internal.tests.test_libs import test_utils


class InitializeTaskTest(unittest.TestCase):
Expand Down Expand Up @@ -296,3 +297,155 @@ def test_set_queue(self):
task = tasks.get_task_from_message(mock.Mock())

self.assertEqual(task.queue, mock_queue)


@test_utils.with_cloud_emulators('datastore')
@mock.patch('clusterfuzz._internal.base.tasks.bulk_add_tasks')
@mock.patch('clusterfuzz._internal.base.external_tasks.add_external_task')
class AddTaskTest(unittest.TestCase):
"""Tests for add_task."""

def setUp(self):
self.oss_fuzz_project = data_types.OssFuzzProject(
name='d8', base_os_version='ubuntu-24-04')
self.oss_fuzz_project.put()

@mock.patch('clusterfuzz._internal.base.tasks.data_types.Job.query')
def test_add_task_internal_job_with_os_version(
self, mock_job_query, mock_add_external, mock_bulk_add):
"""Test add_task with an internal job and an OS version."""
mock_job = mock.MagicMock()
mock_job.base_os_version = 'ubuntu-20-04'
mock_job.project = 'd8'
mock_job.is_external.return_value = False
mock_job_query.return_value.get.return_value = mock_job

tasks.add_task('regression', '123', 'linux_asan_d8_dbg')

mock_add_external.assert_not_called()
mock_bulk_add.assert_called_once()
task_payload = mock_bulk_add.call_args[0][0][0]
self.assertEqual(task_payload.extra_info['base_os_version'], 'ubuntu-20-04')

@mock.patch('clusterfuzz._internal.base.tasks.data_types.Job.query')
def test_add_task_external_job_with_os_version(
self, mock_job_query, mock_add_external, mock_bulk_add):
"""Test add_task with an external (OSS-Fuzz) job and an OS version."""
mock_job = mock.MagicMock()
mock_job.base_os_version = 'ubuntu-20-04'
mock_job.project = 'd8'
mock_job.is_external.return_value = True
mock_job_query.return_value.get.return_value = mock_job

tasks.add_task('regression', '123', 'linux_asan_d8_dbg')

mock_bulk_add.assert_not_called()
mock_add_external.assert_called_once()


@mock.patch('clusterfuzz._internal.base.tasks.PubSubPuller')
@mock.patch('clusterfuzz._internal.system.environment.get_value')
class GetTaskQueueSelectionTest(unittest.TestCase):
"""Tests for dynamic queue selection in get_*_task functions."""

def test_get_preprocess_task_without_os_version(self, mock_env_get,
mock_puller):
"""Tests that get_preprocess_task selects the default queue."""
mock_puller.return_value.get_messages.return_value = []
mock_env_get.return_value = None
tasks.get_preprocess_task()
mock_puller.assert_called_with('preprocess')

def test_get_preprocess_task_with_os_version(self, mock_env_get, mock_puller):
"""Tests that get_preprocess_task selects the suffixed queue."""
mock_puller.return_value.get_messages.return_value = []
mock_env_get.return_value = 'ubuntu-24-04'
tasks.get_preprocess_task()
mock_puller.assert_called_with('preprocess-ubuntu-24-04')

@mock.patch(
'clusterfuzz._internal.base.tasks.task_utils.is_remotely_executing_utasks'
)
def test_get_postprocess_task_without_os_version(self, mock_is_remote,
mock_env_get, mock_puller):
"""Tests that get_postprocess_task selects the default queue."""
mock_is_remote.return_value = True
mock_puller.return_value.get_messages.return_value = []
with mock.patch(
'clusterfuzz._internal.system.environment.platform') as mock_platform:
mock_platform.return_value.lower.return_value = 'linux'
mock_env_get.return_value = None
tasks.get_postprocess_task()
mock_puller.assert_called_with('postprocess')

@mock.patch(
'clusterfuzz._internal.base.tasks.task_utils.is_remotely_executing_utasks'
)
def test_get_postprocess_task_with_os_version(self, mock_is_remote,
mock_env_get, mock_puller):
"""Tests that get_postprocess_task selects the suffixed queue."""
mock_is_remote.return_value = True
mock_puller.return_value.get_messages.return_value = []
with mock.patch(
'clusterfuzz._internal.system.environment.platform') as mock_platform:
mock_platform.return_value.lower.return_value = 'linux'
mock_env_get.return_value = 'ubuntu-24-04'
tasks.get_postprocess_task()
mock_puller.assert_called_with('postprocess-ubuntu-24-04')

def test_get_utask_mains_without_os_version(self, mock_env_get, mock_puller):
"""Tests that get_utask_mains selects the default queue."""
mock_puller.return_value.get_messages_time_limited.return_value = []
mock_env_get.return_value = None
tasks.get_utask_mains()
mock_puller.assert_called_with('utask_main')

def test_get_utask_mains_with_os_version(self, mock_env_get, mock_puller):
"""Tests that get_utask_mains selects the suffixed queue."""
mock_puller.return_value.get_messages_time_limited.return_value = []
mock_env_get.return_value = 'ubuntu-24-04'
tasks.get_utask_mains()
mock_puller.assert_called_with('utask_main-ubuntu-24-04')


@mock.patch('clusterfuzz._internal.system.environment.get_value')
@mock.patch('clusterfuzz._internal.system.environment.platform')
class QueueNameGenerationTest(unittest.TestCase):
"""Tests for queue name generation functions."""

def test_default_queue_suffix_linux_no_os_version(self, mock_platform,
mock_env_get):
"""Tests queue suffix for Linux without an OS version."""
mock_env_get.side_effect = lambda key, default='': {'QUEUE_OVERRIDE': ''}.get(key, default)
mock_platform.return_value = 'LINUX'
self.assertEqual(tasks.default_queue_suffix(), '-linux')

def test_default_queue_suffix_linux_with_os_version(self, mock_platform,
mock_env_get):
"""Tests queue suffix for Linux with an OS version."""
mock_env_get.side_effect = lambda key, default='': {
'BASE_OS_VERSION': 'ubuntu-24-04',
'QUEUE_OVERRIDE': ''
}.get(key, default)
mock_platform.return_value = 'LINUX'
self.assertEqual(tasks.default_queue_suffix(), '-linux-ubuntu-24-04')

def test_default_queue_suffix_mac_no_os_version(self, mock_platform,
mock_env_get):
"""Tests queue suffix for Mac without an OS version."""
mock_env_get.side_effect = lambda key, default='': {
'BASE_OS_VERSION': '',
'QUEUE_OVERRIDE': ''
}.get(key, default)
mock_platform.return_value = 'MAC'
self.assertEqual(tasks.default_queue_suffix(), '-mac')

def test_default_queue_suffix_mac_with_os_version(self, mock_platform,
mock_env_get):
"""Tests queue suffix for Mac with an OS version (should be ignored)."""
mock_env_get.side_effect = lambda key, default='': {
'BASE_OS_VERSION': 'ubuntu-24-04',
'QUEUE_OVERRIDE': ''
}.get(key, default)
mock_platform.return_value = 'MAC'
self.assertEqual(tasks.default_queue_suffix(), '-mac')
Loading