Fluss sink supports dynamic shuffle #1789

@loserwang1024

Search before asking

  • I searched in the issues and found nothing similar.

Motivation

The current Flink sink supports bucket-based shuffle by bucket ID. It aims to improve batching efficiency: data in the same bucket is always written by a single subtask, which yields better batching performance.

However, this design has several disadvantages:

  1. Bottleneck from a single subtask writing a bucket: Scaling the parallelism does not help when a single subtask is the performance constraint for a bucket.
  2. Uneven traffic distribution across sink subtasks:
  • Even with balanced input traffic, the number of partitions assigned to each sink subtask may vary significantly. For example, some subtasks may handle no partitions, while others manage several (see Table 1).
  • If traffic across partitions is inherently imbalanced, the problem gets worse: a subtask processing high-traffic partitions may become a hotspot.
  3. Limited applicability for log tables: The current design is ineffective for log tables without a bucket key (e.g., A+ page streams).
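
To make the uneven assignment concrete, here is a minimal sketch. It assumes the channel is chosen as `bucketId % parallelism`; that formula, the class name `BucketShuffleSkew`, and the method names are hypothetical illustrations, not the actual Fluss sink code.

```java
import java.util.Arrays;

class BucketShuffleSkew {
    // Hypothetical channel selection: bucket id modulo sink parallelism.
    // The real sink may compute the channel differently.
    static int subtaskFor(int bucketId, int parallelism) {
        return bucketId % parallelism;
    }

    // Counts how many (partition, bucket) pairs each sink subtask owns.
    static int[] bucketsPerSubtask(int partitions, int bucketsPerPartition, int parallelism) {
        int[] counts = new int[parallelism];
        for (int p = 0; p < partitions; p++) {
            for (int b = 0; b < bucketsPerPartition; b++) {
                counts[subtaskFor(b, parallelism)]++;
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // 3 partitions x 4 buckets written by 6 sink subtasks.
        System.out.println(Arrays.toString(bucketsPerSubtask(3, 4, 6)));
        // prints [3, 3, 3, 3, 0, 0]
    }
}
```

Under this assumption, raising the parallelism from 4 to 6 adds two subtasks that receive nothing, while each bucket is still written by exactly one subtask.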

Solution

This proposal follows what Iceberg's range distribution mode does: statistics are collected by every shuffle operator subtask and aggregated by the coordinator in every checkpoint cycle. The aggregated statistics are then broadcast to all subtasks and applied to the range partitioner. As a result, it may take up to two checkpoint cycles to detect a change in traffic distribution and apply the new statistics to the range partitioner.
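
The collect, aggregate, and apply steps could look roughly like the sketch below. It is not Iceberg's or Fluss's implementation: the class `DynamicShuffleSketch`, the per-bucket byte counters, and the greedy weighted assignment are all assumptions chosen for illustration, and the checkpoint-driven broadcast is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class DynamicShuffleSketch {
    // Coordinator side: merge per-subtask traffic counters (bucket id -> bytes).
    static Map<Integer, Long> aggregate(List<Map<Integer, Long>> localStats) {
        Map<Integer, Long> global = new TreeMap<>();
        for (Map<Integer, Long> local : localStats) {
            local.forEach((bucket, bytes) -> global.merge(bucket, bytes, Long::sum));
        }
        return global;
    }

    // Subtask side: greedy weighted assignment. Each subtask is filled up to
    // ceil(total / parallelism); a heavy bucket spills over to the next subtasks,
    // so it can be written by more than one subtask.
    static Map<Integer, List<Integer>> assign(Map<Integer, Long> global, int parallelism) {
        long total = global.values().stream().mapToLong(Long::longValue).sum();
        long target = Math.max(1, (total + parallelism - 1) / parallelism);
        Map<Integer, List<Integer>> owners = new TreeMap<>();
        int subtask = 0;
        long room = target;
        for (Map.Entry<Integer, Long> e : global.entrySet()) {
            long remaining = e.getValue();
            List<Integer> assigned = new ArrayList<>();
            while (remaining > 0) {
                assigned.add(subtask);
                long taken = Math.min(remaining, room);
                remaining -= taken;
                room -= taken;
                if (room == 0) {
                    if (subtask < parallelism - 1) {
                        subtask++;
                        room = target;
                    } else {
                        room = Long.MAX_VALUE; // last subtask absorbs any remainder
                    }
                }
            }
            if (assigned.isEmpty()) {
                assigned.add(subtask); // buckets with no observed traffic still need an owner
            }
            owners.put(e.getKey(), assigned);
        }
        return owners;
    }

    public static void main(String[] args) {
        Map<Integer, Long> global = aggregate(List.of(
                Map.of(0, 60L, 1, 10L),
                Map.of(0, 30L, 2, 10L, 3, 10L)));
        System.out.println(assign(global, 4));
    }
}
```

With these sample counters, bucket 0 carries 90 of 120 bytes, so the sketch spreads it over subtasks 0, 1, and 2 and packs the three light buckets onto subtask 3; `main` prints `{0=[0, 1, 2], 1=[3], 2=[3], 3=[3]}`. Splitting a hot bucket this way trades some batching efficiency for throughput, which is the trade-off the bucket-based shuffle cannot make.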

Anything else?

No response

Willingness to contribute

  • I'm willing to submit a PR!
