Skip to content

Commit 1360193

Browse files
authored
fix data race in config/schema.go (#432)
1 parent 3b2a3e1 commit 1360193

File tree

6 files changed

+46
-11
lines changed

6 files changed

+46
-11
lines changed

.github/workflows/test.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ jobs:
1313
if: ${{ github.repository == 'warpstreamlabs/bento' || github.event_name != 'schedule' }}
1414
runs-on: ubuntu-latest
1515
env:
16-
CGO_ENABLED: 0
16+
CGO_ENABLED: 1
1717
steps:
1818

1919
- name: Checkout code

Makefile

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
.PHONY: all serverless deps docker docker-cgo clean docs test test-race test-integration fmt lint install deploy-docs playground
2-
2+
.DEFAULT_GOAL := help
33
TAGS ?=
44

55
GOMAXPROCS ?= 1
@@ -29,12 +29,15 @@ DOCS_FLAGS ?=
2929
APPS = bento
3030
all: $(APPS)
3131

32-
install: $(APPS)
32+
help: ## Show this help
33+
@grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-30s\033[0m %s\n", $$1, $$2}'
34+
35+
install: $(APPS) ## Install binaries to $(INSTALL_DIR)
3336
@install -d $(INSTALL_DIR)
3437
@rm -f $(INSTALL_DIR)/bento
3538
@cp $(PATHINSTBIN)/* $(INSTALL_DIR)/
3639

37-
deps:
40+
deps: ## Go mod tidy
3841
@go mod tidy
3942

4043
SOURCE_FILES = $(shell find internal public cmd -type f)
@@ -85,16 +88,16 @@ fmt:
8588
@go list -f {{.Dir}} ./... | xargs -I{} goimports -w -local github.com/warpstreamlabs/bento {}
8689
@go mod tidy
8790

88-
lint:
91+
lint: ## Run Go linter
8992
@go vet $(GO_FLAGS) ./...
9093
@golangci-lint -j $(GOMAXPROCS) run --fix -c .golangci.yml ./...
9194

92-
test: $(APPS)
95+
test: $(APPS) ## Run tests
9396
@go test $(GO_FLAGS) -ldflags "$(LD_FLAGS)" -timeout 3m ./...
9497
@$(PATHINSTBIN)/bento template lint $(TEMPLATE_FILES)
9598
@$(PATHINSTBIN)/bento test ./config/test/...
9699

97-
test-race: $(APPS)
100+
test-race: $(APPS) ## Run tests with -race
98101
@go test $(GO_FLAGS) -ldflags "$(LD_FLAGS)" -timeout 3m -race ./...
99102

100103
test-integration:

internal/config/schema.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,6 @@ func (t *Type) GetRawSource() any {
4545
return t.rawSource
4646
}
4747

48-
var httpField = docs.FieldObject(fieldHTTP, "Configures the service-wide HTTP server.").WithChildren(api.Spec()...)
49-
5048
func observabilityFields() docs.FieldSpecs {
5149
defaultMetrics := "none"
5250
if _, exists := bundle.GlobalEnvironment.GetDocs("prometheus", docs.TypeMetrics); exists {
@@ -74,6 +72,8 @@ func errorHandlingFields() docs.FieldSpecs {
7472

7573
// Spec returns a docs.FieldSpec for an entire Bento configuration.
7674
func Spec() docs.FieldSpecs {
75+
var httpField = docs.FieldObject(fieldHTTP, "Configures the service-wide HTTP server.").WithChildren(api.Spec()...)
76+
7777
fields := docs.FieldSpecs{httpField}
7878
fields = append(fields, stream.Spec()...)
7979
fields = append(fields, manager.Spec()...)

internal/config/schema_test.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package config
2+
3+
import (
4+
"sync"
5+
"testing"
6+
)
7+
8+
func TestSpecIsThreadSafe(t *testing.T) {
9+
t.Parallel()
10+
11+
// use a loop to increase the chance of reproducing the data race
12+
for i := 0; i < 100; i++ {
13+
var wg sync.WaitGroup
14+
goroutines := 2
15+
wg.Add(goroutines)
16+
for j := 0; j < goroutines; j++ {
17+
go func() {
18+
defer wg.Done()
19+
newSpec := Spec()
20+
newSpec.SetDefault(false, "http", "enabled")
21+
}()
22+
}
23+
24+
wg.Wait()
25+
}
26+
}

internal/impl/pure/input_inproc_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ inproc: foo
3737
}
3838

3939
func TestInprocDryRunNoConn(t *testing.T) {
40-
ctx, done := context.WithTimeout(context.Background(), time.Second*30)
40+
ctx, done := context.WithTimeout(context.Background(), time.Second*60)
4141
defer done()
4242

4343
t.Parallel()

internal/impl/pure/processor_workflow_test.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1028,15 +1028,21 @@ workflow:
10281028
strm, tracer, err := strmBuilder.BuildTraced()
10291029
require.NoError(t, err)
10301030

1031-
tCtx, done := context.WithTimeout(context.Background(), time.Second*30)
1031+
tCtx, done := context.WithTimeout(context.Background(), time.Second*60)
10321032
defer done()
10331033

1034+
wg := &sync.WaitGroup{}
1035+
wg.Add(1)
1036+
10341037
go func() {
1038+
defer wg.Done()
10351039
assert.NoError(t, strm.Run(tCtx))
10361040
}()
10371041
require.NoError(t, inFunc(tCtx, service.NewMessage([]byte(`{"id":"hello world","content":"waddup"}`))))
10381042
require.NoError(t, strm.Stop(tCtx))
10391043

1044+
wg.Wait()
1045+
10401046
assert.Equal(t, `{"content":"waddup","id":"HELLO WORLD","meta":{"workflow":{"succeeded":["fooproc"]}}}`, outValue)
10411047
assert.Equal(t, map[string][]service.TracingEvent{
10421048
"barproc": {

0 commit comments

Comments
 (0)