diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityTimeSeriesDAO.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityTimeSeriesDAO.java
index 69646d725c9c..056db754e2d1 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityTimeSeriesDAO.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityTimeSeriesDAO.java
@@ -400,6 +400,35 @@ default void deleteBeforeTimestamp(String entityFQNHash, String extension, Long
deleteBeforeTimestamp(getTimeSeriesTableName(), entityFQNHash, extension, timestamp);
}
+  @ConnectionAwareSqlUpdate(
+      value =
+          "DELETE FROM <table> "
+              + "WHERE entityFQNHash = :entityFQNHash "
+              + "AND extension = :extension "
+              + "<mysqlCond>",
+      connectionType = MYSQL)
+  @ConnectionAwareSqlUpdate(
+      value =
+          "DELETE FROM <table> "
+              + "WHERE entityFQNHash = :entityFQNHash "
+              + "AND extension = :extension "
+              + "<psqlCond>",
+      connectionType = POSTGRES)
+  void deleteExtensionByKeyInternal(
+      @Define("table") String table,
+      @Bind("value") String value,
+      @BindFQN("entityFQNHash") String entityFQNHash,
+      @Bind("extension") String extension,
+      @Define("mysqlCond") String mysqlCond,
+      @Define("psqlCond") String psqlCond);
+
+  // Deletes time-series rows whose JSON document has the given key/value pair (e.g. runId)
+  // for the given entity FQN and extension. Only `value` is bound as a SQL parameter; `key`
+  // is interpolated into the condition via String.format, so callers must pass a trusted,
+  // constant key (never user input) to avoid SQL injection.
+  default void deleteExtensionByKey(String key, String value, String entityFQN, String extension) {
+    String mysqlCond = String.format("AND JSON_UNQUOTE(JSON_EXTRACT(json, '$.%s')) = :value", key);
+    String psqlCond = String.format("AND json->>'%s' = :value", key);
+    deleteExtensionByKeyInternal(
+        getTimeSeriesTableName(), value, entityFQN, extension, mysqlCond, psqlCond);
+  }
+
@SqlQuery(
    "SELECT json FROM <table> where entityFQNHash = :entityFQNHash and extension = :extension "
        + " AND timestamp >= :startTs and timestamp <= :endTs ORDER BY timestamp DESC")
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/IngestionPipelineRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/IngestionPipelineRepository.java
index 8278d52b3b80..c81317a08a63 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/IngestionPipelineRepository.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/IngestionPipelineRepository.java
@@ -488,6 +488,20 @@ public PipelineStatus getPipelineStatus(String ingestionPipelineFQN, UUID pipeli
PipelineStatus.class);
}
+  @Transaction
+  // Deletes the stored pipeline-status time-series entry whose runId matches, for the
+  // pipeline identified by ingestionPipelineId. find() resolves the non-deleted pipeline
+  // first (presumably raising the standard not-found error for an unknown id -- confirm);
+  // a runId with no matching status deletes nothing and still returns normally.
+  public IngestionPipeline deletePipelineStatusByRunId(UUID ingestionPipelineId, UUID runId) {
+    IngestionPipeline ingestionPipeline = find(ingestionPipelineId, Include.NON_DELETED);
+    daoCollection
+        .entityExtensionTimeSeriesDao()
+        .deleteExtensionByKey(
+            RUN_ID_EXTENSION_KEY,
+            runId.toString(),
+            ingestionPipeline.getFullyQualifiedName(),
+            PIPELINE_STATUS_EXTENSION);
+    // Re-populate the default field set so the returned entity is hydrated for the caller.
+    setFieldsInternal(ingestionPipeline, Fields.EMPTY_FIELDS);
+    return ingestionPipeline;
+  }
+
/**
* Handles entity updated from PUT and POST operation.
*/
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/services/ingestionpipelines/IngestionPipelineResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/services/ingestionpipelines/IngestionPipelineResource.java
index e0aeb5725deb..4e7dff9f910b 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/services/ingestionpipelines/IngestionPipelineResource.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/services/ingestionpipelines/IngestionPipelineResource.java
@@ -1214,6 +1214,34 @@ public PipelineStatus getPipelineStatus(
return repository.getPipelineStatus(fqn, runId);
}
+  @DELETE
+  @Path("/{id}/pipelineStatus/{runId}")
+  @Operation(
+      operationId = "deletePipelineStatusByRunId",
+      summary = "Delete pipeline status by run ID",
+      description =
+          "Delete a specific pipeline status by its run ID for the given ingestion pipeline.",
+      responses = {
+        @ApiResponse(responseCode = "204", description = "Pipeline status deleted successfully"),
+        @ApiResponse(
+            responseCode = "404",
+            description = "Ingestion Pipeline or Pipeline Status not found")
+      })
+  // NOTE(review): the advertised 404 is only produced for an unknown pipeline id; an
+  // unknown runId still yields 204 because the repository delete is a silent no-op --
+  // confirm whether a missing status should also surface 404.
+  public Response deletePipelineStatusByRunId(
+      @Context UriInfo uriInfo,
+      @Context SecurityContext securityContext,
+      @Parameter(description = "Id of the Ingestion Pipeline", schema = @Schema(type = "UUID"))
+          @PathParam("id")
+          UUID id,
+      @Parameter(description = "Run ID of the pipeline status", schema = @Schema(type = "UUID"))
+          @PathParam("runId")
+          UUID runId) {
+    // DELETE permission on the pipeline entity is required before touching its statuses.
+    OperationContext operationContext = new OperationContext(entityType, MetadataOperation.DELETE);
+    authorizer.authorize(securityContext, operationContext, getResourceContextById(id));
+    repository.deletePipelineStatusByRunId(id, runId);
+    return Response.noContent().build();
+  }
+
@DELETE
@Path("/{id}/pipelineStatus")
@Operation(
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/util/OpenMetadataOperations.java b/openmetadata-service/src/main/java/org/openmetadata/service/util/OpenMetadataOperations.java
index 798630bd912a..a9b8f5d2fb72 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/util/OpenMetadataOperations.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/util/OpenMetadataOperations.java
@@ -7,6 +7,7 @@
import static org.openmetadata.service.apps.bundles.insights.utils.TimestampUtils.timestampToString;
import static org.openmetadata.service.formatter.decorators.MessageDecorator.getDateStringEpochMilli;
import static org.openmetadata.service.jdbi3.UserRepository.AUTH_MECHANISM_FIELD;
+import static org.openmetadata.service.resources.services.ingestionpipelines.IngestionPipelineResource.COLLECTION_PATH;
import static org.openmetadata.service.util.AsciiTable.printOpenMetadataText;
import static org.openmetadata.service.util.UserUtil.updateUserWithHashedPwd;
@@ -25,7 +26,11 @@
import io.dropwizard.jersey.validation.Validators;
import jakarta.validation.Validator;
import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
@@ -35,15 +40,18 @@
import java.util.Objects;
import java.util.Scanner;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.Callable;
+import java.util.stream.Collectors;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.jdbi.v3.core.Handle;
import org.jdbi.v3.core.Jdbi;
import org.openmetadata.schema.EntityInterface;
-import org.openmetadata.schema.ServiceEntityInterface;
import org.openmetadata.schema.api.configuration.OpenMetadataBaseUrlConfiguration;
+import org.openmetadata.schema.auth.JWTAuthMechanism;
import org.openmetadata.schema.email.SmtpSettings;
+import org.openmetadata.schema.entity.Bot;
import org.openmetadata.schema.entity.app.App;
import org.openmetadata.schema.entity.app.AppMarketPlaceDefinition;
import org.openmetadata.schema.entity.app.AppRunRecord;
@@ -53,6 +61,7 @@
import org.openmetadata.schema.entity.applications.configuration.internal.BackfillConfiguration;
import org.openmetadata.schema.entity.applications.configuration.internal.DataInsightsAppConfig;
import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline;
+import org.openmetadata.schema.entity.teams.AuthenticationMechanism;
import org.openmetadata.schema.entity.teams.Role;
import org.openmetadata.schema.entity.teams.User;
import org.openmetadata.schema.services.connections.metadata.AuthProvider;
@@ -80,6 +89,7 @@
import org.openmetadata.service.governance.workflows.WorkflowHandler;
import org.openmetadata.service.jdbi3.AppMarketPlaceRepository;
import org.openmetadata.service.jdbi3.AppRepository;
+import org.openmetadata.service.jdbi3.BotRepository;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.EntityRepository;
import org.openmetadata.service.jdbi3.EventSubscriptionRepository;
@@ -149,8 +159,10 @@ public class OpenMetadataOperations implements Callable {
public Integer call() {
LOG.info(
"Subcommand needed: 'info', 'validate', 'repair', 'check-connection', "
- + "'drop-create', 'changelog', 'migrate', 'migrate-secrets', 'reindex', 'reindex-rdf', 'deploy-pipelines', "
- + "'dbServiceCleanup', 'relationshipCleanup', 'tagUsageCleanup', 'drop-indexes', 'remove-security-config', 'create-indexes'");
+ + "'drop-create', 'changelog', 'migrate', 'migrate-secrets', 'reindex', 'reindex-rdf', 'reindexdi', 'deploy-pipelines', "
+ + "'dbServiceCleanup', 'relationshipCleanup', 'tagUsageCleanup', 'drop-indexes', 'remove-security-config', 'create-indexes', "
+ + "'setOpenMetadataUrl', 'configureEmailSettings', 'install-app', 'delete-app', 'create-user', 'reset-password', "
+ + "'syncAlertOffset', 'analyze-tables'");
LOG.info(
"Use 'reindex --auto-tune' for automatic performance optimization based on cluster capabilities");
return 0;
@@ -1381,11 +1393,8 @@ private int executeRdfReindexApp(Set entities, int batchSize, boolean re
@Command(name = "deploy-pipelines", description = "Deploy all the service pipelines.")
public Integer deployPipelines() {
try {
-      LOG.info("Deploying Pipelines");
+      LOG.info("Deploying Pipelines via API");
parseConfig();
-      PipelineServiceClientInterface pipelineServiceClient =
-          PipelineServiceClientFactory.createPipelineServiceClient(
-              config.getPipelineServiceClientConfiguration());
IngestionPipelineRepository pipelineRepository =
(IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE);
List<IngestionPipeline> pipelines =
@@ -1395,9 +1404,11 @@ public Integer deployPipelines() {
LOG.debug(String.format("Pipelines %d", pipelines.size()));
List<String> columns = Arrays.asList("Name", "Type", "Service Name", "Status");
List<List<String>> pipelineStatuses = new ArrayList<>();
-      for (IngestionPipeline pipeline : pipelines) {
-        deployPipeline(pipeline, pipelineServiceClient, pipelineStatuses);
+
+      if (!pipelines.isEmpty()) {
+        deployPipelinesViaAPI(pipelines, pipelineStatuses);
}
+
printToAsciiTable(columns, pipelineStatuses, "No Pipelines Found");
return 0;
} catch (Exception e) {
@@ -1660,36 +1671,219 @@ private void analyzeEntityTable(String entity) {
}
}
-  private void deployPipeline(
-      IngestionPipeline pipeline,
-      PipelineServiceClientInterface pipelineServiceClient,
+  // Deploys all pipelines through the server's bulk-deploy REST API using the
+  // ingestion-bot JWT, recording a per-pipeline status row for the summary table.
+  private void deployPipelinesViaAPI(
+      List<IngestionPipeline> pipelines, List<List<String>> pipelineStatuses) {
+    try {
+      // Get ingestion-bot JWT token
+      String jwtToken = getIngestionBotToken();
+      if (jwtToken == null) {
+        throw new RuntimeException("Failed to retrieve ingestion-bot JWT token");
+      }
+
+      // Get server API URL from config
+      String serverUrl = getServerApiUrl();
+      if (serverUrl == null) {
+        throw new RuntimeException("SERVER_HOST_API_URL not configured");
+      }
+      LOG.info("Deploying pipelines to server URL: {}", serverUrl);
+
+      // Create HTTP client
+      HttpClient client = HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(30)).build();
+
+      // Process pipelines in chunks of 20
+      int chunkSize = 20;
+      int totalPipelines = pipelines.size();
+      LOG.info(
+          "Deploying {} pipelines via bulk API calls in chunks of {}", totalPipelines, chunkSize);
+
+      List<List<IngestionPipeline>> pipelineChunks = chunkList(pipelines, chunkSize);
+
+      for (int chunkIndex = 0; chunkIndex < pipelineChunks.size(); chunkIndex++) {
+        List<IngestionPipeline> chunk = pipelineChunks.get(chunkIndex);
+        LOG.info(
+            "Processing chunk {} of {} (pipelines {}-{})",
+            chunkIndex + 1,
+            pipelineChunks.size(),
+            chunkIndex * chunkSize + 1,
+            Math.min((chunkIndex + 1) * chunkSize, totalPipelines));
+
+        deployPipelineChunk(client, jwtToken, serverUrl, chunk, pipelineStatuses);
+      }
+
+      LOG.info("Completed bulk deployment of {} pipelines", totalPipelines);
+    } catch (Exception e) {
+      LOG.error("Failed to deploy pipelines via API", e);
+      // Mark all pipelines as failed
+      for (IngestionPipeline pipeline : pipelines) {
+        pipelineStatuses.add(
+            Arrays.asList(
+                pipeline.getName(),
+                pipeline.getPipelineType().value(),
+                pipeline.getService().getName(),
+                "FAILED - " + e.getMessage()));
+      }
+    }
+  }
+
+  // Issues one bulk-deploy POST for a chunk of pipelines and appends a status row
+  // (deployed or FAILED) per pipeline in the chunk.
+  private void deployPipelineChunk(
+      HttpClient client,
+      String jwtToken,
+      String serverUrl,
+      List<IngestionPipeline> pipelineChunk,
List<List<String>> pipelineStatuses) {
try {
-      LOG.debug(String.format("deploying pipeline %s", pipeline.getName()));
-      pipeline.setOpenMetadataServerConnection(
-          new OpenMetadataConnectionBuilder(config, pipeline).build());
-      secretsManager.decryptIngestionPipeline(pipeline);
-      ServiceEntityInterface service =
-          Entity.getEntity(pipeline.getService(), "", Include.NON_DELETED);
-      pipelineServiceClient.deployPipeline(pipeline, service);
+      // Collect pipeline IDs for this chunk
+      List<UUID> pipelineIds =
+          pipelineChunk.stream().map(IngestionPipeline::getId).collect(Collectors.toList());
+
+      // Make bulk deploy API call for this chunk
+      String jsonBody = JsonUtils.pojoToJson(pipelineIds);
+
+      HttpRequest request =
+          HttpRequest.newBuilder()
+              .uri(URI.create(serverUrl + COLLECTION_PATH + "bulk/deploy"))
+              .header("Authorization", "Bearer " + jwtToken)
+              .header("Content-Type", "application/json")
+              .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
+              .timeout(Duration.ofMinutes(2))
+              .build();
+
+      HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
+
+      if (response.statusCode() == 200) {
+        LOG.debug("Chunk deployment completed successfully");
+        // Parse response and update status table for this chunk
+        updatePipelineStatuses(pipelineChunk, response.body(), pipelineStatuses);
+      } else {
+        LOG.error(
+            "Chunk deployment failed with status: {} - {}", response.statusCode(), response.body());
+        // Mark chunk pipelines as failed
+        for (IngestionPipeline pipeline : pipelineChunk) {
+          pipelineStatuses.add(
+              Arrays.asList(
+                  pipeline.getName(),
+                  pipeline.getPipelineType().value(),
+                  pipeline.getService().getName(),
+                  "FAILED - HTTP " + response.statusCode()));
+        }
+      }
} catch (Exception e) {
-      LOG.error(
-          String.format(
-              "Failed to deploy pipeline %s of type %s for service %s",
-              pipeline.getName(),
-              pipeline.getPipelineType().value(),
-              pipeline.getService().getName()),
-          e);
-      pipeline.setDeployed(false);
-    } finally {
-      LOG.debug("update the pipeline");
-      collectionDAO.ingestionPipelineDAO().update(pipeline);
-      pipelineStatuses.add(
-          Arrays.asList(
-              pipeline.getName(),
-              pipeline.getPipelineType().value(),
-              pipeline.getService().getName(),
-              pipeline.getDeployed().toString()));
+      LOG.error("Failed to deploy pipeline chunk", e);
+      // Mark chunk pipelines as failed (use getMessage() for a readable status cell,
+      // consistent with the other failure paths)
+      for (IngestionPipeline pipeline : pipelineChunk) {
+        pipelineStatuses.add(
+            Arrays.asList(
+                pipeline.getName(),
+                pipeline.getPipelineType().value(),
+                pipeline.getService().getName(),
+                "FAILED - " + e.getMessage()));
+      }
+    }
+  }
+
+  // Splits the pipeline list into consecutive sub-lists of at most chunkSize elements.
+  // The chunks are subList views (not copies), so the source list must not be
+  // structurally modified while the chunks are in use.
+  private List<List<IngestionPipeline>> chunkList(List<IngestionPipeline> list, int chunkSize) {
+    List<List<IngestionPipeline>> chunks = new ArrayList<>();
+    for (int i = 0; i < list.size(); i += chunkSize) {
+      int end = Math.min(list.size(), i + chunkSize);
+      chunks.add(list.subList(i, end));
+    }
+    return chunks;
+  }
+
+  // Resolves the ingestion-bot's decrypted JWT token. Returns null (after logging the
+  // reason) when the bot, its user, or a JWT auth mechanism cannot be found, so the
+  // caller can abort the deployment.
+  private String getIngestionBotToken() {
+    try {
+      // Use the same pattern as OpenMetadataConnectionBuilder
+      BotRepository botRepository = (BotRepository) Entity.getEntityRepository(Entity.BOT);
+      UserRepository userRepository = (UserRepository) Entity.getEntityRepository(Entity.USER);
+
+      // First get the bot entity
+      Bot bot =
+          botRepository.getByName(null, Entity.INGESTION_BOT_NAME, new EntityUtil.Fields(Set.of()));
+      if (bot == null || bot.getBotUser() == null) {
+        LOG.error("Ingestion bot not found or bot has no associated user");
+        return null;
+      }
+
+      // Get the bot user with authentication mechanism
+      User botUser =
+          userRepository.getByName(
+              null,
+              bot.getBotUser().getFullyQualifiedName(),
+              new EntityUtil.Fields(Set.of("authenticationMechanism")));
+
+      if (botUser == null || botUser.getAuthenticationMechanism() == null) {
+        LOG.error("Bot user not found or missing authentication mechanism");
+        return null;
+      }
+
+      // Extract and decrypt the JWT token
+      AuthenticationMechanism authMechanism = botUser.getAuthenticationMechanism();
+      if (authMechanism.getAuthType() != AuthenticationMechanism.AuthType.JWT) {
+        LOG.error("Bot user does not have JWT authentication mechanism");
+        return null;
+      }
+
+      JWTAuthMechanism jwtAuthMechanism =
+          JsonUtils.convertValue(authMechanism.getConfig(), JWTAuthMechanism.class);
+
+      // Decrypt the JWT token - this is the crucial step that was missing
+      secretsManager.decryptJWTAuthMechanism(jwtAuthMechanism);
+
+      return jwtAuthMechanism.getJWTToken();
+    } catch (Exception e) {
+      LOG.error("Failed to retrieve ingestion-bot token", e);
+      return null;
+    }
+  }
+
+  // Returns the configured metadata API endpoint normalized to end with a trailing
+  // slash, or null when the pipeline-service-client configuration does not provide one.
+  private String getServerApiUrl() {
+    if (config.getPipelineServiceClientConfiguration() != null
+        && config.getPipelineServiceClientConfiguration().getMetadataApiEndpoint() != null) {
+      String serverUrl = config.getPipelineServiceClientConfiguration().getMetadataApiEndpoint();
+      // Normalize so COLLECTION_PATH can be appended directly.
+      return serverUrl.endsWith("/") ? serverUrl : serverUrl + "/";
+    }
+    return null;
+  }
+
+ @SuppressWarnings("unchecked")
+ private void updatePipelineStatuses(
+ List pipelines, String responseBody, List> pipelineStatuses) {
+ try {
+ // Parse the bulk deploy response
+ List