@@ -4,71 +4,263 @@ package etcd
44
55import (
66 "context"
7+ "errors"
78 "fmt"
89
910 etcd "go.etcd.io/etcd/client/v3"
1011
1112 "github.com/tarantool/go-storage/driver"
13+ "github.com/tarantool/go-storage/kv"
1214 "github.com/tarantool/go-storage/operation"
1315 "github.com/tarantool/go-storage/predicate"
1416 "github.com/tarantool/go-storage/tx"
1517 "github.com/tarantool/go-storage/watch"
1618)
1719
20+ // Client defines the minimal interface needed for etcd operations.
21+ // This allows for easier testing and mock implementations.
22+ type Client interface {
23+ // Txn creates a new transaction.
24+ Txn (ctx context.Context ) etcd.Txn
25+ }
26+
27+ // Watcher defines the interface for watching etcd changes.
28+ // This extends the etcd.Watcher interface to match our usage pattern.
29+ type Watcher interface {
30+ // Watch watches for changes on a key (using etcd's signature).
31+ Watch (ctx context.Context , key string , opts ... etcd.OpOption ) etcd.WatchChan
32+ // Close closes the watcher.
33+ Close () error
34+ }
35+
36+ // WatcherFactory creates new watchers from a client.
37+ type WatcherFactory interface {
38+ // NewWatcher creates a new watcher.
39+ NewWatcher (client Client ) Watcher
40+ }
41+
1842// Driver is an etcd implementation of the storage driver interface.
1943// It uses etcd as the underlying key-value storage backend.
2044type Driver struct {
21- client * etcd.Client // etcd client instance..
45+ client Client // etcd client interface.
46+ watcherFactory WatcherFactory // factory for creating watchers.
2247}
2348
2449var (
2550 _ driver.Driver = & Driver {} //nolint:exhaustruct
51+
52+ // Static error definitions to avoid dynamic errors.
53+ errUnsupportedPredicateTarget = errors .New ("unsupported predicate target" )
54+ errValuePredicateRequiresBytes = errors .New ("value predicate requires []byte value" )
55+ errUnsupportedValueOperation = errors .New ("unsupported operation for value predicate" )
56+ errVersionPredicateRequiresInt = errors .New ("version predicate requires int64 value" )
57+ errUnsupportedVersionOperation = errors .New ("unsupported operation for version predicate" )
58+ errUnsupportedOperationType = errors .New ("unsupported operation type" )
2659)
2760
28- // New creates a new etcd driver instance.
29- // It establishes a connection to the etcd cluster using the provided endpoints.
30- func New (ctx context.Context , endpoints []string ) (* Driver , error ) {
31- client , err := etcd .New (etcd.Config {
32- Context : ctx ,
33- Endpoints : endpoints ,
34- AutoSyncInterval : 0 ,
35- DialTimeout : 0 ,
36- DialKeepAliveTime : 0 ,
37- DialKeepAliveTimeout : 0 ,
38- MaxCallSendMsgSize : 0 ,
39- MaxCallRecvMsgSize : 0 ,
40- TLS : nil ,
41- Username : "" ,
42- Password : "" ,
43- RejectOldCluster : false ,
44- DialOptions : nil ,
45- Logger : nil ,
46- LogConfig : nil ,
47- PermitWithoutStream : false ,
48- MaxUnaryRetries : 0 ,
49- BackoffWaitBetween : 0 ,
50- BackoffJitterFraction : 0 ,
51- })
52- if err != nil {
53- return nil , fmt .Errorf ("failed to create etcd client: %w" , err )
61+ // etcdClientAdapter wraps etcd.Client to implement our Client interface.
62+ type etcdClientAdapter struct {
63+ client * etcd.Client
64+ }
65+
66+ func (a * etcdClientAdapter ) Txn (ctx context.Context ) etcd.Txn {
67+ return a .client .Txn (ctx )
68+ }
69+
70+ // etcdWatcherAdapter wraps etcd.Watcher to implement our Watcher interface.
71+ type etcdWatcherAdapter struct {
72+ watcher etcd.Watcher
73+ }
74+
75+ func (a * etcdWatcherAdapter ) Watch (ctx context.Context , key string , opts ... etcd.OpOption ) etcd.WatchChan {
76+ return a .watcher .Watch (ctx , key , opts ... )
77+ }
78+
79+ func (a * etcdWatcherAdapter ) Close () error {
80+ return fmt .Errorf ("failed to close: %w" , a .watcher .Close ())
81+ }
82+
83+ // etcdWatcherFactory implements WatcherFactory for etcd clients.
84+ type etcdWatcherFactory struct {}
85+
86+ func (f * etcdWatcherFactory ) NewWatcher (client Client ) Watcher {
87+ // For etcd clients, we need access to the underlying client.
88+ if adapter , ok := client .(* etcdClientAdapter ); ok {
89+ return & etcdWatcherAdapter {
90+ watcher : etcd .NewWatcher (adapter .client ),
91+ }
5492 }
93+ // For other implementations, return a no-op watcher.
94+ return & noopWatcher {}
95+ }
96+
97+ // noopWatcher is a no-op implementation of Watcher for non-etcd clients.
98+ type noopWatcher struct {}
99+
100+ func (w * noopWatcher ) Watch (_ context.Context , _ string , _ ... etcd.OpOption ) etcd.WatchChan {
101+ ch := make (chan etcd.WatchResponse )
102+ close (ch )
55103
56- return & Driver {client : client }, nil
104+ return ch
105+ }
106+
107+ func (w * noopWatcher ) Close () error {
108+ return nil
109+ }
110+
111+ // New creates a new etcd driver instance using an existing etcd client.
112+ // The client should be properly configured and connected to an etcd cluster.
113+ func New (client * etcd.Client ) * Driver {
114+ return & Driver {
115+ client : & etcdClientAdapter {client : client },
116+ watcherFactory : & etcdWatcherFactory {},
117+ }
118+ }
119+
120+ // NewWithInterfaces creates a new etcd driver instance using interface abstractions.
121+ // This is useful for testing with mock clients.
122+ func NewWithInterfaces (client Client , watcherFactory WatcherFactory ) * Driver {
123+ if watcherFactory == nil {
124+ watcherFactory = & etcdWatcherFactory {}
125+ }
126+
127+ return & Driver {
128+ client : client ,
129+ watcherFactory : watcherFactory ,
130+ }
57131}
58132
59133// Execute executes a transactional operation with conditional logic.
60134// It processes predicates to determine whether to execute thenOps or elseOps.
61135func (d Driver ) Execute (
62- _ context.Context ,
63- _ []predicate.Predicate ,
64- _ []operation.Operation ,
65- _ []operation.Operation ,
136+ ctx context.Context ,
137+ predicates []predicate.Predicate ,
138+ thenOps []operation.Operation ,
139+ elseOps []operation.Operation ,
66140) (tx.Response , error ) {
67- panic ("implement me" )
141+ txn := d .client .Txn (ctx )
142+
143+ for _ , p := range predicates {
144+ cmp , err := predicateToCmp (p )
145+ if err != nil {
146+ return tx.Response {}, fmt .Errorf ("failed to convert predicate: %w" , err )
147+ }
148+
149+ txn = txn .If (cmp )
150+ }
151+
152+ thenEtcdOps , err := operationsToEtcdOps (thenOps )
153+ if err != nil {
154+ return tx.Response {}, fmt .Errorf ("failed to convert then operations: %w" , err )
155+ }
156+
157+ txn = txn .Then (thenEtcdOps ... )
158+
159+ elseEtcdOps , err := operationsToEtcdOps (elseOps )
160+ if err != nil {
161+ return tx.Response {}, fmt .Errorf ("failed to convert else operations: %w" , err )
162+ }
163+
164+ txn = txn .Else (elseEtcdOps ... )
165+
166+ resp , err := txn .Commit ()
167+ if err != nil {
168+ return tx.Response {}, fmt .Errorf ("transaction failed: %w" , err )
169+ }
170+
171+ return etcdResponseToTxResponse (resp ), nil
68172}
69173
174+ const (
175+ eventChannelSize = 100
176+ )
177+
70178// Watch monitors changes to a specific key and returns a stream of events.
71179// It supports optional watch configuration through the opts parameter.
72- func (d Driver ) Watch (_ context.Context , _ []byte , _ ... watch.Option ) (<- chan watch.Event , func (), error ) {
73- panic ("implement me" )
180+ func (d Driver ) Watch (ctx context.Context , key []byte , _ ... watch.Option ) (<- chan watch.Event , func (), error ) {
181+ eventCh := make (chan watch.Event , eventChannelSize )
182+
183+ parentWatcher := d .watcherFactory .NewWatcher (d .client )
184+
185+ go func () {
186+ defer close (eventCh )
187+
188+ watchChan := parentWatcher .Watch (ctx , string (key ))
189+
190+ for {
191+ select {
192+ case <- ctx .Done ():
193+ return
194+ case watchResp , ok := <- watchChan :
195+ if ! ok {
196+ return
197+ }
198+
199+ if watchResp .Err () != nil {
200+ continue
201+ }
202+
203+ for _ , etcdEvent := range watchResp .Events {
204+ event := etcdEventToWatchEvent (etcdEvent )
205+ select {
206+ case eventCh <- event :
207+ case <- ctx .Done ():
208+ return
209+ }
210+ }
211+ }
212+ }
213+ }()
214+
215+ return eventCh , func () {
216+ _ = parentWatcher .Close ()
217+ }, nil
218+ }
219+
220+ // etcdResponseToTxResponse converts an etcd transaction response to tx.Response.
221+ func etcdResponseToTxResponse (resp * etcd.TxnResponse ) tx.Response {
222+ results := make ([]tx.RequestResponse , 0 , len (resp .Responses ))
223+
224+ for _ , etcdResp := range resp .Responses {
225+ var values []kv.KeyValue
226+
227+ switch {
228+ case etcdResp .GetResponseRange () != nil :
229+ getResp := etcdResp .GetResponseRange ()
230+ for _ , etcdKv := range getResp .Kvs {
231+ values = append (values , kv.KeyValue {
232+ Key : etcdKv .Key ,
233+ Value : etcdKv .Value ,
234+ ModRevision : etcdKv .ModRevision ,
235+ })
236+ }
237+ case etcdResp .GetResponsePut () != nil :
238+ // Put operations don't return data.
239+ case etcdResp .GetResponseDeleteRange () != nil :
240+ deleteResp := etcdResp .GetResponseDeleteRange ()
241+ for _ , etcdKv := range deleteResp .PrevKvs {
242+ values = append (values , kv.KeyValue {
243+ Key : etcdKv .Key ,
244+ Value : etcdKv .Value ,
245+ ModRevision : etcdKv .ModRevision ,
246+ })
247+ }
248+ }
249+
250+ results = append (results , tx.RequestResponse {
251+ Values : values ,
252+ })
253+ }
254+
255+ return tx.Response {
256+ Succeeded : resp .Succeeded ,
257+ Results : results ,
258+ }
259+ }
260+
261+ // etcdEventToWatchEvent converts an etcd event to a watch event.
262+ func etcdEventToWatchEvent (etcdEvent * etcd.Event ) watch.Event {
263+ return watch.Event {
264+ Prefix : etcdEvent .Kv .Key ,
265+ }
74266}
0 commit comments