diff --git a/go.sum b/go.sum index 4d8fd9d..8a472cc 100644 --- a/go.sum +++ b/go.sum @@ -51,6 +51,8 @@ github.com/aws/aws-sdk-go v1.35.30/go.mod h1:tlPOdRjfxPBpNIwqDj61rmsnA85v9jc0Ps9 github.com/benbjohnson/clock v1.0.2/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM= github.com/benbjohnson/clock v1.0.3 h1:vkLuvpK4fmtSCuo60+yC63p7y0BmQ8gm5ZXGuBCJyXg= github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM= +github.com/aws/aws-sdk-go v1.28.13 h1:JyCQQ86yil3hg7MtWdNH8Pbcgx92qlUV2v22Km63Mf4= +github.com/aws/aws-sdk-go v1.28.13/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= diff --git a/s3.go b/s3.go index 34b2113..ecb338b 100644 --- a/s3.go +++ b/s3.go @@ -168,23 +168,64 @@ func (s *S3Bucket) Delete(k ds.Key) error { return err } +func querySupported(q dsq.Query) bool { + if len(q.Orders) > 0 { + switch q.Orders[0].(type) { + case dsq.OrderByKey, *dsq.OrderByKey: + // We order by key by default. + default: + return false + } + } + return len(q.Filters) == 0 +} + func (s *S3Bucket) Query(q dsq.Query) (dsq.Results, error) { - if q.Orders != nil || q.Filters != nil { - return nil, fmt.Errorf("s3ds: filters or orders are not supported") + // Handle ordering + if !querySupported(q) { + // OK, time to do this the naive way. + + // Skip the stuff we can't apply. + baseQuery := q + baseQuery.Filters = nil + baseQuery.Orders = nil + baseQuery.Limit = 0 // needs to apply after we order + baseQuery.Offset = 0 // ditto. + + // perform the base query. + res, err := s.Query(baseQuery) + if err != nil { + return nil, err + } + + // fix the query + res = dsq.ResultsReplaceQuery(res, q) + + // Remove the prefix, S3 has already handled it. + naiveQuery := q + naiveQuery.Prefix = "" + + // Apply the rest of the query + return dsq.NaiveQueryApply(naiveQuery, res), nil } - // S3 store a "/foo" key as "foo" so we need to trim the leading "/" - q.Prefix = strings.TrimPrefix(q.Prefix, "/") + // Normalize the path and strip the leading / as S3 stores values + // without the leading /. + prefix := ds.NewKey(q.Prefix).String()[1:] - limit := q.Limit + q.Offset - if limit == 0 || limit > listMax { - limit = listMax + sent := 0 + queryLimit := func() int64 { + if q.Limit > 0 && (q.Limit-sent) < listMax { + return int64(q.Limit - sent) + } + return listMax } resp, err := s.S3.ListObjectsV2(&s3.ListObjectsV2Input{ - Bucket: aws.String(s.Bucket), - Prefix: aws.String(s.s3Path(q.Prefix)), - MaxKeys: aws.Int64(int64(limit)), + Bucket: aws.String(s.Bucket), + Prefix: aws.String(s.s3Path(prefix)), + Delimiter: aws.String("/"), + MaxKeys: aws.Int64(queryLimit()), }) if err != nil { return nil, err @@ -192,6 +233,11 @@ func (s *S3Bucket) Query(q dsq.Query) (dsq.Results, error) { index := q.Offset nextValue := func() (dsq.Result, bool) { + tryAgain: + if q.Limit > 0 && sent >= q.Limit { + return dsq.Result{}, false + } + for index >= len(resp.Contents) { if !*resp.IsTruncated { return dsq.Result{}, false @@ -201,9 +247,9 @@ func (s *S3Bucket) Query(q dsq.Query) (dsq.Results, error) { resp, err = s.S3.ListObjectsV2(&s3.ListObjectsV2Input{ Bucket: aws.String(s.Bucket), - Prefix: aws.String(s.s3Path(q.Prefix)), + Prefix: aws.String(s.s3Path(prefix)), Delimiter: aws.String("/"), - MaxKeys: aws.Int64(listMax), + MaxKeys: aws.Int64(queryLimit()), ContinuationToken: resp.NextContinuationToken, }) if err != nil { @@ -217,13 +263,24 @@ func (s *S3Bucket) Query(q dsq.Query) (dsq.Results, error) { } if !q.KeysOnly { value, err := s.Get(ds.NewKey(entry.Key)) - if err != nil { - return dsq.Result{Error: err}, false + switch err { + case nil: + case ds.ErrNotFound: + // This just means the value got deleted in the + // mean-time. That's not an error. + // + // We could use a loop instead of a goto, but + // this is one of those rare cases where a goto + // is easier to understand. + goto tryAgain + default: + return dsq.Result{Entry: entry, Error: err}, false } entry.Value = value } index++ + sent++ return dsq.Result{Entry: entry}, true } diff --git a/s3_test.go b/s3_test.go index 4f573e7..8e34c30 100644 --- a/s3_test.go +++ b/s3_test.go @@ -34,18 +34,7 @@ func TestSuiteLocalS3(t *testing.T) { t.Fatal(err) } - t.Run("basic operations", func(t *testing.T) { - dstest.SubtestBasicPutGet(t, s3ds) - }) - t.Run("not found operations", func(t *testing.T) { - dstest.SubtestNotFounds(t, s3ds) - }) - t.Run("many puts and gets, query", func(t *testing.T) { - dstest.SubtestManyKeysAndQuery(t, s3ds) - }) - t.Run("return sizes", func(t *testing.T) { - dstest.SubtestReturnSizes(t, s3ds) - }) + dstest.SubtestAll(t, s3ds) } func devMakeBucket(s3obj *s3.S3, bucketName string) error {