diff --git a/go.mod b/go.mod
index 86ca4214c6904..b6c20d0f1038d 100644
--- a/go.mod
+++ b/go.mod
@@ -123,6 +123,7 @@ require (
github.com/prometheus/alertmanager v0.25.0
github.com/prometheus/common/sigv4 v0.1.0
github.com/richardartoul/molecule v1.0.0
+ github.com/sourcegraph/conc v0.3.0
github.com/thanos-io/objstore v0.0.0-20230201072718-11ffbc490204
github.com/willf/bloom v2.0.3+incompatible
go4.org/netipx v0.0.0-20230125063823-8449b0a6169f
@@ -300,7 +301,7 @@ require (
go.opentelemetry.io/otel v1.14.0 // indirect
go.opentelemetry.io/otel/metric v0.37.0 // indirect
go.opentelemetry.io/otel/trace v1.14.0 // indirect
- go.uber.org/multierr v1.8.0 // indirect
+ go.uber.org/multierr v1.9.0 // indirect
go.uber.org/zap v1.21.0 // indirect
golang.org/x/mod v0.9.0 // indirect
golang.org/x/term v0.10.0 // indirect
diff --git a/go.sum b/go.sum
index 23991bc8425af..86394861790d9 100644
--- a/go.sum
+++ b/go.sum
@@ -1621,6 +1621,8 @@ github.com/soniah/gosnmp v1.25.0/go.mod h1:8YvfZxH388NIIw2A+X5z2Oh97VcNhtmxDLt5Q
github.com/sony/gobreaker v0.4.1/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY=
github.com/sony/gobreaker v0.5.0 h1:dRCvqm0P490vZPmy7ppEk2qCnCieBooFJ+YoXGYB+yg=
github.com/sony/gobreaker v0.5.0/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY=
+github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo=
+github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
@@ -1777,8 +1779,8 @@ go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
-go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8=
-go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak=
+go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI=
+go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ=
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
go.uber.org/zap v1.13.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM=
diff --git a/pkg/querier/multi_tenant_querier.go b/pkg/querier/multi_tenant_querier.go
index c9b1b56b8b284..a655d432d24e1 100644
--- a/pkg/querier/multi_tenant_querier.go
+++ b/pkg/querier/multi_tenant_querier.go
@@ -8,6 +8,7 @@ import (
"github.com/go-kit/log"
"github.com/grafana/dskit/user"
"github.com/prometheus/prometheus/model/labels"
+ "github.com/sourcegraph/conc/pool"
"github.com/grafana/dskit/tenant"
@@ -53,17 +54,18 @@ func (q *MultiTenantQuerier) SelectLogs(ctx context.Context, params logql.Select
matchedTenants, filteredMatchers := filterValuesByMatchers(defaultTenantLabel, tenantIDs, selector.Matchers()...)
params.Selector = replaceMatchers(selector, filteredMatchers).String()
- iters := make([]iter.EntryIterator, len(matchedTenants))
- i := 0
+ p := pool.NewWithResults[iter.EntryIterator]().WithErrors() //.WithContext(ctx).WithCancelOnError()
for id := range matchedTenants {
- singleContext := user.InjectOrgID(ctx, id)
- iter, err := q.Querier.SelectLogs(singleContext, params)
- if err != nil {
- return nil, err
- }
-
- iters[i] = NewTenantEntryIterator(iter, id)
- i++
+ id := id
+ p.Go(func() (iter.EntryIterator, error) {
+ singleContext := user.InjectOrgID(ctx, id)
+ iter, err := q.Querier.SelectLogs(singleContext, params)
+ return NewTenantEntryIterator(iter, id), err
+ })
+ }
+ iters, err := p.Wait()
+ if err != nil {
+ return nil, err
}
return iter.NewSortEntryIterator(iters, params.Direction), nil
}
@@ -84,17 +86,18 @@ func (q *MultiTenantQuerier) SelectSamples(ctx context.Context, params logql.Sel
}
params.Selector = updatedSelector.String()
- iters := make([]iter.SampleIterator, len(matchedTenants))
- i := 0
+ p := pool.NewWithResults[iter.SampleIterator]().WithErrors() //.WithContext(ctx).WithCancelOnError()
for id := range matchedTenants {
- singleContext := user.InjectOrgID(ctx, id)
- iter, err := q.Querier.SelectSamples(singleContext, params)
- if err != nil {
- return nil, err
- }
-
- iters[i] = NewTenantSampleIterator(iter, id)
- i++
+ id := id
+ p.Go(func() (iter.SampleIterator, error) {
+ singleContext := user.InjectOrgID(ctx, id)
+ iter, err := q.Querier.SelectSamples(singleContext, params)
+ return NewTenantSampleIterator(iter, id), err
+ })
+ }
+ iters, err := p.Wait()
+ if err != nil {
+ return nil, err
}
return iter.NewSortSampleIterator(iters), nil
}
diff --git a/pkg/querier/multi_tenant_querier_test.go b/pkg/querier/multi_tenant_querier_test.go
index 0a74fe957677b..8133b78cdcda5 100644
--- a/pkg/querier/multi_tenant_querier_test.go
+++ b/pkg/querier/multi_tenant_querier_test.go
@@ -53,7 +53,7 @@ func TestMultiTenantQuerier_SelectLogs(t *testing.T) {
`{__tenant_id__="1", type="test"}`,
`{__tenant_id__="1", type="test"}`,
},
- []string{"line 1", "line 2", "line 1", "line 2"},
+ []string{"line 1", "line 2"},
},
{
"two tenants with selector and pipeline filter",
@@ -63,7 +63,7 @@ func TestMultiTenantQuerier_SelectLogs(t *testing.T) {
`{__tenant_id__="1", type="test"}`,
`{__tenant_id__="1", type="test"}`,
},
- []string{"line 1", "line 2", "line 1", "line 2"},
+ []string{"line 1", "line 2"},
},
{
"one tenant",
@@ -94,13 +94,14 @@ func TestMultiTenantQuerier_SelectLogs(t *testing.T) {
iter, err := multiTenantQuerier.SelectLogs(ctx, params)
require.NoError(t, err)
- entriesCount := 0
+ actualLabels := make([]string, 0)
+ actualLines := make([]string, 0)
for iter.Next() {
- require.Equal(t, tc.expLabels[entriesCount], iter.Labels())
- require.Equal(t, tc.expLines[entriesCount], iter.Entry().Line)
- entriesCount++
+ actualLabels = append(actualLabels, iter.Labels())
+ actualLines = append(actualLines, iter.Entry().Line)
}
- require.Equalf(t, len(tc.expLabels), entriesCount, "Expected %d entries but got %d", len(tc.expLabels), entriesCount)
+ require.ElementsMatch(t, tc.expLabels, actualLabels)
+ require.ElementsMatch(t, tc.expLines, actualLines)
})
}
}
@@ -165,6 +166,10 @@ func TestMultiTenantQuerier_SelectSamples(t *testing.T) {
iter, err := multiTenantQuerier.SelectSamples(ctx, params)
require.NoError(t, err)
+ // Subsequent calls succeed
+ _, err = multiTenantQuerier.SelectSamples(ctx, params)
+ require.NoError(t, err)
+
received := make([]string, 0, len(tc.expLabels))
for iter.Next() {
received = append(received, iter.Labels())
diff --git a/tools/dev/loki-boltdb-storage-s3/config/datasource.yaml b/tools/dev/loki-boltdb-storage-s3/config/datasource.yaml
index 4448583ad781c..fd408eae0c6ad 100644
--- a/tools/dev/loki-boltdb-storage-s3/config/datasource.yaml
+++ b/tools/dev/loki-boltdb-storage-s3/config/datasource.yaml
@@ -12,6 +12,14 @@ datasources:
httpHeaderName1: 'X-Scope-OrgID'
secureJsonData:
httpHeaderValue1: '1'
+ - name: Loki-Cross-Tenant
+ type: loki
+ access: proxy
+ url: http://query-frontend:8007
+ jsonData:
+ httpHeaderName1: 'X-Scope-OrgID'
+ secureJsonData:
+ httpHeaderValue1: '1|2'
- name: Loki-Limited
type: loki
access: proxy
diff --git a/vendor/github.com/sourcegraph/conc/.golangci.yml b/vendor/github.com/sourcegraph/conc/.golangci.yml
new file mode 100644
index 0000000000000..ae65a760a92de
--- /dev/null
+++ b/vendor/github.com/sourcegraph/conc/.golangci.yml
@@ -0,0 +1,11 @@
+linters:
+ disable-all: true
+ enable:
+ - errcheck
+ - godot
+ - gosimple
+ - govet
+ - ineffassign
+ - staticcheck
+ - typecheck
+ - unused
diff --git a/vendor/github.com/sourcegraph/conc/LICENSE b/vendor/github.com/sourcegraph/conc/LICENSE
new file mode 100644
index 0000000000000..1081f4ef4a4c2
--- /dev/null
+++ b/vendor/github.com/sourcegraph/conc/LICENSE
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2023 Sourcegraph
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
diff --git a/vendor/github.com/sourcegraph/conc/README.md b/vendor/github.com/sourcegraph/conc/README.md
new file mode 100644
index 0000000000000..1c87c3c969965
--- /dev/null
+++ b/vendor/github.com/sourcegraph/conc/README.md
@@ -0,0 +1,464 @@
+
+
+# `conc`: better structured concurrency for go
+
+[Go Reference](https://pkg.go.dev/github.com/sourcegraph/conc)
+[Sourcegraph](https://sourcegraph.com/github.com/sourcegraph/conc)
+[Go Report Card](https://goreportcard.com/report/github.com/sourcegraph/conc)
+[codecov](https://codecov.io/gh/sourcegraph/conc)
+[Discord](https://discord.gg/bvXQXmtRjN)
+
+`conc` is your toolbelt for structured concurrency in go, making common tasks
+easier and safer.
+
+```sh
+go get github.com/sourcegraph/conc
+```
+
+# At a glance
+
+- Use [`conc.WaitGroup`](https://pkg.go.dev/github.com/sourcegraph/conc#WaitGroup) if you just want a safer version of `sync.WaitGroup`
+- Use [`pool.Pool`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#Pool) if you want a concurrency-limited task runner
+- Use [`pool.ResultPool`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#ResultPool) if you want a concurrent task runner that collects task results
+- Use [`pool.(Result)?ErrorPool`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#ErrorPool) if your tasks are fallible
+- Use [`pool.(Result)?ContextPool`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#ContextPool) if your tasks should be canceled on failure
+- Use [`stream.Stream`](https://pkg.go.dev/github.com/sourcegraph/conc/stream#Stream) if you want to process an ordered stream of tasks in parallel with serial callbacks
+- Use [`iter.Map`](https://pkg.go.dev/github.com/sourcegraph/conc/iter#Map) if you want to concurrently map a slice
+- Use [`iter.ForEach`](https://pkg.go.dev/github.com/sourcegraph/conc/iter#ForEach) if you want to concurrently iterate over a slice
+- Use [`panics.Catcher`](https://pkg.go.dev/github.com/sourcegraph/conc/panics#Catcher) if you want to catch panics in your own goroutines
+
+All pools are created with
+[`pool.New()`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#New)
+or
+[`pool.NewWithResults[T]()`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#NewWithResults),
+then configured with methods:
+
+- [`p.WithMaxGoroutines()`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#Pool.MaxGoroutines) configures the maximum number of goroutines in the pool
+- [`p.WithErrors()`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#Pool.WithErrors) configures the pool to run tasks that return errors
+- [`p.WithContext(ctx)`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#Pool.WithContext) configures the pool to run tasks that should be canceled on first error
+- [`p.WithFirstError()`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#ErrorPool.WithFirstError) configures error pools to only keep the first returned error rather than an aggregated error
+- [`p.WithCollectErrored()`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#ResultContextPool.WithCollectErrored) configures result pools to collect results even when the task errored
+
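+For illustration, a rough sketch that chains these options together (the
+fallible `fetch` helper is hypothetical; the pool types come from
+`github.com/sourcegraph/conc/pool`):
+
+```go
+func fetchAll(urls []string) ([]string, error) {
+    // Collect one result per task, gather errors, and cap concurrency at 4.
+    p := pool.NewWithResults[string]().WithErrors().WithMaxGoroutines(4)
+    for _, url := range urls {
+        url := url
+        p.Go(func() (string, error) {
+            return fetch(url) // hypothetical fallible work
+        })
+    }
+    // Wait returns the collected results plus a combined error, if any.
+    return p.Wait()
+}
+```
+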
+# Goals
+
+The main goals of the package are:
+1) Make it harder to leak goroutines
+2) Handle panics gracefully
+3) Make concurrent code easier to read
+
+## Goal #1: Make it harder to leak goroutines
+
+A common pain point when working with goroutines is cleaning them up. It's
+really easy to fire off a `go` statement and fail to properly wait for it to
+complete.
+
+`conc` takes the opinionated stance that all concurrency should be scoped.
+That is, goroutines should have an owner and that owner should always
+ensure that its owned goroutines exit properly.
+
+In `conc`, the owner of a goroutine is always a `conc.WaitGroup`. Goroutines
+are spawned in a `WaitGroup` with `(*WaitGroup).Go()`, and
+`(*WaitGroup).Wait()` should always be called before the `WaitGroup` goes out
+of scope.
+
+In some cases, you might want a spawned goroutine to outlast the scope of the
+caller. In that case, you could pass a `WaitGroup` into the spawning function.
+
+```go
+func main() {
+ var wg conc.WaitGroup
+ defer wg.Wait()
+
+ startTheThing(&wg)
+}
+
+func startTheThing(wg *conc.WaitGroup) {
+ wg.Go(func() { ... })
+}
+```
+
+For some more discussion on why scoped concurrency is nice, check out [this
+blog
+post](https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/).
+
+## Goal #2: Handle panics gracefully
+
+A frequent problem with goroutines in long-running applications is handling
+panics. A goroutine spawned without a panic handler will crash the whole process
+on panic. This is usually undesirable.
+
+However, if you do add a panic handler to a goroutine, what do you do with the
+panic once you catch it? Some options:
+1) Ignore it
+2) Log it
+3) Turn it into an error and return that to the goroutine spawner
+4) Propagate the panic to the goroutine spawner
+
+Ignoring panics is a bad idea since panics usually mean there is actually
+something wrong and someone should fix it.
+
+Just logging panics isn't great either because then there is no indication to the spawner
+that something bad happened, and it might just continue on as normal even though your
+program is in a really bad state.
+
+Both (3) and (4) are reasonable options, but both require the goroutine to have
+an owner that can actually receive the message that something went wrong. This
+is generally not true with a goroutine spawned with `go`, but in the `conc`
+package, all goroutines have an owner that must collect the spawned goroutine.
+In the conc package, any call to `Wait()` will panic if any of the spawned goroutines
+panicked. Additionally, it decorates the panic value with a stacktrace from the child
+goroutine so that you don't lose information about what caused the panic.
+
+Doing this all correctly every time you spawn something with `go` is not
+trivial and it requires a lot of boilerplate that makes the important parts of
+the code more difficult to read, so `conc` does this for you.
+
+**stdlib**:
+
+```go
+type caughtPanicError struct {
+ val any
+ stack []byte
+}
+
+func (e *caughtPanicError) Error() string {
+ return fmt.Sprintf(
+ "panic: %q\n%s",
+ e.val,
+ string(e.stack)
+ )
+}
+
+func main() {
+ done := make(chan error)
+ go func() {
+ defer func() {
+ if v := recover(); v != nil {
+ done <- &caughtPanicError{
+ val: v,
+ stack: debug.Stack()
+ }
+ } else {
+ done <- nil
+ }
+ }()
+ doSomethingThatMightPanic()
+ }()
+ err := <-done
+ if err != nil {
+ panic(err)
+ }
+}
+```
+
+**conc**:
+
+```go
+func main() {
+ var wg conc.WaitGroup
+ wg.Go(doSomethingThatMightPanic)
+ // panics with a nice stacktrace
+ wg.Wait()
+}
+```
+
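+If you would rather receive the panic as a value instead of re-panicking
+(option 3 above), `WaitAndRecover` returns the caught `*panics.Recovered`,
+which can be converted to an error. A rough sketch:
+
+```go
+func runSafely() error {
+    var wg conc.WaitGroup
+    wg.Go(doSomethingThatMightPanic)
+    // WaitAndRecover blocks like Wait, but returns the first caught panic
+    // (or nil) instead of re-panicking.
+    if r := wg.WaitAndRecover(); r != nil {
+        return r.AsError()
+    }
+    return nil
+}
+```
+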
+## Goal #3: Make concurrent code easier to read
+
+Doing concurrency correctly is difficult. Doing it in a way that doesn't
+obfuscate what the code is actually doing is more difficult. The `conc` package
+attempts to make common operations easier by abstracting as much boilerplate
+complexity as possible.
+
+Want to run a set of concurrent tasks with a bounded set of goroutines? Use
+`pool.New()`. Want to process an ordered stream of results concurrently, but
+still maintain order? Try `stream.New()`. What about a concurrent map over
+a slice? Take a peek at `iter.Map()`.
+
+Browse some examples below for some comparisons with doing these by hand.
+
+# Examples
+
+Each of these examples forgoes propagating panics for simplicity. To see
+what kind of complexity that would add, check out the "Goal #2" header above.
+
+Spawn a set of goroutines and wait for them to finish:
+
+**stdlib**:
+
+```go
+func main() {
+ var wg sync.WaitGroup
+ for i := 0; i < 10; i++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ // crashes on panic!
+ doSomething()
+ }()
+ }
+ wg.Wait()
+}
+```
+
+**conc**:
+
+```go
+func main() {
+ var wg conc.WaitGroup
+ for i := 0; i < 10; i++ {
+ wg.Go(doSomething)
+ }
+ wg.Wait()
+}
+```
+
+Process each element of a stream in a static pool of goroutines:
+
+**stdlib**:
+
+```go
+func process(stream chan int) {
+ var wg sync.WaitGroup
+ for i := 0; i < 10; i++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ for elem := range stream {
+ handle(elem)
+ }
+ }()
+ }
+ wg.Wait()
+}
+```
+
+**conc**:
+
+```go
+func process(stream chan int) {
+ p := pool.New().WithMaxGoroutines(10)
+ for elem := range stream {
+ elem := elem
+ p.Go(func() {
+ handle(elem)
+ })
+ }
+ p.Wait()
+}
+```
+
+Process each element of a slice in a static pool of goroutines:
+
+**stdlib**:
+
+```go
+func process(values []int) {
+ feeder := make(chan int, 8)
+
+ var wg sync.WaitGroup
+ for i := 0; i < 10; i++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ for elem := range feeder {
+ handle(elem)
+ }
+ }()
+ }
+
+ for _, value := range values {
+ feeder <- value
+ }
+ close(feeder)
+ wg.Wait()
+}
+```
+
+**conc**:
+
+```go
+func process(values []int) {
+ iter.ForEach(values, handle)
+}
+```
+
+Concurrently map a slice:
+
+**stdlib**:
+
+```go
+func concMap(
+ input []int,
+ f func(int) int,
+) []int {
+ res := make([]int, len(input))
+ var idx atomic.Int64
+
+ var wg sync.WaitGroup
+ for i := 0; i < 10; i++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+
+ for {
+ i := int(idx.Add(1) - 1)
+ if i >= len(input) {
+ return
+ }
+
+ res[i] = f(input[i])
+ }
+ }()
+ }
+ wg.Wait()
+ return res
+}
+```
+
+**conc**:
+
+```go
+func concMap(
+ input []int,
+ f func(*int) int,
+) []int {
+ return iter.Map(input, f)
+}
+```
+
+Process an ordered stream concurrently:
+
+**stdlib**:
+
+```go
+func mapStream(
+ in chan int,
+ out chan int,
+ f func(int) int,
+) {
+ tasks := make(chan func())
+ taskResults := make(chan chan int)
+
+ // Worker goroutines
+ var workerWg sync.WaitGroup
+ for i := 0; i < 10; i++ {
+ workerWg.Add(1)
+ go func() {
+ defer workerWg.Done()
+ for task := range tasks {
+ task()
+ }
+ }()
+ }
+
+ // Ordered reader goroutines
+ var readerWg sync.WaitGroup
+ readerWg.Add(1)
+ go func() {
+ defer readerWg.Done()
+ for result := range taskResults {
+ item := <-result
+ out <- item
+ }
+ }()
+
+ // Feed the workers with tasks
+ for elem := range in {
+ resultCh := make(chan int, 1)
+ taskResults <- resultCh
+ tasks <- func() {
+ resultCh <- f(elem)
+ }
+ }
+
+ // We've exhausted input.
+ // Wait for everything to finish
+ close(tasks)
+ workerWg.Wait()
+ close(taskResults)
+ readerWg.Wait()
+}
+```
+
+**conc**:
+
+```go
+func mapStream(
+ in chan int,
+ out chan int,
+ f func(int) int,
+) {
+ s := stream.New().WithMaxGoroutines(10)
+ for elem := range in {
+ elem := elem
+ s.Go(func() stream.Callback {
+ res := f(elem)
+ return func() { out <- res }
+ })
+ }
+ s.Wait()
+}
+```
+
+# Status
+
+This package is currently pre-1.0. There are likely to be minor breaking
+changes before a 1.0 release as we stabilize the APIs and tweak defaults.
+Please open an issue if you have questions, concerns, or requests that you'd
+like addressed before the 1.0 release. Currently, a 1.0 is targeted for
+March 2023.
diff --git a/vendor/github.com/sourcegraph/conc/internal/multierror/multierror_go119.go b/vendor/github.com/sourcegraph/conc/internal/multierror/multierror_go119.go
new file mode 100644
index 0000000000000..7087e32a8f438
--- /dev/null
+++ b/vendor/github.com/sourcegraph/conc/internal/multierror/multierror_go119.go
@@ -0,0 +1,10 @@
+//go:build !go1.20
+// +build !go1.20
+
+package multierror
+
+import "go.uber.org/multierr"
+
+var (
+ Join = multierr.Combine
+)
diff --git a/vendor/github.com/sourcegraph/conc/internal/multierror/multierror_go120.go b/vendor/github.com/sourcegraph/conc/internal/multierror/multierror_go120.go
new file mode 100644
index 0000000000000..39cff829aceeb
--- /dev/null
+++ b/vendor/github.com/sourcegraph/conc/internal/multierror/multierror_go120.go
@@ -0,0 +1,10 @@
+//go:build go1.20
+// +build go1.20
+
+package multierror
+
+import "errors"
+
+var (
+ Join = errors.Join
+)
diff --git a/vendor/github.com/sourcegraph/conc/panics/panics.go b/vendor/github.com/sourcegraph/conc/panics/panics.go
new file mode 100644
index 0000000000000..abbed7fa058f9
--- /dev/null
+++ b/vendor/github.com/sourcegraph/conc/panics/panics.go
@@ -0,0 +1,102 @@
+package panics
+
+import (
+ "fmt"
+ "runtime"
+ "runtime/debug"
+ "sync/atomic"
+)
+
+// Catcher is used to catch panics. You can execute a function with Try,
+// which will catch any spawned panic. Try can be called any number of times,
+// from any number of goroutines. Once all calls to Try have completed, you can
+// get the value of the first panic (if any) with Recovered(), or you can just
+// propagate the panic (re-panic) with Repanic().
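+//
+// A rough usage sketch (illustrative only, not part of the upstream docs):
+//
+//    var c Catcher
+//    c.Try(func() { panic("boom") })
+//    c.Try(func() { /* returns normally */ })
+//    if r := c.Recovered(); r != nil {
+//        fmt.Println(r.String()) // or re-panic with c.Repanic()
+//    }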
+type Catcher struct {
+ recovered atomic.Pointer[Recovered]
+}
+
+// Try executes f, catching any panic it might spawn. It is safe
+// to call from multiple goroutines simultaneously.
+func (p *Catcher) Try(f func()) {
+ defer p.tryRecover()
+ f()
+}
+
+func (p *Catcher) tryRecover() {
+ if val := recover(); val != nil {
+ rp := NewRecovered(1, val)
+ p.recovered.CompareAndSwap(nil, &rp)
+ }
+}
+
+// Repanic panics if any calls to Try caught a panic. It will panic with the
+// value of the first panic caught, wrapped in a panics.Recovered with caller
+// information.
+func (p *Catcher) Repanic() {
+ if val := p.Recovered(); val != nil {
+ panic(val)
+ }
+}
+
+// Recovered returns the value of the first panic caught by Try, or nil if
+// no calls to Try panicked.
+func (p *Catcher) Recovered() *Recovered {
+ return p.recovered.Load()
+}
+
+// NewRecovered creates a panics.Recovered from a panic value and a collected
+// stacktrace. The skip parameter allows the caller to skip stack frames when
+// collecting the stacktrace. Calling with a skip of 0 means include the call to
+// NewRecovered in the stacktrace.
+func NewRecovered(skip int, value any) Recovered {
+ // 64 frames should be plenty
+ var callers [64]uintptr
+ n := runtime.Callers(skip+1, callers[:])
+ return Recovered{
+ Value: value,
+ Callers: callers[:n],
+ Stack: debug.Stack(),
+ }
+}
+
+// Recovered is a panic that was caught with recover().
+type Recovered struct {
+ // The original value of the panic.
+ Value any
+ // The caller list as returned by runtime.Callers when the panic was
+ // recovered. Can be used to produce a more detailed stack information with
+ // runtime.CallersFrames.
+ Callers []uintptr
+ // The formatted stacktrace from the goroutine where the panic was recovered.
+ // Easier to use than Callers.
+ Stack []byte
+}
+
+// String renders a human-readable formatting of the panic.
+func (p *Recovered) String() string {
+ return fmt.Sprintf("panic: %v\nstacktrace:\n%s\n", p.Value, p.Stack)
+}
+
+// AsError casts the panic into an error implementation. The implementation
+// is unwrappable with the cause of the panic, if the panic was provided one.
+func (p *Recovered) AsError() error {
+ if p == nil {
+ return nil
+ }
+ return &ErrRecovered{*p}
+}
+
+// ErrRecovered wraps a panics.Recovered in an error implementation.
+type ErrRecovered struct{ Recovered }
+
+var _ error = (*ErrRecovered)(nil)
+
+func (p *ErrRecovered) Error() string { return p.String() }
+
+func (p *ErrRecovered) Unwrap() error {
+ if err, ok := p.Value.(error); ok {
+ return err
+ }
+ return nil
+}
diff --git a/vendor/github.com/sourcegraph/conc/panics/try.go b/vendor/github.com/sourcegraph/conc/panics/try.go
new file mode 100644
index 0000000000000..4ded92a1cb66a
--- /dev/null
+++ b/vendor/github.com/sourcegraph/conc/panics/try.go
@@ -0,0 +1,11 @@
+package panics
+
+// Try executes f, catching and returning any panic it might spawn.
+//
+// The recovered panic can be propagated with panic(), or handled as a normal error with
+// (*panics.Recovered).AsError().
+func Try(f func()) *Recovered {
+ var c Catcher
+ c.Try(f)
+ return c.Recovered()
+}
diff --git a/vendor/github.com/sourcegraph/conc/pool/context_pool.go b/vendor/github.com/sourcegraph/conc/pool/context_pool.go
new file mode 100644
index 0000000000000..b2d7f8a03d440
--- /dev/null
+++ b/vendor/github.com/sourcegraph/conc/pool/context_pool.go
@@ -0,0 +1,94 @@
+package pool
+
+import (
+ "context"
+)
+
+// ContextPool is a pool that runs tasks that take a context.
+// A new ContextPool should be created with `New().WithContext(ctx)`.
+//
+// The configuration methods (With*) will panic if they are used after calling
+// Go() for the first time.
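+//
+// A rough usage sketch (illustrative only; ctx and the task functions are placeholders):
+//
+//    p := New().WithContext(ctx).WithCancelOnError()
+//    for _, task := range tasks {
+//        task := task
+//        p.Go(func(ctx context.Context) error { return task(ctx) })
+//    }
+//    err := p.Wait()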
+type ContextPool struct {
+ errorPool ErrorPool
+
+ ctx context.Context
+ cancel context.CancelFunc
+
+ cancelOnError bool
+}
+
+// Go submits a task. If it returns an error, the error will be
+// collected and returned by Wait(). If all goroutines in the pool
+// are busy, a call to Go() will block until the task can be started.
+func (p *ContextPool) Go(f func(ctx context.Context) error) {
+ p.errorPool.Go(func() error {
+ if p.cancelOnError {
+ // If we are cancelling on error, then we also want to cancel if a
+ // panic is raised. To do this, we need to recover, cancel, and then
+ // re-throw the caught panic.
+ defer func() {
+ if r := recover(); r != nil {
+ p.cancel()
+ panic(r)
+ }
+ }()
+ }
+
+ err := f(p.ctx)
+ if err != nil && p.cancelOnError {
+ // Leaky abstraction warning: We add the error directly because
+ // otherwise, canceling could cause another goroutine to exit and
+ // return an error before this error was added, which breaks the
+ // expectations of WithFirstError().
+ p.errorPool.addErr(err)
+ p.cancel()
+ return nil
+ }
+ return err
+ })
+}
+
+// Wait cleans up all spawned goroutines, propagates any panics, and
+// returns an error if any of the tasks errored.
+func (p *ContextPool) Wait() error {
+ // Make sure we call cancel after pool is done to avoid memory leakage.
+ defer p.cancel()
+ return p.errorPool.Wait()
+}
+
+// WithFirstError configures the pool to only return the first error
+// returned by a task. By default, Wait() will return a combined error.
+// This is particularly useful for (*ContextPool).WithCancelOnError(),
+// where all errors after the first are likely to be context.Canceled.
+func (p *ContextPool) WithFirstError() *ContextPool {
+ p.panicIfInitialized()
+ p.errorPool.WithFirstError()
+ return p
+}
+
+// WithCancelOnError configures the pool to cancel its context as soon as
+// any task returns an error or panics. By default, the pool's context is not
+// canceled until the parent context is canceled.
+//
+// In this case, all errors returned from the pool after the first will
+// likely be context.Canceled - you may want to also use
+// (*ContextPool).WithFirstError() to configure the pool to only return
+// the first error.
+func (p *ContextPool) WithCancelOnError() *ContextPool {
+ p.panicIfInitialized()
+ p.cancelOnError = true
+ return p
+}
+
+// WithMaxGoroutines limits the number of goroutines in a pool.
+// Defaults to unlimited. Panics if n < 1.
+func (p *ContextPool) WithMaxGoroutines(n int) *ContextPool {
+ p.panicIfInitialized()
+ p.errorPool.WithMaxGoroutines(n)
+ return p
+}
+
+func (p *ContextPool) panicIfInitialized() {
+ p.errorPool.panicIfInitialized()
+}
diff --git a/vendor/github.com/sourcegraph/conc/pool/error_pool.go b/vendor/github.com/sourcegraph/conc/pool/error_pool.go
new file mode 100644
index 0000000000000..6e5aa99f2bf87
--- /dev/null
+++ b/vendor/github.com/sourcegraph/conc/pool/error_pool.go
@@ -0,0 +1,97 @@
+package pool
+
+import (
+ "context"
+ "sync"
+
+ "github.com/sourcegraph/conc/internal/multierror"
+)
+
+// ErrorPool is a pool that runs tasks that may return an error.
+// Errors are collected and returned by Wait().
+//
+// The configuration methods (With*) will panic if they are used after calling
+// Go() for the first time.
+//
+// A new ErrorPool should be created using `New().WithErrors()`.
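+//
+// A rough usage sketch (illustrative only; closers is a placeholder slice of io.Closer):
+//
+//    p := New().WithErrors()
+//    for _, c := range closers {
+//        p.Go(c.Close)
+//    }
+//    err := p.Wait() // combined error from all failed tasks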
+type ErrorPool struct {
+ pool Pool
+
+ onlyFirstError bool
+
+ mu sync.Mutex
+ errs error
+}
+
+// Go submits a task to the pool. If all goroutines in the pool
+// are busy, a call to Go() will block until the task can be started.
+func (p *ErrorPool) Go(f func() error) {
+ p.pool.Go(func() {
+ p.addErr(f())
+ })
+}
+
+// Wait cleans up any spawned goroutines, propagating any panics and
+// returning any errors from tasks.
+func (p *ErrorPool) Wait() error {
+ p.pool.Wait()
+ return p.errs
+}
+
+// WithContext converts the pool to a ContextPool for tasks that should
+// run under the same context, such that they each respect shared cancellation.
+// For example, WithCancelOnError can be configured on the returned pool to
+// signal that all goroutines should be cancelled upon the first error.
+func (p *ErrorPool) WithContext(ctx context.Context) *ContextPool {
+ p.panicIfInitialized()
+ ctx, cancel := context.WithCancel(ctx)
+ return &ContextPool{
+ errorPool: p.deref(),
+ ctx: ctx,
+ cancel: cancel,
+ }
+}
+
+// WithFirstError configures the pool to only return the first error
+// returned by a task. By default, Wait() will return a combined error.
+func (p *ErrorPool) WithFirstError() *ErrorPool {
+ p.panicIfInitialized()
+ p.onlyFirstError = true
+ return p
+}
+
+// WithMaxGoroutines limits the number of goroutines in a pool.
+// Defaults to unlimited. Panics if n < 1.
+func (p *ErrorPool) WithMaxGoroutines(n int) *ErrorPool {
+ p.panicIfInitialized()
+ p.pool.WithMaxGoroutines(n)
+ return p
+}
+
+// deref is a helper that creates a shallow copy of the pool with the same
+// settings. We don't want to just dereference the pointer because that makes
+// the copylock lint angry.
+func (p *ErrorPool) deref() ErrorPool {
+ return ErrorPool{
+ pool: p.pool.deref(),
+ onlyFirstError: p.onlyFirstError,
+ }
+}
+
+func (p *ErrorPool) panicIfInitialized() {
+ p.pool.panicIfInitialized()
+}
+
+func (p *ErrorPool) addErr(err error) {
+ if err != nil {
+ p.mu.Lock()
+ if p.onlyFirstError {
+ if p.errs == nil {
+ p.errs = err
+ }
+ } else {
+ p.errs = multierror.Join(p.errs, err)
+ }
+ p.mu.Unlock()
+ }
+}
diff --git a/vendor/github.com/sourcegraph/conc/pool/pool.go b/vendor/github.com/sourcegraph/conc/pool/pool.go
new file mode 100644
index 0000000000000..b63eb1984a4ff
--- /dev/null
+++ b/vendor/github.com/sourcegraph/conc/pool/pool.go
@@ -0,0 +1,168 @@
+package pool
+
+import (
+ "context"
+ "sync"
+
+ "github.com/sourcegraph/conc"
+)
+
+// New creates a new Pool.
+func New() *Pool {
+ return &Pool{}
+}
+
+// Pool is a pool of goroutines used to execute tasks concurrently.
+//
+// Tasks are submitted with Go(). Once all your tasks have been submitted, you
+// must call Wait() to clean up any spawned goroutines and propagate any
+// panics.
+//
+// Goroutines are started lazily, so creating a new pool is cheap. There will
+// never be more goroutines spawned than there are tasks submitted.
+//
+// The configuration methods (With*) will panic if they are used after calling
+// Go() for the first time.
+//
+// Pool is efficient, but not zero cost. It should not be used for very short
+// tasks. Startup and teardown come with an overhead of around 1µs, and each
+// task has an overhead of around 300ns.
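+//
+// A rough usage sketch (illustrative only; doWork is a placeholder func()):
+//
+//    p := New().WithMaxGoroutines(4)
+//    for i := 0; i < 10; i++ {
+//        p.Go(doWork)
+//    }
+//    p.Wait()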
+type Pool struct {
+ handle conc.WaitGroup
+ limiter limiter
+ tasks chan func()
+ initOnce sync.Once
+}
+
+// Go submits a task to be run in the pool. If all goroutines in the pool
+// are busy, a call to Go() will block until the task can be started.
+func (p *Pool) Go(f func()) {
+ p.init()
+
+ if p.limiter == nil {
+ // No limit on the number of goroutines.
+ select {
+ case p.tasks <- f:
+ // A goroutine was available to handle the task.
+ default:
+ // No goroutine was available to handle the task.
+ // Spawn a new one and send it the task.
+ p.handle.Go(p.worker)
+ p.tasks <- f
+ }
+ } else {
+ select {
+ case p.limiter <- struct{}{}:
+ // If we are below our limit, spawn a new worker rather
+ // than waiting for one to become available.
+ p.handle.Go(p.worker)
+
+ // We know there is at least one worker running, so wait
+ // for it to become available. This ensures we never spawn
+ // more workers than the number of tasks.
+ p.tasks <- f
+ case p.tasks <- f:
+ // A worker is available and has accepted the task.
+ return
+ }
+ }
+
+}
+
+// Wait cleans up spawned goroutines, propagating any panics that were
+// raised by a task.
+func (p *Pool) Wait() {
+ p.init()
+
+ close(p.tasks)
+
+ p.handle.Wait()
+}
+
+// MaxGoroutines returns the maximum size of the pool.
+func (p *Pool) MaxGoroutines() int {
+ return p.limiter.limit()
+}
+
+// WithMaxGoroutines limits the number of goroutines in a pool.
+// Defaults to unlimited. Panics if n < 1.
+func (p *Pool) WithMaxGoroutines(n int) *Pool {
+ p.panicIfInitialized()
+ if n < 1 {
+ panic("max goroutines in a pool must be greater than zero")
+ }
+ p.limiter = make(limiter, n)
+ return p
+}
+
+// init ensures that the pool is initialized before use. This makes the
+// zero value of the pool usable.
+func (p *Pool) init() {
+ p.initOnce.Do(func() {
+ p.tasks = make(chan func())
+ })
+}
+
+// panicIfInitialized will trigger a panic if a configuration method is called
+// after the pool has started any goroutines for the first time. In the case that
+// new settings are needed, a new pool should be created.
+func (p *Pool) panicIfInitialized() {
+ if p.tasks != nil {
+ panic("pool can not be reconfigured after calling Go() for the first time")
+ }
+}
+
+// WithErrors converts the pool to an ErrorPool so the submitted tasks can
+// return errors.
+func (p *Pool) WithErrors() *ErrorPool {
+ p.panicIfInitialized()
+ return &ErrorPool{
+ pool: p.deref(),
+ }
+}
+
+// deref is a helper that creates a shallow copy of the pool with the same
+// settings. We don't want to just dereference the pointer because that makes
+// the copylock lint angry.
+func (p *Pool) deref() Pool {
+ p.panicIfInitialized()
+ return Pool{
+ limiter: p.limiter,
+ }
+}
+
+// WithContext converts the pool to a ContextPool for tasks that should
+// run under the same context, such that they each respect shared cancellation.
+// For example, WithCancelOnError can be configured on the returned pool to
+// signal that all goroutines should be cancelled upon the first error.
+func (p *Pool) WithContext(ctx context.Context) *ContextPool {
+ p.panicIfInitialized()
+ ctx, cancel := context.WithCancel(ctx)
+ return &ContextPool{
+ errorPool: p.WithErrors().deref(),
+ ctx: ctx,
+ cancel: cancel,
+ }
+}
+
+func (p *Pool) worker() {
+ // The only time this matters is if the task panics.
+ // This makes it possible to spin up new workers in that case.
+ defer p.limiter.release()
+
+ for f := range p.tasks {
+ f()
+ }
+}
+
+type limiter chan struct{}
+
+func (l limiter) limit() int {
+ return cap(l)
+}
+
+func (l limiter) release() {
+ if l != nil {
+ <-l
+ }
+}
diff --git a/vendor/github.com/sourcegraph/conc/pool/result_context_pool.go b/vendor/github.com/sourcegraph/conc/pool/result_context_pool.go
new file mode 100644
index 0000000000000..55dc3bc21f90c
--- /dev/null
+++ b/vendor/github.com/sourcegraph/conc/pool/result_context_pool.go
@@ -0,0 +1,75 @@
+package pool
+
+import (
+ "context"
+)
+
+// ResultContextPool is a pool that runs tasks that take a context and return a
+// result. The context passed to the task will be canceled if any of the tasks
+// return an error, which makes its functionality different than just capturing
+// a context with the task closure.
+//
+// The configuration methods (With*) will panic if they are used after calling
+// Go() for the first time.
+type ResultContextPool[T any] struct {
+ contextPool ContextPool
+ agg resultAggregator[T]
+ collectErrored bool
+}
+
+// Go submits a task to the pool. If all goroutines in the pool
+// are busy, a call to Go() will block until the task can be started.
+func (p *ResultContextPool[T]) Go(f func(context.Context) (T, error)) {
+ p.contextPool.Go(func(ctx context.Context) error {
+ res, err := f(ctx)
+ if err == nil || p.collectErrored {
+ p.agg.add(res)
+ }
+ return err
+ })
+}
+
+// Wait cleans up all spawned goroutines, propagates any panics, and
+// returns an error if any of the tasks errored.
+func (p *ResultContextPool[T]) Wait() ([]T, error) {
+ err := p.contextPool.Wait()
+ return p.agg.results, err
+}
+
+// WithCollectErrored configures the pool to still collect the result of a task
+// even if the task returned an error. By default, the result of tasks that errored
+// are ignored and only the error is collected.
+func (p *ResultContextPool[T]) WithCollectErrored() *ResultContextPool[T] {
+ p.panicIfInitialized()
+ p.collectErrored = true
+ return p
+}
+
+// WithFirstError configures the pool to only return the first error
+// returned by a task. By default, Wait() will return a combined error.
+func (p *ResultContextPool[T]) WithFirstError() *ResultContextPool[T] {
+ p.panicIfInitialized()
+ p.contextPool.WithFirstError()
+ return p
+}
+
+// WithCancelOnError configures the pool to cancel its context as soon as
+// any task returns an error. By default, the pool's context is not
+// canceled until the parent context is canceled.
+func (p *ResultContextPool[T]) WithCancelOnError() *ResultContextPool[T] {
+ p.panicIfInitialized()
+ p.contextPool.WithCancelOnError()
+ return p
+}
+
+// WithMaxGoroutines limits the number of goroutines in a pool.
+// Defaults to unlimited. Panics if n < 1.
+func (p *ResultContextPool[T]) WithMaxGoroutines(n int) *ResultContextPool[T] {
+ p.panicIfInitialized()
+ p.contextPool.WithMaxGoroutines(n)
+ return p
+}
+
+func (p *ResultContextPool[T]) panicIfInitialized() {
+ p.contextPool.panicIfInitialized()
+}
diff --git a/vendor/github.com/sourcegraph/conc/pool/result_error_pool.go b/vendor/github.com/sourcegraph/conc/pool/result_error_pool.go
new file mode 100644
index 0000000000000..4caaadc4e3203
--- /dev/null
+++ b/vendor/github.com/sourcegraph/conc/pool/result_error_pool.go
@@ -0,0 +1,80 @@
+package pool
+
+import (
+ "context"
+)
+
+// ResultErrorPool is a pool that executes tasks that return a generic result
+// type and an error. Tasks are executed in the pool with Go(), then the
+// results of the tasks are returned by Wait().
+//
+// The order of the results is not guaranteed to be the same as the order the
+// tasks were submitted. If your use case requires consistent ordering,
+// consider using the `stream` package or `Map` from the `iter` package.
+//
+// The configuration methods (With*) will panic if they are used after calling
+// Go() for the first time.
+type ResultErrorPool[T any] struct {
+ errorPool ErrorPool
+ agg resultAggregator[T]
+ collectErrored bool
+}
+
+// Go submits a task to the pool. If all goroutines in the pool
+// are busy, a call to Go() will block until the task can be started.
+func (p *ResultErrorPool[T]) Go(f func() (T, error)) {
+ p.errorPool.Go(func() error {
+ res, err := f()
+ if err == nil || p.collectErrored {
+ p.agg.add(res)
+ }
+ return err
+ })
+}
+
+// Wait cleans up any spawned goroutines, propagating any panics and
+// returning the results and any errors from tasks.
+func (p *ResultErrorPool[T]) Wait() ([]T, error) {
+ err := p.errorPool.Wait()
+ return p.agg.results, err
+}
+
+// WithCollectErrored configures the pool to still collect the result of a task
+// even if the task returned an error. By default, the result of tasks that errored
+// are ignored and only the error is collected.
+func (p *ResultErrorPool[T]) WithCollectErrored() *ResultErrorPool[T] {
+ p.panicIfInitialized()
+ p.collectErrored = true
+ return p
+}
+
+// WithContext converts the pool to a ResultContextPool for tasks that should
+// run under the same context, such that they each respect shared cancellation.
+// For example, WithCancelOnError can be configured on the returned pool to
+// signal that all goroutines should be cancelled upon the first error.
+func (p *ResultErrorPool[T]) WithContext(ctx context.Context) *ResultContextPool[T] {
+ p.panicIfInitialized()
+ return &ResultContextPool[T]{
+ contextPool: *p.errorPool.WithContext(ctx),
+ }
+}
+
+// WithFirstError configures the pool to only return the first error
+// returned by a task. By default, Wait() will return a combined error.
+func (p *ResultErrorPool[T]) WithFirstError() *ResultErrorPool[T] {
+ p.panicIfInitialized()
+ p.errorPool.WithFirstError()
+ return p
+}
+
+// WithMaxGoroutines limits the number of goroutines in a pool.
+// Defaults to unlimited. Panics if n < 1.
+func (p *ResultErrorPool[T]) WithMaxGoroutines(n int) *ResultErrorPool[T] {
+ p.panicIfInitialized()
+ p.errorPool.WithMaxGoroutines(n)
+ return p
+}
+
+func (p *ResultErrorPool[T]) panicIfInitialized() {
+ p.errorPool.panicIfInitialized()
+}
diff --git a/vendor/github.com/sourcegraph/conc/pool/result_pool.go b/vendor/github.com/sourcegraph/conc/pool/result_pool.go
new file mode 100644
index 0000000000000..ea304cbb7165d
--- /dev/null
+++ b/vendor/github.com/sourcegraph/conc/pool/result_pool.go
@@ -0,0 +1,93 @@
+package pool
+
+import (
+ "context"
+ "sync"
+)
+
+// NewWithResults creates a new ResultPool for tasks with a result of type T.
+//
+// The configuration methods (With*) will panic if they are used after calling
+// Go() for the first time.
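+//
+// A rough usage sketch (illustrative only):
+//
+//    p := NewWithResults[int]()
+//    for i := 0; i < 10; i++ {
+//        i := i
+//        p.Go(func() int { return i * i })
+//    }
+//    squares := p.Wait() // result order is not guaranteed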
+func NewWithResults[T any]() *ResultPool[T] {
+ return &ResultPool[T]{
+ pool: *New(),
+ }
+}
+
+// ResultPool is a pool that executes tasks that return a generic result type.
+// Tasks are executed in the pool with Go(), then the results of the tasks are
+// returned by Wait().
+//
+// The order of the results is not guaranteed to be the same as the order the
+// tasks were submitted. If your use case requires consistent ordering,
+// consider using the `stream` package or `Map` from the `iter` package.
+type ResultPool[T any] struct {
+ pool Pool
+ agg resultAggregator[T]
+}
+
+// Go submits a task to the pool. If all goroutines in the pool
+// are busy, a call to Go() will block until the task can be started.
+func (p *ResultPool[T]) Go(f func() T) {
+ p.pool.Go(func() {
+ p.agg.add(f())
+ })
+}
+
+// Wait cleans up all spawned goroutines, propagating any panics, and returning
+// a slice of results from tasks that did not panic.
+func (p *ResultPool[T]) Wait() []T {
+ p.pool.Wait()
+ return p.agg.results
+}
+
+// MaxGoroutines returns the maximum size of the pool.
+func (p *ResultPool[T]) MaxGoroutines() int {
+ return p.pool.MaxGoroutines()
+}
+
+// WithErrors converts the pool to a ResultErrorPool so the submitted tasks
+// can return errors.
+func (p *ResultPool[T]) WithErrors() *ResultErrorPool[T] {
+ p.panicIfInitialized()
+ return &ResultErrorPool[T]{
+ errorPool: *p.pool.WithErrors(),
+ }
+}
+
+// WithContext converts the pool to a ResultContextPool for tasks that should
+// run under the same context, such that they each respect shared cancellation.
+// For example, WithCancelOnError can be configured on the returned pool to
+// signal that all goroutines should be cancelled upon the first error.
+func (p *ResultPool[T]) WithContext(ctx context.Context) *ResultContextPool[T] {
+ p.panicIfInitialized()
+ return &ResultContextPool[T]{
+ contextPool: *p.pool.WithContext(ctx),
+ }
+}
+
+// WithMaxGoroutines limits the number of goroutines in a pool.
+// Defaults to unlimited. Panics if n < 1.
+func (p *ResultPool[T]) WithMaxGoroutines(n int) *ResultPool[T] {
+ p.panicIfInitialized()
+ p.pool.WithMaxGoroutines(n)
+ return p
+}
+
+func (p *ResultPool[T]) panicIfInitialized() {
+ p.pool.panicIfInitialized()
+}
+
+// resultAggregator is a utility type that lets us safely append from multiple
+// goroutines. The zero value is valid and ready to use.
+type resultAggregator[T any] struct {
+ mu sync.Mutex
+ results []T
+}
+
+func (r *resultAggregator[T]) add(res T) {
+ r.mu.Lock()
+ r.results = append(r.results, res)
+ r.mu.Unlock()
+}
diff --git a/vendor/github.com/sourcegraph/conc/waitgroup.go b/vendor/github.com/sourcegraph/conc/waitgroup.go
new file mode 100644
index 0000000000000..47b1bc1a5cb44
--- /dev/null
+++ b/vendor/github.com/sourcegraph/conc/waitgroup.go
@@ -0,0 +1,52 @@
+package conc
+
+import (
+ "sync"
+
+ "github.com/sourcegraph/conc/panics"
+)
+
+// NewWaitGroup creates a new WaitGroup.
+func NewWaitGroup() *WaitGroup {
+ return &WaitGroup{}
+}
+
+// WaitGroup is the primary building block for scoped concurrency.
+// Goroutines can be spawned in the WaitGroup with the Go method,
+// and calling Wait() will ensure that each of those goroutines exits
+// before continuing. Any panics in a child goroutine will be caught
+// and propagated to the caller of Wait().
+//
+// The zero value of WaitGroup is usable, just like sync.WaitGroup.
+// Also like sync.WaitGroup, it must not be copied after first use.
+type WaitGroup struct {
+ wg sync.WaitGroup
+ pc panics.Catcher
+}
+
+// Go spawns a new goroutine in the WaitGroup.
+func (h *WaitGroup) Go(f func()) {
+ h.wg.Add(1)
+ go func() {
+ defer h.wg.Done()
+ h.pc.Try(f)
+ }()
+}
+
+// Wait will block until all goroutines spawned with Go exit and will
+// propagate any panics spawned in a child goroutine.
+func (h *WaitGroup) Wait() {
+ h.wg.Wait()
+
+ // Propagate a panic if we caught one from a child goroutine.
+ h.pc.Repanic()
+}
+
+// WaitAndRecover will block until all goroutines spawned with Go exit and
+// will return a *panics.Recovered if one of the child goroutines panics.
+func (h *WaitGroup) WaitAndRecover() *panics.Recovered {
+ h.wg.Wait()
+
+ // Return a recovered panic if we caught one from a child goroutine.
+ return h.pc.Recovered()
+}
diff --git a/vendor/go.uber.org/multierr/CHANGELOG.md b/vendor/go.uber.org/multierr/CHANGELOG.md
index 3ba05276f1a85..d2c8aadaf0e4f 100644
--- a/vendor/go.uber.org/multierr/CHANGELOG.md
+++ b/vendor/go.uber.org/multierr/CHANGELOG.md
@@ -1,6 +1,14 @@
Releases
========
+v1.9.0 (2022-12-12)
+===================
+
+- Add `AppendFunc` that allows passing functions, similar to
+ `AppendInvoke`.
+
+- Bump up yaml.v3 dependency to 3.0.1.
+
v1.8.0 (2022-02-28)
===================
diff --git a/vendor/go.uber.org/multierr/error.go b/vendor/go.uber.org/multierr/error.go
index f45af149c1025..cdd91ae56d92f 100644
--- a/vendor/go.uber.org/multierr/error.go
+++ b/vendor/go.uber.org/multierr/error.go
@@ -20,106 +20,109 @@
// Package multierr allows combining one or more errors together.
//
-// Overview
+// # Overview
//
// Errors can be combined with the use of the Combine function.
//
-// multierr.Combine(
-// reader.Close(),
-// writer.Close(),
-// conn.Close(),
-// )
+// multierr.Combine(
+// reader.Close(),
+// writer.Close(),
+// conn.Close(),
+// )
//
// If only two errors are being combined, the Append function may be used
// instead.
//
-// err = multierr.Append(reader.Close(), writer.Close())
+// err = multierr.Append(reader.Close(), writer.Close())
//
// The underlying list of errors for a returned error object may be retrieved
// with the Errors function.
//
-// errors := multierr.Errors(err)
-// if len(errors) > 0 {
-// fmt.Println("The following errors occurred:", errors)
-// }
+// errors := multierr.Errors(err)
+// if len(errors) > 0 {
+// fmt.Println("The following errors occurred:", errors)
+// }
//
-// Appending from a loop
+// # Appending from a loop
//
// You sometimes need to append into an error from a loop.
//
-// var err error
-// for _, item := range items {
-// err = multierr.Append(err, process(item))
-// }
+// var err error
+// for _, item := range items {
+// err = multierr.Append(err, process(item))
+// }
//
// Cases like this may require knowledge of whether an individual instance
// failed. This usually requires introduction of a new variable.
//
-// var err error
-// for _, item := range items {
-// if perr := process(item); perr != nil {
-// log.Warn("skipping item", item)
-// err = multierr.Append(err, perr)
-// }
-// }
+// var err error
+// for _, item := range items {
+// if perr := process(item); perr != nil {
+// log.Warn("skipping item", item)
+// err = multierr.Append(err, perr)
+// }
+// }
//
// multierr includes AppendInto to simplify cases like this.
//
-// var err error
-// for _, item := range items {
-// if multierr.AppendInto(&err, process(item)) {
-// log.Warn("skipping item", item)
-// }
-// }
+// var err error
+// for _, item := range items {
+// if multierr.AppendInto(&err, process(item)) {
+// log.Warn("skipping item", item)
+// }
+// }
//
// This will append the error into the err variable, and return true if that
// individual error was non-nil.
//
-// See AppendInto for more information.
+// See [AppendInto] for more information.
//
-// Deferred Functions
+// # Deferred Functions
//
// Go makes it possible to modify the return value of a function in a defer
// block if the function was using named returns. This makes it possible to
// record resource cleanup failures from deferred blocks.
//
-// func sendRequest(req Request) (err error) {
-// conn, err := openConnection()
-// if err != nil {
-// return err
-// }
-// defer func() {
-// err = multierr.Append(err, conn.Close())
-// }()
-// // ...
-// }
+// func sendRequest(req Request) (err error) {
+// conn, err := openConnection()
+// if err != nil {
+// return err
+// }
+// defer func() {
+// err = multierr.Append(err, conn.Close())
+// }()
+// // ...
+// }
//
// multierr provides the Invoker type and AppendInvoke function to make cases
// like the above simpler and obviate the need for a closure. The following is
// roughly equivalent to the example above.
//
-// func sendRequest(req Request) (err error) {
-// conn, err := openConnection()
-// if err != nil {
-// return err
-// }
-// defer multierr.AppendInvoke(&err, multierr.Close(conn))
-// // ...
-// }
+// func sendRequest(req Request) (err error) {
+// conn, err := openConnection()
+// if err != nil {
+// return err
+// }
+// defer multierr.AppendInvoke(&err, multierr.Close(conn))
+// // ...
+// }
//
-// See AppendInvoke and Invoker for more information.
+// See [AppendInvoke] and [Invoker] for more information.
//
-// Advanced Usage
+// NOTE: If you're modifying an error from inside a defer, you MUST use a named
+// return value for that function.
+//
+// # Advanced Usage
//
// Errors returned by Combine and Append MAY implement the following
// interface.
//
-// type errorGroup interface {
-// // Returns a slice containing the underlying list of errors.
-// //
-// // This slice MUST NOT be modified by the caller.
-// Errors() []error
-// }
+// type errorGroup interface {
+// // Returns a slice containing the underlying list of errors.
+// //
+// // This slice MUST NOT be modified by the caller.
+// Errors() []error
+// }
//
// Note that if you need access to list of errors behind a multierr error, you
// should prefer using the Errors function. That said, if you need cheap
@@ -128,13 +131,13 @@
// because errors returned by Combine and Append are not guaranteed to
// implement this interface.
//
-// var errors []error
-// group, ok := err.(errorGroup)
-// if ok {
-// errors = group.Errors()
-// } else {
-// errors = []error{err}
-// }
+// var errors []error
+// group, ok := err.(errorGroup)
+// if ok {
+// errors = group.Errors()
+// } else {
+// errors = []error{err}
+// }
package multierr // import "go.uber.org/multierr"
import (
@@ -185,8 +188,8 @@ type errorGroup interface {
// Errors returns a slice containing zero or more errors that the supplied
// error is composed of. If the error is nil, a nil slice is returned.
//
-// err := multierr.Append(r.Close(), w.Close())
-// errors := multierr.Errors(err)
+// err := multierr.Append(r.Close(), w.Close())
+// errors := multierr.Errors(err)
//
// If the error is not composed of other errors, the returned slice contains
// just the error that was passed in.
@@ -209,10 +212,7 @@ func Errors(err error) []error {
return []error{err}
}
- errors := eg.Errors()
- result := make([]error, len(errors))
- copy(result, errors)
- return result
+ return append(([]error)(nil), eg.Errors()...)
}
// multiError is an error that holds one or more errors.
@@ -393,8 +393,7 @@ func fromSlice(errors []error) error {
// Otherwise "errors" escapes to the heap
// unconditionally for all other cases.
// This lets us optimize for the "no errors" case.
- out := make([]error, len(errors))
- copy(out, errors)
+ out := append(([]error)(nil), errors...)
return &multiError{errors: out}
}
}
@@ -420,32 +419,32 @@ func fromSlice(errors []error) error {
// If zero arguments were passed or if all items are nil, a nil error is
// returned.
//
-// Combine(nil, nil) // == nil
+// Combine(nil, nil) // == nil
//
// If only a single error was passed, it is returned as-is.
//
-// Combine(err) // == err
+// Combine(err) // == err
//
// Combine skips over nil arguments so this function may be used to combine
// together errors from operations that fail independently of each other.
//
-// multierr.Combine(
-// reader.Close(),
-// writer.Close(),
-// pipe.Close(),
-// )
+// multierr.Combine(
+// reader.Close(),
+// writer.Close(),
+// pipe.Close(),
+// )
//
// If any of the passed errors is a multierr error, it will be flattened along
// with the other errors.
//
-// multierr.Combine(multierr.Combine(err1, err2), err3)
-// // is the same as
-// multierr.Combine(err1, err2, err3)
+// multierr.Combine(multierr.Combine(err1, err2), err3)
+// // is the same as
+// multierr.Combine(err1, err2, err3)
//
// The returned error formats into a readable multi-line error message if
// formatted with %+v.
//
-// fmt.Sprintf("%+v", multierr.Combine(err1, err2))
+// fmt.Sprintf("%+v", multierr.Combine(err1, err2))
func Combine(errors ...error) error {
return fromSlice(errors)
}
@@ -455,16 +454,19 @@ func Combine(errors ...error) error {
// This function is a specialization of Combine for the common case where
// there are only two errors.
//
-// err = multierr.Append(reader.Close(), writer.Close())
+// err = multierr.Append(reader.Close(), writer.Close())
//
// The following pattern may also be used to record failure of deferred
// operations without losing information about the original error.
//
-// func doSomething(..) (err error) {
-// f := acquireResource()
-// defer func() {
-// err = multierr.Append(err, f.Close())
-// }()
+// func doSomething(..) (err error) {
+// f := acquireResource()
+// defer func() {
+// err = multierr.Append(err, f.Close())
+// }()
+//
+// Note that the variable MUST be a named return to append an error to it from
+// the defer statement. See also [AppendInvoke].
func Append(left error, right error) error {
switch {
case left == nil:
@@ -494,37 +496,37 @@ func Append(left error, right error) error {
// AppendInto appends an error into the destination of an error pointer and
// returns whether the error being appended was non-nil.
//
-// var err error
-// multierr.AppendInto(&err, r.Close())
-// multierr.AppendInto(&err, w.Close())
+// var err error
+// multierr.AppendInto(&err, r.Close())
+// multierr.AppendInto(&err, w.Close())
//
// The above is equivalent to,
//
-// err := multierr.Append(r.Close(), w.Close())
+// err := multierr.Append(r.Close(), w.Close())
//
// As AppendInto reports whether the provided error was non-nil, it may be
// used to build a multierr error in a loop more ergonomically. For example:
//
-// var err error
-// for line := range lines {
-// var item Item
-// if multierr.AppendInto(&err, parse(line, &item)) {
-// continue
-// }
-// items = append(items, item)
-// }
+// var err error
+// for line := range lines {
+// var item Item
+// if multierr.AppendInto(&err, parse(line, &item)) {
+// continue
+// }
+// items = append(items, item)
+// }
//
// Compare this with a version that relies solely on Append:
//
-// var err error
-// for line := range lines {
-// var item Item
-// if parseErr := parse(line, &item); parseErr != nil {
-// err = multierr.Append(err, parseErr)
-// continue
-// }
-// items = append(items, item)
-// }
+// var err error
+// for line := range lines {
+// var item Item
+// if parseErr := parse(line, &item); parseErr != nil {
+// err = multierr.Append(err, parseErr)
+// continue
+// }
+// items = append(items, item)
+// }
func AppendInto(into *error, err error) (errored bool) {
if into == nil {
// We panic if 'into' is nil. This is not documented above
@@ -545,7 +547,7 @@ func AppendInto(into *error, err error) (errored bool) {
// AppendInvoke to append the result of calling the function into an error.
// This allows you to conveniently defer capture of failing operations.
//
-// See also, Close and Invoke.
+// See also, [Close] and [Invoke].
type Invoker interface {
Invoke() error
}
@@ -556,19 +558,22 @@ type Invoker interface {
//
// For example,
//
-// func processReader(r io.Reader) (err error) {
-// scanner := bufio.NewScanner(r)
-// defer multierr.AppendInvoke(&err, multierr.Invoke(scanner.Err))
-// for scanner.Scan() {
-// // ...
-// }
-// // ...
-// }
+// func processReader(r io.Reader) (err error) {
+// scanner := bufio.NewScanner(r)
+// defer multierr.AppendInvoke(&err, multierr.Invoke(scanner.Err))
+// for scanner.Scan() {
+// // ...
+// }
+// // ...
+// }
//
// In this example, the following line will construct the Invoker right away,
// but defer the invocation of scanner.Err() until the function returns.
//
-// defer multierr.AppendInvoke(&err, multierr.Invoke(scanner.Err))
+// defer multierr.AppendInvoke(&err, multierr.Invoke(scanner.Err))
+//
+// Note that the error you're appending to from the defer statement MUST be a
+// named return.
type Invoke func() error
// Invoke calls the supplied function and returns its result.
@@ -579,19 +584,22 @@ func (i Invoke) Invoke() error { return i() }
//
// For example,
//
-// func processFile(path string) (err error) {
-// f, err := os.Open(path)
-// if err != nil {
-// return err
-// }
-// defer multierr.AppendInvoke(&err, multierr.Close(f))
-// return processReader(f)
-// }
+// func processFile(path string) (err error) {
+// f, err := os.Open(path)
+// if err != nil {
+// return err
+// }
+// defer multierr.AppendInvoke(&err, multierr.Close(f))
+// return processReader(f)
+// }
//
// In this example, multierr.Close will construct the Invoker right away, but
// defer the invocation of f.Close until the function returns.
//
-// defer multierr.AppendInvoke(&err, multierr.Close(f))
+// defer multierr.AppendInvoke(&err, multierr.Close(f))
+//
+// Note that the error you're appending to from the defer statement MUST be a
+// named return.
func Close(closer io.Closer) Invoker {
return Invoke(closer.Close)
}
@@ -601,52 +609,73 @@ func Close(closer io.Closer) Invoker {
// invocation of fallible operations until a function returns, and capture the
// resulting errors.
//
-// func doSomething(...) (err error) {
-// // ...
-// f, err := openFile(..)
-// if err != nil {
-// return err
-// }
+// func doSomething(...) (err error) {
+// // ...
+// f, err := openFile(..)
+// if err != nil {
+// return err
+// }
+//
+// // multierr will call f.Close() when this function returns and
+// // if the operation fails, it appends its error into the
+// // returned error.
+// defer multierr.AppendInvoke(&err, multierr.Close(f))
//
-// // multierr will call f.Close() when this function returns and
-// // if the operation fails, its append its error into the
-// // returned error.
-// defer multierr.AppendInvoke(&err, multierr.Close(f))
+// scanner := bufio.NewScanner(f)
+// // Similarly, this schedules scanner.Err to be called and
+// // inspected when the function returns, appending its error
+// // into the returned error.
+// defer multierr.AppendInvoke(&err, multierr.Invoke(scanner.Err))
//
-// scanner := bufio.NewScanner(f)
-// // Similarly, this scheduled scanner.Err to be called and
-// // inspected when the function returns and append its error
-// // into the returned error.
-// defer multierr.AppendInvoke(&err, multierr.Invoke(scanner.Err))
+// // ...
+// }
//
-// // ...
-// }
+// NOTE: If used with a defer, the error variable MUST be a named return.
//
// Without defer, AppendInvoke behaves exactly like AppendInto.
//
-// err := // ...
-// multierr.AppendInvoke(&err, mutltierr.Invoke(foo))
+// err := // ...
+// multierr.AppendInvoke(&err, multierr.Invoke(foo))
//
-// // ...is roughly equivalent to...
+// // ...is roughly equivalent to...
//
-// err := // ...
-// multierr.AppendInto(&err, foo())
+// err := // ...
+// multierr.AppendInto(&err, foo())
//
// The advantage of the indirection introduced by Invoker is to make it easy
// to defer the invocation of a function. Without this indirection, the
// invoked function will be evaluated at the time of the defer block rather
// than when the function returns.
//
-// // BAD: This is likely not what the caller intended. This will evaluate
-// // foo() right away and append its result into the error when the
-// // function returns.
-// defer multierr.AppendInto(&err, foo())
+// // BAD: This is likely not what the caller intended. This will evaluate
+// // foo() right away and append its result into the error when the
+// // function returns.
+// defer multierr.AppendInto(&err, foo())
//
-// // GOOD: This will defer invocation of foo unutil the function returns.
-// defer multierr.AppendInvoke(&err, multierr.Invoke(foo))
+// // GOOD: This will defer invocation of foo until the function returns.
+// defer multierr.AppendInvoke(&err, multierr.Invoke(foo))
//
// multierr provides a few Invoker implementations out of the box for
-// convenience. See Invoker for more information.
+// convenience. See [Invoker] for more information.
func AppendInvoke(into *error, invoker Invoker) {
AppendInto(into, invoker.Invoke())
}
+
+// AppendFunc is a shorthand for [AppendInvoke].
+// It allows using function or method value directly
+// without having to wrap it into an [Invoker] interface.
+//
+// func doSomething(...) (err error) {
+// w, err := startWorker(...)
+// if err != nil {
+// return err
+// }
+//
+// // multierr will call w.Stop() when this function returns and
+// // if the operation fails, it appends its error into the
+// // returned error.
+// defer multierr.AppendFunc(&err, w.Stop)
+// }
+func AppendFunc(into *error, fn func() error) {
+ AppendInvoke(into, Invoke(fn))
+}
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 028a3dcc6141a..e3b3707bf9776 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -1325,6 +1325,12 @@ github.com/soheilhy/cmux
# github.com/sony/gobreaker v0.5.0
## explicit; go 1.12
github.com/sony/gobreaker
+# github.com/sourcegraph/conc v0.3.0
+## explicit; go 1.19
+github.com/sourcegraph/conc
+github.com/sourcegraph/conc/internal/multierror
+github.com/sourcegraph/conc/panics
+github.com/sourcegraph/conc/pool
# github.com/spaolacci/murmur3 v1.1.0
## explicit
github.com/spaolacci/murmur3
@@ -1498,8 +1504,8 @@ go.uber.org/atomic
## explicit; go 1.18
go.uber.org/goleak
go.uber.org/goleak/internal/stack
-# go.uber.org/multierr v1.8.0
-## explicit; go 1.14
+# go.uber.org/multierr v1.9.0
+## explicit; go 1.19
go.uber.org/multierr
# go.uber.org/zap v1.21.0
## explicit; go 1.13