Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion pkg/internal/api/clickpipe_models.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,9 @@ type ClickPipeDestinationColumn struct {
}

type ClickPipeDestinationTableEngine struct {
Type string `json:"type"`
Type string `json:"type"`
VersionColumnID *string `json:"versionColumnId,omitempty"`
ColumnIDs []string `json:"columnIds,omitempty"`
}

type ClickPipeDestinationTableDefinition struct {
Expand Down
153 changes: 149 additions & 4 deletions pkg/resource/clickpipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ Known limitations:

const (
clickPipeStateChangeMaxWaitSeconds = 60 * 2

// ClickPipe destination table engine types
ClickPipeEngineMergeTree = "MergeTree"
ClickPipeEngineReplacingMergeTree = "ReplacingMergeTree"
ClickPipeEngineSummingMergeTree = "SummingMergeTree"
ClickPipeEngineNull = "Null"
)

type ClickPipeResource struct {
Expand Down Expand Up @@ -588,12 +594,21 @@ func (c *ClickPipeResource) Schema(_ context.Context, _ resource.SchemaRequest,
Required: true,
Attributes: map[string]schema.Attribute{
"type": schema.StringAttribute{
MarkdownDescription: "The type of the engine. Only `MergeTree` is supported.",
MarkdownDescription: "The type of the engine. Supported engines: `MergeTree`, `ReplacingMergeTree`, `SummingMergeTree`, `Null`.",
Required: true,
Validators: []validator.String{
stringvalidator.OneOf("MergeTree"),
stringvalidator.OneOf(ClickPipeEngineMergeTree, ClickPipeEngineReplacingMergeTree, ClickPipeEngineSummingMergeTree, ClickPipeEngineNull),
},
},
"version_column_id": schema.StringAttribute{
MarkdownDescription: "Column ID to use as version for ReplacingMergeTree engine. Required when engine type is `ReplacingMergeTree`.",
Optional: true,
},
"column_ids": schema.ListAttribute{
MarkdownDescription: "Column IDs to sum for SummingMergeTree engine. Required when engine type is `SummingMergeTree`.",
Optional: true,
ElementType: types.StringType,
},
},
},
"sorting_key": schema.ListAttribute{
Expand Down Expand Up @@ -681,6 +696,103 @@ func (c *ClickPipeResource) ModifyPlan(ctx context.Context, request resource.Mod
if !request.Config.Raw.IsNull() {
response.Diagnostics.Append(request.Config.Get(ctx, &config)...)
}

// Validate table engine configuration
if !plan.Destination.IsNull() {
destinationModel := models.ClickPipeDestinationModel{}
response.Diagnostics.Append(plan.Destination.As(ctx, &destinationModel, basetypes.ObjectAsOptions{})...)

if !destinationModel.TableDefinition.IsNull() {
tableDefinitionModel := models.ClickPipeDestinationTableDefinitionModel{}
response.Diagnostics.Append(destinationModel.TableDefinition.As(ctx, &tableDefinitionModel, basetypes.ObjectAsOptions{})...)

if !tableDefinitionModel.Engine.IsNull() {
engineModel := models.ClickPipeDestinationTableEngineModel{}
response.Diagnostics.Append(tableDefinitionModel.Engine.As(ctx, &engineModel, basetypes.ObjectAsOptions{})...)

engineType := engineModel.Type.ValueString()

// Validate versionColumnId
if engineType == ClickPipeEngineReplacingMergeTree {
if engineModel.VersionColumnID.IsNull() {
response.Diagnostics.AddError(
"Invalid Configuration",
"version_column_id is required for ReplacingMergeTree engine",
)
}
} else {
if !engineModel.VersionColumnID.IsNull() {
response.Diagnostics.AddError(
"Invalid Configuration",
"version_column_id can only be used with ReplacingMergeTree engine",
)
}
}

// Validate columnIds
if engineType != ClickPipeEngineSummingMergeTree {
if !engineModel.ColumnIDs.IsNull() && len(engineModel.ColumnIDs.Elements()) > 0 {
response.Diagnostics.AddError(
"Invalid Configuration",
"column_ids can only be used with SummingMergeTree engine",
)
}
}

// Validate sortingKey for ReplacingMergeTree
if engineType == ClickPipeEngineReplacingMergeTree {
if tableDefinitionModel.SortingKey.IsNull() || len(tableDefinitionModel.SortingKey.Elements()) == 0 {
response.Diagnostics.AddError(
"Invalid Configuration",
"sorting_key is required for ReplacingMergeTree engine",
)
}
}

// Validate column references exist in destination columns
if !destinationModel.Columns.IsNull() && len(destinationModel.Columns.Elements()) > 0 {
destinationColumnsModels := make([]models.ClickPipeDestinationColumnModel, len(destinationModel.Columns.Elements()))
response.Diagnostics.Append(destinationModel.Columns.ElementsAs(ctx, &destinationColumnsModels, false)...)

// Build set of column names
columnNames := make(map[string]bool)
for _, columnModel := range destinationColumnsModels {
columnNames[columnModel.Name.ValueString()] = true
}

// Validate versionColumnId exists in columns
if !engineModel.VersionColumnID.IsNull() {
versionColumnID := engineModel.VersionColumnID.ValueString()
if !columnNames[versionColumnID] {
response.Diagnostics.AddError(
"Invalid Configuration",
fmt.Sprintf("version_column_id '%s' must exist in destination columns", versionColumnID),
)
}
}

// Validate columnIds exist in columns
if !engineModel.ColumnIDs.IsNull() && len(engineModel.ColumnIDs.Elements()) > 0 {
columnIDs := make([]string, len(engineModel.ColumnIDs.Elements()))
response.Diagnostics.Append(engineModel.ColumnIDs.ElementsAs(ctx, &columnIDs, false)...)

var missingColumns []string
for _, columnID := range columnIDs {
if !columnNames[columnID] {
missingColumns = append(missingColumns, columnID)
}
}
if len(missingColumns) > 0 {
response.Diagnostics.AddError(
"Invalid Configuration",
fmt.Sprintf("column_ids must exist in destination columns. Missing: %s", strings.Join(missingColumns, ", ")),
)
}
}
}
}
}
}
}

func (c *ClickPipeResource) Create(ctx context.Context, request resource.CreateRequest, response *resource.CreateResponse) {
Expand Down Expand Up @@ -744,8 +856,19 @@ func (c *ClickPipeResource) Create(ctx context.Context, request resource.CreateR
tableEngineModel := models.ClickPipeDestinationTableEngineModel{}
response.Diagnostics.Append(tableDefinitionModel.Engine.As(ctx, &tableEngineModel, basetypes.ObjectAsOptions{})...)

engine := api.ClickPipeDestinationTableEngine{
Type: tableEngineModel.Type.ValueString(),
VersionColumnID: tableEngineModel.VersionColumnID.ValueStringPointer(),
}

if !tableEngineModel.ColumnIDs.IsNull() {
columnIDs := make([]string, len(tableEngineModel.ColumnIDs.Elements()))
response.Diagnostics.Append(tableEngineModel.ColumnIDs.ElementsAs(ctx, &columnIDs, false)...)
engine.ColumnIDs = columnIDs
}

clickPipe.Destination.TableDefinition = &api.ClickPipeDestinationTableDefinition{
Engine: api.ClickPipeDestinationTableEngine{Type: tableEngineModel.Type.ValueString()},
Engine: engine,
PartitionBy: tableDefinitionModel.PartitionBy.ValueStringPointer(),
PrimaryKey: tableDefinitionModel.PrimaryKey.ValueStringPointer(),
SortingKey: sortingKey,
Expand Down Expand Up @@ -1380,7 +1503,29 @@ func (c *ClickPipeResource) syncClickPipeState(ctx context.Context, state *model

if clickPipe.Destination.TableDefinition != nil {
engineModel := models.ClickPipeDestinationTableEngineModel{
Type: types.StringValue(clickPipe.Destination.TableDefinition.Engine.Type),
Type: types.StringValue(clickPipe.Destination.TableDefinition.Engine.Type),
VersionColumnID: types.StringNull(),
ColumnIDs: types.ListNull(types.StringType),
}

// Engine-specific fields are not persisted on ClickPipes side. Used only during pipe creation.
// We need to preserve these from the existing state.
if !stateDestinationModel.TableDefinition.IsNull() {
stateTableDefinitionModel := models.ClickPipeDestinationTableDefinitionModel{}
if diags := stateDestinationModel.TableDefinition.As(ctx, &stateTableDefinitionModel, basetypes.ObjectAsOptions{}); diags.HasError() {
return fmt.Errorf("error reading ClickPipe destination table definition: %v", diags)
}

if !stateTableDefinitionModel.Engine.IsNull() {
stateEngineModel := models.ClickPipeDestinationTableEngineModel{}
if diags := stateTableDefinitionModel.Engine.As(ctx, &stateEngineModel, basetypes.ObjectAsOptions{}); diags.HasError() {
return fmt.Errorf("error reading ClickPipe destination engine: %v", diags)
}

// Preserve version_column_id and column_ids from state
engineModel.VersionColumnID = stateEngineModel.VersionColumnID
engineModel.ColumnIDs = stateEngineModel.ColumnIDs
}
}

tableDefinitionModel := models.ClickPipeDestinationTableDefinitionModel{
Expand Down
12 changes: 9 additions & 3 deletions pkg/resource/models/clickpipe_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,20 +349,26 @@ func (m ClickPipeDestinationColumnModel) ObjectValue() types.Object {
}

type ClickPipeDestinationTableEngineModel struct {
Type types.String `tfsdk:"type"`
Type types.String `tfsdk:"type"`
VersionColumnID types.String `tfsdk:"version_column_id"`
ColumnIDs types.List `tfsdk:"column_ids"`
}

func (m ClickPipeDestinationTableEngineModel) ObjectType() types.ObjectType {
return types.ObjectType{
AttrTypes: map[string]attr.Type{
"type": types.StringType,
"type": types.StringType,
"version_column_id": types.StringType,
"column_ids": types.ListType{ElemType: types.StringType},
},
}
}

func (m ClickPipeDestinationTableEngineModel) ObjectValue() types.Object {
return types.ObjectValueMust(m.ObjectType().AttrTypes, map[string]attr.Value{
"type": m.Type,
"type": m.Type,
"version_column_id": m.VersionColumnID,
"column_ids": m.ColumnIDs,
})
}

Expand Down
Loading