11/*
2- * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
2+ * Copyright (c) 2016-2025 VMware Inc. or its affiliates, All Rights Reserved.
33 *
44 * Licensed under the Apache License, Version 2.0 (the "License");
55 * you may not use this file except in compliance with the License.
5757 * @author Stephane Maldini
5858 */
5959final class SinkManyEmitterProcessor <T > extends Flux <T > implements InternalManySink <T >,
60- Sinks .ManyWithUpstream <T >, CoreSubscriber <T >, Scannable , Disposable , ContextHolder {
60+ Sinks .ManyWithUpstream <T >, CoreSubscriber <T >, Scannable , Disposable , ContextHolder {
6161
6262 @ SuppressWarnings ("rawtypes" )
6363 static final FluxPublish .PubSubInner [] EMPTY = new FluxPublish .PublishInner [0 ];
@@ -201,6 +201,9 @@ public void onComplete() {
201201
202202 @ Override
203203 public EmitResult tryEmitComplete () {
204+ if (isCancelled ()) {
205+ return EmitResult .FAIL_CANCELLED ;
206+ }
204207 if (done ) {
205208 return EmitResult .FAIL_TERMINATED ;
206209 }
@@ -217,6 +220,9 @@ public void onError(Throwable throwable) {
217220 @ Override
218221 public EmitResult tryEmitError (Throwable t ) {
219222 Objects .requireNonNull (t , "tryEmitError must be invoked with a non-null Throwable" );
223+ if (isCancelled ()) {
224+ return EmitResult .FAIL_CANCELLED ;
225+ }
220226 if (done ) {
221227 return EmitResult .FAIL_TERMINATED ;
222228 }
@@ -241,6 +247,9 @@ public void onNext(T t) {
241247
242248 @ Override
243249 public EmitResult tryEmitNext (T t ) {
250+ if (isCancelled ()) {
251+ return EmitResult .FAIL_CANCELLED ;
252+ }
244253 if (done ) {
245254 return Sinks .EmitResult .FAIL_TERMINATED ;
246255 }
@@ -271,6 +280,23 @@ public EmitResult tryEmitNext(T t) {
271280 return subscribers == EMPTY ? EmitResult .FAIL_ZERO_SUBSCRIBER : EmitResult .FAIL_OVERFLOW ;
272281 }
273282 drain ();
283+
284+ // This final check is critical for handling a race between this emit operation
285+ // and a concurrent cancellation from another thread.
286+ //
287+ // The race condition scenario:
288+ // 1. This thread passes the initial isCancelled() check at the top of the method.
289+ // 2. This thread successfully offers an item to the queue.
290+ // 3. Concurrently, another thread disposes the last subscriber, which cancels the sink
291+ // and triggers a drain that cleans up the just-offered item.
292+ //
293+ // Without this check, we would return EmitResult.OK, but the item has already been
294+ // discarded. This check ensures we accurately report FAIL_CANCELLED, reflecting
295+ // the final state of the operation.
296+ if (isCancelled ()) {
297+ return EmitResult .FAIL_CANCELLED ;
298+ }
299+
274300 return EmitResult .OK ;
275301 }
276302
@@ -382,7 +408,7 @@ public Object scanUnsafe(Attr key) {
382408 return null ;
383409 }
384410
385- final void drain () {
411+ void drain () {
386412 if (WIP .getAndIncrement (this ) != 0 ) {
387413 return ;
388414 }
@@ -397,11 +423,9 @@ final void drain() {
397423
398424 boolean empty = q == null || q .isEmpty ();
399425
400- if (checkTerminated (d , empty )) {
401- return ;
402- }
426+ cleanupIfTerminated (d , empty );
403427
404- FluxPublish .PubSubInner <T >[] a = subscribers ;
428+ FluxPublish .PubSubInner <T >[] a = subscribers ;
405429
406430 if (a != EMPTY && !empty ) {
407431 long maxRequested = Long .MAX_VALUE ;
@@ -431,10 +455,8 @@ final void drain() {
431455 d = true ;
432456 v = null ;
433457 }
434- if (checkTerminated (d , v == null )) {
435- return ;
436- }
437- if (sourceMode != Fuseable .SYNC ) {
458+ cleanupIfTerminated (d , v == null );
459+ if (sourceMode != Fuseable .SYNC ) {
438460 s .request (1 );
439461 }
440462 continue ;
@@ -458,16 +480,14 @@ final void drain() {
458480
459481 empty = v == null ;
460482
461- if (checkTerminated (d , empty )) {
462- return ;
463- }
483+ cleanupIfTerminated (d , empty );
464484
465- if (empty ) {
485+ if (empty ) {
466486 //async mode only needs to break but SYNC mode needs to perform terminal cleanup here...
467487 if (sourceMode == Fuseable .SYNC ) {
468488 //the q is empty
469489 done = true ;
470- checkTerminated (true , true );
490+ cleanupIfTerminated (true , true );
471491 }
472492 break ;
473493 }
@@ -494,10 +514,8 @@ final void drain() {
494514 }
495515 else if ( sourceMode == Fuseable .SYNC ) {
496516 done = true ;
497- if (checkTerminated (true , empty )) { //empty can be true if no subscriber
498- break ;
499- }
500- }
517+ cleanupIfTerminated (true , empty );//empty can be true if no subscriber
518+ }
501519
502520 missed = WIP .addAndGet (this , -missed );
503521 if (missed == 0 ) {
@@ -511,7 +529,14 @@ FluxPublish.PubSubInner<T>[] terminate() {
511529 return SUBSCRIBERS .getAndSet (this , TERMINATED );
512530 }
513531
514- boolean checkTerminated (boolean d , boolean empty ) {
532+ /**
533+ * Inspects the current state and, if terminal, performs the necessary cleanup actions
534+ * like clearing the queue and signaling subscribers.
535+ *
536+ * @param d the current `done` state
537+ * @param empty if the queue is currently empty
538+ */
539+ void cleanupIfTerminated (boolean d , boolean empty ) {
515540 if (s == Operators .cancelledSubscription ()) {
516541 if (autoCancel ) {
517542 terminate ();
@@ -520,7 +545,7 @@ boolean checkTerminated(boolean d, boolean empty) {
520545 q .clear ();
521546 }
522547 }
523- return true ;
548+ return ;
524549 }
525550 if (d ) {
526551 Throwable e = error ;
@@ -532,19 +557,16 @@ boolean checkTerminated(boolean d, boolean empty) {
532557 for (FluxPublish .PubSubInner <T > inner : terminate ()) {
533558 inner .actual .onError (e );
534559 }
535- return true ;
536560 }
537561 else if (empty ) {
538562 for (FluxPublish .PubSubInner <T > inner : terminate ()) {
539563 inner .actual .onComplete ();
540564 }
541- return true ;
542565 }
543566 }
544- return false ;
545567 }
546568
547- final boolean add (EmitterInner <T > inner ) {
569+ boolean add (EmitterInner <T > inner ) {
548570 for (; ; ) {
549571 FluxPublish .PubSubInner <T >[] a = subscribers ;
550572 if (a == TERMINATED ) {
@@ -560,7 +582,7 @@ final boolean add(EmitterInner<T> inner) {
560582 }
561583 }
562584
563- final void remove (FluxPublish .PubSubInner <T > inner ) {
585+ void remove (FluxPublish .PubSubInner <T > inner ) {
564586 for (; ; ) {
565587 FluxPublish .PubSubInner <T >[] a = subscribers ;
566588 if (a == TERMINATED || a == EMPTY ) {
@@ -591,14 +613,11 @@ final void remove(FluxPublish.PubSubInner<T> inner) {
591613 if (SUBSCRIBERS .compareAndSet (this , a , b )) {
592614 //contrary to FluxPublish, there is a possibility of auto-cancel, which
593615 //happens when the removed inner makes the subscribers array EMPTY
594- if (autoCancel && b == EMPTY && Operators .terminate (S , this )) {
595- if (WIP .getAndIncrement (this ) != 0 ) {
596- return ;
597- }
598- terminate ();
599- Queue <T > q = queue ;
600- if (q != null ) {
601- q .clear ();
616+ if (autoCancel && b == EMPTY && !isCancelled ()) {
617+ if (Operators .terminate (S , this )) {
618+ // The state is now CANCELLED.
619+ // Trigger a drain so the serialized drain-loop can perform the cleanup
620+ drain ();
602621 }
603622 }
604623 return ;
@@ -653,5 +672,4 @@ public void dispose() {
653672 }
654673 }
655674
656-
657675}
0 commit comments