Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 30 additions & 18 deletions pkg/sip/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -656,6 +656,20 @@ func (c *inboundCall) appendLogValues(kvs ...any) {
c.setLog(c.log().WithValues(kvs...))
}

func (c *inboundCall) mediaTimeout() error {
if c.cc == nil {
c.closeWithTimeout(true)
return psrpc.NewErrorf(psrpc.DeadlineExceeded, "media timeout")
}
if !c.cc.GotACK() {
c.log().Warnw("Media timeout after missing ACK", errNoACK)
c.closeWithNoACK()
return psrpc.NewError(psrpc.DeadlineExceeded, errNoACK)
}
c.closeWithTimeout(false)
return nil // logged as a warning in close
}

func (c *inboundCall) handleInvite(ctx context.Context, tid traceid.ID, req *sip.Request, trunkID string, conf *config.Config) error {
c.mon.InviteAccept()
c.mon.CallStart()
Expand Down Expand Up @@ -857,7 +871,6 @@ func (c *inboundCall) handleInvite(ctx context.Context, tid traceid.ID, req *sip

c.started.Break()

var noAck = false
// Wait for the caller to terminate the call. Send regular keep alives
ticker := time.NewTicker(stateUpdateTick)
defer ticker.Stop()
Expand All @@ -877,13 +890,7 @@ func (c *inboundCall) handleInvite(ctx context.Context, tid traceid.ID, req *sip
c.close(false, callDropped, "removed")
return nil
case <-c.media.Timeout():
if noAck {
c.log().Warnw("Media timeout after missing ACK", errNoACK)
c.closeWithNoACK()
return psrpc.NewError(psrpc.DeadlineExceeded, errNoACK)
}
c.closeWithTimeout()
return psrpc.NewErrorf(psrpc.DeadlineExceeded, "media timeout")
return c.mediaTimeout()
case <-ackReceived:
ackTimeout = nil // all good, disable timeout
ackReceived = nil
Expand All @@ -892,7 +899,6 @@ func (c *inboundCall) handleInvite(ctx context.Context, tid traceid.ID, req *sip
c.log().Warnw("Call accepted, but no ACK received", errNoACK)
// We don't need to wait for a full media timeout initially, we already know something is not quite right.
c.media.SetTimeout(min(inviteOkAckLateTimeout, c.s.conf.MediaTimeoutInitial), c.s.conf.MediaTimeout)
noAck = true
}
}
}
Expand Down Expand Up @@ -978,8 +984,7 @@ func (c *inboundCall) waitMedia(ctx context.Context) (bool, error) {
c.closeWithHangup()
return false, psrpc.NewErrorf(psrpc.Canceled, "room closed")
case <-c.media.Timeout():
c.closeWithTimeout()
return false, psrpc.NewErrorf(psrpc.DeadlineExceeded, "media timed out")
return false, c.mediaTimeout()
case <-c.media.Received():
case <-delay.C:
}
Expand All @@ -1002,8 +1007,7 @@ func (c *inboundCall) waitSubscribe(ctx context.Context, timeout time.Duration)
c.closeWithHangup()
return false, psrpc.NewErrorf(psrpc.Canceled, "room closed")
case <-c.media.Timeout():
c.closeWithTimeout()
return false, psrpc.NewErrorf(psrpc.DeadlineExceeded, "media timed out")
return false, c.mediaTimeout()
case <-timer.C:
c.close(false, callDropped, "cannot-subscribe")
return false, psrpc.NewErrorf(psrpc.DeadlineExceeded, "room subscription timed out")
Expand All @@ -1029,8 +1033,7 @@ func (c *inboundCall) pinPrompt(ctx context.Context, trunkID string) (disp CallD
c.closeWithHangup()
return disp, false, nil
case <-c.media.Timeout():
c.closeWithTimeout()
return disp, false, psrpc.NewErrorf(psrpc.DeadlineExceeded, "media timeout")
return disp, false, c.mediaTimeout()
case b, ok := <-c.dtmf:
if !ok {
c.Close()
Expand Down Expand Up @@ -1095,7 +1098,8 @@ func (c *inboundCall) close(error bool, status CallStatus, reason string) {
defer c.printStats(log)
c.setStatus(status)
c.mon.CallTerminate(reason)
if error {
isWarn := error || status == callHangupMedia
if isWarn {
log.Warnw("Closing inbound call with error", nil)
} else {
log.Infow("Closing inbound call")
Expand Down Expand Up @@ -1131,8 +1135,12 @@ func (c *inboundCall) close(error bool, status CallStatus, reason string) {
c.cancel()
}

func (c *inboundCall) closeWithTimeout() {
c.close(true, callDropped, "media-timeout")
func (c *inboundCall) closeWithTimeout(isError bool) {
status := callDropped
if !isError {
status = callHangupMedia
}
c.close(isError, status, "media-timeout")
}

func (c *inboundCall) closeWithNoACK() {
Expand Down Expand Up @@ -1556,6 +1564,10 @@ func (c *sipInbound) stopRinging() {
}
}

func (c *sipInbound) GotACK() bool {
return c.acked.IsBroken()
}

func (c *sipInbound) InviteACK() <-chan struct{} {
return c.acked.Watch()
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/sip/participant.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (v CallStatus) Attribute() string {
return "automation"
case CallActive:
return "active"
case CallHangup:
case CallHangup, callHangupMedia:
return "hangup"
}
}
Expand All @@ -75,7 +75,7 @@ func (v CallStatus) DisconnectReason() livekit.DisconnectReason {
switch v {
default:
return livekit.DisconnectReason_UNKNOWN_REASON
case CallHangup:
case CallHangup, callHangupMedia:
// It's the default that LK sets, but map it here explicitly to show the assumption.
return livekit.DisconnectReason_CLIENT_INITIATED
case callUnavailable:
Expand Down Expand Up @@ -107,4 +107,5 @@ const (
callMediaFailed
callAcceptFailed
callNoACK
callHangupMedia
)