---
title: "Flink Configuration"
url: flink-configuration
aliases:
    - "flink/flink-configuration"
menu:
    main:
        parent: Flink
        weight: 600
---
<!--
 - Licensed to the Apache Software Foundation (ASF) under one or more
 - contributor license agreements. See the NOTICE file distributed with
 - this work for additional information regarding copyright ownership.
 - The ASF licenses this file to You under the Apache License, Version 2.0
 - (the "License"); you may not use this file except in compliance with
 - the License. You may obtain a copy of the License at
 -
 -   http://www.apache.org/licenses/LICENSE-2.0
 -
 - Unless required by applicable law or agreed to in writing, software
 - distributed under the License is distributed on an "AS IS" BASIS,
 - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 - See the License for the specific language governing permissions and
 - limitations under the License.
 -->

# Flink Configuration

## Catalog Configuration

A catalog is created and named by executing the following query (replace `<catalog_name>` with your catalog name and
`<config_key>`=`<config_value>` with catalog implementation config):

```sql
CREATE CATALOG <catalog_name> WITH (
  'type'='iceberg',
  '<config_key>'='<config_value>'
);
```

The following properties can be set globally and are not limited to a specific catalog implementation:

| Property                     | Required | Values                     | Description                                                                                                                                                  |
| ---------------------------- |----------| -------------------------- |--------------------------------------------------------------------------------------------------------------------------------------------------------------|
| type                         | ✔️       | iceberg                    | Must be `iceberg`.                                                                                                                                           |
| catalog-type                 |          | `hive`, `hadoop` or `rest` | `hive`, `hadoop`, or `rest` for built-in catalogs; leave unset for custom catalog implementations using `catalog-impl`.                                      |
| catalog-impl                 |          |                            | The fully-qualified class name of a custom catalog implementation. Must be set if `catalog-type` is unset.                                                   |
| property-version             |          |                            | Version number describing the property format. Can be used for backwards compatibility if the property format changes. The current property version is `1`. |
| cache-enabled                |          | `true` or `false`          | Whether to enable catalog caching. Default is `true`.                                                                                                        |
| cache.expiration-interval-ms |          |                            | How long catalog entries are locally cached, in milliseconds. Negative values like `-1` disable expiration; `0` is not allowed. Default is `-1`.             |

The following properties can be set if using the Hive catalog:

| Property        | Required | Values | Description |
| --------------- |----------| ------ |-------------|
| uri             | ✔️       |        | The Hive metastore's thrift URI. |
| clients         |          |        | The Hive metastore client pool size. Default is `2`. |
| warehouse       |          |        | The Hive warehouse location. Specify this path if you neither set `hive-conf-dir` to a location containing a `hive-site.xml` configuration file nor add a correct `hive-site.xml` to the classpath. |
| hive-conf-dir   |          |        | Path to a directory containing a `hive-site.xml` configuration file, used to provide custom Hive configuration values. If both `hive-conf-dir` and `warehouse` are set when creating the Iceberg catalog, the `warehouse` value overrides `hive.metastore.warehouse.dir` from `<hive-conf-dir>/hive-site.xml` (or from the Hive configuration file on the classpath). |
| hadoop-conf-dir |          |        | Path to a directory containing `core-site.xml` and `hdfs-site.xml` configuration files, used to provide custom Hadoop configuration values. |
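
For example, a Hive catalog can be created with a query like the following (the thrift URI, pool size, and warehouse path are placeholders for your environment):

```sql
CREATE CATALOG hive_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://localhost:9083',
  'clients'='5',
  'property-version'='1',
  'warehouse'='hdfs://nn:8020/warehouse/path'
);
```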

The following properties can be set if using the Hadoop catalog:

| Property  | Required | Values | Description                                                |
| --------- |----------| ------ | ---------------------------------------------------------- |
| warehouse | ✔️       |        | The HDFS directory to store metadata files and data files. |
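
A Hadoop catalog only needs the warehouse location, for example (the path is a placeholder):

```sql
CREATE CATALOG hadoop_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'warehouse'='hdfs://nn:8020/warehouse/path',
  'property-version'='1'
);
```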

The following properties can be set if using the REST catalog:

| Property   | Required | Values | Description                                                                 |
| ---------- |----------| ------ |-----------------------------------------------------------------------------|
| uri        | ✔️       |        | The URL of the REST catalog server.                                         |
| credential |          |        | A credential to exchange for a token in the OAuth2 client credentials flow. |
| token      |          |        | A token which will be used to interact with the server.                     |
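
A REST catalog is created by pointing `uri` at the catalog server, for example (the URL is a placeholder):

```sql
CREATE CATALOG rest_catalog WITH (
  'type'='iceberg',
  'catalog-type'='rest',
  'uri'='https://localhost/'
);
```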

## Runtime Configuration

### Read Options

Flink read options are passed when configuring the Flink IcebergSource:

```java
IcebergSource.forRowData()
    .tableLoader(TableLoader.fromCatalog(...))
    .assignerFactory(new SimpleSplitAssignerFactory())
    .streaming(true)
    .streamingStartingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT)
    .startSnapshotId(3821550127947089987L)
    .monitorInterval(Duration.ofMillis(10L)) // or .set("monitor-interval", "10s") or .set(FlinkReadOptions.MONITOR_INTERVAL, "10s")
    .build()
```
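
Once built, the source can be attached to a streaming job. A minimal sketch, assuming `source` is the `IcebergSource<RowData>` built above (the operator name and watermark strategy here are illustrative):

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Attach the FLIP-27 source; no event-time watermarks are generated unless you supply a strategy.
DataStream<RowData> stream =
    env.fromSource(
        source,
        WatermarkStrategy.noWatermarks(),
        "iceberg-source",
        TypeInformation.of(RowData.class));
```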

For Flink SQL, read options can be passed in via SQL hints like this:

```sql
SELECT * FROM tableName /*+ OPTIONS('monitor-interval'='10s') */
...
```
| 101 | + |
| 102 | +Options can be passed in via Flink configuration, which will be applied to current session. Note that not all options support this mode. |
| 103 | + |
| 104 | +``` |
| 105 | +env.getConfig() |
| 106 | + .getConfiguration() |
| 107 | + .set(FlinkReadOptions.SPLIT_FILE_OPEN_COST_OPTION, 1000L); |
| 108 | +... |
| 109 | +``` |
| 110 | + |
| 111 | +`Read option` has the highest priority, followed by `Flink configuration` and then `Table property`. |

| Read option                 | Flink configuration                            | Table property               | Default                          | Description                                                  |
| --------------------------- | ---------------------------------------------- | ---------------------------- | -------------------------------- | ------------------------------------------------------------ |
| snapshot-id                 | N/A                                            | N/A                          | null                             | For time travel in batch mode. Read data from the specified snapshot ID. |
| case-sensitive              | connector.iceberg.case-sensitive               | N/A                          | false                            | If true, match column names case-sensitively.                |
| as-of-timestamp             | N/A                                            | N/A                          | null                             | For time travel in batch mode. Read data from the most recent snapshot as of the given time in milliseconds. |
| starting-strategy           | connector.iceberg.starting-strategy            | N/A                          | INCREMENTAL_FROM_LATEST_SNAPSHOT | Starting strategy for streaming execution. TABLE_SCAN_THEN_INCREMENTAL: do a regular table scan, then switch to incremental mode; the incremental mode starts from the current snapshot, exclusive. INCREMENTAL_FROM_LATEST_SNAPSHOT: start incremental mode from the latest snapshot, inclusive; if the table is empty, all future append snapshots should be discovered. INCREMENTAL_FROM_EARLIEST_SNAPSHOT: start incremental mode from the earliest snapshot, inclusive; if the table is empty, all future append snapshots should be discovered. INCREMENTAL_FROM_SNAPSHOT_ID: start incremental mode from a snapshot with a specific ID, inclusive. INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP: start incremental mode from a snapshot with a specific timestamp, inclusive; if the timestamp falls between two snapshots, start from the snapshot after the timestamp. Only applicable to the FLIP-27 source. |
| start-snapshot-timestamp    | N/A                                            | N/A                          | null                             | Start to read data from the most recent snapshot as of the given time in milliseconds. |
| start-snapshot-id           | N/A                                            | N/A                          | null                             | Start to read data from the specified snapshot ID.           |
| end-snapshot-id             | N/A                                            | N/A                          | The latest snapshot ID           | Specifies the end snapshot.                                  |
| split-size                  | connector.iceberg.split-size                   | read.split.target-size       | 128 MB                           | Target size when combining input splits.                     |
| split-lookback              | connector.iceberg.split-lookback               | read.split.planning-lookback | 10                               | Number of bins to consider when combining input splits.      |
| split-file-open-cost        | connector.iceberg.split-file-open-cost         | read.split.open-file-cost    | 4 MB                             | The estimated cost to open a file, used as a minimum weight when combining splits. |
| streaming                   | connector.iceberg.streaming                    | N/A                          | false                            | Sets whether the current task runs in streaming or batch mode. |
| monitor-interval            | connector.iceberg.monitor-interval             | N/A                          | 60s                              | Monitor interval to discover splits from new snapshots. Applicable only for streaming reads. |
| include-column-stats        | connector.iceberg.include-column-stats         | N/A                          | false                            | Load column stats with each data file: value count, null value count, lower bounds, and upper bounds. |
| max-planning-snapshot-count | connector.iceberg.max-planning-snapshot-count  | N/A                          | Integer.MAX_VALUE                | Maximum number of snapshots processed per split enumeration. Applicable only for streaming reads. |
| limit                       | connector.iceberg.limit                        | N/A                          | -1                               | Limits the number of rows returned.                          |

### Write Options

Flink write options are passed when configuring the FlinkSink, like this:

```java
FlinkSink.Builder builder = FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
    .table(table)
    .tableLoader(tableLoader)
    .set("write-format", "orc")
    .set(FlinkWriteOptions.OVERWRITE_MODE, "true");
```
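
Note that the builder only configures the sink; the chain is completed by calling `append()`, which attaches the sink operators to the job:

```java
// Complete the sink definition and attach it to the data stream's job graph.
builder.append();
```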

For Flink SQL, write options can be passed in via SQL hints like this:

```sql
INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */
...
```

| Flink option           | Default                                    | Description                                                  |
| ---------------------- | ------------------------------------------ | ------------------------------------------------------------ |
| write-format           | Table write.format.default                 | File format to use for this write operation: `parquet`, `avro`, or `orc`. |
| target-file-size-bytes | As per table property                      | Overrides this table's write.target-file-size-bytes.         |
| upsert-enabled         | Table write.upsert.enabled                 | Overrides this table's write.upsert.enabled.                 |
| overwrite-enabled      | false                                      | Overwrite the table's data. Overwrite mode must not be enabled when the sink is configured for UPSERT. |
| distribution-mode      | Table write.distribution-mode              | Overrides this table's write.distribution-mode.              |
| compression-codec      | Table write.(fileformat).compression-codec | Overrides this table's compression codec for this write.     |
| compression-level      | Table write.(fileformat).compression-level | Overrides this table's compression level for Parquet and Avro tables for this write. |
| compression-strategy   | Table write.orc.compression-strategy       | Overrides this table's compression strategy for ORC tables for this write. |
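
For example, a compression override can be applied to a single SQL write via a hint (the option values here are illustrative and must be valid for the chosen file format):

```sql
INSERT INTO tableName /*+ OPTIONS('write-format'='parquet', 'compression-codec'='zstd') */
SELECT * FROM sourceTable;
```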