From 68055ef4c97fb4e4b9f4236ae6a8db81e462c7f3 Mon Sep 17 00:00:00 2001
From: Sai Teja Suram
Date: Fri, 8 May 2020 16:09:15 -0700
Subject: [PATCH 01/14] Request for 2 elements when checking if directory exists or not (#75)

After https://github.com/kahing/goofys/pull/496, goofys gets confused
about directories in azure if there is a dir/ blob. This is what the
bug looks like:
- ls root/dir : Fails
- ls root/; ls root/dir works

This PR addresses this issue.
---
 internal/backend_azblob.go | 36 ++++++++++++++++++++----------------
 internal/dir.go            |  7 +++++--
 internal/goofys_test.go    | 36 +++++++++++++++++++++++++++++++++++-
 3 files changed, 60 insertions(+), 19 deletions(-)

diff --git a/internal/backend_azblob.go b/internal/backend_azblob.go
index bf0a4d72..5c8df2b3 100644
--- a/internal/backend_azblob.go
+++ b/internal/backend_azblob.go
@@ -554,13 +554,16 @@ func (b *AZBlob) ListBlobs(param *ListBlobsInput) (*ListBlobsOutput, error) {
 		}
 	}
 
-	if len(blobItems) == 1 && len(blobItems[0].Name) <= len(options.Prefix) && strings.HasSuffix(options.Prefix, "/") {
+	if len(blobItems) == 1 && len(blobItems[0].Name) < len(options.Prefix) && strings.HasSuffix(options.Prefix, "/") {
 		// There is only 1 result and that one result does not have the desired prefix. This can
 		// happen if we ask for ListBlobs under /some/path/ and the result is List(/some/path). This
 		// means the prefix we are listing is a blob => So return empty response to indicate that
 		// this prefix should not be treated a directory by goofys.
 		// NOTE: This undesired behaviour happens only on azblob when hierarchial namespaces are
 		// enabled.
+		azbLog.Debugf(
+			"Only one non-prefix result found. Returning empty ListBlobsResult. options: %#v blobItems: %#v",
+			options, blobItems)
 		return &ListBlobsOutput{}, nil
 	}
 	var sortItems bool
@@ -776,25 +779,11 @@ func (b *AZBlob) GetBlob(param *GetBlobInput) (*GetBlobOutput, error) {
 	}, nil
 }
 
-func (b *AZBlob) PutBlob(param *PutBlobInput) (*PutBlobOutput, error) {
+func (b *AZBlob) putBlobRaw(param *PutBlobInput) (*PutBlobOutput, error) {
 	c, err := b.refreshToken()
 	if err != nil {
 		return nil, err
 	}
-
-	if param.DirBlob && strings.HasSuffix(param.Key, "/") {
-		// turn this into an empty blob with "hdi_isfolder" metadata
-		param.Key = param.Key[:len(param.Key)-1]
-		if param.Metadata != nil {
-			param.Metadata[AzureDirBlobMetadataKey] = PString("true")
-		} else {
-			param.Metadata = map[string]*string{
-				AzureDirBlobMetadataKey: PString("true"),
-			}
-		}
-		return b.PutBlob(param)
-	}
-
 	body := param.Body
 	if body == nil {
 		body = bytes.NewReader([]byte(""))
@@ -817,6 +806,21 @@ func (b *AZBlob) PutBlob(param *PutBlobInput) (*PutBlobOutput, error) {
 	}, nil
 }
 
+func (b *AZBlob) PutBlob(param *PutBlobInput) (*PutBlobOutput, error) {
+	if param.DirBlob && strings.HasSuffix(param.Key, "/") {
+		// turn this into an empty blob with "hdi_isfolder" metadata
+		param.Key = param.Key[:len(param.Key)-1]
+		if param.Metadata != nil {
+			param.Metadata[AzureDirBlobMetadataKey] = PString("true")
+		} else {
+			param.Metadata = map[string]*string{
+				AzureDirBlobMetadataKey: PString("true"),
+			}
+		}
+	}
+	return b.putBlobRaw(param)
+}
+
 func (b *AZBlob) MultipartBlobBegin(param *MultipartBlobBeginInput) (*MultipartBlobCommitInput, error) {
 	// we can have up to 50K parts, so %05d should be sufficient
 	uploadId := uuid.New().String() + "::%05d"
diff --git a/internal/dir.go b/internal/dir.go
index 687674f0..9c96ca77 100644
--- a/internal/dir.go
+++ b/internal/dir.go
@@ -1308,8 +1308,11 @@ func (parent *Inode) LookUpInodeDir(name string,
c chan ListBlobsOutput, errc ch resp, err := cloud.ListBlobs(&ListBlobsInput{ Delimiter: aws.String("/"), - MaxKeys: PUInt32(1), - Prefix: &key, + // Ideally one result should be sufficient. But when azure hierarchial + // namespaces are enabled, azblob returns "a" when we list blobs under "a/". + // In such cases we remove "a" from the result. So request for 2 blobs. + MaxKeys: PUInt32(2), + Prefix: &key, }) if err != nil { diff --git a/internal/goofys_test.go b/internal/goofys_test.go index 292ab557..17e939cf 100644 --- a/internal/goofys_test.go +++ b/internal/goofys_test.go @@ -741,6 +741,40 @@ func (s *GoofysTest) TestLookUpInode(t *C) { t.Assert(err, IsNil) } +func (s *GoofysTest) IsAzBlobWithHNS() bool { + azblob, isAzBlob := s.cloud.(*AZBlob) + if !isAzBlob { + return false + } + // If HNS, adding "nested/file" will create automatically + // a dir blob at "nested" + azblob.PutBlob(&PutBlobInput{Key: "nested/file"}) + res, err := azblob.HeadBlob(&HeadBlobInput{Key: "nested"}) + return err == nil && res.IsDirBlob +} + +func (s *GoofysTest) TestDirBlobExistsWithoutDirMetadata(t *C) { + azblob, isAzBlob := s.cloud.(*AZBlob) + if !isAzBlob || s.IsAzBlobWithHNS() { + // This test is not relevant for hns because azure does not allow + // presence of "dir/" when HNS is on. + t.Skip("This test is only for azblob:noHNS") + } + + // Microsoft uses "dir" blobs with metadata field hdi_isfolder to represent + // a directory. Test that we dont get confused when "dir/" exists as blob + for _, b := range []string{ + "dir_blob_no_md/", "dir_blob_no_md/file"} { + _, err := azblob.putBlobRaw(&PutBlobInput{Key: b}) + t.Assert(err, IsNil) + } + _, err := s.LookUpInode(t, "dir_blob_no_md") + t.Assert(err, IsNil) + + _, err = s.LookUpInode(t, "dir_blob_no_md/file") + t.Assert(err, IsNil) +} + func (s *GoofysTest) TestPanicWrapper(t *C) { debug.SetTraceback("single") @@ -1390,7 +1424,7 @@ func (s *GoofysTest) TestBackendListPrefix(t *C) { }) t.Assert(err, IsNil) t.Assert(len(res.Prefixes), Equals, 0) - t.Assert(len(res.Items), Equals, 1) + t.Assert(res.Items, HasLen, 1) t.Assert(*res.Items[0].Key, Equals, "file1") res, err = s.cloud.ListBlobs(&ListBlobsInput{ From e0dd66b40ae867bcff507d030a063f68bb5f42a1 Mon Sep 17 00:00:00 2001 From: Jason Kim Date: Tue, 28 Jul 2020 11:15:16 -0700 Subject: [PATCH 02/14] Check for nil before ADLv1 file status field derefs (#79) --- internal/backend_adlv1.go | 9 ++++-- internal/backend_adlv1_test.go | 54 ++++++++++++++++++++++++++++++++++ internal/backend_azblob.go | 10 +------ internal/utils.go | 2 +- 4 files changed, 63 insertions(+), 12 deletions(-) create mode 100644 internal/backend_adlv1_test.go diff --git a/internal/backend_adlv1.go b/internal/backend_adlv1.go index 6b695653..11a1f3ca 100644 --- a/internal/backend_adlv1.go +++ b/internal/backend_adlv1.go @@ -244,10 +244,15 @@ func adlv1LastModified(t int64) time.Time { } func adlv1FileStatus2BlobItem(f *adl.FileStatusProperties, key *string) BlobItemOutput { + var lastModified *time.Time + if f.ModificationTime != nil { + lastModified = PTime(adlv1LastModified(*f.ModificationTime)) + } + return BlobItemOutput{ Key: key, - LastModified: PTime(adlv1LastModified(*f.ModificationTime)), - Size: uint64(*f.Length), + LastModified: lastModified, + Size: uint64(NilInt64(f.Length)), } } diff --git a/internal/backend_adlv1_test.go b/internal/backend_adlv1_test.go new file mode 100644 index 00000000..97564faf --- /dev/null +++ b/internal/backend_adlv1_test.go @@ -0,0 +1,54 @@ +package internal + +import ( + adl 
"github.com/Azure/azure-sdk-for-go/services/datalake/store/2016-11-01/filesystem" + . "github.com/kahing/goofys/api/common" + . "gopkg.in/check.v1" +) + +type Adlv1Test struct { + adlv1 *ADLv1 +} + +var _ = Suite(&Adlv1Test{}) + +func (s *Adlv1Test) SetUpSuite(t *C) { + var err error + s.adlv1, err = NewADLv1("", &FlagStorage{}, &ADLv1Config{ + Endpoint: "test.endpoint", + }) + t.Assert(err, IsNil) +} + +func (s *Adlv1Test) TestAdlv1FileStatus2BlobItem(t *C) { + type Adlv1FileStatus2BlobItemTest struct { + description string + inputFileStatusProperties *adl.FileStatusProperties + expectedBlobItemOutput BlobItemOutput + } + + adlv1FileStatus2BlobItemTests := []Adlv1FileStatus2BlobItemTest{ + { + description: "Should handle nil ModificationTime and Length", + inputFileStatusProperties: &adl.FileStatusProperties{}, + expectedBlobItemOutput: BlobItemOutput{}, + }, + { + description: "Should handle non-nil ModificationTime and Length", + inputFileStatusProperties: &adl.FileStatusProperties{ + ModificationTime: PInt64(1000), + Length: PInt64(100), + }, + expectedBlobItemOutput: BlobItemOutput{ + LastModified: PTime(adlv1LastModified(1000)), + Size: uint64(100), + }, + }, + } + + for i, test := range adlv1FileStatus2BlobItemTests { + t.Logf("Test %d: \t%s", i, test.description) + blobItemOutput := adlv1FileStatus2BlobItem(test.inputFileStatusProperties, nil) + t.Check(blobItemOutput, DeepEquals, test.expectedBlobItemOutput) + } +} diff --git a/internal/backend_azblob.go b/internal/backend_azblob.go index 5c8df2b3..2df1d582 100644 --- a/internal/backend_azblob.go +++ b/internal/backend_azblob.go @@ -470,14 +470,6 @@ func (b *AZBlob) HeadBlob(param *HeadBlobInput) (*HeadBlobOutput, error) { }, nil } -func nilUint32(v *uint32) uint32 { - if v == nil { - return 0 - } else { - return *v - } -} - func (b *AZBlob) ListBlobs(param *ListBlobsInput) (*ListBlobsOutput, error) { // azure blob does not support startAfter if param.StartAfter != nil { @@ -497,7 +489,7 @@ func (b *AZBlob) ListBlobs(param *ListBlobsInput) (*ListBlobsOutput, error) { options := azblob.ListBlobsSegmentOptions{ Prefix: NilStr(param.Prefix), - MaxResults: int32(nilUint32(param.MaxKeys)), + MaxResults: int32(NilUint32(param.MaxKeys)), Details: azblob.BlobListingDetails{ // blobfuse (following wasb) convention uses // an empty blob with "hdi_isfolder" metadata diff --git a/internal/utils.go b/internal/utils.go index 51e07800..548240c8 100644 --- a/internal/utils.go +++ b/internal/utils.go @@ -245,4 +245,4 @@ func ConvertBytesToIEC(size int64) string { exp += 1 } return fmt.Sprintf("%.1f %ciB", curSize, "KMGTPEZY"[exp-1]) -} \ No newline at end of file +} From 11a5478036ec7c4f3bc79eafa1d21cf0be50a9bb Mon Sep 17 00:00:00 2001 From: Sai Teja Suram Date: Wed, 26 Aug 2020 10:17:27 -0700 Subject: [PATCH 03/14] Handle ListBlobs returning less items than requested with cont token (#82) Both in s3 and azure, it is possible that when we call listPrefix with limit=N, we might get a result whose size is smaller than N, but still has a continuation token. This behaviour does not hurt when we are listing the fuse files under a directory. But when doing dir checks i.e. 1) testing if the given path is a directory 2) if a given directory is empty, this can make goofys wrongly think a directory is empty or a given prefix is not a directory. Add a wrapper in list.go, that does this: If the backend returns less items than requested and has a continuation token, it will use the continuation token to fetch more items. 
--- internal/dir.go | 60 ++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 55 insertions(+), 5 deletions(-) diff --git a/internal/dir.go b/internal/dir.go index 9c96ca77..aae16e6a 100644 --- a/internal/dir.go +++ b/internal/dir.go @@ -392,7 +392,7 @@ func (dh *DirHandle) listObjects(prefix string) (resp *ListBlobsOutput, err erro // is nothing left to list or the last listed entry has all characters > "/" // Relavant test case: TestReadDirDash func listBlobsSafe(cloud StorageBackend, param *ListBlobsInput) (*ListBlobsOutput, error) { - res, err := cloud.ListBlobs(param) + res, err := listBlobsWrapper(cloud, param) if err != nil { return nil, err } @@ -406,7 +406,7 @@ func listBlobsSafe(cloud StorageBackend, param *ListBlobsInput) (*ListBlobsOutpu // Get the continuation token from the result. ContinuationToken: res.NextContinuationToken, } - nextRes, err := cloud.ListBlobs(nextReq) + nextRes, err := listBlobsWrapper(cloud, nextReq) if err != nil { return nil, err } @@ -426,6 +426,56 @@ func listBlobsSafe(cloud StorageBackend, param *ListBlobsInput) (*ListBlobsOutpu return res, nil } + +// Both in s3 and azure, it is possible that when we call listPrefix with limit=N, we might get a +// result whose size is smaller than N, but still has a continuation token. This behaviour does not +// hurt when we are listing the fuse files under a directory. But when doing dir checks i.e. +// 1) testing if the given path is a directory 2) if a given directory is empty, this can make +// goofys wrongly think a directory is empty or a given prefix is not a directory. +// +// If the backend returns less items than requested and has a continuation token, this will use the +// continuation token to fetch more items. +func listBlobsWrapper(cloud StorageBackend, param *ListBlobsInput) (*ListBlobsOutput, error) { + targetNumElements := NilUint32(param.MaxKeys) + ret, err := cloud.ListBlobs(param) + if targetNumElements == 0 { + // If MaxKeys is not specified (or is 0), we don't need any of the following special handling. + return ret, err + } else if err != nil { + return nil, err + } + + for { + curNumElements := uint32(len(ret.Prefixes) + len(ret.Items)) + if curNumElements >= targetNumElements { + break // We got all we want. Nothing else to do. + } else if ret.NextContinuationToken == nil { + break // We got all blobs under the prefix. + } + + internalResp, err := cloud.ListBlobs(&ListBlobsInput{ + Prefix: param.Prefix, + Delimiter: param.Delimiter, + MaxKeys: PUInt32(targetNumElements - curNumElements), + ContinuationToken: ret.NextContinuationToken, + // We will not set StartAfter for page requests. Only the first request might have it. 
+ }) + if err != nil { + return nil, err + } + + ret = &ListBlobsOutput{ + Prefixes: append(ret.Prefixes, internalResp.Prefixes...), + Items: append(ret.Items, internalResp.Items...), + NextContinuationToken: internalResp.NextContinuationToken, + IsTruncated: internalResp.IsTruncated, + RequestId: internalResp.RequestId, + } + } + return ret, nil +} + + // LOCKS_REQUIRED(dh.mu) // LOCKS_EXCLUDED(dh.inode.mu) // LOCKS_EXCLUDED(dh.inode.fs) @@ -972,7 +1022,7 @@ func (parent *Inode) isEmptyDir(fs *Goofys, name string) (isDir bool, err error) cloud, key := parent.cloud() key = appendChildName(key, name) + "/" - resp, err := cloud.ListBlobs(&ListBlobsInput{ + resp, err := listBlobsWrapper(cloud, &ListBlobsInput{ Delimiter: aws.String("/"), MaxKeys: PUInt32(2), Prefix: &key, @@ -1306,9 +1356,9 @@ func (parent *Inode) LookUpInodeDir(name string, c chan ListBlobsOutput, errc ch cloud, key := parent.cloud() key = appendChildName(key, name) + "/" - resp, err := cloud.ListBlobs(&ListBlobsInput{ + resp, err := listBlobsWrapper(cloud, &ListBlobsInput{ Delimiter: aws.String("/"), - // Ideally one result should be sufficient. But when azure hierarchial + // Ideally one result should be sufficient. But when azure hierarchical // namespaces are enabled, azblob returns "a" when we list blobs under "a/". // In such cases we remove "a" from the result. So request for 2 blobs. MaxKeys: PUInt32(2), From 8611a83c41661b7a91ef59965f467e6a5cb800bf Mon Sep 17 00:00:00 2001 From: Mei Gui <71293871+xinmeigui-db@users.noreply.github.com> Date: Wed, 23 Sep 2020 11:05:23 -0700 Subject: [PATCH 04/14] Add loggings for nil adlv1 blob last modified and size fields (#85) * [impl] add loggings for nil adlv1 blob last modified and size fields * [impl] log full path of the blob * [fix] change log level to warn from debug * [fix] dereference directly when not nil --- .gitignore | 1 + internal/backend_adlv1.go | 16 +++++++++++++++- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 2ccd54ac..20d6a07b 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ goofys goofys.test xout s3proxy.jar +.idea diff --git a/internal/backend_adlv1.go b/internal/backend_adlv1.go index 11a1f3ca..2ffdb3c3 100644 --- a/internal/backend_adlv1.go +++ b/internal/backend_adlv1.go @@ -244,15 +244,29 @@ func adlv1LastModified(t int64) time.Time { } func adlv1FileStatus2BlobItem(f *adl.FileStatusProperties, key *string) BlobItemOutput { + logNilFileStatusField := func(msg string) { + adls1Log.Warnf("%s - type: %v, permission: %v, path: %v", + msg, f.Type, NilStr(f.Permission), NilStr(key)) + } + var lastModified *time.Time if f.ModificationTime != nil { lastModified = PTime(adlv1LastModified(*f.ModificationTime)) + } else { + logNilFileStatusField("empty modification time") + } + + var size uint64 + if f.Length != nil { + size = uint64(*f.Length) + } else { + logNilFileStatusField("empty length") } return BlobItemOutput{ Key: key, LastModified: lastModified, - Size: uint64(NilInt64(f.Length)), + Size: size, } } From db0f7dc72fde6dca187e7aca046a79e80d447adf Mon Sep 17 00:00:00 2001 From: Mei Gui <71293871+xinmeigui-db@users.noreply.github.com> Date: Thu, 8 Oct 2020 17:19:57 -0700 Subject: [PATCH 05/14] Improve handling of ReadDir when key contains invalid XML characters (#86) --- internal/backend_s3.go | 41 +++++++++++++++++++++++++++++++++++++++++ internal/goofys_test.go | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 74 insertions(+) diff --git a/internal/backend_s3.go b/internal/backend_s3.go index 
3ee2d489..cbc8b75f 100644 --- a/internal/backend_s3.go +++ b/internal/backend_s3.go @@ -36,6 +36,10 @@ import ( "github.com/jacobsa/fuse" ) +const ( + s3ListObjectsInputUrlEncodingType = "url" +) + type S3Backend struct { *s3.S3 cap Capabilities @@ -287,6 +291,7 @@ func (s *S3Backend) ListObjectsV2(params *s3.ListObjectsV2Input) (*s3.ListObject if err != nil { return nil, "", err } + return resp, s.getRequestId(req), nil } else { v1 := s3.ListObjectsInput{ @@ -328,6 +333,36 @@ func (s *S3Backend) ListObjectsV2(params *s3.ListObjectsV2Input) (*s3.ListObject } } +func urlDecodeListObjectsV2Output(output *s3.ListObjectsV2Output) error { + for _, commonPrefix := range output.CommonPrefixes { + if commonPrefix == nil || commonPrefix.Prefix == nil { + continue + } + + if decodedPrefix, err := url.QueryUnescape(*commonPrefix.Prefix); err == nil { + commonPrefix.Prefix = &decodedPrefix + } else { + s3Log.Errorf("err decoding list object common prefix %v: %v", *commonPrefix.Prefix, err) + return err + } + } + + for _, content := range output.Contents { + if content == nil || content.Key == nil { + continue + } + + if decodedKey, err := url.QueryUnescape(*content.Key); err == nil { + content.Key = &decodedKey + } else { + s3Log.Errorf("err decoding list object content %v: %v", *content.Key, err) + return err + } + } + + return nil +} + func metadataToLower(m map[string]*string) map[string]*string { if m != nil { var toDelete []string @@ -394,11 +429,17 @@ func (s *S3Backend) ListBlobs(param *ListBlobsInput) (*ListBlobsOutput, error) { MaxKeys: maxKeys, StartAfter: param.StartAfter, ContinuationToken: param.ContinuationToken, + // Use URL encoding so that keys with invalid xml characters can be listed + EncodingType: PString(s3ListObjectsInputUrlEncodingType), }) if err != nil { return nil, mapAwsError(err) } + if err := urlDecodeListObjectsV2Output(resp); err != nil { + return nil, err + } + prefixes := make([]BlobPrefixOutput, 0) items := make([]BlobItemOutput, 0) diff --git a/internal/goofys_test.go b/internal/goofys_test.go index 17e939cf..deee6bf6 100644 --- a/internal/goofys_test.go +++ b/internal/goofys_test.go @@ -3196,6 +3196,39 @@ func (s *GoofysTest) TestIssue326(t *C) { "file1", "file2", "folder#1#", "folder@name.something", "zero"}) } +// Test for this issue: https://github.com/kahing/goofys/issues/564 +func (s *GoofysTest) TestInvalidXMLIssue(t *C) { + dirName := "invalidXml" + // This is not an invalid xml but checking if url decoding was successful + dirNameWithValidXml := "folder1 !#$%&'<>*+,-.=?@[\\^~_\t" + // Testing directory name with invalid xml + dirNameWithInvalidXml := "invalid\b\v\a" + + _, err := s.cloud.PutBlob(&PutBlobInput{ + Key: fmt.Sprintf("%s/%s", dirName, dirNameWithValidXml), + Body: bytes.NewReader([]byte("")), + Size: PUInt64(0), + }) + t.Assert(err, IsNil) + + path := fmt.Sprintf("%s/%s", dirName, dirNameWithInvalidXml) + _, err = s.cloud.PutBlob(&PutBlobInput{ + Key: path, + Body: bytes.NewReader([]byte("")), + Size: PUInt64(0), + }) + t.Assert(err, IsNil) + // Delete the blob with invalid xml in its key as DeleteBlobs in tear down is not able to delete it + defer func() { + _, err := s.cloud.DeleteBlob(&DeleteBlobInput{Key: path}) + t.Assert(err, IsNil) + }() + + dir, err := s.LookUpInode(t, dirName) + t.Assert(err, IsNil) + s.assertEntries(t, dir, []string{dirNameWithValidXml, dirNameWithInvalidXml}) +} + func (s *GoofysTest) TestSlurpFileAndDir(t *C) { if _, ok := s.cloud.Delegate().(*S3Backend); !ok { t.Skip("only for S3") From 
d0386606503dd0fc16107b3a040e412813e13f78 Mon Sep 17 00:00:00 2001 From: Ka-Hing Cheung Date: Fri, 20 Nov 2020 10:23:07 -0800 Subject: [PATCH 06/14] if AWS_REGION is set don't do region detection --- internal/backend_s3.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/internal/backend_s3.go b/internal/backend_s3.go index cbc8b75f..a8653abc 100644 --- a/internal/backend_s3.go +++ b/internal/backend_s3.go @@ -20,6 +20,7 @@ import ( "fmt" "net/http" "net/url" + "os" "strconv" "strings" "sync/atomic" @@ -56,6 +57,11 @@ type S3Backend struct { } func NewS3(bucket string, flags *FlagStorage, config *S3Config) (*S3Backend, error) { + if !config.RegionSet && os.Getenv("AWS_REGION") != "" { + config.RegionSet = true + config.Region = os.Getenv("AWS_REGION") + } + awsConfig, err := config.ToAwsConfig(flags) if err != nil { return nil, err @@ -430,7 +436,7 @@ func (s *S3Backend) ListBlobs(param *ListBlobsInput) (*ListBlobsOutput, error) { StartAfter: param.StartAfter, ContinuationToken: param.ContinuationToken, // Use URL encoding so that keys with invalid xml characters can be listed - EncodingType: PString(s3ListObjectsInputUrlEncodingType), + EncodingType: PString(s3ListObjectsInputUrlEncodingType), }) if err != nil { return nil, mapAwsError(err) From 7171a1c64d90f115accdccd1bcb5fffcd1be5beb Mon Sep 17 00:00:00 2001 From: Ka-Hing Cheung Date: Mon, 8 Mar 2021 11:20:20 -0800 Subject: [PATCH 07/14] add -o bg option to do init in background (#92) add a bg mount option that delays init --- api/api.go | 12 +- api/common/config.go | 5 + api/common/lazy_init.go | 302 ++++++++++++++++++++++++++++++++++++++++ internal/flags.go | 6 + internal/goofys.go | 92 +++++++----- internal/goofys_test.go | 39 +++++- 6 files changed, 420 insertions(+), 36 deletions(-) create mode 100644 api/common/lazy_init.go diff --git a/api/api.go b/api/api.go index f78cf082..fdbc2d0a 100644 --- a/api/api.go +++ b/api/api.go @@ -120,7 +120,17 @@ func Mount( err = fmt.Errorf("Mount: initialization failed") return } - server := fuseutil.NewFileSystemServer(FusePanicLogger{fs}) + var fusefs fuseutil.FileSystem + + if flags.BgInit { + fusefs = &LazyInitFileSystem{ + Fs: fs, + } + } else { + fusefs = fs + } + + server := fuseutil.NewFileSystemServer(FusePanicLogger{fusefs}) mfs, err = fuse.Mount(flags.MountPoint, server, mountCfg) if err != nil { diff --git a/api/common/config.go b/api/common/config.go index e4f23ad5..9d73e832 100644 --- a/api/common/config.go +++ b/api/common/config.go @@ -32,6 +32,11 @@ type FlagStorage struct { MountPointArg string MountPointCreated string + // analog of `-o bg` option in nfs, initialize the mount point + // right away but let the work of actually contacting remote + // happen in the background + BgInit bool + Cache []string DirMode os.FileMode FileMode os.FileMode diff --git a/api/common/lazy_init.go b/api/common/lazy_init.go new file mode 100644 index 00000000..97dcd0ab --- /dev/null +++ b/api/common/lazy_init.go @@ -0,0 +1,302 @@ +// Copyright 2021 Databricks +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package common + +import ( + "context" + "sync" + + "github.com/jacobsa/fuse/fuseops" + "github.com/jacobsa/fuse/fuseutil" +) + +type Initer interface { + Init() error +} + +type InitFileSystem interface { + Initer + fuseutil.FileSystem +} + +type LazyInitFileSystem struct { + Fs InitFileSystem + init sync.Once + initError error +} + +func (fs *LazyInitFileSystem) Init() error { + fs.init.Do(func() { + fs.initError = fs.Fs.Init() + }) + + return fs.initError +} + +func (fs *LazyInitFileSystem) StatFS(ctx context.Context, op *fuseops.StatFSOp) (err error) { + err = fs.Init() + if err != nil { + return + } + + return fs.Fs.StatFS(ctx, op) +} + +func (fs *LazyInitFileSystem) LookUpInode(ctx context.Context, op *fuseops.LookUpInodeOp) (err error) { + err = fs.Init() + if err != nil { + return + } + + return fs.Fs.LookUpInode(ctx, op) +} + +func (fs *LazyInitFileSystem) GetInodeAttributes(ctx context.Context, op *fuseops.GetInodeAttributesOp) (err error) { + err = fs.Init() + if err != nil { + return + } + + return fs.Fs.GetInodeAttributes(ctx, op) +} + +func (fs *LazyInitFileSystem) SetInodeAttributes(ctx context.Context, op *fuseops.SetInodeAttributesOp) (err error) { + err = fs.Init() + if err != nil { + return + } + + return fs.Fs.SetInodeAttributes(ctx, op) +} + +func (fs *LazyInitFileSystem) Fallocate(ctx context.Context, op *fuseops.FallocateOp) (err error) { + err = fs.Init() + if err != nil { + return + } + + return fs.Fs.Fallocate(ctx, op) +} + +func (fs *LazyInitFileSystem) ForgetInode(ctx context.Context, op *fuseops.ForgetInodeOp) (err error) { + err = fs.Init() + if err != nil { + return + } + + return fs.Fs.ForgetInode(ctx, op) +} + +func (fs *LazyInitFileSystem) MkDir(ctx context.Context, op *fuseops.MkDirOp) (err error) { + err = fs.Init() + if err != nil { + return + } + + return fs.Fs.MkDir(ctx, op) +} + +func (fs *LazyInitFileSystem) MkNode(ctx context.Context, op *fuseops.MkNodeOp) (err error) { + err = fs.Init() + if err != nil { + return + } + + return fs.Fs.MkNode(ctx, op) +} + +func (fs *LazyInitFileSystem) CreateFile(ctx context.Context, op *fuseops.CreateFileOp) (err error) { + err = fs.Init() + if err != nil { + return + } + + return fs.Fs.CreateFile(ctx, op) +} + +func (fs *LazyInitFileSystem) CreateLink(ctx context.Context, op *fuseops.CreateLinkOp) (err error) { + err = fs.Init() + if err != nil { + return + } + + return fs.Fs.CreateLink(ctx, op) +} + +func (fs *LazyInitFileSystem) CreateSymlink(ctx context.Context, op *fuseops.CreateSymlinkOp) (err error) { + err = fs.Init() + if err != nil { + return + } + + return fs.Fs.CreateSymlink(ctx, op) +} + +func (fs *LazyInitFileSystem) Rename(ctx context.Context, op *fuseops.RenameOp) (err error) { + err = fs.Init() + if err != nil { + return + } + + return fs.Fs.Rename(ctx, op) +} + +func (fs *LazyInitFileSystem) RmDir(ctx context.Context, op *fuseops.RmDirOp) (err error) { + err = fs.Init() + if err != nil { + return + } + + return fs.Fs.RmDir(ctx, op) +} + +func (fs *LazyInitFileSystem) Unlink(ctx context.Context, op *fuseops.UnlinkOp) (err error) { + err = fs.Init() + if err != nil { + return + } + + return fs.Fs.Unlink(ctx, op) +} + +func (fs *LazyInitFileSystem) OpenDir(ctx context.Context, op *fuseops.OpenDirOp) (err error) { + err = fs.Init() + if err != nil { + return + } + + return fs.Fs.OpenDir(ctx, op) +} + +func (fs *LazyInitFileSystem) ReadDir(ctx context.Context, op *fuseops.ReadDirOp) (err 
error) { + err = fs.Init() + if err != nil { + return + } + + return fs.Fs.ReadDir(ctx, op) +} + +func (fs *LazyInitFileSystem) ReleaseDirHandle(ctx context.Context, op *fuseops.ReleaseDirHandleOp) (err error) { + err = fs.Init() + if err != nil { + return + } + + return fs.Fs.ReleaseDirHandle(ctx, op) +} + +func (fs *LazyInitFileSystem) OpenFile(ctx context.Context, op *fuseops.OpenFileOp) (err error) { + err = fs.Init() + if err != nil { + return + } + + return fs.Fs.OpenFile(ctx, op) +} + +func (fs *LazyInitFileSystem) ReadFile(ctx context.Context, op *fuseops.ReadFileOp) (err error) { + err = fs.Init() + if err != nil { + return + } + + return fs.Fs.ReadFile(ctx, op) +} + +func (fs *LazyInitFileSystem) WriteFile(ctx context.Context, op *fuseops.WriteFileOp) (err error) { + err = fs.Init() + if err != nil { + return + } + + return fs.Fs.WriteFile(ctx, op) +} + +func (fs *LazyInitFileSystem) SyncFile(ctx context.Context, op *fuseops.SyncFileOp) (err error) { + err = fs.Init() + if err != nil { + return + } + + return fs.Fs.SyncFile(ctx, op) +} + +func (fs *LazyInitFileSystem) FlushFile(ctx context.Context, op *fuseops.FlushFileOp) (err error) { + err = fs.Init() + if err != nil { + return + } + + return fs.Fs.FlushFile(ctx, op) +} + +func (fs *LazyInitFileSystem) ReleaseFileHandle(ctx context.Context, op *fuseops.ReleaseFileHandleOp) (err error) { + err = fs.Init() + if err != nil { + return + } + + return fs.Fs.ReleaseFileHandle(ctx, op) +} + +func (fs *LazyInitFileSystem) ReadSymlink(ctx context.Context, op *fuseops.ReadSymlinkOp) (err error) { + err = fs.Init() + if err != nil { + return + } + + return fs.Fs.ReadSymlink(ctx, op) +} + +func (fs *LazyInitFileSystem) RemoveXattr(ctx context.Context, op *fuseops.RemoveXattrOp) (err error) { + err = fs.Init() + if err != nil { + return + } + + return fs.Fs.RemoveXattr(ctx, op) +} + +func (fs *LazyInitFileSystem) GetXattr(ctx context.Context, op *fuseops.GetXattrOp) (err error) { + err = fs.Init() + if err != nil { + return + } + + return fs.Fs.GetXattr(ctx, op) +} + +func (fs *LazyInitFileSystem) ListXattr(ctx context.Context, op *fuseops.ListXattrOp) (err error) { + err = fs.Init() + if err != nil { + return + } + + return fs.Fs.ListXattr(ctx, op) +} + +func (fs *LazyInitFileSystem) SetXattr(ctx context.Context, op *fuseops.SetXattrOp) (err error) { + err = fs.Init() + if err != nil { + return + } + + return fs.Fs.SetXattr(ctx, op) +} + +func (fs *LazyInitFileSystem) Destroy() { + fs.Fs.Destroy() +} diff --git a/internal/flags.go b/internal/flags.go index f18633a8..b0a00f93 100644 --- a/internal/flags.go +++ b/internal/flags.go @@ -376,6 +376,12 @@ func PopulateFlags(c *cli.Context) (ret *FlagStorage) { parseOptions(flags.MountOptions, o) } + if _, ok := flags.MountOptions["bg"]; ok { + flags.BgInit = true + // remove it from mount option so it doesn't go to the kernel + delete(flags.MountOptions, "bg") + } + flags.MountPointArg = c.Args()[1] flags.MountPoint = flags.MountPointArg var err error diff --git a/internal/goofys.go b/internal/goofys.go index b3321d4a..63a60a88 100644 --- a/internal/goofys.go +++ b/internal/goofys.go @@ -51,6 +51,8 @@ type Goofys struct { fuseutil.NotImplementedFileSystem bucket string + newRootBackend func(string, *FlagStorage) (StorageBackend, error) + flags *FlagStorage umask uint32 @@ -172,43 +174,16 @@ func newGoofys(ctx context.Context, bucket string, flags *FlagStorage, newBackend func(string, *FlagStorage) (StorageBackend, error)) *Goofys { // Set up the basic struct. 
fs := &Goofys{ - bucket: bucket, - flags: flags, - umask: 0122, - } - - var prefix string - colon := strings.Index(bucket, ":") - if colon != -1 { - prefix = bucket[colon+1:] - prefix = strings.Trim(prefix, "/") - if prefix != "" { - prefix += "/" - } - - fs.bucket = bucket[0:colon] - bucket = fs.bucket + newRootBackend: newBackend, + bucket: bucket, + flags: flags, + umask: 0122, } if flags.DebugS3 { s3Log.Level = logrus.DebugLevel } - cloud, err := newBackend(bucket, flags) - if err != nil { - log.Errorf("Unable to setup backend: %v", err) - return nil - } - _, fs.gcsS3 = cloud.Delegate().(*GCS3) - - randomObjectName := prefix + (RandStringBytesMaskImprSrc(32)) - err = cloud.Init(randomObjectName) - if err != nil { - log.Errorf("Unable to access '%v': %v", bucket, err) - return nil - } - go cloud.MultipartExpire(&MultipartExpireInput{}) - now := time.Now() fs.rootAttrs = InodeAttributes{ Size: 4096, @@ -222,8 +197,6 @@ func newGoofys(ctx context.Context, bucket string, flags *FlagStorage, root := NewInode(fs, nil, PString("")) root.Id = fuseops.RootInodeID root.ToDir() - root.dir.cloud = cloud - root.dir.mountPrefix = prefix root.Attributes.Mtime = fs.rootAttrs.Mtime fs.inodes[fuseops.RootInodeID] = root @@ -237,9 +210,62 @@ func newGoofys(ctx context.Context, bucket string, flags *FlagStorage, fs.replicators = Ticket{Total: 16}.Init() fs.restorers = Ticket{Total: 20}.Init() + if !flags.BgInit { + err := fs.Init() + if err != nil { + log.Errorf("Unable to setup backend: %v", err) + return nil + } + } else { + log.Infof("-o bg, delaying initialization") + } + return fs } +// performs all the non-local initializations (which can fail or take unbound amount of time) +func (fs *Goofys) Init() error { + log.Infof("bucket %v", fs.bucket) + + var prefix string + colon := strings.Index(fs.bucket, ":") + if colon != -1 { + prefix = fs.bucket[colon+1:] + prefix = strings.Trim(prefix, "/") + if prefix != "" { + prefix += "/" + } + + fs.bucket = fs.bucket[0:colon] + } + + cloud, err := fs.newRootBackend(fs.bucket, fs.flags) + if err != nil { + log.Errorf("Unable to setup backend: %v", err) + return err + } + _, fs.gcsS3 = cloud.Delegate().(*GCS3) + + root := fs.inodes[fuseops.RootInodeID] + root.dir.mountPrefix = prefix + log.Infof("prefix %v", prefix) + + randomObjectName := root.dir.mountPrefix + (RandStringBytesMaskImprSrc(32)) + log.Infof("testing object %v", randomObjectName) + + err = cloud.Init(randomObjectName) + if err != nil { + log.Errorf("Unable to access '%v': %v", fs.bucket, err) + return err + } + + go cloud.MultipartExpire(&MultipartExpireInput{}) + + root.dir.cloud = cloud + + return err +} + // from https://stackoverflow.com/questions/22892120/how-to-generate-a-random-string-of-a-fixed-length-in-golang func RandStringBytesMaskImprSrc(n int) string { const letterBytes = "abcdefghijklmnopqrstuvwxyz0123456789" diff --git a/internal/goofys_test.go b/internal/goofys_test.go index deee6bf6..65dd65b8 100644 --- a/internal/goofys_test.go +++ b/internal/goofys_test.go @@ -3205,7 +3205,7 @@ func (s *GoofysTest) TestInvalidXMLIssue(t *C) { dirNameWithInvalidXml := "invalid\b\v\a" _, err := s.cloud.PutBlob(&PutBlobInput{ - Key: fmt.Sprintf("%s/%s", dirName, dirNameWithValidXml), + Key: fmt.Sprintf("%s/%s", dirName, dirNameWithValidXml), Body: bytes.NewReader([]byte("")), Size: PUInt64(0), }) @@ -3213,7 +3213,7 @@ func (s *GoofysTest) TestInvalidXMLIssue(t *C) { path := fmt.Sprintf("%s/%s", dirName, dirNameWithInvalidXml) _, err = s.cloud.PutBlob(&PutBlobInput{ - Key: path, + Key: path, Body: 
bytes.NewReader([]byte("")), Size: PUInt64(0), }) @@ -4317,3 +4317,38 @@ func (s *GoofysTest) TestReadMyOwnNewFileFuse(t *C) { //t.Assert(err, IsNil) //t.Assert(string(buf), Equals, "filex") } + +func (s *GoofysTest) TestBgSlowMount(t *C) { + start := time.Now() + s.fs.flags.BgInit = true + + s.fs = newGoofys(context.Background(), s.fs.bucket, s.fs.flags, + func(bucket string, flags *FlagStorage) (StorageBackend, error) { + cloud, err := NewBackend(bucket, flags) + if err != nil { + return nil, err + } + + time.Sleep(2 * time.Second) + + return cloud, nil + }) + now := time.Now() + // check that this takes less than 1 second even though we are + // sleeping for 2 + t.Assert(now.Before(start.Add(1*time.Second)), Equals, true, + Commentf("start %v now %v", start, now)) + + // lookup still works via the lazy init wrapper + fs := &LazyInitFileSystem{ + Fs: s.fs, + } + + lookup := fuseops.LookUpInodeOp{ + Parent: fuseops.RootInodeID, + Name: "file1", + } + + err := fs.LookUpInode(nil, &lookup) + t.Assert(err, IsNil) +} From fda88aa3bafe34d8539c7076efb1552bc2b55e1d Mon Sep 17 00:00:00 2001 From: Mei Gui <71293871+xinmeigui-db@users.noreply.github.com> Date: Tue, 27 Apr 2021 11:33:50 -0700 Subject: [PATCH 08/14] Stop renewing lease after multipartBlobCommit (#97) Currently, after committing a blob for azure, we continue renewing lease. This PR fixes this behavior. * add logs * more logs * fix break * clean up logs * just return --- internal/backend_adlv2.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/backend_adlv2.go b/internal/backend_adlv2.go index baa6d3e3..b8da3b3c 100644 --- a/internal/backend_adlv2.go +++ b/internal/backend_adlv2.go @@ -768,7 +768,7 @@ func (b *ADLv2) MultipartBlobBegin(param *MultipartBlobBeginInput) (*MultipartBl for { select { case <-commitData.RenewLeaseStop: - break + return case <-time.After(30 * time.Second): b.lease(adl2.Renew, param.Key, leaseId, 60, "") } From 7828a3df6ee202a60197c650d8f2a881865ae4c9 Mon Sep 17 00:00:00 2001 From: Ka-Hing Cheung Date: Wed, 5 May 2021 16:57:18 -0700 Subject: [PATCH 09/14] add some azure logging --- api/common/conf_azure.go | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/api/common/conf_azure.go b/api/common/conf_azure.go index 2892af1a..a80744a6 100644 --- a/api/common/conf_azure.go +++ b/api/common/conf_azure.go @@ -260,6 +260,31 @@ func azureAccountsClient(account string) (azblob.AccountsClient, error) { } c.BaseClient.Authorizer = authorizer + c.BaseClient.RequestInspector = func(p autorest.Preparer) autorest.Preparer { + return autorest.PreparerFunc(func(r *http.Request) (*http.Request, error) { + azbLog.Debugf("%v %v", r.Method, r.URL.String()) + r, err := p.Prepare(r) + if err != nil { + azbLog.Error(err) + } + return r, err + }) + } + c.BaseClient.ResponseInspector = func(p autorest.Responder) autorest.Responder { + return autorest.ResponderFunc(func(r *http.Response) error { + // seems like this is called twice per + // response and causes one extra debug log, + // but that's better than not logging it + azbLog.Debugf("%v %v %v", + r.Request.Method, r.Request.URL.String(), r.Status) + err := p.Respond(r) + if err != nil { + azbLog.Error(err) + } + return err + }) + } + return c, nil } From cc0864bb6f4ae2046b6f7db6f6ec987ae3e604e3 Mon Sep 17 00:00:00 2001 From: Ka-Hing Cheung Date: Mon, 17 May 2021 09:42:36 -0700 Subject: [PATCH 10/14] use host style S3 requests to do region detection for aws private link host style access resolves to the correct private link IP 
(after some delay), whereas s3.amazonaws.com may not be accessible exception is when bucket isn't a valid DNS name, or it contains `.' because the latter will fail SSL verification Example: - An endpoint typically looks like this `s3.us-east-2.amazonaws.com`. - We were doing `HEAD https://s3.us-east-2.amazonaws.com` - Instead of (one of the following) - `HEAD https://mybucket.s3.us-east-2.amazonaws.com` - `HEAD https://s3.us-east-2.amazonaws.com/mybucket` see: 1. https://docs.aws.amazon.com/general/latest/gr/s3.html 2. https://docs.databricks.com/data/data-sources/aws/amazon-s3.html#configuration --- internal/backend_s3.go | 50 +++++++++++++++++++++++++++++++++++------- 1 file changed, 42 insertions(+), 8 deletions(-) diff --git a/internal/backend_s3.go b/internal/backend_s3.go index a8653abc..4f589e4e 100644 --- a/internal/backend_s3.go +++ b/internal/backend_s3.go @@ -21,6 +21,7 @@ import ( "net/http" "net/url" "os" + "regexp" "strconv" "strings" "sync/atomic" @@ -136,21 +137,54 @@ func (s *S3Backend) newS3() { s.S3.Handlers.Sign.PushBack(addAcceptEncoding) } -func (s *S3Backend) detectBucketLocationByHEAD() (err error, isAws bool) { - u := url.URL{ - Scheme: "https", - Host: "s3.amazonaws.com", - Path: s.bucket, +// s3 allowed more flexible bucket names at some point and now +// requires more restrictive names that are always dns safe. This +// function doesn't validate all of the bucket naming rules in +// https://docs.aws.amazon.com/AmazonS3/latest/userguide/bucketnamingrules.html +// but only those that impact whether we can use bucket name in DNS +func isBucketNameDNSSafe(bucket string) bool { + // We dont allow dot: The linked aws documentation says that. + // > For best compatibility, we recommend that you avoid using dots (.) in bucket names, + // > except for buckets that are used only for static website hosting. If you include dots + // > in a bucket's name, you can't use virtual-host-style addressing over HTTPS, unless + // > you perform your own certificate validation. This is because the security certificates + // > used for virtual hosting of buckets don't work for buckets with dots in their names. + matched, err := regexp.MatchString("^[a-zA-Z0-9\\-]{3,63}$", bucket) + if err != nil { + s3Log.Errorf("cannot match %v: %v", bucket, err) + return false } + return matched +} +func (s *S3Backend) detectBucketLocationByHEAD() (err error, isAws bool) { + var u url.URL if s.awsConfig.Endpoint != nil { + // If the endpoint is set, just use it as host. endpoint, err := url.Parse(*s.awsConfig.Endpoint) if err != nil { return err, false } - - u.Scheme = endpoint.Scheme - u.Host = endpoint.Host + u = url.URL { + Scheme: endpoint.Scheme, + Host: endpoint.Host, + Path: s.bucket, + } + } else if isBucketNameDNSSafe(s.bucket) { + // prefer using host-based bucket URL to detect region because + // in AWS PrivateLink, host-based bucket resolves to a usable + // IP whereas the address behind s3.amazonaws.com may be + // blocked + u = url.URL{ + Scheme: "https", + Host: s.bucket + ".s3.amazonaws.com", + } + } else { + u = url.URL{ + Scheme: "https", + Host: "s3.amazonaws.com", + Path: s.bucket, + } } var req *http.Request From 2dd33a2a23922dec7104864cbcf93b10910da9c1 Mon Sep 17 00:00:00 2001 From: Ka-Hing Cheung Date: Tue, 29 Jun 2021 17:25:53 -0700 Subject: [PATCH 11/14] Fix azure datalake v2 retry (#103) the previous retry code that attempted to re-use the body buffer never worked and instead removed the body on retry. This fixes that and added a test. 
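For reference, the fix follows the standard net/http pattern for
replayable request bodies: give the request a GetBody that rewinds a
seekable reader, so each retry gets the full payload instead of an
already-drained one. A minimal, self-contained sketch of that pattern
(newReplayableRequest is an illustrative helper, not code from this
patch):

    package main

    import (
        "bytes"
        "fmt"
        "io"
        "io/ioutil"
        "net/http"
    )

    // newReplayableRequest builds a request whose body can be re-sent:
    // GetBody seeks back to the start, so a (sequential) retry replays
    // the same bytes.
    func newReplayableRequest(method, url string, body io.ReadSeeker) (*http.Request, error) {
        req, err := http.NewRequest(method, url, nil)
        if err != nil {
            return nil, err
        }
        // the content length is the offset of the end of the stream
        if req.ContentLength, err = body.Seek(0, io.SeekEnd); err != nil {
            return nil, err
        }
        req.GetBody = func() (io.ReadCloser, error) {
            _, err := body.Seek(0, io.SeekStart)
            // NopCloser: the caller owns the buffer, so closing the
            // body must not free it
            return ioutil.NopCloser(body), err
        }
        req.Body, err = req.GetBody() // also rewinds to the beginning
        return req, err
    }

    func main() {
        req, err := newReplayableRequest("PUT", "https://example.invalid/obj",
            bytes.NewReader([]byte("payload")))
        if err != nil {
            panic(err)
        }
        first, _ := req.GetBody() // first attempt drains the body
        ioutil.ReadAll(first)
        retry, _ := req.GetBody() // the retry sees the same 7 bytes
        data, _ := ioutil.ReadAll(retry)
        fmt.Println(len(data))
    }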
also improved adlv2 logging when debug is on, and always dump the response if the request failed --- api/common/conf_azure.go | 13 ++- api/common/conf_s3.go | 5 ++ internal/backend_adlv1.go | 1 + internal/backend_adlv2.go | 173 +++++++++++++++++++++++++++---------- internal/backend_azblob.go | 4 + internal/goofys.go | 2 + internal/goofys_test.go | 75 +++++++++++++++- 7 files changed, 217 insertions(+), 56 deletions(-) diff --git a/api/common/conf_azure.go b/api/common/conf_azure.go index a80744a6..1c8468da 100644 --- a/api/common/conf_azure.go +++ b/api/common/conf_azure.go @@ -47,6 +47,9 @@ type AZBlobConfig struct { Container string Prefix string + + RetryDuration time.Duration + MaxRetries int32 } func (config *AZBlobConfig) Init() { @@ -83,16 +86,18 @@ func (config *AZBlobConfig) WithAuthorization() autorest.PrepareDecorator { } type ADLv1Config struct { - Endpoint string - Authorizer autorest.Authorizer + Endpoint string + Authorizer autorest.Authorizer + RetryDuration time.Duration } func (config *ADLv1Config) Init() { } type ADLv2Config struct { - Endpoint string - Authorizer autorest.Authorizer + Endpoint string + Authorizer autorest.Authorizer + RetryDuration time.Duration } type AzureAuthorizerConfig struct { diff --git a/api/common/conf_s3.go b/api/common/conf_s3.go index 2401119f..c6bff97f 100644 --- a/api/common/conf_s3.go +++ b/api/common/conf_s3.go @@ -55,6 +55,7 @@ type S3Config struct { Session *session.Session BucketOwner string + MaxRetries int } var s3Session *session.Session @@ -77,6 +78,10 @@ func (c *S3Config) ToAwsConfig(flags *FlagStorage) (*aws.Config, error) { Transport: &defaultHTTPTransport, Timeout: flags.HTTPTimeout, }) + if c.MaxRetries != 0 { + awsConfig.MaxRetries = &c.MaxRetries + } + if flags.DebugS3 { awsConfig.LogLevel = aws.LogLevel(aws.LogDebug | aws.LogDebugWithRequestErrors) } diff --git a/internal/backend_adlv1.go b/internal/backend_adlv1.go index 2ffdb3c3..5109e670 100644 --- a/internal/backend_adlv1.go +++ b/internal/backend_adlv1.go @@ -137,6 +137,7 @@ func NewADLv1(bucket string, flags *FlagStorage, config *ADLv1Config) (*ADLv1, e } adlClient := adl.NewClient() + adlClient.BaseClient.Client.RetryDuration = config.RetryDuration adlClient.BaseClient.Client.Authorizer = config.Authorizer adlClient.BaseClient.Client.RequestInspector = LogRequest adlClient.BaseClient.Client.ResponseInspector = LogResponse diff --git a/internal/backend_adlv2.go b/internal/backend_adlv2.go index b8da3b3c..3e4135c0 100644 --- a/internal/backend_adlv2.go +++ b/internal/backend_adlv2.go @@ -18,11 +18,13 @@ package internal import ( . "github.com/kahing/goofys/api/common" + "bytes" "context" "encoding/base64" "encoding/json" "fmt" "io" + "io/ioutil" "net/http" "net/url" "strconv" @@ -53,6 +55,11 @@ type ADLv2 struct { const ADL2_CLIENT_REQUEST_ID = "X-Ms-Client-Request-Id" const ADL2_REQUEST_ID = "X-Ms-Request-Id" +// intentionally invalid header name to track if a response has been +// logged. 
It's an invalid header to make sure we don't accidentally +// send this out in a request +const ADL2_RESP_LOGGED = "X Databricks Logged" + var adl2Log = GetLogger("adlv2") type ADLv2MultipartBlobCommitInput struct { @@ -65,22 +72,72 @@ func IsADLv2Endpoint(endpoint string) bool { return strings.HasPrefix(endpoint, "abfs://") } +func adl2LogReq(level logrus.Level, r *http.Request, reqType string) { + requestId := r.Header.Get(ADL2_CLIENT_REQUEST_ID) + + dump := strings.Builder{} + dump.WriteString(fmt.Sprintf("Request dump for %v %v: ", requestId, reqType)) + // prevent dumping of body. Setting it to nil will result in + // an error dumping the request. + b := r.Body + r.Body = ioutil.NopCloser(bytes.NewReader([]byte(""))) + // hide sensitive header + auth := r.Header.Get("Authorization") + if auth != "" { + r.Header.Set("Authorization", "") + } + defer func() { + r.Body = b + if auth != "" { + r.Header.Set("Authorization", auth) + } + }() + + // this would always fail with: + // Failed to log request: http: ContentLength=XXX with Body length 0 + // because of how we replaced the body + _ = r.Write(&dump) + + adl2Log.Log(level, dump.String()) +} + +func adl2LogRespOnce(level logrus.Level, r *http.Response, err string) { + logged := r.Header.Get(ADL2_RESP_LOGGED) + // don't log anything if this is being called twice, + // which could happen via ResponseInspector + if logged == "" { + r.Header.Add(ADL2_RESP_LOGGED, "true") + // if debug is enabled, we would have logged the request already + if !adl2Log.IsLevelEnabled(logrus.DebugLevel) { + adl2LogReq(level, r.Request, err) + } + requestId := r.Request.Header.Get(ADL2_CLIENT_REQUEST_ID) + + dump := strings.Builder{} + dump.WriteString(fmt.Sprintf("Response dump for %v: ", requestId)) + b := r.Body + r.Body = nil + // this would always fail with: + // Failed to log request: http: ContentLength=XXX with Body length 0 + // because of how we replaced the body + _ = r.Write(&dump) + r.Body = b + + adl2Log.Log(level, dump.String()) + r.Header.Del(ADL2_REQUEST_ID) + } +} + func adl2LogResp(level logrus.Level, r *http.Response) { if r == nil { return } - if adl2Log.IsLevelEnabled(level) { - requestId := r.Request.Header.Get(ADL2_CLIENT_REQUEST_ID) - respId := r.Header.Get(ADL2_REQUEST_ID) - // don't log anything if this is being called twice, - // which it is via ResponseInspector - if respId != "" { - adl2Log.Logf(level, "%v %v %v %v %v", r.Request.Method, - r.Request.URL.String(), - requestId, r.Status, respId) - r.Header.Del(ADL2_REQUEST_ID) - } + // always log if it's not an expected error + if r.StatusCode >= 400 && r.StatusCode != 404 && r.StatusCode != 403 { + adl2LogRespOnce(logrus.ErrorLevel, r, "error") + } else if adl2Log.IsLevelEnabled(level) { + adl2LogRespOnce(level, r, "") } } @@ -119,11 +176,11 @@ func NewADLv2(bucket string, flags *FlagStorage, config *ADLv2Config) (*ADLv2, e if r.Header.Get("Content-Length") == "0" { r.Body = http.NoBody } else if r.Body == nil { + r.Header.Set("Content-Length", "0") r.Body = http.NoBody } if adl2Log.IsLevelEnabled(logrus.DebugLevel) { - requestId := r.Header.Get(ADL2_CLIENT_REQUEST_ID) op := r.Method switch op { case http.MethodPost: @@ -140,13 +197,12 @@ func NewADLv2(bucket string, flags *FlagStorage, config *ADLv2Config) (*ADLv2, e op += fmt.Sprintf("(%v)", r.ContentLength) } } - adl2Log.Debugf("%v %v %v", op, - r.URL.String(), requestId) + adl2LogReq(logrus.DebugLevel, r, op) } r, err := p.Prepare(r) if err != nil { - adl2Log.Error(err) + adl2Log.Errorf("Prepare %v", err) } return r, err }) @@ 
-157,7 +213,7 @@ func NewADLv2(bucket string, flags *FlagStorage, config *ADLv2Config) (*ADLv2, e adl2LogResp(logrus.DebugLevel, r) err := p.Respond(r) if err != nil { - adl2Log.Error(err) + adl2Log.Errorf("Respond %v", err) } return err }) @@ -168,6 +224,7 @@ func NewADLv2(bucket string, flags *FlagStorage, config *ADLv2Config) (*ADLv2, e client.RequestInspector = LogRequest client.ResponseInspector = LogResponse client.Sender.(*http.Client).Transport = GetHTTPTransport() + client.Client.RetryDuration = config.RetryDuration b := &ADLv2{ flags: flags, @@ -248,7 +305,6 @@ func adlv2ErrLogHeaders(errCode string, resp *http.Response) { } func mapADLv2Error(resp *http.Response, err error, rawError bool) error { - if resp == nil { if err != nil { if detailedError, ok := err.(autorest.DetailedError); ok { @@ -279,9 +335,6 @@ func mapADLv2Error(resp *http.Response, err error, rawError bool) error { } else { switch resp.StatusCode { case http.StatusBadRequest: - if !adl2Log.IsLevelEnabled(logrus.DebugLevel) { - adl2LogResp(logrus.ErrorLevel, resp) - } adlErr, err := decodeADLv2Error(resp.Body) if err == nil { adlv2ErrLogHeaders(*adlErr.Error.Code, resp) @@ -1057,7 +1110,7 @@ func (client adl2PathClient) CreatePreparer(ctx context.Context, filesystem stri preparer = autorest.DecoratePreparer(preparer, autorest.WithHeader("x-ms-version", autorest.String(client.XMsVersion))) } - return preparer.Prepare((client.defaultRequest()).WithContext(ctx)) + return preparer.Prepare((&http.Request{}).WithContext(ctx)) } // CreateSender sends the Create request. The method will close the @@ -1161,7 +1214,7 @@ func (client adl2PathClient) DeletePreparer(ctx context.Context, filesystem stri preparer = autorest.DecoratePreparer(preparer, autorest.WithHeader("x-ms-version", autorest.String(client.XMsVersion))) } - return preparer.Prepare((client.defaultRequest()).WithContext(ctx)) + return preparer.Prepare((&http.Request{}).WithContext(ctx)) } // DeleteSender sends the Delete request. The method will close the @@ -1265,7 +1318,7 @@ func (client adl2PathClient) GetPropertiesPreparer(ctx context.Context, filesyst preparer = autorest.DecoratePreparer(preparer, autorest.WithHeader("x-ms-version", autorest.String(client.XMsVersion))) } - return preparer.Prepare((client.defaultRequest()).WithContext(ctx)) + return preparer.Prepare((&http.Request{}).WithContext(ctx)) } // GetPropertiesSender sends the GetProperties request. The method will close the @@ -1376,7 +1429,7 @@ func (client adl2PathClient) LeasePreparer(ctx context.Context, xMsLeaseAction a preparer = autorest.DecoratePreparer(preparer, autorest.WithHeader("x-ms-version", autorest.String(client.XMsVersion))) } - return preparer.Prepare((client.defaultRequest()).WithContext(ctx)) + return preparer.Prepare((&http.Request{}).WithContext(ctx)) } // LeaseSender sends the Lease request. The method will close the @@ -1468,7 +1521,7 @@ func (client adl2PathClient) ListPreparer(ctx context.Context, recursive bool, f preparer = autorest.DecoratePreparer(preparer, autorest.WithHeader("x-ms-version", autorest.String(client.XMsVersion))) } - return preparer.Prepare((client.defaultRequest()).WithContext(ctx)) + return preparer.Prepare((&http.Request{}).WithContext(ctx)) } // ListSender sends the List request. 
The method will close the @@ -1575,7 +1628,7 @@ func (client adl2PathClient) ReadPreparer(ctx context.Context, filesystem string preparer = autorest.DecoratePreparer(preparer, autorest.WithHeader("x-ms-version", autorest.String(client.XMsVersion))) } - return preparer.Prepare((client.defaultRequest()).WithContext(ctx)) + return preparer.Prepare((&http.Request{}).WithContext(ctx)) } // ReadSender sends the Read request. The method will close the @@ -1619,6 +1672,39 @@ func (client adl2PathClient) Update(ctx context.Context, action adl2.PathUpdateA return } +func withReadSeeker(seeker io.ReadSeeker, contentLength *int64) autorest.PrepareDecorator { + return func(p autorest.Preparer) autorest.Preparer { + return autorest.PreparerFunc(func(r *http.Request) (*http.Request, error) { + r, err := p.Prepare(r) + if err != nil { + return r, err + } + + r.GetBody = func() (io.ReadCloser, error) { + _, err := seeker.Seek(0, io.SeekStart) + // we don't need to close this because + // goofys closes it for us, letting + // azure close this would cause + // problem on retry because closing it + // frees underlining buffer + return ioutil.NopCloser(seeker), err + } + if contentLength != nil { + r.ContentLength = *contentLength + } else { + // seek to the end to get the length + r.ContentLength, err = seeker.Seek(0, io.SeekEnd) + if err != nil { + return nil, err + } + } + // initialize body, this resets the buf to beginnning + r.Body, err = r.GetBody() + return r, err + }) + } +} + // UpdatePreparer prepares the Update request. func (client adl2PathClient) UpdatePreparer(ctx context.Context, action adl2.PathUpdateAction, filesystem string, pathParameter string, position *int64, retainUncommittedData *bool, closeParameter *bool, contentLength *int64, contentMD5 string, xMsLeaseID string, xMsCacheControl string, xMsContentType string, xMsContentDisposition string, xMsContentEncoding string, xMsContentLanguage string, xMsContentMd5 string, xMsProperties string, xMsOwner string, xMsGroup string, xMsPermissions string, xMsACL string, ifMatch string, ifNoneMatch string, ifModifiedSince string, ifUnmodifiedSince string, requestBody io.ReadCloser, xMsClientRequestID string, timeout *int32, xMsDate string) (*http.Request, error) { urlParameters := map[string]interface{}{ @@ -1657,8 +1743,18 @@ func (client adl2PathClient) UpdatePreparer(ctx context.Context, action adl2.Pat autorest.WithPathParameters("/{filesystem}/{path}", pathParameters), autorest.WithQueryParameters(queryParameters)) if requestBody != nil { - preparer = autorest.DecoratePreparer(preparer, - autorest.WithFile(requestBody)) + var decorator autorest.PrepareDecorator + + if seeker, ok := requestBody.(io.ReadSeeker); ok { + // internally goofys always uses seekable + // readers, so we can rewind on + // retry. autorest makes a copy of the buffer + // and this avoids that waste + decorator = withReadSeeker(seeker, contentLength) + } else { + decorator = autorest.WithFile(requestBody) + } + preparer = autorest.DecoratePreparer(preparer, decorator) } if contentLength != nil { preparer = autorest.DecoratePreparer(preparer, @@ -1744,7 +1840,7 @@ func (client adl2PathClient) UpdatePreparer(ctx context.Context, action adl2.Pat preparer = autorest.DecoratePreparer(preparer, autorest.WithHeader("x-ms-version", autorest.String(client.XMsVersion))) } - return preparer.Prepare((client.defaultRequest()).WithContext(ctx)) + return preparer.Prepare((&http.Request{}).WithContext(ctx)) } // UpdateSender sends the Update request. 
The method will close the @@ -1767,25 +1863,6 @@ func (client adl2PathClient) UpdateResponder(resp *http.Response) (result autore return } -func (client adl2PathClient) defaultRequest() *http.Request { - r := &http.Request{} - r.GetBody = func() (io.ReadCloser, error) { - if r.Body == nil { - return http.NoBody, nil - } else if seeker, ok := r.Body.(io.ReadSeeker); ok { - // internally goofys always uses seekable - // readers, so we can rewind on - // retry. autorest makes a copy of the buffer - // and this avoids that waste - _, err := seeker.Seek(0, 0) - return &ReadSeekerCloser{seeker}, err - } else { - panic(fmt.Sprintf("Wrong type: %T", r.Body)) - } - } - return r -} - type adl2PathList struct { autorest.Response `json:"-"` Paths *[]adl2Path `json:"paths,omitempty"` diff --git a/internal/backend_azblob.go b/internal/backend_azblob.go index 2df1d582..da96da25 100644 --- a/internal/backend_azblob.go +++ b/internal/backend_azblob.go @@ -148,6 +148,10 @@ func NewAZBlob(container string, config *AZBlobConfig) (*AZBlob, error) { RequestLog: azblob.RequestLogOptions{ LogWarningIfTryOverThreshold: time.Duration(-1), }, + Retry: azblob.RetryOptions{ + MaxTries: config.MaxRetries, + RetryDelay: config.RetryDuration, + }, HTTPSender: newAzBlobHTTPClientFactory(), } diff --git a/internal/goofys.go b/internal/goofys.go index 63a60a88..0a4dc4e5 100644 --- a/internal/goofys.go +++ b/internal/goofys.go @@ -553,6 +553,8 @@ func mapHttpError(status int) error { return syscall.ENOTSUP case http.StatusConflict: return syscall.EINTR + case 411: // Length Required + return syscall.EINVAL case 429: return syscall.EAGAIN case 500: diff --git a/internal/goofys_test.go b/internal/goofys_test.go index 65dd65b8..6c129efc 100644 --- a/internal/goofys_test.go +++ b/internal/goofys_test.go @@ -211,6 +211,10 @@ func (t *GoofysTest) DeleteADLBlobs(cloud StorageBackend, items []string) error func (s *GoofysTest) selectTestConfig(t *C, flags *FlagStorage) (conf S3Config) { (&conf).Init() + // increase retries for flaky test, because sometimes we make + // parallel requests and the same stream of request can get + // unlucky and fail multiple times + conf.MaxRetries = 10 if hasEnv("AWS") { if isTravis() { conf.Region = "us-east-1" @@ -518,6 +522,8 @@ func (s *GoofysTest) SetUpTest(t *C) { } else if cloud == "azblob" { config, err := AzureBlobConfig(os.Getenv("ENDPOINT"), "", "blob") t.Assert(err, IsNil) + config.MaxRetries = 10 + config.RetryDuration = 200 * time.Millisecond if config.Endpoint == AzuriteEndpoint { s.azurite = true @@ -575,8 +581,9 @@ func (s *GoofysTest) SetUpTest(t *C) { t.Assert(err, IsNil) config := ADLv1Config{ - Endpoint: os.Getenv("ENDPOINT"), - Authorizer: auth, + Endpoint: os.Getenv("ENDPOINT"), + Authorizer: auth, + RetryDuration: 200 * time.Millisecond, } config.Init() @@ -605,8 +612,9 @@ func (s *GoofysTest) SetUpTest(t *C) { } config := ADLv2Config{ - Endpoint: os.Getenv("ENDPOINT"), - Authorizer: auth, + Endpoint: os.Getenv("ENDPOINT"), + Authorizer: auth, + RetryDuration: 200 * time.Millisecond, } flags.Backend = &config @@ -1150,6 +1158,61 @@ func (s *GoofysTest) TestWriteLargeFile(t *C) { s.testWriteFile(t, "testLargeFile3", int64(READAHEAD_CHUNK)+1, 128*1024) } +// fake connection that fails 2/3 of the requests +type FlakyConn struct { + net.Conn + fail bool +} + +func (c *FlakyConn) Write(b []byte) (n int, err error) { + if c.fail { + // for s3 we need to fake an error that aws thinks is + // retryable, and if we directly return a + // syscall.Errno aws sdk would use Errno.Temporary + 
From f687ab9c44e8b407ad7ad51e6d6dd0e8ce47d0e9 Mon Sep 17 00:00:00 2001
From: Ka-Hing Cheung
Date: Fri, 9 Jul 2021 11:18:22 -0700
Subject: [PATCH 12/14] use AWS_REGION for sts region

otherwise it only works for sts in the default region (us-east-1)
---
 api/common/conf_s3.go | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/api/common/conf_s3.go b/api/common/conf_s3.go
index c6bff97f..90ed904d 100644
--- a/api/common/conf_s3.go
+++ b/api/common/conf_s3.go
@@ -19,6 +19,7 @@ import (
 	"encoding/base64"
 	"fmt"
 	"net/http"
+	"os"
 
 	"github.com/aws/aws-sdk-go/aws"
 	"github.com/aws/aws-sdk-go/aws/client"
@@ -150,6 +151,7 @@ func (c stsConfigProvider) ClientConfig(serviceName string, cfgs ...*aws.Config)
 	}
 	if c.StsEndpoint != "" {
 		config.Endpoint = c.StsEndpoint
+		config.SigningRegion = os.Getenv("AWS_REGION")
 	}
 
 	return config
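For background on why the signing region matters: SigV4 signatures embed the region they were computed for, so a request signed for the default region is rejected by a regional STS endpoint. A minimal sketch of the same configuration done through aws-sdk-go directly; the sts.<region>.amazonaws.com endpoint string is an assumption about AWS's regional STS hostnames, not taken from this patch.

package main

import (
	"fmt"
	"os"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/sts"
)

func main() {
	region := os.Getenv("AWS_REGION")
	sess := session.Must(session.NewSession(&aws.Config{
		// the endpoint and the signing region must agree,
		// otherwise sts rejects the SigV4 signature
		Region:   aws.String(region),
		Endpoint: aws.String("https://sts." + region + ".amazonaws.com"),
	}))
	out, err := sts.New(sess).GetCallerIdentity(&sts.GetCallerIdentityInput{})
	fmt.Println(out, err)
}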
"gopkg.in/check.v1" + + +type S3Test struct { + orignalTransport http.RoundTripper +} + +var _ = Suite(&S3Test{}) + +func (s *S3Test) SetUpSuite(t *C) { + s.orignalTransport = http.DefaultTransport +} + +func (s *S3Test) TearDownSuite(t *C) { + http.DefaultTransport = s.orignalTransport +} + +type roundTripFunc func(r *http.Request) (*http.Response, error) + +func (s roundTripFunc) RoundTrip(r *http.Request) (*http.Response, error) { + return s(r) +} + +func mockTransportForS3RegionDetection(t *C, expectedUrl, regionToMock string) http.RoundTripper { + return roundTripFunc(func(r *http.Request) (*http.Response, error) { + t.Assert(expectedUrl, Equals, r.URL.String()) + h := http.Header{} + h.Add("X-Amz-Bucket-Region", regionToMock) + h.Add("Server", "AmazonS3") + return &http.Response{StatusCode: 403, Header: h, Request: r}, nil + }) +} + +func (s *S3Test) TestDetectDNSSafeBucketRegionWithEndpoint(t *C) { + // If end point is set, we should do HEAD on / always. + s3, err := NewS3("bucket1", &FlagStorage{Endpoint: "https://foo.com"}, &S3Config{}) + t.Assert(err, IsNil) + http.DefaultTransport = mockTransportForS3RegionDetection(t, "https://foo.com/bucket1", "awesome-region1") + err, isAws := s3.detectBucketLocationByHEAD() + t.Assert(err, IsNil) + t.Assert(isAws, Equals, true) + t.Assert(*s3.awsConfig.Region, Equals, "awesome-region1") +} + +func (s *S3Test) TestDetectDNSUnSafeBucketRegionWithEndpoint(t *C) { + // If end point is set, we should do HEAD on / always. + s3, err := NewS3("buc_ket1", &FlagStorage{Endpoint: "https://foo.com"}, &S3Config{}) + t.Assert(err, IsNil) + http.DefaultTransport = mockTransportForS3RegionDetection(t, "https://foo.com/buc_ket1", "awesome-region2") + err, isAws := s3.detectBucketLocationByHEAD() + t.Assert(err, IsNil) + t.Assert(isAws, Equals, true) + t.Assert(*s3.awsConfig.Region, Equals, "awesome-region2") +} + +func (s *S3Test) TestDetectDNSSafeBucketRegion(t *C) { + // bucket name is dns safe => use .s3.amazonaws.com for region detection. + s3, err := NewS3("bucket1", &FlagStorage{}, &S3Config{}) + t.Assert(err, IsNil) + http.DefaultTransport = mockTransportForS3RegionDetection(t, "https://bucket1.s3.amazonaws.com", "awesome-region3") + err, isAws := s3.detectBucketLocationByHEAD() + t.Assert(err, IsNil) + t.Assert(isAws, Equals, true) + t.Assert(*s3.awsConfig.Region, Equals, "awesome-region3") +} + +func (s *S3Test) TestDetectDNSUnSafeBucketRegion(t *C) { + // bucket name is *not• dns safe => use s3.amazonaws.com/ for region detection. + s3, err := NewS3("buc_ket1", &FlagStorage{}, &S3Config{}) + t.Assert(err, IsNil) + http.DefaultTransport = mockTransportForS3RegionDetection(t, "https://s3.amazonaws.com/buc_ket1", "awesome-region4") + err, isAws := s3.detectBucketLocationByHEAD() + t.Assert(err, IsNil) + t.Assert(isAws, Equals, true) + t.Assert(*s3.awsConfig.Region, Equals, "awesome-region4") +} + +func (s *S3Test) TestDetectBucketNameWithDotRegion(t *C) { + // if the bucket name has dot, we cant use use .s3.amazonaws.com. 
From 537b53f94c43b729a6f66e9de9a83c17da4a47f1 Mon Sep 17 00:00:00 2001
From: Ka-Hing Cheung
Date: Mon, 26 Jul 2021 23:10:58 -0700
Subject: [PATCH 14/14] don't let mountpoint(1) block

---
 api/common/lazy_init.go | 18 +++++++++--------
 internal/goofys_test.go | 44 ++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 53 insertions(+), 9 deletions(-)

diff --git a/api/common/lazy_init.go b/api/common/lazy_init.go
index 97dcd0ab..92eb19aa 100644
--- a/api/common/lazy_init.go
+++ b/api/common/lazy_init.go
@@ -46,11 +46,8 @@ func (fs *LazyInitFileSystem) Init() error {
 }
 
 func (fs *LazyInitFileSystem) StatFS(ctx context.Context, op *fuseops.StatFSOp) (err error) {
-	err = fs.Init()
-	if err != nil {
-		return
-	}
-
+	// don't need to init filesystem to do statfs() since we
+	// return fake data anyway
 	return fs.Fs.StatFS(ctx, op)
 }
 
@@ -64,9 +61,14 @@ func (fs *LazyInitFileSystem) LookUpInode(ctx context.Context, op *fuseops.LookU
 }
 
 func (fs *LazyInitFileSystem) GetInodeAttributes(ctx context.Context, op *fuseops.GetInodeAttributesOp) (err error) {
-	err = fs.Init()
-	if err != nil {
-		return
+	// don't need to init filesystem if we are checking the root
+	// inode since we return fake data anyway. Also this is used
+	// by mountpoint(1) and we don't want that to block
+	if op.Inode != fuseops.RootInodeID {
+		err = fs.Init()
+		if err != nil {
+			return
+		}
 	}
 
 	return fs.Fs.GetInodeAttributes(ctx, op)
diff --git a/internal/goofys_test.go b/internal/goofys_test.go
index 6c129efc..fe5cc29d 100644
--- a/internal/goofys_test.go
+++ b/internal/goofys_test.go
@@ -1719,7 +1719,14 @@ func (s *GoofysTest) mount(t *C, mountPoint string) {
 	err := os.MkdirAll(mountPoint, 0700)
 	t.Assert(err, IsNil)
 
-	server := fuseutil.NewFileSystemServer(s.fs)
+	var fs InitFileSystem
+	fs = s.fs
+	if s.fs.flags.BgInit {
+		fs = &LazyInitFileSystem{
+			Fs: fs,
+		}
+	}
+	server := fuseutil.NewFileSystemServer(fs)
 
 	if isCatfs() {
 		s.fs.flags.MountOptions = make(map[string]string)
@@ -4419,3 +4426,38 @@ func (s *GoofysTest) TestBgSlowMount(t *C) {
 	err := fs.LookUpInode(nil, &lookup)
 	t.Assert(err, IsNil)
 }
+
+func (s *GoofysTest) TestBgSlowMountPoint(t *C) {
+	start := time.Now()
+	s.fs.flags.BgInit = true
+
+	s.fs = newGoofys(context.Background(), s.fs.bucket, s.fs.flags,
+		func(bucket string, flags *FlagStorage) (StorageBackend, error) {
+			cloud, err := NewBackend(bucket, flags)
+			if err != nil {
+				return nil, err
+			}
+
+			time.Sleep(2 * time.Second)
+
+			return cloud, nil
+		})
+
+	mountPoint := "/tmp/mnt" + s.fs.bucket
+	defer s.umount(t, mountPoint)
+	s.runFuseTest(t, mountPoint, false, "/bin/mountpoint", mountPoint)
+
+	now := time.Now()
+	// check that this takes less than 1 second even though we are
+	// sleeping for 2
+	t.Assert(now.Before(start.Add(1*time.Second)), Equals, true,
+		Commentf("start %v now %v", start, now))
+
+	// double check that we eventually worked
+	file := "file1"
+	filePath := mountPoint + "/file1"
+
+	buf, err := ioutil.ReadFile(filePath)
+	t.Assert(err, IsNil)
+	t.Assert(string(buf), Equals, file)
+}
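The LazyInitFileSystem change generalizes to a simple pattern: run the expensive backend initialization at most once, while answering the cheap probes that tools like mountpoint(1) make without waiting for it. A simplified sketch of the idea, with illustrative names rather than the goofys API:

package main

import (
	"fmt"
	"sync"
	"time"
)

type lazyFS struct {
	once sync.Once
	init func()
}

// StatFS returns fake data, so it never waits for init.
func (fs *lazyFS) StatFS() string { return "fake statfs" }

// Lookup touches the real backend, so it must block until init is done.
func (fs *lazyFS) Lookup(name string) string {
	fs.once.Do(fs.init)
	return "looked up " + name
}

func main() {
	fs := &lazyFS{init: func() { time.Sleep(2 * time.Second) }}
	start := time.Now()
	fmt.Println(fs.StatFS(), time.Since(start))        // immediate
	fmt.Println(fs.Lookup("file1"), time.Since(start)) // ~2s
}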