Skip to content

Flink create table will make all table options be custom properties #1745

@luoyuxia

Description

@luoyuxia

Search before asking

  • I searched in the issues and found nothing similar.

Fluss version

0.7.0 (latest release)

Please describe the bug 🐞

In the definition, custom properties are not understood by Fluss. When using Flink to create fluss table, the option understood by Fluss will be Fluss property. That's right. But the option understood by Fluss will also be set as custom properties.

Can be reproduce as following code:

void testTableConversionWithOptions() {
        Map<String, String> options = new HashMap<>();
        // forward table option & enum type
        options.put(ConfigOptions.TABLE_LOG_FORMAT.key(), "indexed");
        // forward client memory option
        options.put(ConfigOptions.CLIENT_WRITER_BUFFER_MEMORY_SIZE.key(), "64mb");
        // forward client duration option
        options.put(ConfigOptions.CLIENT_WRITER_BATCH_TIMEOUT.key(), "32s");

        ResolvedSchema schema =
                new ResolvedSchema(
                        Collections.singletonList(
                                Column.physical(
                                        "order_id",
                                        org.apache.flink.table.api.DataTypes.STRING().notNull())),
                        Collections.emptyList(),
                        null);
        CatalogTable flinkTable =
                CatalogTable.of(
                        Schema.newBuilder().fromResolvedSchema(schema).build(),
                        "test comment",
                        Collections.emptyList(),
                        options);

        TableDescriptor flussTable =
                FlinkConversions.toFlussTable(new ResolvedCatalogTable(flinkTable, schema));

        assertThat(flussTable.getProperties())
                .containsEntry(ConfigOptions.TABLE_LOG_FORMAT.key(), "indexed");

        ------ modify as following 
       HashMap<String, String> customProperties = new HashMap<>();
        customProperties.put(ConfigOptions.CLIENT_WRITER_BUFFER_MEMORY_SIZE.key(), "64mb");
        customProperties.put(ConfigOptions.CLIENT_WRITER_BATCH_TIMEOUT.key(), "32s");
        assertThat(flussTable.getCustomProperties()).containsExactlyEntriesOf(customProperties);
    }

The test will fail,

Actual and expected should have same size but actual size is:
  3
while expected size is:
  2
Actual was:
  {"client.writer.batch-timeout"="32s", "client.writer.buffer.memory-size"="64mb", "table.log.format"="indexed"}
Expected was:
  ["client.writer.buffer.memory-size"="64mb", "client.writer.batch-timeout"="32s"]

"table.log.format"="indexed" is also in custom property.

@wuchong Is an issue needed to be resolved in v0.8

Solution

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions