From c59c69bf23233b880d495f12014ef3fdcfd204d2 Mon Sep 17 00:00:00 2001 From: tpanetti Date: Sun, 14 Sep 2025 15:40:30 -0700 Subject: [PATCH 1/4] Add support for new table engines for ClickPipes managed table (ReplacingMergeTree, SummingMergeTree and Null) --- pkg/internal/api/clickpipe_models.go | 4 +- pkg/resource/clickpipe.go | 50 +++++++++++++++++++++-- pkg/resource/models/clickpipe_resource.go | 12 ++++-- 3 files changed, 58 insertions(+), 8 deletions(-) diff --git a/pkg/internal/api/clickpipe_models.go b/pkg/internal/api/clickpipe_models.go index 3f9cbb3f..b48ea59a 100644 --- a/pkg/internal/api/clickpipe_models.go +++ b/pkg/internal/api/clickpipe_models.go @@ -155,7 +155,9 @@ type ClickPipeDestinationColumn struct { } type ClickPipeDestinationTableEngine struct { - Type string `json:"type"` + Type string `json:"type"` + VersionColumnID *string `json:"versionColumnId,omitempty"` + ColumnIDs []string `json:"columnIds,omitempty"` } type ClickPipeDestinationTableDefinition struct { diff --git a/pkg/resource/clickpipe.go b/pkg/resource/clickpipe.go index 1da97205..06e4cbfb 100644 --- a/pkg/resource/clickpipe.go +++ b/pkg/resource/clickpipe.go @@ -587,12 +587,21 @@ func (c *ClickPipeResource) Schema(_ context.Context, _ resource.SchemaRequest, Required: true, Attributes: map[string]schema.Attribute{ "type": schema.StringAttribute{ - MarkdownDescription: "The type of the engine. Only `MergeTree` is supported.", + MarkdownDescription: "The type of the engine. Supported engines: `MergeTree`, `ReplacingMergeTree`, `SummingMergeTree`, `Null`.", Required: true, Validators: []validator.String{ - stringvalidator.OneOf("MergeTree"), + stringvalidator.OneOf("MergeTree", "ReplacingMergeTree", "SummingMergeTree", "Null"), }, }, + "version_column_id": schema.StringAttribute{ + MarkdownDescription: "Column ID to use as version for ReplacingMergeTree engine. Required when engine type is `ReplacingMergeTree`.", + Optional: true, + }, + "column_ids": schema.ListAttribute{ + MarkdownDescription: "Column IDs to sum for SummingMergeTree engine. Required when engine type is `SummingMergeTree`.", + Optional: true, + ElementType: types.StringType, + }, }, }, "sorting_key": schema.ListAttribute{ @@ -739,8 +748,19 @@ func (c *ClickPipeResource) Create(ctx context.Context, request resource.CreateR tableEngineModel := models.ClickPipeDestinationTableEngineModel{} response.Diagnostics.Append(tableDefinitionModel.Engine.As(ctx, &tableEngineModel, basetypes.ObjectAsOptions{})...) + engine := api.ClickPipeDestinationTableEngine{ + Type: tableEngineModel.Type.ValueString(), + VersionColumnID: tableEngineModel.VersionColumnID.ValueStringPointer(), + } + + if !tableEngineModel.ColumnIDs.IsNull() { + columnIDs := make([]string, len(tableEngineModel.ColumnIDs.Elements())) + response.Diagnostics.Append(tableEngineModel.ColumnIDs.ElementsAs(ctx, &columnIDs, false)...) + engine.ColumnIDs = columnIDs + } + clickPipe.Destination.TableDefinition = &api.ClickPipeDestinationTableDefinition{ - Engine: api.ClickPipeDestinationTableEngine{Type: tableEngineModel.Type.ValueString()}, + Engine: engine, PartitionBy: tableDefinitionModel.PartitionBy.ValueStringPointer(), PrimaryKey: tableDefinitionModel.PrimaryKey.ValueStringPointer(), SortingKey: sortingKey, @@ -1333,7 +1353,29 @@ func (c *ClickPipeResource) syncClickPipeState(ctx context.Context, state *model if clickPipe.Destination.TableDefinition != nil { engineModel := models.ClickPipeDestinationTableEngineModel{ - Type: types.StringValue(clickPipe.Destination.TableDefinition.Engine.Type), + Type: types.StringValue(clickPipe.Destination.TableDefinition.Engine.Type), + VersionColumnID: types.StringNull(), + ColumnIDs: types.ListNull(types.StringType), + } + + // Engine-specific fields are not persisted on ClickPipes side. Used only during pipe creation. + // We need to preserve these from the existing state. + if !stateDestinationModel.TableDefinition.IsNull() { + stateTableDefinitionModel := models.ClickPipeDestinationTableDefinitionModel{} + if diags := stateDestinationModel.TableDefinition.As(ctx, &stateTableDefinitionModel, basetypes.ObjectAsOptions{}); diags.HasError() { + return fmt.Errorf("error reading ClickPipe destination table definition: %v", diags) + } + + if !stateTableDefinitionModel.Engine.IsNull() { + stateEngineModel := models.ClickPipeDestinationTableEngineModel{} + if diags := stateTableDefinitionModel.Engine.As(ctx, &stateEngineModel, basetypes.ObjectAsOptions{}); diags.HasError() { + return fmt.Errorf("error reading ClickPipe destination engine: %v", diags) + } + + // Preserve version_column_id and column_ids from state + engineModel.VersionColumnID = stateEngineModel.VersionColumnID + engineModel.ColumnIDs = stateEngineModel.ColumnIDs + } } tableDefinitionModel := models.ClickPipeDestinationTableDefinitionModel{ diff --git a/pkg/resource/models/clickpipe_resource.go b/pkg/resource/models/clickpipe_resource.go index 3fdb4786..a9619126 100644 --- a/pkg/resource/models/clickpipe_resource.go +++ b/pkg/resource/models/clickpipe_resource.go @@ -348,20 +348,26 @@ func (m ClickPipeDestinationColumnModel) ObjectValue() types.Object { } type ClickPipeDestinationTableEngineModel struct { - Type types.String `tfsdk:"type"` + Type types.String `tfsdk:"type"` + VersionColumnID types.String `tfsdk:"version_column_id"` + ColumnIDs types.List `tfsdk:"column_ids"` } func (m ClickPipeDestinationTableEngineModel) ObjectType() types.ObjectType { return types.ObjectType{ AttrTypes: map[string]attr.Type{ - "type": types.StringType, + "type": types.StringType, + "version_column_id": types.StringType, + "column_ids": types.ListType{ElemType: types.StringType}, }, } } func (m ClickPipeDestinationTableEngineModel) ObjectValue() types.Object { return types.ObjectValueMust(m.ObjectType().AttrTypes, map[string]attr.Value{ - "type": m.Type, + "type": m.Type, + "version_column_id": m.VersionColumnID, + "column_ids": m.ColumnIDs, }) } From 8f6b0191b4d6d36e4ae453ba17305df845b34cc0 Mon Sep 17 00:00:00 2001 From: tpanetti Date: Sun, 14 Sep 2025 15:45:23 -0700 Subject: [PATCH 2/4] Fix formatting --- pkg/internal/api/clickpipe_models.go | 6 +++--- pkg/resource/models/clickpipe_resource.go | 18 +++++++++--------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/pkg/internal/api/clickpipe_models.go b/pkg/internal/api/clickpipe_models.go index b48ea59a..992ee8b1 100644 --- a/pkg/internal/api/clickpipe_models.go +++ b/pkg/internal/api/clickpipe_models.go @@ -155,9 +155,9 @@ type ClickPipeDestinationColumn struct { } type ClickPipeDestinationTableEngine struct { - Type string `json:"type"` - VersionColumnID *string `json:"versionColumnId,omitempty"` - ColumnIDs []string `json:"columnIds,omitempty"` + Type string `json:"type"` + VersionColumnID *string `json:"versionColumnId,omitempty"` + ColumnIDs []string `json:"columnIds,omitempty"` } type ClickPipeDestinationTableDefinition struct { diff --git a/pkg/resource/models/clickpipe_resource.go b/pkg/resource/models/clickpipe_resource.go index a9619126..d3d7aae1 100644 --- a/pkg/resource/models/clickpipe_resource.go +++ b/pkg/resource/models/clickpipe_resource.go @@ -348,26 +348,26 @@ func (m ClickPipeDestinationColumnModel) ObjectValue() types.Object { } type ClickPipeDestinationTableEngineModel struct { - Type types.String `tfsdk:"type"` - VersionColumnID types.String `tfsdk:"version_column_id"` - ColumnIDs types.List `tfsdk:"column_ids"` + Type types.String `tfsdk:"type"` + VersionColumnID types.String `tfsdk:"version_column_id"` + ColumnIDs types.List `tfsdk:"column_ids"` } func (m ClickPipeDestinationTableEngineModel) ObjectType() types.ObjectType { return types.ObjectType{ AttrTypes: map[string]attr.Type{ - "type": types.StringType, - "version_column_id": types.StringType, - "column_ids": types.ListType{ElemType: types.StringType}, + "type": types.StringType, + "version_column_id": types.StringType, + "column_ids": types.ListType{ElemType: types.StringType}, }, } } func (m ClickPipeDestinationTableEngineModel) ObjectValue() types.Object { return types.ObjectValueMust(m.ObjectType().AttrTypes, map[string]attr.Value{ - "type": m.Type, - "version_column_id": m.VersionColumnID, - "column_ids": m.ColumnIDs, + "type": m.Type, + "version_column_id": m.VersionColumnID, + "column_ids": m.ColumnIDs, }) } From e07fe490994ef39ccfabeefc47a041ac35c063aa Mon Sep 17 00:00:00 2001 From: tpanetti Date: Tue, 16 Sep 2025 10:38:00 -0700 Subject: [PATCH 3/4] Add validation for engine types (version_column_id and column_ids) --- pkg/resource/clickpipe.go | 97 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 97 insertions(+) diff --git a/pkg/resource/clickpipe.go b/pkg/resource/clickpipe.go index 06e4cbfb..59650dbc 100644 --- a/pkg/resource/clickpipe.go +++ b/pkg/resource/clickpipe.go @@ -685,6 +685,103 @@ func (c *ClickPipeResource) ModifyPlan(ctx context.Context, request resource.Mod if !request.Config.Raw.IsNull() { response.Diagnostics.Append(request.Config.Get(ctx, &config)...) } + + // Validate table engine configuration + if !plan.Destination.IsNull() { + destinationModel := models.ClickPipeDestinationModel{} + response.Diagnostics.Append(plan.Destination.As(ctx, &destinationModel, basetypes.ObjectAsOptions{})...) + + if !destinationModel.TableDefinition.IsNull() { + tableDefinitionModel := models.ClickPipeDestinationTableDefinitionModel{} + response.Diagnostics.Append(destinationModel.TableDefinition.As(ctx, &tableDefinitionModel, basetypes.ObjectAsOptions{})...) + + if !tableDefinitionModel.Engine.IsNull() { + engineModel := models.ClickPipeDestinationTableEngineModel{} + response.Diagnostics.Append(tableDefinitionModel.Engine.As(ctx, &engineModel, basetypes.ObjectAsOptions{})...) + + engineType := engineModel.Type.ValueString() + + // Validate versionColumnId + if engineType == "ReplacingMergeTree" { + if engineModel.VersionColumnID.IsNull() { + response.Diagnostics.AddError( + "Invalid Configuration", + "version_column_id is required for ReplacingMergeTree engine", + ) + } + } else { + if !engineModel.VersionColumnID.IsNull() { + response.Diagnostics.AddError( + "Invalid Configuration", + "version_column_id can only be used with ReplacingMergeTree engine", + ) + } + } + + // Validate columnIds + if engineType != "SummingMergeTree" { + if !engineModel.ColumnIDs.IsNull() && len(engineModel.ColumnIDs.Elements()) > 0 { + response.Diagnostics.AddError( + "Invalid Configuration", + "column_ids can only be used with SummingMergeTree engine", + ) + } + } + + // Validate sortingKey for ReplacingMergeTree + if engineType == "ReplacingMergeTree" { + if tableDefinitionModel.SortingKey.IsNull() || len(tableDefinitionModel.SortingKey.Elements()) == 0 { + response.Diagnostics.AddError( + "Invalid Configuration", + "sorting_key is required for ReplacingMergeTree engine", + ) + } + } + + // Validate column references exist in destination columns + if !destinationModel.Columns.IsNull() && len(destinationModel.Columns.Elements()) > 0 { + destinationColumnsModels := make([]models.ClickPipeDestinationColumnModel, len(destinationModel.Columns.Elements())) + response.Diagnostics.Append(destinationModel.Columns.ElementsAs(ctx, &destinationColumnsModels, false)...) + + // Build set of column names + columnNames := make(map[string]bool) + for _, columnModel := range destinationColumnsModels { + columnNames[columnModel.Name.ValueString()] = true + } + + // Validate versionColumnId exists in columns + if !engineModel.VersionColumnID.IsNull() { + versionColumnID := engineModel.VersionColumnID.ValueString() + if !columnNames[versionColumnID] { + response.Diagnostics.AddError( + "Invalid Configuration", + fmt.Sprintf("version_column_id '%s' must exist in destination columns", versionColumnID), + ) + } + } + + // Validate columnIds exist in columns + if !engineModel.ColumnIDs.IsNull() && len(engineModel.ColumnIDs.Elements()) > 0 { + columnIDs := make([]string, len(engineModel.ColumnIDs.Elements())) + response.Diagnostics.Append(engineModel.ColumnIDs.ElementsAs(ctx, &columnIDs, false)...) + + var missingColumns []string + for _, columnID := range columnIDs { + if !columnNames[columnID] { + missingColumns = append(missingColumns, columnID) + } + } + if len(missingColumns) > 0 { + response.Diagnostics.AddError( + "Invalid Configuration", + fmt.Sprintf("column_ids must exist in destination columns. Missing: %s", strings.Join(missingColumns, ", ")), + ) + } + } + } + } + } + } } func (c *ClickPipeResource) Create(ctx context.Context, request resource.CreateRequest, response *resource.CreateResponse) { From d7735b2896f8471c71397fd9d7633b38ea6cfc01 Mon Sep 17 00:00:00 2001 From: tpanetti Date: Thu, 18 Sep 2025 10:56:11 -0700 Subject: [PATCH 4/4] Move table engine names to constants --- pkg/resource/clickpipe.go | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/pkg/resource/clickpipe.go b/pkg/resource/clickpipe.go index 59650dbc..3e55c76f 100644 --- a/pkg/resource/clickpipe.go +++ b/pkg/resource/clickpipe.go @@ -48,6 +48,12 @@ Known limitations: const ( clickPipeStateChangeMaxWaitSeconds = 60 * 2 + + // ClickPipe destination table engine types + ClickPipeEngineMergeTree = "MergeTree" + ClickPipeEngineReplacingMergeTree = "ReplacingMergeTree" + ClickPipeEngineSummingMergeTree = "SummingMergeTree" + ClickPipeEngineNull = "Null" ) type ClickPipeResource struct { @@ -590,7 +596,7 @@ func (c *ClickPipeResource) Schema(_ context.Context, _ resource.SchemaRequest, MarkdownDescription: "The type of the engine. Supported engines: `MergeTree`, `ReplacingMergeTree`, `SummingMergeTree`, `Null`.", Required: true, Validators: []validator.String{ - stringvalidator.OneOf("MergeTree", "ReplacingMergeTree", "SummingMergeTree", "Null"), + stringvalidator.OneOf(ClickPipeEngineMergeTree, ClickPipeEngineReplacingMergeTree, ClickPipeEngineSummingMergeTree, ClickPipeEngineNull), }, }, "version_column_id": schema.StringAttribute{ @@ -702,7 +708,7 @@ func (c *ClickPipeResource) ModifyPlan(ctx context.Context, request resource.Mod engineType := engineModel.Type.ValueString() // Validate versionColumnId - if engineType == "ReplacingMergeTree" { + if engineType == ClickPipeEngineReplacingMergeTree { if engineModel.VersionColumnID.IsNull() { response.Diagnostics.AddError( "Invalid Configuration", @@ -719,7 +725,7 @@ func (c *ClickPipeResource) ModifyPlan(ctx context.Context, request resource.Mod } // Validate columnIds - if engineType != "SummingMergeTree" { + if engineType != ClickPipeEngineSummingMergeTree { if !engineModel.ColumnIDs.IsNull() && len(engineModel.ColumnIDs.Elements()) > 0 { response.Diagnostics.AddError( "Invalid Configuration", @@ -729,7 +735,7 @@ func (c *ClickPipeResource) ModifyPlan(ctx context.Context, request resource.Mod } // Validate sortingKey for ReplacingMergeTree - if engineType == "ReplacingMergeTree" { + if engineType == ClickPipeEngineReplacingMergeTree { if tableDefinitionModel.SortingKey.IsNull() || len(tableDefinitionModel.SortingKey.Elements()) == 0 { response.Diagnostics.AddError( "Invalid Configuration",