From 4c3f06d566974eaa5a1181b29f065c482d878b96 Mon Sep 17 00:00:00 2001 From: Steven Sklar Date: Mon, 29 Jul 2024 16:19:40 -0400 Subject: [PATCH 1/2] feat(client): add ping method to http/s sender --- http_integration_test.go | 54 ++++++++++++++++++++++++++++++++++++ http_sender.go | 60 ++++++++++++++++++++++++++++++++-------- sender.go | 6 ++++ tcp_sender.go | 4 +++ tcp_sender_test.go | 13 +++++++++ test/haproxy.cfg | 4 ++- 6 files changed, 128 insertions(+), 13 deletions(-) diff --git a/http_integration_test.go b/http_integration_test.go index 246e6cb..2e44997 100644 --- a/http_integration_test.go +++ b/http_integration_test.go @@ -122,3 +122,57 @@ func (suite *integrationTestSuite) TestServerSideError() { sender.Close(ctx) questdbC.Stop(ctx) } + +func (suite *integrationTestSuite) TestHttpPingSuccess() { + if testing.Short() { + suite.T().Skip("skipping integration test") + } + + ctx := context.Background() + + var ( + sender qdb.LineSender + err error + ) + + questdbC, err := setupQuestDB(ctx, noAuth) + assert.NoError(suite.T(), err) + + sender, err = qdb.NewLineSender(ctx, qdb.WithHttp(), qdb.WithAddress(questdbC.httpAddress)) + assert.NoError(suite.T(), err) + + assert.Eventually(suite.T(), func() bool { + return sender.Ping(ctx) != nil + }, time.Second, 100*time.Millisecond) + + sender.Close(ctx) + questdbC.Stop(ctx) + +} + +func (suite *integrationTestSuite) TestHttpsBasicAuthPingSuccess() { + if testing.Short() { + suite.T().Skip("skipping integration test") + } + + ctx := context.Background() + + var ( + sender qdb.LineSender + err error + ) + + questdbC, err := setupQuestDB(ctx, httpBasicAuth) + assert.NoError(suite.T(), err) + + sender, err = qdb.NewLineSender(ctx, qdb.WithHttp(), qdb.WithAddress(questdbC.httpAddress)) + assert.NoError(suite.T(), err) + + assert.Eventually(suite.T(), func() bool { + return sender.Ping(ctx) != nil + }, time.Second, 100*time.Millisecond) + + sender.Close(ctx) + questdbC.Stop(ctx) + +} diff --git a/http_sender.go b/http_sender.go index 0e86230..c36e9a2 100644 --- a/http_sender.go +++ b/http_sender.go @@ -107,9 +107,10 @@ type httpLineSender struct { pass string token string - client http.Client - uri string - closed bool + client http.Client + uri string + pingUri string + closed bool // Global transport is used unless a custom transport was provided. globalTransport *globalHttpTransport @@ -156,11 +157,13 @@ func newHttpLineSender(conf *lineSenderConfig) (*httpLineSender, error) { s.globalTransport.RegisterClient() } - s.uri = "http" + var protocol = "http" if conf.tlsMode != tlsDisabled { - s.uri += "s" + protocol += "s" } - s.uri += fmt.Sprintf("://%s/write", s.address) + + s.uri = fmt.Sprintf("%s://%s/write", protocol, s.address) + s.pingUri = fmt.Sprintf("%s://%s/ping", protocol, s.address) return s, nil } @@ -337,23 +340,56 @@ func (s *httpLineSender) At(ctx context.Context, ts time.Time) error { return nil } -// makeRequest returns a boolean if we need to retry the request -func (s *httpLineSender) makeRequest(ctx context.Context) (bool, error) { +func (s *httpLineSender) Ping(ctx context.Context) error { + buf := bytes.Buffer{} + success := http.StatusNoContent + req, err := http.NewRequest( http.MethodPost, - s.uri, - bytes.NewReader(s.buf.Bytes()), + s.pingUri, + &buf, ) if err != nil { - return false, err + return err } - req.ContentLength = int64(s.BufLen()) + s.setAuth(req) + + resp, err := s.client.Do(req) + if err != nil { + return err + } + + if resp.StatusCode != success { + return fmt.Errorf("ping returned status %d, expected %d", resp.StatusCode, success) + } + + return nil + +} + +func (s *httpLineSender) setAuth(req *http.Request) { if s.user != "" && s.pass != "" { req.SetBasicAuth(s.user, s.pass) } else if s.token != "" { req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", s.token)) } +} + +// makeRequest returns a boolean if we need to retry the request +func (s *httpLineSender) makeRequest(ctx context.Context) (bool, error) { + req, err := http.NewRequest( + http.MethodPost, + s.uri, + bytes.NewReader(s.buf.Bytes()), + ) + if err != nil { + return false, err + } + + s.setAuth(req) + + req.ContentLength = int64(s.BufLen()) // reqTimeout = ( request.len() / request_min_throughput ) + request_timeout // nb: conversion from int to time.Duration is in milliseconds diff --git a/sender.go b/sender.go index d768673..ae40bcd 100644 --- a/sender.go +++ b/sender.go @@ -148,6 +148,12 @@ type LineSender interface { // If auto-flush is enabled, the client will flush any remaining buffered // messages before closing itself. Close(ctx context.Context) error + + // Ping will send an HTTP request to the server's /ping path + // to test connectivity. + // + // Valid only for HTTP senders. TCP senders will return an error + Ping(ctx context.Context) error } const ( diff --git a/tcp_sender.go b/tcp_sender.go index 8fde732..599ed4c 100644 --- a/tcp_sender.go +++ b/tcp_sender.go @@ -241,6 +241,10 @@ func (s *tcpLineSender) At(ctx context.Context, ts time.Time) error { return nil } +func (s *tcpLineSender) Ping(ctx context.Context) error { + return errors.New("ping not supported for tcp senders") +} + // Messages returns a copy of accumulated ILP messages that are not // flushed to the TCP connection yet. Useful for debugging purposes. func (s *tcpLineSender) Messages() string { diff --git a/tcp_sender_test.go b/tcp_sender_test.go index b528d26..986e701 100644 --- a/tcp_sender_test.go +++ b/tcp_sender_test.go @@ -284,6 +284,19 @@ func TestErrorOnContextDeadline(t *testing.T) { t.Fail() } +func TestTcpPingReturnsError(t *testing.T) { + ctx := context.Background() + srv, err := newTestTcpServer(readAndDiscard) + assert.NoError(t, err) + defer srv.Close() + + sender, err := qdb.NewLineSender(ctx, qdb.WithTcp(), qdb.WithAddress(srv.Addr())) + assert.NoError(t, err) + defer sender.Close(ctx) + + assert.ErrorContains(t, sender.Ping(ctx), "ping not supported") +} + func BenchmarkLineSenderBatch1000(b *testing.B) { ctx := context.Background() diff --git a/test/haproxy.cfg b/test/haproxy.cfg index eac3e50..a229fc0 100644 --- a/test/haproxy.cfg +++ b/test/haproxy.cfg @@ -1,3 +1,6 @@ +global + maxconn 256 + frontend httpfront bind 0.0.0.0:8888 mode http @@ -26,4 +29,3 @@ frontend httpbasicauthfront mode http http-request auth unless { http_auth(httpcreds) } default_backend http - From 36803c2f7278227849038a869fc30bb9453eb8d5 Mon Sep 17 00:00:00 2001 From: Steven Sklar Date: Tue, 30 Jul 2024 12:07:49 -0400 Subject: [PATCH 2/2] tests fail because we need to retry ping... --- http_integration_test.go | 54 ---------------------------------------- http_sender.go | 15 ++++++----- http_sender_test.go | 10 ++++---- sender.go | 6 ----- tcp_sender.go | 4 --- tcp_sender_test.go | 14 ----------- utils_test.go | 6 +++++ 7 files changed, 18 insertions(+), 91 deletions(-) diff --git a/http_integration_test.go b/http_integration_test.go index 2e44997..246e6cb 100644 --- a/http_integration_test.go +++ b/http_integration_test.go @@ -122,57 +122,3 @@ func (suite *integrationTestSuite) TestServerSideError() { sender.Close(ctx) questdbC.Stop(ctx) } - -func (suite *integrationTestSuite) TestHttpPingSuccess() { - if testing.Short() { - suite.T().Skip("skipping integration test") - } - - ctx := context.Background() - - var ( - sender qdb.LineSender - err error - ) - - questdbC, err := setupQuestDB(ctx, noAuth) - assert.NoError(suite.T(), err) - - sender, err = qdb.NewLineSender(ctx, qdb.WithHttp(), qdb.WithAddress(questdbC.httpAddress)) - assert.NoError(suite.T(), err) - - assert.Eventually(suite.T(), func() bool { - return sender.Ping(ctx) != nil - }, time.Second, 100*time.Millisecond) - - sender.Close(ctx) - questdbC.Stop(ctx) - -} - -func (suite *integrationTestSuite) TestHttpsBasicAuthPingSuccess() { - if testing.Short() { - suite.T().Skip("skipping integration test") - } - - ctx := context.Background() - - var ( - sender qdb.LineSender - err error - ) - - questdbC, err := setupQuestDB(ctx, httpBasicAuth) - assert.NoError(suite.T(), err) - - sender, err = qdb.NewLineSender(ctx, qdb.WithHttp(), qdb.WithAddress(questdbC.httpAddress)) - assert.NoError(suite.T(), err) - - assert.Eventually(suite.T(), func() bool { - return sender.Ping(ctx) != nil - }, time.Second, 100*time.Millisecond) - - sender.Close(ctx) - questdbC.Stop(ctx) - -} diff --git a/http_sender.go b/http_sender.go index c36e9a2..8dcad5d 100644 --- a/http_sender.go +++ b/http_sender.go @@ -107,10 +107,9 @@ type httpLineSender struct { pass string token string - client http.Client - uri string - pingUri string - closed bool + client http.Client + uri string + closed bool // Global transport is used unless a custom transport was provided. globalTransport *globalHttpTransport @@ -163,9 +162,9 @@ func newHttpLineSender(conf *lineSenderConfig) (*httpLineSender, error) { } s.uri = fmt.Sprintf("%s://%s/write", protocol, s.address) - s.pingUri = fmt.Sprintf("%s://%s/ping", protocol, s.address) - return s, nil + var pingUri = fmt.Sprintf("%s://%s/ping", protocol, s.address) + return s, s.ping(pingUri) } func (s *httpLineSender) Flush(ctx context.Context) error { @@ -340,13 +339,13 @@ func (s *httpLineSender) At(ctx context.Context, ts time.Time) error { return nil } -func (s *httpLineSender) Ping(ctx context.Context) error { +func (s *httpLineSender) ping(uri string) error { buf := bytes.Buffer{} success := http.StatusNoContent req, err := http.NewRequest( http.MethodPost, - s.pingUri, + uri, &buf, ) if err != nil { diff --git a/http_sender_test.go b/http_sender_test.go index f1e1f4b..05e4dd8 100644 --- a/http_sender_test.go +++ b/http_sender_test.go @@ -93,19 +93,19 @@ func TestHttpHappyCasesFromConf(t *testing.T) { } func TestHttpHappyCasesFromEnv(t *testing.T) { - var ( - addr = "localhost:1111" - ) + // Set up test server to handle pings from constructor + s, err := newTestServerWithProtocol(readAndDiscard, "http") + assert.NoError(t, err) testCases := []httpConfigTestCase{ { name: "addr only", - config: fmt.Sprintf("http::addr=%s", addr), + config: fmt.Sprintf("http::addr=%s", s.addr), }, { name: "auto flush", config: fmt.Sprintf("http::addr=%s;auto_flush_rows=100;auto_flush_interval=1000;", - addr), + s.addr), }, } diff --git a/sender.go b/sender.go index ae40bcd..d768673 100644 --- a/sender.go +++ b/sender.go @@ -148,12 +148,6 @@ type LineSender interface { // If auto-flush is enabled, the client will flush any remaining buffered // messages before closing itself. Close(ctx context.Context) error - - // Ping will send an HTTP request to the server's /ping path - // to test connectivity. - // - // Valid only for HTTP senders. TCP senders will return an error - Ping(ctx context.Context) error } const ( diff --git a/tcp_sender.go b/tcp_sender.go index 599ed4c..8fde732 100644 --- a/tcp_sender.go +++ b/tcp_sender.go @@ -241,10 +241,6 @@ func (s *tcpLineSender) At(ctx context.Context, ts time.Time) error { return nil } -func (s *tcpLineSender) Ping(ctx context.Context) error { - return errors.New("ping not supported for tcp senders") -} - // Messages returns a copy of accumulated ILP messages that are not // flushed to the TCP connection yet. Useful for debugging purposes. func (s *tcpLineSender) Messages() string { diff --git a/tcp_sender_test.go b/tcp_sender_test.go index 986e701..501e850 100644 --- a/tcp_sender_test.go +++ b/tcp_sender_test.go @@ -283,20 +283,6 @@ func TestErrorOnContextDeadline(t *testing.T) { } t.Fail() } - -func TestTcpPingReturnsError(t *testing.T) { - ctx := context.Background() - srv, err := newTestTcpServer(readAndDiscard) - assert.NoError(t, err) - defer srv.Close() - - sender, err := qdb.NewLineSender(ctx, qdb.WithTcp(), qdb.WithAddress(srv.Addr())) - assert.NoError(t, err) - defer sender.Close(ctx) - - assert.ErrorContains(t, sender.Ping(ctx), "ping not supported") -} - func BenchmarkLineSenderBatch1000(b *testing.B) { ctx := context.Background() diff --git a/utils_test.go b/utils_test.go index 8ecef81..9823458 100644 --- a/utils_test.go +++ b/utils_test.go @@ -194,6 +194,12 @@ func (s *testServer) serveHttp() { err error ) + // handle ping + if r.URL.Path == "/ping" { + w.WriteHeader(http.StatusNoContent) + return + } + switch s.serverType { case failFirstThenSendToBackChannel: if atomic.AddInt64(&reqs, 1) == 1 {