
Conversation

rionmonster (Contributor) commented Dec 27, 2025

Purpose

Linked issue: close #2262

Per issue #2262, this pull request addresses a race condition that intermittently causes the IcebergRewriteITCase.testLogTableCompaction test case to fail (particularly during CI builds).

Brief change log

This change updates testLogTableCompaction with an additional call to the existing assertReplicaStatus() function to ensure that the associated tiering job has fully processed the previous writes before the subsequent assertions run, avoiding the race condition.

Tests

IcebergRewriteITCase.testLogTableCompaction was first updated to run iteratively (e.g., 50 repetitions), as suggested in the original issue, so the failure could be reproduced. Once the failure was reliably reproducible, the proposed fix was applied and the test was verified to pass consistently across all iterations.
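
For reference, a minimal sketch of that kind of iterative reproduction, assuming JUnit 5 is available in the test module; the class name is illustrative, and the real change may simply have wrapped the test body in a loop instead:

import org.junit.jupiter.api.RepeatedTest;

class IcebergRewriteRepetitionSketch {

    // Re-run the compaction scenario many times so an intermittent race
    // condition surfaces reliably rather than only on occasional CI runs.
    @RepeatedTest(50)
    void testLogTableCompactionRepeatedly() throws Exception {
        // ... same body as testLogTableCompaction ...
    }
}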

API and Format

N/A

Documentation

N/A

Reviewer(s) Requested

@swuferhong (as original reporter), @beryllw (as original author)

checkFileStatusInIcebergTable(t1, 3, false);

// Ensure tiering job has fully processed the previous writes
assertReplicaStatus(t1Bucket, i);
Contributor

I'm curious about this. If the file count in Iceberg is already 3, shouldn't that also mean the tiering has already tiered all the records, since we only write 3 records?

rionmonster (Contributor Author) commented Dec 29, 2025

@luoyuxia

I agree that it's a bit odd. The additional replica assertion seems to alleviate the race condition we were seeing in the failing CI instances for this specific test case (at least based on my local testing to reproduce the issue).

It may simply be a testing artifact rather than a legitimate issue, but if this doesn't feel like a proper fix, we can explore a few other avenues. It looked like a fairly common race condition, so leveraging an existing function seemed like a decent approach, but I'm happy to add more exhaustive checks if we feel they're needed.

Contributor

@rionmonster I tried to reproduce it locally, but couldn't. I think it would be better to explore the root cause, as I'm afraid it is caused by another, more critical issue.

Contributor Author

@luoyuxia

Good to know! It seemed to be pretty consistent regarding pass/fail on my end, but clearly there’s something else at play.

I’ll do some more exploration and see what I find. Thanks for the feedback!

rionmonster (Contributor Author) commented Dec 30, 2025

@luoyuxia

I've been digging into this a bit further, and it seems there's some disparity between the files actually being written and the latest offsets retrieved for them (specifically after asynchronous operations such as compaction). I think we may need some mechanism to improve consistency, at least within the bounds of the tests.

I wrote a small monitor that runs through several iterations of the test and records the relevant state during each one. Here's a sample of the output:

Successful Test (compaction performed as expected)
[MONITOR] log_table_33 - TableBucket{tableId=32, bucket=0}

IcebergFiles IcebergSnapshotId LakeSnapshotId ReplicaLakeSnapId ReplicaLakeLogEndOff Timestamp
0 -1 -1 -1 -1 1767113560800
0 -1 -1 -1 -1 1767113561046
... ... ... ... ... ...
3 5182733673261799288 5182733673261799288 5443797100773076340 3 1767113615059
3 5182733673261799288 5182733673261799288 5443797100773076340 3 1767113615307
2 2575057976625237982 2575057976625237982 5443797100773076340 4 1767113615557
2 2575057976625237982 2575057976625237982 5443797100773076340 4 1767113615808

Failing Test (the file never appeared to be properly written before the expected offset)
[MONITOR] log_table_34 - TableBucket{tableId=33, bucket=0}

IcebergFiles IcebergSnapshotId LakeSnapshotId ReplicaLakeSnapId ReplicaLakeLogEndOff Timestamp
0 -1 -1 -1 -1 1767113616327
... ... ... ... ... ...
2 7273969972093574431 7273969972093574431 7273969972093574431 2 1767113861627
2 7273969972093574431 7273969972093574431 7273969972093574431 2 1767113861882
2 7273969972093574431 7273969972093574431 7273969972093574431 2 1767113862135
2 7273969972093574431 7273969972093574431 7273969972093574431 2 1767113862381
2 7273969972093574431 7273969972093574431 7273969972093574431 2 1767113862633
[ASSERTION FAILURE] Expected offset 3 but got 2 for bucket TableBucket{tableId=33, bucket=0}
  Replica Lake Snapshot ID: 7273969972093574431
  Current State:
    Iceberg Files: 2
    Iceberg Snapshot ID: 7273969972093574431
    Lake Snapshot ID (from admin): 7273969972093574431
    Replica Lake Snapshot ID: 7273969972093574431
    Replica Lake Log End Offset: 2

I'm not sure if this is more of an artifact of the tests themselves or a legitimate issue. Any thoughts? Happy to continue digging. It feels like a race condition given the inconsistency, either in writing the files or in reading stale offsets directly from the data lake. In a real-world environment this might be tolerable (I suspect it's just a minor latency spike that would eventually resolve itself), but within the confines of a test it shows up as flakiness.
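
For context, a rough sketch of the kind of monitor loop used to capture the tables above; the suppliers for the file count, snapshot id, and replica offset are placeholders for however the test reads that state, not actual Fluss or Iceberg APIs:

import java.util.function.LongSupplier;

final class BucketStateMonitor implements Runnable {

    private final String label;
    private final LongSupplier icebergFileCount;        // placeholder: count of Iceberg data files
    private final LongSupplier icebergSnapshotId;       // placeholder: latest Iceberg snapshot id
    private final LongSupplier replicaLakeLogEndOffset; // placeholder: replica's lake log end offset
    private volatile boolean running = true;

    BucketStateMonitor(String label,
                       LongSupplier icebergFileCount,
                       LongSupplier icebergSnapshotId,
                       LongSupplier replicaLakeLogEndOffset) {
        this.label = label;
        this.icebergFileCount = icebergFileCount;
        this.icebergSnapshotId = icebergSnapshotId;
        this.replicaLakeLogEndOffset = replicaLakeLogEndOffset;
    }

    @Override
    public void run() {
        // Sample the observable state every 250ms and print one row per sample,
        // mirroring the columns shown in the tables above.
        while (running) {
            System.out.printf("[MONITOR] %s files=%d icebergSnapshotId=%d replicaLogEndOffset=%d ts=%d%n",
                    label,
                    icebergFileCount.getAsLong(),
                    icebergSnapshotId.getAsLong(),
                    replicaLakeLogEndOffset.getAsLong(),
                    System.currentTimeMillis());
            try {
                Thread.sleep(250);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    void stop() {
        running = false;
    }
}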

rionmonster (Contributor Author) commented Jan 4, 2026

@luoyuxia

I added some additional checks around these to try and better identify the underlying issue. Based on the additional diagnostics, it doesn't look like the files are being written or recognized by Iceberg, which I suspect is simply a race condition:

[ASSERTION FAILURE] Expected offset 3 but got 2 for bucket TableBucket{tableId=21, bucket=0}
  Replica Lake Snapshot ID: 6090342046054561204
  Current State:
    Iceberg Files: 2
    Iceberg Snapshot ID: 6090342046054561204
    Lake Snapshot ID (from admin): 6090342046054561204
    Replica Lake Snapshot ID: 6090342046054561204
    Replica Lake Log End Offset: 2 (expected: 3, diff: 1)

Said differently, because the replica offset is off by one, it means either the tiering job hasn't processed all the writes yet (see the monitor output above), the offset update is lagging behind the file commits (a race condition), or the tiering job itself is stuck or running slowly.

I think we could address this in two ways, which I'm testing now and will update the PR with:

  1. Introduce a waitForTieringToProcess helper function that verifies the tiering has been executed before continuing to validate the replica statuses (a rough sketch follows at the end of this comment).
  2. Reduce the test-specific freshness and/or tiering intervals (e.g., POLL_TIERING_TABLE_INTERVAL) from their current values of 500ms to 100ms.

Thoughts? I’d probably err on the side of applying both since it’s a flaky test and the more consistency, the better.
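
For what it's worth, a minimal sketch of what option 1's waitForTieringToProcess helper could look like; the helper name and the offset supplier are hypothetical, and the real implementation would read the replica's lake log end offset through whatever the existing test utilities expose:

import java.time.Duration;
import java.util.function.LongSupplier;

final class TieringTestUtils {

    // Hypothetical helper: poll until the tiering job has caught up to the
    // expected log end offset, failing the test after a timeout instead of
    // letting a later assertion race against the asynchronous tiering.
    static void waitForTieringToProcess(LongSupplier currentLogEndOffset,
                                        long expectedLogEndOffset,
                                        Duration timeout) throws InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (currentLogEndOffset.getAsLong() < expectedLogEndOffset) {
            if (System.nanoTime() > deadline) {
                throw new AssertionError(
                        "Tiering did not reach offset " + expectedLogEndOffset
                                + " within " + timeout + "; last observed offset was "
                                + currentLogEndOffset.getAsLong());
            }
            Thread.sleep(100); // re-check periodically rather than asserting immediately
        }
    }
}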

rionmonster (Contributor Author) commented Jan 8, 2026

@luoyuxia

I've been tinkering with a few different approaches to handling this, and it appears to be entirely related to the asynchronous processes in place (e.g., tiering, datalake freshness, etc.). I tried to avoid introducing explicit delays such as Thread.sleep(...) calls, as those generally feel like they address the symptom rather than the cause.

I recently ran the test through 500 and then 1000 iterations with POLL_TIERING_TABLE_INTERVAL and TABLE_DATALAKE_FRESHNESS set to 100ms (down from their original 500ms values). This appeared to address the flakiness, although with all of these asynchronous processes it's still technically possible for the test to fail.

Do you think this would be sufficient, or are you aware of any additional configuration we might be able to use within the context of this test to force flushes and help with consistency in some of these asynchronous processes?

Contributor

Thanks @rionmonster for diving into it, and sorry for the late reply; I had a busy week. I downloaded the logs for the failing CI run from https://github.com/apache/fluss/actions/runs/20522062759/artifacts/4970768099 and found the following entries:

--------- first tiering
20:09:10,300 [SourceCoordinator-Source: TieringSource] INFO  org.apache.fluss.flink.tiering.source.enumerator.TieringSourceEnumerator [] - Generate Tiering splits for table fluss.log_table.
20:09:10,310 [SourceCoordinator-Source: TieringSource] INFO  org.apache.fluss.flink.tiering.source.enumerator.TieringSourceEnumerator [] - **Generate Tiering 1** splits for table fluss.log_table with cost 10ms.
20:09:10,310 [Source: TieringSource (1/2)#0] INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding split(s) to reader: [TieringLogSplit{tablePath=fluss.log_table, tableBucket=TableBucket{tableId=2, bucket=0}, 

---- second  tiering
org.apache.fluss.flink.tiering.source.enumerator.TieringSourceEnumerator [] - Generate Tiering splits for table fluss.log_table.
20:09:11,303 [SourceCoordinator-Source: TieringSource] INFO  org.apache.fluss.flink.tiering.source.split.TieringSplitGenerator [] - Last committed lake table snapshot info is:LakeSnapshot{snapshotId=3828565410144101634, tableBucketsOffset={TableBucket{tableId=2, bucket=0}=1}}
20:09:11,307 [SourceCoordinator-Source: TieringSource] INFO  org.apache.fluss.flink.tiering.source.enumerator.TieringSourceEnumerator [] - Generate Tiering 1 splits for table fluss.log_table with cost 7ms.
20:09:11,307 [Source: TieringSource (1/2)#0] INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding split(s) to reader: [TieringLogSplit{tablePath=fluss.log_table, tableBucket=TableBucket{tableId=2, bucket=0}, partitionName='null', startingOffset=1, stoppingOffset=2, numberOfSplits=1}]
20:09:11,307 [Source Data Fetcher for Source: TieringSource (1/2)#0] INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Starting split fetcher 1
20:09:11,307 [Source Data Fetcher for Source: TieringSource (1/2)#0] INFO  org.apache.fluss.flink.tiering.source.TieringSplitReader     [] - add split tiering-log-split-2-0
20:09:11,310 [Source Data Fetcher for Source: TieringSource (1/2)#0] INFO  org.apache.fluss.flink.tiering.source.TieringSplitReader     [] - Start to tier table fluss.log_table with table id 2.
20:09:11,311 [Source Data Fetcher for Source: TieringSource (1/2)#0] INFO  org.apache.fluss.flink.tiering.source.TieringSplitReader     [] - Subscribe to read log for split tiering-log-split-2-0 from starting offset 1 to end offset 2.

---- third  tiering

org.apache.fluss.flink.tiering.source.enumerator.TieringSourceEnumerator [] - Generate Tiering splits for table fluss.log_table.
20:09:12,303 [SourceCoordinator-Source: TieringSource] INFO  org.apache.fluss.flink.tiering.source.split.TieringSplitGenerator [] - Last committed lake table snapshot info is:LakeSnapshot{snapshotId=7991116837264656570, tableBucketsOffset={TableBucket{tableId=2, bucket=0}=2}}
20:09:12,307 [SourceCoordinator-Source: TieringSource] INFO  org.apache.fluss.flink.tiering.source.enumerator.TieringSourceEnumerator [] - Generate Tiering 1 splits for table fluss.log_table with cost 7ms.
20:09:12,308 [Source: TieringSource (1/2)#0] INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding split(s) to reader: [TieringLogSplit{tablePath=fluss.log_table, tableBucket=TableBucket{tableId=2, bucket=0}, partitionName='null', startingOffset=2, stoppingOffset=3, numberOfSplits=1}]
20:09:12,308 [Source Data Fetcher for Source: TieringSource (1/2)#0] INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Starting split fetcher 2
20:09:12,308 [Source Data Fetcher for Source: TieringSource (1/2)#0] INFO  org.apache.fluss.flink.tiering.source.TieringSplitReader     [] - add split tiering-log-split-2-0
20:09:12,311 [Source Data Fetcher for Source: TieringSource (1/2)#0] INFO  org.apache.fluss.flink.tiering.source.TieringSplitReader     [] - Start to tier table fluss.log_table with table id 2.
20:09:12,311 [Source Data Fetcher for Source: TieringSource (1/2)#0] INFO  org.apache.fluss.flink.tiering.source.TieringSplitReader     [] - Subscribe to read log for split tiering-log-split-2-0 from starting offset 2 to end offset 3.


--- then no tiering again; this is where the problem happens

20:09:14,300 [SourceCoordinator-Source: TieringSource-worker-thread-1] INFO  org.apache.fluss.flink.tiering.source.enumerator.TieringSourceEnumerator [] - No available Tiering table found, will poll later.
20:09:14,800 [SourceCoordinator-Source: TieringSource-worker-thread-1] INFO  org.apache.fluss.flink.tiering.source.enumerator.TieringSourceEnumerator [] - No available Tiering table found, will poll later.
....

20:10:19,800 [SourceCoordinator-Source: TieringSource-worker-thread-1] INFO  org.apache.fluss.flink.tiering.source.enumerator.TieringSourceEnumerator [] - No available Tiering table found, will poll later.

**no further tiering happens at all**

I think the key point is why, after the third tiering (which tiered up to offset 3, i.e., 3 records), no further tiering happens.
Another round of tiering is expected, since we write the 4th record in:

flussRows.addAll(
        writeIcebergTableRecords(
                t1, t1Bucket, ++i, true, Collections.singletonList(row(1, "v1"))));

It would be better if we figured out the reason. Since you can reproduce it locally, I think you could add more logging in LakeTableTieringManager to find out why it no longer assigns the table to the tiering service.

I'm afraid there is some bug in LakeTableTieringManager or in the logic that assigns the table for tiering.

luoyuxia (Contributor) left a comment

@rionmonster Thanks for the PR. Only one question.


