Skip to content

Commit c2c40a2

Browse files
(old CDK) Reduce initial load timeout and improve logging (#59682)
1 parent e6681c9 commit c2c40a2

File tree

5 files changed

+80
-17
lines changed

5 files changed

+80
-17
lines changed

airbyte-cdk/java/airbyte-cdk/README.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -174,8 +174,9 @@ corresponds to that version.
174174

175175
| Version | Date | Pull Request | Subject |
176176
|:-----------|:-----------|:------------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
177-
| 0.48.9 | 2025-04-17 | [\#58132] (https://github.com/airbytehq/airbyte/pull/58132) | Fix vulnerability in dependencies. |
178-
| 0.48.8 | 2025-03-11 | [\#55709](https://github.com/airbytehq/airbyte/pull/55709) | Filter unwanted tables in discover to prevent null table issues |
177+
| 0.48.13 | 2025-05-06 | [\#59682](https://github.com/airbytehq/airbyte/pull/59682) | Reduce initial load timeout and improve logging. |
178+
| 0.48.9 | 2025-04-17 | [\#58132](https://github.com/airbytehq/airbyte/pull/58132) | Fix vulnerability in dependencies. |
179+
| 0.48.8 | 2025-03-11 | [\#55709](https://github.com/airbytehq/airbyte/pull/55709) | Filter unwanted tables in discover to prevent null table issues |
179180
| 0.48.7 | 2025-01-26 | [\#51596](https://github.com/airbytehq/airbyte/pull/51596) | Make efficient table discovery during read |
180181
| 0.48.6 | 2025-01-26 | [\#51596](https://github.com/airbytehq/airbyte/pull/51596) | Fix flaky source mssql tests |
181182
| 0.48.5 | 2025-01-16 | [\#51583](https://github.com/airbytehq/airbyte/pull/51583) | Also save SSL key to /tmp in destination-postgres |
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.48.12
1+
version=0.48.13

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumRecordIterator.kt

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,16 @@ class DebeziumRecordIterator<T>(
5555
private var numUnloggedPolls: Int = -1
5656
private var lastLoggedPoll: Instant = Instant.MIN
5757

58+
/**
 * Renders [duration] as a short, human-readable string for log messages.
 *
 * The unit is picked from the magnitude of the duration: milliseconds when it is
 * below one second, seconds when below one minute, minutes when below one hour,
 * and hours otherwise. The value is always printed with two decimals.
 *
 * Formatting is pinned to [java.util.Locale.ROOT] so the decimal separator is
 * always '.', independent of the JVM default locale.
 *
 * @param duration the elapsed time to format. A negative duration satisfies the
 * first (sub-second) check and is therefore reported in ms.
 * @return the formatted value followed by its unit, e.g. "12.50 seconds".
 */
@VisibleForTesting
fun formatDuration(duration: Duration): String {
    // Each branch divides the next-finer whole unit so the fractional part survives
    // (e.g. 90 s -> 1.50 minutes); anything finer than that unit is truncated.
    val (amount, unit) =
        when {
            duration.toMillis() < 1000 -> (duration.toNanos() / 1_000_000.0) to "ms"
            duration.toSeconds() < 60 -> (duration.toMillis() / 1000.0) to "seconds"
            duration.toMinutes() < 60 -> (duration.toSeconds() / 60.0) to "minutes"
            else -> (duration.toMinutes() / 60.0) to "hours"
        }
    return String.format(java.util.Locale.ROOT, "%.2f %s", amount, unit)
}
67+
5868
// The following logic incorporates heartbeat:
5969
// 1. Wait on queue either the configured time first or 1 min after a record received
6070
// 2. If nothing came out of queue finish sync
@@ -66,6 +76,8 @@ class DebeziumRecordIterator<T>(
6676
// keep trying until the publisher is closed or until the queue is empty. the latter case is
6777
// possible when the publisher has shutdown but the consumer has not yet processed all
6878
// messages it emitted.
79+
LOGGER.info { "Starting CDC Process" }
80+
val instantBeforeSync = Instant.now()
6981
while (!MoreBooleans.isTruthy(publisherStatusSupplier.get()) || !queue.isEmpty()) {
7082
val next: ChangeEvent<String?, String?>?
7183
val waitTime =
@@ -84,15 +96,15 @@ class DebeziumRecordIterator<T>(
8496
isHeartbeatEvent(next)
8597
if (isEventLogged) {
8698
val pollDuration: Duration = Duration.between(instantBeforePoll, Instant.now())
99+
// format duration to human-readable
100+
val formattedPollDuration = formatDuration(pollDuration)
87101
LOGGER.info {
88102
"CDC events queue poll(): " +
89103
when (numUnloggedPolls) {
90-
-1 -> "blocked for $pollDuration in its first call."
91-
0 ->
92-
"blocked for $pollDuration after " +
93-
"its previous call which was also logged."
104+
-1 -> "first call - waited $formattedPollDuration for events"
105+
0 -> "waited for $formattedPollDuration since last logged call."
94106
else ->
95-
"blocked for $pollDuration after " +
107+
"waited for $formattedPollDuration after " +
96108
"$numUnloggedPolls previous call(s) which were not logged."
97109
}
98110
}
@@ -110,18 +122,16 @@ class DebeziumRecordIterator<T>(
110122
!receivedFirstRecord || hasSnapshotFinished || maxInstanceOfNoRecordsFound >= 10
111123
) {
112124
requestClose(
113-
String.format(
114-
"No records were returned by Debezium in the timeout seconds %s, closing the engine and iterator",
115-
waitTime.seconds
116-
),
125+
"Closing the Debezium engine after no records received within ${waitTime.seconds} seconds timeout. Status: receivedFirstRecord=$receivedFirstRecord, " +
126+
"hasSnapshotFinished=$hasSnapshotFinished, noRecordsAttempts=$maxInstanceOfNoRecordsFound",
117127
DebeziumCloseReason.TIMEOUT
118128
)
119129
}
120130

121131
maxInstanceOfNoRecordsFound++
122132
LOGGER.info {
123133
"CDC events queue poll(): " +
124-
"returned nothing, polling again, attempt $maxInstanceOfNoRecordsFound."
134+
"returned nothing. Waiting for more records, attempt $maxInstanceOfNoRecordsFound."
125135
}
126136
continue
127137
}
@@ -137,13 +147,14 @@ class DebeziumRecordIterator<T>(
137147

138148
val heartbeatPos = getHeartbeatPosition(next)
139149
val isProgressing = heartbeatPos != lastHeartbeatPosition
150+
val instantSyncTime: Duration = Duration.between(instantBeforeSync, Instant.now())
151+
val debeziumWaitingTimeRemaining = waitTime.seconds - instantSyncTime.toSeconds()
140152
LOGGER.info {
141153
"CDC events queue poll(): " +
142-
"returned a heartbeat event: " +
143154
if (isProgressing) {
144-
"progressing to $heartbeatPos."
155+
"returned a heartbeat event, " + "progressing to $heartbeatPos."
145156
} else {
146-
"no progress since last heartbeat."
157+
"no progress since last heartbeat. Will continue polling until timeout is reached. Time remaining in seconds: ${debeziumWaitingTimeRemaining}."
147158
}
148159
}
149160
// wrap up sync if heartbeat position crossed the target OR heartbeat position

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/InitialLoadTimeoutUtil.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ private val LOGGER = KotlinLogging.logger {}
1313

1414
object InitialLoadTimeoutUtil {
1515

16-
val MIN_INITIAL_LOAD_TIMEOUT: Duration = Duration.ofHours(4)
16+
val MIN_INITIAL_LOAD_TIMEOUT: Duration = Duration.ofHours(1)
1717
val MAX_INITIAL_LOAD_TIMEOUT: Duration = Duration.ofHours(24)
1818
val DEFAULT_INITIAL_LOAD_TIMEOUT: Duration = Duration.ofHours(8)
1919

airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumRecordIteratorTest.kt

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import java.time.Duration
1111
import java.util.*
1212
import org.apache.kafka.connect.source.SourceRecord
1313
import org.junit.jupiter.api.Assertions
14+
import org.junit.jupiter.api.Assertions.assertEquals
1415
import org.junit.jupiter.api.Test
1516
import org.junit.jupiter.params.ParameterizedTest
1617
import org.junit.jupiter.params.provider.CsvSource
@@ -75,6 +76,56 @@ class DebeziumRecordIteratorTest {
7576
return mapper.readTree(testConfig)
7677
}
7778

79+
/**
 * Checks that [DebeziumRecordIterator.formatDuration] picks the expected unit and
 * two-decimal rendering on both sides of the ms / seconds / minutes / hours
 * thresholds.
 */
@Test
fun test_format_duration() {
    // Only formatDuration is called on this instance; the constructor arguments are
    // mocks and trivial stand-ins.
    val debeziumRecordIterator =
        DebeziumRecordIterator(
            mock(),
            object : CdcTargetPosition<Long> {
                override fun reachedTargetPosition(
                    changeEventWithMetadata: ChangeEventWithMetadata?
                ): Boolean = false

                override fun extractPositionFromHeartbeatOffset(
                    sourceOffset: Map<String?, *>
                ): Long = sourceOffset["lsn"] as Long
            },
            { false },
            mock(),
            Duration.ZERO,
            getTestConfig(), // Heartbeats should not be ignored for tests.
        )

    // Input duration paired with the exact string formatDuration must produce.
    val expectedByDuration: List<Pair<Duration, String>> =
        listOf(
            Duration.ofNanos(10) to "0.00 ms",
            Duration.ofNanos(40_560) to "0.04 ms",
            Duration.ofNanos(500_000) to "0.50 ms",
            Duration.ofMillis(42) to "42.00 ms",
            Duration.ofMillis(999) to "999.00 ms",
            Duration.ofSeconds(1) to "1.00 seconds",
            Duration.ofMillis(12500) to "12.50 seconds",
            Duration.ofSeconds(45) to "45.00 seconds",
            Duration.ofSeconds(60) to "1.00 minutes",
            Duration.ofSeconds(90) to "1.50 minutes",
            Duration.ofMinutes(30) to "30.00 minutes",
            Duration.ofMinutes(59) to "59.00 minutes",
            Duration.ofMinutes(60) to "1.00 hours",
            Duration.ofMinutes(150) to "2.50 hours",
            Duration.ofHours(5) to "5.00 hours",
        )

    expectedByDuration.forEach { (duration, expected) ->
        // The message names the failing input so a broken case is identifiable.
        assertEquals(
            expected,
            debeziumRecordIterator.formatDuration(duration),
            "formatDuration($duration)"
        )
    }
}
128+
78129
@ParameterizedTest
79130
@CsvSource(
80131
"c, true",

0 commit comments

Comments
 (0)