Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions channel_modes.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ const (
ModeMixed ChannelMode = "mixed"
)

// DefaultChannelMode is applied to channels that do not yet have an explicit
// configuration. Mixed lets the dashboard dispatch locally while still
// forwarding client frames to the live target when one is configured.
const DefaultChannelMode = ModeMixed

type ChannelConfig struct {
Channel string `json:"channel"`
Mode ChannelMode `json:"mode"`
Expand Down Expand Up @@ -72,7 +77,7 @@ func (r *ChannelModeRegistry) Get(channel string) ChannelConfig {
if ok {
return cfg
}
return ChannelConfig{Channel: channel, Mode: ModeMock}
return ChannelConfig{Channel: channel, Mode: DefaultChannelMode}
}

func (r *ChannelModeRegistry) Set(cfg ChannelConfig) error {
Expand Down Expand Up @@ -101,7 +106,7 @@ func (r *ChannelModeRegistry) Set(cfg ChannelConfig) error {
}
}
if cfg.Mode == "" {
cfg.Mode = ModeMock
cfg.Mode = DefaultChannelMode
}
r.channels[cfg.Channel] = cfg
snapshot := r.snapshotLocked()
Expand Down Expand Up @@ -140,7 +145,7 @@ func (r *ChannelModeRegistry) Delete(channel string) error {
r.mu.Unlock()

listenerCfg := previous
listenerCfg.Mode = ModeMock
listenerCfg.Mode = DefaultChannelMode
listenerCfg.UpdatedAt = now
eventCfg := listenerCfg

Expand Down
4 changes: 2 additions & 2 deletions channel_modes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ func TestChannelModeRegistryGetSetSnapshotAndPersistence(t *testing.T) {
if err != nil {
t.Fatalf("NewChannelModeRegistry() error = %v", err)
}
if got := reg.Get("/scores").Mode; got != ModeMock {
t.Fatalf("default mode = %s, want mock", got)
if got := reg.Get("/scores").Mode; got != DefaultChannelMode {
t.Fatalf("default mode = %s, want %s", got, DefaultChannelMode)
}
if err := reg.Set(ChannelConfig{Channel: "/scores", Mode: ModeMixed, RateCapHz: 25}); err != nil {
t.Fatalf("Set() error = %v", err)
Expand Down
6 changes: 3 additions & 3 deletions frontend/src/components/Sidebar.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ function SavedChannelsPanel({ showToast }: { showToast: (message: string, kind?:
const confirm = useConfirm()
const [adding, setAdding] = useState(false)
const [channel, setChannel] = useState('')
const [mode, setMode] = useState<ChannelMode>('mock')
const [mode, setMode] = useState<ChannelMode>('mixed')
const [saving, setSaving] = useState(false)

const savedChannels = useMemo(
Expand All @@ -339,7 +339,7 @@ function SavedChannelsPanel({ showToast }: { showToast: (message: string, kind?:

const resetForm = useCallback(() => {
setChannel('')
setMode('mock')
setMode('mixed')
setAdding(false)
}, [])

Expand Down Expand Up @@ -452,7 +452,7 @@ function SavedChannelsPanel({ showToast }: { showToast: (message: string, kind?:
{cfg.channel}
</span>
<span className="px-1.5 py-0.5 rounded-sm border border-line text-[10px] uppercase text-fg-2">
{cfg.mode || 'mock'}
{cfg.mode || 'mixed'}
</span>
<button
type="button"
Expand Down
2 changes: 1 addition & 1 deletion frontend/src/components/SocketPanel.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ export function SocketPanel({
) : (
<div className="channel-mode-list">
{visibleChannels.map(item => {
const current = channelModes[item]?.mode ?? 'mock'
const current = channelModes[item]?.mode ?? 'mixed'
return (
<div className="channel-mode-row" key={item}>
<span title={item}>{item}</span>
Expand Down
38 changes: 30 additions & 8 deletions frontend/src/index.css
Original file line number Diff line number Diff line change
Expand Up @@ -425,14 +425,21 @@ svg {
font-family: var(--font-mono);
font-size: 10px;
font-weight: 700;
padding: 2px 5px;
padding: 2px 7px;
border-radius: 3px;
letter-spacing: 0.02em;
min-width: 42px;
text-align: center;
display: inline-block;
white-space: nowrap;
/* Neutral fallback so any method without a specific variant still
renders as a tinted pill instead of bare, uncolored text. */
color: var(--fg-2);
background: color-mix(in oklch, var(--fg-2) 12%, transparent);
}
.method.GET {
.method.GET,
.method.HEAD,
.method.OPTIONS {
color: var(--proxy);
background: color-mix(in oklch, var(--proxy) 14%, transparent);
}
Expand Down Expand Up @@ -690,11 +697,11 @@ svg {
.log-row-head,
.log-row {
display: grid;
grid-template-columns: 78px 70px 62px 1fr 70px 80px 70px;
gap: 10px;
padding: 0 14px;
grid-template-columns: 78px 72px 104px 1fr 70px 80px 70px;
gap: 12px;
padding: 7px 16px;
align-items: center;
height: 32px;
min-height: 36px;
border-bottom: 1px solid color-mix(in oklch, var(--line) 60%, transparent);
}
.log-row-head {
Expand Down Expand Up @@ -745,12 +752,14 @@ svg {
.tag-type {
font-size: 10px;
font-weight: 700;
padding: 2px 6px;
padding: 2px 7px;
border-radius: 3px;
min-width: 52px;
text-align: center;
letter-spacing: 0.04em;
font-family: var(--font-mono);
display: inline-block;
white-space: nowrap;
}
.tag-type.MOCK {
color: var(--mock);
Expand All @@ -774,10 +783,23 @@ svg {
.method.SUBSCRIBE,
.method.UNSUBSCRIBE,
.method.DISPATCH,
.method.ERROR {
.method.DISPATCH_BURST {
color: var(--accent);
background: color-mix(in oklch, var(--accent) 14%, transparent);
}
.method.LIVE_CONNECT {
color: var(--ok);
background: color-mix(in oklch, var(--ok) 14%, transparent);
}
.method.LIVE_RECONNECT {
color: var(--warn);
background: color-mix(in oklch, var(--warn) 14%, transparent);
}
.method.DROP,
.method.ERROR {
color: var(--err);
background: color-mix(in oklch, var(--err) 14%, transparent);
}

.status-200,
.status-201,
Expand Down
2 changes: 1 addition & 1 deletion frontend/src/stores/useChannelModeStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ export const useChannelModeStore = create<ChannelModeStore>((set, get) => ({
throw err
}
},
addChannel: async (channel, mode = 'mock', rateCapHz = 0) => {
addChannel: async (channel, mode = 'mixed', rateCapHz = 0) => {
await get().setMode(channel, mode, rateCapHz)
},
deleteChannel: async (channel) => {
Expand Down
8 changes: 8 additions & 0 deletions recordings.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,14 @@ func decodeAppSyncEnvelope(schemas *SchemaRegistry, profile AdapterProfile, data
if err := json.Unmarshal(data, &outer); err != nil {
return nil, err.Error()
}
// AppSync emits periodic keep-alives and subscription-lifecycle frames
// (ka, connection_ack, subscribe, subscribe_success, complete, error, …)
// that carry no decodable payload. Surface them as control frames with the
// type as the label instead of reporting a spurious decode error. Only
// "data" frames carry a {t, e} payload to decode.
if typ, ok := outer["type"].(string); ok && typ != "" && typ != "data" {
return &DecodedFrame{Alias: typ}, ""
}
value := outer["event"]
if value == nil {
if payload, ok := outer["payload"].(map[string]any); ok {
Expand Down
21 changes: 21 additions & 0 deletions recordings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,27 @@ func TestAppSyncDecoderReportsMissingPayload(t *testing.T) {
}
}

func TestAppSyncDecoderTreatsControlFramesAsBenign(t *testing.T) {
rec, err := NewRecorder(t.TempDir(), nil, nil, nil, false)
if err != nil {
t.Fatalf("NewRecorder() error = %v", err)
}
// AppSync sends keep-alives and subscription-lifecycle frames that carry
// no payload; they must not be reported as decode errors.
for _, typ := range []string{"ka", "connection_ack", "subscribe", "subscribe_success", "complete"} {
decoded, decodeErr := rec.decodeAppSyncProfile(
AdapterProfile{BaseAdapter: "appsync"},
[]byte(`{"type":"`+typ+`"}`),
)
if decodeErr != "" {
t.Fatalf("type %q: decodeErr = %q, want empty (control frame)", typ, decodeErr)
}
if decoded == nil || decoded.Alias != typ || decoded.TypeName != "" || decoded.PayloadJSON != nil {
t.Fatalf("type %q: decoded = %#v, want control frame labelled by type", typ, decoded)
}
}
}

func TestMixedModeLocalDispatchIsRecorded(t *testing.T) {
bus := NewEventBus()
modes, err := NewChannelModeRegistry(t.TempDir(), bus, false)
Expand Down
47 changes: 45 additions & 2 deletions socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ type SocketClient struct {
closeOnce sync.Once
droppedToClient atomic.Uint64
upstreamHeaders http.Header
upstreamHost string

mu sync.RWMutex
subscriptions map[string]string
Expand Down Expand Up @@ -320,7 +321,7 @@ func IsWebSocketRequest(r *http.Request) bool {

// extractUpstreamHeaders clones the inbound client headers, dropping the ones
// that the WebSocket handshake or HTTP transport must control on the upstream
// dial. Everything else (Authorization, Cookie, X-* etc.) is forwarded
// dial. Everything else (Authorization, Cookie, Origin, X-* etc.) is forwarded
// verbatim so the upstream sees the same request the client would send directly.
func extractUpstreamHeaders(src http.Header) http.Header {
if len(src) == 0 {
Expand All @@ -336,7 +337,6 @@ func extractUpstreamHeaders(src http.Header) http.Header {
case "Connection",
"Upgrade",
"Host",
"Origin",
"Content-Length",
"X-Ditto-Ws-Mode":
continue
Expand All @@ -348,6 +348,48 @@ func extractUpstreamHeaders(src http.Header) http.Header {
return out
}

// applyForwardingHeaders annotates the headers Ditto sends to the live target
// with the standard proxy markers so the upstream can see the original client.
// Existing values are extended (X-Forwarded-For is appended) rather than
// replaced, matching common reverse-proxy behaviour.
func applyForwardingHeaders(headers http.Header, clientHost, clientAddr string, tls bool) http.Header {
if headers == nil {
headers = make(http.Header)
}
if ip := clientIPFromRemoteAddr(clientAddr); ip != "" {
if prev := headers.Get("X-Forwarded-For"); prev != "" {
headers.Set("X-Forwarded-For", prev+", "+ip)
} else {
headers.Set("X-Forwarded-For", ip)
}
if headers.Get("X-Real-IP") == "" {
headers.Set("X-Real-IP", ip)
}
}
if clientHost != "" && headers.Get("X-Forwarded-Host") == "" {
headers.Set("X-Forwarded-Host", clientHost)
}
if headers.Get("X-Forwarded-Proto") == "" {
if tls {
headers.Set("X-Forwarded-Proto", "wss")
} else {
headers.Set("X-Forwarded-Proto", "ws")
}
}
return headers
}

func clientIPFromRemoteAddr(addr string) string {
if addr == "" {
return ""
}
host, _, err := net.SplitHostPort(addr)
if err != nil {
return addr
}
return host
}

func shouldProxyWebSocket(r *http.Request) bool {
mode := strings.ToLower(strings.TrimSpace(r.Header.Get("X-Ditto-WS-Mode")))
queryMode := strings.ToLower(strings.TrimSpace(r.URL.Query().Get("__ditto_ws")))
Expand Down Expand Up @@ -503,6 +545,7 @@ func (h *SocketHub) ServeHTTP(w http.ResponseWriter, r *http.Request) {
done: make(chan struct{}),
subscriptions: make(map[string]string),
upstreamHeaders: extractUpstreamHeaders(r.Header),
upstreamHost: r.Host,
}
h.addClient(client)
h.publishSocketEvent("CONNECT", r.URL.RequestURI(), http.StatusSwitchingProtocols, "", 0)
Expand Down
74 changes: 73 additions & 1 deletion ws_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,10 +188,23 @@ func (ch *liveChannel) run() {
client := ch.firstClient()
var subprotocols []string
var headers http.Header
var clientHost, clientAddr string
if client != nil {
subprotocols = client.protocol.Subprotocols()
headers = client.upstreamHeaders
headers = cloneHeaders(client.upstreamHeaders)
clientHost = client.upstreamHost
clientAddr = client.remoteAddr
}
headers = applyForwardingHeaders(headers, clientHost, clientAddr, isSecureTarget(target))
ch.bridge.hub.publishSocketEventWithSource(
"LIVE_DIAL",
ch.channel,
0,
fmt.Sprintf("target=%s client_host=%q client_addr=%q subprotocols=%v headers=%s",
target, clientHost, clientAddr, subprotocols, summarizeHeaders(headers)),
0,
"live",
)
conn, _, err := websocket.Dial(ch.ctx, target, &websocket.DialOptions{
Subprotocols: subprotocols,
HTTPHeader: headers,
Expand Down Expand Up @@ -318,6 +331,65 @@ func nextLiveBackoff(current time.Duration) time.Duration {
return next
}

// cloneHeaders returns a shallow copy of h. Used before mutating headers that
// belong to the SocketClient so subsequent dial attempts start from the same
// baseline rather than accumulating forwarding markers.
func cloneHeaders(h http.Header) http.Header {
if h == nil {
return make(http.Header)
}
out := make(http.Header, len(h))
for k, vs := range h {
copied := make([]string, len(vs))
copy(copied, vs)
out[k] = copied
}
return out
}

// isSecureTarget reports whether the live target uses a TLS-encrypted scheme
// (wss/https) so the X-Forwarded-Proto header reflects the upstream transport.
func isSecureTarget(target string) bool {
u, err := url.Parse(target)
if err != nil {
return false
}
scheme := strings.ToLower(u.Scheme)
return scheme == "wss" || scheme == "https"
}

// summarizeHeaders returns a one-line redacted view of the headers Ditto
// forwards to the upstream live target. Authorization / Cookie / API key
// values are truncated so logs do not leak credentials but still indicate
// whether the header was present.
func summarizeHeaders(h http.Header) string {
if len(h) == 0 {
return "{}"
}
keys := make([]string, 0, len(h))
for k := range h {
keys = append(keys, k)
}
parts := make([]string, 0, len(keys))
for _, k := range keys {
vs := h.Values(k)
v := ""
if len(vs) > 0 {
v = vs[0]
}
kl := strings.ToLower(k)
if kl == "authorization" || kl == "cookie" || strings.Contains(kl, "api-key") || strings.Contains(kl, "token") {
if len(v) > 12 {
v = v[:8] + "…(" + fmt.Sprintf("%d", len(v)) + "b)"
} else if v != "" {
v = "…(" + fmt.Sprintf("%d", len(v)) + "b)"
}
}
parts = append(parts, fmt.Sprintf("%s=%q", k, v))
}
return "{" + strings.Join(parts, " ") + "}"
}

func RegisterLiveTargetRoutes(mux *http.ServeMux, manager *LiveTargetManager) {
if manager == nil {
return
Expand Down
Loading
Loading