Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ github.com/aws/aws-sdk-go v1.35.30/go.mod h1:tlPOdRjfxPBpNIwqDj61rmsnA85v9jc0Ps9
github.com/benbjohnson/clock v1.0.2/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM=
github.com/benbjohnson/clock v1.0.3 h1:vkLuvpK4fmtSCuo60+yC63p7y0BmQ8gm5ZXGuBCJyXg=
github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM=
github.com/aws/aws-sdk-go v1.28.13 h1:JyCQQ86yil3hg7MtWdNH8Pbcgx92qlUV2v22Km63Mf4=
github.com/aws/aws-sdk-go v1.28.13/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
Expand Down
85 changes: 71 additions & 14 deletions s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,30 +168,76 @@ func (s *S3Bucket) Delete(k ds.Key) error {
return err
}

func querySupported(q dsq.Query) bool {
if len(q.Orders) > 0 {
switch q.Orders[0].(type) {
case dsq.OrderByKey, *dsq.OrderByKey:
// We order by key by default.
default:
return false
}
}
return len(q.Filters) == 0
}

func (s *S3Bucket) Query(q dsq.Query) (dsq.Results, error) {
if q.Orders != nil || q.Filters != nil {
return nil, fmt.Errorf("s3ds: filters or orders are not supported")
// Handle ordering
if !querySupported(q) {
// OK, time to do this the naive way.

// Skip the stuff we can't apply.
baseQuery := q
baseQuery.Filters = nil
baseQuery.Orders = nil
baseQuery.Limit = 0 // needs to apply after we order
baseQuery.Offset = 0 // ditto.

// perform the base query.
res, err := s.Query(baseQuery)
if err != nil {
return nil, err
}

// fix the query
res = dsq.ResultsReplaceQuery(res, q)

// Remove the prefix, S3 has already handled it.
naiveQuery := q
naiveQuery.Prefix = ""

// Apply the rest of the query
return dsq.NaiveQueryApply(naiveQuery, res), nil
}

// S3 store a "/foo" key as "foo" so we need to trim the leading "/"
q.Prefix = strings.TrimPrefix(q.Prefix, "/")
// Normalize the path and strip the leading / as S3 stores values
// without the leading /.
prefix := ds.NewKey(q.Prefix).String()[1:]

limit := q.Limit + q.Offset
if limit == 0 || limit > listMax {
limit = listMax
sent := 0
queryLimit := func() int64 {
if q.Limit > 0 && (q.Limit-sent) < listMax {
return int64(q.Limit - sent)
}
return listMax
}

resp, err := s.S3.ListObjectsV2(&s3.ListObjectsV2Input{
Bucket: aws.String(s.Bucket),
Prefix: aws.String(s.s3Path(q.Prefix)),
MaxKeys: aws.Int64(int64(limit)),
Bucket: aws.String(s.Bucket),
Prefix: aws.String(s.s3Path(prefix)),
Delimiter: aws.String("/"),
MaxKeys: aws.Int64(queryLimit()),
})
if err != nil {
return nil, err
}

index := q.Offset
nextValue := func() (dsq.Result, bool) {
tryAgain:
if q.Limit > 0 && sent >= q.Limit {
return dsq.Result{}, false
}

for index >= len(resp.Contents) {
if !*resp.IsTruncated {
return dsq.Result{}, false
Expand All @@ -201,9 +247,9 @@ func (s *S3Bucket) Query(q dsq.Query) (dsq.Results, error) {

resp, err = s.S3.ListObjectsV2(&s3.ListObjectsV2Input{
Bucket: aws.String(s.Bucket),
Prefix: aws.String(s.s3Path(q.Prefix)),
Prefix: aws.String(s.s3Path(prefix)),
Delimiter: aws.String("/"),
MaxKeys: aws.Int64(listMax),
MaxKeys: aws.Int64(queryLimit()),
ContinuationToken: resp.NextContinuationToken,
})
if err != nil {
Expand All @@ -217,13 +263,24 @@ func (s *S3Bucket) Query(q dsq.Query) (dsq.Results, error) {
}
if !q.KeysOnly {
value, err := s.Get(ds.NewKey(entry.Key))
if err != nil {
return dsq.Result{Error: err}, false
switch err {
case nil:
case ds.ErrNotFound:
// This just means the value got deleted in the
// mean-time. That's not an error.
//
// We could use a loop instead of a goto, but
// this is one of those rare cases where a goto
// is easier to understand.
goto tryAgain
default:
return dsq.Result{Entry: entry, Error: err}, false
}
entry.Value = value
}

index++
sent++
return dsq.Result{Entry: entry}, true
}

Expand Down
13 changes: 1 addition & 12 deletions s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,7 @@ func TestSuiteLocalS3(t *testing.T) {
t.Fatal(err)
}

t.Run("basic operations", func(t *testing.T) {
dstest.SubtestBasicPutGet(t, s3ds)
})
t.Run("not found operations", func(t *testing.T) {
dstest.SubtestNotFounds(t, s3ds)
})
t.Run("many puts and gets, query", func(t *testing.T) {
dstest.SubtestManyKeysAndQuery(t, s3ds)
})
t.Run("return sizes", func(t *testing.T) {
dstest.SubtestReturnSizes(t, s3ds)
})
dstest.SubtestAll(t, s3ds)
}

func devMakeBucket(s3obj *s3.S3, bucketName string) error {
Expand Down