From 6b93ad5aa759502a92fee411bff7396eaa08a4ca Mon Sep 17 00:00:00 2001
From: 911432 <30204381+911432@users.noreply.github.com>
Date: Wed, 19 Apr 2023 07:22:32 +0000
Subject: [PATCH] Explain how to connect an external table.

---
 docs/flink-configuration.md |  11 +++
 docs/flink-connector.md     | 148 ------------------------------------
 docs/flink-ddl.md           |  33 +++++++-
 3 files changed, 40 insertions(+), 152 deletions(-)
 delete mode 100644 docs/flink-connector.md

diff --git a/docs/flink-configuration.md b/docs/flink-configuration.md
index 7e531baa3a3e..da0de47766c8 100644
--- a/docs/flink-configuration.md
+++ b/docs/flink-configuration.md
@@ -75,6 +75,17 @@ The following properties can be set if using the REST catalog:
 | token            |          |        | A token which will be used to interact with the server.                                                             |
 
+## External table configuration
+
+To connect to an external table in Flink, use the catalog configuration described above together with the external table configuration described below.
+The following properties can be set if using an external table:
+
+| Property         | Required | Values | Description                                                                                                         |
+| -----------------|----------| ------ |---------------------------------------------------------------------------------------------------------------------|
+| catalog-name     |          |        | User-specified catalog name. It is required because the connector does not have a default value.                    |
+| catalog-database |          |        | The Iceberg database name in the backend catalog; defaults to the current Flink database name.                      |
+| catalog-table    |          |        | The Iceberg table name in the backend catalog; defaults to the table name in the Flink `CREATE TABLE` statement. 
|
+
 
 ## Runtime configuration
 
 ### Read options
diff --git a/docs/flink-connector.md b/docs/flink-connector.md
deleted file mode 100644
index dc1520a59630..000000000000
--- a/docs/flink-connector.md
+++ /dev/null
@@ -1,148 +0,0 @@
----
-title: "Flink Connector"
-url: flink-connector
-aliases:
-    - "flink/flink-connector"
-menu:
-    main:
-        parent: Flink
-        weight: 200
----
-
-
-# Flink Connector
-Apache Flink supports creating Iceberg table directly without creating the explicit Flink catalog in Flink SQL. That means we can just create an iceberg table by specifying `'connector'='iceberg'` table option in Flink SQL which is similar to usage in the Flink official [document](https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/overview/).
-
-In Flink, the SQL `CREATE TABLE test (..) WITH ('connector'='iceberg', ...)` will create a Flink table in current Flink catalog (use [GenericInMemoryCatalog](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/catalogs/#genericinmemorycatalog) by default),
-which is just mapping to the underlying iceberg table instead of maintaining iceberg table directly in current Flink catalog.
-
-To create the table in Flink SQL by using SQL syntax `CREATE TABLE test (..) WITH ('connector'='iceberg', ...)`, Flink iceberg connector provides the following table properties:
-
-* `connector`: Use the constant `iceberg`.
-* `catalog-name`: User-specified catalog name. It's required because the connector don't have any default value.
-* `catalog-type`: `hive` or `hadoop` for built-in catalogs (defaults to `hive`), or left unset for custom catalog implementations using `catalog-impl`.
-* `catalog-impl`: The fully-qualified class name of a custom catalog implementation. Must be set if `catalog-type` is unset. See also [custom catalog](../flink/flink-getting-started.md#custom-catalog) for more details. 
-* `catalog-database`: The iceberg database name in the backend catalog, use the current flink database name by default.
-* `catalog-table`: The iceberg table name in the backend catalog. Default to use the table name in the flink `CREATE TABLE` sentence.
-
-## Table managed in Hive catalog.
-
-Before executing the following SQL, please make sure you've configured the Flink SQL client correctly according to the quick start [document](../flink).
-
-The following SQL will create a Flink table in the current Flink catalog, which maps to the iceberg table `default_database.flink_table` managed in iceberg catalog.
-
-```sql
-CREATE TABLE flink_table (
-    id   BIGINT,
-    data STRING
-) WITH (
-    'connector'='iceberg',
-    'catalog-name'='hive_prod',
-    'uri'='thrift://localhost:9083',
-    'warehouse'='hdfs://nn:8020/path/to/warehouse'
-);
-```
-
-If you want to create a Flink table mapping to a different iceberg table managed in Hive catalog (such as `hive_db.hive_iceberg_table` in Hive), then you can create Flink table as following:
-
-```sql
-CREATE TABLE flink_table (
-    id   BIGINT,
-    data STRING
-) WITH (
-    'connector'='iceberg',
-    'catalog-name'='hive_prod',
-    'catalog-database'='hive_db',
-    'catalog-table'='hive_iceberg_table',
-    'uri'='thrift://localhost:9083',
-    'warehouse'='hdfs://nn:8020/path/to/warehouse'
-);
-```
-
-{{< hint info >}}
-The underlying catalog database (`hive_db` in the above example) will be created automatically if it does not exist when writing records into the Flink table.
-{{< /hint >}}
-
-## Table managed in hadoop catalog
-
-The following SQL will create a Flink table in current Flink catalog, which maps to the iceberg table `default_database.flink_table` managed in hadoop catalog. 
-
-```sql
-CREATE TABLE flink_table (
-    id   BIGINT,
-    data STRING
-) WITH (
-    'connector'='iceberg',
-    'catalog-name'='hadoop_prod',
-    'catalog-type'='hadoop',
-    'warehouse'='hdfs://nn:8020/path/to/warehouse'
-);
-```
-
-## Table managed in custom catalog
-
-The following SQL will create a Flink table in current Flink catalog, which maps to the iceberg table `default_database.flink_table` managed in
-a custom catalog of type `com.my.custom.CatalogImpl`.
-
-```sql
-CREATE TABLE flink_table (
-    id   BIGINT,
-    data STRING
-) WITH (
-    'connector'='iceberg',
-    'catalog-name'='custom_prod',
-    'catalog-impl'='com.my.custom.CatalogImpl',
-    -- More table properties for the customized catalog
-    'my-additional-catalog-config'='my-value',
-    ...
-);
-```
-
-Please check sections under the Integrations tab for all custom catalogs.
-
-## A complete example.
-
-Take the Hive catalog as an example:
-
-```sql
-CREATE TABLE flink_table (
-    id BIGINT,
-    data STRING
-) WITH (
-    'connector'='iceberg',
-    'catalog-name'='hive_prod',
-    'uri'='thrift://localhost:9083',
-    'warehouse'='file:///path/to/warehouse'
-);
-
-INSERT INTO flink_table VALUES (1, 'AAA'), (2, 'BBB'), (3, 'CCC');
-
-SET execution.result-mode=tableau;
-SELECT * FROM flink_table;
-
-+----+------+
-| id | data |
-+----+------+
-|  1 |  AAA |
-|  2 |  BBB |
-|  3 |  CCC |
-+----+------+
-3 rows in set
-```
-
-For more details, please refer to the Iceberg [Flink document](../flink). 
\ No newline at end of file
diff --git a/docs/flink-ddl.md b/docs/flink-ddl.md
index 67f9e21d40ad..093db754e795 100644
--- a/docs/flink-ddl.md
+++ b/docs/flink-ddl.md
@@ -153,7 +153,7 @@ CREATE TABLE `hive_catalog`.`default`.`sample` (
 );
 ```
 
-Table create commands support the commonly used [Flink create clauses](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/) including:
+Table create commands support the commonly used [Flink create clauses](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/create/) including:
 
 * `PARTITION BY (column1, column2, ...)` to configure partitioning, Flink does not yet support hidden partitioning.
 * `COMMENT 'table document'` to set a table description.
@@ -187,15 +187,40 @@ CREATE TABLE `hive_catalog`.`default`.`sample` (
 CREATE TABLE `hive_catalog`.`default`.`sample_like` LIKE `hive_catalog`.`default`.`sample`;
 ```
 
-For more details, refer to the [Flink `CREATE TABLE` documentation](https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/sql/create/).
+For more details, refer to the [Flink `CREATE TABLE` documentation](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/create/).
 
+### Connect to an external table
+
+Apache Flink can access externally stored Iceberg tables and use them as a source or a sink. Flink does not hold any data itself, so the schema definition only declares how to map columns from the external Iceberg table to Flink's representation. To learn how to connect to other common external systems, see the [Flink official documentation](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/table/overview/).
+
+The Flink SQL statement `CREATE TABLE sample_second (..) WITH ('connector'='iceberg', ...)` accesses the external Iceberg table directly. The external database is created automatically if it does not exist when a record is written to the Flink table.
+
+The Flink Iceberg connector provides table properties for this purpose. 
+For more details, refer to the [external table configuration document](https://iceberg.apache.org/docs/latest/flink-configuration/#external-table-configuration).
+
+To map to an Iceberg database and table with different names, such as `hive_db.hive_iceberg_table`, connect the table like this:
+
+```sql
+CREATE TABLE `default`.`sample_second` (
+    id   BIGINT COMMENT 'unique id',
+    data STRING
+) WITH (
+    'connector'='iceberg',
+    'catalog-type'='hive',
+    'uri'='thrift://localhost:9083',
+    'warehouse'='file:///path/to/warehouse',
+    'catalog-name'='hive_prod',
+    'catalog-database'='hive_db',
+    'catalog-table'='hive_iceberg_table'
+);
+```
 
 ### `ALTER TABLE`
 
 Iceberg only support altering table properties:
 
 ```sql
-ALTER TABLE `hive_catalog`.`default`.`sample` SET ('write.format.default'='avro')
+ALTER TABLE `hive_catalog`.`default`.`sample` SET ('write.format.default'='avro');
 ```
 
 ### `ALTER TABLE .. RENAME TO`
 
@@ -210,4 +235,4 @@ To delete a table, run:
 
 ```sql
 DROP TABLE `hive_catalog`.`default`.`sample`;
-```
+```
\ No newline at end of file
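
---

As a quick sanity check of the documentation this patch adds, the external-table mapping can be exercised end to end from the Flink SQL client. The sketch below is illustrative only: the Metastore URI, warehouse path, and all catalog/database/table names are placeholders taken from the examples above, not fixed values.

```sql
-- Map a Flink table onto an external Iceberg table using the
-- connector properties documented in this patch.
-- 'uri' and 'warehouse' are placeholders for your environment.
CREATE TABLE `default`.`sample_second` (
    id   BIGINT COMMENT 'unique id',
    data STRING
) WITH (
    'connector'='iceberg',
    'catalog-type'='hive',
    'uri'='thrift://localhost:9083',
    'warehouse'='file:///path/to/warehouse',
    'catalog-name'='hive_prod',
    'catalog-database'='hive_db',
    'catalog-table'='hive_iceberg_table'
);

-- Writing through the mapping creates hive_db automatically
-- if it does not yet exist in the backend catalog.
INSERT INTO sample_second VALUES (1, 'AAA'), (2, 'BBB');

-- Reading goes through the same column mapping.
SELECT * FROM sample_second;
```

Note the mapped Flink table lives in the current Flink catalog (`GenericInMemoryCatalog` by default), while the data resides entirely in the external Iceberg table.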