@@ -142,6 +142,19 @@ public void execute(JobExecutionContext jobExecutionContext) {
       return;
     }

+    try {
+      rdfRepository.ensureStorageReady();
+    } catch (Exception e) {
+      LOG.error("RDF storage is not ready; aborting indexing job", e);
+      updateJobStatus(EventPublisherJob.Status.FAILED);
+      jobData.setFailure(
+          new IndexingError()
Comment on lines +145 to +151 (Copilot AI, Apr 23, 2026):
This adds a new failure path where the job aborts early if rdfRepository.ensureStorageReady() throws. Since there are existing unit tests covering execute() behavior in RdfIndexAppTest, it would be good to add or adjust tests to assert that (1) the job status becomes FAILED and (2) jobData.failure is populated with an IndexingError message when the storage readiness check fails.
+              .withErrorSource(IndexingError.ErrorSource.JOB)
+              .withMessage("RDF storage is not ready: " + e.getMessage()));
+      sendUpdates(jobExecutionContext, true);
+      return;
+    }
Comment on lines +145 to +156 (Copilot AI, Apr 22, 2026):
New failure behavior was added here (abort the job and mark it FAILED when rdfRepository.ensureStorageReady() throws), but RdfIndexAppTest does not appear to cover it. Please add a unit test that stubs ensureStorageReady() to throw and asserts that the status becomes FAILED, jobData.failure.message is set, and updates are sent (or updateRecordToDbAndNotify is invoked) so the failure is surfaced to users.
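A test along the lines the review asks for could look roughly like this. This is a self-contained sketch, not project code: `Job`, `Repository`, and the simplified `Status`/failure fields are stand-ins for the real RdfIndexApp, RdfRepository, and IndexingError, which are not shown in this diff; a real test would instead mock RdfRepository.

```java
// Self-contained sketch of the failure-path test the review requests.
// All types here are simplified stand-ins for the real OpenMetadata classes.
import java.util.Objects;

public class StorageReadinessFailureTest {

  enum Status { RUNNING, FAILED }

  // Stand-in for RdfRepository; a real test would stub ensureStorageReady() on a mock.
  interface Repository {
    void ensureStorageReady();
  }

  // Minimal stand-in for the job, mirroring the early-abort logic added in this PR.
  static class Job {
    Status status = Status.RUNNING;
    String failureMessage;
    boolean updatesSent;

    void execute(Repository repo) {
      try {
        repo.ensureStorageReady();
      } catch (Exception e) {
        status = Status.FAILED;
        failureMessage = "RDF storage is not ready: " + e.getMessage();
        updatesSent = true; // the real code calls sendUpdates(jobExecutionContext, true)
        return;
      }
      // ... normal indexing would continue here ...
    }
  }

  public static void main(String[] args) {
    Job job = new Job();
    job.execute(() -> { throw new IllegalStateException("Fuseki unreachable"); });

    // The three assertions the review asks for:
    if (job.status != Status.FAILED) throw new AssertionError("status not FAILED");
    if (!Objects.equals(job.failureMessage, "RDF storage is not ready: Fuseki unreachable")) {
      throw new AssertionError("failure message not set");
    }
    if (!job.updatesSent) throw new AssertionError("updates not sent");
    System.out.println("failure path covered: " + job.failureMessage);
  }
}
```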

     String jobName = jobExecutionContext.getJobDetail().getKey().getName();
     if (jobName.equals(ON_DEMAND_JOB)) {
       Map<String, Object> jsonAppConfig = JsonUtils.convertValue(jobData, Map.class);
@@ -128,6 +128,7 @@ public void execute(EventPublisherJob jobConfiguration) throws InterruptedExcept

   public void joinJob(RdfIndexJob job, EventPublisherJob jobConfiguration)
       throws InterruptedException {
+    RdfRepository.getInstance().ensureStorageReady();
     currentJob = job;
     coordinatorOwnedJob = false;
     stopped.set(false);
@@ -106,6 +106,13 @@ public String getBaseUri() {
     return config.getBaseUri().toString();
   }

+  public void ensureStorageReady() {
+    if (!isEnabled()) {
+      return;
+    }
+    storageService.ensureStorageReady();
+  }
+
   public void createOrUpdate(EntityInterface entity) {
     if (!isEnabled()) {
       return;
@@ -147,6 +154,9 @@ public void createOrUpdate(EntityInterface entity) {
           entity.getEntityReference().getType(),
           entity.getFullyQualifiedName(),
           e);
+      // Rethrow so callers (e.g. RdfBatchProcessor) can count this as a failure instead of
+      // reporting a false success. Entity-hook callers (RdfUpdater) already wrap in try/catch.
+      throw new RuntimeException("Failed to create/update entity in RDF", e);
     }
   }
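The rationale in the new comment (rethrow so batch callers can count failures) can be illustrated with a minimal sketch. This is hypothetical: the real RdfBatchProcessor is not shown in this diff, and entities are modeled as plain strings here.

```java
// Sketch of how a batch caller can only count failures if createOrUpdate rethrows
// instead of swallowing the exception after logging.
import java.util.List;
import java.util.function.Consumer;

public class BatchFailureCountSketch {

  record BatchStats(int success, int failed) {}

  static BatchStats process(List<String> entities, Consumer<String> createOrUpdate) {
    int success = 0;
    int failed = 0;
    for (String entity : entities) {
      try {
        createOrUpdate.accept(entity);
        success++;
      } catch (RuntimeException e) {
        // Only reachable because createOrUpdate now propagates the failure.
        failed++;
      }
    }
    return new BatchStats(success, failed);
  }

  public static void main(String[] args) {
    BatchStats stats =
        process(
            List.of("table1", "bad", "table2"),
            e -> {
              if (e.equals("bad")) {
                throw new RuntimeException("Failed to create/update entity in RDF");
              }
            });
    // If the exception were swallowed inside createOrUpdate, this would report
    // three successes and zero failures, i.e. a false success.
    System.out.println(stats);
  }
}
```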

@@ -39,20 +39,26 @@ public class JenaFusekiStorage implements RdfStorageInterface {

   private final RDFConnection connection;
   private final String baseUri;
+  private final String endpoint;
+  private final String username;
+  private final String password;

   public JenaFusekiStorage(RdfConfiguration config) {
     this.baseUri =
         config.getBaseUri() != null ? config.getBaseUri().toString() : "https://open-metadata.org/";

-    String endpoint =
+    this.endpoint =
         config.getRemoteEndpoint() != null && !config.getRemoteEndpoint().toString().isEmpty()
             ? config.getRemoteEndpoint().toString()
             : "http://openmetadata-fuseki:3030/openmetadata";
+    this.username = config.getUsername();
+    this.password = config.getPassword();

-    // Ensure the dataset exists before connecting
-    ensureDatasetExists(endpoint, config.getUsername(), config.getPassword());
+    // Best-effort attempt to create the dataset at startup; callers should invoke
+    // ensureStorageReady() before running work to recover from later restarts of the RDF server.
+    ensureDatasetExists(endpoint, username, password);

-    if (config.getUsername() != null && config.getPassword() != null) {
+    if (username != null && password != null) {
       java.net.http.HttpClient httpClient =
           java.net.http.HttpClient.newBuilder()
               .authenticator(
@@ -74,6 +80,30 @@ protected java.net.PasswordAuthentication getPasswordAuthentication() {
     loadOntology();
   }

+  @Override
+  public void ensureStorageReady() {
+    if (testConnection()) {
+      LOG.debug("Fuseki dataset at {} is accessible", endpoint);
+      return;
+    }
Comment on lines +84 to +88 (Copilot AI, Apr 23, 2026):
ensureStorageReady() currently treats the storage as ready if a SPARQL query succeeds (testConnection()), but indexing requires write access too (e.g., storeEntity() calls connection.update() and connection.load()). In a configuration where queries are allowed but updates are blocked (common with Fuseki auth/permissions), ensureStorageReady() will return successfully and the job will still run and fail on every entity. Consider extending the readiness check to also validate a lightweight update/write operation (e.g., a no-op/safe DELETE WHERE against a dedicated healthcheck graph) and throw with an error message that explicitly calls out missing update permissions when writes are not allowed.
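One way to implement the write probe this comment suggests is sketched below. This is not the project's code: `SparqlPort` is a hypothetical seam standing in for Jena's RDFConnection (the two calls would map to `queryAsk` and `update`), and the `urn:openmetadata:healthcheck` graph IRI is an assumption.

```java
// Sketch of a readiness probe that validates write access in addition to read access.
// SparqlPort is a hypothetical abstraction over RDFConnection; the graph IRI is made up.
public class WriteProbeSketch {

  interface SparqlPort {
    boolean ask(String sparql); // read probe, e.g. RDFConnection.queryAsk
    void update(String sparql); // write probe, e.g. RDFConnection.update
  }

  static final String HEALTHCHECK_GRAPH = "urn:openmetadata:healthcheck";

  static void ensureReadWriteReady(SparqlPort port, String endpoint) {
    try {
      port.ask("ASK { }"); // cheapest possible read
    } catch (Exception e) {
      throw new IllegalStateException("RDF storage at " + endpoint + " is not readable", e);
    }
    try {
      // Deleting from a dedicated, always-empty graph is a safe no-op write.
      port.update("DELETE WHERE { GRAPH <" + HEALTHCHECK_GRAPH + "> { ?s ?p ?o } }");
    } catch (Exception e) {
      // Calls out the read-only case explicitly, as the review suggests.
      throw new IllegalStateException(
          "RDF storage at " + endpoint
              + " accepts queries but rejected a no-op update; "
              + "check that the configured user has SPARQL Update permission",
          e);
    }
  }

  public static void main(String[] args) {
    // Simulate an endpoint where reads succeed but updates are forbidden.
    SparqlPort readOnly = new SparqlPort() {
      public boolean ask(String sparql) { return true; }
      public void update(String sparql) { throw new RuntimeException("403 Forbidden"); }
    };
    try {
      ensureReadWriteReady(readOnly, "http://localhost:3030/openmetadata");
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```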

+    LOG.warn(
+        "Fuseki dataset at {} is not accessible; attempting to (re)create it before running",
+        endpoint);
+    ensureDatasetExists(endpoint, username, password);
+
+    if (!testConnection()) {
Comment on lines +84 to +95 (Copilot AI, Apr 22, 2026):
ensureStorageReady() calls testConnection() twice on the unhappy path. In JenaFusekiStorage, testConnection() currently logs at ERROR with a full stack trace for expected transient/unready states, so a single readiness check can emit multiple ERROR stack traces before the method throws its own IllegalStateException. Consider reducing testConnection() logging to DEBUG/INFO (or logging only in ensureStorageReady()), so readiness probes do not spam the logs during failures/retries.
+      throw new IllegalStateException(
+          String.format(
+              "RDF storage is not accessible at %s after attempting dataset creation. "
+                  + "Verify the configured RDF endpoint URL, credentials, that the Fuseki dataset "
+                  + "exists, and that the configured user has permission to create it.",
+              endpoint));
+    }
+    LOG.info("Fuseki dataset at {} is now ready", endpoint);
+    loadOntology();
+  }

   /**
    * Ensures the Fuseki dataset exists, creating it if necessary.
    * Parses the endpoint URL to extract the server base URL and dataset name,
@@ -74,6 +74,14 @@ void storeRelationship(
    */
   boolean testConnection();

+  /**
+   * Verify the underlying storage is reachable and the configured dataset/graph is accessible,
+   * attempting to create it if missing. Implementations must throw if the storage cannot be
+   * brought to a ready state so callers can surface a clear error instead of silently producing
+   * partial results.
+   */
+  default void ensureStorageReady() {}
+
   /**
    * Get storage type identifier
    */