diff --git a/core/internal/evaluator/caching.go b/core/internal/evaluator/caching.go index f7b1edf0..cc956318 100644 --- a/core/internal/evaluator/caching.go +++ b/core/internal/evaluator/caching.go @@ -170,7 +170,8 @@ func (module *CachingEvaluator) getConsumerStatus(request *protocol.EvaluatorReq func (module *CachingEvaluator) evaluateConsumerStatus(clusterAndConsumer string) (interface{}, error) { // First off, we need to separate the cluster and consumer values from the string provided - parts := strings.Split(clusterAndConsumer, " ") + // Allow space in consumer name + parts := strings.SplitN(clusterAndConsumer, " ", 2) if len(parts) != 2 { module.Log.Error("query with bad clusterAndConsumer", zap.String("arg", clusterAndConsumer)) return nil, &cacheError{StatusCode: 500, Reason: "bad request"} diff --git a/core/internal/helpers/validation.go b/core/internal/helpers/validation.go index 6d1fe8aa..612660dc 100644 --- a/core/internal/helpers/validation.go +++ b/core/internal/helpers/validation.go @@ -94,7 +94,7 @@ func ValidateURL(rawURL string) bool { // ValidateHostList returns true if the provided slice of strings can all be parsed by ValidateHostPort func ValidateHostList(hosts []string) bool { for _, host := range hosts { - if !ValidateHostPort(host, false) { + if !ValidateHostPort(host, true) { return false } } diff --git a/core/internal/helpers/validation_test.go b/core/internal/helpers/validation_test.go index 7fadd636..8ccacb11 100644 --- a/core/internal/helpers/validation_test.go +++ b/core/internal/helpers/validation_test.go @@ -96,6 +96,13 @@ func TestValidateTopic(t *testing.T) { } } +func TestValidateFilename(t *testing.T) { + for i, testSet := range testTopics { + result := ValidateFilename(testSet.TestValue) + assert.Equalf(t, testSet.Result, result, "Test %v - Expected '%v' to return %v, not %v", i, testSet.TestValue, testSet.Result, result) + } +} + var testEmails = []TestSet{ {"ok@example.com", true}, {"need@domain", false}, @@ -173,10 +180,13 @@ var testHostPorts = []TestSet{ {"host.example.com:23", true}, {"thissegmentiswaytoolongbecauseitshouldnotbemorethansixtythreecharacters.foo.com:36334", false}, {"underscores_are.not.valid.com:3453", false}, + {":2453", true}, + {"hostname:stringsNotValid", false}, } func TestValidateHostList(t *testing.T) { for i, testSet := range testHostPorts { + // Test allow blank hostname result := ValidateHostList([]string{testSet.TestValue}) assert.Equalf(t, testSet.Result, result, "Test %v - Expected '%v' to return %v, not %v", i, testSet.TestValue, testSet.Result, result) } diff --git a/core/internal/helpers/zookeeper.go b/core/internal/helpers/zookeeper.go index f2fa04e0..943f58f0 100644 --- a/core/internal/helpers/zookeeper.go +++ b/core/internal/helpers/zookeeper.go @@ -56,6 +56,11 @@ func (z *BurrowZookeeperClient) GetW(path string) ([]byte, *zk.Stat, <-chan zk.E return z.client.GetW(path) } +// Exists returns a boolean stating whether or not the specified path exists. +func (z *BurrowZookeeperClient) Exists(path string) (bool, *zk.Stat, error) { + return z.client.Exists(path) +} + // ExistsW returns a boolean stating whether or not the specified path exists. This method also sets a watch on the node // (exists if it does not currently exist, or a data watch otherwise), providing an event channel that will receive a // message when the watch fires @@ -115,6 +120,12 @@ func (m *MockZookeeperClient) GetW(path string) ([]byte, *zk.Stat, <-chan zk.Eve return args.Get(0).([]byte), args.Get(1).(*zk.Stat), args.Get(2).(<-chan zk.Event), args.Error(3) } +// Exists mocks protocol.ZookeeperClient.Exists +func (m *MockZookeeperClient) Exists(path string) (bool, *zk.Stat, error) { + args := m.Called(path) + return args.Bool(0), args.Get(1).(*zk.Stat), args.Error(2) +} + // ExistsW mocks protocol.ZookeeperClient.ExistsW func (m *MockZookeeperClient) ExistsW(path string) (bool, *zk.Stat, <-chan zk.Event, error) { args := m.Called(path) diff --git a/core/internal/zookeeper/coordinator.go b/core/internal/zookeeper/coordinator.go index 43b98c5c..09e0a127 100644 --- a/core/internal/zookeeper/coordinator.go +++ b/core/internal/zookeeper/coordinator.go @@ -121,10 +121,16 @@ func (zc *Coordinator) createRecursive(path string) error { parts := strings.Split(path, "/") for i := 2; i <= len(parts); i++ { - _, err := zc.App.Zookeeper.Create(strings.Join(parts[:i], "/"), []byte{}, 0, zk.WorldACL(zk.PermAll)) - // Ignore when the node exists already - if (err != nil) && (err != zk.ErrNodeExists) { - return err + // If the rootpath exists, skip the Create process to avoid "zk: not authenticated" error + exist, _, errExists := zc.App.Zookeeper.Exists(strings.Join(parts[:i], "/")) + if !exist { + _, err := zc.App.Zookeeper.Create(strings.Join(parts[:i], "/"), []byte{}, 0, zk.WorldACL(zk.PermAll)) + // Ignore when the node exists already + if (err != nil) && (err != zk.ErrNodeExists) { + return err + } + } else { + return errExists } } return nil diff --git a/core/internal/zookeeper/coordinator_test.go b/core/internal/zookeeper/coordinator_test.go index 846549fb..4fc65752 100644 --- a/core/internal/zookeeper/coordinator_test.go +++ b/core/internal/zookeeper/coordinator_test.go @@ -65,8 +65,12 @@ func TestCoordinator_StartStop(t *testing.T) { return &mockClient, eventChan, nil } + offsetStat := &zk.Stat{} + mockClient.On("Exists", "/test").Return(true, offsetStat, nil) mockClient.On("Create", "/test", []byte{}, int32(0), zk.WorldACL(zk.PermAll)).Return("", zk.ErrNodeExists) + mockClient.On("Exists", "/test/path").Return(true, offsetStat, nil) mockClient.On("Create", "/test/path", []byte{}, int32(0), zk.WorldACL(zk.PermAll)).Return("", zk.ErrNodeExists) + mockClient.On("Exists", "/test/path/burrow").Return(false, offsetStat, nil) mockClient.On("Create", "/test/path/burrow", []byte{}, int32(0), zk.WorldACL(zk.PermAll)).Return("", nil) mockClient.On("Close").Run(func(args mock.Arguments) { close(eventChan) }).Return() diff --git a/core/protocol/protocol.go b/core/protocol/protocol.go index 513d157e..05565945 100644 --- a/core/protocol/protocol.go +++ b/core/protocol/protocol.go @@ -141,13 +141,17 @@ type ZookeeperClient interface { // the children of the specified path, providing an event channel that will receive a message when the watch fires GetW(path string) ([]byte, *zk.Stat, <-chan zk.Event, error) + // For the given path in Zookeeper, return a boolean stating whether or not the node exists. + // The method does not set watch on the node, but verifies existence of a node to avoid authentication error. + Exists(path string) (bool, *zk.Stat, error) + // For the given path in Zookeeper, return a boolean stating whether or not the node exists. This method also sets // a watch on the node (exists if it does not currently exist, or a data watch otherwise), providing an event // channel that will receive a message when the watch fires ExistsW(path string) (bool, *zk.Stat, <-chan zk.Event, error) // Create makes a new ZNode at the specified path with the contents set to the data byte-slice. Flags can be - // provided to specify that this is an ephemeral or sequence node, and an ACL must be provided. If no ACL is\ + // provided to specify that this is an ephemeral or sequence node, and an ACL must be provided. If no ACL is // desired, specify // zk.WorldACL(zk.PermAll) Create(string, []byte, int32, []zk.ACL) (string, error)