Skip to content

Commit 85347fc

Browse files
authored
feat: transfer limiter web UI updates (#740)
1 parent badb603 commit 85347fc

File tree

11 files changed

+418
-50
lines changed

11 files changed

+418
-50
lines changed

gql/resolver.go

Lines changed: 37 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ func (r *resolver) Deal(ctx context.Context, args struct{ ID graphql.ID }) (*dea
102102
return nil, err
103103
}
104104

105-
return newDealResolver(deal, r.dealsDB, r.logsDB, r.spApi), nil
105+
return newDealResolver(deal, r.provider, r.dealsDB, r.logsDB, r.spApi), nil
106106
}
107107

108108
type dealsArgs struct {
@@ -135,7 +135,7 @@ func (r *resolver) Deals(ctx context.Context, args dealsArgs) (*dealListResolver
135135

136136
resolvers := make([]*dealResolver, 0, len(deals))
137137
for _, deal := range deals {
138-
resolvers = append(resolvers, newDealResolver(&deal, r.dealsDB, r.logsDB, r.spApi))
138+
resolvers = append(resolvers, newDealResolver(&deal, r.provider, r.dealsDB, r.logsDB, r.spApi))
139139
}
140140

141141
return &dealListResolver{
@@ -168,7 +168,7 @@ func (r *resolver) DealUpdate(ctx context.Context, args struct{ ID graphql.ID })
168168
}
169169

170170
net := make(chan *dealResolver, 1)
171-
net <- newDealResolver(deal, r.dealsDB, r.logsDB, r.spApi)
171+
net <- newDealResolver(deal, r.provider, r.dealsDB, r.logsDB, r.spApi)
172172

173173
// Updates to deal state are broadcast on pubsub. Pipe these updates to the
174174
// client
@@ -180,7 +180,7 @@ func (r *resolver) DealUpdate(ctx context.Context, args struct{ ID graphql.ID })
180180
}
181181
return nil, fmt.Errorf("%s: subscribing to deal updates: %w", args.ID, err)
182182
}
183-
sub := &subLastUpdate{sub: dealUpdatesSub, dealsDB: r.dealsDB, logsDB: r.logsDB, spApi: r.spApi}
183+
sub := &subLastUpdate{sub: dealUpdatesSub, provider: r.provider, dealsDB: r.dealsDB, logsDB: r.logsDB, spApi: r.spApi}
184184
go func() {
185185
sub.Pipe(ctx, net) // blocks until connection is closed
186186
close(net)
@@ -219,7 +219,7 @@ func (r *resolver) DealNew(ctx context.Context) (<-chan *dealNewResolver, error)
219219
case evti := <-sub.Out():
220220
// Pipe the deal to the new deal channel
221221
di := evti.(types.ProviderDealState)
222-
rsv := newDealResolver(&di, r.dealsDB, r.logsDB, r.spApi)
222+
rsv := newDealResolver(&di, r.provider, r.dealsDB, r.logsDB, r.spApi)
223223
totalCount, err := r.dealsDB.Count(ctx, "")
224224
if err != nil {
225225
log.Errorf("getting total deal count: %w", err)
@@ -330,15 +330,17 @@ func (r *resolver) dealList(ctx context.Context, query string, cursor *graphql.I
330330

331331
type dealResolver struct {
332332
types.ProviderDealState
333+
provider *storagemarket.Provider
333334
transferred uint64
334335
dealsDB *db.DealsDB
335336
logsDB *db.LogsDB
336337
spApi sealingpipeline.API
337338
}
338339

339-
func newDealResolver(deal *types.ProviderDealState, dealsDB *db.DealsDB, logsDB *db.LogsDB, spApi sealingpipeline.API) *dealResolver {
340+
func newDealResolver(deal *types.ProviderDealState, provider *storagemarket.Provider, dealsDB *db.DealsDB, logsDB *db.LogsDB, spApi sealingpipeline.API) *dealResolver {
340341
return &dealResolver{
341342
ProviderDealState: *deal,
343+
provider: provider,
342344
transferred: uint64(deal.NBytesReceived),
343345
dealsDB: dealsDB,
344346
logsDB: logsDB,
@@ -502,13 +504,17 @@ func (dr *dealResolver) message(ctx context.Context, checkpoint dealcheckpoints.
502504
if dr.IsOffline {
503505
return "Awaiting Offline Data Import"
504506
}
505-
switch dr.transferred {
506-
case 0:
507+
switch {
508+
case dr.transferred == 0 && !dr.provider.IsTransferStalled(dr.DealUuid):
507509
return "Transfer Queued"
508-
case 100:
510+
case dr.transferred == 100:
509511
return "Transfer Complete"
510512
default:
511513
pct := (100 * dr.transferred) / dr.ProviderDealState.Transfer.Size
514+
isStalled := dr.provider.IsTransferStalled(dr.DealUuid)
515+
if isStalled {
516+
return fmt.Sprintf("Transfer stalled at %d%% ", pct)
517+
}
512518
return fmt.Sprintf("Transferring %d%%", pct)
513519
}
514520
case dealcheckpoints.Transferred:
@@ -533,6 +539,22 @@ func (dr *dealResolver) message(ctx context.Context, checkpoint dealcheckpoints.
533539
return checkpoint.String()
534540
}
535541

542+
func (dr *dealResolver) TransferSamples() []*transferPoint {
543+
points := dr.provider.Transfer(dr.DealUuid)
544+
pts := make([]*transferPoint, 0, len(points))
545+
for _, pt := range points {
546+
pts = append(pts, &transferPoint{
547+
At: graphql.Time{Time: pt.At},
548+
Bytes: gqltypes.Uint64(pt.Bytes),
549+
})
550+
}
551+
return pts
552+
}
553+
554+
func (dr *dealResolver) IsTransferStalled() bool {
555+
return dr.provider.IsTransferStalled(dr.DealUuid)
556+
}
557+
536558
func (dr *dealResolver) sealingState(ctx context.Context) string {
537559
si, err := dr.spApi.SectorsStatus(ctx, dr.SectorID, false)
538560
if err != nil {
@@ -594,10 +616,11 @@ func toUuid(id graphql.ID) (uuid.UUID, error) {
594616
}
595617

596618
type subLastUpdate struct {
597-
sub event.Subscription
598-
dealsDB *db.DealsDB
599-
logsDB *db.LogsDB
600-
spApi sealingpipeline.API
619+
sub event.Subscription
620+
provider *storagemarket.Provider
621+
dealsDB *db.DealsDB
622+
logsDB *db.LogsDB
623+
spApi sealingpipeline.API
601624
}
602625

603626
func (s *subLastUpdate) Pipe(ctx context.Context, net chan *dealResolver) {
@@ -636,7 +659,7 @@ func (s *subLastUpdate) Pipe(ctx context.Context, net chan *dealResolver) {
636659
loop:
637660
for {
638661
di := lastUpdate.(types.ProviderDealState)
639-
rsv := newDealResolver(&di, s.dealsDB, s.logsDB, s.spApi)
662+
rsv := newDealResolver(&di, s.provider, s.dealsDB, s.logsDB, s.spApi)
640663

641664
select {
642665
case <-ctx.Done():

gql/resolver_transfers.go

Lines changed: 53 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import (
66
"time"
77

88
gqltypes "github.com/filecoin-project/boost/gql/types"
9+
"github.com/filecoin-project/boost/storagemarket"
10+
"github.com/google/uuid"
911
"github.com/graph-gophers/graphql-go"
1012
)
1113

@@ -15,15 +17,62 @@ type transferPoint struct {
1517
}
1618

1719
// query: transfers: [TransferPoint]
18-
func (r *resolver) Transfers(_ context.Context) ([]*transferPoint, error) {
19-
deals := r.provider.Transfers()
20+
func (r *resolver) Transfers(_ context.Context) []*transferPoint {
21+
return r.getTransferSamples(r.provider.Transfers(), nil)
22+
}
23+
24+
type transferStats struct {
25+
HttpMaxConcurrentDownloads int32
26+
Stats []*hostTransferStats
27+
}
28+
29+
type hostTransferStats struct {
30+
Host string
31+
Total int32
32+
Started int32
33+
Stalled int32
34+
TransferSamples []*transferPoint
35+
}
36+
37+
// query: transferStats: TransferStats
38+
func (r *resolver) TransferStats(_ context.Context) *transferStats {
39+
transfersByDeal := r.provider.Transfers()
40+
stats := r.provider.TransferStats()
41+
gqlStats := make([]*hostTransferStats, 0, len(stats))
42+
for _, s := range stats {
43+
gqlStats = append(gqlStats, &hostTransferStats{
44+
Host: s.Host,
45+
Total: int32(s.Total),
46+
Started: int32(s.Started),
47+
Stalled: int32(s.Stalled),
48+
TransferSamples: r.getTransferSamples(transfersByDeal, s.DealUuids),
49+
})
50+
}
51+
return &transferStats{
52+
HttpMaxConcurrentDownloads: int32(r.cfg.Dealmaking.HttpTransferMaxConcurrentDownloads),
53+
Stats: gqlStats,
54+
}
55+
}
56+
57+
func (r *resolver) getTransferSamples(deals map[uuid.UUID][]storagemarket.TransferPoint, filter []uuid.UUID) []*transferPoint {
58+
// If filter is nil, include all deals
59+
if filter == nil {
60+
for dealUuid := range deals {
61+
filter = append(filter, dealUuid)
62+
}
63+
}
2064

2165
// We have
2266
// dealUUID -> [At: <time>, Transferred: <bytes>, At: <time>, Transferred: <bytes>, ...]
2367
// Convert this to
2468
// <time> -> <transferred per second>
2569
totalAt := make(map[time.Time]uint64)
26-
for _, points := range deals {
70+
for _, dealUuid := range filter {
71+
points, ok := deals[dealUuid]
72+
if !ok {
73+
continue
74+
}
75+
2776
var prev uint64
2877
first := true
2978
for _, pt := range points {
@@ -53,5 +102,5 @@ func (r *resolver) Transfers(_ context.Context) ([]*transferPoint, error) {
53102
return pts[i].At.Before(pts[j].At.Time)
54103
})
55104

56-
return pts, nil
105+
return pts
57106
}

gql/schema.graphql

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ type Deal {
5959
PublishCid: String!
6060
IsOffline: Boolean!
6161
Transfer: TransferParams!
62+
TransferSamples: [TransferPoint]!
63+
IsTransferStalled: Boolean!
6264
Checkpoint: String!
6365
CheckpointAt: Time!
6466
Err: String!
@@ -287,6 +289,19 @@ type TransferPoint {
287289
Bytes: Uint64!
288290
}
289291

292+
type HostStats {
293+
Host: String!
294+
Total: Int!
295+
Started: Int!
296+
Stalled: Int!
297+
TransferSamples: [TransferPoint]!
298+
}
299+
300+
type TransferStats {
301+
HttpMaxConcurrentDownloads: Int!
302+
Stats: [HostStats]!
303+
}
304+
290305
type MpoolMessage {
291306
From: String!
292307
To: String!
@@ -377,6 +392,9 @@ type RootQuery {
377392
"""Get ongoing transfers"""
378393
transfers: [TransferPoint]!
379394

395+
"""Get stats about queued / active transfers"""
396+
transferStats: TransferStats!
397+
380398
"""Get local messages in the mpool"""
381399
mpool(local: Boolean!): [MpoolMessage]!
382400

react/src/DealTransfers.css

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
.transfer-stats {
2+
margin: 1em;
3+
}
4+
5+
.transfer-stats .config {
6+
margin: 2em 0 1em 0;
7+
}
8+
9+
.transfer-stats table th, .transfer-stats table td {
10+
padding: 0.5em 1em;
11+
}
12+
13+
.transfer-stats table th {
14+
text-align: left;
15+
}
16+
17+
.transfer-stats table td {
18+
text-align: right;
19+
}
20+
21+
.transfer-stats table td.transfer-rate {
22+
color: #999999;
23+
}

0 commit comments

Comments
 (0)