Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.cloud.teleport.v2.spanner.sourceddl.SourceColumn;
import com.google.cloud.teleport.v2.spanner.sourceddl.SourceSchema;
import com.google.cloud.teleport.v2.spanner.sourceddl.SourceTable;
import com.google.cloud.teleport.v2.templates.exceptions.InvalidDMLGenerationException;
import com.google.cloud.teleport.v2.templates.models.DMLGeneratorRequest;
import com.google.cloud.teleport.v2.templates.models.DMLGeneratorResponse;
import com.google.cloud.teleport.v2.templates.models.PreparedStatementGeneratedResponse;
Expand Down Expand Up @@ -76,50 +77,47 @@ public class CassandraDMLGenerator implements IDMLGenerator {
@Override
public DMLGeneratorResponse getDMLStatement(DMLGeneratorRequest dmlGeneratorRequest) {
if (dmlGeneratorRequest == null) {
LOG.warn("DMLGeneratorRequest is null. Cannot process the request.");
return new DMLGeneratorResponse("");
throw new InvalidDMLGenerationException(
"DMLGeneratorRequest is null. Cannot process the request.");
}
ISchemaMapper schemaMapper = dmlGeneratorRequest.getSchemaMapper();
String spannerTableName = dmlGeneratorRequest.getSpannerTableName();
Ddl spannerDdl = dmlGeneratorRequest.getSpannerDdl();
SourceSchema sourceSchema = dmlGeneratorRequest.getSourceSchema();
if (schemaMapper == null || spannerDdl == null || sourceSchema == null) {
LOG.warn(
"Schema Mapper, Ddl and SourceSchema must be not null, respectively found {},{},{}.",
schemaMapper,
spannerDdl,
sourceSchema);
return new DMLGeneratorResponse("");
if (schemaMapper == null) {
throw new InvalidDMLGenerationException("Schema Mapper must be not null");
}
if (spannerDdl == null) {
throw new InvalidDMLGenerationException("Spanner Ddl must be not null.");
}
if (sourceSchema == null) {
throw new InvalidDMLGenerationException("SourceSchema must be not null.");
}
String sourceTableName = "";
try {
sourceTableName = schemaMapper.getSourceTableName("", spannerTableName);
} catch (Exception e) {
LOG.warn(
"Equivalent table for {} was not found in source, check schema mapping provided",
spannerTableName);
return new DMLGeneratorResponse("");
throw new InvalidDMLGenerationException(
"Could not find source table name for spanner table: " + spannerTableName, e);
}

Table spannerTable = spannerDdl.table(spannerTableName);
if (spannerTable == null) {
LOG.warn("Spanner table {} not found. Dropping the record.", spannerTableName);
return new DMLGeneratorResponse("");
throw new InvalidDMLGenerationException(
String.format(
"The spanner table %s was not found in ddl found on spanner.", spannerTableName));
}
SourceTable sourceTable = sourceSchema.table(sourceTableName);
if (sourceTable == null) {
LOG.warn(
"Source table {} not found for Spanner table Name: {}",
sourceTableName,
spannerTableName);
return new DMLGeneratorResponse("");
throw new InvalidDMLGenerationException(
String.format("The source table %s was not found in source schema.", sourceTableName));
}

if (sourceTable.primaryKeyColumns() == null || sourceTable.primaryKeyColumns().size() == 0) {
LOG.warn(
"Cannot reverse replicate table {} without primary key. Skipping the record.",
sourceTableName);
return new DMLGeneratorResponse("");
throw new InvalidDMLGenerationException(
String.format(
"Cannot reverse replicate for source table %s without primary key, skipping the record",
sourceTableName));
}

Map<String, PreparedStatementValueObject<?>> pkColumnNameValues =
Expand All @@ -132,10 +130,10 @@ public DMLGeneratorResponse getDMLStatement(DMLGeneratorRequest dmlGeneratorRequ
dmlGeneratorRequest.getSourceDbTimezoneOffset(),
dmlGeneratorRequest.getCustomTransformationResponse());
if (pkColumnNameValues == null) {
LOG.warn(
"Failed to generate primary key values for table {}. Skipping the record.",
sourceTableName);
return new DMLGeneratorResponse("");
throw new InvalidDMLGenerationException(
String.format(
"Cannot reverse replicate for table %s without primary key, skipping the record",
sourceTableName));
}
java.sql.Timestamp timestamp = dmlGeneratorRequest.getCommitTimestamp().toSqlTimestamp();
String modType = dmlGeneratorRequest.getModType();
Expand Down Expand Up @@ -200,8 +198,8 @@ private static DMLGeneratorResponse generateDMLResponse(
} else if ("DELETE".equals(modType)) {
return getDeleteStatementCQL(sourceTable.name(), timestamp, pkColumnNameValues);
} else {
LOG.error("Unsupported modType: {} for table {}", modType, spannerTable.name());
return new DMLGeneratorResponse("");
throw new InvalidDMLGenerationException(
String.format("Unsupported modType: %s for table %s", modType, spannerTable.name()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.cloud.teleport.v2.spanner.sourceddl.SourceColumn;
import com.google.cloud.teleport.v2.spanner.sourceddl.SourceSchema;
import com.google.cloud.teleport.v2.spanner.type.Type;
import com.google.cloud.teleport.v2.templates.exceptions.InvalidDMLGenerationException;
import com.google.cloud.teleport.v2.templates.models.DMLGeneratorRequest;
import com.google.cloud.teleport.v2.templates.models.DMLGeneratorResponse;
import com.google.common.annotations.VisibleForTesting;
Expand All @@ -43,52 +44,52 @@ public class MySQLDMLGenerator implements IDMLGenerator {

public DMLGeneratorResponse getDMLStatement(DMLGeneratorRequest dmlGeneratorRequest) {
if (dmlGeneratorRequest == null) {
LOG.warn("DMLGeneratorRequest is null. Cannot process the request.");
return new DMLGeneratorResponse("");
throw new InvalidDMLGenerationException(
"DMLGeneratorRequest is null. Cannot process the request.");
}
String spannerTableName = dmlGeneratorRequest.getSpannerTableName();
ISchemaMapper schemaMapper = dmlGeneratorRequest.getSchemaMapper();
Ddl spannerDdl = dmlGeneratorRequest.getSpannerDdl();
SourceSchema sourceSchema = dmlGeneratorRequest.getSourceSchema();
if (schemaMapper == null || spannerDdl == null || sourceSchema == null) {
LOG.warn(
"Schema Mapper, Ddl and SourceSchema must be not null, respectively found {},{},{}.",
schemaMapper,
spannerDdl,
sourceSchema);
return new DMLGeneratorResponse("");

if (schemaMapper == null) {
throw new InvalidDMLGenerationException("Schema Mapper must be not null");
}
if (spannerDdl == null) {
throw new InvalidDMLGenerationException("Spanner Ddl must be not null.");
}
if (sourceSchema == null) {
throw new InvalidDMLGenerationException("SourceSchema must be not null.");
}

Table spannerTable = spannerDdl.table(spannerTableName);
if (spannerTable == null) {
LOG.warn(
"The spanner table {} was not found in ddl found on spanner. Ddl: {}",
spannerTableName,
spannerDdl);
return new DMLGeneratorResponse("");
throw new InvalidDMLGenerationException(
String.format(
"The spanner table %s was not found in ddl found on spanner", spannerTableName));
}

String sourceTableName = "";
try {
sourceTableName = schemaMapper.getSourceTableName("", spannerTableName);
} catch (NoSuchElementException e) {
return new DMLGeneratorResponse("");
throw new InvalidDMLGenerationException(
"Could not find source table name for spanner table: " + spannerTableName, e);
}
com.google.cloud.teleport.v2.spanner.sourceddl.SourceTable sourceTable =
sourceSchema.table(sourceTableName);
if (sourceTable == null) {
LOG.warn(
"Equivalent table {} was not found in source for spanner table {}",
sourceTableName,
spannerTableName);
return new DMLGeneratorResponse("");
throw new InvalidDMLGenerationException(
String.format(
"Equivalent table %s was not found in source for spanner table %s",
sourceTableName, spannerTableName));
}

if (sourceTable.primaryKeyColumns() == null || sourceTable.primaryKeyColumns().size() == 0) {
LOG.warn(
"Cannot reverse replicate for source table {} without primary key, skipping the record. Source Table: {}",
sourceTableName,
sourceTable);
return new DMLGeneratorResponse("");
throw new InvalidDMLGenerationException(
String.format(
"Cannot reverse replicate for source table %s without primary key, skipping the record.",
sourceTableName));
}

Map<String, String> pkcolumnNameValues =
Expand All @@ -101,10 +102,10 @@ public DMLGeneratorResponse getDMLStatement(DMLGeneratorRequest dmlGeneratorRequ
dmlGeneratorRequest.getSourceDbTimezoneOffset(),
dmlGeneratorRequest.getCustomTransformationResponse());
if (pkcolumnNameValues == null) {
LOG.warn(
"Cannot reverse replicate for table {} without primary key, skipping the record",
sourceTableName);
return new DMLGeneratorResponse("");
throw new InvalidDMLGenerationException(
String.format(
"Cannot reverse replicate for table %s without primary key, skipping the record",
sourceTableName));
}

if ("INSERT".equals(dmlGeneratorRequest.getModType())
Expand All @@ -115,8 +116,10 @@ public DMLGeneratorResponse getDMLStatement(DMLGeneratorRequest dmlGeneratorRequ
} else if ("DELETE".equals(dmlGeneratorRequest.getModType())) {
return getDeleteStatement(sourceTable.name(), pkcolumnNameValues);
} else {
LOG.warn("Unsupported modType: " + dmlGeneratorRequest.getModType());
return new DMLGeneratorResponse("");
throw new InvalidDMLGenerationException(
String.format(
"Unsupported modType: %s for table %s",
dmlGeneratorRequest.getModType(), spannerTableName));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.google.cloud.teleport.v2.templates.dbutils.dao.source.IDao;
import com.google.cloud.teleport.v2.templates.dbutils.dao.source.TransactionalCheck;
import com.google.cloud.teleport.v2.templates.dbutils.dml.IDMLGenerator;
import com.google.cloud.teleport.v2.templates.exceptions.InvalidDMLGenerationException;
import com.google.cloud.teleport.v2.templates.models.DMLGeneratorRequest;
import com.google.cloud.teleport.v2.templates.models.DMLGeneratorResponse;
import java.time.Instant;
Expand Down Expand Up @@ -110,8 +111,7 @@ public static boolean processRecord(

DMLGeneratorResponse dmlGeneratorResponse = dmlGenerator.getDMLStatement(dmlGeneratorRequest);
if (dmlGeneratorResponse.getDmlStatement().isEmpty()) {
LOG.warn("DML statement is empty for table: " + tableName);
return false;
throw new InvalidDMLGenerationException("DML statement is empty for table: " + tableName);
}
// TODO we need to handle it as proper Interface Level as of now we have handle Prepared
// TODO Statement and Raw Statement Differently
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright (C) 2026 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates.exceptions;

/**
* Exception thrown when the DML Generator fails to create a valid DML statement. This effectively
* prevents silent data loss by routing these records to DLQ.
*/
public class InvalidDMLGenerationException extends RuntimeException {
public InvalidDMLGenerationException(String message) {
super(message);
}

public InvalidDMLGenerationException(String message, Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.cloud.teleport.v2.spanner.exceptions.InvalidTransformationException;
import com.google.cloud.teleport.v2.spanner.migrations.exceptions.ChangeEventConvertorException;
import com.google.cloud.teleport.v2.templates.constants.Constants;
import com.google.cloud.teleport.v2.templates.exceptions.InvalidDMLGenerationException;
import java.sql.SQLDataException;
import java.sql.SQLNonTransientConnectionException;
import java.sql.SQLSyntaxErrorException;
Expand All @@ -47,6 +48,7 @@ public static TupleTag<String> classify(Exception exception) {
return classifySpannerException(e);
} else if (exception instanceof ChangeEventConvertorException
|| exception instanceof IllegalArgumentException
|| exception instanceof InvalidDMLGenerationException
|| exception instanceof NullPointerException) {
return Constants.PERMANENT_ERROR_TAG;
}
Expand All @@ -61,7 +63,8 @@ private static TupleTag<String> classifySpannerException(SpannerException except
Throwable cause = exception.getCause();

if (cause instanceof InvalidTransformationException
|| cause instanceof ChangeEventConvertorException) {
|| cause instanceof ChangeEventConvertorException
|| cause instanceof InvalidDMLGenerationException) {
return Constants.PERMANENT_ERROR_TAG;
} else if (cause instanceof CodecNotFoundException
|| cause instanceof SQLSyntaxErrorException
Expand Down
Loading
Loading