Skip to content
Open
11 changes: 11 additions & 0 deletions examples/clickpipe/object_storage_s3_sqs_iam_role/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
## ClickPipe Object Storage with S3/SQS/IAM role example

This example demonstrates how to deploy a ClickPipe with an S3 bucket as the input source with continuous ingestion using SQS event notifications, authenticated with an IAM role.

This setup enables event-based ingestion where new files are detected via S3 event notifications sent to an SQS queue, rather than polling S3 for new files. IAM role authentication is only available for AWS ClickHouse Cloud services and is the recommended authentication method.

## How to run

- Rename `variables.sample.tfvars` to `variables.tfvars` and fill in all needed data.
- Run `terraform init`
- Run `terraform <plan|apply> -var-file=variables.tfvars`
106 changes: 106 additions & 0 deletions examples/clickpipe/object_storage_s3_sqs_iam_role/main.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
variable "organization_id" {}
variable "token_key" {}
variable "token_secret" {}

variable "service_id" {
description = "ClickHouse Cloud service ID"
}

variable "bucket_url" {
description = "S3 bucket URL pattern (e.g., s3://my-bucket/path/*.json)"
}

variable "sqs_queue_url" {
description = "SQS queue URL for S3 event notifications (e.g., https://sqs.us-east-1.amazonaws.com/123456789012/my-queue)"
}

variable "iam_role" {
description = "ARN of the IAM role with permissions to read from S3 and receive SQS messages"
sensitive = true
}

# S3 ClickPipe with continuous ingestion using SQS event notifications
# This example demonstrates event-based continuous ingestion where new files
# are detected via S3 event notifications sent to an SQS queue, rather than
# polling S3 for new files in lexicographical order.
resource "clickhouse_clickpipe" "s3_sqs_continuous" {
name = "S3 Continuous ClickPipe with SQS (IAM Role)"
service_id = var.service_id

source = {
object_storage = {
type = "s3"
format = "JSONEachRow"
url = var.bucket_url

# Enable continuous ingestion with event-based processing
is_continuous = true
queue_url = var.sqs_queue_url

# IAM role authentication - recommended for AWS services
authentication = "IAM_ROLE"
iam_role = var.iam_role
}
}

destination = {
table = "s3_events_data"
managed_table = true

table_definition = {
engine = {
type = "MergeTree"
}

sorting_key = ["timestamp"]
}

columns = [
{
name = "id"
type = "String"
},
{
name = "timestamp"
type = "DateTime64(3)"
},
{
name = "event_type"
type = "String"
},
{
name = "data"
type = "String"
}
]
}

field_mappings = [
{
source_field = "id"
destination_field = "id"
},
{
source_field = "timestamp"
destination_field = "timestamp"
},
{
source_field = "event_type"
destination_field = "event_type"
},
{
source_field = "data"
destination_field = "data"
}
]
}

output "clickpipe_id" {
value = clickhouse_clickpipe.s3_sqs_continuous.id
description = "The ID of the created ClickPipe"
}

output "clickpipe_state" {
value = clickhouse_clickpipe.s3_sqs_continuous.state
description = "The current state of the ClickPipe"
}
13 changes: 13 additions & 0 deletions examples/clickpipe/object_storage_s3_sqs_iam_role/provider.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
terraform {
required_providers {
clickhouse = {
source = "ClickHouse/clickhouse"
}
}
}

provider "clickhouse" {
organization_id = var.organization_id
token_key = var.token_key
token_secret = var.token_secret
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
terraform {
required_providers {
clickhouse = {
version = "${CLICKHOUSE_TERRAFORM_PROVIDER_VERSION}"
source = "ClickHouse/clickhouse"
}
}
}

provider "clickhouse" {
organization_id = var.organization_id
token_key = var.token_key
token_secret = var.token_secret
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# These keys are for example only and won't work when pointed to a deployed ClickHouse OpenAPI server
organization_id = "aee076c1-3f83-4637-95b1-ad5a0a825b71"
token_key = "avhj1U5QCdWAE9CA9"
token_secret = "4b1dROiHQEuSXJHlV8zHFd0S7WQj7CGxz5kGJeJnca"
service_id = "aee076c1-3f83-4637-95b1-ad5a0a825b71"

bucket_url = "s3://mybucket/path/*.json"

# SQS queue URL must follow the format: https://sqs.{region}.amazonaws.com/{account-id}/{queue-name}
sqs_queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/my-clickpipe-queue"

# IAM role ARN with permissions to read S3 and receive SQS messages
iam_role = "arn:aws:iam::123456789012:role/ClickPipeRole"
11 changes: 11 additions & 0 deletions examples/clickpipe/object_storage_s3_sqs_iam_user/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
## ClickPipe Object Storage with S3/SQS/IAM user example

This example demonstrates how to deploy a ClickPipe with an S3 bucket as the input source with continuous ingestion using SQS event notifications, authenticated with an IAM user.

This setup enables event-based ingestion where new files are detected via S3 event notifications sent to an SQS queue, rather than polling S3 for new files.

## How to run

- Rename `variables.sample.tfvars` to `variables.tfvars` and fill in all needed data.
- Run `terraform init`
- Run `terraform <plan|apply> -var-file=variables.tfvars`
114 changes: 114 additions & 0 deletions examples/clickpipe/object_storage_s3_sqs_iam_user/main.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
variable "organization_id" {}
variable "token_key" {}
variable "token_secret" {}

variable "service_id" {
description = "ClickHouse Cloud service ID"
}

variable "bucket_url" {
description = "S3 bucket URL pattern (e.g., s3://my-bucket/path/*.json)"
}

variable "sqs_queue_url" {
description = "SQS queue URL for S3 event notifications (e.g., https://sqs.us-east-1.amazonaws.com/123456789012/my-queue)"
}

variable "iam_access_key_id" {
description = "AWS IAM access key ID with permissions to read from S3 and receive SQS messages"
sensitive = true
}

variable "iam_secret_key" {
description = "AWS IAM secret access key"
sensitive = true
}

# S3 ClickPipe with continuous ingestion using SQS event notifications
# This example demonstrates event-based continuous ingestion where new files
# are detected via S3 event notifications sent to an SQS queue, rather than
# polling S3 for new files in lexicographical order.
resource "clickhouse_clickpipe" "s3_sqs_continuous" {
name = "S3 Continuous ClickPipe with SQS (IAM User)"
service_id = var.service_id

source = {
object_storage = {
type = "s3"
format = "JSONEachRow"
url = var.bucket_url

# Enable continuous ingestion with event-based processing
is_continuous = true
queue_url = var.sqs_queue_url

# IAM user authentication
authentication = "IAM_USER"
access_key = {
access_key_id = var.iam_access_key_id
secret_key = var.iam_secret_key
}
}
}

destination = {
table = "s3_events_data"
managed_table = true

table_definition = {
engine = {
type = "MergeTree"
}

sorting_key = ["timestamp"]
}

columns = [
{
name = "id"
type = "String"
},
{
name = "timestamp"
type = "DateTime64(3)"
},
{
name = "event_type"
type = "String"
},
{
name = "data"
type = "String"
}
]
}

field_mappings = [
{
source_field = "id"
destination_field = "id"
},
{
source_field = "timestamp"
destination_field = "timestamp"
},
{
source_field = "event_type"
destination_field = "event_type"
},
{
source_field = "data"
destination_field = "data"
}
]
}

output "clickpipe_id" {
value = clickhouse_clickpipe.s3_sqs_continuous.id
description = "The ID of the created ClickPipe"
}

output "clickpipe_state" {
value = clickhouse_clickpipe.s3_sqs_continuous.state
description = "The current state of the ClickPipe"
}
13 changes: 13 additions & 0 deletions examples/clickpipe/object_storage_s3_sqs_iam_user/provider.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
terraform {
required_providers {
clickhouse = {
source = "ClickHouse/clickhouse"
}
}
}

provider "clickhouse" {
organization_id = var.organization_id
token_key = var.token_key
token_secret = var.token_secret
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
terraform {
required_providers {
clickhouse = {
version = "${CLICKHOUSE_TERRAFORM_PROVIDER_VERSION}"
source = "ClickHouse/clickhouse"
}
}
}

provider "clickhouse" {
organization_id = var.organization_id
token_key = var.token_key
token_secret = var.token_secret
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# These keys are for example only and won't work when pointed to a deployed ClickHouse OpenAPI server
organization_id = "aee076c1-3f83-4637-95b1-ad5a0a825b71"
token_key = "avhj1U5QCdWAE9CA9"
token_secret = "4b1dROiHQEuSXJHlV8zHFd0S7WQj7CGxz5kGJeJnca"
service_id = "aee076c1-3f83-4637-95b1-ad5a0a825b71"

bucket_url = "s3://mybucket/path/*.json"

# SQS queue URL must follow the format: https://sqs.{region}.amazonaws.com/{account-id}/{queue-name}
sqs_queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/my-clickpipe-queue"

iam_access_key_id = "AKIAIOSFODNN7EXAMPLE"
iam_secret_key = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
3 changes: 2 additions & 1 deletion pkg/internal/api/clickpipe_models.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ type ClickPipeObjectStorageSource struct {
Delimiter *string `json:"delimiter,omitempty"`
Compression *string `json:"compression,omitempty"`

IsContinuous bool `json:"isContinuous"`
IsContinuous bool `json:"isContinuous"`
QueueURL *string `json:"queueUrl,omitempty"`

Authentication *string `json:"authentication,omitempty"`
AccessKey *ClickPipeSourceAccessKey `json:"accessKey,omitempty"`
Expand Down
Loading
Loading