@@ -14,6 +14,8 @@ export default class Queue {
1414 private ctx : Context ;
1515 private snapshotNames : Array < string > = [ ] ;
1616 private variants : Array < string > = [ ] ;
17+ private activeProcessingCount : number = 0 ;
18+ private readonly MAX_CONCURRENT_PROCESSING = 5 ;
1719
1820 constructor ( ctx : Context ) {
1921 this . ctx = ctx ;
@@ -275,15 +277,65 @@ export default class Queue {
275277
276278 private async processNext ( ) : Promise < void > {
277279 if ( ! this . isEmpty ( ) ) {
280+ const useRemoteDiscovery = this . ctx . env . USE_REMOTE_DISCOVERY || this . ctx . config . useRemoteDiscovery ;
281+
282+ if ( useRemoteDiscovery && ! this . ctx . config . delayedUpload && ! this . ctx . config . allowDuplicateSnapshotNames ) {
283+ let maxConcurrentProcessing = this . ctx . env . MAX_CONCURRENT_PROCESSING === 0 ? this . MAX_CONCURRENT_PROCESSING : this . ctx . env . MAX_CONCURRENT_PROCESSING ;
284+ if ( maxConcurrentProcessing > 15 || maxConcurrentProcessing < 1 ) {
285+ this . ctx . log . info ( `Larger than 15 concurrent processing. Setting to 5.` ) ;
286+ maxConcurrentProcessing = 5 ;
287+ }
288+
289+ this . ctx . log . info ( `Max concurrent processing: ${ maxConcurrentProcessing } ` ) ;
290+ const snapshotsToProcess : Array < Snapshot > = [ ] ;
291+ const maxSnapshots = Math . min ( maxConcurrentProcessing - this . activeProcessingCount , this . snapshots . length ) ;
292+
293+ for ( let i = 0 ; i < maxSnapshots ; i ++ ) {
294+ let snapshot ;
295+ if ( this . ctx . config . delayedUpload ) {
296+ snapshot = this . snapshots . pop ( ) ;
297+ } else {
298+ snapshot = this . snapshots . shift ( ) ;
299+ }
300+ if ( snapshot ) {
301+ snapshotsToProcess . push ( snapshot ) ;
302+ }
303+ }
304+
305+ if ( snapshotsToProcess . length > 0 ) {
306+ this . activeProcessingCount += snapshotsToProcess . length ;
307+ const processingPromises = snapshotsToProcess . map ( snapshot => this . processSnapshot ( snapshot ) ) ;
308+ await Promise . allSettled ( processingPromises ) ;
309+ this . activeProcessingCount -= snapshotsToProcess . length ;
310+
311+ if ( ! this . isEmpty ( ) ) {
312+ this . processNext ( ) ;
313+ } else {
314+ this . processing = false ;
315+ }
316+ return ;
317+ }
318+ }
319+
278320 let snapshot ;
279321 if ( this . ctx . config . delayedUpload ) {
280322 snapshot = this . snapshots . pop ( ) ;
281323 } else {
282324 snapshot = this . snapshots . shift ( ) ;
283325 }
284- try {
285- this . processingSnapshot = snapshot ?. name ;
286- let drop = false ;
326+ if ( snapshot ) {
327+ await this . processSnapshot ( snapshot ) ;
328+ this . processNext ( ) ;
329+ }
330+ } else {
331+ this . processing = false ;
332+ }
333+ }
334+
335+ private async processSnapshot ( snapshot : Snapshot ) : Promise < void > {
336+ try {
337+ this . processingSnapshot = snapshot ?. name ;
338+ let drop = false ;
287339
288340
289341 if ( this . ctx . isStartExec ) {
@@ -450,7 +502,6 @@ export default class Queue {
450502 if ( snapshot ?. options ?. contextId ) {
451503 this . ctx . contextToSnapshotMap ?. set ( snapshot ?. options ?. contextId , '2' ) ;
452504 }
453- this . processNext ( ) ;
454505 } else {
455506 let approvalThreshold = snapshot ?. options ?. approvalThreshold || this . ctx . config . approvalThreshold ;
456507 let rejectionThreshold = snapshot ?. options ?. rejectionThreshold || this . ctx . config . rejectionThreshold ;
@@ -487,10 +538,6 @@ export default class Queue {
487538 this . ctx . log . debug ( `Closed browser context for snapshot ${ snapshot . name } ` ) ;
488539 }
489540 }
490- this . processNext ( ) ;
491- } else {
492- this . processing = false ;
493- }
494541 }
495542
496543 isProcessing ( ) : boolean {
0 commit comments