Skip to content
3 changes: 2 additions & 1 deletion pkg/internal/api/clickpipe_models.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ type ClickPipeObjectStorageSource struct {
Delimiter *string `json:"delimiter,omitempty"`
Compression *string `json:"compression,omitempty"`

IsContinuous bool `json:"isContinuous"`
IsContinuous bool `json:"isContinuous"`
QueueURL *string `json:"queueUrl,omitempty"`

Authentication *string `json:"authentication,omitempty"`
AccessKey *ClickPipeSourceAccessKey `json:"accessKey,omitempty"`
Expand Down
54 changes: 54 additions & 0 deletions pkg/resource/clickpipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package resource
import (
"context"
"fmt"
"regexp"
"strings"
"time"

Expand Down Expand Up @@ -401,6 +402,19 @@ func (c *ClickPipeResource) Schema(_ context.Context, _ resource.SchemaRequest,
boolplanmodifier.RequiresReplace(),
},
},
"queue_url": schema.StringAttribute{
MarkdownDescription: "SQS queue URL for event-based continuous ingestion. When provided, files are ingested based on S3 event notifications rather than lexicographical order. Only applicable when `is_continuous` is `true`, storage type is `s3`, and authentication is provided. Format: `https://sqs.{region}.amazonaws.com/{account-id}/{queue-name}`",
Optional: true,
Validators: []validator.String{
stringvalidator.RegexMatches(
regexp.MustCompile(`^https://sqs\.[a-z0-9.-]+\.amazonaws\.com/\d{12}/[a-zA-Z0-9._-]+$`),
"must be a valid SQS URL in the format https://sqs.{region}.amazonaws.com/{12-digit-account-id}/{queue-name}",
),
},
PlanModifiers: []planmodifier.String{
stringplanmodifier.RequiresReplace(),
},
},
"authentication": schema.StringAttribute{
MarkdownDescription: "CONNECTION_STRING is for Azure Blob Storage. IAM_ROLE and IAM_USER are for AWS S3/GCS/DigitalOcean. If not provided, no authentication is used",
Optional: true,
Expand Down Expand Up @@ -796,6 +810,44 @@ func (c *ClickPipeResource) ModifyPlan(ctx context.Context, request resource.Mod
}
}

// Validate queue_url configuration for object storage
if !plan.Source.IsNull() {
sourceModel := models.ClickPipeSourceModel{}
response.Diagnostics.Append(plan.Source.As(ctx, &sourceModel, basetypes.ObjectAsOptions{})...)

if !sourceModel.ObjectStorage.IsNull() {
objectStorageModel := models.ClickPipeObjectStorageSourceModel{}
response.Diagnostics.Append(sourceModel.ObjectStorage.As(ctx, &objectStorageModel, basetypes.ObjectAsOptions{})...)

// Validate queue_url is only provided when is_continuous is true
if !objectStorageModel.QueueURL.IsNull() && !objectStorageModel.QueueURL.IsUnknown() && objectStorageModel.QueueURL.ValueString() != "" {
if !objectStorageModel.IsContinuous.ValueBool() {
response.Diagnostics.AddError(
"Invalid Configuration",
"queue_url can only be provided when is_continuous is true",
)
}

// Validate queue_url is only used with S3 storage type
if objectStorageModel.Type.ValueString() != api.ClickPipeObjectStorageS3Type {
response.Diagnostics.AddError(
"Invalid Configuration",
"queue_url is only supported for S3 object storage",
)
}

// Validate queue_url requires IAM authentication
authType := objectStorageModel.Authentication.ValueString()
if authType != api.ClickPipeAuthenticationIAMUser && authType != api.ClickPipeAuthenticationIAMRole {
response.Diagnostics.AddError(
"Invalid Configuration",
"queue_url requires authentication type to be either IAM_USER or IAM_ROLE",
)
}
}
}
}

if !request.State.Raw.IsNull() && !state.State.IsNull() {
currentState := state.State.ValueString()

Expand Down Expand Up @@ -1184,6 +1236,7 @@ func (c *ClickPipeResource) extractSourceFromPlan(ctx context.Context, diagnosti
Delimiter: objectStorageModel.Delimiter.ValueStringPointer(),
Compression: objectStorageModel.Compression.ValueStringPointer(),
IsContinuous: objectStorageModel.IsContinuous.ValueBool(),
QueueURL: objectStorageModel.QueueURL.ValueStringPointer(),
Authentication: objectStorageModel.Authentication.ValueStringPointer(),
AccessKey: accessKey,
IAMRole: objectStorageModel.IAMRole.ValueStringPointer(),
Expand Down Expand Up @@ -1402,6 +1455,7 @@ func (c *ClickPipeResource) syncClickPipeState(ctx context.Context, state *model
Delimiter: types.StringPointerValue(clickPipe.Source.ObjectStorage.Delimiter),
Compression: types.StringPointerValue(clickPipe.Source.ObjectStorage.Compression),
IsContinuous: types.BoolValue(clickPipe.Source.ObjectStorage.IsContinuous),
QueueURL: types.StringPointerValue(clickPipe.Source.ObjectStorage.QueueURL),
IAMRole: types.StringPointerValue(clickPipe.Source.ObjectStorage.IAMRole),
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/resource/models/clickpipe_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ type ClickPipeObjectStorageSourceModel struct {
Delimiter types.String `tfsdk:"delimiter"`
Compression types.String `tfsdk:"compression"`
IsContinuous types.Bool `tfsdk:"is_continuous"`
QueueURL types.String `tfsdk:"queue_url"`
Authentication types.String `tfsdk:"authentication"`
AccessKey types.Object `tfsdk:"access_key"`
IAMRole types.String `tfsdk:"iam_role"`
Expand All @@ -276,6 +277,7 @@ func (m ClickPipeObjectStorageSourceModel) ObjectType() types.ObjectType {
"delimiter": types.StringType,
"compression": types.StringType,
"is_continuous": types.BoolType,
"queue_url": types.StringType,
"authentication": types.StringType,
"access_key": ClickPipeSourceAccessKeyModel{}.ObjectType(),
"iam_role": types.StringType,
Expand All @@ -294,6 +296,7 @@ func (m ClickPipeObjectStorageSourceModel) ObjectValue() types.Object {
"delimiter": m.Delimiter,
"compression": m.Compression,
"is_continuous": m.IsContinuous,
"queue_url": m.QueueURL,
"authentication": m.Authentication,
"access_key": m.AccessKey,
"iam_role": m.IAMRole,
Expand Down
Loading