Skip to content

Commit 500587e

Browse files
committed
Panache Support multiple persistence unit in Hibernate Reactive
* Integration test for multiple reactive persistence units and Panache Backported from ORM the handling of different persistence units in entity in Panache * `@WithSessionOnDemand` works only with the default persistence unit * withSession overload to take the PU name * Execute update without entity runs on default session * Repository flush() flushes all PUs
1 parent de98066 commit 500587e

File tree

26 files changed

+669
-89
lines changed

26 files changed

+669
-89
lines changed

extensions/panache/hibernate-reactive-panache-common/runtime/src/main/java/io/quarkus/hibernate/reactive/panache/common/runtime/AbstractJpaOperations.java

Lines changed: 101 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
11
package io.quarkus.hibernate.reactive.panache.common.runtime;
22

3+
import static io.quarkus.hibernate.orm.runtime.PersistenceUnitUtil.DEFAULT_PERSISTENCE_UNIT_NAME;
4+
35
import java.util.ArrayList;
6+
import java.util.Arrays;
47
import java.util.Collections;
58
import java.util.List;
69
import java.util.Map;
710
import java.util.Map.Entry;
11+
import java.util.stream.Collectors;
812
import java.util.stream.Stream;
913

1014
import jakarta.persistence.LockModeType;
@@ -19,6 +23,11 @@
1923
import io.smallrye.mutiny.Uni;
2024

2125
public abstract class AbstractJpaOperations<PanacheQueryType> {
26+
private static volatile Map<String, String> entityToPersistenceUnit = Collections.emptyMap();
27+
28+
public static void setEntityToPersistenceUnit(Map<String, String> map) {
29+
entityToPersistenceUnit = Collections.unmodifiableMap(map);
30+
}
2231

2332
// FIXME: make it configurable?
2433
static final long TIMEOUT_MS = 5000;
@@ -34,7 +43,7 @@ protected abstract PanacheQueryType createPanacheQuery(Uni<Mutiny.Session> sessi
3443
// Instance methods
3544

3645
public Uni<Void> persist(Object entity) {
37-
return persist(getSession(), entity);
46+
return persist(getSession(entity.getClass()), entity);
3847
}
3948

4049
public Uni<Void> persist(Uni<Mutiny.Session> sessionUni, Object entity) {
@@ -67,20 +76,60 @@ public Uni<Void> persist(Stream<?> entities) {
6776
}
6877

6978
public Uni<Void> persist(Object... entities) {
70-
return getSession().chain(session -> session.persistAll(entities));
79+
Map<String, List<Object>> sessions = Arrays.stream(entities)
80+
.collect(Collectors.groupingBy(e -> entityToPersistenceUnit.get(e.getClass().getName())));
81+
82+
List<Uni<Void>> results = new ArrayList<>();
83+
for (Entry<String, List<Object>> entry : sessions.entrySet()) {
84+
results.add(getSession(entry.getKey()).chain(session -> session.persistAll(entry.getValue().toArray())));
85+
}
86+
87+
return Uni.combine().all().unis(results).discardItems();
7188
}
7289

7390
public Uni<Void> delete(Object entity) {
74-
return getSession().chain(session -> session.remove(entity));
91+
return getSession(entity.getClass()).chain(session -> session.remove(entity));
7592
}
7693

7794
public boolean isPersistent(Object entity) {
78-
Mutiny.Session current = SessionOperations.getCurrentSession();
79-
return current != null ? current.contains(entity) : false;
95+
Session currentSession = getCurrentSession(entity.getClass());
96+
if (currentSession == null) {
97+
// No active session so object can't be persistent
98+
return false;
99+
}
100+
101+
return currentSession.contains(entity);
102+
}
103+
104+
public Session getCurrentSession(Class<?> entityClass) {
105+
String persistenceUnitName = entityToPersistenceUnit.get(entityClass.getName());
106+
return SessionOperations.getCurrentSession(persistenceUnitName);
80107
}
81108

109+
/*
110+
* Used by Panache repositories.
111+
*
112+
* This method flushes *all* persistence units.
113+
* Ideally, it should flush only the session associated with the repository’s entity,
114+
* but we don’t currently track the entity inside the repository.
115+
*
116+
* In Panache blocking, Quarkus injects the current EntityManager.
117+
* In reactive mode, however, there is not yet an injectable Mutiny.Session
118+
* (see https://github.com/quarkusio/quarkus/issues/47462).
119+
*/
82120
public Uni<Void> flush() {
83-
return getSession().chain(Session::flush);
121+
return allSessions()
122+
.chain(sessions -> {
123+
List<Uni<Void>> flushes = sessions.stream()
124+
.map(Session::flush)
125+
.toList();
126+
127+
return Uni.combine().all().unis(flushes).discardItems();
128+
});
129+
}
130+
131+
public Uni<Void> flush(Object entity) {
132+
return getSession(entity.getClass()).chain(Session::flush);
84133
}
85134

86135
public int paramCount(Object[] params) {
@@ -95,11 +144,11 @@ public int paramCount(Map<String, Object> params) {
95144
// Queries
96145

97146
public Uni<?> findById(Class<?> entityClass, Object id) {
98-
return getSession().chain(session -> session.find(entityClass, id));
147+
return getSession(entityClass).chain(session -> session.find(entityClass, id));
99148
}
100149

101150
public Uni<?> findById(Class<?> entityClass, Object id, LockModeType lockModeType) {
102-
return getSession()
151+
return getSession(entityClass)
103152
.chain(session -> session.find(entityClass, id, LockModeConverter.convertToLockMode(lockModeType)));
104153
}
105154

@@ -108,7 +157,7 @@ public PanacheQueryType find(Class<?> entityClass, String panacheQuery, Object..
108157
}
109158

110159
public PanacheQueryType find(Class<?> entityClass, String panacheQuery, Sort sort, Object... params) {
111-
Uni<Mutiny.Session> session = getSession();
160+
Uni<Mutiny.Session> session = getSession(entityClass);
112161
if (PanacheJpaUtil.isNamedQuery(panacheQuery)) {
113162
String namedQuery = panacheQuery.substring(1);
114163
if (sort != null) {
@@ -128,7 +177,7 @@ public PanacheQueryType find(Class<?> entityClass, String panacheQuery, Map<Stri
128177
}
129178

130179
public PanacheQueryType find(Class<?> entityClass, String panacheQuery, Sort sort, Map<String, Object> params) {
131-
Uni<Mutiny.Session> session = getSession();
180+
Uni<Mutiny.Session> session = getSession(entityClass);
132181
if (PanacheJpaUtil.isNamedQuery(panacheQuery)) {
133182
String namedQuery = panacheQuery.substring(1);
134183
if (sort != null) {
@@ -177,13 +226,13 @@ public Uni<List<?>> list(Class<?> entityClass, String query, Sort sort, Paramete
177226

178227
public PanacheQueryType findAll(Class<?> entityClass) {
179228
String query = "FROM " + PanacheJpaUtil.getEntityName(entityClass);
180-
Uni<Mutiny.Session> session = getSession();
229+
Uni<Mutiny.Session> session = getSession(entityClass);
181230
return createPanacheQuery(session, query, null, null, null);
182231
}
183232

184233
public PanacheQueryType findAll(Class<?> entityClass, Sort sort) {
185234
String query = "FROM " + PanacheJpaUtil.getEntityName(entityClass);
186-
Uni<Mutiny.Session> session = getSession();
235+
Uni<Mutiny.Session> session = getSession(entityClass);
187236
return createPanacheQuery(session, query, null, PanacheJpaUtil.toOrderBy(sort), null);
188237
}
189238

@@ -196,7 +245,7 @@ public Uni<List<?>> listAll(Class<?> entityClass, Sort sort) {
196245
}
197246

198247
public Uni<Long> count(Class<?> entityClass) {
199-
return getSession()
248+
return getSession(entityClass)
200249
.chain(session -> session
201250
.createSelectionQuery("FROM " + PanacheJpaUtil.getEntityName(entityClass), entityClass)
202251
.getResultCount());
@@ -206,13 +255,13 @@ public Uni<Long> count(Class<?> entityClass) {
206255
public Uni<Long> count(Class<?> entityClass, String panacheQuery, Object... params) {
207256

208257
if (PanacheJpaUtil.isNamedQuery(panacheQuery))
209-
return (Uni) getSession().chain(session -> {
258+
return (Uni) getSession(entityClass).chain(session -> {
210259
String namedQueryName = panacheQuery.substring(1);
211260
NamedQueryUtil.checkNamedQuery(entityClass, namedQueryName);
212261
return bindParameters(session.createNamedQuery(namedQueryName, Long.class), params).getSingleResult();
213262
});
214263

215-
return getSession().chain(session -> bindParameters(
264+
return getSession(entityClass).chain(session -> bindParameters(
216265
session.createSelectionQuery(PanacheJpaUtil.createQueryForCount(entityClass, panacheQuery, paramCount(params)),
217266
Object.class),
218267
params).getResultCount())
@@ -223,13 +272,13 @@ public Uni<Long> count(Class<?> entityClass, String panacheQuery, Object... para
223272
public Uni<Long> count(Class<?> entityClass, String panacheQuery, Map<String, Object> params) {
224273

225274
if (PanacheJpaUtil.isNamedQuery(panacheQuery))
226-
return getSession().chain(session -> {
275+
return getSession(entityClass).chain(session -> {
227276
String namedQueryName = panacheQuery.substring(1);
228277
NamedQueryUtil.checkNamedQuery(entityClass, namedQueryName);
229278
return bindParameters(session.createNamedQuery(namedQueryName, Long.class), params).getSingleResult();
230279
});
231280

232-
return getSession().chain(session -> bindParameters(
281+
return getSession(entityClass).chain(session -> bindParameters(
233282
session.createSelectionQuery(PanacheJpaUtil.createQueryForCount(entityClass, panacheQuery, paramCount(params)),
234283
Object.class),
235284
params).getResultCount())
@@ -258,7 +307,7 @@ public Uni<Boolean> exists(Class<?> entityClass, String query, Parameters params
258307
}
259308

260309
public Uni<Long> deleteAll(Class<?> entityClass) {
261-
return getSession().chain(
310+
return getSession(entityClass).chain(
262311
session -> session.createMutationQuery("DELETE FROM " + PanacheJpaUtil.getEntityName(entityClass))
263312
.executeUpdate()
264313
.map(Integer::longValue));
@@ -272,20 +321,20 @@ public Uni<Boolean> deleteById(Class<?> entityClass, Object id) {
272321
if (entity == null) {
273322
return Uni.createFrom().item(false);
274323
}
275-
return getSession().chain(session -> session.remove(entity).map(v -> true));
324+
return getSession(entityClass).chain(session -> session.remove(entity).map(v -> true));
276325
});
277326
}
278327

279328
public Uni<Long> delete(Class<?> entityClass, String panacheQuery, Object... params) {
280329

281330
if (PanacheJpaUtil.isNamedQuery(panacheQuery))
282-
return getSession().chain(session -> {
331+
return getSession(entityClass).chain(session -> {
283332
String namedQueryName = panacheQuery.substring(1);
284333
NamedQueryUtil.checkNamedQuery(entityClass, namedQueryName);
285334
return bindParameters(session.createNamedQuery(namedQueryName), params).executeUpdate().map(Integer::longValue);
286335
});
287336

288-
return getSession().chain(session -> bindParameters(
337+
return getSession(entityClass).chain(session -> bindParameters(
289338
session.createMutationQuery(PanacheJpaUtil.createDeleteQuery(entityClass, panacheQuery, paramCount(params))),
290339
params)
291340
.executeUpdate().map(Integer::longValue))
@@ -296,13 +345,13 @@ public Uni<Long> delete(Class<?> entityClass, String panacheQuery, Object... par
296345
public Uni<Long> delete(Class<?> entityClass, String panacheQuery, Map<String, Object> params) {
297346

298347
if (PanacheJpaUtil.isNamedQuery(panacheQuery))
299-
return getSession().chain(session -> {
348+
return getSession(entityClass).chain(session -> {
300349
String namedQueryName = panacheQuery.substring(1);
301350
NamedQueryUtil.checkNamedQuery(entityClass, namedQueryName);
302351
return bindParameters(session.createNamedQuery(namedQueryName), params).executeUpdate().map(Integer::longValue);
303352
});
304353

305-
return getSession().chain(session -> bindParameters(
354+
return getSession(entityClass).chain(session -> bindParameters(
306355
session.createMutationQuery(PanacheJpaUtil.createDeleteQuery(entityClass, panacheQuery, paramCount(params))),
307356
params)
308357
.executeUpdate().map(Integer::longValue))
@@ -322,7 +371,7 @@ public IllegalStateException implementationInjectionMissing() {
322371
public Uni<Integer> executeUpdate(Class<?> entityClass, String panacheQuery, Object... params) {
323372

324373
if (PanacheJpaUtil.isNamedQuery(panacheQuery))
325-
return (Uni) getSession().chain(session -> {
374+
return (Uni) getSession(entityClass).chain(session -> {
326375
String namedQueryName = panacheQuery.substring(1);
327376
NamedQueryUtil.checkNamedQuery(entityClass, namedQueryName);
328377
return bindParameters(session.createNamedQuery(namedQueryName), params).executeUpdate();
@@ -337,7 +386,7 @@ public Uni<Integer> executeUpdate(Class<?> entityClass, String panacheQuery, Obj
337386
public Uni<Integer> executeUpdate(Class<?> entityClass, String panacheQuery, Map<String, Object> params) {
338387

339388
if (PanacheJpaUtil.isNamedQuery(panacheQuery))
340-
return (Uni) getSession().chain(session -> {
389+
return (Uni) getSession(entityClass).chain(session -> {
341390
String namedQueryName = panacheQuery.substring(1);
342391
NamedQueryUtil.checkNamedQuery(entityClass, namedQueryName);
343392
return bindParameters(session.createNamedQuery(namedQueryName), params).executeUpdate();
@@ -364,8 +413,22 @@ public Uni<Integer> update(Class<?> entityClass, String query, Object... params)
364413
//
365414
// Static helpers
366415

416+
public static Uni<List<Session>> allSessions() {
417+
return SessionOperations.allSessions();
418+
}
419+
367420
public static Uni<Mutiny.Session> getSession() {
368-
return SessionOperations.getSession();
421+
return getSession(DEFAULT_PERSISTENCE_UNIT_NAME);
422+
}
423+
424+
public static Uni<Mutiny.Session> getSession(Class<?> clazz) {
425+
String className = clazz.getName();
426+
String persistenceUnitName = entityToPersistenceUnit.get(className);
427+
return getSession(persistenceUnitName);
428+
}
429+
430+
public static Uni<Mutiny.Session> getSession(String persistenceUnitName) {
431+
return SessionOperations.getSession(persistenceUnitName);
369432
}
370433

371434
public static Mutiny.Query<?> bindParameters(Mutiny.Query<?> query, Object[] params) {
@@ -395,13 +458,21 @@ public static <T extends Mutiny.AbstractQuery> T bindParameters(T query, Map<Str
395458
return query;
396459
}
397460

461+
/**
462+
* Execute update on default persistence unit
463+
*/
398464
public static Uni<Integer> executeUpdate(String query, Object... params) {
399-
return getSession().chain(session -> bindParameters(session.createMutationQuery(query), params)
400-
.executeUpdate());
465+
return getSession(DEFAULT_PERSISTENCE_UNIT_NAME)
466+
.chain(session -> bindParameters(session.createMutationQuery(query), params)
467+
.executeUpdate());
401468
}
402469

470+
/**
471+
* Execute update on default persistence unit
472+
*/
403473
public static Uni<Integer> executeUpdate(String query, Map<String, Object> params) {
404-
return getSession().chain(session -> bindParameters(session.createMutationQuery(query), params)
405-
.executeUpdate());
474+
return getSession(DEFAULT_PERSISTENCE_UNIT_NAME)
475+
.chain(session -> bindParameters(session.createMutationQuery(query), params)
476+
.executeUpdate());
406477
}
407478
}

0 commit comments

Comments
 (0)