Skip to content

Commit 8b950fc

Browse files
committed
OpenConceptLab/ocl_issues#2042 Do not schedule all bulk import subtasks at once
1 parent 3090bed commit 8b950fc

File tree

3 files changed

+37
-23
lines changed

3 files changed

+37
-23
lines changed

core/common/tasks.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from json import JSONDecodeError
55

66
from billiard.exceptions import WorkerLostError
7+
from celery import chord
78
from celery.utils.log import get_task_logger
89
from dateutil.relativedelta import relativedelta
910
from django.apps import apps
@@ -287,6 +288,12 @@ def bulk_import_subtask(path, username, owner_type, owner, resource_type, files)
287288
return ImporterSubtask(path, username, owner_type, owner, resource_type, files).run()
288289

289290

291+
@app.task(retry_kwargs={'max_retries': 0}, compression='gzip')
def bulk_import_queue(task_queue):
    """Schedule the first entry of ``task_queue`` and chain itself for the rest.

    The first entry is used as the header of a chord whose callback is this
    same task, bound to the remaining entries. The next entry is therefore
    only scheduled once the previous one has finished, rather than all
    entries being scheduled up front.

    :param task_queue: ordered list of signatures to run one after another.
    :return: the result of ``apply_async`` for the scheduled chord, or
        ``None`` when ``task_queue`` is empty.
    """
    if not task_queue:
        # The chord scheduled for the last entry still calls back into this
        # task with an empty queue; stop quietly instead of raising
        # IndexError from popping an empty list.
        return None

    # Slice rather than pop(0) so the caller's list is not mutated.
    tasks, remaining = task_queue[0], task_queue[1:]
    return chord(tasks, bulk_import_queue.si(remaining)).apply_async(queue='concurrent')
295+
296+
290297
@app.task(retry_kwargs={'max_retries': 0})
291298
def bulk_import_subtask_empty():
292299
"""Used if group has only one task to prevent celery from converting the group to a single task"""

core/importers/importer.py

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
import zipfile
1111
from zipfile import ZipFile
1212
from celery.result import AsyncResult, result_from_tuple
13-
from celery import group, chain
13+
from celery import group
1414

1515
import ijson
1616
import requests
@@ -24,7 +24,7 @@
2424

2525
from core import settings
2626
from core.common.serializers import IdentifierSerializer
27-
from core.common.tasks import bulk_import_subtask, bulk_import_subtask_empty
27+
from core.common.tasks import bulk_import_subtask, bulk_import_subtask_empty, bulk_import_queue
2828
from core.common.tasks import import_finisher
2929
from core.code_systems.converter import CodeSystemConverter
3030
from core.common.utils import get_export_service
@@ -111,10 +111,11 @@ def import_async_result(self):
111111
return None
112112

113113
def revoke(self):
    """Revoke the final import task and every recorded subtask.

    ``self.import_async_result`` may be ``None`` when no final task result
    is available; in that case only the subtasks are revoked.
    """
    import_final_task = self.import_async_result
    if import_final_task is not None:
        import_final_task.revoke()

    # Subtasks are revoked one by one using their recorded ids.
    for task_id in self.subtask_ids:
        AsyncResult(task_id).revoke()
118119

119120
@import_async_result.setter
120121
def import_async_result(self, import_async_result):
@@ -231,7 +232,6 @@ def run(self): # pylint: disable=too-many-locals
231232
time_started = timezone.now()
232233
resource_types = ['CodeSystem', 'ValueSet', 'ConceptMap']
233234
resource_types.extend(ResourceImporter.get_resource_types())
234-
235235
if not self.path.startswith('/'): # not local path
236236
key = self.path
237237
protocol_index = key.find('://')
@@ -429,14 +429,14 @@ def calculate_batch_size(self, resources):
429429
for _, count in item.items():
430430
all_count += count
431431
if all_count > 50000:
432-
task_batch_size = all_count / 1000
432+
task_batch_size = round(all_count / 1000)
433433
else:
434434
task_batch_size = self.MIN_BATCH_SIZE
435435
return task_batch_size
436436

437437
def schedule_tasks(self, tasks):
438438
subtask_ids = []
439-
chained_tasks = chain()
439+
group_queue = []
440440
for task in tasks:
441441
group_tasks = []
442442
for group_task in task:
@@ -446,14 +446,21 @@ def schedule_tasks(self, tasks):
446446
group_tasks.append(bulk_import_subtask.si(group_task['path'], group_task['username'],
447447
group_task['owner_type'], group_task['owner'],
448448
group_task['resource_type'], group_task['files'])
449-
.set(queue='concurrent', task_id=subtask_id))
449+
.set(task_id=subtask_id))
450450
if len(group_tasks) == 1: # Prevent celery from converting group to a single task
451-
group_tasks.append(bulk_import_subtask_empty.si().set(queue='concurrent'))
451+
group_tasks.append(bulk_import_subtask_empty.si())
452+
453+
group_queue.append(group(group_tasks))
454+
455+
final_task_id = uuid()
456+
group_queue.append(import_finisher.si(self.task_id).set(task_id=final_task_id))
452457

453-
chained_tasks |= group(group_tasks)
454-
chained_tasks |= import_finisher.si(self.task_id).set(queue='concurrent')
458+
# Celery cannot handle chain of groups that have hundreds of tasks thus we use a task that schedules
459+
# a group of tasks once the previous group is done.
460+
bulk_import_queue.si(group_queue).apply_async(queue='concurrent')
455461

456-
final_task = chained_tasks.apply_async(queue='concurrent')
462+
# We pass the final task id to be able to track the end of execution and track progress.
463+
final_task = AsyncResult(final_task_id)
457464
return final_task, subtask_ids
458465

459466
def is_importable_file(self, file_name):

core/importers/models.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ def __init__(self, data, user, update_if_exists=False):
116116

117117
@classmethod
def can_handle(cls, obj):
    """Return True when ``obj`` is a dict whose 'type' matches this importer.

    The comparison is case-insensitive on both sides. Anything that is not a
    dict, or whose 'type' value is missing or not a string, is rejected
    instead of raising.
    """
    if not isinstance(obj, dict):
        return False

    declared_type = obj.get('type')
    if not isinstance(declared_type, str):
        # Covers a missing key as well as an explicit null / non-string value.
        return False

    return declared_type.lower() == cls.get_resource_type().lower()
120120

121121
@staticmethod
122122
def get_resource_type():
@@ -187,7 +187,7 @@ class OrganizationImporter(BaseResourceImporter):
187187

188188
@staticmethod
189189
def get_resource_type():
190-
return 'organization'
190+
return 'Organization'
191191

192192
def exists(self):
193193
return self.get_queryset().exists()
@@ -232,7 +232,7 @@ class SourceImporter(BaseResourceImporter):
232232

233233
@staticmethod
234234
def get_resource_type():
235-
return 'source'
235+
return 'Source'
236236

237237
def exists(self):
238238
return self.get_queryset().exists()
@@ -285,7 +285,7 @@ class SourceVersionImporter(BaseResourceImporter):
285285

286286
@staticmethod
287287
def get_resource_type():
288-
return 'source version'
288+
return 'Source Version'
289289

290290
def exists(self):
291291
return Source.objects.filter(
@@ -323,7 +323,7 @@ class CollectionImporter(BaseResourceImporter):
323323

324324
@staticmethod
325325
def get_resource_type():
326-
return 'collection'
326+
return 'Collection'
327327

328328
def exists(self):
329329
return self.get_queryset().exists()
@@ -376,7 +376,7 @@ class CollectionVersionImporter(BaseResourceImporter):
376376

377377
@staticmethod
378378
def get_resource_type():
379-
return 'collection version'
379+
return 'Collection Version'
380380

381381
def exists(self):
382382
return Collection.objects.filter(
@@ -412,7 +412,7 @@ class ConceptImporter(BaseResourceImporter):
412412

413413
@staticmethod
414414
def get_resource_type():
415-
return 'concept'
415+
return 'Concept'
416416

417417
def __init__(self, data, user, update_if_exists):
418418
super().__init__(data, user, update_if_exists)
@@ -506,7 +506,7 @@ class MappingImporter(BaseResourceImporter):
506506

507507
@staticmethod
508508
def get_resource_type():
509-
return 'mapping'
509+
return 'Mapping'
510510

511511
def __init__(self, data, user, update_if_exists):
512512
super().__init__(data, user, update_if_exists)
@@ -645,7 +645,7 @@ class ReferenceImporter(BaseResourceImporter):
645645

646646
@staticmethod
647647
def get_resource_type():
648-
return 'reference'
648+
return 'Reference'
649649

650650
def exists(self):
651651
return False

0 commit comments

Comments
 (0)