Skip to content

Commit 3e9a86d

Browse files
Merge pull request #271 from cloudera/main
Release 1.25.0
2 parents def6fd3 + c230f1c commit 3e9a86d

File tree

47 files changed

+3106
-279
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+3106
-279
lines changed
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* CLOUDERA APPLIED MACHINE LEARNING PROTOTYPE (AMP)
3+
* (C) Cloudera, Inc. 2024
4+
* All rights reserved.
5+
*
6+
* Applicable Open Source License: Apache 2.0
7+
*
8+
* NOTE: Cloudera open source products are modular software products
9+
* made up of hundreds of individual components, each of which was
10+
* individually copyrighted. Each Cloudera open source product is a
11+
* collective work under U.S. Copyright Law. Your license to use the
12+
* collective work is as provided in your written agreement with
13+
* Cloudera. Used apart from the collective work, this file is
14+
* licensed for your use pursuant to the open source license
15+
* identified above.
16+
*
17+
* This code is provided to you pursuant a written agreement with
18+
* (i) Cloudera, Inc. or (ii) a third-party authorized to distribute
19+
* this code. If you do not have a written agreement with Cloudera nor
20+
* with an authorized and properly licensed third party, you do not
21+
* have any rights to access nor to use this code.
22+
*
23+
* Absent a written agreement with Cloudera, Inc. ("Cloudera") to the
24+
* contrary, A) CLOUDERA PROVIDES THIS CODE TO YOU WITHOUT WARRANTIES OF ANY
25+
* KIND; (B) CLOUDERA DISCLAIMS ANY AND ALL EXPRESS AND IMPLIED
26+
* WARRANTIES WITH RESPECT TO THIS CODE, INCLUDING BUT NOT LIMITED TO
27+
* IMPLIED WARRANTIES OF TITLE, NON-INFRINGEMENT, MERCHANTABILITY AND
28+
* FITNESS FOR A PARTICULAR PURPOSE; (C) CLOUDERA IS NOT LIABLE TO YOU,
29+
* AND WILL NOT DEFEND, INDEMNIFY, NOR HOLD YOU HARMLESS FOR ANY CLAIMS
30+
* ARISING FROM OR RELATED TO THE CODE; AND (D)WITH RESPECT TO YOUR EXERCISE
31+
* OF ANY RIGHTS GRANTED TO YOU FOR THE CODE, CLOUDERA IS NOT LIABLE FOR ANY
32+
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, PUNITIVE OR
33+
* CONSEQUENTIAL DAMAGES INCLUDING, BUT NOT LIMITED TO, DAMAGES
34+
* RELATED TO LOST REVENUE, LOST PROFITS, LOSS OF INCOME, LOSS OF
35+
* BUSINESS ADVANTAGE OR UNAVAILABILITY, OR LOSS OR CORRUPTION OF
36+
* DATA.
37+
******************************************************************************/
38+
39+
package com.cloudera.cai.rag.configuration;
40+
41+
import org.jdbi.v3.core.HandleCallback;
42+
import org.jdbi.v3.core.HandleConsumer;
43+
import org.jdbi.v3.core.Jdbi;
44+
45+
/**
46+
* Database operations implementation that delegates to a Jdbi instance. This is the production
47+
* implementation used throughout the application.
48+
*/
49+
public class DatabaseOperations {
50+
private final Jdbi jdbi;
51+
52+
public DatabaseOperations(Jdbi jdbi) {
53+
this.jdbi = jdbi;
54+
}
55+
56+
public <X extends Exception> void useHandle(HandleConsumer<X> handleConsumer) throws X {
57+
jdbi.useHandle(handleConsumer);
58+
}
59+
60+
public <X extends Exception> void useTransaction(HandleConsumer<X> handleConsumer) throws X {
61+
jdbi.useTransaction(handleConsumer);
62+
}
63+
64+
public <T, X extends Exception> T inTransaction(HandleCallback<T, X> handleCallback) throws X {
65+
return jdbi.inTransaction(handleCallback);
66+
}
67+
68+
public <T, X extends Exception> T withHandle(HandleCallback<T, X> handleCallback) throws X {
69+
return jdbi.withHandle(handleCallback);
70+
}
71+
}

backend/src/main/java/com/cloudera/cai/rag/configuration/JdbiConfiguration.java

Lines changed: 51 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/*******************************************************************************
1+
/*
22
* CLOUDERA APPLIED MACHINE LEARNING PROTOTYPE (AMP)
33
* (C) Cloudera, Inc. 2024
44
* All rights reserved.
@@ -45,6 +45,8 @@
4545
import java.sql.SQLException;
4646
import javax.sql.DataSource;
4747
import lombok.extern.slf4j.Slf4j;
48+
import org.jdbi.v3.core.HandleCallback;
49+
import org.jdbi.v3.core.HandleConsumer;
4850
import org.jdbi.v3.core.Jdbi;
4951
import org.springframework.context.annotation.Bean;
5052
import org.springframework.context.annotation.Configuration;
@@ -56,8 +58,8 @@ public class JdbiConfiguration {
5658
private static final Object LOCK = new Object();
5759

5860
@Bean
59-
public Jdbi jdbi() {
60-
return createJdbi();
61+
public DatabaseOperations databaseOperations() {
62+
return new DatabaseOperations(createJdbi());
6163
}
6264

6365
private static Jdbi createJdbi() {
@@ -108,7 +110,51 @@ private static DataSource dataSource(DatabaseConfig databaseConfig) throws SQLEx
108110
}
109111

110112
// nullables below here
111-
public static Jdbi createNull() {
112-
return new JdbiConfiguration().jdbi();
113+
public static DatabaseOperations createNull(RuntimeException... exceptions) {
114+
return new DatabaseOperationsStub(createJdbi(), exceptions);
115+
}
116+
117+
/**
118+
* Test implementation of DatabaseOperations that can inject failures for testing. This allows us
119+
* to test error scenarios without using Mockito.
120+
*/
121+
private static class DatabaseOperationsStub extends DatabaseOperations {
122+
private final RuntimeException[] exceptions;
123+
private int exceptionIndex = 0;
124+
125+
private DatabaseOperationsStub(Jdbi jdbi, RuntimeException[] exceptions) {
126+
super(jdbi);
127+
this.exceptions = exceptions;
128+
}
129+
130+
private void checkForException() {
131+
if (exceptionIndex < exceptions.length) {
132+
throw exceptions[exceptionIndex++];
133+
}
134+
}
135+
136+
@Override
137+
public <X extends Exception> void useHandle(HandleConsumer<X> handleConsumer) throws X {
138+
checkForException();
139+
super.useHandle(handleConsumer);
140+
}
141+
142+
@Override
143+
public <X extends Exception> void useTransaction(HandleConsumer<X> handleConsumer) throws X {
144+
checkForException();
145+
super.useTransaction(handleConsumer);
146+
}
147+
148+
@Override
149+
public <T, X extends Exception> T inTransaction(HandleCallback<T, X> handleCallback) throws X {
150+
checkForException();
151+
return super.inTransaction(handleCallback);
152+
}
153+
154+
@Override
155+
public <T, X extends Exception> T withHandle(HandleCallback<T, X> handleCallback) throws X {
156+
checkForException();
157+
return super.withHandle(handleCallback);
158+
}
113159
}
114160
}

backend/src/main/java/com/cloudera/cai/rag/datasources/DeleteDataSourceReconciler.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,37 +38,37 @@
3838

3939
package com.cloudera.cai.rag.datasources;
4040

41+
import com.cloudera.cai.rag.configuration.DatabaseOperations;
4142
import com.cloudera.cai.rag.configuration.JdbiConfiguration;
4243
import com.cloudera.cai.rag.external.RagBackendClient;
4344
import com.cloudera.cai.util.Tracker;
4445
import com.cloudera.cai.util.reconcilers.*;
4546
import io.opentelemetry.api.OpenTelemetry;
4647
import java.util.Set;
4748
import lombok.extern.slf4j.Slf4j;
48-
import org.jdbi.v3.core.Jdbi;
4949
import org.springframework.beans.factory.annotation.Qualifier;
5050
import org.springframework.stereotype.Component;
5151

5252
@Component
5353
@Slf4j
5454
public class DeleteDataSourceReconciler extends BaseReconciler<Long> {
55-
private final Jdbi jdbi;
55+
private final DatabaseOperations databaseOperations;
5656
private final RagBackendClient ragBackendClient;
5757

5858
public DeleteDataSourceReconciler(
59-
Jdbi jdbi,
59+
DatabaseOperations databaseOperations,
6060
RagBackendClient ragBackendClient,
6161
@Qualifier("reconcilerConfig") ReconcilerConfig reconcilerConfig,
6262
OpenTelemetry openTelemetry) {
6363
super(reconcilerConfig, openTelemetry);
64-
this.jdbi = jdbi;
64+
this.databaseOperations = databaseOperations;
6565
this.ragBackendClient = ragBackendClient;
6666
}
6767

6868
@Override
6969
public void resync() {
7070
log.debug("Checking for data sources to delete");
71-
jdbi.useHandle(
71+
databaseOperations.useHandle(
7272
handle ->
7373
handle
7474
.createQuery("SELECT id FROM rag_data_source WHERE deleted IS NOT NULL")
@@ -83,7 +83,7 @@ public ReconcileResult reconcile(Set<Long> dataSourceIds) {
8383
ragBackendClient.deleteDataSource(dataSourceId);
8484
log.info(
8585
"deleting data source and documents from the database. data source id: {}", dataSourceId);
86-
jdbi.useTransaction(
86+
databaseOperations.useTransaction(
8787
handle -> {
8888
handle.execute("DELETE FROM RAG_DATA_SOURCE WHERE ID = ?", dataSourceId);
8989
handle.execute(

backend/src/main/java/com/cloudera/cai/rag/datasources/RagDataSourceRepository.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -39,28 +39,28 @@
3939
package com.cloudera.cai.rag.datasources;
4040

4141
import com.cloudera.cai.rag.Types.RagDataSource;
42+
import com.cloudera.cai.rag.configuration.DatabaseOperations;
4243
import com.cloudera.cai.rag.configuration.JdbiConfiguration;
4344
import com.cloudera.cai.util.exceptions.NotFound;
4445
import java.time.Instant;
4546
import java.util.List;
4647
import lombok.extern.slf4j.Slf4j;
4748
import org.jdbi.v3.core.Handle;
48-
import org.jdbi.v3.core.Jdbi;
4949
import org.jdbi.v3.core.mapper.reflect.ConstructorMapper;
5050
import org.jdbi.v3.core.statement.Query;
5151
import org.springframework.stereotype.Component;
5252

5353
@Component
5454
@Slf4j
5555
public class RagDataSourceRepository {
56-
private final Jdbi jdbi;
56+
private final DatabaseOperations databaseOperations;
5757

58-
public RagDataSourceRepository(Jdbi jdbi) {
59-
this.jdbi = jdbi;
58+
public RagDataSourceRepository(DatabaseOperations databaseOperations) {
59+
this.databaseOperations = databaseOperations;
6060
}
6161

6262
public Long createRagDataSource(RagDataSource input) {
63-
return jdbi.inTransaction(handle -> createRagDataSource(handle, input));
63+
return databaseOperations.inTransaction(handle -> createRagDataSource(handle, input));
6464
}
6565

6666
public Long createRagDataSource(Handle handle, RagDataSource input) {
@@ -91,7 +91,7 @@ private static RagDataSource cleanInputs(RagDataSource input) {
9191

9292
public void updateRagDataSource(RagDataSource input) {
9393
RagDataSource cleanedInputs = cleanInputs(input);
94-
jdbi.useTransaction(
94+
databaseOperations.useTransaction(
9595
handle -> {
9696
var sql =
9797
"""
@@ -121,7 +121,7 @@ public void updateRagDataSource(RagDataSource input) {
121121
}
122122

123123
public RagDataSource getRagDataSourceById(Long id) {
124-
return jdbi.withHandle(
124+
return databaseOperations.withHandle(
125125
handle -> {
126126
var sql =
127127
"""
@@ -150,7 +150,7 @@ public RagDataSource getRagDataSourceById(Long id) {
150150

151151
public List<RagDataSource> getRagDataSources() {
152152
log.debug("Getting all RagDataSources");
153-
return jdbi.withHandle(
153+
return databaseOperations.withHandle(
154154
handle -> {
155155
var sql =
156156
"""
@@ -174,7 +174,7 @@ public List<RagDataSource> getRagDataSources() {
174174
}
175175

176176
public void deleteDataSource(Long id) {
177-
jdbi.useTransaction(handle -> deleteDataSource(handle, id));
177+
databaseOperations.useTransaction(handle -> deleteDataSource(handle, id));
178178
}
179179

180180
public void deleteDataSource(Handle handle, Long id) {
@@ -184,7 +184,7 @@ public void deleteDataSource(Handle handle, Long id) {
184184
}
185185

186186
public int getNumberOfDataSources() {
187-
return jdbi.withHandle(
187+
return databaseOperations.withHandle(
188188
handle -> {
189189
try (var query =
190190
handle.createQuery(
@@ -198,6 +198,6 @@ public int getNumberOfDataSources() {
198198

199199
public static RagDataSourceRepository createNull() {
200200
// the db configuration will use in-memory db based on env vars.
201-
return new RagDataSourceRepository(new JdbiConfiguration().jdbi());
201+
return new RagDataSourceRepository(JdbiConfiguration.createNull());
202202
}
203203
}

backend/src/main/java/com/cloudera/cai/rag/files/RagFileDeleteReconciler.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.cloudera.cai.rag.files;
22

33
import com.cloudera.cai.rag.Types;
4+
import com.cloudera.cai.rag.configuration.DatabaseOperations;
45
import com.cloudera.cai.rag.configuration.JdbiConfiguration;
56
import com.cloudera.cai.rag.external.RagBackendClient;
67
import com.cloudera.cai.util.exceptions.NotFound;
@@ -10,31 +11,30 @@
1011
import io.opentelemetry.api.OpenTelemetry;
1112
import java.util.Set;
1213
import lombok.extern.slf4j.Slf4j;
13-
import org.jdbi.v3.core.Jdbi;
1414
import org.jdbi.v3.core.mapper.reflect.ConstructorMapper;
1515
import org.jdbi.v3.core.statement.Query;
1616
import org.springframework.stereotype.Component;
1717

1818
@Slf4j
1919
@Component
2020
public class RagFileDeleteReconciler extends BaseReconciler<Types.RagDocument> {
21-
private final Jdbi jdbi;
21+
private final DatabaseOperations databaseOperations;
2222
private final RagBackendClient ragBackendClient;
2323

2424
public RagFileDeleteReconciler(
2525
ReconcilerConfig reconcilerConfig,
2626
OpenTelemetry openTelemetry,
27-
Jdbi jdbi,
27+
DatabaseOperations databaseOperations,
2828
RagBackendClient ragBackendClient) {
2929
super(reconcilerConfig, openTelemetry);
30-
this.jdbi = jdbi;
30+
this.databaseOperations = databaseOperations;
3131
this.ragBackendClient = ragBackendClient;
3232
}
3333

3434
@Override
3535
public void resync() throws Exception {
3636
log.debug("checking for RAG documents to be deleted");
37-
jdbi.useHandle(
37+
databaseOperations.useHandle(
3838
handle -> {
3939
handle.registerRowMapper(ConstructorMapper.factory(Types.RagDocument.class));
4040
Query query =
@@ -54,7 +54,7 @@ public ReconcileResult reconcile(Set<Types.RagDocument> documents) throws Except
5454
} catch (NotFound e) {
5555
log.debug("got a not found exception from the rag backend: {}", e.getMessage());
5656
}
57-
jdbi.useHandle(
57+
databaseOperations.useHandle(
5858
handle ->
5959
handle.execute("DELETE from rag_data_source_document WHERE id = ?", document.id()));
6060
}

0 commit comments

Comments
 (0)