From c6c2452df9bcf61057b97763b7ca355cf517b237 Mon Sep 17 00:00:00 2001 From: Arnaud Jeansen Date: Tue, 30 Sep 2025 15:23:52 +0200 Subject: [PATCH 1/6] feat: implement project locks containing multiple job ids MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace old format Map<Long, Long> with new format Map<Long, Set<Long>> to support configurable concurrent jobs per project. One-time Redis Migration automatically runs on startup and performs one-time data conversion. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- .../ProjectBatchLockController.kt | 26 ++- .../io/tolgee/batch/BatchJobTestUtil.kt | 2 +- .../batch/BatchJobConcurrentLauncher.kt | 2 +- .../batch/BatchJobProjectLockingManager.kt | 153 ++++++++++++------ .../configuration/tolgee/BatchProperties.kt | 3 + 5 files changed, 122 insertions(+), 64 deletions(-) diff --git a/backend/api/src/main/kotlin/io/tolgee/api/v2/controllers/administration/ProjectBatchLockController.kt b/backend/api/src/main/kotlin/io/tolgee/api/v2/controllers/administration/ProjectBatchLockController.kt index c140adad38..4fde515706 100644 --- a/backend/api/src/main/kotlin/io/tolgee/api/v2/controllers/administration/ProjectBatchLockController.kt +++ b/backend/api/src/main/kotlin/io/tolgee/api/v2/controllers/administration/ProjectBatchLockController.kt @@ -47,14 +47,13 @@ class ProjectBatchLockController( logger.debug("Retrieving all project batch locks") val locks = batchJobProjectLockingManager.getMap() - val lockModels = locks.map { (projectId, lockedJobId) -> - val lockStatus = when (lockedJobId) { - null -> LockStatus.UNINITIALIZED - 0L -> LockStatus.UNLOCKED + val lockModels = locks.entries.map { (projectId, lockedJobIds) -> + val lockStatus = when { + lockedJobIds.isEmpty() -> LockStatus.UNLOCKED else -> LockStatus.LOCKED } - val jobInfo = if (lockedJobId != null && lockedJobId > 0L) { + val jobInfos = lockedJobIds.mapNotNull { lockedJobId -> val jobDto =
batchJobService.findJobDto(lockedJobId) if (jobDto == null) { logger.warn("Locked job $lockedJobId in project $projectId not found") @@ -67,15 +66,13 @@ class ProjectBatchLockController( createdAt = jobDto.createdAt ) } - } else { - null } ProjectLockModel( projectId = projectId, - lockedJobId = lockedJobId, + lockedJobIds = lockedJobIds, lockStatus = lockStatus, - jobInfo = jobInfo + jobInfos = jobInfos ) } @@ -113,9 +110,9 @@ class ProjectBatchLockController( */ data class ProjectLockModel( val projectId: Long, - val lockedJobId: Long?, + val lockedJobIds: Set, val lockStatus: LockStatus, - val jobInfo: JobInfo? + val jobInfos: List ) /** @@ -132,13 +129,10 @@ data class JobInfo( * Status of the project lock */ enum class LockStatus { - /** Lock is explicitly cleared (value = 0L) */ + /** Lock is explicitly cleared (value = empty set) */ UNLOCKED, - /** Lock has never been initialized (value = null) */ - UNINITIALIZED, - - /** Lock is held by a specific job (value = jobId) */ + /** Lock is held by one or more jobs (value = set of job IDs) */ LOCKED } diff --git a/backend/app/src/test/kotlin/io/tolgee/batch/BatchJobTestUtil.kt b/backend/app/src/test/kotlin/io/tolgee/batch/BatchJobTestUtil.kt index 5228abee9f..7b43a9e90a 100644 --- a/backend/app/src/test/kotlin/io/tolgee/batch/BatchJobTestUtil.kt +++ b/backend/app/src/test/kotlin/io/tolgee/batch/BatchJobTestUtil.kt @@ -409,7 +409,7 @@ class BatchJobTestUtil( fun verifyProjectJobLockReleased() { waitFor(pollTime = 200, timeout = 1000) { - batchJobProjectLockingManager.getLockedForProject(testData.projectBuilder.self.id) == 0L + batchJobProjectLockingManager.getLockedForProject(testData.projectBuilder.self.id).isEmpty() } } diff --git a/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobConcurrentLauncher.kt b/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobConcurrentLauncher.kt index 39e55f6dff..78c8716b7d 100644 --- a/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobConcurrentLauncher.kt +++ 
b/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobConcurrentLauncher.kt @@ -170,7 +170,7 @@ class BatchJobConcurrentLauncher( } /** - * Only single job can run in project at the same time + * There is a project level lock with configurable n concurrent locks allowed. */ if (!batchJobProjectLockingManager.canLockJobForProject(executionItem.jobId)) { logger.debug( diff --git a/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobProjectLockingManager.kt b/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobProjectLockingManager.kt index dc1be2b728..9aa7846a71 100644 --- a/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobProjectLockingManager.kt +++ b/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobProjectLockingManager.kt @@ -2,15 +2,18 @@ package io.tolgee.batch import io.tolgee.batch.data.BatchJobDto import io.tolgee.component.UsingRedisProvider +import io.tolgee.configuration.tolgee.BatchProperties import io.tolgee.util.Logging import io.tolgee.util.logger -import org.redisson.api.RMap import org.redisson.api.RedissonClient +import org.springframework.beans.factory.InitializingBean import org.springframework.context.annotation.Lazy import org.springframework.stereotype.Component import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentMap +private const val REDIS_PROJECT_BATCH_JOB_LOCKS_KEY = "project_batch_job_locks" + /** * Only single job can be executed at the same time for one project. 
* @@ -19,13 +22,14 @@ import java.util.concurrent.ConcurrentMap @Component class BatchJobProjectLockingManager( private val batchJobService: BatchJobService, + private val batchProperties: BatchProperties, @Lazy private val redissonClient: RedissonClient, private val usingRedisProvider: UsingRedisProvider, -) : Logging { +) : Logging, InitializingBean { companion object { private val localProjectLocks by lazy { - ConcurrentHashMap() + ConcurrentHashMap>() } } @@ -51,18 +55,20 @@ class BatchJobProjectLockingManager( jobId: Long, ) { projectId ?: return - getMap().compute(projectId) { _, lockedJobId -> + getMap().compute(projectId) { _, lockedJobIds -> logger.debug("Unlocking job: $jobId for project $projectId") - if (lockedJobId == jobId) { + val currentJobs = lockedJobIds ?: emptySet() + if (currentJobs.contains(jobId)) { logger.debug("Unlocking job: $jobId for project $projectId") - return@compute 0L + val updatedJobs = currentJobs - jobId + return@compute updatedJobs.ifEmpty { emptySet() } } logger.debug("Job: $jobId for project $projectId is not locked") - return@compute lockedJobId + return@compute currentJobs } } - fun getMap(): ConcurrentMap { + fun getMap(): ConcurrentMap> { if (usingRedisProvider.areWeUsingRedis) { return getRedissonProjectLocks() } @@ -71,65 +77,84 @@ class BatchJobProjectLockingManager( private fun tryLockWithRedisson(batchJobDto: BatchJobDto): Boolean { val projectId = batchJobDto.projectId ?: return true - val computed = - getRedissonProjectLocks().compute(projectId) { _, value -> - computeFnBody(batchJobDto, value) - } - return computed == batchJobDto.id + val computedJobIds = + getRedissonProjectLocks().compute(projectId) { _, lockedJobIds -> + val newLockedJobIds = computeFnBody(batchJobDto, lockedJobIds ?: emptySet()) + logger.debug( + "While trying to lock on redis {} for project {} new lock value is {}", + batchJobDto.id, + batchJobDto.projectId, + newLockedJobIds + ) + newLockedJobIds + } ?: emptySet() + return 
computedJobIds.contains(batchJobDto.id) } - fun getLockedForProject(projectId: Long): Long? { + fun getLockedForProject(projectId: Long): Set { if (usingRedisProvider.areWeUsingRedis) { - return getRedissonProjectLocks()[projectId] + return getRedissonProjectLocks()[projectId] ?: emptySet() } - return localProjectLocks[projectId] + return localProjectLocks[projectId] ?: emptySet() } - private fun tryLockLocal(toLock: BatchJobDto): Boolean { - val projectId = toLock.projectId ?: return true - val computed = - localProjectLocks.compute(projectId) { _, value -> - val newLocked = computeFnBody(toLock, value) - logger.debug("While trying to lock ${toLock.id} for project ${toLock.projectId} new lock value is $newLocked") - newLocked - } - return computed == toLock.id + private fun tryLockLocal(batchJobDto: BatchJobDto): Boolean { + val projectId = batchJobDto.projectId ?: return true + val computedJobIds = + localProjectLocks.compute(projectId) { _, lockedJobIds -> + val newLockedJobIds = computeFnBody(batchJobDto, lockedJobIds ?: emptySet()) + logger.debug( + "While trying to lock locally {} for project {} new lock value is {}", + batchJobDto.id, + batchJobDto.projectId, + newLockedJobIds + ) + newLockedJobIds + } ?: emptySet() + return computedJobIds.contains(batchJobDto.id) } private fun computeFnBody( toLock: BatchJobDto, - currentValue: Long?, - ): Long { + lockedJobIds: Set, + ): Set { val projectId = toLock.projectId ?: throw IllegalStateException( "Project id is required. 
" + "Locking for project should not happen for non-project jobs.", ) - // nothing is locked - if (currentValue == 0L) { - logger.debug("Locking job ${toLock.id} for project ${toLock.projectId}, nothing is locked") - return toLock.id - } - // value for the project is not initialized yet - if (currentValue == null) { + // nothing is locked + if (lockedJobIds.isEmpty()) { logger.debug("Getting initial locked state from DB state") // we have to find out from database if there is any running job for the project - val initial = getInitialJobId(projectId) - logger.debug("Initial locked job $initial for project ${toLock.projectId}") - if (initial == null) { - logger.debug("No job found, locking ${toLock.id}") - return toLock.id + val initialJobId = getInitialJobId(projectId) + logger.info("Initial locked job $initialJobId for project ${toLock.projectId}") + if (initialJobId == null) { + logger.debug("No initial job found, locking only ${toLock.id}") + return setOf(toLock.id) } + val newLockedJobIds = mutableSetOf(initialJobId) + if (newLockedJobIds.size < batchProperties.projectConcurrency) { + logger.debug("Locking job ${toLock.id} for project $projectId. Active jobs before: $newLockedJobIds") + newLockedJobIds.add(toLock.id) + } else { + logger.debug( + "Cannot lock job ${toLock.id} for project $projectId, limit reached. Active jobs: $newLockedJobIds" + ) + } + return newLockedJobIds + } - logger.debug("Job found, locking $initial") - return initial + // standard case - there are some jobs locked + if (lockedJobIds.size < batchProperties.projectConcurrency) { + logger.debug("Locking job ${toLock.id} for project $projectId. Active jobs before: $lockedJobIds") + return lockedJobIds + toLock.id } - logger.debug("Job $currentValue is locked for project ${toLock.projectId}") // if we cannot lock, we are returning current value - return currentValue + return lockedJobIds } private fun getInitialJobId(projectId: Long): Long? 
{ @@ -161,11 +186,47 @@ class BatchJobProjectLockingManager( return null } - private fun getRedissonProjectLocks(): RMap { - return redissonClient.getMap("project_batch_job_locks") + private fun getRedissonProjectLocks(): ConcurrentMap> { + return redissonClient.getMap(REDIS_PROJECT_BATCH_JOB_LOCKS_KEY) + } + + override fun afterPropertiesSet() { + // This runs first to check if redis has a map of the old format. + // If so, we migrate it to the new format. + if (!usingRedisProvider.areWeUsingRedis) { + logger.debug("Not using Redis, skipping migration check") + return + } + + val redisProjectBatchJobLocks = redissonClient.getMap(REDIS_PROJECT_BATCH_JOB_LOCKS_KEY) + val isOldFormat = redisProjectBatchJobLocks.values.any { it is Long || it == null } + if (!isOldFormat) { + logger.debug("Redis project locks are in new format, no migration needed") + return + } + + logger.info("Starting migration of project locks from old format (v1) to new format (v2)") + // First, copy all data from Redis to local memory + val localCopy = mutableMapOf>() + redisProjectBatchJobLocks.forEach { (projectId, jobId) -> + val jobSet = when (jobId) { + null, 0L -> emptySet() + else -> setOf(jobId as Long) + } + localCopy[projectId] = jobSet + } + logger.info("Copied ${localCopy.size} project locks from old format to local memory") + + // Write all data back in new format (this will overwrite the old format) + val newMap = getRedissonProjectLocks() + localCopy.forEach { (projectId, jobSet) -> + newMap[projectId] = jobSet + } + + logger.info("Successfully migrated ${newMap.size} project locks from local memory to new format") } fun getLockedJobIds(): Set { - return getMap().values.filterNotNull().toSet() + return getMap().values.flatten().toSet() } } diff --git a/backend/data/src/main/kotlin/io/tolgee/configuration/tolgee/BatchProperties.kt b/backend/data/src/main/kotlin/io/tolgee/configuration/tolgee/BatchProperties.kt index b98d8a76eb..2fdcf97080 100644 --- 
a/backend/data/src/main/kotlin/io/tolgee/configuration/tolgee/BatchProperties.kt +++ b/backend/data/src/main/kotlin/io/tolgee/configuration/tolgee/BatchProperties.kt @@ -11,4 +11,7 @@ class BatchProperties { @DocProperty(description = "How many job chunks are added to the internal queue on each scheduled run") var chunkQueuePopulationSize: Int = 1_000 + + @DocProperty(description = "How many parallel jobs can be run at once per project across all Tolgee instances") + var projectConcurrency: Int = 1 } From a30479acefa78c6abfb38e8f6d06589360e98b6d Mon Sep 17 00:00:00 2001 From: Arnaud Jeansen Date: Wed, 15 Oct 2025 09:25:50 +0200 Subject: [PATCH 2/6] chore: PR feedback --- .../batch/BatchJobProjectLockingManager.kt | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobProjectLockingManager.kt b/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobProjectLockingManager.kt index 9aa7846a71..e25c6364c0 100644 --- a/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobProjectLockingManager.kt +++ b/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobProjectLockingManager.kt @@ -56,7 +56,6 @@ class BatchJobProjectLockingManager( ) { projectId ?: return getMap().compute(projectId) { _, lockedJobIds -> - logger.debug("Unlocking job: $jobId for project $projectId") val currentJobs = lockedJobIds ?: emptySet() if (currentJobs.contains(jobId)) { logger.debug("Unlocking job: $jobId for project $projectId") @@ -79,7 +78,7 @@ class BatchJobProjectLockingManager( val projectId = batchJobDto.projectId ?: return true val computedJobIds = getRedissonProjectLocks().compute(projectId) { _, lockedJobIds -> - val newLockedJobIds = computeFnBody(batchJobDto, lockedJobIds ?: emptySet()) + val newLockedJobIds = computeLockedJobIdsForProject(batchJobDto, lockedJobIds ?: emptySet()) logger.debug( "While trying to lock on redis {} for project {} new lock value is {}", batchJobDto.id, @@ -102,7 +101,7 @@ class 
BatchJobProjectLockingManager( val projectId = batchJobDto.projectId ?: return true val computedJobIds = localProjectLocks.compute(projectId) { _, lockedJobIds -> - val newLockedJobIds = computeFnBody(batchJobDto, lockedJobIds ?: emptySet()) + val newLockedJobIds = computeLockedJobIdsForProject(batchJobDto, lockedJobIds ?: emptySet()) logger.debug( "While trying to lock locally {} for project {} new lock value is {}", batchJobDto.id, @@ -114,7 +113,7 @@ class BatchJobProjectLockingManager( return computedJobIds.contains(batchJobDto.id) } - private fun computeFnBody( + private fun computeLockedJobIdsForProject( toLock: BatchJobDto, lockedJobIds: Set, ): Set { @@ -136,14 +135,8 @@ class BatchJobProjectLockingManager( return setOf(toLock.id) } val newLockedJobIds = mutableSetOf(initialJobId) - if (newLockedJobIds.size < batchProperties.projectConcurrency) { - logger.debug("Locking job ${toLock.id} for project $projectId. Active jobs before: $newLockedJobIds") - newLockedJobIds.add(toLock.id) - } else { - logger.debug( - "Cannot lock job ${toLock.id} for project $projectId, limit reached. Active jobs: $newLockedJobIds" - ) - } + logger.debug("Locking job ${toLock.id} for project $projectId. 
Active jobs before: $newLockedJobIds") + newLockedJobIds.add(toLock.id) return newLockedJobIds } From dadad28b2428d4577c9334eae38e2cfb9d482619 Mon Sep 17 00:00:00 2001 From: Arnaud Jeansen Date: Wed, 15 Oct 2025 11:26:31 +0200 Subject: [PATCH 3/6] chore: Add tests for project concurrency > 1 --- ...JobManagementControllerCancellationTest.kt | 177 ++++++++++++++++++ .../batch/BatchJobManagementControllerTest.kt | 157 ++++++++++++++++ 2 files changed, 334 insertions(+) diff --git a/backend/app/src/test/kotlin/io/tolgee/api/v2/controllers/batch/BatchJobManagementControllerCancellationTest.kt b/backend/app/src/test/kotlin/io/tolgee/api/v2/controllers/batch/BatchJobManagementControllerCancellationTest.kt index 054b8a1007..89b93c9896 100644 --- a/backend/app/src/test/kotlin/io/tolgee/api/v2/controllers/batch/BatchJobManagementControllerCancellationTest.kt +++ b/backend/app/src/test/kotlin/io/tolgee/api/v2/controllers/batch/BatchJobManagementControllerCancellationTest.kt @@ -1,5 +1,6 @@ package io.tolgee.api.v2.controllers.batch +import io.tolgee.batch.BatchJobProjectLockingManager import io.tolgee.fixtures.andIsForbidden import io.tolgee.fixtures.andIsOk import io.tolgee.fixtures.waitFor @@ -30,16 +31,25 @@ class BatchJobManagementControllerCancellationTest : @Autowired lateinit var stuckBatchJobTestUtil: StuckBatchJobTestUtil + @Autowired + lateinit var batchJobProjectLockingManager: BatchJobProjectLockingManager + + @Autowired + lateinit var batchProperties: io.tolgee.configuration.tolgee.BatchProperties + var simulateLongRunningChunkRun = true + var originalProjectConcurrency = 1 @BeforeEach fun setup() { simulateLongRunningChunkRun = true + originalProjectConcurrency = batchProperties.projectConcurrency } @AfterEach fun clean() { simulateLongRunningChunkRun = false + batchProperties.projectConcurrency = originalProjectConcurrency } @Test @@ -135,4 +145,171 @@ class BatchJobManagementControllerCancellationTest : 
util.getSingleJob().status.assert.isEqualTo(BatchJobStatus.CANCELLED) } + + @Test + @ProjectJWTAuthTestMethod + fun `cancels a job with projectConcurrency=2`() { + val keys = testData.addTranslationOperationData(100) + saveAndPrepare() + + batchProperties.projectConcurrency = 2 + + val keyIds = keys.map { it.id }.toList() + + val count = AtomicInteger(0) + + doAnswer { + if (count.incrementAndGet() > 5) { + while (simulateLongRunningChunkRun) { + // this simulates long-running operation, which checks for active context + val context = it.arguments[2] as CoroutineContext + context.ensureActive() + Thread.sleep(10) + } + } + it.callRealMethod() + }.whenever(machineTranslationChunkProcessor).process(any(), any(), any(), any()) + + performProjectAuthPost( + "start-batch-job/machine-translate", + mapOf( + "keyIds" to keyIds, + "targetLanguageIds" to + listOf( + testData.projectBuilder.getLanguageByTag("cs")!!.self.id, + ), + ), + ).andIsOk + + waitFor { + batchJobConcurrentLauncher.runningJobs.size >= 5 + } + + val job = util.getSingleJob() + + // Verify the job is locked + val lockedJobs = batchJobProjectLockingManager.getLockedForProject(testData.project.id) + lockedJobs.assert.contains(job.id) + + performProjectAuthPut("batch-jobs/${job.id}/cancel") + .andIsOk + + waitForNotThrowing(pollTime = 100) { + executeInNewTransaction { + util.getSingleJob().status.assert.isEqualTo(BatchJobStatus.CANCELLED) + verify(batchJobActivityFinalizer, times(1)).finalizeActivityWhenJobCompleted(any()) + + // assert activity stored + entityManager.createQuery("""from ActivityRevision ar where ar.batchJob.id = :id""") + .setParameter("id", job.id).resultList + .assert.hasSize(1) + } + } + + // Verify the job lock was released + val lockedJobsAfterCancel = batchJobProjectLockingManager.getLockedForProject(testData.project.id) + lockedJobsAfterCancel.assert.doesNotContain(job.id) + } + + @Test + @ProjectJWTAuthTestMethod + fun `cancels one of two running jobs with projectConcurrency=2`() { 
+ val keys = testData.addTranslationOperationData(100) + saveAndPrepare() + + batchProperties.projectConcurrency = 2 + + val keyIds = keys.map { it.id }.toList() + + val firstJobCount = AtomicInteger(0) + val secondJobCount = AtomicInteger(0) + + doAnswer { + val jobId = it.getArgument(0).id + val allJobs = executeInNewTransaction { + batchJobService.getAllByProjectId(testData.project.id) + } + val isFirstJob = allJobs.size > 0 && jobId == allJobs[0].id + + val counter = if (isFirstJob) firstJobCount else secondJobCount + + if (counter.incrementAndGet() > 5) { + while (simulateLongRunningChunkRun) { + // this simulates long-running operation, which checks for active context + val context = it.arguments[2] as CoroutineContext + context.ensureActive() + Thread.sleep(10) + } + } + it.callRealMethod() + }.whenever(machineTranslationChunkProcessor).process(any(), any(), any(), any()) + + // Start first job + performProjectAuthPost( + "start-batch-job/machine-translate", + mapOf( + "keyIds" to keyIds, + "targetLanguageIds" to + listOf( + testData.projectBuilder.getLanguageByTag("cs")!!.self.id, + ), + ), + ).andIsOk + + waitFor(timeout = 5000) { + batchJobConcurrentLauncher.runningJobs.size >= 5 + } + + // Start second job + performProjectAuthPost( + "start-batch-job/machine-translate", + mapOf( + "keyIds" to keyIds, + "targetLanguageIds" to + listOf( + testData.projectBuilder.getLanguageByTag("cs")!!.self.id, + ), + ), + ).andIsOk + + waitFor(timeout = 5000) { + batchJobConcurrentLauncher.runningJobs.size >= 10 + } + + val jobs = executeInNewTransaction { + batchJobService.getAllByProjectId(testData.project.id) + } + jobs.size.assert.isEqualTo(2) + + // Verify both jobs are locked + val lockedJobs = batchJobProjectLockingManager.getLockedForProject(testData.project.id) + lockedJobs.size.assert.isEqualTo(2) + lockedJobs.assert.contains(jobs[0].id) + lockedJobs.assert.contains(jobs[1].id) + + // Cancel the first job + val firstJob = jobs[0] + 
performProjectAuthPut("batch-jobs/${firstJob.id}/cancel") + .andIsOk + + waitForNotThrowing(pollTime = 100) { + executeInNewTransaction { + batchJobService.getJobDto(firstJob.id).status.assert.isEqualTo(BatchJobStatus.CANCELLED) + } + } + + // Verify the first job lock was released but second job is still locked + val lockedJobsAfterCancel = batchJobProjectLockingManager.getLockedForProject(testData.project.id) + lockedJobsAfterCancel.assert.doesNotContain(firstJob.id) + lockedJobsAfterCancel.assert.contains(jobs[1].id) + + // Let the second job complete + simulateLongRunningChunkRun = false + + waitForNotThrowing(pollTime = 100, timeout = 10000) { + executeInNewTransaction { + batchJobService.getJobDto(jobs[1].id).status.assert.isEqualTo(BatchJobStatus.SUCCESS) + } + } + } } diff --git a/backend/app/src/test/kotlin/io/tolgee/api/v2/controllers/batch/BatchJobManagementControllerTest.kt b/backend/app/src/test/kotlin/io/tolgee/api/v2/controllers/batch/BatchJobManagementControllerTest.kt index 501677e0c0..1f96843561 100644 --- a/backend/app/src/test/kotlin/io/tolgee/api/v2/controllers/batch/BatchJobManagementControllerTest.kt +++ b/backend/app/src/test/kotlin/io/tolgee/api/v2/controllers/batch/BatchJobManagementControllerTest.kt @@ -23,10 +23,19 @@ class BatchJobManagementControllerTest : AbstractBatchJobManagementControllerTes @Autowired lateinit var throwingService: ThrowingService + @Autowired + lateinit var batchProperties: io.tolgee.configuration.tolgee.BatchProperties + + @Autowired + lateinit var batchJobProjectLockingManager: io.tolgee.batch.BatchJobProjectLockingManager + + var originalProjectConcurrency = 1 + @AfterEach fun after() { batchJobConcurrentLauncher.pause = false clearForcedDate() + batchProperties.projectConcurrency = originalProjectConcurrency } @Test @@ -258,6 +267,154 @@ class BatchJobManagementControllerTest : AbstractBatchJobManagementControllerTes performProjectAuthGet("batch-jobs/${job.id}") .andIsForbidden } + + @Test + 
@ProjectJWTAuthTestMethod + fun `returns list of jobs with projectConcurrency=2`() { + saveAndPrepare() + batchProperties.projectConcurrency = 2 + + val jobIds = ConcurrentHashMap.newKeySet() + var wait = true + + try { + doAnswer { + val id = it.getArgument(0).id + if (jobIds.size == 2 && !jobIds.contains(id)) { + while (wait) { + Thread.sleep(100) + } + } else { + jobIds.add(id) + } + } + .whenever(preTranslationByTmChunkProcessor) + .process(any(), any(), any(), any()) + + val jobs = (1..3).map { util.runChunkedJob(50) } + + // With projectConcurrency=2, we should be able to run 2 jobs concurrently + waitForNotThrowing(pollTime = 1000, timeout = 10000) { + val dtos = jobs.map { batchJobService.getJobDto(it.id) } + dtos.forEach { + val state = batchJobStateProvider.getCached(it.id) + println( + "Job ${it.id} status ${it.status} progress: ${state?.values?.sumOf { it.successTargets.size }}", + ) + } + dtos.count { it.status == BatchJobStatus.SUCCESS }.assert.isEqualTo(2) + dtos.count { it.status == BatchJobStatus.RUNNING }.assert.isEqualTo(1) + } + + // Verify that 2 jobs are locked for the project + val lockedJobs = batchJobProjectLockingManager.getLockedForProject(testData.project.id) + lockedJobs.assert.hasSize(2) + + performProjectAuthGet("batch-jobs?sort=status&sort=id") + .andIsOk.andAssertThatJson { + node("_embedded.batchJobs") { + isArray.hasSize(3) + node("[0].status").isEqualTo("RUNNING") + node("[0].progress").isEqualTo(0) + node("[1].id").isValidId + node("[1].status").isEqualTo("SUCCESS") + node("[1].progress").isEqualTo(50) + } + } + wait = false + + waitForNotThrowing(pollTime = 1000, timeout = 10000) { + val dtos = jobs.map { batchJobService.getJobDto(it.id) } + dtos.count { it.status == BatchJobStatus.SUCCESS }.assert.isEqualTo(3) + } + + performProjectAuthGet("batch-jobs?sort=status&sort=id") + .andIsOk.andAssertThatJson { + node("_embedded.batchJobs") { + isArray.hasSize(3) + node("[0].status").isEqualTo("SUCCESS") + } + } + + // Verify all 
locks are released after completion + val lockedJobsAfter = batchJobProjectLockingManager.getLockedForProject(testData.project.id) + lockedJobsAfter.assert.isEmpty() + } finally { + wait = false + } + } + + @Test + @ProjectJWTAuthTestMethod + fun `returns list of current jobs with projectConcurrency=2`() { + saveAndPrepare() + batchProperties.projectConcurrency = 2 + + var wait = true + doAnswer { + while (wait) { + Thread.sleep(100) + } + }.whenever(preTranslationByTmChunkProcessor).process(any(), any(), any(), any()) + + val adminsJobs = (1..3).map { util.runChunkedJob(50) } + val anotherUsersJobs = (1..3).map { util.runChunkedJob(50, testData.anotherUser) } + + try { + waitForNotThrowing { + performProjectAuthGet("current-batch-jobs") + .andIsOk.andPrettyPrint.andAssertThatJson { + node("_embedded.batchJobs") { + isArray.hasSize(6) + // With projectConcurrency=2, we should have 2 RUNNING jobs and 4 PENDING + node("[0].status").isEqualTo("RUNNING") + node("[1].status").isEqualTo("RUNNING") + node("[2].status").isEqualTo("PENDING") + node("[3].status").isEqualTo("PENDING") + } + } + } + + // Verify that 2 jobs are locked for the project (one per user is allowed) + val lockedJobs = batchJobProjectLockingManager.getLockedForProject(testData.project.id) + lockedJobs.assert.hasSize(2) + + wait = false + + waitForNotThrowing(pollTime = 1000, timeout = 10000) { + val dtos = (adminsJobs + anotherUsersJobs).map { batchJobService.getJobDto(it.id) } + dtos.count { it.status == BatchJobStatus.SUCCESS }.assert.isEqualTo(6) + } + + performProjectAuthGet("current-batch-jobs") + .andIsOk.andAssertThatJson { + node("_embedded.batchJobs") { + isArray.hasSize(6) + node("[0].status").isEqualTo("SUCCESS") + } + } + + userAccount = testData.anotherUser + + performProjectAuthGet("current-batch-jobs") + .andIsOk.andAssertThatJson { + node("_embedded.batchJobs").isArray.hasSize(3) + } + + setForcedDate(currentDateProvider.date.addMinutes(61)) + + 
performProjectAuthGet("current-batch-jobs") + .andIsOk.andAssertThatJson { + node("_embedded.batchJobs").isAbsent() + } + + // Verify all locks are released after completion + val lockedJobsAfter = batchJobProjectLockingManager.getLockedForProject(testData.project.id) + lockedJobsAfter.assert.isEmpty() + } finally { + wait = false + } + } } @Service From 213107cf9420269212c62fd7f3826f3ce93e7ff4 Mon Sep 17 00:00:00 2001 From: Arnaud Jeansen Date: Wed, 15 Oct 2025 11:27:31 +0200 Subject: [PATCH 4/6] chore: Use parameterized testing to limit the changes --- ...JobManagementControllerCancellationTest.kt | 141 ++++--------- .../batch/BatchJobManagementControllerTest.kt | 193 ++++-------------- 2 files changed, 81 insertions(+), 253 deletions(-) diff --git a/backend/app/src/test/kotlin/io/tolgee/api/v2/controllers/batch/BatchJobManagementControllerCancellationTest.kt b/backend/app/src/test/kotlin/io/tolgee/api/v2/controllers/batch/BatchJobManagementControllerCancellationTest.kt index 89b93c9896..df4e730a82 100644 --- a/backend/app/src/test/kotlin/io/tolgee/api/v2/controllers/batch/BatchJobManagementControllerCancellationTest.kt +++ b/backend/app/src/test/kotlin/io/tolgee/api/v2/controllers/batch/BatchJobManagementControllerCancellationTest.kt @@ -15,6 +15,8 @@ import kotlinx.coroutines.ensureActive import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource import org.mockito.kotlin.* import org.springframework.beans.factory.annotation.Autowired import org.springframework.boot.test.context.SpringBootTest @@ -52,107 +54,14 @@ class BatchJobManagementControllerCancellationTest : batchProperties.projectConcurrency = originalProjectConcurrency } - @Test - @ProjectJWTAuthTestMethod - fun `cancels a job`() { - batchDumper.finallyDump { - val keys = testData.addTranslationOperationData(100) - saveAndPrepare() - - val 
keyIds = keys.map { it.id }.toList() - - val count = AtomicInteger(0) - - doAnswer { - if (count.incrementAndGet() > 5) { - while (simulateLongRunningChunkRun) { - // this simulates long-running operation, which checks for active context - val context = it.arguments[2] as CoroutineContext - context.ensureActive() - Thread.sleep(10) - } - } - it.callRealMethod() - }.whenever(machineTranslationChunkProcessor).process(any(), any(), any(), any()) - - performProjectAuthPost( - "start-batch-job/machine-translate", - mapOf( - "keyIds" to keyIds, - "targetLanguageIds" to - listOf( - testData.projectBuilder.getLanguageByTag("cs")!!.self.id, - ), - ), - ).andIsOk - - waitFor { - batchJobConcurrentLauncher.runningJobs.size >= 5 - } - - val job = util.getSingleJob() - performProjectAuthPut("batch-jobs/${job.id}/cancel") - .andIsOk - - waitForNotThrowing(pollTime = 100) { - executeInNewTransaction { - util.getSingleJob().status.assert.isEqualTo(BatchJobStatus.CANCELLED) - verify(batchJobActivityFinalizer, times(1)).finalizeActivityWhenJobCompleted(any()) - - // assert activity stored - entityManager.createQuery("""from ActivityRevision ar where ar.batchJob.id = :id""") - .setParameter("id", job.id).resultList - .assert.hasSize(1) - } - } - } - } - - @Test - @ProjectJWTAuthTestMethod - fun `cannot cancel other's job`() { - saveAndPrepare() - - batchJobConcurrentLauncher.pause = true - - val job = util.runChunkedJob(100) - - userAccount = testData.anotherUser - - performProjectAuthPut("batch-jobs/${job.id}/cancel") - .andIsForbidden - } - - @Test - @ProjectJWTAuthTestMethod - fun `stuck job can be cancelled`() { - saveAndPrepare() - batchJobConcurrentLauncher.pause = true - val job = - stuckBatchJobTestUtil.createBatchJobWithExecutionStatuses( - testData.project, - BatchJobStatus.RUNNING, - setOf(BatchJobChunkExecutionStatus.CANCELLED), - ) - - executeInNewTransaction { - val merged = entityManager.merge(job) - merged.author = testData.user - entityManager.persist(merged) - } - - 
performProjectAuthPut("batch-jobs/${job.id}/cancel").andIsOk - - util.getSingleJob().status.assert.isEqualTo(BatchJobStatus.CANCELLED) - } - - @Test + @ParameterizedTest + @ValueSource(ints = [1, 2]) @ProjectJWTAuthTestMethod - fun `cancels a job with projectConcurrency=2`() { + fun `cancels a job`(projectConcurrency: Int) { val keys = testData.addTranslationOperationData(100) saveAndPrepare() - batchProperties.projectConcurrency = 2 + batchProperties.projectConcurrency = projectConcurrency val keyIds = keys.map { it.id }.toList() @@ -211,6 +120,44 @@ class BatchJobManagementControllerCancellationTest : lockedJobsAfterCancel.assert.doesNotContain(job.id) } + @Test + @ProjectJWTAuthTestMethod + fun `cannot cancel other's job`() { + saveAndPrepare() + + batchJobConcurrentLauncher.pause = true + + val job = util.runChunkedJob(100) + + userAccount = testData.anotherUser + + performProjectAuthPut("batch-jobs/${job.id}/cancel") + .andIsForbidden + } + + @Test + @ProjectJWTAuthTestMethod + fun `stuck job can be cancelled`() { + saveAndPrepare() + batchJobConcurrentLauncher.pause = true + val job = + stuckBatchJobTestUtil.createBatchJobWithExecutionStatuses( + testData.project, + BatchJobStatus.RUNNING, + setOf(BatchJobChunkExecutionStatus.CANCELLED), + ) + + executeInNewTransaction { + val merged = entityManager.merge(job) + merged.author = testData.user + entityManager.persist(merged) + } + + performProjectAuthPut("batch-jobs/${job.id}/cancel").andIsOk + + util.getSingleJob().status.assert.isEqualTo(BatchJobStatus.CANCELLED) + } + @Test @ProjectJWTAuthTestMethod fun `cancels one of two running jobs with projectConcurrency=2`() { diff --git a/backend/app/src/test/kotlin/io/tolgee/api/v2/controllers/batch/BatchJobManagementControllerTest.kt b/backend/app/src/test/kotlin/io/tolgee/api/v2/controllers/batch/BatchJobManagementControllerTest.kt index 1f96843561..148cb78895 100644 --- 
a/backend/app/src/test/kotlin/io/tolgee/api/v2/controllers/batch/BatchJobManagementControllerTest.kt +++ b/backend/app/src/test/kotlin/io/tolgee/api/v2/controllers/batch/BatchJobManagementControllerTest.kt @@ -10,6 +10,8 @@ import io.tolgee.util.Logging import io.tolgee.util.addMinutes import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.Test +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource import org.mockito.kotlin.any import org.mockito.kotlin.doAnswer import org.mockito.kotlin.whenever @@ -79,10 +81,12 @@ class BatchJobManagementControllerTest : AbstractBatchJobManagementControllerTes } } - @Test + @ParameterizedTest + @ValueSource(ints = [1, 2]) @ProjectJWTAuthTestMethod - fun `returns list of jobs`() { + fun `returns list of jobs`(projectConcurrency: Int) { saveAndPrepare() + batchProperties.projectConcurrency = projectConcurrency val jobIds = ConcurrentHashMap.newKeySet() var wait = true @@ -90,7 +94,7 @@ class BatchJobManagementControllerTest : AbstractBatchJobManagementControllerTes try { doAnswer { val id = it.getArgument(0).id - if (jobIds.size == 2 && !jobIds.contains(id)) { + if (jobIds.size == projectConcurrency && !jobIds.contains(id)) { while (wait) { Thread.sleep(100) } @@ -103,6 +107,7 @@ class BatchJobManagementControllerTest : AbstractBatchJobManagementControllerTes val jobs = (1..3).map { util.runChunkedJob(50) } + // With projectConcurrency=N, we should be able to run N jobs concurrently waitForNotThrowing(pollTime = 1000, timeout = 10000) { val dtos = jobs.map { batchJobService.getJobDto(it.id) } dtos.forEach { @@ -111,10 +116,14 @@ class BatchJobManagementControllerTest : AbstractBatchJobManagementControllerTes "Job ${it.id} status ${it.status} progress: ${state?.values?.sumOf { it.successTargets.size }}", ) } - dtos.count { it.status == BatchJobStatus.SUCCESS }.assert.isEqualTo(2) + dtos.count { it.status == BatchJobStatus.SUCCESS }.assert.isEqualTo(projectConcurrency) 
dtos.count { it.status == BatchJobStatus.RUNNING }.assert.isEqualTo(1) } + // Verify that N jobs are locked for the project + val lockedJobs = batchJobProjectLockingManager.getLockedForProject(testData.project.id) + lockedJobs.size.assert.isEqualTo(projectConcurrency) + performProjectAuthGet("batch-jobs?sort=status&sort=id") .andIsOk.andAssertThatJson { node("_embedded.batchJobs") { @@ -140,6 +149,10 @@ class BatchJobManagementControllerTest : AbstractBatchJobManagementControllerTes node("[0].status").isEqualTo("SUCCESS") } } + + // Verify all locks are released after completion + val lockedJobsAfter = batchJobProjectLockingManager.getLockedForProject(testData.project.id) + lockedJobsAfter.assert.isEmpty() } finally { wait = false } @@ -173,10 +186,12 @@ class BatchJobManagementControllerTest : AbstractBatchJobManagementControllerTes } } - @Test + @ParameterizedTest + @ValueSource(ints = [1, 2]) @ProjectJWTAuthTestMethod - fun `returns list of current jobs`() { + fun `returns list of current jobs`(projectConcurrency: Int) { saveAndPrepare() + batchProperties.projectConcurrency = projectConcurrency var wait = true doAnswer { @@ -194,13 +209,23 @@ class BatchJobManagementControllerTest : AbstractBatchJobManagementControllerTes .andIsOk.andPrettyPrint.andAssertThatJson { node("_embedded.batchJobs") { isArray.hasSize(6) + // With projectConcurrency=N, we should have N RUNNING jobs and (6-N) PENDING node("[0].status").isEqualTo("RUNNING") - node("[1].status").isEqualTo("PENDING") - node("[2].status").isEqualTo("PENDING") + if (projectConcurrency == 2) { + node("[1].status").isEqualTo("RUNNING") + node("[2].status").isEqualTo("PENDING") + } else { + node("[1].status").isEqualTo("PENDING") + node("[2].status").isEqualTo("PENDING") + } } } } + // Verify that N jobs are locked for the project + val lockedJobs = batchJobProjectLockingManager.getLockedForProject(testData.project.id) + lockedJobs.size.assert.isEqualTo(projectConcurrency) + wait = false 
waitForNotThrowing(pollTime = 1000, timeout = 10000) { @@ -229,6 +254,10 @@ class BatchJobManagementControllerTest : AbstractBatchJobManagementControllerTes .andIsOk.andAssertThatJson { node("_embedded.batchJobs").isAbsent() } + + // Verify all locks are released after completion + val lockedJobsAfter = batchJobProjectLockingManager.getLockedForProject(testData.project.id) + lockedJobsAfter.assert.isEmpty() } finally { wait = false } @@ -267,154 +296,6 @@ class BatchJobManagementControllerTest : AbstractBatchJobManagementControllerTes performProjectAuthGet("batch-jobs/${job.id}") .andIsForbidden } - - @Test - @ProjectJWTAuthTestMethod - fun `returns list of jobs with projectConcurrency=2`() { - saveAndPrepare() - batchProperties.projectConcurrency = 2 - - val jobIds = ConcurrentHashMap.newKeySet() - var wait = true - - try { - doAnswer { - val id = it.getArgument(0).id - if (jobIds.size == 2 && !jobIds.contains(id)) { - while (wait) { - Thread.sleep(100) - } - } else { - jobIds.add(id) - } - } - .whenever(preTranslationByTmChunkProcessor) - .process(any(), any(), any(), any()) - - val jobs = (1..3).map { util.runChunkedJob(50) } - - // With projectConcurrency=2, we should be able to run 2 jobs concurrently - waitForNotThrowing(pollTime = 1000, timeout = 10000) { - val dtos = jobs.map { batchJobService.getJobDto(it.id) } - dtos.forEach { - val state = batchJobStateProvider.getCached(it.id) - println( - "Job ${it.id} status ${it.status} progress: ${state?.values?.sumOf { it.successTargets.size }}", - ) - } - dtos.count { it.status == BatchJobStatus.SUCCESS }.assert.isEqualTo(2) - dtos.count { it.status == BatchJobStatus.RUNNING }.assert.isEqualTo(1) - } - - // Verify that 2 jobs are locked for the project - val lockedJobs = batchJobProjectLockingManager.getLockedForProject(testData.project.id) - lockedJobs.assert.hasSize(2) - - performProjectAuthGet("batch-jobs?sort=status&sort=id") - .andIsOk.andAssertThatJson { - node("_embedded.batchJobs") { - isArray.hasSize(3) - 
node("[0].status").isEqualTo("RUNNING") - node("[0].progress").isEqualTo(0) - node("[1].id").isValidId - node("[1].status").isEqualTo("SUCCESS") - node("[1].progress").isEqualTo(50) - } - } - wait = false - - waitForNotThrowing(pollTime = 1000, timeout = 10000) { - val dtos = jobs.map { batchJobService.getJobDto(it.id) } - dtos.count { it.status == BatchJobStatus.SUCCESS }.assert.isEqualTo(3) - } - - performProjectAuthGet("batch-jobs?sort=status&sort=id") - .andIsOk.andAssertThatJson { - node("_embedded.batchJobs") { - isArray.hasSize(3) - node("[0].status").isEqualTo("SUCCESS") - } - } - - // Verify all locks are released after completion - val lockedJobsAfter = batchJobProjectLockingManager.getLockedForProject(testData.project.id) - lockedJobsAfter.assert.isEmpty() - } finally { - wait = false - } - } - - @Test - @ProjectJWTAuthTestMethod - fun `returns list of current jobs with projectConcurrency=2`() { - saveAndPrepare() - batchProperties.projectConcurrency = 2 - - var wait = true - doAnswer { - while (wait) { - Thread.sleep(100) - } - }.whenever(preTranslationByTmChunkProcessor).process(any(), any(), any(), any()) - - val adminsJobs = (1..3).map { util.runChunkedJob(50) } - val anotherUsersJobs = (1..3).map { util.runChunkedJob(50, testData.anotherUser) } - - try { - waitForNotThrowing { - performProjectAuthGet("current-batch-jobs") - .andIsOk.andPrettyPrint.andAssertThatJson { - node("_embedded.batchJobs") { - isArray.hasSize(6) - // With projectConcurrency=2, we should have 2 RUNNING jobs and 4 PENDING - node("[0].status").isEqualTo("RUNNING") - node("[1].status").isEqualTo("RUNNING") - node("[2].status").isEqualTo("PENDING") - node("[3].status").isEqualTo("PENDING") - } - } - } - - // Verify that 2 jobs are locked for the project (one per user is allowed) - val lockedJobs = batchJobProjectLockingManager.getLockedForProject(testData.project.id) - lockedJobs.assert.hasSize(2) - - wait = false - - waitForNotThrowing(pollTime = 1000, timeout = 10000) { - val 
dtos = (adminsJobs + anotherUsersJobs).map { batchJobService.getJobDto(it.id) } - dtos.count { it.status == BatchJobStatus.SUCCESS }.assert.isEqualTo(6) - } - - performProjectAuthGet("current-batch-jobs") - .andIsOk.andAssertThatJson { - node("_embedded.batchJobs") { - isArray.hasSize(6) - node("[0].status").isEqualTo("SUCCESS") - } - } - - userAccount = testData.anotherUser - - performProjectAuthGet("current-batch-jobs") - .andIsOk.andAssertThatJson { - node("_embedded.batchJobs").isArray.hasSize(3) - } - - setForcedDate(currentDateProvider.date.addMinutes(61)) - - performProjectAuthGet("current-batch-jobs") - .andIsOk.andAssertThatJson { - node("_embedded.batchJobs").isAbsent() - } - - // Verify all locks are released after completion - val lockedJobsAfter = batchJobProjectLockingManager.getLockedForProject(testData.project.id) - lockedJobsAfter.assert.isEmpty() - } finally { - wait = false - } - } } @Service From 05908af6624595cd80f44c7c90c4324b6cfd39ba Mon Sep 17 00:00:00 2001 From: Arnaud Jeansen Date: Wed, 15 Oct 2025 17:03:54 +0200 Subject: [PATCH 5/6] chore: PR feedback from coderabbit --- .../batch/BatchJobManagementControllerTest.kt | 6 ++++++ .../io/tolgee/batch/BatchJobProjectLockingManager.kt | 10 ++++++++-- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/backend/app/src/test/kotlin/io/tolgee/api/v2/controllers/batch/BatchJobManagementControllerTest.kt b/backend/app/src/test/kotlin/io/tolgee/api/v2/controllers/batch/BatchJobManagementControllerTest.kt index 148cb78895..aba4508966 100644 --- a/backend/app/src/test/kotlin/io/tolgee/api/v2/controllers/batch/BatchJobManagementControllerTest.kt +++ b/backend/app/src/test/kotlin/io/tolgee/api/v2/controllers/batch/BatchJobManagementControllerTest.kt @@ -9,6 +9,7 @@ import io.tolgee.testing.assert import io.tolgee.util.Logging import io.tolgee.util.addMinutes import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import 
org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource @@ -33,6 +34,11 @@ class BatchJobManagementControllerTest : AbstractBatchJobManagementControllerTes var originalProjectConcurrency = 1 + @BeforeEach + fun setup() { + originalProjectConcurrency = batchProperties.projectConcurrency + } + @AfterEach fun after() { batchJobConcurrentLauncher.pause = false diff --git a/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobProjectLockingManager.kt b/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobProjectLockingManager.kt index e25c6364c0..6993e37618 100644 --- a/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobProjectLockingManager.kt +++ b/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobProjectLockingManager.kt @@ -135,8 +135,14 @@ class BatchJobProjectLockingManager( return setOf(toLock.id) } val newLockedJobIds = mutableSetOf(initialJobId) - logger.debug("Locking job ${toLock.id} for project $projectId. Active jobs before: $newLockedJobIds") - newLockedJobIds.add(toLock.id) + if (newLockedJobIds.size < batchProperties.projectConcurrency) { + logger.debug("Locking job ${toLock.id} for project $projectId. Active jobs before: $newLockedJobIds") + newLockedJobIds.add(toLock.id) + } else { + logger.debug( + "Cannot lock job ${toLock.id} for project $projectId, limit reached. 
Active jobs: $newLockedJobIds" + ) + } return newLockedJobIds } From 20ee69f32c327af267b10d0bc1fbd9fdccba64ae Mon Sep 17 00:00:00 2001 From: Arnaud Jeansen Date: Sat, 18 Oct 2025 18:39:15 +0200 Subject: [PATCH 6/6] chore: readd batch dumper to test --- ...JobManagementControllerCancellationTest.kt | 96 ++++++++++--------- .../test/kotlin/io/tolgee/util/BatchDumper.kt | 12 +++ 2 files changed, 61 insertions(+), 47 deletions(-) diff --git a/backend/app/src/test/kotlin/io/tolgee/api/v2/controllers/batch/BatchJobManagementControllerCancellationTest.kt b/backend/app/src/test/kotlin/io/tolgee/api/v2/controllers/batch/BatchJobManagementControllerCancellationTest.kt index df4e730a82..d49775bc2c 100644 --- a/backend/app/src/test/kotlin/io/tolgee/api/v2/controllers/batch/BatchJobManagementControllerCancellationTest.kt +++ b/backend/app/src/test/kotlin/io/tolgee/api/v2/controllers/batch/BatchJobManagementControllerCancellationTest.kt @@ -58,66 +58,68 @@ class BatchJobManagementControllerCancellationTest : @ValueSource(ints = [1, 2]) @ProjectJWTAuthTestMethod fun `cancels a job`(projectConcurrency: Int) { - val keys = testData.addTranslationOperationData(100) - saveAndPrepare() - batchProperties.projectConcurrency = projectConcurrency - val keyIds = keys.map { it.id }.toList() + batchDumper.finallyDumpAll { + val keys = testData.addTranslationOperationData(100) + saveAndPrepare() - val count = AtomicInteger(0) + val keyIds = keys.map { it.id }.toList() - doAnswer { - if (count.incrementAndGet() > 5) { - while (simulateLongRunningChunkRun) { - // this simulates long-running operation, which checks for active context - val context = it.arguments[2] as CoroutineContext - context.ensureActive() - Thread.sleep(10) + val count = AtomicInteger(0) + + doAnswer { + if (count.incrementAndGet() > 5) { + while (simulateLongRunningChunkRun) { + // this simulates long-running operation, which checks for active context + val context = it.arguments[2] as CoroutineContext + 
context.ensureActive() + Thread.sleep(10) + } } + it.callRealMethod() + }.whenever(machineTranslationChunkProcessor).process(any(), any(), any(), any()) + + performProjectAuthPost( + "start-batch-job/machine-translate", + mapOf( + "keyIds" to keyIds, + "targetLanguageIds" to + listOf( + testData.projectBuilder.getLanguageByTag("cs")!!.self.id, + ), + ), + ).andIsOk + + waitFor { + batchJobConcurrentLauncher.runningJobs.size >= 5 } - it.callRealMethod() - }.whenever(machineTranslationChunkProcessor).process(any(), any(), any(), any()) - - performProjectAuthPost( - "start-batch-job/machine-translate", - mapOf( - "keyIds" to keyIds, - "targetLanguageIds" to - listOf( - testData.projectBuilder.getLanguageByTag("cs")!!.self.id, - ), - ), - ).andIsOk - - waitFor { - batchJobConcurrentLauncher.runningJobs.size >= 5 - } - val job = util.getSingleJob() + val job = util.getSingleJob() - // Verify the job is locked - val lockedJobs = batchJobProjectLockingManager.getLockedForProject(testData.project.id) - lockedJobs.assert.contains(job.id) + // Verify the job is locked + val lockedJobs = batchJobProjectLockingManager.getLockedForProject(testData.project.id) + lockedJobs.assert.contains(job.id) - performProjectAuthPut("batch-jobs/${job.id}/cancel") - .andIsOk + performProjectAuthPut("batch-jobs/${job.id}/cancel") + .andIsOk - waitForNotThrowing(pollTime = 100) { - executeInNewTransaction { - util.getSingleJob().status.assert.isEqualTo(BatchJobStatus.CANCELLED) - verify(batchJobActivityFinalizer, times(1)).finalizeActivityWhenJobCompleted(any()) + waitForNotThrowing(pollTime = 100) { + executeInNewTransaction { + util.getSingleJob().status.assert.isEqualTo(BatchJobStatus.CANCELLED) + verify(batchJobActivityFinalizer, times(1)).finalizeActivityWhenJobCompleted(any()) - // assert activity stored - entityManager.createQuery("""from ActivityRevision ar where ar.batchJob.id = :id""") - .setParameter("id", job.id).resultList - .assert.hasSize(1) + // assert activity stored + 
entityManager.createQuery("""from ActivityRevision ar where ar.batchJob.id = :id""") + .setParameter("id", job.id).resultList + .assert.hasSize(1) + } } - } - // Verify the job lock was released - val lockedJobsAfterCancel = batchJobProjectLockingManager.getLockedForProject(testData.project.id) - lockedJobsAfterCancel.assert.doesNotContain(job.id) + // Verify the job lock was released + val lockedJobsAfterCancel = batchJobProjectLockingManager.getLockedForProject(testData.project.id) + lockedJobsAfterCancel.assert.doesNotContain(job.id) + } } @Test diff --git a/backend/app/src/test/kotlin/io/tolgee/util/BatchDumper.kt b/backend/app/src/test/kotlin/io/tolgee/util/BatchDumper.kt index c6563cb75e..95081e35b4 100644 --- a/backend/app/src/test/kotlin/io/tolgee/util/BatchDumper.kt +++ b/backend/app/src/test/kotlin/io/tolgee/util/BatchDumper.kt @@ -78,8 +78,20 @@ class BatchDumper( } } + fun finallyDumpAll(fn: () -> T): T { + return try { + fn() + } finally { + getAllJobs().forEach { + this.dump(it.id) + } + } + } + fun getSingleJob(): BatchJob = entityManager.createQuery("""from BatchJob""", BatchJob::class.java).singleResult + fun getAllJobs(): List = entityManager.createQuery("""from BatchJob""", BatchJob::class.java).resultList + private fun dumpQueuedItems( jobId: Long, stringBuilder: StringBuilder,