|
1 | 1 | package db
|
2 | 2 |
|
3 |
| -import ( |
4 |
| - "context" |
5 |
| - "database/sql" |
6 |
| - "math" |
7 |
| - prand "math/rand" |
8 |
| - "time" |
9 |
| - |
10 |
| - "github.com/lightninglabs/lightning-terminal/db/sqlc" |
11 |
| - "github.com/lightningnetwork/lnd/sqldb/v2" |
12 |
| -) |
13 |
| - |
14 |
| -var ( |
15 |
| - // DefaultStoreTimeout is the default timeout used for any interaction |
16 |
| - // with the storage/database. |
17 |
| - DefaultStoreTimeout = time.Second * 10 |
18 |
| -) |
19 |
| - |
20 |
| -const ( |
21 |
| - // DefaultNumTxRetries is the default number of times we'll retry a |
22 |
| - // transaction if it fails with an error that permits transaction |
23 |
| - // repetition. |
24 |
| - DefaultNumTxRetries = 10 |
25 |
| - |
26 |
| - // DefaultInitialRetryDelay is the default initial delay between |
27 |
| - // retries. This will be used to generate a random delay between -50% |
28 |
| - // and +50% of this value, so 20 to 60 milliseconds. The retry will be |
29 |
| - // doubled after each attempt until we reach DefaultMaxRetryDelay. We |
30 |
| - // start with a random value to avoid multiple goroutines that are |
31 |
| - // created at the same time to effectively retry at the same time. |
32 |
| - DefaultInitialRetryDelay = time.Millisecond * 40 |
33 |
| - |
34 |
| - // DefaultMaxRetryDelay is the default maximum delay between retries. |
35 |
| - DefaultMaxRetryDelay = time.Second * 3 |
36 |
| -) |
37 |
| - |
38 |
| -// TxOptions represents a set of options one can use to control what type of |
39 |
| -// database transaction is created. Transaction can wither be read or write. |
40 |
| -type TxOptions interface { |
41 |
| - // ReadOnly returns true if the transaction should be read only. |
42 |
| - ReadOnly() bool |
43 |
| -} |
44 |
| - |
45 |
| -// BatchedTx is a generic interface that represents the ability to execute |
46 |
| -// several operations to a given storage interface in a single atomic |
47 |
| -// transaction. Typically, Q here will be some subset of the main sqlc.Querier |
48 |
| -// interface allowing it to only depend on the routines it needs to implement |
49 |
| -// any additional business logic. |
50 |
| -type BatchedTx[Q any] interface { |
51 |
| - // ExecTx will execute the passed txBody, operating upon generic |
52 |
| - // parameter Q (usually a storage interface) in a single transaction. |
53 |
| - // The set of TxOptions are passed in in order to allow the caller to |
54 |
| - // specify if a transaction should be read-only and optionally what |
55 |
| - // type of concurrency control should be used. |
56 |
| - ExecTx(ctx context.Context, txOptions TxOptions, |
57 |
| - txBody func(Q) error) error |
58 |
| - |
59 |
| - // Backend returns the type of the database backend used. |
60 |
| - Backend() sqldb.BackendType |
61 |
| -} |
62 |
| - |
63 |
| -// Tx represents a database transaction that can be committed or rolled back. |
64 |
| -type Tx interface { |
65 |
| - // Commit commits the database transaction, an error should be returned |
66 |
| - // if the commit isn't possible. |
67 |
| - Commit() error |
68 |
| - |
69 |
| - // Rollback rolls back an incomplete database transaction. |
70 |
| - // Transactions that were able to be committed can still call this as a |
71 |
| - // noop. |
72 |
| - Rollback() error |
73 |
| -} |
74 |
| - |
75 |
| -// QueryCreator is a generic function that's used to create a Querier, which is |
76 |
| -// a type of interface that implements storage related methods from a database |
77 |
| -// transaction. This will be used to instantiate an object callers can use to |
78 |
| -// apply multiple modifications to an object interface in a single atomic |
79 |
| -// transaction. |
80 |
| -type QueryCreator[Q any] func(*sql.Tx) Q |
81 |
| - |
82 |
| -// BatchedQuerier is a generic interface that allows callers to create a new |
83 |
| -// database transaction based on an abstract type that implements the TxOptions |
84 |
| -// interface. |
85 |
| -type BatchedQuerier interface { |
86 |
| - // Querier is the underlying query source, this is in place so we can |
87 |
| - // pass a BatchedQuerier implementation directly into objects that |
88 |
| - // create a batched version of the normal methods they need. |
89 |
| - sqlc.Querier |
90 |
| - |
91 |
| - // CustomQueries is the set of custom queries that we have manually |
92 |
| - // defined in addition to the ones generated by sqlc. |
93 |
| - sqlc.CustomQueries |
94 |
| - |
95 |
| - // BeginTx creates a new database transaction given the set of |
96 |
| - // transaction options. |
97 |
| - BeginTx(ctx context.Context, options TxOptions) (*sql.Tx, error) |
98 |
| -} |
99 |
| - |
100 |
| -// txExecutorOptions is a struct that holds the options for the transaction |
101 |
| -// executor. This can be used to do things like retry a transaction due to an |
102 |
| -// error a certain amount of times. |
103 |
| -type txExecutorOptions struct { |
104 |
| - numRetries int |
105 |
| - initialRetryDelay time.Duration |
106 |
| - maxRetryDelay time.Duration |
107 |
| -} |
108 |
| - |
109 |
| -// defaultTxExecutorOptions returns the default options for the transaction |
110 |
| -// executor. |
111 |
| -func defaultTxExecutorOptions() *txExecutorOptions { |
112 |
| - return &txExecutorOptions{ |
113 |
| - numRetries: DefaultNumTxRetries, |
114 |
| - initialRetryDelay: DefaultInitialRetryDelay, |
115 |
| - maxRetryDelay: DefaultMaxRetryDelay, |
116 |
| - } |
117 |
| -} |
118 |
| - |
119 |
| -// randRetryDelay returns a random retry delay between -50% and +50% |
120 |
| -// of the configured delay that is doubled for each attempt and capped at a max |
121 |
| -// value. |
122 |
| -func (t *txExecutorOptions) randRetryDelay(attempt int) time.Duration { |
123 |
| - halfDelay := t.initialRetryDelay / 2 |
124 |
| - randDelay := prand.Int63n(int64(t.initialRetryDelay)) //nolint:gosec |
125 |
| - |
126 |
| - // 50% plus 0%-100% gives us the range of 50%-150%. |
127 |
| - initialDelay := halfDelay + time.Duration(randDelay) |
128 |
| - |
129 |
| - // If this is the first attempt, we just return the initial delay. |
130 |
| - if attempt == 0 { |
131 |
| - return initialDelay |
132 |
| - } |
133 |
| - |
134 |
| - // For each subsequent delay, we double the initial delay. This still |
135 |
| - // gives us a somewhat random delay, but it still increases with each |
136 |
| - // attempt. If we double something n times, that's the same as |
137 |
| - // multiplying the value with 2^n. We limit the power to 32 to avoid |
138 |
| - // overflows. |
139 |
| - factor := time.Duration(math.Pow(2, math.Min(float64(attempt), 32))) |
140 |
| - actualDelay := initialDelay * factor |
141 |
| - |
142 |
| - // Cap the delay at the maximum configured value. |
143 |
| - if actualDelay > t.maxRetryDelay { |
144 |
| - return t.maxRetryDelay |
145 |
| - } |
146 |
| - |
147 |
| - return actualDelay |
148 |
| -} |
149 |
| - |
150 |
| -// TxExecutorOption is a functional option that allows us to pass in optional |
151 |
| -// argument when creating the executor. |
152 |
| -type TxExecutorOption func(*txExecutorOptions) |
153 |
| - |
154 |
| -// WithTxRetries is a functional option that allows us to specify the number of |
155 |
| -// times a transaction should be retried if it fails with a repeatable error. |
156 |
| -func WithTxRetries(numRetries int) TxExecutorOption { |
157 |
| - return func(o *txExecutorOptions) { |
158 |
| - o.numRetries = numRetries |
159 |
| - } |
160 |
| -} |
161 |
| - |
162 |
| -// WithTxRetryDelay is a functional option that allows us to specify the delay |
163 |
| -// to wait before a transaction is retried. |
164 |
| -func WithTxRetryDelay(delay time.Duration) TxExecutorOption { |
165 |
| - return func(o *txExecutorOptions) { |
166 |
| - o.initialRetryDelay = delay |
167 |
| - } |
168 |
| -} |
169 |
| - |
170 |
| -// TransactionExecutor is a generic struct that abstracts away from the type of |
171 |
| -// query a type needs to run under a database transaction, and also the set of |
172 |
| -// options for that transaction. The QueryCreator is used to create a query |
173 |
| -// given a database transaction created by the BatchedQuerier. |
174 |
| -type TransactionExecutor[Query any] struct { |
175 |
| - BatchedQuerier |
176 |
| - |
177 |
| - createQuery QueryCreator[Query] |
178 |
| - |
179 |
| - opts *txExecutorOptions |
180 |
| -} |
181 |
| - |
182 |
| -// NewTransactionExecutor creates a new instance of a TransactionExecutor given |
183 |
| -// a Querier query object and a concrete type for the type of transactions the |
184 |
| -// Querier understands. |
185 |
| -func NewTransactionExecutor[Querier any](db BatchedQuerier, |
186 |
| - createQuery QueryCreator[Querier], |
187 |
| - opts ...TxExecutorOption) *TransactionExecutor[Querier] { |
188 |
| - |
189 |
| - txOpts := defaultTxExecutorOptions() |
190 |
| - for _, optFunc := range opts { |
191 |
| - optFunc(txOpts) |
192 |
| - } |
193 |
| - |
194 |
| - return &TransactionExecutor[Querier]{ |
195 |
| - BatchedQuerier: db, |
196 |
| - createQuery: createQuery, |
197 |
| - opts: txOpts, |
198 |
| - } |
199 |
| -} |
200 |
| - |
201 |
| -// ExecTx is a wrapper for txBody to abstract the creation and commit of a db |
202 |
| -// transaction. The db transaction is embedded in a `*Queries` that txBody |
203 |
| -// needs to use when executing each one of the queries that need to be applied |
204 |
| -// atomically. This can be used by other storage interfaces to parameterize the |
205 |
| -// type of query and options run, in order to have access to batched operations |
206 |
| -// related to a storage object. |
207 |
| -func (t *TransactionExecutor[Q]) ExecTx(ctx context.Context, |
208 |
| - txOptions TxOptions, txBody func(Q) error) error { |
209 |
| - |
210 |
| - waitBeforeRetry := func(attemptNumber int) { |
211 |
| - retryDelay := t.opts.randRetryDelay(attemptNumber) |
212 |
| - |
213 |
| - log.Tracef("Retrying transaction due to tx serialization or "+ |
214 |
| - "deadlock error, attempt_number=%v, delay=%v", |
215 |
| - attemptNumber, retryDelay) |
216 |
| - |
217 |
| - // Before we try again, we'll wait with a random backoff based |
218 |
| - // on the retry delay. |
219 |
| - time.Sleep(retryDelay) |
220 |
| - } |
221 |
| - |
222 |
| - for i := 0; i < t.opts.numRetries; i++ { |
223 |
| - // Create the db transaction. |
224 |
| - tx, err := t.BatchedQuerier.BeginTx(ctx, txOptions) |
225 |
| - if err != nil { |
226 |
| - dbErr := MapSQLError(err) |
227 |
| - if IsSerializationOrDeadlockError(dbErr) { |
228 |
| - // Nothing to roll back here, since we didn't |
229 |
| - // even get a transaction yet. |
230 |
| - waitBeforeRetry(i) |
231 |
| - continue |
232 |
| - } |
233 |
| - |
234 |
| - return dbErr |
235 |
| - } |
236 |
| - |
237 |
| - // Rollback is safe to call even if the tx is already closed, |
238 |
| - // so if the tx commits successfully, this is a no-op. |
239 |
| - defer func() { |
240 |
| - _ = tx.Rollback() |
241 |
| - }() |
242 |
| - |
243 |
| - if err := txBody(t.createQuery(tx)); err != nil { |
244 |
| - dbErr := MapSQLError(err) |
245 |
| - if IsSerializationOrDeadlockError(dbErr) { |
246 |
| - // Roll back the transaction, then pop back up |
247 |
| - // to try once again. |
248 |
| - _ = tx.Rollback() |
249 |
| - |
250 |
| - waitBeforeRetry(i) |
251 |
| - continue |
252 |
| - } |
253 |
| - |
254 |
| - return dbErr |
255 |
| - } |
256 |
| - |
257 |
| - // Commit transaction. |
258 |
| - if err = tx.Commit(); err != nil { |
259 |
| - dbErr := MapSQLError(err) |
260 |
| - if IsSerializationOrDeadlockError(dbErr) { |
261 |
| - // Roll back the transaction, then pop back up |
262 |
| - // to try once again. |
263 |
| - _ = tx.Rollback() |
264 |
| - |
265 |
| - waitBeforeRetry(i) |
266 |
| - continue |
267 |
| - } |
268 |
| - |
269 |
| - return dbErr |
270 |
| - } |
271 |
| - |
272 |
| - return nil |
273 |
| - } |
274 |
| - |
275 |
| - // If we get to this point, then we weren't able to successfully commit |
276 |
| - // a tx given the max number of retries. |
277 |
| - return ErrRetriesExceeded |
278 |
| -} |
279 |
| - |
280 |
| -// Backend returns the type of the database backend used. |
281 |
| -func (t *TransactionExecutor[Q]) Backend() sqldb.BackendType { |
282 |
| - return t.BatchedQuerier.Backend() |
283 |
| -} |
284 |
| - |
285 |
| -// BaseDB is the base database struct that each implementation can embed to |
286 |
| -// gain some common functionality. |
287 |
| -type BaseDB struct { |
288 |
| - *sql.DB |
289 |
| - |
290 |
| - *sqlc.Queries |
291 |
| -} |
292 |
| - |
293 |
| -// BeginTx wraps the normal sql specific BeginTx method with the TxOptions |
294 |
| -// interface. This interface is then mapped to the concrete sql tx options |
295 |
| -// struct. |
296 |
| -func (s *BaseDB) BeginTx(ctx context.Context, opts TxOptions) (*sql.Tx, error) { |
297 |
| - sqlOptions := sql.TxOptions{ |
298 |
| - ReadOnly: opts.ReadOnly(), |
299 |
| - Isolation: sql.LevelSerializable, |
300 |
| - } |
301 |
| - return s.DB.BeginTx(ctx, &sqlOptions) |
302 |
| -} |
303 |
| - |
304 |
| -// Backend returns the type of the database backend used. |
305 |
| -func (s *BaseDB) Backend() sqldb.BackendType { |
306 |
| - return s.Queries.Backend() |
307 |
| -} |
308 |
| - |
309 | 3 | // QueriesTxOptions defines the set of db txn options the SQLQueries
|
310 | 4 | // understands.
|
311 | 5 | type QueriesTxOptions struct {
|
|
0 commit comments