Skip to content

Commit f67f246

Browse files
authored
Merge pull request #6117 from filecoin-project/opt/elvindu/snapimport-resume
Auto-resume interrupted snapshot imports / 自动重链快照导入失败时
2 parents 7eb8d81 + 7c96899 commit f67f246

File tree

2 files changed

+127
-10
lines changed

2 files changed

+127
-10
lines changed

cmd/import.go

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,11 @@ import (
55
"context"
66
"fmt"
77
"io"
8-
"net/http"
98
"os"
109
"strings"
1110

11+
"github.com/filecoin-project/venus/pkg/httpreader"
12+
1213
"github.com/DataDog/zstd"
1314
"github.com/filecoin-project/venus/pkg/chain"
1415
"github.com/filecoin-project/venus/pkg/repo"
@@ -30,18 +31,13 @@ func importChain(ctx context.Context, r repo.Repo, fname string) error {
3031
var rd io.Reader
3132
var l int64
3233
if strings.HasPrefix(fname, "http://") || strings.HasPrefix(fname, "https://") {
33-
resp, err := http.Get(fname) //nolint:gosec
34+
rrd, err := httpreader.NewResumableReader(ctx, fname)
3435
if err != nil {
35-
return err
36-
}
37-
defer resp.Body.Close() //nolint:errcheck
38-
39-
if resp.StatusCode != http.StatusOK {
40-
return fmt.Errorf("non-200 response: %d", resp.StatusCode)
36+
return fmt.Errorf("fetching chain CAR failed: setting up resumable reader: %w", err)
4137
}
4238

43-
rd = resp.Body
44-
l = resp.ContentLength
39+
rd = rrd
40+
l = rrd.ContentLength()
4541
} else {
4642
fname, err := homedir.Expand(fname)
4743
if err != nil {

pkg/httpreader/resumable.go

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
package httpreader
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"io"
7+
"net/http"
8+
"strconv"
9+
10+
logging "github.com/ipfs/go-log/v2"
11+
"go.uber.org/multierr"
12+
)
13+
14+
var log = logging.Logger("httpreader")
15+
16+
type ResumableReader struct {
17+
ctx context.Context
18+
initialURL string
19+
finalURL *string
20+
position int64
21+
contentLength int64
22+
client *http.Client
23+
reader io.ReadCloser
24+
}
25+
26+
func NewResumableReader(ctx context.Context, url string) (*ResumableReader, error) {
27+
finalURL := ""
28+
29+
client := &http.Client{
30+
CheckRedirect: func(req *http.Request, via []*http.Request) error {
31+
finalURL = req.URL.String()
32+
if len(via) >= 10 {
33+
return fmt.Errorf("stopped after 10 redirects")
34+
}
35+
return nil
36+
},
37+
}
38+
39+
r := &ResumableReader{
40+
ctx: ctx,
41+
initialURL: url,
42+
finalURL: &finalURL,
43+
position: 0,
44+
client: client,
45+
}
46+
47+
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
48+
if err != nil {
49+
return nil, err
50+
}
51+
52+
resp, err := r.client.Do(req)
53+
if err != nil {
54+
return nil, err
55+
}
56+
57+
if resp.StatusCode != http.StatusOK {
58+
return nil, fmt.Errorf("failed to fetch resource, status code: %d", resp.StatusCode)
59+
}
60+
61+
contentLength, err := strconv.ParseInt(resp.Header.Get("Content-Length"), 10, 64)
62+
if err != nil {
63+
if err = resp.Body.Close(); err != nil {
64+
err = multierr.Append(err, err)
65+
}
66+
return nil, err
67+
}
68+
69+
r.contentLength = contentLength
70+
r.reader = resp.Body
71+
72+
return r, nil
73+
}
74+
75+
func (r *ResumableReader) ContentLength() int64 {
76+
return r.contentLength
77+
}
78+
79+
func (r *ResumableReader) Read(p []byte) (n int, err error) {
80+
for {
81+
if r.reader == nil {
82+
reqURL := r.initialURL
83+
if *r.finalURL != "" {
84+
reqURL = *r.finalURL
85+
}
86+
87+
req, err := http.NewRequestWithContext(r.ctx, "GET", reqURL, nil)
88+
if err != nil {
89+
return 0, err
90+
}
91+
req.Header.Set("Range", fmt.Sprintf("bytes=%d-", r.position))
92+
resp, err := r.client.Do(req)
93+
if err != nil {
94+
return 0, err
95+
}
96+
97+
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent {
98+
return 0, fmt.Errorf("non-resumable status code: %d", resp.StatusCode)
99+
}
100+
r.reader = resp.Body
101+
}
102+
103+
n, err = r.reader.Read(p)
104+
r.position += int64(n)
105+
106+
if err == io.EOF || err == io.ErrUnexpectedEOF {
107+
if r.position == r.contentLength {
108+
if err := r.reader.Close(); err != nil {
109+
log.Warnf("error closing reader: %+v", err)
110+
}
111+
return n, io.EOF
112+
}
113+
if err := r.reader.Close(); err != nil {
114+
log.Warnf("error closing reader: %+v", err)
115+
}
116+
r.reader = nil
117+
} else {
118+
return n, err
119+
}
120+
}
121+
}

0 commit comments

Comments
 (0)