Skip to content

Commit e1915c8

Browse files
#issue3406
Signed-off-by: Aayush Patidar <patidaraayush053@gmail.com>
1 parent 1b8dd11 commit e1915c8

File tree

4 files changed

+235
-13
lines changed

4 files changed

+235
-13
lines changed

augur/application/config.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,12 @@ def redact_setting_value(section_name, setting_name, value):
9090
"core_collection_interval_days": 15,
9191
"secondary_collection_interval_days": 10,
9292
"facade_collection_interval_days": 10,
93-
"ml_collection_interval_days": 40
93+
"ml_collection_interval_days": 40,
94+
"non_repo_domain_tasks_interval_in_days": 30,
95+
"retry_errored_repos_cron_hour": 0,
96+
"retry_errored_repos_cron_minute": 0,
97+
"process_contributors_interval_in_seconds": 3600,
98+
"create_collection_status_records_interval_in_seconds": 86400
9499
},
95100
"Message_Insights": {
96101
"insight_days": 30,

augur/tasks/init/celery_app.py

Lines changed: 35 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -235,10 +235,15 @@ def setup_periodic_tasks(sender, **kwargs):
235235
logger.info(f"Scheduling collection every {collection_interval/60} minutes")
236236
sender.add_periodic_task(collection_interval, augur_collection_monitor.s())
237237

238-
#Do longer tasks less often
239-
logger.info(f"Scheduling data analysis every 30 days")
240-
thirty_days_in_seconds = 30*24*60*60
241-
sender.add_periodic_task(thirty_days_in_seconds, non_repo_domain_tasks.s())
238+
# non_repo_domain_tasks
239+
non_repo_domain_tasks_val = config.get_value('Tasks', 'non_repo_domain_tasks_interval_in_days')
240+
non_repo_domain_tasks_interval = int(non_repo_domain_tasks_val) if non_repo_domain_tasks_val is not None else 30
241+
if non_repo_domain_tasks_interval > 0:
242+
logger.info(f"Scheduling non_repo_domain_tasks every {non_repo_domain_tasks_interval} days")
243+
sender.add_periodic_task(datetime.timedelta(days=non_repo_domain_tasks_interval), non_repo_domain_tasks.s())
244+
else:
245+
logger.info("non_repo_domain_tasks is disabled (interval <= 0)")
246+
242247

243248
mat_views_interval = int(config.get_value('Celery', 'refresh_materialized_views_interval_in_days'))
244249
if mat_views_interval > 0:
@@ -250,14 +255,32 @@ def setup_periodic_tasks(sender, **kwargs):
250255
# logger.info(f"Scheduling update of collection weights on midnight each day")
251256
# sender.add_periodic_task(crontab(hour=0, minute=0),augur_collection_update_weights.s())
252257

253-
logger.info(f"Setting 404 repos to be marked for retry on midnight each day")
254-
sender.add_periodic_task(crontab(hour=0, minute=0),retry_errored_repos.s())
255-
256-
one_hour_in_seconds = 60*60
257-
sender.add_periodic_task(one_hour_in_seconds, process_contributors.s())
258-
259-
one_day_in_seconds = 24*60*60
260-
sender.add_periodic_task(one_day_in_seconds, create_collection_status_records.s())
258+
# retry_errored_repos
259+
retry_repos_hour_val = config.get_value('Tasks', 'retry_errored_repos_cron_hour')
260+
retry_repos_minute_val = config.get_value('Tasks', 'retry_errored_repos_cron_minute')
261+
retry_repos_hour = int(retry_repos_hour_val) if retry_repos_hour_val is not None else 0
262+
retry_repos_minute = int(retry_repos_minute_val) if retry_repos_minute_val is not None else 0
263+
264+
logger.info(f"Scheduling retry_errored_repos at {retry_repos_hour}:{retry_repos_minute}")
265+
sender.add_periodic_task(crontab(hour=retry_repos_hour, minute=retry_repos_minute), retry_errored_repos.s())
266+
267+
# process_contributors
268+
process_contributors_val = config.get_value('Tasks', 'process_contributors_interval_in_seconds')
269+
process_contributors_interval = int(process_contributors_val) if process_contributors_val is not None else 3600
270+
if process_contributors_interval > 0:
271+
logger.info(f"Scheduling process_contributors every {process_contributors_interval} seconds")
272+
sender.add_periodic_task(process_contributors_interval, process_contributors.s())
273+
else:
274+
logger.info("process_contributors is disabled")
275+
276+
# create_collection_status_records
277+
status_records_val = config.get_value('Tasks', 'create_collection_status_records_interval_in_seconds')
278+
create_collection_status_records_interval = int(status_records_val) if status_records_val is not None else 86400
279+
if create_collection_status_records_interval > 0:
280+
logger.info(f"Scheduling create_collection_status_records every {create_collection_status_records_interval} seconds")
281+
sender.add_periodic_task(create_collection_status_records_interval, create_collection_status_records.s())
282+
else:
283+
logger.info("create_collection_status_records is disabled")
261284

262285
@after_setup_logger.connect
263286
def setup_loggers(*args,**kwargs):

docs/source/development-guide/configuration-file-reference.rst

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,47 @@ Configuration file reference
33

44
Augur's configuration template file, which generates your locally deployed ``augur.config.json`` file, is found at ``augur/config.py``. You will notice a small collection of workers are turned on to start with, by examining the ``switch`` variable within the ``Workers`` block of the config file. You can also specify the number of processes to spawn for each worker using the ``workers`` command. The default is one, and we recommend you start here. If you are going to spawn multiple workers, be sure you have enough credentials cached in the ``augur_operations.worker_oath`` table for the platforms you use.
55

6+
Task Scheduling Configuration
7+
-----------------------------
8+
9+
The following keys in the ``[Tasks]`` section control the scheduling of periodic Celery tasks. You can adjust these intervals or disable tasks by setting the value to <= 0 (for interval-based tasks).
10+
11+
- ``collection_interval``: Interval in seconds for the main collection monitor. Default: 30.
12+
- ``core_collection_interval_days``: Interval in days for core collection. Default: 15.
13+
- ``secondary_collection_interval_days``: Interval in days for secondary collection. Default: 10.
14+
- ``facade_collection_interval_days``: Interval in days for facade collection. Default: 10.
15+
- ``ml_collection_interval_days``: Interval in days for ML collection. Default: 40.
16+
17+
**New Periodic Task Intervals:**
18+
19+
- ``non_repo_domain_tasks_interval_in_days``:
20+
- Description: Schedule interval for non-repository domain tasks (e.g., data analysis).
21+
- Default: 30 days.
22+
- Unit: Days.
23+
- Disable: Set to <= 0.
24+
25+
- ``retry_errored_repos_cron_hour``:
26+
- Description: Hour (0-23) to run the retry errored repos task.
27+
- Default: 0 (Midnight).
28+
- Unit: Hour of day.
29+
30+
- ``retry_errored_repos_cron_minute``:
31+
- Description: Minute (0-59) to run the retry errored repos task.
32+
- Default: 0.
33+
- Unit: Minute of hour.
34+
35+
- ``process_contributors_interval_in_seconds``:
36+
- Description: Interval to process contributors.
37+
- Default: 3600 (1 hour).
38+
- Unit: Seconds.
39+
- Disable: Set to <= 0.
40+
41+
- ``create_collection_status_records_interval_in_seconds``:
42+
- Description: Interval to create collection status records.
43+
- Default: 86400 (1 day).
44+
- Unit: Seconds.
45+
- Disable: Set to <= 0.
46+
647
If you have questions or would like to help please open an issue on GitHub_.
748

849
.. _GitHub: https://github.com/chaoss/augur/issues
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
2+
import pytest
3+
from unittest.mock import MagicMock, patch, ANY
4+
import datetime
5+
import sys
6+
7+
# Mock modules that might cause side effects on import or are hard to set up
8+
sys.modules['augur.tasks.init'] = MagicMock()
9+
sys.modules['augur.tasks.init'].get_redis_conn_values = MagicMock(return_value=(0, 'redis://localhost:6379/'))
10+
sys.modules['augur.tasks.init'].get_rabbitmq_conn_string = MagicMock(return_value='amqp://guest:guest@localhost:5672//')
11+
12+
# We need to ensure we can import the module now
13+
# The module augur.tasks.init.celery_app imports get_redis_conn_values etc from augur.tasks.init
14+
# which we just mocked.
15+
16+
from augur.tasks.init.celery_app import setup_periodic_tasks, split_tasks_into_groups
17+
18+
@pytest.fixture
19+
def mock_sender():
20+
sender = MagicMock()
21+
return sender
22+
23+
@pytest.fixture
24+
def mock_config_factory():
25+
def _create_config(settings):
26+
config_mock = MagicMock()
27+
28+
def get_value(section, key):
29+
val = settings.get(section, {}).get(key)
30+
return val
31+
32+
config_mock.get_value.side_effect = get_value
33+
return config_mock
34+
return _create_config
35+
36+
class TestCeleryScheduler:
37+
38+
@patch('augur.tasks.init.celery_app.AugurConfig')
39+
@patch('augur.tasks.init.celery_app.DatabaseSession')
40+
@patch('augur.tasks.init.celery_app.temporary_database_engine')
41+
@patch('augur.tasks.init.celery_app.non_repo_domain_tasks')
42+
@patch('augur.tasks.init.celery_app.retry_errored_repos')
43+
@patch('augur.tasks.init.celery_app.process_contributors')
44+
@patch('augur.tasks.init.celery_app.create_collection_status_records')
45+
@patch('augur.tasks.init.celery_app.refresh_materialized_views')
46+
@patch('augur.tasks.init.celery_app.augur_collection_monitor')
47+
def test_setup_periodic_tasks_scheduling(
48+
self,
49+
mock_monitor,
50+
mock_refresh_views,
51+
mock_create_status,
52+
mock_process_contributors,
53+
mock_retry_repos,
54+
mock_non_repo_tasks,
55+
mock_temp_engine,
56+
mock_db_session,
57+
mock_AugurConfig,
58+
mock_sender,
59+
mock_config_factory
60+
):
61+
# Setup config
62+
settings = {
63+
'Tasks': {
64+
'collection_interval': 30,
65+
'non_repo_domain_tasks_interval_in_days': 15,
66+
'retry_errored_repos_cron_hour': 2,
67+
'retry_errored_repos_cron_minute': 30,
68+
'process_contributors_interval_in_seconds': 1200,
69+
'create_collection_status_records_interval_in_seconds': 4000
70+
},
71+
'Celery': {
72+
'refresh_materialized_views_interval_in_days': 1
73+
}
74+
}
75+
mock_AugurConfig.return_value = mock_config_factory(settings)
76+
77+
# Run setup
78+
setup_periodic_tasks(mock_sender)
79+
80+
# Verify calls
81+
82+
# 1. non_repo_domain_tasks: 15 days
83+
# The code usually converts days to seconds or timedelta
84+
# 15 days = 15 * 24 * 60 * 60 = 1296000 seconds
85+
86+
# We need to inspect what arguments add_periodic_task was called with.
87+
# It's called multiple times.
88+
89+
# Look for non_repo_domain_tasks
90+
# We can check if it was called with approximately the right value or check if the task signature was passed
91+
92+
# Let's verify that the task was scheduled with the correct interval
93+
# Since we haven't implemented the change yet, we expect this to fail if we ran it, or pass if we check for the NEW behavior we want.
94+
# This test defines the expected behavior.
95+
96+
# Expected:
97+
# non_repo_domain_tasks.s() passed as second arg
98+
# 15 * 86400 or timedelta(days=15) passed as first arg
99+
100+
# Find call for non_repo_domain_tasks
101+
found_non_repo = False
102+
for call in mock_sender.add_periodic_task.call_args_list:
103+
args, _ = call
104+
interval = args[0]
105+
task_sig = args[1]
106+
107+
if task_sig == mock_non_repo_tasks.s():
108+
found_non_repo = True
109+
# Check interval.
110+
# Our implementation will likely use timedelta or seconds.
111+
# 15 days in seconds is 1296000
112+
if isinstance(interval, (int, float)):
113+
assert interval == 1296000
114+
elif isinstance(interval, datetime.timedelta):
115+
assert interval.days == 15
116+
117+
assert found_non_repo, "non_repo_domain_tasks was not scheduled"
118+
119+
@patch('augur.tasks.init.celery_app.AugurConfig')
120+
@patch('augur.tasks.init.celery_app.DatabaseSession')
121+
@patch('augur.tasks.init.celery_app.temporary_database_engine')
122+
@patch('augur.tasks.init.celery_app.process_contributors')
123+
def test_disabled_tasks(
124+
self,
125+
mock_process_contributors,
126+
mock_temp_engine,
127+
mock_db_session,
128+
mock_AugurConfig,
129+
mock_sender,
130+
mock_config_factory
131+
):
132+
settings = {
133+
'Tasks': {
134+
'collection_interval': 30,
135+
'process_contributors_interval_in_seconds': 0, # Disabled
136+
},
137+
'Celery': {
138+
'refresh_materialized_views_interval_in_days': 1
139+
}
140+
}
141+
mock_AugurConfig.return_value = mock_config_factory(settings)
142+
143+
setup_periodic_tasks(mock_sender)
144+
145+
# Verify process_contributors was NOT scheduled
146+
found_process = False
147+
for call in mock_sender.add_periodic_task.call_args_list:
148+
args, _ = call
149+
task_sig = args[1]
150+
if task_sig == mock_process_contributors.s():
151+
found_process = True
152+
153+
assert not found_process, "process_contributors should be disabled"

0 commit comments

Comments
 (0)