Skip to content

Commit a900aa9

Browse files
committed
assets+loopdb: handle asset deposit expiry and timeout sweep
With this commit, asset deposit expiry is checked on each new block. When a deposit expires, a timeout sweep is published. The deposit state is updated both when the sweep is first published and again upon its confirmation.
1 parent 064e45b commit a900aa9

File tree

5 files changed

+295
-0
lines changed

5 files changed

+295
-0
lines changed

assets/deposit/manager.go

Lines changed: 256 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@ var (
2626
// shutting down and that no further calls should be made to it.
2727
ErrManagerShuttingDown = errors.New("asset deposit manager is " +
2828
"shutting down")
29+
30+
// lockExpiration us the expiration time we use for sweep fee
31+
// paying inputs.
32+
lockExpiration = time.Hour * 24
2933
)
3034

3135
// DepositUpdateCallback is a callback that is called when a deposit state is
@@ -62,6 +66,10 @@ type Manager struct {
6266
// currentHeight is the current block height of the chain.
6367
currentHeight uint32
6468

69+
// pendingSweeps is a map of all pending timeout sweeps. The key is the
70+
// deposit ID.
71+
pendingSweeps map[string]struct{}
72+
6573
// deposits is a map of all active deposits. The key is the deposit ID.
6674
deposits map[string]*Deposit
6775

@@ -109,6 +117,7 @@ func NewManager(depositServiceClient swapserverrpc.AssetDepositServiceClient,
109117
sweeper: sweeper,
110118
addressParams: addressParams,
111119
deposits: make(map[string]*Deposit),
120+
pendingSweeps: make(map[string]struct{}),
112121
subscribers: make(map[string][]DepositUpdateCallback),
113122
callEnter: make(chan struct{}),
114123
callLeave: make(chan struct{}),
@@ -212,6 +221,43 @@ func (m *Manager) criticalError(err error) {
212221

213222
// handleBlockEpoch is called when a new block is added to the chain.
214223
func (m *Manager) handleBlockEpoch(ctx context.Context, height uint32) error {
224+
for _, d := range m.deposits {
225+
if d.State != StateConfirmed {
226+
continue
227+
}
228+
229+
log.Debugf("Checking if deposit %v is expired, expiry=%v", d.ID,
230+
d.ConfirmationHeight+d.CsvExpiry)
231+
232+
if height < d.ConfirmationHeight+d.CsvExpiry {
233+
continue
234+
}
235+
236+
err := m.handleDepositExpired(ctx, d)
237+
if err != nil {
238+
log.Errorf("Unable to update deposit %v state: %v",
239+
d.ID, err)
240+
241+
return err
242+
}
243+
}
244+
245+
// Now publish the timeout sweeps for all expired deposits and also
246+
// move them to the pending sweeps map.
247+
for _, d := range m.deposits {
248+
// TODO(bhandras): republish will insert a new transfer entry in
249+
// tapd, despite the transfer already existing. To avoid that,
250+
// we won't re-publish the timeout sweep for now.
251+
if d.State != StateExpired {
252+
continue
253+
}
254+
255+
err := m.publishTimeoutSweep(ctx, d)
256+
if err != nil {
257+
return err
258+
}
259+
}
260+
215261
return nil
216262
}
217263

@@ -635,3 +681,213 @@ func (m *Manager) ListDeposits(ctx context.Context, minConfs, maxConfs uint32) (
635681

636682
return filteredDeposits, nil
637683
}
684+
685+
// handleDepositStateUpdate updates the deposit state in the store and
686+
// notifies all subscribers of the deposit state change.
687+
func (m *Manager) handleDepositExpired(ctx context.Context, d *Deposit) error {
688+
// Generate a new address for the timeout sweep.
689+
rpcTimeoutSweepAddr, err := m.tapClient.NewAddr(
690+
ctx, &taprpc.NewAddrRequest{
691+
AssetId: d.AssetID[:],
692+
Amt: d.Amount,
693+
},
694+
)
695+
if err != nil {
696+
log.Errorf("Unable to create timeout sweep address: %v", err)
697+
698+
return err
699+
}
700+
701+
d.State = StateExpired
702+
d.SweepAddr = rpcTimeoutSweepAddr.Encoded
703+
704+
return m.handleDepositStateUpdate(ctx, d)
705+
}
706+
707+
// publishTimeoutSweep publishes a timeout sweep for the deposit. As we use the
708+
// same lock ID for the sponsoring inputs, it's possible to republish the sweep
709+
// however it'll create a new transfer entry in tapd, which we want to avoid
710+
// (for now).
711+
func (m *Manager) publishTimeoutSweep(ctx context.Context, d *Deposit) error {
712+
// Start monitoring the sweep unless we're already doing so.
713+
if _, ok := m.pendingSweeps[d.ID]; !ok {
714+
err := m.waitForDepositSpend(ctx, d)
715+
if err != nil {
716+
log.Errorf("Unable to wait for deposit %v spend: %v",
717+
d.ID, err)
718+
719+
return err
720+
}
721+
722+
m.pendingSweeps[d.ID] = struct{}{}
723+
}
724+
725+
log.Infof("(Re)publishing timeout sweep for deposit %v", d.ID)
726+
727+
// TODO(bhandras): conf target should be dynamic/configrable.
728+
const confTarget = 2
729+
feeRateSatPerKw, err := m.walletKit.EstimateFeeRate(
730+
ctx, confTarget,
731+
)
732+
733+
lockID, err := d.lockID()
734+
if err != nil {
735+
return err
736+
}
737+
738+
sweepAddr, err := address.DecodeAddress(d.SweepAddr, &m.addressParams)
739+
if err != nil {
740+
log.Errorf("Unable to decode timeout sweep address: %v", err)
741+
742+
return err
743+
}
744+
745+
snedResp, err := m.sweeper.PublishDepositTimeoutSweep(
746+
ctx, d.Kit, d.Proof, sweepAddr, feeRateSatPerKw.FeePerVByte(),
747+
lockID, lockExpiration,
748+
)
749+
if err != nil {
750+
// TOOD(bhandras): handle republish errors.
751+
log.Infof("Unable to publish timeout sweep for deposit %v: %v",
752+
d.ID, err)
753+
} else {
754+
log.Infof("Published timeout sweep for deposit %v: %x", d.ID,
755+
snedResp.Transfer.AnchorTxHash)
756+
757+
// Update deposit state on first successful publish.
758+
if d.State != StateTimeoutSweepPublished {
759+
d.State = StateTimeoutSweepPublished
760+
761+
err = m.handleDepositStateUpdate(ctx, d)
762+
if err != nil {
763+
log.Errorf("Unable to update deposit %v "+
764+
"state: %v", d.ID, err)
765+
766+
return err
767+
}
768+
}
769+
}
770+
771+
return nil
772+
}
773+
774+
// waitForDepositSpend waits for the deposit to be spent. It subscribes to
775+
// receive events for the deposit's sweep address notifying us once the transfer
776+
// has completed.
777+
func (m *Manager) waitForDepositSpend(ctx context.Context, d *Deposit) error {
778+
log.Infof("Waiting for deposit spend: %s, sweep_addr=%v, created_at=%v",
779+
d.ID, d.SweepAddr, d.CreatedAt)
780+
781+
resChan, errChan, err := m.tapClient.WaitForReceiveComplete(
782+
ctx, d.SweepAddr, d.CreatedAt,
783+
)
784+
785+
if err != nil {
786+
log.Errorf("unable to subscribe to receive events: %v", err)
787+
788+
return err
789+
}
790+
791+
go func() {
792+
select {
793+
case res := <-resChan:
794+
// At this point we can consider the deposit confirmed.
795+
err = m.handleDepositSpend(
796+
ctx, d, res.Outpoint.String(),
797+
)
798+
if err != nil {
799+
m.criticalError(err)
800+
}
801+
802+
case err := <-errChan:
803+
m.criticalError(err)
804+
}
805+
}()
806+
807+
return nil
808+
}
809+
810+
// handleDepositSpend is called when the deposit is spent. It updates the
811+
// deposit state and releases the inputs used for the deposit sweep.
812+
func (m *Manager) handleDepositSpend(ctx context.Context, d *Deposit,
813+
outpoint string) error {
814+
815+
done, err := m.scheduleNextCall()
816+
if err != nil {
817+
log.Errorf("Unable to schedule next call: %v", err)
818+
819+
return err
820+
}
821+
defer done()
822+
823+
switch d.State {
824+
case StateTimeoutSweepPublished:
825+
log.Infof("Deposit %s withdrawn in: %s", d.ID, outpoint)
826+
d.State = StateSwept
827+
828+
err := m.releaseDepositSweepInputs(ctx, d)
829+
if err != nil {
830+
log.Errorf("Unable to release deposit sweep inputs: "+
831+
"%v", err)
832+
833+
return err
834+
}
835+
836+
default:
837+
err := fmt.Errorf("Spent deposit %s in unexpected state %s",
838+
d.ID, d.State)
839+
840+
log.Errorf(err.Error())
841+
842+
return err
843+
}
844+
845+
// TODO(bhandras): should save the spend details to the store?
846+
err = m.handleDepositStateUpdate(ctx, d)
847+
if err != nil {
848+
log.Errorf("Unable to update deposit %v state: %v", d.ID, err)
849+
850+
return err
851+
}
852+
853+
// Sanity check that the deposit is in the pending sweeps map.
854+
if _, ok := m.pendingSweeps[d.ID]; !ok {
855+
log.Errorf("Deposit %v not found in pending deposits", d.ID)
856+
}
857+
858+
// We can now remove the deposit from the pending sweeps map as we don't
859+
// need to monitor for the spend anymore.
860+
delete(m.pendingSweeps, d.ID)
861+
862+
return nil
863+
}
864+
865+
// releaseDepositSweepInputs releases the inputs that were used for the deposit
866+
// sweep.
867+
func (m *Manager) releaseDepositSweepInputs(ctx context.Context,
868+
d *Deposit) error {
869+
870+
lockID, err := d.lockID()
871+
if err != nil {
872+
return err
873+
}
874+
875+
leases, err := m.walletKit.ListLeases(ctx)
876+
if err != nil {
877+
return err
878+
}
879+
880+
for _, lease := range leases {
881+
if lease.LockID != lockID {
882+
continue
883+
}
884+
885+
// Unlock any UTXOs that were used for the deposit sweep.
886+
err = m.walletKit.ReleaseOutput(ctx, lockID, lease.Outpoint)
887+
if err != nil {
888+
return err
889+
}
890+
}
891+
892+
return nil
893+
}

assets/deposit/sql_store.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@ type Querier interface {
2929

3030
GetAssetDeposits(ctx context.Context) ([]sqlc.GetAssetDepositsRow,
3131
error)
32+
33+
SetAssetDepositSweepAddr(ctx context.Context,
34+
arg sqlc.SetAssetDepositSweepAddrParams) error
3235
}
3336

3437
// DepositBaseDB is the interface that contains all the queries generated
@@ -128,6 +131,20 @@ func (s *SQLStore) UpdateDeposit(ctx context.Context, d *Deposit) error {
128131
return err
129132
}
130133

134+
case StateExpired:
135+
err := tx.SetAssetDepositSweepAddr(
136+
ctx, sqlc.SetAssetDepositSweepAddrParams{
137+
DepositID: d.ID,
138+
SweepAddr: sql.NullString{
139+
String: d.SweepAddr,
140+
Valid: true,
141+
},
142+
},
143+
)
144+
if err != nil {
145+
return err
146+
}
147+
131148
default:
132149
return fmt.Errorf("unimplemented deposit state "+
133150
"update: %v", d.State)

loopdb/sqlc/asset_deposits.sql.go

Lines changed: 16 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

loopdb/sqlc/querier.go

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

loopdb/sqlc/queries/asset_deposits.sql

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,3 +39,8 @@ JOIN asset_deposit_updates u ON u.id = (
3939
LIMIT 1
4040
)
4141
ORDER BY d.created_at ASC;
42+
43+
-- name: SetAssetDepositSweepAddr :exec
44+
UPDATE asset_deposits
45+
SET sweep_addr = $2
46+
WHERE deposit_id = $1;

0 commit comments

Comments
 (0)