diff --git a/pkg/sip/inbound.go b/pkg/sip/inbound.go index f4feb89e..0c868fd0 100644 --- a/pkg/sip/inbound.go +++ b/pkg/sip/inbound.go @@ -656,6 +656,20 @@ func (c *inboundCall) appendLogValues(kvs ...any) { c.setLog(c.log().WithValues(kvs...)) } +func (c *inboundCall) mediaTimeout() error { + if c.cc == nil { + c.closeWithTimeout(true) + return psrpc.NewErrorf(psrpc.DeadlineExceeded, "media timeout") + } + if !c.cc.GotACK() { + c.log().Warnw("Media timeout after missing ACK", errNoACK) + c.closeWithNoACK() + return psrpc.NewError(psrpc.DeadlineExceeded, errNoACK) + } + c.closeWithTimeout(false) + return nil // logged as a warning in close +} + func (c *inboundCall) handleInvite(ctx context.Context, tid traceid.ID, req *sip.Request, trunkID string, conf *config.Config) error { c.mon.InviteAccept() c.mon.CallStart() @@ -857,7 +871,6 @@ func (c *inboundCall) handleInvite(ctx context.Context, tid traceid.ID, req *sip c.started.Break() - var noAck = false // Wait for the caller to terminate the call. Send regular keep alives ticker := time.NewTicker(stateUpdateTick) defer ticker.Stop() @@ -877,13 +890,7 @@ func (c *inboundCall) handleInvite(ctx context.Context, tid traceid.ID, req *sip c.close(false, callDropped, "removed") return nil case <-c.media.Timeout(): - if noAck { - c.log().Warnw("Media timeout after missing ACK", errNoACK) - c.closeWithNoACK() - return psrpc.NewError(psrpc.DeadlineExceeded, errNoACK) - } - c.closeWithTimeout() - return psrpc.NewErrorf(psrpc.DeadlineExceeded, "media timeout") + return c.mediaTimeout() case <-ackReceived: ackTimeout = nil // all good, disable timeout ackReceived = nil @@ -892,7 +899,6 @@ func (c *inboundCall) handleInvite(ctx context.Context, tid traceid.ID, req *sip c.log().Warnw("Call accepted, but no ACK received", errNoACK) // We don't need to wait for a full media timeout initially, we already know something is not quite right. c.media.SetTimeout(min(inviteOkAckLateTimeout, c.s.conf.MediaTimeoutInitial), c.s.conf.MediaTimeout) - noAck = true } } } @@ -978,8 +984,7 @@ func (c *inboundCall) waitMedia(ctx context.Context) (bool, error) { c.closeWithHangup() return false, psrpc.NewErrorf(psrpc.Canceled, "room closed") case <-c.media.Timeout(): - c.closeWithTimeout() - return false, psrpc.NewErrorf(psrpc.DeadlineExceeded, "media timed out") + return false, c.mediaTimeout() case <-c.media.Received(): case <-delay.C: } @@ -1002,8 +1007,7 @@ func (c *inboundCall) waitSubscribe(ctx context.Context, timeout time.Duration) c.closeWithHangup() return false, psrpc.NewErrorf(psrpc.Canceled, "room closed") case <-c.media.Timeout(): - c.closeWithTimeout() - return false, psrpc.NewErrorf(psrpc.DeadlineExceeded, "media timed out") + return false, c.mediaTimeout() case <-timer.C: c.close(false, callDropped, "cannot-subscribe") return false, psrpc.NewErrorf(psrpc.DeadlineExceeded, "room subscription timed out") @@ -1029,8 +1033,7 @@ func (c *inboundCall) pinPrompt(ctx context.Context, trunkID string) (disp CallD c.closeWithHangup() return disp, false, nil case <-c.media.Timeout(): - c.closeWithTimeout() - return disp, false, psrpc.NewErrorf(psrpc.DeadlineExceeded, "media timeout") + return disp, false, c.mediaTimeout() case b, ok := <-c.dtmf: if !ok { c.Close() @@ -1095,7 +1098,8 @@ func (c *inboundCall) close(error bool, status CallStatus, reason string) { defer c.printStats(log) c.setStatus(status) c.mon.CallTerminate(reason) - if error { + isWarn := error || status == callHangupMedia + if isWarn { log.Warnw("Closing inbound call with error", nil) } else { log.Infow("Closing inbound call") @@ -1131,8 +1135,12 @@ func (c *inboundCall) close(error bool, status CallStatus, reason string) { c.cancel() } -func (c *inboundCall) closeWithTimeout() { - c.close(true, callDropped, "media-timeout") +func (c *inboundCall) closeWithTimeout(isError bool) { + status := callDropped + if !isError { + status = callHangupMedia + } + c.close(isError, status, "media-timeout") } func (c *inboundCall) closeWithNoACK() { @@ -1556,6 +1564,10 @@ func (c *sipInbound) stopRinging() { } } +func (c *sipInbound) GotACK() bool { + return c.acked.IsBroken() +} + func (c *sipInbound) InviteACK() <-chan struct{} { return c.acked.Watch() } diff --git a/pkg/sip/participant.go b/pkg/sip/participant.go index 85ba9bd1..dabf5dda 100644 --- a/pkg/sip/participant.go +++ b/pkg/sip/participant.go @@ -66,7 +66,7 @@ func (v CallStatus) Attribute() string { return "automation" case CallActive: return "active" - case CallHangup: + case CallHangup, callHangupMedia: return "hangup" } } @@ -75,7 +75,7 @@ func (v CallStatus) DisconnectReason() livekit.DisconnectReason { switch v { default: return livekit.DisconnectReason_UNKNOWN_REASON - case CallHangup: + case CallHangup, callHangupMedia: // It's the default that LK sets, but map it here explicitly to show the assumption. return livekit.DisconnectReason_CLIENT_INITIATED case callUnavailable: @@ -107,4 +107,5 @@ const ( callMediaFailed callAcceptFailed callNoACK + callHangupMedia )