diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergFileStreamExtractor.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergFileStreamExtractor.java
new file mode 100644
index 0000000000..6f0d3f5897
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergFileStreamExtractor.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.Collections;
+import java.util.Iterator;
+
+import com.google.common.base.Optional;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.CopyableFile;
+import org.apache.gobblin.data.management.copy.FileAwareInputStream;
+import org.apache.gobblin.source.extractor.filebased.FileBasedExtractor;
+import org.apache.gobblin.source.extractor.filebased.FileBasedHelperException;
+import org.apache.gobblin.util.WriterUtils;
+
+/**
+ * Extractor for file streaming mode that creates a {@link FileAwareInputStream} for each file.
+ *
+ * This extractor is used when {@code iceberg.record.processing.enabled=false} to stream
+ * OpenHouse table files as binary data to destinations such as Azure or HDFS.
+ *
+ * Each "record" is a {@link FileAwareInputStream} representing one file from
+ * the OpenHouse table. The downstream writer handles streaming the file content.
+ */
+@Slf4j
+public class IcebergFileStreamExtractor extends FileBasedExtractor<String, FileAwareInputStream> {
+
+ public IcebergFileStreamExtractor(WorkUnitState workUnitState) throws IOException {
+ super(workUnitState, new IcebergFileStreamHelper(workUnitState));
+ }
+
+ @Override
+ public String getSchema() {
+ // For file streaming, schema is not used by IdentityConverter; returning a constant
+ return "FileAwareInputStream";
+ }
+
+ @Override
+ public Iterator<FileAwareInputStream> downloadFile(String filePath) throws IOException {
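+ // A future implementation would likely open the file through the configured FileSystem helper
+ // and return a single-element iterator pairing the stream with the file's copy metadata;
+ // stubbed until the streaming writer path is wired up.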
+ throw new NotImplementedException("downloadFile is not yet implemented for Iceberg file streaming mode");
+ }
+
+}
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergFileStreamHelper.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergFileStreamHelper.java
new file mode 100644
index 0000000000..3fe82818d0
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergFileStreamHelper.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.collect.Lists;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.source.extractor.filebased.FileBasedHelperException;
+import org.apache.gobblin.source.extractor.filebased.TimestampAwareFileBasedHelper;
+
+/**
+ * File-based helper for Iceberg file streaming operations.
+ *
+ * This helper supports file streaming mode where OpenHouse table files
+ * are streamed as binary data without record-level processing.
+ */
+@Slf4j
+public class IcebergFileStreamHelper implements TimestampAwareFileBasedHelper {
+
+ private final State state;
+ private final Configuration configuration;
+ private FileSystem fileSystem;
+
+ public IcebergFileStreamHelper(State state) {
+ this.state = state;
+ this.configuration = new Configuration();
+
+ // Add any Hadoop configuration from job properties
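+ // (e.g. fs.defaultFS or hadoop.security.authentication overrides supplied in the job config)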
+ for (String key : state.getPropertyNames()) {
+ if (key.startsWith("fs.") || key.startsWith("hadoop.")) {
+ configuration.set(key, state.getProp(key));
+ }
+ }
+ }
+
+ @Override
+ public void connect() throws FileBasedHelperException {
+ try {
+ this.fileSystem = FileSystem.get(configuration);
+ log.info("Connected to Iceberg file stream helper with FileSystem: {}", fileSystem.getClass().getSimpleName());
+ } catch (IOException e) {
+ throw new FileBasedHelperException("Failed to initialize FileSystem for Iceberg file streaming", e);
+ }
+ }
+
+ @Override
+ public List<String> ls(String path) throws FileBasedHelperException {
+ try {
+ // For Iceberg, file discovery is handled by IcebergSource
+ // This method returns files from work unit configuration
+ List<String> filesToPull = state.getPropAsList(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL, "");
+ log.debug("Returning {} files for processing", filesToPull.size());
+ return filesToPull;
+ } catch (Exception e) {
+ throw new FileBasedHelperException("Failed to list files", e);
+ }
+ }
+
+ @Override
+ public InputStream getFileStream(String filePath) throws FileBasedHelperException {
+ try {
+ Path path = new Path(filePath);
+ FileSystem fs = getFileSystemForPath(path);
+ return fs.open(path);
+ } catch (IOException e) {
+ throw new FileBasedHelperException("Failed to get file stream for: " + filePath, e);
+ }
+ }
+
+ @Override
+ public long getFileSize(String filePath) throws FileBasedHelperException {
+ try {
+ Path path = new Path(filePath);
+ FileSystem fs = getFileSystemForPath(path);
+ return fs.getFileStatus(path).getLen();
+ } catch (IOException e) {
+ throw new FileBasedHelperException("Failed to get file size for: " + filePath, e);
+ }
+ }
+
+ @Override
+ public long getFileMTime(String filePath) throws FileBasedHelperException {
+ try {
+ Path path = new Path(filePath);
+ FileSystem fs = getFileSystemForPath(path);
+ return fs.getFileStatus(path).getModificationTime();
+ } catch (IOException e) {
+ throw new FileBasedHelperException("Failed to get file modification time for: " + filePath, e);
+ }
+ }
+
+ private FileSystem getFileSystemForPath(Path path) throws IOException {
+ // If path has a different scheme than the default FileSystem, get scheme-specific FS
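+ // e.g. (illustrative) a default FS of hdfs:// with an abfs:// or s3a:// file path resolves a
+ // scheme-specific FileSystem via path.getFileSystem(configuration)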
+ if (path.toUri().getScheme() != null &&
+ !path.toUri().getScheme().equals(fileSystem.getUri().getScheme())) {
+ return path.getFileSystem(configuration);
+ }
+ return fileSystem;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (fileSystem != null) {
+ try {
+ fileSystem.close();
+ log.info("Closed Iceberg file stream helper and FileSystem connection");
+ } catch (IOException e) {
+ log.warn("Error closing FileSystem connection", e);
+ throw e;
+ }
+ } else {
+ log.debug("Closing Iceberg file stream helper - no FileSystem to close");
+ }
+ }
+
+}
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSource.java
new file mode 100644
index 0000000000..7a6cfe52b4
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSource.java
@@ -0,0 +1,556 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.net.URI;
+import java.time.LocalDate;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import com.google.common.base.Optional;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.data.management.copy.FileAwareInputStream;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.metrics.event.lineage.LineageInfo;
+import org.apache.gobblin.source.extractor.Extractor;
+import org.apache.gobblin.source.extractor.filebased.FileBasedHelperException;
+import org.apache.gobblin.source.extractor.filebased.FileBasedSource;
+import org.apache.gobblin.source.workunit.Extract;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.source.workunit.WorkUnitWeighter;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.util.binpacking.FieldWeighter;
+import org.apache.gobblin.util.binpacking.WorstFitDecreasingBinPacking;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+
+/**
+ * Unified Iceberg source that supports partition-based data copying from Iceberg tables.
+ *
+ * This source reads job configuration, applies date partition filters with optional lookback period,
+ * and uses Iceberg's TableScan API to enumerate data files for specific partitions. It groups files
+ * into work units for parallel processing.
+ *
+ *
+ */
+@Slf4j
+public class IcebergSource extends FileBasedSource<String, FileAwareInputStream> {
+
+ public static final String ICEBERG_DATABASE_NAME = "iceberg.database.name";
+ public static final String ICEBERG_TABLE_NAME = "iceberg.table.name";
+ public static final String ICEBERG_CATALOG_URI = "iceberg.catalog.uri";
+ public static final String ICEBERG_CATALOG_CLASS = "iceberg.catalog.class";
+ public static final String DEFAULT_ICEBERG_CATALOG_CLASS = "org.apache.gobblin.data.management.copy.iceberg.IcebergHiveCatalog";
+ public static final String ICEBERG_RECORD_PROCESSING_ENABLED = "iceberg.record.processing.enabled";
+ public static final boolean DEFAULT_RECORD_PROCESSING_ENABLED = false;
+ public static final String ICEBERG_FILES_PER_WORKUNIT = "iceberg.files.per.workunit";
+ public static final int DEFAULT_FILES_PER_WORKUNIT = 10;
+ public static final String ICEBERG_FILTER_ENABLED = "iceberg.filter.enabled";
+ public static final String ICEBERG_FILTER_EXPR = "iceberg.filter.expr"; // e.g., datepartition=2025-04-01
+ public static final String ICEBERG_LOOKBACK_DAYS = "iceberg.lookback.days";
+ public static final int DEFAULT_LOOKBACK_DAYS = 1;
+ public static final String ICEBERG_DATE_PARTITION_KEY = "datepartition"; // date partition key name
+ public static final String ICEBERG_PARTITION_KEY = "iceberg.partition.key";
+ public static final String ICEBERG_PARTITION_VALUES = "iceberg.partition.values";
+ public static final String ICEBERG_FILE_PARTITION_PATH = "iceberg.file.partition.path";
+ public static final String ICEBERG_SIMULATE = "iceberg.simulate";
+ public static final String ICEBERG_MAX_SIZE_MULTI_WORKUNITS = "iceberg.binPacking.maxSizePerBin";
+ public static final String ICEBERG_MAX_WORK_UNITS_PER_BIN = "iceberg.binPacking.maxWorkUnitsPerBin";
+ private static final String WORK_UNIT_WEIGHT = "iceberg.workUnitWeight";
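+
+ // Illustrative job configuration for a partition-based copy (values are examples only):
+ //   iceberg.database.name=mydb
+ //   iceberg.table.name=mytable
+ //   iceberg.record.processing.enabled=false
+ //   iceberg.filter.enabled=true
+ //   iceberg.filter.expr=datepartition=2025-04-03
+ //   iceberg.lookback.days=3
+ //   iceberg.files.per.workunit=10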
+
+ private Optional<LineageInfo> lineageInfo;
+ private final WorkUnitWeighter weighter = new FieldWeighter(WORK_UNIT_WEIGHT);
+
+ /**
+ * Initialize file system helper based on mode (streaming vs record processing)
+ */
+ @Override
+ public void initFileSystemHelper(State state) throws FileBasedHelperException {
+ // For file streaming mode, we use IcebergFileStreamHelper
+ // For record processing mode, we'll use a different helper (future implementation)
+ boolean recordProcessingEnabled = state.getPropAsBoolean(
+ ICEBERG_RECORD_PROCESSING_ENABLED, DEFAULT_RECORD_PROCESSING_ENABLED);
+
+ if (recordProcessingEnabled) {
+ // Future: Initialize helper for record processing
+ throw new UnsupportedOperationException("Record processing mode not yet implemented. " +
+ "This will be added when SQL/Data Lake destinations are required.");
+ } else {
+ // Initialize helper for file streaming - now implements TimestampAwareFileBasedHelper
+ this.fsHelper = new IcebergFileStreamHelper(state);
+ this.fsHelper.connect();
+ }
+ }
+
+ /**
+ * Get work units by discovering files from Iceberg table
+ * @param state is the source state
+ * @return list of work units
+ */
+ @Override
+ public List<WorkUnit> getWorkunits(SourceState state) {
+ this.lineageInfo = LineageInfo.getLineageInfo(state.getBroker());
+
+ try {
+ initFileSystemHelper(state);
+
+ validateConfiguration(state);
+
+ IcebergCatalog catalog = createCatalog(state);
+ String database = state.getProp(ICEBERG_DATABASE_NAME);
+ String table = state.getProp(ICEBERG_TABLE_NAME);
+
+ IcebergTable icebergTable = catalog.openTable(database, table);
+ Preconditions.checkArgument(catalog.tableAlreadyExists(icebergTable),
+ String.format("OpenHouse table not found: %s.%s", database, table));
+
+ List<IcebergTable.FilePathWithPartition> filesWithPartitions = discoverPartitionFilePaths(state, icebergTable);
+ log.info("Discovered {} files from Iceberg table {}.{}", filesWithPartitions.size(), database, table);
+
+ // Create work units from discovered files
+ List<WorkUnit> workUnits = createWorkUnitsFromFiles(filesWithPartitions, state, icebergTable);
+
+ // Handle simulate mode - log what would be copied without executing
+ if (state.contains(ICEBERG_SIMULATE) && state.getPropAsBoolean(ICEBERG_SIMULATE)) {
+ log.info("Simulate mode enabled. Will not execute the copy.");
+ logSimulateMode(workUnits, filesWithPartitions, state);
+ return Lists.newArrayList();
+ }
+
+ // Apply bin packing to work units if configured
+ List<? extends WorkUnit> packedWorkUnits = applyBinPacking(workUnits, state);
+ log.info("Work unit creation complete. Initial work units: {}, packed work units: {}",
+ workUnits.size(), packedWorkUnits.size());
+
+ return Lists.newArrayList(packedWorkUnits);
+
+ } catch (Exception e) {
+ log.error("Failed to create work units for Iceberg table", e);
+ throw new RuntimeException("Failed to create work units", e);
+ }
+ }
+
+ /**
+ * Get extractor based on mode (streaming vs record processing)
+ *
+ * @param state a {@link org.apache.gobblin.configuration.WorkUnitState} carrying properties needed by the returned {@link Extractor}
+ * @return an {@link Extractor} for file streaming or record processing, depending on configuration
+ * @throws IOException if the extractor cannot be created
+ */
+ @Override
+ public Extractor<String, FileAwareInputStream> getExtractor(WorkUnitState state) throws IOException {
+ boolean recordProcessingEnabled = state.getPropAsBoolean(
+ ICEBERG_RECORD_PROCESSING_ENABLED, DEFAULT_RECORD_PROCESSING_ENABLED);
+
+ if (recordProcessingEnabled) {
+ // Return record processing extractor
+ throw new UnsupportedOperationException("Record processing mode not yet implemented.");
+ } else {
+ // Return file streaming extractor
+ return new IcebergFileStreamExtractor(state);
+ }
+ }
+
+ /**
+ * Discover partition data files using Iceberg TableScan API with optional lookback for date partitions.
+ *
+ * <p>This method supports two modes:
+ * <ul>
+ * <li>Full table scan: when {@code iceberg.filter.enabled=false}, returns all data files from the current snapshot</li>
+ * <li>Partition filter: when {@code iceberg.filter.enabled=true}, uses Iceberg TableScan with a partition
+ * filter and applies the lookback period for date partitions</li>
+ * </ul>
+ *
+ * <p>For date partitions (partition key = {@value #ICEBERG_DATE_PARTITION_KEY}), the lookback period allows copying data for the last N days.
+ * For example, with {@code iceberg.filter.expr=datepartition=2025-04-03} and {@code iceberg.lookback.days=3},
+ * this will discover files for partitions datepartition=2025-04-03, datepartition=2025-04-02, and datepartition=2025-04-01.
+ *
+ * @param state source state containing filter configuration
+ * @param icebergTable the Iceberg table to scan
+ * @return list of file paths with partition metadata matching the filter criteria
+ * @throws IOException if table scan or file discovery fails
+ */
+ private List<IcebergTable.FilePathWithPartition> discoverPartitionFilePaths(SourceState state, IcebergTable icebergTable) throws IOException {
+ boolean filterEnabled = state.getPropAsBoolean(ICEBERG_FILTER_ENABLED, true);
+
+ if (!filterEnabled) {
+ log.info("Partition filter disabled, discovering all data files from current snapshot");
+ IcebergSnapshotInfo snapshot = icebergTable.getCurrentSnapshotInfo();
+ List<IcebergTable.FilePathWithPartition> result = Lists.newArrayList();
+ for (IcebergSnapshotInfo.ManifestFileInfo mfi : snapshot.getManifestFiles()) {
+ for (String filePath : mfi.getListedFilePaths()) {
+ result.add(new IcebergTable.FilePathWithPartition(filePath, Maps.newHashMap()));
+ }
+ }
+ log.info("Discovered {} data files from snapshot", result.size());
+ return result;
+ }
+
+ // Parse filter expression
+ String expr = state.getProp(ICEBERG_FILTER_EXPR);
+ Preconditions.checkArgument(!StringUtils.isBlank(expr),
+ "iceberg.filter.expr is required when iceberg.filter.enabled=true");
+ String[] parts = expr.split("=", 2);
+ Preconditions.checkArgument(parts.length == 2,
+ "Invalid iceberg.filter.expr. Expected key=value, got: %s", expr);
+ String key = parts[0].trim();
+ String value = parts[1].trim();
+
+ // Apply lookback period for date partitions
+ // lookbackDays=1 (default) means copy only the specified date
+ // lookbackDays=3 means copy specified date + 2 previous days (total 3 days)
+ int lookbackDays = state.getPropAsInt(ICEBERG_LOOKBACK_DAYS, DEFAULT_LOOKBACK_DAYS);
+ List<String> values = Lists.newArrayList();
+
+ if (ICEBERG_DATE_PARTITION_KEY.equals(key) && lookbackDays >= 1) {
+ log.info("Applying lookback period of {} days for date partition: {}", lookbackDays, value);
+ LocalDate start = LocalDate.parse(value);
+ for (int i = 0; i < lookbackDays; i++) {
+ String partitionValue = start.minusDays(i).toString();
+ values.add(partitionValue);
+ log.debug("Including partition: {}={}", ICEBERG_DATE_PARTITION_KEY, partitionValue);
+ }
+ } else {
+ log.error("Partition key is not correct or lookbackDays < 1, skipping lookback. Input: {}={}, expected: {}=",
+ key, value, ICEBERG_DATE_PARTITION_KEY);
+ throw new IllegalArgumentException(String.format(
+ "Only date partition filter with lookback period is supported. Expected partition key: '%s', got: '%s'",
+ ICEBERG_DATE_PARTITION_KEY, key));
+ }
+
+ // Store partition info on state for downstream use (extractor, destination path mapping)
+ state.setProp(ICEBERG_PARTITION_KEY, key);
+ state.setProp(ICEBERG_PARTITION_VALUES, String.join(",", values));
+
+ // Use Iceberg TableScan API to get only data files (parquet/orc/avro) for specified partitions
+ // TableScan.planFiles() returns DataFiles only - no manifest files or metadata files
+ log.info("Executing TableScan with filter: {}={}", key, values);
+ Expression icebergExpr = null;
+ for (String val : values) {
+ Expression e = Expressions.equal(key, val);
+ icebergExpr = (icebergExpr == null) ? e : Expressions.or(icebergExpr, e);
+ }
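+ // For example, values [2025-04-03, 2025-04-02] yield
+ // or(equal("datepartition", "2025-04-03"), equal("datepartition", "2025-04-02"))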
+
+ List<IcebergTable.FilePathWithPartition> filesWithPartitions = icebergTable.getFilePathsWithPartitionsForFilter(icebergExpr);
+ log.info("Discovered {} data files for partitions: {}", filesWithPartitions.size(), values);
+
+ return filesWithPartitions;
+ }
+
+ /**
+ * Create work units from discovered file paths by grouping them for parallel processing.
+ *
+ * Files are grouped into work units based on {@code iceberg.files.per.workunit} configuration.
+ * Each work unit contains metadata about the files to process.
+ *
+ * @param filesWithPartitions list of file paths with partition metadata to process
+ * @param state source state containing job configuration
+ * @param table the Iceberg table being copied
+ * @return list of work units ready for parallel execution
+ */
+ private List<WorkUnit> createWorkUnitsFromFiles(List<IcebergTable.FilePathWithPartition> filesWithPartitions, SourceState state, IcebergTable table) {
+ List<WorkUnit> workUnits = Lists.newArrayList();
+
+ if (filesWithPartitions.isEmpty()) {
+ log.warn("No files discovered for table {}.{}, returning empty work unit list",
+ state.getProp(ICEBERG_DATABASE_NAME), state.getProp(ICEBERG_TABLE_NAME));
+ return workUnits;
+ }
+
+ String nameSpace = state.getProp(ConfigurationKeys.EXTRACT_NAMESPACE_NAME_KEY, "iceberg");
+ String tableName = table.getTableId().name();
+ Extract extract = new Extract(Extract.TableType.SNAPSHOT_ONLY, nameSpace, tableName);
+
+ int filesPerWorkUnit = state.getPropAsInt(ICEBERG_FILES_PER_WORKUNIT, DEFAULT_FILES_PER_WORKUNIT);
+ List<List<IcebergTable.FilePathWithPartition>> groups = Lists.partition(filesWithPartitions, Math.max(1, filesPerWorkUnit));
+ log.info("Grouping {} files into {} work units ({} files per work unit)",
+ filesWithPartitions.size(), groups.size(), filesPerWorkUnit);
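+ // e.g. 23 discovered files with iceberg.files.per.workunit=10 produce 3 work units of 10, 10, and 3 files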
+
+ for (int i = 0; i < groups.size(); i++) {
+ List<IcebergTable.FilePathWithPartition> group = groups.get(i);
+ WorkUnit workUnit = new WorkUnit(extract);
+
+ // Store data file paths and their partition metadata separately
+ // Note: Only data files (parquet/orc/avro) are included, no Iceberg metadata files
+ List<String> filePaths = Lists.newArrayList();
+ Map<String, String> fileToPartitionPath = Maps.newHashMap();
+ long totalSize = 0L;
+
+ for (IcebergTable.FilePathWithPartition fileWithPartition : group) {
+ String filePath = fileWithPartition.getFilePath();
+ filePaths.add(filePath);
+ // Store partition path for each file
+ fileToPartitionPath.put(filePath, fileWithPartition.getPartitionPath());
+ // Accumulate file sizes for work unit weight
+ totalSize += fileWithPartition.getFileSize();
+ }
+
+ workUnit.setProp(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL, String.join(",", filePaths));
+
+ // Store partition path mapping as JSON for extractor to use
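+ // e.g. (illustrative): {"/data/mydb/mytable/datepartition=2025-04-03/part-00001.parquet": "datepartition=2025-04-03"}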
+ workUnit.setProp(ICEBERG_FILE_PARTITION_PATH, new com.google.gson.Gson().toJson(fileToPartitionPath));
+
+ // Set work unit size for dynamic scaling (instead of just file count)
+ workUnit.setProp(ServiceConfigKeys.WORK_UNIT_SIZE, totalSize);
+
+ // Set work unit weight for bin packing
+ setWorkUnitWeight(workUnit, totalSize);
+
+ // Carry partition info to extractor for destination path mapping
+ if (state.contains(ICEBERG_PARTITION_KEY)) {
+ workUnit.setProp(ICEBERG_PARTITION_KEY, state.getProp(ICEBERG_PARTITION_KEY));
+ }
+ if (state.contains(ICEBERG_PARTITION_VALUES)) {
+ workUnit.setProp(ICEBERG_PARTITION_VALUES, state.getProp(ICEBERG_PARTITION_VALUES));
+ }
+
+ // Add lineage information for data governance and tracking
+ addLineageSourceInfo(state, workUnit, table);
+ workUnits.add(workUnit);
+
+ log.debug("Created work unit {} with {} files, total size: {} bytes", i, group.size(), totalSize);
+ }
+
+ return workUnits;
+ }
+
+ /**
+ * Create catalog using existing IcebergDatasetFinder logic
+ */
+ private IcebergCatalog createCatalog(SourceState state) throws IOException {
+ String catalogPrefix = "iceberg.catalog.";
+ Map<String, String> catalogProperties = buildMapFromPrefixChildren(state.getProperties(), catalogPrefix);
+
+ Configuration configuration = HadoopUtils.getConfFromProperties(state.getProperties());
+ String catalogClassName = catalogProperties.getOrDefault("class", DEFAULT_ICEBERG_CATALOG_CLASS);
+
+ return IcebergCatalogFactory.create(catalogClassName, catalogProperties, configuration);
+ }
+
+ /**
+ * Build map of properties with given prefix
+ *
+ */
+ private Map<String, String> buildMapFromPrefixChildren(Properties properties, String configPrefix) {
+ Map<String, String> catalogProperties = Maps.newHashMap();
+
+ for (Map.Entry