2323import org .apache .amoro .ServerTableIdentifier ;
2424import org .apache .amoro .api .BlockableOperation ;
2525import org .apache .amoro .api .OptimizingTaskId ;
26+ import org .apache .amoro .api .OptimizingTaskResult ;
2627import org .apache .amoro .exception .OptimizingClosedException ;
2728import org .apache .amoro .exception .PersistenceException ;
29+ import org .apache .amoro .exception .TaskNotFoundException ;
2830import org .apache .amoro .optimizing .MetricsSummary ;
2931import org .apache .amoro .optimizing .OptimizingType ;
3032import org .apache .amoro .optimizing .RewriteFilesInput ;
4446import org .apache .amoro .server .persistence .mapper .TableProcessMapper ;
4547import org .apache .amoro .server .process .TableProcessMeta ;
4648import org .apache .amoro .server .resource .OptimizerInstance ;
49+ import org .apache .amoro .server .resource .OptimizerThread ;
4750import org .apache .amoro .server .resource .QuotaProvider ;
4851import org .apache .amoro .server .table .DefaultTableRuntime ;
4952import org .apache .amoro .server .table .blocker .TableBlocker ;
@@ -90,7 +93,6 @@ public class OptimizingQueue extends PersistentBase {
9093
9194 private final QuotaProvider quotaProvider ;
9295 private final Queue <TableOptimizingProcess > tableQueue = new LinkedTransferQueue <>();
93- private final Queue <TaskRuntime <?>> retryTaskQueue = new LinkedTransferQueue <>();
9496 private final SchedulingPolicy scheduler ;
9597 private final CatalogManager catalogManager ;
9698 private final Executor planExecutor ;
@@ -200,24 +202,23 @@ public void releaseTable(DefaultTableRuntime tableRuntime) {
200202
201203 private void clearProcess (OptimizingProcess optimizingProcess ) {
202204 tableQueue .removeIf (process -> process .getProcessId () == optimizingProcess .getProcessId ());
203- retryTaskQueue .removeIf (
204- taskRuntime -> taskRuntime .getTaskId ().getProcessId () == optimizingProcess .getProcessId ());
205205 }
206206
207- public TaskRuntime <?> pollTask (long maxWaitTime , boolean breakQuotaLimit ) {
207+ public TaskRuntime <?> pollTask (
208+ OptimizerThread thread , long maxWaitTime , boolean breakQuotaLimit ) {
208209 long deadline = calculateDeadline (maxWaitTime );
209- TaskRuntime <?> task = fetchTask ( );
210+ TaskRuntime <?> task = fetchScheduledTask ( thread , true );
210211 while (task == null && waitTask (deadline )) {
211- task = fetchTask ( );
212+ task = fetchScheduledTask ( thread , true );
212213 }
213214 if (task == null && breakQuotaLimit && planningTables .isEmpty ()) {
214- task = fetchScheduledTask (false );
215+ task = fetchScheduledTask (thread , false );
215216 }
216217 return task ;
217218 }
218219
219- public TaskRuntime <?> pollTask (long maxWaitTime ) {
220- return pollTask (maxWaitTime , false );
220+ public TaskRuntime <?> pollTask (OptimizerThread thread , long maxWaitTime ) {
221+ return pollTask (thread , maxWaitTime , true );
221222 }
222223
223224 private long calculateDeadline (long maxWaitTime ) {
@@ -240,14 +241,9 @@ private boolean waitTask(long waitDeadline) {
240241 }
241242 }
242243
243- private TaskRuntime <?> fetchTask () {
244- TaskRuntime <?> task = retryTaskQueue .poll ();
245- return task != null ? task : fetchScheduledTask (true );
246- }
247-
248- private TaskRuntime <?> fetchScheduledTask (boolean needQuotaChecking ) {
244+ private TaskRuntime <?> fetchScheduledTask (OptimizerThread thread , boolean needQuotaChecking ) {
249245 return tableQueue .stream ()
250- .map (process -> process .poll (needQuotaChecking ))
246+ .map (process -> process .poll (thread , needQuotaChecking ))
251247 .filter (Objects ::nonNull )
252248 .findFirst ()
253249 .orElse (null );
@@ -352,12 +348,12 @@ private TableOptimizingProcess planInternal(DefaultTableRuntime tableRuntime) {
352348 }
353349 }
354350
355- public TaskRuntime <?> getTask (OptimizingTaskId taskId ) {
356- return tableQueue . stream ()
357- . filter ( p -> p . getProcessId () == taskId . getProcessId ())
358- . findFirst ()
359- . map ( p -> p . getTaskMap (). get ( taskId ))
360- . orElse ( null );
351+ public void ackTask (OptimizingTaskId taskId , OptimizerThread thread ) {
352+ findProcess ( taskId ). ackTask ( taskId , thread );
353+ }
354+
355+ public void completeTask ( OptimizerThread thread , OptimizingTaskResult result ) {
356+ findProcess ( result . getTaskId ()). completeTask ( thread , result );
361357 }
362358
363359 public List <TaskRuntime <?>> collectTasks () {
@@ -374,8 +370,7 @@ public List<TaskRuntime<?>> collectTasks(Predicate<TaskRuntime<?>> predicate) {
374370 }
375371
376372 public void retryTask (TaskRuntime <?> taskRuntime ) {
377- taskRuntime .reset ();
378- retryTaskQueue .offer (taskRuntime );
373+ findProcess (taskRuntime .getTaskId ()).resetTask ((TaskRuntime <RewriteStageTask >) taskRuntime );
379374 }
380375
381376 public ResourceGroup getOptimizerGroup () {
@@ -402,6 +397,13 @@ public void dispose() {
402397 this .metrics .unregister ();
403398 }
404399
400+ private TableOptimizingProcess findProcess (OptimizingTaskId taskId ) {
401+ return tableQueue .stream ()
402+ .filter (p -> p .getProcessId () == taskId .getProcessId ())
403+ .findFirst ()
404+ .orElseThrow (() -> new TaskNotFoundException (taskId ));
405+ }
406+
405407 private double getAvailableCore () {
406408 // the available core should be at least 1
407409 return Math .max (quotaProvider .getTotalQuota (optimizerGroup .getName ()), 1 );
@@ -437,26 +439,32 @@ private class TableOptimizingProcess implements OptimizingProcess {
437439 private Map <String , Long > toSequence = Maps .newHashMap ();
438440 private boolean hasCommitted = false ;
439441
440- public TaskRuntime <?> poll (boolean needQuotaChecking ) {
441- if (lock .tryLock ()) {
442- try {
443- TaskRuntime <?> task = null ;
444- if (status != ProcessStatus .KILLED && status != ProcessStatus .FAILED ) {
445- int actualQuota = getActualQuota ();
446- int quotaLimit = getQuotaLimit ();
447- if (!needQuotaChecking || actualQuota < quotaLimit ) {
448- task = taskQueue .poll ();
442+ public TaskRuntime <?> poll (OptimizerThread thread , boolean needQuotaChecking ) {
443+ try {
444+ // Wait 10ms here for some light operation like poll/ack
445+ if (lock .tryLock (10 , TimeUnit .MILLISECONDS )) {
446+ try {
447+ TaskRuntime <?> task = null ;
448+ if (status != ProcessStatus .KILLED && status != ProcessStatus .FAILED ) {
449+ int actualQuota = getActualQuota ();
450+ int quotaLimit = getQuotaLimit ();
451+ if (!needQuotaChecking || actualQuota < quotaLimit ) {
452+ task = taskQueue .poll ();
453+ }
449454 }
455+ if (task != null ) {
456+ optimizingTasksMap
457+ .computeIfAbsent (tableRuntime .getTableIdentifier (), k -> new AtomicInteger (0 ))
458+ .incrementAndGet ();
459+ task .schedule (thread );
460+ }
461+ return task ;
462+ } finally {
463+ lock .unlock ();
450464 }
451- if (task != null ) {
452- optimizingTasksMap
453- .computeIfAbsent (tableRuntime .getTableIdentifier (), k -> new AtomicInteger (0 ))
454- .incrementAndGet ();
455- }
456- return task ;
457- } finally {
458- lock .unlock ();
459465 }
466+ } catch (InterruptedException e ) {
467+ // ignore it.
460468 }
461469 return null ;
462470 }
@@ -543,6 +551,34 @@ public void close(boolean needCommit) {
543551 }
544552 }
545553
554+ private void ackTask (OptimizingTaskId taskId , OptimizerThread thread ) {
555+ TaskRuntime <?> taskRuntime = getTaskRuntime (taskId );
556+ lock .lock ();
557+ try {
558+ taskRuntime .ack (thread );
559+ } finally {
560+ lock .unlock ();
561+ }
562+ }
563+
564+ private void completeTask (OptimizerThread thread , OptimizingTaskResult result ) {
565+ TaskRuntime <?> taskRuntime = getTaskRuntime (result .getTaskId ());
566+ lock .lock ();
567+ try {
568+ taskRuntime .complete (thread , result );
569+ } finally {
570+ lock .unlock ();
571+ }
572+ }
573+
574+ private TaskRuntime <?> getTaskRuntime (OptimizingTaskId taskId ) {
575+ TaskRuntime <?> taskRuntime = getTaskMap ().get (taskId );
576+ if (taskRuntime == null ) {
577+ throw new TaskNotFoundException (taskId );
578+ }
579+ return taskRuntime ;
580+ }
581+
546582 private void acceptResult (TaskRuntime <?> taskRuntime ) {
547583 lock .lock ();
548584 try {
@@ -612,6 +648,16 @@ private void acceptResult(TaskRuntime<?> taskRuntime) {
612648 }
613649 }
614650
651+ private void resetTask (TaskRuntime <RewriteStageTask > taskRuntime ) {
652+ lock .lock ();
653+ try {
654+ taskRuntime .reset ();
655+ taskQueue .add (taskRuntime );
656+ } finally {
657+ lock .unlock ();
658+ }
659+ }
660+
615661 @ Override
616662 public boolean isClosed () {
617663 return status == ProcessStatus .KILLED ;
0 commit comments