Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 115 additions & 10 deletions internal/handlers/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"log/slog"
"strings"
"sync"
"time"

"github.com/steemit/jussi/internal/cache"
Expand All @@ -13,6 +14,7 @@ import (
"github.com/steemit/jussi/internal/request"
"github.com/steemit/jussi/internal/telemetry"
"github.com/steemit/jussi/internal/upstream"
"github.com/steemit/jussi/internal/validators"
"github.com/steemit/jussi/internal/ws"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
Expand All @@ -28,6 +30,89 @@ var (
BatchSize = telemetry.BatchSize
)

// Upstream timeout policy.
//
// defaultUpstreamTimeout is the fallback selectUpstreamTimeout applies when
// the request carries no positive configured timeout: the upstream config
// sets timeout=0 (no per-URN entry matched), the value is negative, or the
// request has no upstream config at all. 15s covers ~5 Steem blocks and
// keeps idempotent reads from hanging forever if a per-URN timeout was
// forgotten.
//
// broadcastMinimumTimeout is the lower bound for broadcast_transaction*
// methods regardless of config. Synchronous broadcast must wait for a
// block (~3s on Steem); when an upstream node is slow or hung, ALB will
// keep the connection open and jussi must wait until it succeeds or until
// the timeout fires. Legacy jussi had timeout=0 (no limit) here and used
// retry to mask hung backends — we keep a finite limit but raise it to
// 30s so the request can survive ~10 blocks of upstream slowness without
// being clipped, which is what surfaces as "broadcast timeout" to wallets.
const (
defaultUpstreamTimeout = 15 * time.Second
broadcastMinimumTimeout = 30 * time.Second
)

// selectUpstreamTimeout returns the deadline to apply to a single upstream
// call for req.
//
// Resolution order:
//  1. Start from the configured per-upstream timeout (req.Upstream.Timeout,
//     in seconds). A nil request or nil upstream config counts as zero.
//  2. A non-positive value is replaced by defaultUpstreamTimeout.
//  3. If validators.IsBroadcastTransactionRequest reports the request as a
//     broadcast and the value is still below broadcastMinimumTimeout, the
//     result is raised to broadcastMinimumTimeout.
//
// When step 3 actually overrides the value, an info log is emitted, throttled
// by shouldLogBroadcastFloor so operators see that the floor rescued a
// too-small (or missing) config without getting one line per request. The
// logged "configured_s" is the raw configured value, before the default was
// applied.
func selectUpstreamTimeout(req *request.JSONRPCRequest) time.Duration {
	var configured time.Duration
	if req != nil && req.Upstream != nil {
		configured = time.Duration(req.Upstream.Timeout) * time.Second
	}

	effective := configured
	if effective <= 0 {
		effective = defaultUpstreamTimeout
	}

	// Anything that is not a broadcast, or that already meets the floor,
	// is returned unchanged.
	if !validators.IsBroadcastTransactionRequest(req) || effective >= broadcastMinimumTimeout {
		return effective
	}

	// The floor is about to override the resolved value: report it, keyed
	// by method so each broadcast method is throttled independently.
	method := "unknown"
	if req != nil && req.URN != nil {
		method = req.URN.Method
	}
	if shouldLogBroadcastFloor(method) {
		slog.Info("broadcast upstream timeout floored",
			"method", method,
			"configured_s", configured.Seconds(),
			"applied_s", broadcastMinimumTimeout.Seconds(),
		)
	}
	return broadcastMinimumTimeout
}

// broadcastFloorLogInterval is the minimum spacing between two "timeout
// floored" log lines for the same method. The log exists for observability,
// not as a per-request audit trail.
const broadcastFloorLogInterval = time.Minute

// broadcastFloorLastLog remembers, per method name, when the floor log was
// last emitted. Keys are method strings; values are time.Time.
var broadcastFloorLastLog sync.Map

// shouldLogBroadcastFloor reports whether the caller should emit the
// "timeout floored" log for method right now, and records the current time
// as the last emission when it answers true.
//
// It returns true when there is no entry for method yet, or when the stored
// timestamp is at least broadcastFloorLogInterval old (a stored value that is
// not a time.Time is treated as expired). It returns false otherwise.
//
// Updates go through LoadOrStore / CompareAndSwap, so when several callers
// race on the same entry only the one whose write lands gets true; the
// others retry, observe the fresh timestamp and get false.
func shouldLogBroadcastFloor(method string) bool {
	now := time.Now()
	for {
		prev, found := broadcastFloorLastLog.Load(method)
		if !found {
			// First sighting of this method: try to claim the slot.
			if _, raced := broadcastFloorLastLog.LoadOrStore(method, now); !raced {
				return true
			}
			// Someone else stored first; re-evaluate against their value.
			continue
		}

		if last, isTime := prev.(time.Time); isTime && now.Sub(last) < broadcastFloorLogInterval {
			return false
		}

		// Entry is stale: only the caller that swaps it out may log.
		if broadcastFloorLastLog.CompareAndSwap(method, prev, now) {
			return true
		}
	}
}

// RequestProcessor processes JSON-RPC requests
type RequestProcessor struct {
cacheGroup *cache.CacheGroup
Expand Down Expand Up @@ -306,6 +391,21 @@ func (p *RequestProcessor) ProcessSingleRequest(ctx context.Context, jsonrpcReq
// Mark span as error
telemetry.RecordSpanError(span, fmt.Errorf("upstream returned error: %v", errField["message"]))
RequestsTotal.WithLabelValues(jsonrpcReq.URN.Namespace, jsonrpcReq.URN.Method, "error").Inc()

// For broadcast methods, emit a structured warn log alongside the
// span. Broadcast traffic is low-volume and these errors are the
// signal operators actually need to correlate "wallets are seeing
// failed transactions" with an upstream incident — without paging
// through jaeger trace by trace.
if validators.IsBroadcastTransactionRequest(jsonrpcReq) {
slog.Warn("broadcast upstream returned error",
"method", jsonrpcReq.URN.Method,
"namespace", jsonrpcReq.URN.Namespace,
"upstream_url", upstreamURL,
"jussi_request_id", jsonrpcReq.JussiRequestID,
"upstream_message", fmt.Sprintf("%v", errField["message"]),
)
}
} else {
telemetry.SetSpanSuccess(span)
RequestsTotal.WithLabelValues(jsonrpcReq.URN.Namespace, jsonrpcReq.URN.Method, "success").Inc()
Expand All @@ -322,24 +422,29 @@ func getProtocol(url string) string {
return "http"
}

// callHTTPUpstream calls HTTP upstream with policy tuned to the request type:
//   - broadcast_transaction* methods are non-idempotent and MUST NOT be
//     retried (a transient transport error could otherwise cause the same
//     transaction to be submitted twice). They get a single attempt with
//     the broadcast-minimum timeout enforced by selectUpstreamTimeout.
//   - All other methods are treated as idempotent and use RequestWithRetry
//     with DefaultRetryConfig (≤2 attempts, ~100-500ms backoff). This
//     mirrors what legacy jussi did to mask transient ALB/steemd blips,
//     but with a tight enough budget to avoid the expiration-on-retry
//     pattern that motivated commit 9cf36ea.
//
// The deadline from selectUpstreamTimeout is applied to ctx once, so it
// bounds the whole call, including any retries.
//
// It returns the upstream response, or the error reported by the HTTP client.
func (p *RequestProcessor) callHTTPUpstream(ctx context.Context, jsonrpcReq *request.JSONRPCRequest, url string) (map[string]interface{}, error) {
	payload := jsonrpcReq.ToUpstreamRequest()
	headers := jsonrpcReq.UpstreamHeaders()

	// selectUpstreamTimeout never returns a non-positive duration, so the
	// request can no longer run without a deadline.
	ctx, cancel := context.WithTimeout(ctx, selectUpstreamTimeout(jsonrpcReq))
	defer cancel()

	// Broadcasts are sent exactly once; everything else may be retried.
	if validators.IsBroadcastTransactionRequest(jsonrpcReq) {
		return p.httpClient.Request(ctx, url, payload, headers)
	}
	return p.httpClient.RequestWithRetry(ctx, url, payload, headers, upstream.DefaultRetryConfig())
}

// TODO: WebSocket support - temporarily disabled
Expand Down
166 changes: 166 additions & 0 deletions internal/handlers/processor_timeout_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package handlers

import (
"sync"
"testing"
"time"

"github.com/steemit/jussi/internal/request"
"github.com/steemit/jussi/internal/urn"
)

// TestSelectUpstreamTimeout is a table-driven check of the timeout that
// selectUpstreamTimeout resolves for combinations of method name and
// configured per-upstream timeout, including nil request / URN / Upstream.
func TestSelectUpstreamTimeout(t *testing.T) {
	// newReq builds an appbase condenser_api request for method whose
	// upstream config carries timeoutSec seconds.
	newReq := func(method string, timeoutSec int) *request.JSONRPCRequest {
		r := &request.JSONRPCRequest{}
		r.URN = &urn.URN{
			Namespace: "appbase",
			API:       "condenser_api",
			Method:    method,
		}
		r.Upstream = &request.UpstreamConfig{Timeout: timeoutSec}
		return r
	}

	type testCase struct {
		name string
		req  *request.JSONRPCRequest
		want time.Duration
	}

	cases := []testCase{
		{"non-broadcast with explicit 5s uses configured value", newReq("get_block", 5), 5 * time.Second},
		{"non-broadcast with 0 falls back to default", newReq("get_block", 0), defaultUpstreamTimeout},
		{"broadcast_transaction_synchronous with 0 raised to broadcast minimum", newReq("broadcast_transaction_synchronous", 0), broadcastMinimumTimeout},
		{"broadcast_transaction with 0 raised to broadcast minimum", newReq("broadcast_transaction", 0), broadcastMinimumTimeout},
		{"broadcast with sub-minimum configured value raised to broadcast minimum", newReq("broadcast_transaction_synchronous", 3), broadcastMinimumTimeout},
		{"broadcast with above-minimum configured value preserved", newReq("broadcast_transaction_synchronous", 60), 60 * time.Second},
		{"broadcast with configured value equal to minimum preserved", newReq("broadcast_transaction_synchronous", 30), broadcastMinimumTimeout},
		{"broadcast with previous-15s config raised to new 30s minimum", newReq("broadcast_transaction_synchronous", 15), broadcastMinimumTimeout},
		{"broadcast_block is detected as broadcast method", newReq("broadcast_block", 0), broadcastMinimumTimeout},
		{"nil request falls back to default", nil, defaultUpstreamTimeout},
		{
			"request with nil URN falls back to default",
			&request.JSONRPCRequest{Upstream: &request.UpstreamConfig{Timeout: 0}},
			defaultUpstreamTimeout,
		},
		{
			"request with nil Upstream falls back to default",
			&request.JSONRPCRequest{URN: &urn.URN{Namespace: "appbase", Method: "get_block"}, Upstream: nil},
			defaultUpstreamTimeout,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			if got := selectUpstreamTimeout(tc.req); got != tc.want {
				t.Errorf("selectUpstreamTimeout = %v, want %v", got, tc.want)
			}
		})
	}
}

// TestSelectUpstreamTimeoutSafetyConstants guards the two safety-net
// constants against being lowered too far by accident.
func TestSelectUpstreamTimeoutSafetyConstants(t *testing.T) {
	// Each constant must cover at least one Steem block (~3s) plus network
	// and confirmation overhead; 10s is the smallest value accepted here.
	// This is only a lower bound: the constants may be, and are expected to
	// be, larger than this.
	const minAcceptable = 10 * time.Second

	if minAcceptable > defaultUpstreamTimeout {
		t.Errorf("defaultUpstreamTimeout=%v is too small; must cover at least one block (~3s) plus headroom", defaultUpstreamTimeout)
	}
	if minAcceptable > broadcastMinimumTimeout {
		t.Errorf("broadcastMinimumTimeout=%v is too small; must cover at least one block (~3s) plus headroom", broadcastMinimumTimeout)
	}
}

// TestShouldLogBroadcastFloor checks the sequential throttling contract:
// the first call for a method logs, an immediate repeat is suppressed, and
// a different method is tracked on its own.
func TestShouldLogBroadcastFloor(t *testing.T) {
	// Start from an empty throttle table so earlier tests cannot interfere.
	broadcastFloorLastLog.Clear()

	// Steps run in order; each one depends on the state left by the
	// previous call.
	steps := []struct {
		method  string
		want    bool
		failure string
	}{
		{"broadcast_transaction", true, "first call should return true"},
		{"broadcast_transaction", false, "second call within interval should return false"},
		{"broadcast_transaction_synchronous", true, "different method should log independently"},
	}

	for _, step := range steps {
		if got := shouldLogBroadcastFloor(step.method); got != step.want {
			t.Error(step.failure)
		}
	}
}

// TestShouldLogBroadcastFloorConcurrent hammers shouldLogBroadcastFloor for a
// single method from many goroutines at once and verifies that exactly one
// of them is told to log.
//
// Exactly one is the right bound: the table starts empty, so only the
// goroutine whose LoadOrStore lands first gets true. Every other goroutine
// then observes that fresh timestamp, which is far younger than
// broadcastFloorLogInterval, and gets false. A looser bound would let a
// broken dedup pass unnoticed.
func TestShouldLogBroadcastFloorConcurrent(t *testing.T) {
	// Start from an empty throttle table.
	broadcastFloorLastLog.Clear()

	const goroutines = 32
	results := make([]bool, goroutines)

	// start is closed once all goroutines are spawned so they contend on
	// the map at the same time instead of trickling in as they are created.
	start := make(chan struct{})

	var wg sync.WaitGroup
	wg.Add(goroutines)
	for i := 0; i < goroutines; i++ {
		go func(idx int) {
			defer wg.Done()
			<-start
			// Each goroutine writes only its own slot, so no extra
			// synchronization is needed on results.
			results[idx] = shouldLogBroadcastFloor("concurrent_method")
		}(i)
	}
	close(start)
	wg.Wait()

	trueCount := 0
	for _, logged := range results {
		if logged {
			trueCount++
		}
	}
	if trueCount != 1 {
		t.Errorf("expected exactly 1 goroutine to log under contention, got %d", trueCount)
	}
}
Loading
Loading