Skip to content

Commit d95a083

Browse files
yuxiqian authored and tchivs committed
[tests][pipeline-connector/fluss] Add MySQL to Fluss E2e IT case (apache#4057)
* [ci][fluss] Add MySQL to Fluss E2e IT case Signed-off-by: yuxiqian <[email protected]> * add: comments Signed-off-by: yuxiqian <[email protected]> --------- Signed-off-by: yuxiqian <[email protected]>
1 parent 66a1435 commit d95a083

File tree

5 files changed

+448
-1
lines changed

5 files changed

+448
-1
lines changed

flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ limitations under the License.
4242
<maven.plugin.download.version>1.6.8</maven.plugin.download.version>
4343
<iceberg.version>1.6.1</iceberg.version>
4444
<hive.version>2.3.9</hive.version>
45+
<fluss.version>0.7.0</fluss.version>
4546
</properties>
4647

4748
<dependencies>
@@ -602,6 +603,24 @@ limitations under the License.
602603
<outputDirectory>${project.build.directory}/dependencies
603604
</outputDirectory>
604605
</artifactItem>
606+
<artifactItem>
607+
<groupId>org.apache.flink</groupId>
608+
<artifactId>flink-cdc-pipeline-connector-fluss</artifactId>
609+
<version>${project.version}</version>
610+
<destFileName>fluss-cdc-pipeline-connector.jar</destFileName>
611+
<type>jar</type>
612+
<outputDirectory>${project.build.directory}/dependencies
613+
</outputDirectory>
614+
</artifactItem>
615+
<artifactItem>
616+
<groupId>com.alibaba.fluss</groupId>
617+
<artifactId>fluss-flink-${flink.major.version}</artifactId>
618+
<version>${fluss.version}</version>
619+
<destFileName>fluss-sql-connector.jar</destFileName>
620+
<type>jar</type>
621+
<outputDirectory>${project.build.directory}/dependencies
622+
</outputDirectory>
623+
</artifactItem>
605624
<artifactItem>
606625
<groupId>org.apache.flink</groupId>
607626
<artifactId>flink-shaded-hadoop-2-uber</artifactId>
Lines changed: 325 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,325 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.pipeline.tests;
19+
20+
import org.apache.flink.cdc.common.test.utils.TestUtils;
21+
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
22+
import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment;
23+
24+
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
25+
26+
import org.assertj.core.api.Assertions;
27+
import org.junit.jupiter.api.AfterEach;
28+
import org.junit.jupiter.api.BeforeEach;
29+
import org.junit.jupiter.params.ParameterizedTest;
30+
import org.junit.jupiter.params.provider.ValueSource;
31+
import org.slf4j.Logger;
32+
import org.slf4j.LoggerFactory;
33+
import org.testcontainers.containers.GenericContainer;
34+
import org.testcontainers.containers.output.Slf4jLogConsumer;
35+
import org.testcontainers.images.builder.Transferable;
36+
import org.testcontainers.junit.jupiter.Container;
37+
import org.testcontainers.junit.jupiter.Testcontainers;
38+
39+
import java.nio.file.Path;
40+
import java.sql.Connection;
41+
import java.sql.DriverManager;
42+
import java.sql.SQLException;
43+
import java.sql.Statement;
44+
import java.time.Duration;
45+
import java.util.Arrays;
46+
import java.util.Collections;
47+
import java.util.List;
48+
import java.util.stream.Collectors;
49+
50+
/**
 * An End-to-end test case for the Fluss pipeline connector.
 *
 * <p>Spins up a MySQL source (provided by {@code PipelineTestEnvironment}), a ZooKeeper node, and a
 * Fluss coordinator + tablet server via Testcontainers, submits a MySQL-to-Fluss pipeline job, and
 * verifies snapshot and incremental change data by peeking into the Fluss tables with the Flink SQL
 * client running inside the JobManager container.
 */
@Testcontainers
public class FlussE2eITCase extends PipelineTestEnvironment {

    private static final Logger LOG = LoggerFactory.getLogger(FlussE2eITCase.class);

    // Upper bound for each sink-side validation loop; the test fails if results do not converge
    // within this window.
    private static final Duration FLUSS_TESTCASE_TIMEOUT = Duration.ofMinutes(3);
    private static final String flussImageTag = "fluss/fluss:0.7.0";
    private static final String zooKeeperImageTag = "zookeeper:3.9.2";

    // Coordinator configuration, passed to the container as newline-joined FLUSS_PROPERTIES.
    // Two listeners are declared: an internal PLAINTEXT one and a client-facing SASL/PLAIN one on
    // port 9123; users "admin"/"developer" are defined inline in the JAAS config string.
    private static final List<String> flussCoordinatorProperties =
            Arrays.asList(
                    "zookeeper.address: zookeeper:2181",
                    "bind.listeners: INTERNAL://coordinator-server:0, CLIENT://coordinator-server:9123",
                    "internal.listener.name: INTERNAL",
                    "remote.data.dir: /tmp/fluss/remote-data",
                    "security.protocol.map: CLIENT:SASL, INTERNAL:PLAINTEXT",
                    "security.sasl.enabled.mechanisms: PLAIN",
                    "security.sasl.plain.jaas.config: com.alibaba.fluss.security.auth.sasl.plain.PlainLoginModule required user_admin=\"admin-pass\" user_developer=\"developer-pass\";",
                    "super.users: User:admin");

    // Tablet-server configuration; mirrors the coordinator security setup and additionally fixes
    // the server id and storage directories. kv.snapshot.interval of 0s makes KV snapshots
    // effectively continuous so changes become visible to readers quickly.
    private static final List<String> flussTabletServerProperties =
            Arrays.asList(
                    "zookeeper.address: zookeeper:2181",
                    "bind.listeners: INTERNAL://tablet-server:0, CLIENT://tablet-server:9123",
                    "internal.listener.name: INTERNAL",
                    "tablet-server.id: 0",
                    "kv.snapshot.interval: 0s",
                    "data.dir: /tmp/fluss/data",
                    "remote.data.dir: /tmp/fluss/remote-data",
                    "security.protocol.map: CLIENT:SASL, INTERNAL:PLAINTEXT",
                    "security.sasl.enabled.mechanisms: PLAIN",
                    "security.sasl.plain.jaas.config: com.alibaba.fluss.security.auth.sasl.plain.PlainLoginModule required user_admin=\"admin-pass\" user_developer=\"developer-pass\";",
                    "super.users: User:admin");

    // Containers start in dependency order: ZooKeeper first, then the coordinator, then the
    // tablet server (see dependsOn below). All share NETWORK from the base environment.
    @Container
    private static final GenericContainer<?> ZOOKEEPER =
            new GenericContainer<>(zooKeeperImageTag)
                    .withNetworkAliases("zookeeper")
                    .withExposedPorts(2181)
                    .withNetwork(NETWORK)
                    .withLogConsumer(new Slf4jLogConsumer(LOG));

    @Container
    private static final GenericContainer<?> FLUSS_COORDINATOR =
            new GenericContainer<>(flussImageTag)
                    .withEnv(
                            ImmutableMap.of(
                                    "FLUSS_PROPERTIES",
                                    String.join("\n", flussCoordinatorProperties)))
                    .withCommand("coordinatorServer")
                    .withNetworkAliases("coordinator-server")
                    .withExposedPorts(9123)
                    .withNetwork(NETWORK)
                    .dependsOn(ZOOKEEPER)
                    .withLogConsumer(new Slf4jLogConsumer(LOG));

    @Container
    private static final GenericContainer<?> FLUSS_TABLET_SERVER =
            new GenericContainer<>(flussImageTag)
                    .withEnv(
                            ImmutableMap.of(
                                    "FLUSS_PROPERTIES",
                                    String.join("\n", flussTabletServerProperties)))
                    .withCommand("tabletServer")
                    .withNetworkAliases("tablet-server")
                    .withExposedPorts(9123)
                    .withNetwork(NETWORK)
                    .dependsOn(ZOOKEEPER, FLUSS_COORDINATOR)
                    .withLogConsumer(new Slf4jLogConsumer(LOG));

    // Two source database variants so the parameterized test can cover both primary-key and
    // non-primary-key table pipelines.
    protected final UniqueDatabase inventoryDatabaseWithPrimaryKey =
            new UniqueDatabase(MYSQL, "mysql_inventory", MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);

    protected final UniqueDatabase inventoryDatabaseWithoutPrimaryKey =
            new UniqueDatabase(
                    MYSQL, "mysql_inventory_wo_pk", MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);

    @Override
    protected List<String> copyJarToFlinkLib() {
        // Due to a bug described in https://github.com/apache/fluss/pull/1267, it's not viable to
        // pass Fluss dependency with `--jar` CLI option. We may remove this workaround and use
        // `submitPipelineJob` to carry extra jar later.
        return Collections.singletonList("fluss-sql-connector.jar");
    }

    /** Creates and seeds both MySQL inventory databases before each test run. */
    @BeforeEach
    public void before() throws Exception {
        super.before();
        inventoryDatabaseWithPrimaryKey.createAndInitialize();
        inventoryDatabaseWithoutPrimaryKey.createAndInitialize();
    }

    /** Drops both test databases after each run so parameterized invocations stay isolated. */
    @AfterEach
    public void after() {
        super.after();
        inventoryDatabaseWithPrimaryKey.dropDatabase();
        inventoryDatabaseWithoutPrimaryKey.dropDatabase();
    }

    /**
     * Runs a MySQL-to-Fluss pipeline end to end.
     *
     * <p>First validates the initial snapshot of the {@code products} and {@code customers}
     * tables, then (for primary-key tables only) applies updates/deletes/inserts on the source and
     * validates that the changes are reflected in the Fluss sink.
     *
     * @param hasPrimaryKey whether the source tables declare a primary key; the non-PK variant
     *     skips the incremental phase because deletes are not supported there yet
     */
    @ParameterizedTest(name = "PkTable: {0}")
    @ValueSource(booleans = {true, false})
    void testMySqlToFluss(boolean hasPrimaryKey) throws Exception {
        UniqueDatabase inventoryDatabase =
                hasPrimaryKey
                        ? inventoryDatabaseWithPrimaryKey
                        : inventoryDatabaseWithoutPrimaryKey;
        String database = inventoryDatabase.getDatabaseName();
        // YAML pipeline definition; sink credentials match the "developer" user declared in the
        // JAAS config of the Fluss containers above.
        String pipelineJob =
                String.format(
                        "source:\n"
                                + "  type: mysql\n"
                                + "  hostname: %s\n"
                                + "  port: 3306\n"
                                + "  username: %s\n"
                                + "  password: %s\n"
                                + "  tables: %s.\\.*\n"
                                + "  server-id: 5400-5404\n"
                                + "  server-time-zone: UTC\n"
                                + "  scan.incremental.snapshot.chunk.key-column: %s.\\.*:id\n"
                                + "\n"
                                + "sink:\n"
                                + "  type: fluss\n"
                                + "  bootstrap.servers: coordinator-server:9123\n"
                                + "  properties.client.security.protocol: sasl\n"
                                + "  properties.client.security.sasl.mechanism: PLAIN\n"
                                + "  properties.client.security.sasl.username: developer\n"
                                + "  properties.client.security.sasl.password: developer-pass\n"
                                + "\n"
                                + "pipeline:\n"
                                + "  parallelism: %d",
                        INTER_CONTAINER_MYSQL_ALIAS,
                        MYSQL_TEST_USER,
                        MYSQL_TEST_PASSWORD,
                        database,
                        database,
                        parallelism);
        Path flussConnector = TestUtils.getResource("fluss-cdc-pipeline-connector.jar");
        submitPipelineJob(pipelineJob, flussConnector);
        waitUntilJobRunning(Duration.ofSeconds(30));
        LOG.info("Pipeline job is running");

        // Snapshot phase: expected rows mirror the seed data of mysql_inventory.
        validateSinkResult(
                database,
                "products",
                Arrays.asList(
                        "101, scooter, Small 2-wheel scooter, 3.14, red, {\"key1\": \"value1\"}, {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}",
                        "102, car battery, 12V car battery, 8.1, white, {\"key2\": \"value2\"}, {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}",
                        "103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, red, {\"key3\": \"value3\"}, {\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}",
                        "104, hammer, 12oz carpenter's hammer, 0.75, white, {\"key4\": \"value4\"}, {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}",
                        "105, hammer, 14oz carpenter's hammer, 0.875, red, {\"k1\": \"v1\", \"k2\": \"v2\"}, {\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}",
                        "106, hammer, 16oz carpenter's hammer, 1.0, null, null, null",
                        "107, rocks, box of assorted rocks, 5.3, null, null, null",
                        "108, jacket, water resistent black wind breaker, 0.1, null, null, null",
                        "109, spare tire, 24 inch spare tire, 22.2, null, null, null"));

        validateSinkResult(
                database,
                "customers",
                Arrays.asList(
                        "101, user_1, Shanghai, 123567891234",
                        "102, user_2, Shanghai, 123567891234",
                        "103, user_3, Shanghai, 123567891234",
                        "104, user_4, Shanghai, 123567891234"));

        if (!hasPrimaryKey) {
            // Non-primary key does not support deleting rows for now.
            return;
        }

        String mysqlJdbcUrl =
                String.format(
                        "jdbc:mysql://%s:%s/%s",
                        MYSQL.getHost(),
                        MYSQL.getDatabasePort(),
                        inventoryDatabase.getDatabaseName());

        // Fluss does not support applying DDL events for now.
        try (Connection conn =
                        DriverManager.getConnection(
                                mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
                Statement stat = conn.createStatement()) {
            stat.execute("UPDATE products SET description='18oz carpenter hammer' WHERE id=106;");
            stat.execute("UPDATE products SET weight='5.1' WHERE id=107;");
            // NOTE(review): id=111 does not exist yet at this point, so this DELETE is a no-op
            // against the seed data — presumably kept to exercise the delete code path; confirm.
            stat.execute("DELETE FROM products WHERE id=111;");
            stat.execute(
                    "INSERT INTO products VALUES (default,'jacket','water resistant white wind breaker', 0.2, null, null, null);");
            stat.execute(
                    "INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter', 5.18, null, null, null);");
        } catch (SQLException e) {
            LOG.error("Update table for CDC failed.", e);
            throw e;
        }

        // Incremental phase: rows 106/107 updated, 110/111 appended by the inserts above.
        validateSinkResult(
                database,
                "products",
                Arrays.asList(
                        "101, scooter, Small 2-wheel scooter, 3.14, red, {\"key1\": \"value1\"}, {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}",
                        "102, car battery, 12V car battery, 8.1, white, {\"key2\": \"value2\"}, {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}",
                        "103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, red, {\"key3\": \"value3\"}, {\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}",
                        "104, hammer, 12oz carpenter's hammer, 0.75, white, {\"key4\": \"value4\"}, {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}",
                        "105, hammer, 14oz carpenter's hammer, 0.875, red, {\"k1\": \"v1\", \"k2\": \"v2\"}, {\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}",
                        "106, hammer, 18oz carpenter hammer, 1.0, null, null, null",
                        "107, rocks, box of assorted rocks, 5.1, null, null, null",
                        "108, jacket, water resistent black wind breaker, 0.1, null, null, null",
                        "109, spare tire, 24 inch spare tire, 22.2, null, null, null",
                        "110, jacket, water resistant white wind breaker, 0.2, null, null, null",
                        "111, scooter, Big 2-wheel scooter, 5.18, null, null, null"));
    }

    /**
     * Reads up to {@code rowCount} rows of a Fluss table by rendering the {@code peek-fluss.sql}
     * template, copying it into the JobManager container, and running it through the Flink SQL
     * client.
     *
     * @param database Fluss database name (substituted into the SQL template)
     * @param table Fluss table name
     * @param rowCount number of rows the peek script should fetch
     * @return one comma-joined string per result row, with {@code <NULL>} mapped to {@code null}
     * @throws Exception if the SQL client invocation exits non-zero or container I/O fails
     */
    private List<String> fetchFlussTableRows(String database, String table, int rowCount)
            throws Exception {
        // Strip `--` comment lines from the template before formatting in database/table/count.
        String template =
                readLines("docker/peek-fluss.sql").stream()
                        .filter(line -> !line.startsWith("--"))
                        .collect(Collectors.joining("\n"));
        String sql = String.format(template, database, table, rowCount);
        String containerSqlPath = sharedVolume.toString() + "/peek.sql";
        jobManager.copyFileToContainer(Transferable.of(sql), containerSqlPath);

        org.testcontainers.containers.Container.ExecResult result =
                jobManager.execInContainer("/opt/flink/bin/sql-client.sh", "-f", containerSqlPath);
        if (result.getExitCode() != 0) {
            throw new RuntimeException(
                    "Failed to execute peek script. Stdout: "
                            + result.getStdout()
                            + "; Stderr: "
                            + result.getStderr());
        }

        // The SQL client prints an ASCII table; keep only `|`-prefixed rows and skip(1) drops the
        // column-header row.
        return Arrays.stream(result.getStdout().split("\n"))
                .filter(line -> line.startsWith("|"))
                .skip(1)
                .map(FlussE2eITCase::extractRow)
                .map(row -> String.format("%s", String.join(", ", row)))
                .collect(Collectors.toList());
    }

    /**
     * Splits one ASCII-table row on {@code |}, trims each cell, drops empty cells (the artifacts
     * of leading/trailing separators), and normalizes {@code <NULL>} to the literal {@code null}.
     */
    private static String[] extractRow(String row) {
        return Arrays.stream(row.split("\\|"))
                .map(String::trim)
                .filter(col -> !col.isEmpty())
                .map(col -> col.equals("<NULL>") ? "null" : col)
                .toArray(String[]::new);
    }

    /**
     * Polls the Fluss table once per second until its contents match {@code expected} (order
     * insensitive) or {@link #FLUSS_TESTCASE_TIMEOUT} elapses; a final assertion outside the loop
     * surfaces the last mismatch as the test failure.
     *
     * @param database Fluss database to read from
     * @param table Fluss table to read from
     * @param expected expected rows in {@code fetchFlussTableRows} string form
     * @throws InterruptedException if the polling sleep is interrupted
     */
    private void validateSinkResult(String database, String table, List<String> expected)
            throws InterruptedException {
        LOG.info("Verifying Fluss {}::{} results...", database, table);
        long deadline = System.currentTimeMillis() + FLUSS_TESTCASE_TIMEOUT.toMillis();
        List<String> results = Collections.emptyList();
        int rowCount = expected.size();
        while (System.currentTimeMillis() < deadline) {
            try {
                results = fetchFlussTableRows(database, table, rowCount);
                Assertions.assertThat(results).containsExactlyInAnyOrderElementsOf(expected);
                LOG.info(
                        "Successfully verified {} records in {} seconds.",
                        expected.size(),
                        // elapsed = now - (deadline - timeout), i.e. time since polling began
                        (System.currentTimeMillis() - deadline + FLUSS_TESTCASE_TIMEOUT.toMillis())
                                / 1000);
                return;
            } catch (Exception e) {
                LOG.warn("Validate failed, waiting for the next loop...", e);
            } catch (AssertionError ignored) {
                // AssertionError contains way too much records and might flood the log output.
                LOG.warn(
                        "Results mismatch, expected {} records, but got {} actually. Waiting for the next loop...",
                        expected.size(),
                        results.size());
            }
            Thread.sleep(1000L);
        }
        Assertions.assertThat(results).containsExactlyInAnyOrderElementsOf(expected);
    }
}

0 commit comments

Comments
 (0)