Skip to content

Commit 22f00ec

Browse files
authored
add bulk delete state method in client sdk (#139)
1 parent 8114d3f commit 22f00ec

File tree

4 files changed

+174
-0
lines changed

4 files changed

+174
-0
lines changed

client/client.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,12 @@ type Client interface {
8484
// ExecuteStateTransaction provides way to execute multiple operations on a specified store.
8585
ExecuteStateTransaction(ctx context.Context, storeName string, meta map[string]string, ops []*StateOperation) error
8686

87+
// DeleteBulkState deletes content for multiple keys from store.
88+
DeleteBulkState(ctx context.Context, storeName string, keys []string) error
89+
90+
// DeleteBulkState deletes content for multiple keys from store.
91+
DeleteBulkStateItems(ctx context.Context, storeName string, items []*DeleteStateItem) error
92+
8793
// WithTraceID adds existing trace ID to the outgoing context.
8894
WithTraceID(ctx context.Context, id string) context.Context
8995

client/client_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,13 @@ func (s *testDaprServer) DeleteState(ctx context.Context, req *pb.DeleteStateReq
150150
return &empty.Empty{}, nil
151151
}
152152

153+
func (s *testDaprServer) DeleteBulkState(ctx context.Context, req *pb.DeleteBulkStateRequest) (*empty.Empty, error) {
154+
for _, item := range req.States {
155+
delete(s.state, item.Key)
156+
}
157+
return &empty.Empty{}, nil
158+
}
159+
153160
func (s *testDaprServer) ExecuteStateTransaction(ctx context.Context, in *pb.ExecuteStateTransactionRequest) (*empty.Empty, error) {
154161
for _, op := range in.GetOperations() {
155162
item := op.GetRequest()

client/state.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,9 @@ type SetStateItem struct {
125125
Options *StateOptions
126126
}
127127

128+
// DeleteStateItem represents a single state to be deleted.
129+
type DeleteStateItem SetStateItem
130+
128131
// ETag represents an versioned record information
129132
type ETag struct {
130133
Value string
@@ -339,6 +342,58 @@ func (c *GRPCClient) DeleteStateWithETag(ctx context.Context, storeName, key str
339342
return nil
340343
}
341344

345+
// DeleteBulkState deletes content for multiple keys from store.
346+
func (c *GRPCClient) DeleteBulkState(ctx context.Context, storeName string, keys []string) error {
347+
if len(keys) == 0 {
348+
return nil
349+
}
350+
351+
items := make([]*DeleteStateItem, 0, len(keys))
352+
for i := 0; i < len(keys); i++ {
353+
item := &DeleteStateItem{
354+
Key: keys[i],
355+
}
356+
items = append(items, item)
357+
}
358+
359+
return c.DeleteBulkStateItems(ctx, storeName, items)
360+
}
361+
362+
// DeleteBulkState deletes content for multiple keys from store.
363+
func (c *GRPCClient) DeleteBulkStateItems(ctx context.Context, storeName string, items []*DeleteStateItem) error {
364+
if len(items) == 0 {
365+
return nil
366+
}
367+
368+
states := make([]*v1.StateItem, 0, len(items))
369+
for i := 0; i < len(items); i++ {
370+
item := items[i]
371+
if err := hasRequiredStateArgs(storeName, item.Key); err != nil {
372+
return errors.Wrap(err, "missing required arguments")
373+
}
374+
375+
state := &v1.StateItem{
376+
Key: item.Key,
377+
Metadata: item.Metadata,
378+
Options: toProtoStateOptions(item.Options),
379+
}
380+
if item.Etag != nil {
381+
state.Etag = &v1.Etag{
382+
Value: item.Etag.Value,
383+
}
384+
}
385+
states = append(states, state)
386+
}
387+
388+
req := &pb.DeleteBulkStateRequest{
389+
StoreName: storeName,
390+
States: states,
391+
}
392+
_, err := c.protoClient.DeleteBulkState(c.withAuthToken(ctx), req)
393+
394+
return err
395+
}
396+
342397
func hasRequiredStateArgs(storeName, key string) error {
343398
if storeName == "" {
344399
return errors.New("store")

client/state_test.go

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,112 @@ func TestDeleteState(t *testing.T) {
163163
})
164164
}
165165

166+
func TestDeleteBulkState(t *testing.T) {
167+
ctx := context.Background()
168+
data := "test"
169+
store := "test"
170+
keys := []string{"key1", "key2", "key3"}
171+
172+
t.Run("delete not exist data", func(t *testing.T) {
173+
err := testClient.DeleteBulkState(ctx, store, keys)
174+
assert.Nil(t, err)
175+
})
176+
177+
t.Run("delete not exist data with stateIem", func(t *testing.T) {
178+
items := make([]*DeleteStateItem, 0, len(keys))
179+
for _, key := range keys {
180+
items = append(items, &DeleteStateItem{
181+
Key: key,
182+
Metadata: map[string]string{},
183+
Options: &StateOptions{
184+
Concurrency: StateConcurrencyFirstWrite,
185+
Consistency: StateConsistencyEventual,
186+
},
187+
})
188+
}
189+
err := testClient.DeleteBulkStateItems(ctx, store, items)
190+
assert.Nil(t, err)
191+
})
192+
193+
t.Run("delete exist data", func(t *testing.T) {
194+
// save data
195+
items := make([]*SetStateItem, 0, len(keys))
196+
for _, key := range keys {
197+
items = append(items, &SetStateItem{
198+
Key: key,
199+
Value: []byte(data),
200+
Metadata: map[string]string{},
201+
Etag: &ETag{Value: "1"},
202+
Options: &StateOptions{
203+
Concurrency: StateConcurrencyFirstWrite,
204+
Consistency: StateConsistencyEventual,
205+
},
206+
})
207+
}
208+
err := testClient.SaveBulkState(ctx, store, items...)
209+
assert.Nil(t, err)
210+
211+
// confirm data saved
212+
getItems, err := testClient.GetBulkState(ctx, store, keys, nil, 1)
213+
assert.NoError(t, err)
214+
assert.Equal(t, len(keys), len(getItems))
215+
216+
// delete
217+
err = testClient.DeleteBulkState(ctx, store, keys)
218+
assert.NoError(t, err)
219+
220+
// confirm data deleted
221+
getItems, err = testClient.GetBulkState(ctx, store, keys, nil, 1)
222+
assert.NoError(t, err)
223+
assert.Equal(t, 0, len(getItems))
224+
})
225+
226+
t.Run("delete exist data with stateItem", func(t *testing.T) {
227+
// save data
228+
items := make([]*SetStateItem, 0, len(keys))
229+
for _, key := range keys {
230+
items = append(items, &SetStateItem{
231+
Key: key,
232+
Value: []byte(data),
233+
Metadata: map[string]string{},
234+
Etag: &ETag{Value: "1"},
235+
Options: &StateOptions{
236+
Concurrency: StateConcurrencyFirstWrite,
237+
Consistency: StateConsistencyEventual,
238+
},
239+
})
240+
}
241+
err := testClient.SaveBulkState(ctx, store, items...)
242+
assert.Nil(t, err)
243+
244+
// confirm data saved
245+
getItems, err := testClient.GetBulkState(ctx, store, keys, nil, 1)
246+
assert.NoError(t, err)
247+
assert.Equal(t, len(keys), len(getItems))
248+
249+
// delete
250+
deleteItems := make([]*DeleteStateItem, 0, len(keys))
251+
for _, key := range keys {
252+
deleteItems = append(deleteItems, &DeleteStateItem{
253+
Key: key,
254+
Metadata: map[string]string{},
255+
Etag: &ETag{Value: "1"},
256+
Options: &StateOptions{
257+
Concurrency: StateConcurrencyFirstWrite,
258+
Consistency: StateConsistencyEventual,
259+
},
260+
})
261+
}
262+
err = testClient.DeleteBulkStateItems(ctx, store, deleteItems)
263+
assert.Nil(t, err)
264+
265+
// confirm data deleted
266+
getItems, err = testClient.GetBulkState(ctx, store, keys, nil, 1)
267+
assert.NoError(t, err)
268+
assert.Equal(t, 0, len(getItems))
269+
})
270+
}
271+
166272
// go test -timeout 30s ./client -count 1 -run ^TestStateTransactions$
167273
func TestStateTransactions(t *testing.T) {
168274
ctx := context.Background()

0 commit comments

Comments
 (0)