3131import io .objectbox .reactive .DataObserver ;
3232import io .objectbox .reactive .DataPublisher ;
3333import io .objectbox .reactive .DataPublisherUtils ;
34+ import io .objectbox .reactive .SubscriptionBuilder ;
3435
36+ /**
37+ * A {@link DataPublisher} that notifies {@link DataObserver}s about changes in an entity box.
38+ * Publishing is requested when a subscription is {@link SubscriptionBuilder#observer(DataObserver) observed} and
39+ * then by {@link BoxStore} for each {@link BoxStore#txCommitted(Transaction, int[]) txCommitted}.
40+ * Publish requests are processed on a single thread, one at a time, in the order publishing was requested.
41+ */
42+ @ SuppressWarnings ("rawtypes" )
3543@ Internal
3644class ObjectClassPublisher implements DataPublisher <Class >, Runnable {
3745 final BoxStore boxStore ;
3846 final MultimapSet <Integer , DataObserver <Class >> observersByEntityTypeId = MultimapSet .create (SetType .THREAD_SAFE );
39- final Deque <int []> changesQueue = new ArrayDeque <>();
47+ private final Deque <PublishRequest > changesQueue = new ArrayDeque <>();
48+ private static class PublishRequest {
49+ @ Nullable private final DataObserver <Class > observer ;
50+ private final int [] entityTypeIds ;
51+ PublishRequest (@ Nullable DataObserver <Class > observer , int [] entityTypeIds ) {
52+ this .observer = observer ;
53+ this .entityTypeIds = entityTypeIds ;
54+ }
55+ }
4056 volatile boolean changePublisherRunning ;
4157
4258 ObjectClassPublisher (BoxStore boxStore ) {
@@ -76,21 +92,19 @@ private void unsubscribe(DataObserver<Class> observer, int entityTypeId) {
7692 }
7793
7894 @ Override
79- public void publishSingle (final DataObserver <Class > observer , @ Nullable final Object forClass ) {
80- boxStore .internalScheduleThread (new Runnable () {
81- @ Override
82- public void run () {
83- Collection <Class > entityClasses = forClass != null ? Collections .singletonList ((Class ) forClass ) :
84- boxStore .getAllEntityClasses ();
85- for (Class entityClass : entityClasses ) {
86- try {
87- observer .onData (entityClass );
88- } catch (RuntimeException e ) {
89- handleObserverException (entityClass );
90- }
91- }
95+ public void publishSingle (DataObserver <Class > observer , @ Nullable Object forClass ) {
96+ int [] entityTypeIds = forClass != null
97+ ? new int []{boxStore .getEntityTypeIdOrThrow ((Class ) forClass )}
98+ : boxStore .getAllEntityTypeIds ();
99+
100+ synchronized (changesQueue ) {
101+ changesQueue .add (new PublishRequest (observer , entityTypeIds ));
102+ // Only one thread at a time.
103+ if (!changePublisherRunning ) {
104+ changePublisherRunning = true ;
105+ boxStore .internalScheduleThread (this );
92106 }
93- });
107+ }
94108 }
95109
96110 private void handleObserverException (Class objectClass ) {
@@ -107,39 +121,50 @@ private void handleObserverException(Class objectClass) {
107121 */
108122 void publish (int [] entityTypeIdsAffected ) {
109123 synchronized (changesQueue ) {
110- changesQueue .add (entityTypeIdsAffected );
111- // Only one thread at a time
124+ changesQueue .add (new PublishRequest ( null , entityTypeIdsAffected ) );
125+ // Only one thread at a time.
112126 if (!changePublisherRunning ) {
113127 changePublisherRunning = true ;
114128 boxStore .internalScheduleThread (this );
115129 }
116130 }
117131 }
118132
133+ /**
134+ * Processes publish requests using a single thread to prevent any data generated by observers to get stale.
135+ * This publisher on its own can NOT deliver stale data (the entity class types do not change).
136+ * However, a {@link DataObserver} of this publisher might apply a {@link io.objectbox.reactive.DataTransformer}
137+ * which queries for data which CAN get stale if delivered out of order.
138+ */
119139 @ Override
120140 public void run () {
121141 try {
122142 while (true ) {
123- // We do not join all available array, just in case the app relies on a specific order
124- int [] entityTypeIdsAffected ;
143+ PublishRequest request ;
125144 synchronized (changesQueue ) {
126- entityTypeIdsAffected = changesQueue .pollFirst ();
127- if (entityTypeIdsAffected == null ) {
145+ request = changesQueue .pollFirst ();
146+ if (request == null ) {
128147 changePublisherRunning = false ;
129148 break ;
130149 }
131150 }
132- for (int entityTypeId : entityTypeIdsAffected ) {
133- Collection <DataObserver <Class >> observers = observersByEntityTypeId .get (entityTypeId );
134- if (observers != null && !observers .isEmpty ()) {
135- Class objectClass = boxStore .getEntityClassOrThrow (entityTypeId );
136- try {
137- for (DataObserver <Class > observer : observers ) {
138- observer .onData (objectClass );
139- }
140- } catch (RuntimeException e ) {
141- handleObserverException (objectClass );
151+
152+ for (int entityTypeId : request .entityTypeIds ) {
153+ // If no specific observer specified, notify all current observers.
154+ Collection <DataObserver <Class >> observers = request .observer != null
155+ ? Collections .singletonList (request .observer )
156+ : observersByEntityTypeId .get (entityTypeId );
157+ if (observers == null || observers .isEmpty ()) {
158+ continue ; // No observers for this entity type.
159+ }
160+
161+ Class entityClass = boxStore .getEntityClassOrThrow (entityTypeId );
162+ try {
163+ for (DataObserver <Class > observer : observers ) {
164+ observer .onData (entityClass );
142165 }
166+ } catch (RuntimeException e ) {
167+ handleObserverException (entityClass );
143168 }
144169 }
145170 }
0 commit comments