Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 0 additions & 13 deletions kolibri/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,6 @@
TEMP_KOLIBRI_HOME = "./.pytest_kolibri_home"


@pytest.fixture(scope="session")
def django_db_setup(request, django_db_setup):
    """
    Session-scoped extension of the ``django_db_setup`` fixture.

    Requests the already-defined fixture of the same name, then registers a
    finalizer on ``request`` that disposes the task-storage ``connection``
    object from ``kolibri.core.tasks.main``.
    """

    def _dispose_task_connection():
        # Imported inside the finalizer, so the tasks module is only loaded
        # when the finalizer actually runs.
        from kolibri.core.tasks.main import connection as task_connection

        task_connection.dispose()

    request.addfinalizer(_dispose_task_connection)


@pytest.fixture(scope="session", autouse=True)
def global_fixture():
if not os.path.exists(TEMP_KOLIBRI_HOME):
Expand Down
7 changes: 5 additions & 2 deletions kolibri/core/auth/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
from django.core.management import call_command
from django.core.management.base import CommandError
from django.utils import timezone
from morango.errors import MorangoError
from requests.exceptions import HTTPError
from rest_framework import serializers
from rest_framework.exceptions import AuthenticationFailed
from rest_framework.exceptions import ValidationError
Expand Down Expand Up @@ -41,9 +43,9 @@
from kolibri.core.tasks.permissions import IsAdminForJob
from kolibri.core.tasks.permissions import IsSuperAdmin
from kolibri.core.tasks.permissions import NotProvisioned
from kolibri.core.tasks.utils import DatabaseLockedError
from kolibri.core.tasks.utils import get_current_job
from kolibri.core.tasks.validation import JobValidator
from kolibri.utils.time_utils import naive_utc_datetime
from kolibri.utils.translation import gettext as _


Expand Down Expand Up @@ -469,7 +471,7 @@ def enqueue_soud_sync_processing():

# Check if there is already an enqueued job
try:
converted_next_run = naive_utc_datetime(timezone.now() + next_run)
converted_next_run = timezone.now() + next_run
orm_job = job_storage.get_orm_job(SOUD_SYNC_PROCESSING_JOB_ID)
if (
orm_job.state not in (State.COMPLETED, State.FAILED, State.CANCELED)
Expand Down Expand Up @@ -607,6 +609,7 @@ def validate(self, data):
permission_classes=[IsSuperAdmin() | NotProvisioned()],
status_fn=status_fn,
long_running=True,
retry_on=[DatabaseLockedError, MorangoError, HTTPError],
)
def peeruserimport(command, **kwargs):
call_command(command, **kwargs)
Expand Down
19 changes: 6 additions & 13 deletions kolibri/core/auth/test/test_auth_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
from kolibri.core.tasks.exceptions import JobRunning
from kolibri.core.tasks.job import Job
from kolibri.core.tasks.job import State
from kolibri.utils.time_utils import naive_utc_datetime


DUMMY_PASSWORD = "password"
Expand All @@ -64,7 +63,7 @@ def fake_job(**kwargs):


class dummy_orm_job_data(object):
scheduled_time = datetime.datetime(year=2023, month=1, day=1, tzinfo=None)
scheduled_time = datetime.datetime(year=2023, month=1, day=1)
repeat = 5
interval = 8600
retry_interval = 5
Expand Down Expand Up @@ -701,7 +700,7 @@ def test_enqueue_soud_sync_processing__future__scheduled(
mock_soud.get_time_to_next_attempt.return_value = datetime.timedelta(seconds=30)
mock_job = mock_job_storage.get_orm_job.return_value
mock_job.state = State.QUEUED
mock_job.scheduled_time = naive_utc_datetime(timezone.now())
mock_job.scheduled_time = timezone.now()
enqueue_soud_sync_processing()
mock_task.enqueue_in.assert_not_called()

Expand All @@ -714,7 +713,7 @@ def test_enqueue_soud_sync_processing__future__running(
mock_soud.get_time_to_next_attempt.return_value = datetime.timedelta(seconds=1)
mock_job = mock_job_storage.get_orm_job.return_value
mock_job.state = State.RUNNING
mock_job.scheduled_time = naive_utc_datetime(timezone.now())
mock_job.scheduled_time = timezone.now()
enqueue_soud_sync_processing()
mock_task.enqueue_in.assert_not_called()

Expand All @@ -727,9 +726,7 @@ def test_enqueue_soud_sync_processing__future__reschedule(
mock_soud.get_time_to_next_attempt.return_value = datetime.timedelta(seconds=10)
mock_job = mock_job_storage.get_orm_job.return_value
mock_job.state = State.QUEUED
mock_job.scheduled_time = naive_utc_datetime(
timezone.now() + datetime.timedelta(seconds=15)
)
mock_job.scheduled_time = timezone.now() + datetime.timedelta(seconds=15)
enqueue_soud_sync_processing()
mock_task.enqueue_in.assert_called_once_with(datetime.timedelta(seconds=10))

Expand All @@ -743,9 +740,7 @@ def test_enqueue_soud_sync_processing__completed__enqueue(
mock_job = mock_job_storage.get_orm_job.return_value
mock_job.state = State.COMPLETED
# far in the past
mock_job.scheduled_time = naive_utc_datetime(
timezone.now() - datetime.timedelta(seconds=100)
)
mock_job.scheduled_time = timezone.now() - datetime.timedelta(seconds=100)
enqueue_soud_sync_processing()
mock_task.enqueue_in.assert_called_once_with(datetime.timedelta(seconds=10))

Expand All @@ -759,9 +754,7 @@ def test_enqueue_soud_sync_processing__race__already_running(
mock_job = mock_job_storage.get_orm_job.return_value
mock_job.state = State.COMPLETED
# far in the past
mock_job.scheduled_time = naive_utc_datetime(
timezone.now() - datetime.timedelta(seconds=100)
)
mock_job.scheduled_time = timezone.now() - datetime.timedelta(seconds=100)
mock_task.enqueue_in.side_effect = JobRunning()
enqueue_soud_sync_processing()
mock_task.enqueue_in.assert_called_once_with(datetime.timedelta(seconds=10))
Expand Down
8 changes: 4 additions & 4 deletions kolibri/core/tasks/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@

from django.http.response import Http404
from django.utils.decorators import method_decorator
from django.utils.timezone import make_aware
from django.views.decorators.csrf import csrf_protect
from pytz import utc
from rest_framework import decorators
from rest_framework import serializers
from rest_framework import status
Expand Down Expand Up @@ -100,8 +98,7 @@ def _job_to_response(self, job):
"args": job.args,
"kwargs": job.kwargs,
"extra_metadata": job.extra_metadata,
# Output is UTC naive, coerce to UTC aware.
"scheduled_datetime": make_aware(orm_job.scheduled_time, utc).isoformat(),
"scheduled_datetime": orm_job.scheduled_time.isoformat(),
"repeat": orm_job.repeat,
"repeat_interval": orm_job.interval,
"retry_interval": orm_job.retry_interval,
Expand Down Expand Up @@ -162,6 +159,7 @@ def _enqueue_job_based_on_enqueue_args(self, registered_task, job, enqueue_args)
interval=enqueue_args.get("repeat_interval", 0),
repeat=enqueue_args.get("repeat", 0),
retry_interval=enqueue_args.get("retry_interval", None),
max_retries=enqueue_args.get("max_retries", None),
)
elif enqueue_args.get("enqueue_in"):
return job_storage.enqueue_in(
Expand All @@ -172,12 +170,14 @@ def _enqueue_job_based_on_enqueue_args(self, registered_task, job, enqueue_args)
interval=enqueue_args.get("repeat_interval", 0),
repeat=enqueue_args.get("repeat", 0),
retry_interval=enqueue_args.get("retry_interval", None),
max_retries=enqueue_args.get("max_retries", None),
)
return job_storage.enqueue_job(
job,
queue=registered_task.queue,
priority=enqueue_args.get("priority", registered_task.priority),
retry_interval=enqueue_args.get("retry_interval", None),
max_retries=enqueue_args.get("max_retries", None),
)

def create(self, request):
Expand Down
3 changes: 3 additions & 0 deletions kolibri/core/tasks/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ def register_task(
permission_classes=None,
long_running=False,
status_fn=None,
retry_on=None,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good job avoiding a classic Python gotcha! (Passing mutable values, such as `[]`, as default arguments is a very common mistake that can cause issues.)

):
"""
Registers the decorated function as task.
Expand All @@ -36,6 +37,7 @@ def register_task(
permission_classes=permission_classes,
long_running=long_running,
status_fn=status_fn,
retry_on=retry_on,
)

return RegisteredTask(
Expand All @@ -49,4 +51,5 @@ def register_task(
permission_classes=permission_classes,
long_running=long_running,
status_fn=status_fn,
retry_on=retry_on,
)
9 changes: 7 additions & 2 deletions kolibri/core/tasks/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,8 @@ def execute(self):

args, kwargs = copy.copy(self.args), copy.copy(self.kwargs)

exception = None

try:
# First check whether the job has been cancelled
self.check_for_cancel()
Expand All @@ -370,16 +372,19 @@ def execute(self):
except UserCancelledError:
self.storage.mark_job_as_canceled(self.job_id)
except Exception as e:
exception = e
# If any error occurs, mark the job as failed and save the exception
traceback_str = traceback.format_exc()
e.traceback = traceback_str
logger.error(
"Job {} raised an exception: {}".format(self.job_id, traceback_str)
)
self.storage.mark_job_as_failed(self.job_id, e, traceback_str)

self.storage.reschedule_finished_job_if_needed(
self.job_id, delay=self._retry_in_delay, **self._retry_in_kwargs
self.job_id,
delay=self._retry_in_delay,
exception=exception,
**self._retry_in_kwargs,
)
setattr(current_state_tracker, "job", None)

Expand Down
9 changes: 1 addition & 8 deletions kolibri/core/tasks/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,15 @@
from django.utils.functional import SimpleLazyObject

from kolibri.core.tasks.storage import Storage
from kolibri.core.tasks.utils import db_connection
from kolibri.core.tasks.worker import Worker
from kolibri.utils import conf


logger = logging.getLogger(__name__)


connection = SimpleLazyObject(db_connection)


def __job_storage():
return Storage(
connection=connection,
)
return Storage()


# This storage instance should be used to access job_storage db.
Expand All @@ -28,7 +22,6 @@ def __job_storage():
def initialize_workers(log_queue=None):
logger.info("Starting async task workers.")
return Worker(
connection=connection,
regular_workers=conf.OPTIONS["Tasks"]["REGULAR_PRIORITY_WORKERS"],
high_workers=conf.OPTIONS["Tasks"]["HIGH_PRIORITY_WORKERS"],
log_queue=log_queue,
Expand Down
56 changes: 56 additions & 0 deletions kolibri/core/tasks/migrations/0001_initial.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# Initial migration for the jobs table. This model/index creation should be
# skipped if the table was previously created by SQLAlchemy.
from django.db import migrations
from django.db import models

from kolibri.core.tasks.operations import AddIndexIfNotExists
from kolibri.core.tasks.operations import CreateModelIfNotExists


class Migration(migrations.Migration):
    # Initial schema for the task "Job" model, stored in the "jobs" table.
    #
    # Uses the project's own CreateModelIfNotExists / AddIndexIfNotExists
    # operations instead of the stock Django CreateModel / AddIndex. Per the
    # note at the top of this file, the intent is that model and index
    # creation is skipped when the table was previously created by SQLAlchemy.
    # NOTE(review): the actual skip logic lives in
    # kolibri.core.tasks.operations, which is not visible here -- confirm there.

    initial = True

    # No dependencies: this is the first migration of the app.
    dependencies = []

    operations = [
        CreateModelIfNotExists(
            name="Job",
            fields=[
                # String primary key.
                # NOTE(review): max_length=36 suggests a UUID string -- confirm.
                ("id", models.CharField(max_length=36, primary_key=True)),
                # The next four columns are each individually indexed
                # (db_index=True).
                ("state", models.CharField(max_length=20, db_index=True)),
                ("func", models.CharField(max_length=200, db_index=True)),
                ("priority", models.IntegerField(db_index=True)),
                ("queue", models.CharField(max_length=50, db_index=True)),
                # NOTE(review): presumably the serialized job payload -- confirm
                # against the storage code.
                ("saved_job", models.TextField()),
                ("time_created", models.DateTimeField(null=True, blank=True)),
                ("time_updated", models.DateTimeField(null=True, blank=True)),
                # Scheduling columns: "interval" defaults to 0; the rest are
                # nullable.
                ("interval", models.IntegerField(default=0)),
                ("retry_interval", models.IntegerField(null=True, blank=True)),
                ("repeat", models.IntegerField(null=True, blank=True)),
                ("scheduled_time", models.DateTimeField(null=True, blank=True)),
                # Nullable worker_* columns.
                # NOTE(review): presumably identify the worker that picked up
                # the job -- confirm.
                (
                    "worker_host",
                    models.CharField(max_length=100, null=True, blank=True),
                ),
                (
                    "worker_process",
                    models.CharField(max_length=50, null=True, blank=True),
                ),
                (
                    "worker_thread",
                    models.CharField(max_length=50, null=True, blank=True),
                ),
                ("worker_extra", models.TextField(null=True, blank=True)),
            ],
            options={
                # Explicit table name rather than Django's default
                # "<app>_<model>" naming.
                "db_table": "jobs",
            },
        ),
        # Composite index over (queue, scheduled_time) with an explicit name.
        AddIndexIfNotExists(
            model_name="job",
            index=models.Index(
                fields=["queue", "scheduled_time"], name="queue__scheduled_time"
            ),
        ),
    ]
23 changes: 23 additions & 0 deletions kolibri/core/tasks/migrations/0002_add_retries_fields.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Generated by Django 3.2.25 on 2025-10-15 21:14
from django.db import migrations
from django.db import models


class Migration(migrations.Migration):
    # Adds two integer columns, "max_retries" and "retries", to the "job"
    # model. Both are declared nullable (null=True, blank=True) and no default
    # is supplied.

    # Must run after the initial migration of the "kolibritasks" app.
    dependencies = [
        ("kolibritasks", "0001_initial"),
    ]

    operations = [
        # NOTE(review): presumably the maximum number of retries allowed for a
        # job -- confirm against the storage code.
        migrations.AddField(
            model_name="job",
            name="max_retries",
            field=models.IntegerField(blank=True, null=True),
        ),
        # NOTE(review): presumably the number of retries performed so far --
        # confirm against the storage code.
        migrations.AddField(
            model_name="job",
            name="retries",
            field=models.IntegerField(blank=True, null=True),
        ),
    ]
Empty file.
Loading
Loading