diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DDLBaseIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DDLBaseIT.java index dc1cb5cf9..d09ca2f57 100644 --- a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DDLBaseIT.java +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/DDLBaseIT.java @@ -31,7 +31,7 @@ public class DDLBaseIT { .withExposedPorts(8123); @BeforeEach - public void startContainers() throws InterruptedException { + public void startContainers() throws Exception { mySqlContainer = new MySQLContainer<>(DockerImageName.parse("docker.io/bitnami/mysql:8.0.36") .asCompatibleSubstituteFor("mysql")) .withDatabaseName("employees").withUsername("root").withPassword("adminpass") diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySQLGeneratedColumnsIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySQLGeneratedColumnsIT.java new file mode 100644 index 000000000..384eee724 --- /dev/null +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySQLGeneratedColumnsIT.java @@ -0,0 +1,365 @@ +package com.altinity.clickhouse.debezium.embedded.ddl.parser; + +import com.altinity.clickhouse.debezium.embedded.ITCommon; +import com.altinity.clickhouse.debezium.embedded.cdc.DebeziumChangeEventCapture; +import com.altinity.clickhouse.debezium.embedded.parser.SourceRecordParserService; +import com.altinity.clickhouse.sink.connector.db.BaseDbWriter; +import com.altinity.clickhouse.sink.connector.db.HikariDbSource; +import com.altinity.clickhouse.sink.connector.db.DBMetadata; +import org.apache.log4j.BasicConfigurator; +import org.junit.Assert; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.MySQLContainer; +import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; + +@DisplayName("Integration Test that validates handling of MySQL Generated Columns DDL and data replication") +@Testcontainers +public class MySQLGeneratedColumnsIT extends DDLBaseIT { + + @BeforeEach + public void startContainers() throws Exception { + mySqlContainer = new MySQLContainer<>(DockerImageName.parse("docker.io/bitnami/mysql:8.0.36") + .asCompatibleSubstituteFor("mysql")) + .withDatabaseName("employees").withUsername("root").withPassword("adminpass") + .withExtraHost("mysql-server", "0.0.0.0") + .waitingFor(new HttpWaitStrategy().forPort(3306)); + + BasicConfigurator.configure(); + mySqlContainer.start(); + clickHouseContainer.start(); + Thread.sleep(15000); + + // Create tables and insert data programmatically + setupDatabase(); + } + + private void setupDatabase() throws Exception { + Connection conn = connectToMySQL(); + + // Create database + conn.prepareStatement("CREATE DATABASE IF NOT EXISTS employees").execute(); + conn.prepareStatement("USE employees").execute(); + + // 1. Basic Generated Column (Stored) + conn.prepareStatement(""" + CREATE TABLE basic_generated_column ( + id INT PRIMARY KEY, + width DECIMAL(10,2), + height DECIMAL(10,2), + area DECIMAL(10,2) GENERATED ALWAYS AS (width * height) STORED + ) + """).execute(); + + // 2. Virtual Generated Column + conn.prepareStatement(""" + CREATE TABLE virtual_column_example ( + id INT PRIMARY KEY, + first_name VARCHAR(50), + last_name VARCHAR(50), + full_name VARCHAR(100) GENERATED ALWAYS AS (CONCAT(first_name, ' ', last_name)) VIRTUAL + ) + """).execute(); + + // 3. Multiple Generated Columns + conn.prepareStatement(""" + CREATE TABLE complex_generated_columns ( + id INT PRIMARY KEY, + base_salary DECIMAL(10,2), + tax_rate DECIMAL(5,2), + gross_salary DECIMAL(10,2) GENERATED ALWAYS AS (base_salary * (1 + tax_rate/100)) STORED, + annual_salary DECIMAL(10,2) GENERATED ALWAYS AS (gross_salary * 12) STORED + ) + """).execute(); + + // 4. JSON Generated Columns + conn.prepareStatement(""" + CREATE TABLE json_generated_columns ( + id INT PRIMARY KEY, + order_details JSON, + total_amount DECIMAL(10,2) GENERATED ALWAYS AS ( + JSON_EXTRACT(order_details, '$.price') * JSON_EXTRACT(order_details, '$.quantity') + ) STORED, + product_name VARCHAR(255) GENERATED ALWAYS AS ( + JSON_UNQUOTE(JSON_EXTRACT(order_details, '$.product_name')) + ) VIRTUAL + ) + """).execute(); + + // 5. Mathematical Generated Columns + conn.prepareStatement(""" + CREATE TABLE mathematical_generated_columns ( + id INT PRIMARY KEY, + radius DECIMAL(10,2), + circle_area DECIMAL(10,2) GENERATED ALWAYS AS (PI() * radius * radius) STORED, + circle_circumference DECIMAL(10,2) GENERATED ALWAYS AS (2 * PI() * radius) STORED + ) + """).execute(); + + // 6. Base table for ALTER operations + conn.prepareStatement(""" + CREATE TABLE base_table ( + id INT PRIMARY KEY, + first_name VARCHAR(50), + last_name VARCHAR(50) + ) + """).execute(); + +// // 7. Employee status table for complex conditions +// conn.prepareStatement(""" +// CREATE TABLE employee_status ( +// id INT PRIMARY KEY, +// hire_date DATE, +// termination_date DATE, +// employment_status VARCHAR(20) GENERATED ALWAYS AS ( +// CASE +// WHEN termination_date IS NULL THEN 'Active' +// WHEN termination_date < CURRENT_DATE THEN 'Terminated' +// ELSE 'Pending Termination' +// END +// ) VIRTUAL +// ) +// """).execute(); + + // 8. Product pricing with enum-based generated columns + conn.prepareStatement(""" + CREATE TABLE product_pricing ( + id INT PRIMARY KEY, + base_price DECIMAL(10,2), + category ENUM('Low', 'Medium', 'High'), + price_category VARCHAR(20) GENERATED ALWAYS AS ( + CASE + WHEN base_price < 10 THEN 'Budget' + WHEN base_price BETWEEN 10 AND 50 THEN 'Mid-range' + ELSE 'Premium' + END + ) STORED + ) + """).execute(); + + // 9. Constrained generated columns + conn.prepareStatement(""" + CREATE TABLE constrained_generated_columns ( + id INT PRIMARY KEY, + temperature DECIMAL(5,2), + temperature_category VARCHAR(20) GENERATED ALWAYS AS ( + CASE + WHEN temperature < 0 THEN 'Freezing' + WHEN temperature BETWEEN 0 AND 20 THEN 'Cold' + WHEN temperature BETWEEN 20 AND 30 THEN 'Warm' + ELSE 'Hot' + END + ) STORED, + CONSTRAINT chk_temperature CHECK (temperature BETWEEN -50 AND 100) + ) + """).execute(); + + // Insert sample data for testing + conn.prepareStatement(""" + INSERT INTO basic_generated_column (id, width, height) VALUES + (1, 10.5, 20.3), + (2, 15.7, 8.9), + (3, 12.0, 25.5) + """).execute(); + + conn.prepareStatement(""" + INSERT INTO virtual_column_example (id, first_name, last_name) VALUES + (1, 'John', 'Doe'), + (2, 'Jane', 'Smith'), + (3, 'Bob', 'Johnson') + """).execute(); + + conn.prepareStatement(""" + INSERT INTO complex_generated_columns (id, base_salary, tax_rate) VALUES + (1, 5000.00, 15.0), + (2, 7500.00, 20.0), + (3, 10000.00, 25.0) + """).execute(); + + conn.prepareStatement(""" + INSERT INTO json_generated_columns (id, order_details) VALUES + (1, '{"price": 25.50, "quantity": 2, "product_name": "Widget A"}'), + (2, '{"price": 15.75, "quantity": 3, "product_name": "Widget B"}'), + (3, '{"price": 100.00, "quantity": 1, "product_name": "Premium Widget"}') + """).execute(); + + conn.prepareStatement(""" + INSERT INTO mathematical_generated_columns (id, radius) VALUES + (1, 5.0), + (2, 10.0), + (3, 7.5) + """).execute(); + + conn.prepareStatement(""" + INSERT INTO base_table (id, first_name, last_name) VALUES + (1, 'Alice', 'Cooper'), + (2, 'Charlie', 'Brown'), + (3, 'Diana', 'Prince') + """).execute(); + +// conn.prepareStatement(""" +// INSERT INTO employee_status (id, hire_date, termination_date) VALUES +// (1, '2020-01-15', NULL), +// (2, '2019-03-20', '2023-12-31'), +// (3, '2021-06-10', '2025-03-15') +// """).execute(); + + conn.prepareStatement(""" + INSERT INTO product_pricing (id, base_price, category) VALUES + (1, 5.99, 'Low'), + (2, 25.50, 'Medium'), + (3, 150.00, 'High') + """).execute(); + + conn.prepareStatement(""" + INSERT INTO constrained_generated_columns (id, temperature) VALUES + (1, -10.5), + (2, 15.0), + (3, 25.5), + (4, 45.0) + """).execute(); + } + + @Test + public void testGeneratedColumns() throws Exception { + AtomicReference engine = new AtomicReference<>(); + + ExecutorService executorService = Executors.newFixedThreadPool(1); + executorService.execute(() -> { + try { + Properties properties = getDebeziumProperties(); + engine.set(new DebeziumChangeEventCapture()); + engine.get().setup(properties, new SourceRecordParserService(), false); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + Thread.sleep(10000); + + Connection conn = connectToMySQL(); + + // Test ALTER TABLE operations with generated columns + conn.prepareStatement( + "ALTER TABLE base_table ADD COLUMN full_name VARCHAR(100) " + + "GENERATED ALWAYS AS (CONCAT(first_name, ' ', last_name)) VIRTUAL" + ).execute(); + + conn.prepareStatement( + "ALTER TABLE base_table MODIFY COLUMN full_name VARCHAR(150) " + + "GENERATED ALWAYS AS (CONCAT(first_name, ' ', IFNULL(last_name, ''))) VIRTUAL" + ).execute(); + + // Wait for initial data sync and DDL processing + Thread.sleep(25000); + + BaseDbWriter writer = ITCommon.getDBWriter(clickHouseContainer); + DBMetadata dbMetadata = new DBMetadata(getDebeziumProperties()); + + // Validate basic generated column structure + Map basicColumns = dbMetadata.getColumnsDataTypesForTable( + writer.getConnection(), "basic_generated_column", "employees"); + + Assert.assertTrue("ID column should exist", basicColumns.containsKey("id")); + Assert.assertTrue("Width column should exist", basicColumns.containsKey("width")); + Assert.assertTrue("Height column should exist", basicColumns.containsKey("height")); + Assert.assertTrue("Area column should exist", basicColumns.containsKey("area")); + + Assert.assertTrue(basicColumns.get("id").equalsIgnoreCase("Int32")); + Assert.assertTrue(basicColumns.get("width").equalsIgnoreCase("Nullable(Decimal(10, 2))")); + Assert.assertTrue(basicColumns.get("height").equalsIgnoreCase("Nullable(Decimal(10, 2))")); + Assert.assertTrue(basicColumns.get("area").equalsIgnoreCase("Nullable(Decimal(10, 2))")); + + // Verify data was replicated correctly + Connection chConn = writer.getConnection(); + PreparedStatement ps = chConn.prepareStatement( + "SELECT id, width, height, area FROM employees.basic_generated_column ORDER BY id"); + ResultSet rs = ps.executeQuery(); + + Assert.assertTrue("Should have at least one row", rs.next()); + Assert.assertEquals(1, rs.getInt("id")); + Assert.assertEquals(10.50, rs.getDouble("width"), 0.01); + Assert.assertEquals(20.30, rs.getDouble("height"), 0.01); + Assert.assertEquals(213.15, rs.getDouble("area"), 0.01); + + // Test virtual generated columns + Map virtualColumns = dbMetadata.getColumnsDataTypesForTable( + writer.getConnection(), "virtual_column_example", "employees"); + + Assert.assertTrue("Full name column should exist", virtualColumns.containsKey("full_name")); + Assert.assertTrue(virtualColumns.get("full_name").equalsIgnoreCase("Nullable(String)")); + + // Test complex generated columns + Map complexColumns = dbMetadata.getColumnsDataTypesForTable( + writer.getConnection(), "complex_generated_columns", "employees"); + + Assert.assertTrue("Gross salary column should exist", complexColumns.containsKey("gross_salary")); + Assert.assertTrue("Annual salary column should exist", complexColumns.containsKey("annual_salary")); + Assert.assertTrue(complexColumns.get("gross_salary").equalsIgnoreCase("Nullable(Decimal(10, 2))")); + Assert.assertTrue(complexColumns.get("annual_salary").equalsIgnoreCase("Nullable(Decimal(10, 2))")); + + // Test JSON generated columns + Map jsonColumns = dbMetadata.getColumnsDataTypesForTable( + writer.getConnection(), "json_generated_columns", "employees"); + + Assert.assertTrue("Total amount column should exist", jsonColumns.containsKey("total_amount")); + Assert.assertTrue("Product name column should exist", jsonColumns.containsKey("product_name")); + Assert.assertTrue(jsonColumns.get("total_amount").equalsIgnoreCase("Nullable(Decimal(10, 2))")); + Assert.assertTrue(jsonColumns.get("product_name").equalsIgnoreCase("Nullable(String)")); + + // Test mathematical generated columns + Map mathColumns = dbMetadata.getColumnsDataTypesForTable( + writer.getConnection(), "mathematical_generated_columns", "employees"); + + Assert.assertTrue("Circle area column should exist", mathColumns.containsKey("circle_area")); + Assert.assertTrue("Circle circumference column should exist", mathColumns.containsKey("circle_circumference")); + Assert.assertTrue(mathColumns.get("circle_area").equalsIgnoreCase("Nullable(Decimal(10, 2))")); + Assert.assertTrue(mathColumns.get("circle_circumference").equalsIgnoreCase("Nullable(Decimal(10, 2))")); + + // Test ALTER TABLE generated columns + Map baseTableColumns = dbMetadata.getColumnsDataTypesForTable( + writer.getConnection(), "base_table", "employees"); + Assert.assertTrue("Full name column should exist after ALTER", baseTableColumns.containsKey("full_name")); + Assert.assertTrue(baseTableColumns.get("full_name").equalsIgnoreCase("Nullable(String)")); + + // Test employee status generated columns + Map employeeColumns = dbMetadata.getColumnsDataTypesForTable( + writer.getConnection(), "employee_status", "employees"); + Assert.assertTrue("Employment status column should exist", employeeColumns.containsKey("employment_status")); + Assert.assertTrue(employeeColumns.get("employment_status").equalsIgnoreCase("Nullable(String)")); + + // Test product pricing generated columns + Map pricingColumns = dbMetadata.getColumnsDataTypesForTable( + writer.getConnection(), "product_pricing", "employees"); + Assert.assertTrue("Price category column should exist", pricingColumns.containsKey("price_category")); + Assert.assertTrue(pricingColumns.get("price_category").equalsIgnoreCase("Nullable(String)")); + + // Test constrained generated columns + Map constrainedColumns = dbMetadata.getColumnsDataTypesForTable( + writer.getConnection(), "constrained_generated_columns", "employees"); + Assert.assertTrue("Temperature category column should exist", constrainedColumns.containsKey("temperature_category")); + Assert.assertTrue(constrainedColumns.get("temperature_category").equalsIgnoreCase("Nullable(String)")); + + + if (engine.get() != null) { + engine.get().stop(); + } + executorService.shutdown(); + HikariDbSource.close(); + } + + + +} \ No newline at end of file diff --git a/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySQLTriggersProceduresIT.java b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySQLTriggersProceduresIT.java new file mode 100644 index 000000000..67403db3c --- /dev/null +++ b/sink-connector-lightweight/src/test/java/com/altinity/clickhouse/debezium/embedded/ddl/parser/MySQLTriggersProceduresIT.java @@ -0,0 +1,252 @@ +package com.altinity.clickhouse.debezium.embedded.ddl.parser; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; + +import com.altinity.clickhouse.debezium.embedded.ITCommon; +import com.altinity.clickhouse.debezium.embedded.cdc.DebeziumChangeEventCapture; +import com.altinity.clickhouse.debezium.embedded.config.SinkConnectorLightWeightConfig; +import com.altinity.clickhouse.debezium.embedded.parser.SourceRecordParserService; +import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig; +import com.altinity.clickhouse.sink.connector.db.BaseDbWriter; +import com.altinity.clickhouse.sink.connector.db.DBMetadata; +import com.clickhouse.jdbc.ClickHouseConnection; + +import org.apache.log4j.BasicConfigurator; +import org.junit.Assert; +import org.junit.jupiter.api.*; +import org.testcontainers.clickhouse.ClickHouseContainer; +import org.testcontainers.containers.MySQLContainer; +import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.utility.DockerImageName; + +/** + * Integration Test class to validate MySQL triggers, stored procedures, and functions + * with ClickHouse replication using Debezium. + */ +@Disabled +@DisplayName("Integration Test to validate MySQL triggers, procedures and functions with ClickHouse replication") +public class MySQLTriggersProceduresIT { + + private static AtomicReference engine; + private static ExecutorService executorService; + private static Connection mysqlConn; + private static BaseDbWriter writer; + + @Container + public static ClickHouseContainer clickHouseContainer = new ClickHouseContainer(DockerImageName.parse("clickhouse/clickhouse-server:latest") + .asCompatibleSubstituteFor("clickhouse")) + .withInitScript("init_clickhouse_it.sql") + .withUsername("ch_user") + .withPassword("password") + .withExposedPorts(8123); + + @Container + public static MySQLContainer mySqlContainer = new MySQLContainer<>(DockerImageName.parse("docker.io/bitnami/mysql:8.0.36") + .asCompatibleSubstituteFor("mysql")) + .withDatabaseName("test_db") + .withUsername("root") + .withPassword("adminpass") + .withInitScript("triggers_procedures_setup.sql") + .withExtraHost("mysql-server", "0.0.0.0") + .waitingFor(new HttpWaitStrategy().forPort(3306)); + + @BeforeAll + public static void startContainers() { + try { + BasicConfigurator.configure(); + mySqlContainer.start(); + clickHouseContainer.start(); + Thread.sleep(10000); + + setupDebeziumEngine(); + mysqlConn = connectToMySQL(); + String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), "test_db"); + Connection connection = BaseDbWriter.createConnection(jdbcUrl, "client_1", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), + "test_db", new ClickHouseSinkConnectorConfig(new HashMap<>())); + writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(), + "test_db", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, connection); + Thread.sleep(20000); + } catch (Exception e) { + throw new RuntimeException("Failed to start containers", e); + } + } + + private static void setupDebeziumEngine() throws Exception { + engine = new AtomicReference<>(); + Properties props = ITCommon.getDebeziumProperties(mySqlContainer, clickHouseContainer); + executorService = Executors.newSingleThreadExecutor(); + executorService.execute(() -> { + try { + DebeziumChangeEventCapture capture = new DebeziumChangeEventCapture(); + engine.set(capture); + capture.setup(props, new SourceRecordParserService(), false); + } catch (Exception e) { + throw new RuntimeException("Failed to start Debezium engine", e); + } + }); + Thread.sleep(20000); + } + + protected static Connection connectToMySQL() { + return ITCommon.connectToMySQL(mySqlContainer); + } + + @AfterAll + public static void stopEngine() throws Exception { + if (engine != null && engine.get() != null) { + engine.get().stop(); + } + executorService.shutdownNow(); + } + + @Test + @DisplayName("Test customer insert trigger") + public void testCustomerInsertTrigger() throws Exception { + // Insert a new customer + mysqlConn.prepareStatement( + "INSERT INTO customers (name, email) VALUES ('Test User', 'test.user@example.com')" + ).execute(); + Thread.sleep(10000); + + // Verify customer was inserted + try (ResultSet rs = writer.getConnection().prepareStatement( + "SELECT * FROM test_db.`customers` FINAL WHERE email = 'test.user@example.com'").executeQuery()) { + Assert.assertTrue("Customer should be found", rs.next()); + Assert.assertEquals("Test User", rs.getString("name")); + } + + // Verify audit log was created + try (ResultSet rs = writer.getConnection().prepareStatement( + "SELECT * FROM test_db.`audit_logs` FINAL WHERE action_type = 'INSERT' ORDER BY created_at DESC LIMIT 1").executeQuery()) { + Assert.assertTrue("Audit log should be found", rs.next()); + Assert.assertTrue("Audit log should contain customer details", + rs.getString("action_details").contains("Customer added")); + } + } + + @Test + @DisplayName("Test customer update trigger") + public void testCustomerUpdateTrigger() throws Exception { + // Update customer name (should succeed) + mysqlConn.prepareStatement( + "UPDATE customers SET name = 'Updated Test User' WHERE email = 'test.user@example.com'" + ).execute(); + Thread.sleep(10000); + + // Verify name was updated + try (ResultSet rs = writer.getConnection().prepareStatement( + "SELECT * FROM test_db.`customers` FINAL WHERE email = 'test.user@example.com'").executeQuery()) { + Assert.assertTrue("Customer should be found", rs.next()); + Assert.assertEquals("Updated Test User", rs.getString("name")); + } + + // Try to update email (should fail) + try { + mysqlConn.prepareStatement( + "UPDATE customers SET email = 'updated.email@example.com' WHERE email = 'test.user@example.com'" + ).execute(); + Assert.fail("Email update should have failed"); + } catch (Exception e) { + Assert.assertTrue("Error should mention email updates not allowed", + e.getMessage().contains("Email updates are not allowed")); + } + } + + @Test + @DisplayName("Test stored procedures") + public void testStoredProcedures() throws Exception { + // Test insert_order procedure + mysqlConn.prepareStatement( + "CALL insert_order(1, 150.00)" + ).execute(); + Thread.sleep(10000); + + // Verify order was inserted + try (ResultSet rs = writer.getConnection().prepareStatement( + "SELECT * FROM test_db.`orders` FINAL WHERE order_amount = 150.00").executeQuery()) { + Assert.assertTrue("Order should be found", rs.next()); + Assert.assertEquals(1, rs.getInt("customer_id")); + } + + // Test customer_report procedure + try (ResultSet rs = mysqlConn.prepareStatement("CALL customer_report()").executeQuery()) { + Assert.assertTrue("Report should return results", rs.next()); + Assert.assertTrue("Report should include customer details", + rs.getString("name") != null); + } + } + + @Test + @DisplayName("Test functions") + public void testFunctions() throws Exception { + // Test get_total_order_amount function + try (ResultSet rs = mysqlConn.prepareStatement( + "SELECT get_total_order_amount(1) AS total_amount").executeQuery()) { + Assert.assertTrue("Should return total amount", rs.next()); + Assert.assertTrue("Total amount should be greater than 0", + rs.getDouble("total_amount") > 0); + } + + // Test get_customer_info function + try (ResultSet rs = mysqlConn.prepareStatement( + "SELECT get_customer_info(1) AS customer_info").executeQuery()) { + Assert.assertTrue("Should return customer info", rs.next()); + String jsonInfo = rs.getString("customer_info"); + Assert.assertTrue("JSON should contain customer details", + jsonInfo.contains("customer_id") && jsonInfo.contains("name")); + } + + // Test count_orders function + try (ResultSet rs = mysqlConn.prepareStatement( + "SELECT count_orders(1) AS order_count").executeQuery()) { + Assert.assertTrue("Should return order count", rs.next()); + Assert.assertTrue("Order count should be greater than 0", + rs.getInt("order_count") > 0); + } + } + + @Test + @DisplayName("Test delete customer procedure") + public void testDeleteCustomerProcedure() throws Exception { + // Get initial counts + int initialCustomers = getCount(mysqlConn, "SELECT COUNT(*) FROM customers"); + int initialOrders = getCount(mysqlConn, "SELECT COUNT(*) FROM orders"); + int initialAuditLogs = getCount(mysqlConn, "SELECT COUNT(*) FROM audit_logs"); + + // Delete customer + mysqlConn.prepareStatement("CALL delete_customer(1)").execute(); + Thread.sleep(10000); + + // Verify customer was deleted + try (ResultSet rs = writer.getConnection().prepareStatement( + "SELECT * FROM test_db.`customers` FINAL WHERE customer_id = 1").executeQuery()) { + Assert.assertFalse("Customer should be deleted", rs.next()); + } + + // Verify orders were deleted + try (ResultSet rs = writer.getConnection().prepareStatement( + "SELECT * FROM test_db.`orders` FINAL WHERE customer_id = 1").executeQuery()) { + Assert.assertFalse("Orders should be deleted", rs.next()); + } + + // Verify audit logs were created + int finalAuditLogs = getCount(writer.getConnection(), + "SELECT COUNT(*) FROM test_db.`audit_logs` FINAL"); + Assert.assertTrue("Audit logs should increase", finalAuditLogs > initialAuditLogs); + } + + private int getCount(Connection conn, String query) throws Exception { + try (ResultSet rs = conn.createStatement().executeQuery(query)) { + return rs.next() ? rs.getInt(1) : 0; + } + } +} \ No newline at end of file diff --git a/sink-connector-lightweight/src/test/resources/mysql_generated_columns.sql b/sink-connector-lightweight/src/test/resources/mysql_generated_columns.sql new file mode 100644 index 000000000..0e0484ab3 --- /dev/null +++ b/sink-connector-lightweight/src/test/resources/mysql_generated_columns.sql @@ -0,0 +1,142 @@ +-- MySQL Generated Columns Test Setup +CREATE DATABASE IF NOT EXISTS employees; +USE employees; + +-- 1. Basic Generated Column (Stored) +CREATE TABLE basic_generated_column ( + id INT PRIMARY KEY, + width DECIMAL(10,2), + height DECIMAL(10,2), + area DECIMAL(10,2) GENERATED ALWAYS AS (width * height) STORED +); + +-- 2. Virtual Generated Column +CREATE TABLE virtual_column_example ( + id INT PRIMARY KEY, + first_name VARCHAR(50), + last_name VARCHAR(50), + full_name VARCHAR(100) GENERATED ALWAYS AS (CONCAT(first_name, ' ', last_name)) VIRTUAL +); + +-- 3. Multiple Generated Columns +CREATE TABLE complex_generated_columns ( + id INT PRIMARY KEY, + base_salary DECIMAL(10,2), + tax_rate DECIMAL(5,2), + gross_salary DECIMAL(10,2) GENERATED ALWAYS AS (base_salary * (1 + tax_rate/100)) STORED, + annual_salary DECIMAL(10,2) GENERATED ALWAYS AS (gross_salary * 12) STORED +); + +-- 4. JSON Generated Columns +CREATE TABLE json_generated_columns ( + id INT PRIMARY KEY, + order_details JSON, + total_amount DECIMAL(10,2) GENERATED ALWAYS AS ( + JSON_EXTRACT(order_details, '$.price') * JSON_EXTRACT(order_details, '$.quantity') + ) STORED, + product_name VARCHAR(255) GENERATED ALWAYS AS ( + JSON_UNQUOTE(JSON_EXTRACT(order_details, '$.product_name')) + ) VIRTUAL +); + +-- 5. Mathematical Generated Columns +CREATE TABLE mathematical_generated_columns ( + id INT PRIMARY KEY, + radius DECIMAL(10,2), + circle_area DECIMAL(10,2) GENERATED ALWAYS AS (PI() * radius * radius) STORED, + circle_circumference DECIMAL(10,2) GENERATED ALWAYS AS (2 * PI() * radius) STORED +); + +-- 6. Base table for ALTER operations +CREATE TABLE base_table ( + id INT PRIMARY KEY, + first_name VARCHAR(50), + last_name VARCHAR(50) +); + + + +-- 8. Product pricing with enum-based generated columns +CREATE TABLE product_pricing ( + id INT PRIMARY KEY, + base_price DECIMAL(10,2), + category ENUM('Low', 'Medium', 'High'), + price_category VARCHAR(20) GENERATED ALWAYS AS ( + CASE + WHEN base_price < 10 THEN 'Budget' + WHEN base_price BETWEEN 10 AND 50 THEN 'Mid-range' + ELSE 'Premium' + END + ) STORED +); + +-- 9. Constrained generated columns +CREATE TABLE constrained_generated_columns ( + id INT PRIMARY KEY, + temperature DECIMAL(5,2), + temperature_category VARCHAR(20) GENERATED ALWAYS AS ( + CASE + WHEN temperature < 0 THEN 'Freezing' + WHEN temperature BETWEEN 0 AND 20 THEN 'Cold' + WHEN temperature BETWEEN 20 AND 30 THEN 'Warm' + ELSE 'Hot' + END + ) STORED, + CONSTRAINT chk_temperature CHECK (temperature BETWEEN -50 AND 100) +); + +-- 10. Indexable generated columns +CREATE TABLE indexable_generated_columns ( + id INT PRIMARY KEY, + first_name VARCHAR(50), + last_name VARCHAR(50), + full_name VARCHAR(100) GENERATED ALWAYS AS (CONCAT(first_name, ' ', last_name)) STORED, + INDEX idx_full_name (full_name) +); + +-- Insert sample data for testing +INSERT INTO basic_generated_column (id, width, height) VALUES +(1, 10.5, 20.3), +(2, 15.7, 8.9), +(3, 12.0, 25.5); + +INSERT INTO virtual_column_example (id, first_name, last_name) VALUES +(1, 'John', 'Doe'), +(2, 'Jane', 'Smith'), +(3, 'Bob', 'Johnson'); + +INSERT INTO complex_generated_columns (id, base_salary, tax_rate) VALUES +(1, 5000.00, 15.0), +(2, 7500.00, 20.0), +(3, 10000.00, 25.0); + +INSERT INTO json_generated_columns (id, order_details) VALUES +(1, '{"price": 25.50, "quantity": 2, "product_name": "Widget A"}'), +(2, '{"price": 15.75, "quantity": 3, "product_name": "Widget B"}'), +(3, '{"price": 100.00, "quantity": 1, "product_name": "Premium Widget"}'); + +INSERT INTO mathematical_generated_columns (id, radius) VALUES +(1, 5.0), +(2, 10.0), +(3, 7.5); + +INSERT INTO base_table (id, first_name, last_name) VALUES +(1, 'Alice', 'Cooper'), +(2, 'Charlie', 'Brown'), +(3, 'Diana', 'Prince'); + +INSERT INTO product_pricing (id, base_price, category) VALUES +(1, 5.99, 'Low'), +(2, 25.50, 'Medium'), +(3, 150.00, 'High'); + +INSERT INTO constrained_generated_columns (id, temperature) VALUES +(1, -10.5), +(2, 15.0), +(3, 25.5), +(4, 45.0); + +INSERT INTO indexable_generated_columns (id, first_name, last_name) VALUES +(1, 'Emma', 'Watson'), +(2, 'Robert', 'Downey'), +(3, 'Scarlett', 'Johansson'); \ No newline at end of file