Description
Search before asking
- I searched in the issues and found nothing similar.
Paimon version
paimon 1.1.1
Compute Engine
Flink 1.20.1 (paimon-flink-1.20)
Minimal reproduce step
Create the Paimon table with the following DDL:
CREATE TABLE `paimon`.`default`.`paimon_student_info` (
    `sno` INT NOT NULL,
    `name` VARCHAR(2147483647),
    `address` VARCHAR(2147483647),
    `email` VARCHAR(2147483647),
    CONSTRAINT `PK_sno` PRIMARY KEY (`sno`) NOT ENFORCED
) WITH (
    'path' = 's3://qxb-lake/default.db/paimon_student_info',
    'table.exec.sink.upsert-materialize' = 'NONE',
    'changelog-producer.row-deduplicate' = 'true',
    'changelog-producer' = 'lookup',
    'snapshot.num-retained.max' = '10',
    'snapshot.num-retained.min' = '1',
    'deletion-vectors.enabled' = 'true'
)
Using MinIO as storage, execute the following Java code:
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(10000);
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
    List<String> initSQLs = ImmutableList.of(
            "CREATE CATALOG paimon_catalog WITH (\n" +
            " 'type'='paimon',\n" +
            " 'warehouse'='s3://qxb-lake//',\n" +
            " 's3.endpoint' = 'http://localhost:9000/',\n" +
            " 's3.access-key' = '******',\n" +
            " 's3.secret-key' = '******',\n" +
            " 's3.path.style.access' = 'true'\n" +
            ");",
            "create table student_info (\n" +
            " sno int,\n" +
            " name string ,\n" +
            " address string,\n" +
            " email string,\n" +
            " primary key (sno) not enforced\n" +
            ") WITH (\n" +
            " 'connector' = 'kafka',\n" +
            " 'topic' = 'tmp_student',\n" +
            " 'properties.bootstrap.servers' = 'localhost:9092',\n" +
            " 'properties.group.id' = 'kafka_student_02',\n" +
            " 'scan.startup.mode' = 'latest-offset',\n" +
            " 'format' = 'debezium-json'\n" +
            ");"
    );
    String exeSQL = "insert into paimon_catalog.`default`.paimon_student_info\n" +
            "select * from student_info;";
    for (String initSQL : initSQLs) {
        tEnv.executeSql(initSQL);
    }
    tEnv.executeSql(exeSQL);
}
Additionally, start a separate job that outputs the changelog of the Paimon table to the console (or another sink), and observe the changelog entries the table produces.
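For reference, a minimal console consumer could look like the sketch below. This is only an assumption-based sketch: the class name is illustrative, and the catalog options simply mirror the reproduce code above.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class ChangelogConsolePrinter {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        tEnv.executeSql(
                "CREATE CATALOG paimon_catalog WITH (\n" +
                " 'type'='paimon',\n" +
                " 'warehouse'='s3://qxb-lake//',\n" +
                " 's3.endpoint' = 'http://localhost:9000/',\n" +
                " 's3.access-key' = '******',\n" +
                " 's3.secret-key' = '******',\n" +
                " 's3.path.style.access' = 'true'\n" +
                ")");
        // Streaming read of the Paimon table; print() blocks and prints each changelog row
        // (+I / -U / +U / -D) to the console as it arrives.
        tEnv.executeSql("SELECT * FROM paimon_catalog.`default`.paimon_student_info").print();
    }
}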
Then write data to the Kafka topic in the following order:
- First, write the following record to the Kafka topic tmp_student:
{
  "before": null,
  "after": { "sno": 8, "name": "dyl5", "address": "hefei", "email": "[email protected]" },
  "op": "c"
}
- After confirming that the record can be queried from the Paimon table, write the following record to Kafka to delete it:
{
  "before": { "sno": 8, "name": "dyl5", "address": "hefei", "email": "[email protected]" },
  "after": null,
  "op": "d"
}
- This step is timing-sensitive: with a 10-second checkpoint interval, waiting around 9 seconds is typical. Just before the op=d changelog is printed to the console, or shortly after it is printed, write the same record again (see the producer sketch after this list):
{
  "before": null,
  "after": { "sno": 8, "name": "dyl5", "address": "hefei", "email": "[email protected]" },
  "op": "c"
}
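For reference, the records can be written with a small helper like the following sketch. This is an assumption-based sketch: the class name is illustrative, and it assumes the org.apache.kafka:kafka-clients dependency and the broker at localhost:9092 from the reproduce setup.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class DebeziumRecordSender {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        // The op=c record from the reproduce steps; swap in the op=d payload for the delete step.
        String json = "{\"before\": null, \"after\": {\"sno\": 8, \"name\": \"dyl5\", "
                + "\"address\": \"hefei\", \"email\": \"[email protected]\"}, \"op\": \"c\"}";
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Block until the record is acknowledged so the timing of the write is deterministic.
            producer.send(new ProducerRecord<>("tmp_student", json)).get();
        }
    }
}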
After several attempts, we noticed by chance that the changelog only emits the op=d entry and never emits the final op=c entry.
However, querying the Paimon table at this point still returns the record with sno=8.
As a result, downstream jobs that consume the Paimon changelog only receive the op=d deletion, leaving the downstream table inconsistent with the Paimon table.
What doesn't meet your expectations?
The Paimon changelog drops a data change, causing the downstream data to become inconsistent with the Paimon table.
Anything else?
Through debugging, we found that in LookupChangelogMergeFunctionWrapper#getResult, under certain conditions, when the three operations above (op=c, op=d, op=c) are executed, the intermediate op=d operation is ignored and highLevel is set to the record from the first op=c. As a result, with 'changelog-producer.row-deduplicate' = 'true', the subsequent setChangelog call considers the before and after rows identical and skips emitting the changelog for the final op=c change.
In the attached debugger screenshot, when the third op=c operation is executed, highLevel is assigned the record from the first op=c, so the op=d operation is ignored.
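To illustrate the suspected behavior, here is a simplified sketch; it is not the Paimon source, and the method and variable names are assumptions. Once highLevel holds the first op=c row instead of reflecting the op=d, the final op=c compares two identical rows and no changelog is emitted.

// Simplified illustration only; not the Paimon implementation, names are assumptions.
// If the intermediate op=d is ignored, highLevel still holds the first op=c row, so the final
// op=c compares two equal rows and the changelog entry is suppressed.
static boolean shouldEmitChangelog(Object highLevel, Object newRow, boolean rowDeduplicate) {
    if (highLevel == null) {
        // No earlier row at a higher level: an INSERT changelog is emitted.
        return true;
    }
    // With 'changelog-producer.row-deduplicate' = 'true', equal before/after rows produce no changelog.
    return !(rowDeduplicate && highLevel.equals(newRow));
}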
Are you willing to submit a PR?
- I'm willing to submit a PR!