diff --git a/internal/handlers/processor.go b/internal/handlers/processor.go index 9621ddd..319a9bf 100644 --- a/internal/handlers/processor.go +++ b/internal/handlers/processor.go @@ -5,6 +5,7 @@ import ( "fmt" "log/slog" "strings" + "sync" "time" "github.com/steemit/jussi/internal/cache" @@ -13,6 +14,7 @@ import ( "github.com/steemit/jussi/internal/request" "github.com/steemit/jussi/internal/telemetry" "github.com/steemit/jussi/internal/upstream" + "github.com/steemit/jussi/internal/validators" "github.com/steemit/jussi/internal/ws" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" @@ -28,6 +30,89 @@ var ( BatchSize = telemetry.BatchSize ) +// Upstream timeout policy. +// +// defaultUpstreamTimeout is the fallback when the upstream config sets +// timeout=0 (no per-URN entry matched). 15s covers ~5 Steem blocks and +// keeps idempotent reads from hanging forever if a per-URN timeout was +// forgotten. +// +// broadcastMinimumTimeout is the lower bound for broadcast_transaction* +// methods regardless of config. Synchronous broadcast must wait for a +// block (~3s on Steem); when an upstream node is slow or hung, ALB will +// keep the connection open and jussi must wait until it succeeds or until +// the timeout fires. Legacy jussi had timeout=0 (no limit) here and used +// retry to mask hung backends — we keep a finite limit but raise it to +// 30s so the request can survive ~10 blocks of upstream slowness without +// being clipped, which is what surfaces as "broadcast timeout" to wallets. +const ( + defaultUpstreamTimeout = 15 * time.Second + broadcastMinimumTimeout = 30 * time.Second +) + +// selectUpstreamTimeout resolves the effective upstream timeout for a request. +// It honours the configured per-URN timeout, falls back to defaultUpstreamTimeout +// when unset, and raises broadcast_transaction* requests to at least +// broadcastMinimumTimeout regardless of config (since clipping a synchronous +// broadcast can leave the transaction in flight on the upstream while the +// caller retries with a fresh expiration). +// +// When the broadcast floor actually overrides a smaller configured value +// it logs once per method per broadcastFloorLogInterval — operators want +// to know a config typo (or a too-aggressive timeout) is being silently +// rescued by the safety net, but only once, not on every request. +func selectUpstreamTimeout(req *request.JSONRPCRequest) time.Duration { + var configured time.Duration + if req != nil && req.Upstream != nil { + configured = time.Duration(req.Upstream.Timeout) * time.Second + } + timeout := configured + if timeout <= 0 { + timeout = defaultUpstreamTimeout + } + if validators.IsBroadcastTransactionRequest(req) && timeout < broadcastMinimumTimeout { + method := "unknown" + if req != nil && req.URN != nil { + method = req.URN.Method + } + if shouldLogBroadcastFloor(method) { + slog.Info("broadcast upstream timeout floored", + "method", method, + "configured_s", configured.Seconds(), + "applied_s", broadcastMinimumTimeout.Seconds(), + ) + } + timeout = broadcastMinimumTimeout + } + return timeout +} + +// broadcastFloorLogInterval throttles "timeout floored" logs to one +// emission per method per interval. The point is observability, not a +// per-request audit trail. +const broadcastFloorLogInterval = time.Minute + +var broadcastFloorLastLog sync.Map // map[string]time.Time keyed by method + +func shouldLogBroadcastFloor(method string) bool { + now := time.Now() + for { + actual, loaded := broadcastFloorLastLog.Load(method) + if loaded { + if lastT, ok := actual.(time.Time); ok && now.Sub(lastT) < broadcastFloorLogInterval { + return false + } + if broadcastFloorLastLog.CompareAndSwap(method, actual, now) { + return true + } + } else { + if _, loaded := broadcastFloorLastLog.LoadOrStore(method, now); !loaded { + return true + } + } + } +} + // RequestProcessor processes JSON-RPC requests type RequestProcessor struct { cacheGroup *cache.CacheGroup @@ -306,6 +391,21 @@ func (p *RequestProcessor) ProcessSingleRequest(ctx context.Context, jsonrpcReq // Mark span as error telemetry.RecordSpanError(span, fmt.Errorf("upstream returned error: %v", errField["message"])) RequestsTotal.WithLabelValues(jsonrpcReq.URN.Namespace, jsonrpcReq.URN.Method, "error").Inc() + + // For broadcast methods, emit a structured warn log alongside the + // span. Broadcast traffic is low-volume and these errors are the + // signal operators actually need to correlate "wallets are seeing + // failed transactions" with an upstream incident — without paging + // through jaeger trace by trace. + if validators.IsBroadcastTransactionRequest(jsonrpcReq) { + slog.Warn("broadcast upstream returned error", + "method", jsonrpcReq.URN.Method, + "namespace", jsonrpcReq.URN.Namespace, + "upstream_url", upstreamURL, + "jussi_request_id", jsonrpcReq.JussiRequestID, + "upstream_message", fmt.Sprintf("%v", errField["message"]), + ) + } } else { telemetry.SetSpanSuccess(span) RequestsTotal.WithLabelValues(jsonrpcReq.URN.Namespace, jsonrpcReq.URN.Method, "success").Inc() @@ -322,24 +422,29 @@ func getProtocol(url string) string { return "http" } -// callHTTPUpstream calls HTTP upstream +// callHTTPUpstream calls HTTP upstream with policy tuned to the request type: +// - broadcast_transaction* methods are non-idempotent and MUST NOT be +// retried (a transient transport error could otherwise cause the same +// transaction to be submitted twice). They get a single attempt with +// the broadcast-minimum timeout enforced by selectUpstreamTimeout. +// - All other methods are treated as idempotent and use RequestWithRetry +// with DefaultRetryConfig (≤2 attempts, ~100-500ms backoff). This +// mirrors what legacy jussi did to mask transient ALB/steemd blips, +// but with a tight enough budget to avoid the expiration-on-retry +// pattern that motivated commit 9cf36ea. func (p *RequestProcessor) callHTTPUpstream(ctx context.Context, jsonrpcReq *request.JSONRPCRequest, url string) (map[string]interface{}, error) { payload := jsonrpcReq.ToUpstreamRequest() headers := jsonrpcReq.UpstreamHeaders() - // Create timeout context - // A timeout of 0 in the upstream config means "use default". - // Previously 0 meant "no timeout" which could cause requests to - // hang indefinitely (e.g. broadcast_transaction_synchronous). - timeout := time.Duration(jsonrpcReq.Upstream.Timeout) * time.Second - if timeout == 0 { - timeout = 3 * time.Second // safe default for all requests - } + timeout := selectUpstreamTimeout(jsonrpcReq) var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, timeout) defer cancel() - return p.httpClient.Request(ctx, url, payload, headers) + if validators.IsBroadcastTransactionRequest(jsonrpcReq) { + return p.httpClient.Request(ctx, url, payload, headers) + } + return p.httpClient.RequestWithRetry(ctx, url, payload, headers, upstream.DefaultRetryConfig()) } // TODO: WebSocket support - temporarily disabled diff --git a/internal/handlers/processor_timeout_test.go b/internal/handlers/processor_timeout_test.go new file mode 100644 index 0000000..23956ab --- /dev/null +++ b/internal/handlers/processor_timeout_test.go @@ -0,0 +1,166 @@ +package handlers + +import ( + "sync" + "testing" + "time" + + "github.com/steemit/jussi/internal/request" + "github.com/steemit/jussi/internal/urn" +) + +func TestSelectUpstreamTimeout(t *testing.T) { + mkReq := func(method string, cfgTimeoutSec int) *request.JSONRPCRequest { + return &request.JSONRPCRequest{ + URN: &urn.URN{ + Namespace: "appbase", + API: "condenser_api", + Method: method, + }, + Upstream: &request.UpstreamConfig{ + Timeout: cfgTimeoutSec, + }, + } + } + + tests := []struct { + name string + req *request.JSONRPCRequest + want time.Duration + }{ + { + name: "non-broadcast with explicit 5s uses configured value", + req: mkReq("get_block", 5), + want: 5 * time.Second, + }, + { + name: "non-broadcast with 0 falls back to default", + req: mkReq("get_block", 0), + want: defaultUpstreamTimeout, + }, + { + name: "broadcast_transaction_synchronous with 0 raised to broadcast minimum", + req: mkReq("broadcast_transaction_synchronous", 0), + want: broadcastMinimumTimeout, + }, + { + name: "broadcast_transaction with 0 raised to broadcast minimum", + req: mkReq("broadcast_transaction", 0), + want: broadcastMinimumTimeout, + }, + { + name: "broadcast with sub-minimum configured value raised to broadcast minimum", + req: mkReq("broadcast_transaction_synchronous", 3), + want: broadcastMinimumTimeout, + }, + { + name: "broadcast with above-minimum configured value preserved", + req: mkReq("broadcast_transaction_synchronous", 60), + want: 60 * time.Second, + }, + { + name: "broadcast with configured value equal to minimum preserved", + req: mkReq("broadcast_transaction_synchronous", 30), + want: broadcastMinimumTimeout, + }, + { + name: "broadcast with previous-15s config raised to new 30s minimum", + req: mkReq("broadcast_transaction_synchronous", 15), + want: broadcastMinimumTimeout, + }, + { + name: "broadcast_block is detected as broadcast method", + req: mkReq("broadcast_block", 0), + want: broadcastMinimumTimeout, + }, + { + name: "nil request falls back to default", + req: nil, + want: defaultUpstreamTimeout, + }, + { + name: "request with nil URN falls back to default", + req: &request.JSONRPCRequest{Upstream: &request.UpstreamConfig{Timeout: 0}}, + want: defaultUpstreamTimeout, + }, + { + name: "request with nil Upstream falls back to default", + req: &request.JSONRPCRequest{URN: &urn.URN{Namespace: "appbase", Method: "get_block"}, Upstream: nil}, + want: defaultUpstreamTimeout, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := selectUpstreamTimeout(tt.req) + if got != tt.want { + t.Errorf("selectUpstreamTimeout = %v, want %v", got, tt.want) + } + }) + } +} + +func TestSelectUpstreamTimeoutSafetyConstants(t *testing.T) { + // Both constants must cover at least one Steem block (~3s) plus network + // and confirmation overhead. The production upstream config sets + // broadcast timeouts to 15s; keeping the safety net at the same value + // avoids surprise regressions when config and code drift. + const blockPlusHeadroom = 10 * time.Second + if defaultUpstreamTimeout < blockPlusHeadroom { + t.Errorf("defaultUpstreamTimeout=%v is too small; must cover at least one block (~3s) plus headroom", defaultUpstreamTimeout) + } + if broadcastMinimumTimeout < blockPlusHeadroom { + t.Errorf("broadcastMinimumTimeout=%v is too small; must cover at least one block (~3s) plus headroom", broadcastMinimumTimeout) + } +} + +func TestShouldLogBroadcastFloor(t *testing.T) { + // Reset state before test + broadcastFloorLastLog.Clear() + + // First call should always log. + if !shouldLogBroadcastFloor("broadcast_transaction") { + t.Error("first call should return true") + } + // Immediate second call should be suppressed. + if shouldLogBroadcastFloor("broadcast_transaction") { + t.Error("second call within interval should return false") + } + // Different method should log independently. + if !shouldLogBroadcastFloor("broadcast_transaction_synchronous") { + t.Error("different method should log independently") + } +} + +func TestShouldLogBroadcastFloorConcurrent(t *testing.T) { + // Reset state + broadcastFloorLastLog.Clear() + + // Hammer the same method from multiple goroutines. + // With LoadOrStore, at most one goroutine gets true on the first + // wave; the rest should see false. + const goroutines = 32 + results := make([]bool, goroutines) + var wg sync.WaitGroup + wg.Add(goroutines) + for i := 0; i < goroutines; i++ { + go func(idx int) { + defer wg.Done() + results[idx] = shouldLogBroadcastFloor("concurrent_method") + }(i) + } + wg.Wait() + + trueCount := 0 + for _, r := range results { + if r { + trueCount++ + } + } + if trueCount == 0 { + t.Error("expected at least one goroutine to log") + } + if trueCount > 3 { + t.Errorf("expected at most 3 true results under contention (got %d); LoadOrStore should suppress most duplicates", trueCount) + } +} diff --git a/internal/upstream/http.go b/internal/upstream/http.go index 966dc7f..7d9e36c 100644 --- a/internal/upstream/http.go +++ b/internal/upstream/http.go @@ -3,6 +3,7 @@ package upstream import ( "bytes" "context" + "crypto/tls" "encoding/json" "fmt" "io" @@ -18,6 +19,17 @@ type HTTPClient struct { client *http.Client } +// UpstreamStatusError is a typed error carrying the HTTP status code from +// an upstream response (>= 500). Callers can use errors.As to reliably +// extract the code without string matching. +type UpstreamStatusError struct { + StatusCode int +} + +func (e *UpstreamStatusError) Error() string { + return fmt.Sprintf("upstream server error: %d", e.StatusCode) +} + // NewHTTPClient creates a new HTTP client func NewHTTPClient() *HTTPClient { return &HTTPClient{ @@ -26,9 +38,22 @@ func NewHTTPClient() *HTTPClient { // context.WithTimeout in callHTTPUpstream to avoid nested // timeout conflicts between transport and context. Transport: &http.Transport{ + // Force HTTP/1.1 to avoid HTTP/2 multiplexing which causes + // all requests to share a single TCP connection to the ALB. + // With HTTP/1.1, the connection pool distributes requests + // across multiple connections for better load balancing. + ForceAttemptHTTP2: false, + TLSClientConfig: &tls.Config{ + NextProtos: []string{"http/1.1"}, + }, MaxIdleConns: 100, MaxIdleConnsPerHost: 10, - IdleConnTimeout: 90 * time.Second, + // Limit total connections per host to force connection + // rotation and prevent sticky connections to a single + // backend instance behind the ALB. + MaxConnsPerHost: 20, + IdleConnTimeout: 90 * time.Second, + DisableKeepAlives: false, }, }, } @@ -74,7 +99,7 @@ func (c *HTTPClient) Request(ctx context.Context, url string, payload map[string // Check for server errors if resp.StatusCode >= 500 { - return nil, fmt.Errorf("upstream server error: %d", resp.StatusCode) + return nil, &UpstreamStatusError{StatusCode: resp.StatusCode} } // Read response @@ -92,6 +117,45 @@ func (c *HTTPClient) Request(ctx context.Context, url string, payload map[string return result, nil } +// RequestWithRetry sends a request with bounded-attempt retry, but only +// retries when IsRetriableUpstreamError returns true for the most recent +// error. The total wall-clock budget is still bounded by ctx — once ctx +// is Done we stop immediately, even if attempts remain. +// +// This is for IDEMPOTENT requests only. Broadcast methods must call +// Request directly so a transient transport error never causes a duplicate +// submission. +func (c *HTTPClient) RequestWithRetry( + ctx context.Context, + url string, + payload map[string]interface{}, + headers map[string]string, + cfg RetryConfig, +) (map[string]interface{}, error) { + if cfg.MaxAttempts < 1 { + cfg.MaxAttempts = 1 + } + var lastErr error + for attempt := 1; attempt <= cfg.MaxAttempts; attempt++ { + if attempt > 1 { + select { + case <-time.After(cfg.BackoffFor(attempt - 1)): + case <-ctx.Done(): + return nil, ctx.Err() + } + } + result, err := c.Request(ctx, url, payload, headers) + if err == nil { + return result, nil + } + lastErr = err + if !IsRetriableUpstreamError(err) { + return nil, err + } + } + return nil, lastErr +} + // Close closes the HTTP client func (c *HTTPClient) Close() error { // HTTP client doesn't need explicit closing diff --git a/internal/upstream/http_retry_test.go b/internal/upstream/http_retry_test.go new file mode 100644 index 0000000..b490c30 --- /dev/null +++ b/internal/upstream/http_retry_test.go @@ -0,0 +1,111 @@ +package upstream + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "sync/atomic" + "testing" + "time" +) + +// newRetryTestClient returns an HTTPClient pointed at the given test +// server URL with retry/backoff tuned to make tests fast. +func newRetryTestClient() *HTTPClient { + c := NewHTTPClient() + c.client.Timeout = 2 * time.Second + return c +} + +func TestRequestWithRetrySucceedsAfterTransient500(t *testing.T) { + var attempts int32 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + n := atomic.AddInt32(&attempts, 1) + if n == 1 { + w.WriteHeader(http.StatusBadGateway) + return + } + _ = json.NewEncoder(w).Encode(map[string]interface{}{"jsonrpc": "2.0", "result": "ok"}) + })) + defer srv.Close() + + c := newRetryTestClient() + cfg := RetryConfig{MaxAttempts: 2, InitialBackoff: time.Millisecond, MaxBackoff: time.Millisecond} + resp, err := c.RequestWithRetry(context.Background(), srv.URL, map[string]interface{}{"x": 1}, nil, cfg) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if got := resp["result"]; got != "ok" { + t.Errorf("got result=%v, want ok", got) + } + if got := atomic.LoadInt32(&attempts); got != 2 { + t.Errorf("attempts=%d, want 2", got) + } +} + +func TestRequestWithRetryStopsOnNonRetriable4xx(t *testing.T) { + var attempts int32 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt32(&attempts, 1) + w.WriteHeader(http.StatusBadRequest) + _ = json.NewEncoder(w).Encode(map[string]interface{}{"error": "bad"}) + })) + defer srv.Close() + + c := newRetryTestClient() + cfg := RetryConfig{MaxAttempts: 3, InitialBackoff: time.Millisecond, MaxBackoff: time.Millisecond} + resp, err := c.RequestWithRetry(context.Background(), srv.URL, map[string]interface{}{"x": 1}, nil, cfg) + // 4xx is returned as a parsed response (not an error) by HTTPClient.Request + // because Request only treats >=500 as errors. No retry should happen. + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if resp == nil { + t.Fatal("expected non-nil response") + } + if got := atomic.LoadInt32(&attempts); got != 1 { + t.Errorf("attempts=%d, want 1 (no retry on 4xx)", got) + } +} + +func TestRequestWithRetryGivesUpAfterMaxAttempts(t *testing.T) { + var attempts int32 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt32(&attempts, 1) + w.WriteHeader(http.StatusServiceUnavailable) + })) + defer srv.Close() + + c := newRetryTestClient() + cfg := RetryConfig{MaxAttempts: 3, InitialBackoff: time.Millisecond, MaxBackoff: time.Millisecond} + _, err := c.RequestWithRetry(context.Background(), srv.URL, map[string]interface{}{"x": 1}, nil, cfg) + if err == nil { + t.Fatal("expected error after maxAttempts exhausted") + } + if got := atomic.LoadInt32(&attempts); got != 3 { + t.Errorf("attempts=%d, want 3", got) + } +} + +func TestRequestWithRetryRespectsContextCancellation(t *testing.T) { + var attempts int32 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt32(&attempts, 1) + w.WriteHeader(http.StatusServiceUnavailable) + })) + defer srv.Close() + + c := newRetryTestClient() + cfg := RetryConfig{MaxAttempts: 5, InitialBackoff: 50 * time.Millisecond, MaxBackoff: 50 * time.Millisecond} + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Millisecond) + defer cancel() + _, err := c.RequestWithRetry(ctx, srv.URL, map[string]interface{}{"x": 1}, nil, cfg) + if err == nil { + t.Fatal("expected context error") + } + // We should have stopped well before exhausting MaxAttempts. + if got := atomic.LoadInt32(&attempts); got >= 5 { + t.Errorf("attempts=%d, want < 5 (context should stop us)", got) + } +} diff --git a/internal/upstream/retry.go b/internal/upstream/retry.go new file mode 100644 index 0000000..ed04391 --- /dev/null +++ b/internal/upstream/retry.go @@ -0,0 +1,110 @@ +package upstream + +import ( + "context" + "errors" + "io" + "math/rand" + "net" + "strings" + "time" +) + +// RetryConfig configures bounded-attempt retry for idempotent upstream +// requests. Non-idempotent paths (notably broadcast_transaction*) must +// NOT use retry — see handlers.RequestProcessor.callHTTPUpstream. +type RetryConfig struct { + // MaxAttempts is the total number of attempts including the first + // (so MaxAttempts=2 means up to 1 retry after the original failure). + // Values < 1 are treated as 1 (no retry). + MaxAttempts int + + // InitialBackoff is the sleep before the first retry. Subsequent + // retries double up to MaxBackoff. + InitialBackoff time.Duration + + // MaxBackoff caps the per-attempt sleep. + MaxBackoff time.Duration + + // JitterFraction adds up to (JitterFraction * backoff) of random + // extra sleep to avoid thundering-herd on transient upstream blips. + JitterFraction float64 +} + +// DefaultRetryConfig is intentionally tighter than legacy jussi's +// (3 retries / multi-second backoff) so the worst-case retry budget +// stays well under typical wallet timeouts. +func DefaultRetryConfig() RetryConfig { + return RetryConfig{ + MaxAttempts: 2, + InitialBackoff: 100 * time.Millisecond, + MaxBackoff: 500 * time.Millisecond, + JitterFraction: 0.25, + } +} + +// IsRetriableUpstreamError reports whether an error from HTTPClient.Request +// indicates a transient transport-layer or upstream-side failure that +// is safe to retry for an idempotent request. +// +// We deliberately do NOT retry on context cancellation or deadline: +// the caller's budget is gone; another attempt cannot succeed within it +// and just burns context for nothing. +func IsRetriableUpstreamError(err error) bool { + if err == nil { + return false + } + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + return false + } + if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) { + return true + } + var netErr net.Error + if errors.As(err, &netErr) && netErr.Timeout() { + return true + } + // Match UpstreamStatusError via types — avoids fragile string matching. + var statusErr *UpstreamStatusError + if errors.As(err, &statusErr) { + return statusErr.StatusCode >= 500 && statusErr.StatusCode < 600 + } + msg := err.Error() + for _, s := range []string{ + "connection reset", + "connection refused", + "broken pipe", + "no such host", + "i/o timeout", + "network is unreachable", + } { + if strings.Contains(msg, s) { + return true + } + } + return false +} + +// BackoffFor returns the sleep duration before the n-th retry (1-based). +// Capped at MaxBackoff; adds up to JitterFraction * backoff jitter. +func (c RetryConfig) BackoffFor(retryNum int) time.Duration { + if retryNum < 1 { + retryNum = 1 + } + base := c.InitialBackoff + for i := 1; i < retryNum; i++ { + base *= 2 + if base > c.MaxBackoff || base < 0 { + base = c.MaxBackoff + break + } + } + if base > c.MaxBackoff { + base = c.MaxBackoff + } + if c.JitterFraction <= 0 { + return base + } + jitter := time.Duration(float64(base) * c.JitterFraction * rand.Float64()) + return base + jitter +} diff --git a/internal/upstream/retry_test.go b/internal/upstream/retry_test.go new file mode 100644 index 0000000..b0c04d3 --- /dev/null +++ b/internal/upstream/retry_test.go @@ -0,0 +1,116 @@ +package upstream + +import ( + "context" + "errors" + "fmt" + "io" + "net" + "testing" + "time" +) + +type fakeTimeoutErr struct{} + +func (fakeTimeoutErr) Error() string { return "fake timeout" } +func (fakeTimeoutErr) Timeout() bool { return true } +func (fakeTimeoutErr) Temporary() bool { return true } + +var _ net.Error = fakeTimeoutErr{} + +func TestIsRetriableUpstreamError(t *testing.T) { + tests := []struct { + name string + err error + want bool + }{ + {"nil", nil, false}, + {"context canceled", context.Canceled, false}, + {"context deadline exceeded", context.DeadlineExceeded, false}, + {"io.EOF bare", io.EOF, true}, + {"io.EOF wrapped", fmt.Errorf("upstream request failed: %w", io.EOF), true}, + {"io.ErrUnexpectedEOF", io.ErrUnexpectedEOF, true}, + {"net timeout", fakeTimeoutErr{}, true}, + {"connection reset string", errors.New("read tcp: connection reset by peer"), true}, + {"connection refused string", errors.New("dial tcp: connection refused"), true}, + {"i/o timeout string", errors.New("Get url: net/http: i/o timeout"), true}, + {"no such host string", errors.New("dial: no such host"), true}, + {"server error 500 via typed error", &UpstreamStatusError{StatusCode: 500}, true}, + {"server error 502 via typed error", &UpstreamStatusError{StatusCode: 502}, true}, + {"server error 503 via typed error", &UpstreamStatusError{StatusCode: 503}, true}, + {"server error 504 via typed error", &UpstreamStatusError{StatusCode: 504}, true}, + {"server error 4xx not retried via typed error", &UpstreamStatusError{StatusCode: 401}, false}, + {"server error 599 edge via typed error", &UpstreamStatusError{StatusCode: 599}, true}, + {"server error 600 not retried via typed error", &UpstreamStatusError{StatusCode: 600}, false}, + {"context deadline wrapped not retried", fmt.Errorf("upstream request failed: %w", context.DeadlineExceeded), false}, + {"unknown error not retried", errors.New("parse error: unexpected character"), false}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := IsRetriableUpstreamError(tt.err) + if got != tt.want { + t.Errorf("IsRetriableUpstreamError(%q) = %v, want %v", tt.err, got, tt.want) + } + }) + } +} + +func TestRetryConfigBackoffExponentialCapped(t *testing.T) { + cfg := RetryConfig{ + MaxAttempts: 5, + InitialBackoff: 100 * time.Millisecond, + MaxBackoff: 500 * time.Millisecond, + JitterFraction: 0, // deterministic + } + tests := []struct { + retryNum int + want time.Duration + }{ + {1, 100 * time.Millisecond}, + {2, 200 * time.Millisecond}, + {3, 400 * time.Millisecond}, + {4, 500 * time.Millisecond}, + {5, 500 * time.Millisecond}, + {10, 500 * time.Millisecond}, + } + for _, tt := range tests { + t.Run(fmt.Sprintf("retry%d", tt.retryNum), func(t *testing.T) { + got := cfg.BackoffFor(tt.retryNum) + if got != tt.want { + t.Errorf("BackoffFor(%d) = %v, want %v", tt.retryNum, got, tt.want) + } + }) + } +} + +func TestRetryConfigBackoffJitterBoundedAndPositive(t *testing.T) { + cfg := RetryConfig{ + MaxAttempts: 3, + InitialBackoff: 100 * time.Millisecond, + MaxBackoff: 500 * time.Millisecond, + JitterFraction: 0.25, + } + for range 32 { + got := cfg.BackoffFor(1) + if got < 100*time.Millisecond || got > 125*time.Millisecond { + t.Errorf("BackoffFor(1) with jitter = %v, want in [100ms, 125ms]", got) + } + } +} + +func TestDefaultRetryConfigSafe(t *testing.T) { + cfg := DefaultRetryConfig() + if cfg.MaxAttempts < 1 { + t.Errorf("MaxAttempts must be >= 1, got %d", cfg.MaxAttempts) + } + if cfg.MaxAttempts > 3 { + t.Errorf("MaxAttempts=%d is too aggressive; legacy 3-retry pattern caused expiration-on-retry (see commit 9cf36ea)", cfg.MaxAttempts) + } + worstCase := cfg.InitialBackoff + for i := 2; i <= cfg.MaxAttempts; i++ { + worstCase += cfg.BackoffFor(i) + } + if worstCase > 2*time.Second { + t.Errorf("worst-case retry budget %v exceeds 2s; risks pushing into wallet timeout", worstCase) + } +} diff --git a/internal/upstream/router.go b/internal/upstream/router.go index 4c0118e..33e24ce 100644 --- a/internal/upstream/router.go +++ b/internal/upstream/router.go @@ -2,6 +2,7 @@ package upstream import ( "fmt" + "log/slog" "github.com/steemit/jussi/internal/config" ) @@ -37,11 +38,34 @@ func NewRouter(upstreamConfig *config.UpstreamRawConfig) (*Router, error) { // Parse upstream configuration if available if upstreamConfig != nil { router.parseUpstreamConfig() + router.logConfigSummary() } return router, nil } +// logConfigSummary emits one slog.Info per upstream describing the parsed +// URL count and the broadcast/default timeouts that will actually take +// effect at runtime. Reading this at startup is the quickest way for an +// operator to confirm "is the config I shipped really the config that's +// running?", and it's cheap (runs once per process). +func (r *Router) logConfigSummary() { + if r.upstreamConfig == nil { + return + } + for _, u := range r.upstreamConfig.Upstreams { + bcTimeout := r.getTimeoutFromConfig(u.Name + ".network_broadcast_api") + nsTimeout := r.getTimeoutFromConfig(u.Name) + slog.Info("upstream registered", + "name", u.Name, + "url_count", len(u.URLs), + "translate_to_appbase", u.TranslateToAppbase, + "timeout_default_s", nsTimeout, + "timeout_broadcast_s", bcTimeout, + ) + } +} + // GetUpstream returns upstream information for a given URN // Returns false if no upstream configuration is found func (r *Router) GetUpstream(urn string) (*UpstreamInfo, bool) { diff --git a/internal/validators/validators.go b/internal/validators/validators.go index 95b0d0c..d635ba3 100644 --- a/internal/validators/validators.go +++ b/internal/validators/validators.go @@ -4,6 +4,7 @@ import ( "fmt" "reflect" "strconv" + "strings" "github.com/steemit/jussi/internal/errors" "github.com/steemit/jussi/internal/request" @@ -170,10 +171,16 @@ func isValidParamsType(v interface{}) bool { return false } -// Broadcast transaction method names -var BroadcastTransactionMethods = map[string]bool{ - "broadcast_transaction": true, - "broadcast_transaction_synchronous": true, +// IsBroadcastTransactionRequest checks if the request is a broadcast method +// (e.g. broadcast_transaction, broadcast_transaction_synchronous, broadcast_block). +// Uses prefix matching so that any future broadcast_* method is automatically +// covered without source changes — all such methods are non-idempotent and +// must never be retried. +func IsBroadcastTransactionRequest(req *request.JSONRPCRequest) bool { + if req == nil || req.URN == nil { + return false + } + return strings.HasPrefix(req.URN.Method, "broadcast_") } // IsGetBlockRequest checks if the request is a get_block request @@ -194,14 +201,6 @@ func IsGetBlockHeaderRequest(req *request.JSONRPCRequest) bool { req.URN.Method == "get_block_header" } -// IsBroadcastTransactionRequest checks if the request is a broadcast transaction request -func IsBroadcastTransactionRequest(req *request.JSONRPCRequest) bool { - if req == nil || req.URN == nil { - return false - } - return BroadcastTransactionMethods[req.URN.Method] -} - // BlockNumFromID extracts block number from block ID (first 8 hex digits) func BlockNumFromID(blockID string) (int, error) { if len(blockID) < 8 { @@ -494,4 +493,3 @@ func LimitBroadcastTransactionRequest(req *request.JSONRPCRequest, limits map[st return LimitCustomJSONAccount(customJSONOps, blacklistAccounts) } -