Skip to content

Commit 2be2b82

Browse files
committed
Add conformance tests for ClickHouse state store and improve implementation
1 parent 7c3242e commit 2be2b82

File tree

6 files changed

+283
-1
lines changed

6 files changed

+283
-1
lines changed

state/clickhouse/clickhouse.go

Lines changed: 81 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,6 @@ func (c *StateStore) Set(ctx context.Context, req *state.SetRequest) error {
146146
}
147147
}
148148

149-
etag := uuid.New().String()
150149
value, err := c.marshal(req.Value)
151150
if err != nil {
152151
return err
@@ -158,6 +157,32 @@ func (c *StateStore) Set(ctx context.Context, req *state.SetRequest) error {
158157
expireTime = &t
159158
}
160159

160+
// Handle ETag for optimistic concurrency
161+
if req.ETag != nil && *req.ETag != "" {
162+
// First, get the current etag
163+
currentETag, err := c.getETag(ctx, req.Key)
164+
if err != nil {
165+
return err
166+
}
167+
168+
// If an etag exists and it doesn't match the provided etag, return error
169+
if currentETag != "" && currentETag != *req.ETag {
170+
return state.NewETagError(state.ETagMismatch, nil)
171+
}
172+
} else if req.Options.Concurrency == state.FirstWrite {
173+
// Check if the key already exists for first-write
174+
exists, err := c.keyExists(ctx, req.Key)
175+
if err != nil {
176+
return err
177+
}
178+
if exists {
179+
return state.NewETagError(state.ETagMismatch, nil)
180+
}
181+
}
182+
183+
// Generate a new etag for this write
184+
etag := uuid.New().String()
185+
161186
// ClickHouse uses ALTER TABLE ... UPDATE instead of ON DUPLICATE KEY
162187
// First try to insert
163188
insertQuery := fmt.Sprintf(`
@@ -188,6 +213,20 @@ func (c *StateStore) Delete(ctx context.Context, req *state.DeleteRequest) error
188213
return errors.New("key is empty")
189214
}
190215

216+
// Handle ETag for optimistic concurrency
217+
if req.ETag != nil && *req.ETag != "" {
218+
// First, get the current etag
219+
currentETag, err := c.getETag(ctx, req.Key)
220+
if err != nil {
221+
return err
222+
}
223+
224+
// If an etag exists and it doesn't match the provided etag, return error
225+
if currentETag != "" && currentETag != *req.ETag {
226+
return state.NewETagError(state.ETagMismatch, nil)
227+
}
228+
}
229+
191230
query := fmt.Sprintf("DELETE FROM %s.%s WHERE key = ?", c.config.Database, c.config.Table)
192231
_, err := c.db.ExecContext(ctx, query, req.Key)
193232
return err
@@ -309,3 +348,44 @@ func (c *StateStore) Close() error {
309348
}
310349
return nil
311350
}
351+
352+
// getETag retrieves the ETag for a specific key
353+
func (c *StateStore) getETag(ctx context.Context, key string) (string, error) {
354+
query := fmt.Sprintf(`
355+
SELECT etag
356+
FROM %s.%s FINAL -- Add FINAL to get the latest version
357+
WHERE key = ? AND (expire IS NULL OR expire > now64())
358+
`, c.config.Database, c.config.Table)
359+
360+
var etag string
361+
err := c.db.QueryRowContext(ctx, query, key).Scan(&etag)
362+
if err == sql.ErrNoRows {
363+
return "", nil
364+
}
365+
if err != nil {
366+
return "", fmt.Errorf("error getting etag: %v", err)
367+
}
368+
369+
return etag, nil
370+
}
371+
372+
// keyExists checks if a key exists in the state store
373+
func (c *StateStore) keyExists(ctx context.Context, key string) (bool, error) {
374+
query := fmt.Sprintf(`
375+
SELECT 1
376+
FROM %s.%s FINAL -- Add FINAL to get the latest version
377+
WHERE key = ? AND (expire IS NULL OR expire > now64())
378+
LIMIT 1
379+
`, c.config.Database, c.config.Table)
380+
381+
var exists int
382+
err := c.db.QueryRowContext(ctx, query, key).Scan(&exists)
383+
if err == sql.ErrNoRows {
384+
return false, nil
385+
}
386+
if err != nil {
387+
return false, fmt.Errorf("error checking key existence: %v", err)
388+
}
389+
390+
return true, nil
391+
}
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
//go:build conftests
2+
// +build conftests
3+
4+
/*
5+
Copyright 2021 The Dapr Authors
6+
Licensed under the Apache License, Version 2.0 (the "License");
7+
you may not use this file except in compliance with the License.
8+
You may obtain a copy of the License at
9+
http://www.apache.org/licenses/LICENSE-2.0
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package clickhouse
18+
19+
import (
20+
"fmt"
21+
"testing"
22+
23+
"github.com/stretchr/testify/assert"
24+
"github.com/stretchr/testify/require"
25+
26+
"github.com/dapr/components-contrib/state"
27+
"github.com/dapr/components-contrib/state/clickhouse"
28+
"github.com/dapr/components-contrib/tests/certification/flow"
29+
"github.com/dapr/components-contrib/tests/certification/flow/dockercompose"
30+
"github.com/dapr/components-contrib/tests/certification/flow/sidecar"
31+
"github.com/dapr/kit/logger"
32+
)
33+
34+
const (
35+
componentName = "clickhouse-store"
36+
)
37+
38+
func TestClickHouseStateStore(t *testing.T) {
39+
flow.New(t, "Test ClickHouse state store certification").
40+
Step(dockercompose.Run("docker-compose.yml")).
41+
Step("verify clickhouse store operations", testClickHouseStateStore()).
42+
Run()
43+
}
44+
45+
func testClickHouseStateStore() flow.Runnable {
46+
return func(ctx flow.Context) error {
47+
// Create a new ClickHouse state store instance
48+
store := clickhouse.NewClickHouseStateStore(logger.NewLogger("clickhouse-store-test"))
49+
50+
// Initialize the state store
51+
metadata := state.Metadata{}
52+
metadata.Properties = map[string]string{
53+
"clickhouseURL": "tcp://localhost:9000",
54+
"databaseName": "dapr_test",
55+
"tableName": "state_test",
56+
"username": "default",
57+
"password": "",
58+
}
59+
60+
err := store.Init(ctx, metadata)
61+
require.NoError(ctx.T, err)
62+
63+
// Set a value
64+
setReq := &state.SetRequest{
65+
Key: "test-key",
66+
Value: []byte("test-value"),
67+
}
68+
err = store.Set(ctx, setReq)
69+
require.NoError(ctx.T, err)
70+
71+
// Get the value
72+
getReq := &state.GetRequest{
73+
Key: "test-key",
74+
}
75+
getResp, err := store.Get(ctx, getReq)
76+
require.NoError(ctx.T, err)
77+
assert.Equal(ctx.T, "test-value", string(getResp.Data))
78+
79+
// Delete the value
80+
delReq := &state.DeleteRequest{
81+
Key: "test-key",
82+
}
83+
err = store.Delete(ctx, delReq)
84+
require.NoError(ctx.T, err)
85+
86+
// Verify the value is deleted
87+
getResp, err = store.Get(ctx, getReq)
88+
require.NoError(ctx.T, err)
89+
assert.Nil(ctx.T, getResp.Data)
90+
91+
// Test TTL
92+
ttlSetReq := &state.SetRequest{
93+
Key: "test-ttl-key",
94+
Value: []byte("test-ttl-value"),
95+
Metadata: map[string]string{
96+
"ttlInSeconds": "1", // 1 second TTL
97+
},
98+
}
99+
err = store.Set(ctx, ttlSetReq)
100+
require.NoError(ctx.T, err)
101+
102+
// Wait for TTL to expire (2 seconds to be safe)
103+
fmt.Println("Waiting for TTL to expire...")
104+
flow.Sleep(2 * flow.Second)
105+
106+
// Verify the value is expired
107+
ttlGetReq := &state.GetRequest{
108+
Key: "test-ttl-key",
109+
}
110+
ttlGetResp, err := store.Get(ctx, ttlGetReq)
111+
require.NoError(ctx.T, err)
112+
assert.Nil(ctx.T, ttlGetResp.Data)
113+
114+
// Test ETag
115+
etagSetReq := &state.SetRequest{
116+
Key: "test-etag-key",
117+
Value: []byte("test-etag-value"),
118+
}
119+
err = store.Set(ctx, etagSetReq)
120+
require.NoError(ctx.T, err)
121+
122+
// Get the value with ETag
123+
etagGetReq := &state.GetRequest{
124+
Key: "test-etag-key",
125+
}
126+
etagGetResp, err := store.Get(ctx, etagGetReq)
127+
require.NoError(ctx.T, err)
128+
assert.NotNil(ctx.T, etagGetResp.ETag)
129+
130+
// Update with correct ETag
131+
etagUpdateReq := &state.SetRequest{
132+
Key: "test-etag-key",
133+
Value: []byte("test-etag-value-updated"),
134+
ETag: etagGetResp.ETag,
135+
}
136+
err = store.Set(ctx, etagUpdateReq)
137+
require.NoError(ctx.T, err)
138+
139+
// Update with incorrect ETag
140+
badETag := "bad-etag"
141+
etagBadUpdateReq := &state.SetRequest{
142+
Key: "test-etag-key",
143+
Value: []byte("test-etag-value-updated-again"),
144+
ETag: &badETag,
145+
}
146+
err = store.Set(ctx, etagBadUpdateReq)
147+
require.Error(ctx.T, err)
148+
149+
// Clean up
150+
err = store.Delete(ctx, &state.DeleteRequest{Key: "test-etag-key"})
151+
require.NoError(ctx.T, err)
152+
153+
return nil
154+
}
155+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
version: '3.8'
2+
3+
services:
4+
clickhouse:
5+
image: clickhouse/clickhouse-server:latest
6+
ports:
7+
- "9000:9000"
8+
- "8123:8123"
9+
environment:
10+
- CLICKHOUSE_USER=default
11+
- CLICKHOUSE_PASSWORD=
12+
- CLICKHOUSE_DB=dapr_test
13+
ulimits:
14+
nofile:
15+
soft: 262144
16+
hard: 262144
17+
healthcheck:
18+
test: ["CMD", "wget", "--spider", "-q", "http://localhost:8123/ping"]
19+
interval: 5s
20+
timeout: 5s
21+
retries: 10
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
apiVersion: dapr.io/v1alpha1
2+
kind: Component
3+
metadata:
4+
name: statestore
5+
spec:
6+
type: state.clickhouse
7+
version: v1
8+
metadata:
9+
- name: clickhouseURL
10+
value: "tcp://localhost:9000"
11+
- name: databaseName
12+
value: "dapr_test"
13+
- name: tableName
14+
value: "state_test"
15+
- name: username
16+
value: "default"
17+
- name: password
18+
value: ""

tests/config/state/tests.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,3 +104,8 @@ components:
104104
operations: []
105105
- component: gcp.firestore.cloud
106106
operations: []
107+
- component: clickhouse
108+
operations: [ "etag", "first-write", "ttl" ]
109+
config:
110+
# This component requires etags to be UUIDs
111+
badEtag: "e9b9e142-74b1-4a2e-8e90-3f4ffeea2e70"

tests/conformance/state_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
s_cosmosdb "github.com/dapr/components-contrib/state/azure/cosmosdb"
3030
s_azuretablestorage "github.com/dapr/components-contrib/state/azure/tablestorage"
3131
s_cassandra "github.com/dapr/components-contrib/state/cassandra"
32+
s_clickhouse "github.com/dapr/components-contrib/state/clickhouse"
3233
s_cloudflareworkerskv "github.com/dapr/components-contrib/state/cloudflare/workerskv"
3334
s_cockroachdb_v1 "github.com/dapr/components-contrib/state/cockroachdb"
3435
s_etcd "github.com/dapr/components-contrib/state/etcd"
@@ -118,6 +119,8 @@ func loadStateStore(name string) state.Store {
118119
return s_cloudflareworkerskv.NewCFWorkersKV(testLogger)
119120
case "cockroachdb.v1":
120121
return s_cockroachdb_v1.New(testLogger)
122+
case "clickhouse":
123+
return s_clickhouse.NewClickHouseStateStore(testLogger)
121124
case "cockroachdb.v2":
122125
// v2 of the component is an alias for the PostgreSQL state store
123126
// We still have a conformance test to validate that the component works with CockroachDB

0 commit comments

Comments
 (0)