14
14
import java .util .concurrent .RejectedExecutionException ;
15
15
import java .util .concurrent .TimeUnit ;
16
16
import java .util .concurrent .locks .LockSupport ;
17
+ import java .util .concurrent .locks .ReentrantLock ;
17
18
18
19
import java9 .util .Objects ;
19
20
import java9 .util .function .BiConsumer ;
146
147
* @since 9
147
148
*/
148
149
public class SubmissionPublisher <T > implements Publisher <T > {
149
- // CVS rev. 1.81
150
+ // CVS rev. 1.83
150
151
/*
151
152
* Most mechanics are handled by BufferedSubscription. This class
152
153
* mainly tracks subscribers and ensures sequentiality, by using
153
- * built-in synchronization locks across public methods. Using
154
- * built-in locks works well in the most typical case in which
155
- * only one thread submits items. We extend this idea in
156
- * submission methods by detecting single-ownership to reduce
157
- * producer-consumer synchronization strength .
154
+ * locks across public methods, to ensure thread-safety in the
155
+ * presence of multiple sources and maintain acquire-release
156
+ * ordering around user operations. However, we also track whether
157
+ * there is only a single source, and if so streamline some buffer
158
+ * operations by avoiding some atomics .
158
159
*/
159
160
160
161
/** The largest possible power of two array size. */
@@ -209,6 +210,8 @@ private static final class ThreadPerTaskExecutor implements Executor {
209
210
*/
210
211
BufferedSubscription <T > clients ;
211
212
213
+ /** Lock for exclusion across multiple sources */
214
+ final ReentrantLock lock ;
212
215
/** Run status, updated only within locks */
213
216
volatile boolean closed ;
214
217
/** Set true on first call to subscribe, to initialize possible owner */
@@ -248,6 +251,7 @@ public SubmissionPublisher(Executor executor, int maxBufferCapacity,
248
251
Objects .requireNonNull (executor );
249
252
if (maxBufferCapacity <= 0 )
250
253
throw new IllegalArgumentException ("capacity must be positive" );
254
+ this .lock = new ReentrantLock ();
251
255
this .executor = executor ;
252
256
this .onNextHandler = handler ;
253
257
this .maxBufferCapacity = roundCapacity (maxBufferCapacity );
@@ -311,13 +315,15 @@ public SubmissionPublisher() {
311
315
*/
312
316
public void subscribe (Subscriber <? super T > subscriber ) {
313
317
Objects .requireNonNull (subscriber );
318
+ ReentrantLock lock = this .lock ;
314
319
int max = maxBufferCapacity ; // allocate initial array
315
320
Object [] array = new Object [max < INITIAL_CAPACITY ?
316
321
max : INITIAL_CAPACITY ];
317
322
BufferedSubscription <T > subscription =
318
323
new BufferedSubscription <T >(subscriber , executor , onNextHandler ,
319
324
array , max );
320
- synchronized (this ) {
325
+ lock .lock ();
326
+ try {
321
327
if (!subscribed ) {
322
328
subscribed = true ;
323
329
owner = Thread .currentThread ();
@@ -352,6 +358,8 @@ else if (subscriber.equals(b.subscriber)) {
352
358
pred = b ;
353
359
b = next ;
354
360
}
361
+ } finally {
362
+ lock .unlock ();
355
363
}
356
364
}
357
365
@@ -364,7 +372,9 @@ private int doOffer(T item, long nanos,
364
372
Objects .requireNonNull (item );
365
373
int lag = 0 ;
366
374
boolean complete , unowned ;
367
- synchronized (this ) {
375
+ ReentrantLock lock = this .lock ;
376
+ lock .lock ();
377
+ try {
368
378
Thread t = Thread .currentThread (), o ;
369
379
BufferedSubscription <T > b = clients ;
370
380
if ((unowned = ((o = owner ) != t )) && o != null )
@@ -395,6 +405,8 @@ else if (stat > lag)
395
405
if (retries != null || cleanMe )
396
406
lag = retryOffer (item , nanos , onDrop , retries , lag , cleanMe );
397
407
}
408
+ } finally {
409
+ lock .unlock ();
398
410
}
399
411
if (complete )
400
412
throw new IllegalStateException ("Closed" );
@@ -583,14 +595,18 @@ public int offer(T item, long timeout, TimeUnit unit,
583
595
* subscribers have yet completed.
584
596
*/
585
597
public void close () {
598
+ ReentrantLock lock = this .lock ;
586
599
if (!closed ) {
587
600
BufferedSubscription <T > b ;
588
- synchronized (this ) {
601
+ lock .lock ();
602
+ try {
589
603
// no need to re-check closed here
590
604
b = clients ;
591
605
clients = null ;
592
606
owner = null ;
593
607
closed = true ;
608
+ } finally {
609
+ lock .unlock ();
594
610
}
595
611
while (b != null ) {
596
612
BufferedSubscription <T > next = b .next ;
@@ -614,16 +630,20 @@ public void close() {
614
630
*/
615
631
public void closeExceptionally (Throwable error ) {
616
632
Objects .requireNonNull (error );
633
+ ReentrantLock lock = this .lock ;
617
634
if (!closed ) {
618
635
BufferedSubscription <T > b ;
619
- synchronized (this ) {
636
+ lock .lock ();
637
+ try {
620
638
b = clients ;
621
639
if (!closed ) { // don't clobber racing close
622
640
closedException = error ;
623
641
clients = null ;
624
642
owner = null ;
625
643
closed = true ;
626
644
}
645
+ } finally {
646
+ lock .unlock ();
627
647
}
628
648
while (b != null ) {
629
649
BufferedSubscription <T > next = b .next ;
@@ -661,7 +681,9 @@ public Throwable getClosedException() {
661
681
*/
662
682
public boolean hasSubscribers () {
663
683
boolean nonEmpty = false ;
664
- synchronized (this ) {
684
+ ReentrantLock lock = this .lock ;
685
+ lock .lock ();
686
+ try {
665
687
for (BufferedSubscription <T > b = clients ; b != null ;) {
666
688
BufferedSubscription <T > next = b .next ;
667
689
if (b .isClosed ()) {
@@ -673,6 +695,8 @@ public boolean hasSubscribers() {
673
695
break ;
674
696
}
675
697
}
698
+ } finally {
699
+ lock .unlock ();
676
700
}
677
701
return nonEmpty ;
678
702
}
@@ -683,9 +707,15 @@ public boolean hasSubscribers() {
683
707
* @return the number of current subscribers
684
708
*/
685
709
public int getNumberOfSubscribers () {
686
- synchronized (this ) {
687
- return cleanAndCount ();
710
+ int n ;
711
+ ReentrantLock lock = this .lock ;
712
+ lock .lock ();
713
+ try {
714
+ n = cleanAndCount ();
715
+ } finally {
716
+ lock .unlock ();
688
717
}
718
+ return n ;
689
719
}
690
720
691
721
/**
@@ -715,7 +745,9 @@ public int getMaxBufferCapacity() {
715
745
*/
716
746
public List <Subscriber <? super T >> getSubscribers () {
717
747
ArrayList <Subscriber <? super T >> subs = new ArrayList <Subscriber <? super T >>();
718
- synchronized (this ) {
748
+ ReentrantLock lock = this .lock ;
749
+ lock .lock ();
750
+ try {
719
751
BufferedSubscription <T > pred = null , next ;
720
752
for (BufferedSubscription <T > b = clients ; b != null ; b = next ) {
721
753
next = b .next ;
@@ -731,6 +763,8 @@ public List<Subscriber<? super T>> getSubscribers() {
731
763
pred = b ;
732
764
}
733
765
}
766
+ } finally {
767
+ lock .unlock ();
734
768
}
735
769
return subs ;
736
770
}
@@ -744,8 +778,11 @@ public List<Subscriber<? super T>> getSubscribers() {
744
778
*/
745
779
public boolean isSubscribed (Subscriber <? super T > subscriber ) {
746
780
Objects .requireNonNull (subscriber );
781
+ boolean subscribed = false ;
782
+ ReentrantLock lock = this .lock ;
747
783
if (!closed ) {
748
- synchronized (this ) {
784
+ lock .lock ();
785
+ try {
749
786
BufferedSubscription <T > pred = null , next ;
750
787
for (BufferedSubscription <T > b = clients ; b != null ; b = next ) {
751
788
next = b .next ;
@@ -756,14 +793,16 @@ public boolean isSubscribed(Subscriber<? super T> subscriber) {
756
793
else
757
794
pred .next = next ;
758
795
}
759
- else if (subscriber .equals (b .subscriber ))
760
- return true ;
796
+ else if (subscribed = subscriber .equals (b .subscriber ))
797
+ break ;
761
798
else
762
799
pred = b ;
763
800
}
801
+ } finally {
802
+ lock .unlock ();
764
803
}
765
804
}
766
- return false ;
805
+ return subscribed ;
767
806
}
768
807
769
808
/**
@@ -776,7 +815,9 @@ else if (subscriber.equals(b.subscriber))
776
815
public long estimateMinimumDemand () {
777
816
long min = Long .MAX_VALUE ;
778
817
boolean nonEmpty = false ;
779
- synchronized (this ) {
818
+ ReentrantLock lock = this .lock ;
819
+ lock .lock ();
820
+ try {
780
821
BufferedSubscription <T > pred = null , next ;
781
822
for (BufferedSubscription <T > b = clients ; b != null ; b = next ) {
782
823
int n ; long d ;
@@ -795,6 +836,8 @@ public long estimateMinimumDemand() {
795
836
pred = b ;
796
837
}
797
838
}
839
+ } finally {
840
+ lock .unlock ();
798
841
}
799
842
return nonEmpty ? min : 0 ;
800
843
}
@@ -807,7 +850,9 @@ public long estimateMinimumDemand() {
807
850
*/
808
851
public int estimateMaximumLag () {
809
852
int max = 0 ;
810
- synchronized (this ) {
853
+ ReentrantLock lock = this .lock ;
854
+ lock .lock ();
855
+ try {
811
856
BufferedSubscription <T > pred = null , next ;
812
857
for (BufferedSubscription <T > b = clients ; b != null ; b = next ) {
813
858
int n ;
@@ -825,6 +870,8 @@ public int estimateMaximumLag() {
825
870
pred = b ;
826
871
}
827
872
}
873
+ } finally {
874
+ lock .unlock ();
828
875
}
829
876
return max ;
830
877
}
0 commit comments