Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion pkg/internal/api/clickpipe_models.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,9 @@ type ClickPipeDestinationColumn struct {
}

type ClickPipeDestinationTableEngine struct {
Type string `json:"type"`
Type string `json:"type"`
VersionColumnID *string `json:"versionColumnId,omitempty"`
ColumnIDs []string `json:"columnIds,omitempty"`
}

type ClickPipeDestinationTableDefinition struct {
Expand Down
147 changes: 143 additions & 4 deletions pkg/resource/clickpipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,12 +587,21 @@ func (c *ClickPipeResource) Schema(_ context.Context, _ resource.SchemaRequest,
Required: true,
Attributes: map[string]schema.Attribute{
"type": schema.StringAttribute{
MarkdownDescription: "The type of the engine. Only `MergeTree` is supported.",
MarkdownDescription: "The type of the engine. Supported engines: `MergeTree`, `ReplacingMergeTree`, `SummingMergeTree`, `Null`.",
Required: true,
Validators: []validator.String{
stringvalidator.OneOf("MergeTree"),
stringvalidator.OneOf("MergeTree", "ReplacingMergeTree", "SummingMergeTree", "Null"),
},
},
"version_column_id": schema.StringAttribute{
MarkdownDescription: "Column ID to use as version for ReplacingMergeTree engine. Required when engine type is `ReplacingMergeTree`.",
Optional: true,
},
"column_ids": schema.ListAttribute{
MarkdownDescription: "Column IDs to sum for SummingMergeTree engine. Required when engine type is `SummingMergeTree`.",
Optional: true,
ElementType: types.StringType,
},
},
},
"sorting_key": schema.ListAttribute{
Expand Down Expand Up @@ -676,6 +685,103 @@ func (c *ClickPipeResource) ModifyPlan(ctx context.Context, request resource.Mod
if !request.Config.Raw.IsNull() {
response.Diagnostics.Append(request.Config.Get(ctx, &config)...)
}

// Validate table engine configuration
if !plan.Destination.IsNull() {
destinationModel := models.ClickPipeDestinationModel{}
response.Diagnostics.Append(plan.Destination.As(ctx, &destinationModel, basetypes.ObjectAsOptions{})...)

if !destinationModel.TableDefinition.IsNull() {
tableDefinitionModel := models.ClickPipeDestinationTableDefinitionModel{}
response.Diagnostics.Append(destinationModel.TableDefinition.As(ctx, &tableDefinitionModel, basetypes.ObjectAsOptions{})...)

if !tableDefinitionModel.Engine.IsNull() {
engineModel := models.ClickPipeDestinationTableEngineModel{}
response.Diagnostics.Append(tableDefinitionModel.Engine.As(ctx, &engineModel, basetypes.ObjectAsOptions{})...)

engineType := engineModel.Type.ValueString()

// Validate versionColumnId
if engineType == "ReplacingMergeTree" {
if engineModel.VersionColumnID.IsNull() {
response.Diagnostics.AddError(
"Invalid Configuration",
"version_column_id is required for ReplacingMergeTree engine",
)
}
} else {
if !engineModel.VersionColumnID.IsNull() {
response.Diagnostics.AddError(
"Invalid Configuration",
"version_column_id can only be used with ReplacingMergeTree engine",
)
}
}

// Validate columnIds
if engineType != "SummingMergeTree" {
if !engineModel.ColumnIDs.IsNull() && len(engineModel.ColumnIDs.Elements()) > 0 {
response.Diagnostics.AddError(
"Invalid Configuration",
"column_ids can only be used with SummingMergeTree engine",
)
}
}

// Validate sortingKey for ReplacingMergeTree
if engineType == "ReplacingMergeTree" {
if tableDefinitionModel.SortingKey.IsNull() || len(tableDefinitionModel.SortingKey.Elements()) == 0 {
response.Diagnostics.AddError(
"Invalid Configuration",
"sorting_key is required for ReplacingMergeTree engine",
)
}
}

// Validate column references exist in destination columns
if !destinationModel.Columns.IsNull() && len(destinationModel.Columns.Elements()) > 0 {
destinationColumnsModels := make([]models.ClickPipeDestinationColumnModel, len(destinationModel.Columns.Elements()))
response.Diagnostics.Append(destinationModel.Columns.ElementsAs(ctx, &destinationColumnsModels, false)...)

// Build set of column names
columnNames := make(map[string]bool)
for _, columnModel := range destinationColumnsModels {
columnNames[columnModel.Name.ValueString()] = true
}

// Validate versionColumnId exists in columns
if !engineModel.VersionColumnID.IsNull() {
versionColumnID := engineModel.VersionColumnID.ValueString()
if !columnNames[versionColumnID] {
response.Diagnostics.AddError(
"Invalid Configuration",
fmt.Sprintf("version_column_id '%s' must exist in destination columns", versionColumnID),
)
}
}

// Validate columnIds exist in columns
if !engineModel.ColumnIDs.IsNull() && len(engineModel.ColumnIDs.Elements()) > 0 {
columnIDs := make([]string, len(engineModel.ColumnIDs.Elements()))
response.Diagnostics.Append(engineModel.ColumnIDs.ElementsAs(ctx, &columnIDs, false)...)

var missingColumns []string
for _, columnID := range columnIDs {
if !columnNames[columnID] {
missingColumns = append(missingColumns, columnID)
}
}
if len(missingColumns) > 0 {
response.Diagnostics.AddError(
"Invalid Configuration",
fmt.Sprintf("column_ids must exist in destination columns. Missing: %s", strings.Join(missingColumns, ", ")),
)
}
}
}
}
}
}
}

func (c *ClickPipeResource) Create(ctx context.Context, request resource.CreateRequest, response *resource.CreateResponse) {
Expand Down Expand Up @@ -739,8 +845,19 @@ func (c *ClickPipeResource) Create(ctx context.Context, request resource.CreateR
tableEngineModel := models.ClickPipeDestinationTableEngineModel{}
response.Diagnostics.Append(tableDefinitionModel.Engine.As(ctx, &tableEngineModel, basetypes.ObjectAsOptions{})...)

engine := api.ClickPipeDestinationTableEngine{
Type: tableEngineModel.Type.ValueString(),
VersionColumnID: tableEngineModel.VersionColumnID.ValueStringPointer(),
}

if !tableEngineModel.ColumnIDs.IsNull() {
columnIDs := make([]string, len(tableEngineModel.ColumnIDs.Elements()))
response.Diagnostics.Append(tableEngineModel.ColumnIDs.ElementsAs(ctx, &columnIDs, false)...)
engine.ColumnIDs = columnIDs
}

clickPipe.Destination.TableDefinition = &api.ClickPipeDestinationTableDefinition{
Engine: api.ClickPipeDestinationTableEngine{Type: tableEngineModel.Type.ValueString()},
Engine: engine,
PartitionBy: tableDefinitionModel.PartitionBy.ValueStringPointer(),
PrimaryKey: tableDefinitionModel.PrimaryKey.ValueStringPointer(),
SortingKey: sortingKey,
Expand Down Expand Up @@ -1333,7 +1450,29 @@ func (c *ClickPipeResource) syncClickPipeState(ctx context.Context, state *model

if clickPipe.Destination.TableDefinition != nil {
engineModel := models.ClickPipeDestinationTableEngineModel{
Type: types.StringValue(clickPipe.Destination.TableDefinition.Engine.Type),
Type: types.StringValue(clickPipe.Destination.TableDefinition.Engine.Type),
VersionColumnID: types.StringNull(),
ColumnIDs: types.ListNull(types.StringType),
}

// Engine-specific fields are not persisted on ClickPipes side. Used only during pipe creation.
// We need to preserve these from the existing state.
if !stateDestinationModel.TableDefinition.IsNull() {
stateTableDefinitionModel := models.ClickPipeDestinationTableDefinitionModel{}
if diags := stateDestinationModel.TableDefinition.As(ctx, &stateTableDefinitionModel, basetypes.ObjectAsOptions{}); diags.HasError() {
return fmt.Errorf("error reading ClickPipe destination table definition: %v", diags)
}

if !stateTableDefinitionModel.Engine.IsNull() {
stateEngineModel := models.ClickPipeDestinationTableEngineModel{}
if diags := stateTableDefinitionModel.Engine.As(ctx, &stateEngineModel, basetypes.ObjectAsOptions{}); diags.HasError() {
return fmt.Errorf("error reading ClickPipe destination engine: %v", diags)
}

// Preserve version_column_id and column_ids from state
engineModel.VersionColumnID = stateEngineModel.VersionColumnID
engineModel.ColumnIDs = stateEngineModel.ColumnIDs
}
}

tableDefinitionModel := models.ClickPipeDestinationTableDefinitionModel{
Expand Down
12 changes: 9 additions & 3 deletions pkg/resource/models/clickpipe_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,20 +348,26 @@ func (m ClickPipeDestinationColumnModel) ObjectValue() types.Object {
}

type ClickPipeDestinationTableEngineModel struct {
Type types.String `tfsdk:"type"`
Type types.String `tfsdk:"type"`
VersionColumnID types.String `tfsdk:"version_column_id"`
ColumnIDs types.List `tfsdk:"column_ids"`
}

func (m ClickPipeDestinationTableEngineModel) ObjectType() types.ObjectType {
return types.ObjectType{
AttrTypes: map[string]attr.Type{
"type": types.StringType,
"type": types.StringType,
"version_column_id": types.StringType,
"column_ids": types.ListType{ElemType: types.StringType},
},
}
}

func (m ClickPipeDestinationTableEngineModel) ObjectValue() types.Object {
return types.ObjectValueMust(m.ObjectType().AttrTypes, map[string]attr.Value{
"type": m.Type,
"type": m.Type,
"version_column_id": m.VersionColumnID,
"column_ids": m.ColumnIDs,
})
}

Expand Down
Loading