diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java
index 05414cdd7..26116589d 100644
--- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java
+++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java
@@ -131,7 +131,8 @@ public void startPartition(final String tableName, final TopicPartition topicPar
             pipeName,
             conn,
             topicPartition.partition(),
-            cleanerServiceExecutor));
+            cleanerServiceExecutor,
+            nameIndex));
   }
 }
 
@@ -376,6 +377,7 @@ private class ServiceContext {
     private final String pipeName;
     private final SnowflakeConnectionService conn;
     private final SnowflakeIngestionService ingestionService;
+    private final String nameIndex;
     private List<String> fileNames;
 
     // Includes a list of files:
@@ -421,7 +423,9 @@ private ServiceContext(
         String pipeName,
         SnowflakeConnectionService conn,
         int partition,
-        ScheduledExecutorService v2CleanerExecutor) {
+        ScheduledExecutorService v2CleanerExecutor,
+        String nameIndex) {
+      this.nameIndex = nameIndex;
       this.pipeName = pipeName;
       this.tableName = tableName;
       this.stageName = stageName;
@@ -470,7 +474,8 @@ private ServiceContext(
             ingestionService,
             pipeStatus,
             telemetryService,
-            v2CleanerExecutor);
+            v2CleanerExecutor,
+            nameIndex);
       this.stageFileProcessorClient = processor.trackFilesAsync();
       this.cleanerExecutor = null;
       this.reprocessCleanerExecutor = null;
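The change above threads `nameIndex` from the sink service into the file processor purely for log correlation. An alternative worth weighing is SLF4J's MDC, which attaches the same key to every log line emitted on the thread without widening constructor signatures or touching each format string. A minimal sketch, assuming the connector's logging backend (e.g. Logback or Log4j2) is configured to render MDC values via `%X{nameIndex}`; the key name and wrapper method are illustrative, not part of this change:

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

class CleanerMdcSketch {
  private static final Logger LOGGER = LoggerFactory.getLogger(CleanerMdcSketch.class);
  // Hypothetical key; the log pattern must include %X{nameIndex} to render it.
  private static final String CORRELATION_KEY = "nameIndex";

  void runCleanupCycle(String nameIndex, Runnable cycle) {
    // putCloseable() scopes the MDC entry to this try block and clears it on exit,
    // which matters because the cleaner runs on a shared scheduler thread.
    try (MDC.MDCCloseable ignored = MDC.putCloseable(CORRELATION_KEY, nameIndex)) {
      LOGGER.info("Starting cleanup cycle"); // nameIndex is appended by the log pattern
      cycle.run();
    }
  }
}
```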
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/StageFilesProcessor.java b/src/main/java/com/snowflake/kafka/connector/internal/StageFilesProcessor.java
index 31413753c..84080a687 100644
--- a/src/main/java/com/snowflake/kafka/connector/internal/StageFilesProcessor.java
+++ b/src/main/java/com/snowflake/kafka/connector/internal/StageFilesProcessor.java
@@ -75,6 +75,7 @@ class StageFilesProcessor {
   // can
   // save us from hitting "too many requests - 429 status code"
   private static final long CLEANUP_PERIOD_SECONDS = 61;
+  private final String nameIndex;
 
   /**
    * Client interface for the StageFileProcessor - allows thread safe registration of new files and
@@ -103,7 +104,8 @@ public StageFilesProcessor(
       SnowflakeIngestionService ingestionService,
       SnowflakeTelemetryPipeStatus pipeTelemetry,
       SnowflakeTelemetryService telemetryService,
-      ScheduledExecutorService schedulingExecutor) {
+      ScheduledExecutorService schedulingExecutor,
+      String nameIndex) {
     this(
         pipeName,
         tableName,
@@ -114,7 +116,8 @@ public StageFilesProcessor(
         pipeTelemetry,
         telemetryService,
         schedulingExecutor,
-        System::currentTimeMillis);
+        System::currentTimeMillis,
+        nameIndex);
   }
 
   @VisibleForTesting
@@ -128,7 +131,8 @@ public StageFilesProcessor(
       SnowflakeTelemetryPipeStatus pipeTelemetry,
       SnowflakeTelemetryService telemetryService,
       ScheduledExecutorService schedulingExecutor,
-      TimeSupplier currentTimeSupplier) {
+      TimeSupplier currentTimeSupplier,
+      String nameIndex) {
     this.pipeName = pipeName;
     this.tableName = tableName;
     this.stageName = stageName;
@@ -140,6 +144,7 @@ public StageFilesProcessor(
     this.pipeTelemetry = pipeTelemetry;
     this.schedulingExecutor = schedulingExecutor;
     this.filters = new FilteringPredicates(currentTimeSupplier, prefix);
+    this.nameIndex = nameIndex;
   }
 
   /**
@@ -172,7 +177,7 @@ public ProgressRegister trackFilesAsync() {
    */
   @VisibleForTesting
   void trackFiles(ProgressRegisterImpl register, PipeProgressRegistryTelemetry progressTelemetry) {
-    LOGGER.info("Starting file cleaner for pipe {} ...", pipeName);
+    LOGGER.info("Starting file cleaner for pipe {}, nameIndex {} ...", pipeName, nameIndex);
 
     AtomicBoolean shouldFetchInitialStageFiles = new AtomicBoolean(true);
     AtomicBoolean isFirstRun = new AtomicBoolean(true);
@@ -197,16 +202,17 @@ void trackFiles(ProgressRegisterImpl register, PipeProgressRegistryTelemetry pro
             // initialize state based on the remote stage state (do it on first call or after
             // error)
             if (shouldFetchInitialStageFiles.getAndSet(false)) {
+              LOGGER.info("Fetching initial stage file state, nameIndex {}", nameIndex);
               initializeCleanStartState(ctx, isFirstRun.get());
               isFirstRun.set(false);
             }
 
             LOGGER.debug(
-                "cleanup cycle {} for pipe {} with {} files and history with {} entries",
+                "cleanup cycle {} for pipe {} with {} files and history with {} entries, nameIndex {}",
                 ctx.cleanupCycle.incrementAndGet(),
                 pipeName,
                 ctx.files.size(),
-                ctx.ingestHistory.size());
+                ctx.ingestHistory.size(), nameIndex);
 
             // process the files, store the spillover ones for the next cycle. in case of an
             // error - we
@@ -217,11 +223,11 @@ void trackFiles(ProgressRegisterImpl register, PipeProgressRegistryTelemetry pro
           } catch (Exception e) {
             progressTelemetry.reportKafkaConnectFatalError(e.getMessage());
             LOGGER.warn(
-                "Cleaner encountered an exception {} in cycle {}:\n{}\n{}",
+                "Cleaner encountered an exception {} in cycle {}:\n{}\n{}, nameIndex: {}",
                 e.getClass(),
                 ctx.cleanupCycle.get(),
                 e.getMessage(),
-                e.getStackTrace());
+                e.getStackTrace(), nameIndex);
 
             shouldFetchInitialStageFiles.set(true);
             hadError.set(true);
@@ -257,13 +263,18 @@ private void initializeCleanStartState(ProcessorContext ctx, boolean firstRun) {
     // state
     ctx.ingestHistory.clear();
     ctx.historyMarker.set(null);
-    LOGGER.debug("for pipe {} found {} file(s) on remote stage", pipeName, remoteStageFiles.size());
+    LOGGER.debug("for pipe {} found {} file(s) on remote stage, nameIndex {}", pipeName, remoteStageFiles.size(), nameIndex);
   }
 
   private void nextCheck(ProcessorContext ctx, ProgressRegisterImpl register, boolean hadErrors) {
     // make first categorization - split files into these with start offset higher than current
     FileCategorizer fileCategories = FileCategorizer.build(ctx.files, register.offset.get());
+    LOGGER.info(
+        "DirtyFiles: {}, Staging Files: {}, nameIndex: {}, pipe: {}, offset: {}",
+        String.join(", ", fileCategories.dirtyFiles),
+        String.join(", ", fileCategories.stageFiles.keySet()),
+        nameIndex, pipeName, register.offset.get());
 
     if (hadErrors) {
       ctx.progressTelemetry.updateStatsAfterError(
@@ -300,10 +311,10 @@ private void nextCheck(ProcessorContext ctx, ProgressRegisterImpl register, bool
         fileCategories.query(filters.trackableFilesPredicate).collect(Collectors.toList());
     cleanOldHistory(ctx);
     LOGGER.debug(
-        "keep {} files and {} history entries for next cycle for pipe {}",
+        "keep {} files and {} history entries for next cycle for pipe {}, nameIndex {}",
         filesToTrack.size(),
         ctx.ingestHistory.size(),
-        pipeName);
+        pipeName, nameIndex);
     ctx.files.clear();
     ctx.files.addAll(filesToTrack);
     ctx.files.addAll(fileCategories.dirtyFiles);
@@ -346,10 +357,10 @@ private void checkAndRefreshStaleFiles(FileCategorizer fileCategorizer, Processo
     // new or modified
     // readOneHourHistory method though)
     LOGGER.debug(
-        "Checking stale file history for pipe: {}, staleFileCount: {}, staleFiles:{}",
+        "Checking stale file history for pipe: {}, staleFileCount: {}, staleFiles:{}, nameIndex: {}",
        pipeName,
        staleFiles.size(),
-        String.join(", ", staleFiles));
+        String.join(", ", staleFiles), nameIndex);
 
     Map<String, InternalUtils.IngestedFileStatus> report =
         ingestionService.readOneHourHistory(staleFiles, historyWindow);
@@ -377,10 +388,10 @@ private void purgeLoadedFiles(
 
     if (!loadedFiles.isEmpty()) {
       LOGGER.debug(
-          "Purging loaded files for pipe: {}, loadedFileCount: {}, loadedFiles:{}",
+          "Purging loaded files for pipe: {}, loadedFileCount: {}, loadedFiles:{}, nameIndex: {}",
           pipeName,
           loadedFiles.size(),
-          String.join(", ", loadedFiles));
+          String.join(", ", loadedFiles), nameIndex);
       conn.purgeStage(stageName, loadedFiles);
       stopTrackingFiles(loadedFiles, fileCategorizer, ctx);
@@ -397,10 +408,10 @@ private void moveFailedFiles(
         fileCategorizer.query(filters.failedFilesPredicate).collect(Collectors.toList());
     if (!failedFiles.isEmpty()) {
       LOGGER.debug(
-          "Moving failed files for pipe:{} to tableStage failedFileCount:{}, failedFiles:{}",
+          "Moving failed files for pipe:{} to tableStage failedFileCount:{}, failedFiles:{}, nameIndex:{}",
           pipeName,
           failedFiles.size(),
-          String.join(", ", failedFiles));
+          String.join(", ", failedFiles), nameIndex);
       conn.moveToTableStage(tableName, stageName, failedFiles);
       stopTrackingFiles(failedFiles, fileCategorizer, ctx);
       onMoveFiles.accept(failedFiles.size());
@@ -440,18 +451,18 @@ private void purgeDirtyFiles(Set<String> files) {
     try {
       LOGGER.info(
           "Purging files already present on the stage for pipe{} before start."
-              + " reprocessFileSize:{}, files: {}",
+              + " reprocessFileSize:{}, files: {}, nameIndex: {}",
           pipeName,
           files.size(),
-          String.join(", ", files));
+          String.join(", ", files), nameIndex);
       conn.purgeStage(stageName, new ArrayList<>(files));
       files.clear();
     } catch (Exception e) {
       LOGGER.error(
-          "Reprocess cleaner encountered an exception {}:\n{}\n{}",
+          "Reprocess cleaner encountered an exception {}:\n{}\n{}, nameIndex: {}",
           e.getClass(),
           e.getMessage(),
-          e.getStackTrace());
+          e.getStackTrace(), nameIndex);
     }
   }
@@ -540,13 +551,21 @@ public ProgressRegisterImpl(StageFilesProcessor filesProcessor) {
 
     @Override
     public void registerNewStageFile(String fileName) {
-      LOGGER.debug("Start tracking new file {}", fileName);
+      StageFilesProcessor processor = owner.get();
+      LOGGER.debug(
+          "Start tracking new file {}, nameIndex {}",
+          fileName,
+          processor == null ? "n/a" : processor.nameIndex);
       files.add(fileName);
     }
 
     @Override
     public void newOffset(long offset) {
-      LOGGER.trace("New offset: {}", offset);
+      StageFilesProcessor processor = owner.get();
+      LOGGER.trace(
+          "New offset: {}, nameIndex {}",
+          offset,
+          processor == null ? "n/a" : processor.nameIndex);
       this.offset.set(offset);
     }
 
@@ -564,9 +583,10 @@
     private void transferFilesToContext(ProcessorContext ctx) {
       files.drainTo(freshFiles);
       StageFilesProcessor processor = owner.get();
       LOGGER.debug(
-          "collected {} files for processing for pipe {}",
+          "collected {} files for processing for pipe {}, nameIndex {}",
           freshFiles.size(),
-          processor == null ? "n/a" : processor.pipeName);
+          processor == null ? "n/a" : processor.pipeName,
+          processor == null ? "n/a" : processor.nameIndex);
       ctx.files.addAll(freshFiles);
     }
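In `ProgressRegisterImpl`, `owner` is a weak reference to the `StageFilesProcessor`, so `owner.get()` can return null once the processor becomes unreachable; any logging that reads `nameIndex` through it needs a single dereference with a fallback, as in the hunks above. A self-contained sketch of that pattern, with illustrative class and field names:

```java
import java.lang.ref.WeakReference;

class WeakOwnerLogging {
  static final class Owner {
    final String nameIndex;
    Owner(String nameIndex) { this.nameIndex = nameIndex; }
  }

  private final WeakReference<Owner> owner;

  WeakOwnerLogging(Owner o) { this.owner = new WeakReference<>(o); }

  String ownerNameIndex() {
    // Dereference the weak reference exactly once: calling get() twice could
    // return non-null the first time and null the second if GC runs in between.
    Owner o = owner.get();
    return o == null ? "n/a" : o.nameIndex;
  }
}
```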
diff --git a/src/test/java/com/snowflake/kafka/connector/internal/StageFilesProcessorTest.java b/src/test/java/com/snowflake/kafka/connector/internal/StageFilesProcessorTest.java
index e9d4c617b..63da8069f 100644
--- a/src/test/java/com/snowflake/kafka/connector/internal/StageFilesProcessorTest.java
+++ b/src/test/java/com/snowflake/kafka/connector/internal/StageFilesProcessorTest.java
@@ -84,7 +84,8 @@ private void createFileProcessor(int ticks) {
         pipeTelemetry,
         telemetryService,
         createTestScheduler(ticks, currentTime, nextTickCallback, scheduledFuture),
-        currentTime::get);
+        currentTime::get,
+        "nameIndex");
     register = new StageFilesProcessor.ProgressRegisterImpl(victim);
   }
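A note on the SLF4J calls touched here: message formatting is positional, so each `{}` consumes exactly one argument; extra arguments are silently dropped (unless the last one is a `Throwable`), and missing arguments leave the `{}` rendered literally. Neither case throws, which makes count mismatches easy to ship unnoticed, so the placeholder and argument counts in the log calls above must stay aligned. A quick self-contained illustration (logger name and values are arbitrary):

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class PlaceholderDemo {
  private static final Logger LOGGER = LoggerFactory.getLogger(PlaceholderDemo.class);

  public static void main(String[] args) {
    // Extra argument: "ignored" is silently dropped from the output.
    LOGGER.info("pipe {}, offset {}", "PIPE_0", 42L, "ignored");
    // Missing argument: the trailing "{}" is printed literally.
    LOGGER.info("pipe {}, offset {}, nameIndex {}", "PIPE_0", 42L);
  }
}
```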