diff --git a/pkg/sip/inbound.go b/pkg/sip/inbound.go index 95a55a94..cf21ad34 100644 --- a/pkg/sip/inbound.go +++ b/pkg/sip/inbound.go @@ -32,6 +32,7 @@ import ( "github.com/frostbyte73/core" "github.com/icholy/digest" + psdp "github.com/pion/sdp/v3" "github.com/pkg/errors" "github.com/livekit/media-sdk/dtmf" @@ -859,6 +860,7 @@ func (c *inboundCall) runMediaConn(offerData []byte, enc livekit.SIPMediaEncrypt if err != nil { return nil, err } + c.cc.nextSDPVersion = answer.SDP.Origin.SessionVersion + 1 c.mon.SDPSize(len(answerData), false) c.log.Debugw("SDP answer", "sdp", string(answerData)) @@ -1245,6 +1247,48 @@ func (c *inboundCall) transferCall(ctx context.Context, transferTo string, heade } +func (c *inboundCall) holdCall(ctx context.Context) error { + c.log.Infow("holding inbound call") + + // Disable media timeout during hold to prevent call termination + if c.media != nil { + c.media.EnableTimeout(false) + c.log.Infow("media timeout disabled for hold") + } + + err := c.cc.holdCall(ctx) + if err != nil { + c.log.Infow("inbound call failed to hold", "error", err) + // Re-enable timeout if hold failed + if c.media != nil { + c.media.EnableTimeout(true) + } + return err + } + + c.log.Infow("inbound call held") + return nil +} + +func (c *inboundCall) unholdCall(ctx context.Context) error { + c.log.Infow("unholding inbound call") + + err := c.cc.unholdCall(ctx) + if err != nil { + c.log.Infow("inbound call failed to unhold", "error", err) + return err + } + + // Re-enable media timeout after unhold + if c.media != nil { + c.media.EnableTimeout(true) + c.log.Infow("media timeout re-enabled after unhold") + } + + c.log.Infow("inbound call unheld") + return nil +} + func (s *Server) newInbound(log logger.Logger, id LocalTag, contact URI, invite *sip.Request, inviteTx sip.ServerTransaction, getHeaders setHeadersFunc) *sipInbound { c := &sipInbound{ log: log, @@ -1296,6 +1340,7 @@ type sipInbound struct { ringing chan struct{} acked core.Fuse setHeaders setHeadersFunc + nextSDPVersion uint64 } func (c *sipInbound) ValidateInvite() error { @@ -1744,3 +1789,169 @@ func (c *sipInbound) CloseWithStatus(code sip.StatusCode, status string) { c.drop() } } + +func (c *sipInbound) setMediaDirection(sdpData []byte, direction string) ([]byte, error) { + + if len(sdpData) == 0 { + return sdpData, nil + } + + // Parse SDP using the base Parse function (works for both offers and answers) + desc, err := sdp.Parse(sdpData) + if err != nil { + return nil, fmt.Errorf("failed to parse SDP: %w", err) + } + + // Modify direction attributes in each media description + for _, mediaDesc := range desc.SDP.MediaDescriptions { + if mediaDesc == nil { + continue + } + + // Find and remove existing direction attributes + var newAttributes []psdp.Attribute + for _, attr := range mediaDesc.Attributes { + // Keep all attributes except direction-related ones + if attr.Key != "sendrecv" && attr.Key != "sendonly" && + attr.Key != "recvonly" && attr.Key != "inactive" { + newAttributes = append(newAttributes, attr) + } + } + + // Add the new direction attribute + newAttributes = append(newAttributes, psdp.Attribute{ + Key: direction, + Value: "", + }) + + mediaDesc.Attributes = newAttributes + } + + // Set session version to current value plus current unix timestamp + desc.SDP.Origin.SessionVersion = c.nextSDPVersion + c.nextSDPVersion += 1 + + // Marshal back to bytes + modifiedSDP, err := desc.SDP.Marshal() + if err != nil { + return nil, fmt.Errorf("failed to marshal modified SDP: %w", err) + } + + return modifiedSDP, nil +} + +func (c *sipInbound) holdCall(ctx context.Context) error { + c.mu.Lock() + + if c.invite == nil || c.inviteOk == nil { + c.mu.Unlock() + return psrpc.NewErrorf(psrpc.FailedPrecondition, "can't hold non established call") + } + + // Create INVITE with SDP modified for hold (a=sendonly) + req := sip.NewRequest(sip.INVITE, c.invite.Recipient) + c.setCSeq(req) + + // Copy headers from original INVITE + req.AppendHeader(c.invite.From()) + req.AppendHeader(c.invite.To()) + req.AppendHeader(c.invite.CallID()) + req.AppendHeader(c.contact) + req.AppendHeader(sip.NewHeader("Content-Type", "application/sdp")) + req.AppendHeader(sip.NewHeader("Allow", "INVITE, ACK, CANCEL, BYE, NOTIFY, REFER, MESSAGE, OPTIONS, INFO, SUBSCRIBE")) + + // Modify SDP to set direction to sendonly (hold) + sdpOffer := c.inviteOk.Body() + if len(sdpOffer) > 0 { + modifiedSDP, err := c.setMediaDirection(sdpOffer, "sendonly") + if err != nil { + return err + } + req.SetBody(modifiedSDP) + } + + c.swapSrcDst(req) + c.mu.Unlock() + + // Send the INVITE request + tx, err := c.Transaction(req) + if err != nil { + return err + } + defer tx.Terminate() + + resp, err := sipResponse(ctx, tx, c.s.closing.Watch(), nil) + if err != nil { + return err + } + + if resp.StatusCode != sip.StatusOK { + return &livekit.SIPStatus{Code: livekit.SIPStatusCode(resp.StatusCode)} + } + + // Send ACK for the hold INVITE + ack := sip.NewAckRequest(req, resp, nil) + if err := c.WriteRequest(ack); err != nil { + return err + } + + return nil +} + +func (c *sipInbound) unholdCall(ctx context.Context) error { + c.mu.Lock() + + if c.invite == nil || c.inviteOk == nil { + c.mu.Unlock() + return psrpc.NewErrorf(psrpc.FailedPrecondition, "can't unhold non established call") + } + + // Create INVITE with SDP modified for unhold (a=sendrecv) + req := sip.NewRequest(sip.INVITE, c.invite.Recipient) + c.setCSeq(req) + + // Copy headers from original INVITE + req.AppendHeader(c.invite.From()) + req.AppendHeader(c.invite.To()) + req.AppendHeader(c.invite.CallID()) + req.AppendHeader(c.contact) + req.AppendHeader(sip.NewHeader("Content-Type", "application/sdp")) + req.AppendHeader(sip.NewHeader("Allow", "INVITE, ACK, CANCEL, BYE, NOTIFY, REFER, MESSAGE, OPTIONS, INFO, SUBSCRIBE")) + + // Modify SDP to set direction to sendrecv (unhold) + sdpOffer := c.inviteOk.Body() + if len(sdpOffer) > 0 { + modifiedSDP, err := c.setMediaDirection(sdpOffer, "sendrecv") + if err != nil { + return err + } + req.SetBody(modifiedSDP) + } + + c.swapSrcDst(req) + c.mu.Unlock() + + // Send the INVITE request + tx, err := c.Transaction(req) + if err != nil { + return err + } + defer tx.Terminate() + + resp, err := sipResponse(ctx, tx, c.s.closing.Watch(), nil) + if err != nil { + return err + } + + if resp.StatusCode != sip.StatusOK { + return &livekit.SIPStatus{Code: livekit.SIPStatusCode(resp.StatusCode)} + } + + // Send ACK for the unhold INVITE + ack := sip.NewAckRequest(req, resp, nil) + if err := c.WriteRequest(ack); err != nil { + return err + } + + return nil +} diff --git a/pkg/sip/outbound.go b/pkg/sip/outbound.go index fc89206e..f8d5588d 100644 --- a/pkg/sip/outbound.go +++ b/pkg/sip/outbound.go @@ -24,6 +24,7 @@ import ( "github.com/frostbyte73/core" "github.com/icholy/digest" + psdp "github.com/pion/sdp/v3" "github.com/pkg/errors" "golang.org/x/exp/maps" @@ -524,6 +525,7 @@ func (c *outboundCall) sipSignal(ctx context.Context) error { } c.mon.SDPSize(len(sdpOfferData), true) c.log.Debugw("SDP offer", "sdp", string(sdpOfferData)) + c.cc.nextSDPVersion = sdpOffer.SDP.Origin.SessionVersion + 1 joinDur := c.mon.JoinDur() c.mon.InviteReq() @@ -649,6 +651,48 @@ func (c *outboundCall) transferCall(ctx context.Context, transferTo string, head return nil } +func (c *outboundCall) holdCall(ctx context.Context) error { + c.log.Infow("holding outbound call") + + // Disable media timeout during hold to prevent call termination + if c.media != nil { + c.media.EnableTimeout(false) + c.log.Infow("media timeout disabled for hold") + } + + err := c.cc.holdCall(ctx) + if err != nil { + c.log.Infow("outbound call failed to hold", "error", err) + // Re-enable timeout if hold failed + if c.media != nil { + c.media.EnableTimeout(true) + } + return err + } + + c.log.Infow("outbound call held") + return nil +} + +func (c *outboundCall) unholdCall(ctx context.Context) error { + c.log.Infow("unholding outbound call") + + err := c.cc.unholdCall(ctx) + if err != nil { + c.log.Infow("outbound call failed to unhold", "error", err) + return err + } + + // Re-enable media timeout after unhold + if c.media != nil { + c.media.EnableTimeout(true) + c.log.Infow("media timeout re-enabled after unhold") + } + + c.log.Infow("outbound call unheld") + return nil +} + func (c *Client) newOutbound(log logger.Logger, id LocalTag, from, contact URI, displayName *string, getHeaders setHeadersFunc) *sipOutbound { from = from.Normalize() if displayName == nil { // Nothing specified, preserve legacy behavior @@ -694,6 +738,8 @@ type sipOutbound struct { referCseq uint32 referDone chan error + + nextSDPVersion uint64 } func (c *sipOutbound) From() sip.Uri { @@ -1042,3 +1088,177 @@ func (c *sipOutbound) Close() { c.drop() } } + +func (c *sipOutbound) setMediaDirection(sdpData []byte, direction string) ([]byte, error) { + + if len(sdpData) == 0 { + return sdpData, nil + } + + // Parse SDP using the base Parse function (works for both offers and answers) + desc, err := sdp.Parse(sdpData) + if err != nil { + return nil, fmt.Errorf("failed to parse SDP: %w", err) + } + + // Modify direction attributes in each media description + for _, mediaDesc := range desc.SDP.MediaDescriptions { + if mediaDesc == nil { + continue + } + + // Find and remove existing direction attributes + var newAttributes []psdp.Attribute + for _, attr := range mediaDesc.Attributes { + // Keep all attributes except direction-related ones + if attr.Key != "sendrecv" && attr.Key != "sendonly" && + attr.Key != "recvonly" && attr.Key != "inactive" { + newAttributes = append(newAttributes, attr) + } + } + + // Add the new direction attribute + newAttributes = append(newAttributes, psdp.Attribute{ + Key: direction, + Value: "", + }) + + mediaDesc.Attributes = newAttributes + } + + // Set session version to current value plus current unix timestamp + desc.SDP.Origin.SessionVersion = c.nextSDPVersion + c.nextSDPVersion += 1 + + // Marshal back to bytes + modifiedSDP, err := desc.SDP.Marshal() + if err != nil { + return nil, fmt.Errorf("failed to marshal modified SDP: %w", err) + } + + return modifiedSDP, nil +} + +func (c *sipOutbound) holdCall(ctx context.Context) error { + c.mu.Lock() + + if c.invite == nil || c.inviteOk == nil { + c.mu.Unlock() + return psrpc.NewErrorf(psrpc.FailedPrecondition, "can't hold non established call") + } + + if c.c.closing.IsBroken() { + c.mu.Unlock() + return psrpc.NewErrorf(psrpc.FailedPrecondition, "can't hold hung up call") + } + + // Create INVITE with SDP modified for hold (a=sendonly) + req := sip.NewRequest(sip.INVITE, c.invite.Recipient) + c.setCSeq(req) + + // Copy headers from original INVITE + req.AppendHeader(c.invite.From()) + req.AppendHeader(c.invite.To()) + req.AppendHeader(c.invite.CallID()) + req.AppendHeader(c.contact) + req.AppendHeader(sip.NewHeader("Content-Type", "application/sdp")) + req.AppendHeader(sip.NewHeader("Allow", "INVITE, ACK, CANCEL, BYE, NOTIFY, REFER, MESSAGE, OPTIONS, INFO, SUBSCRIBE")) + + // Modify SDP to set direction to sendrecv (hold) + sdpOffer := c.invite.Body() + if len(sdpOffer) > 0 { + modifiedSDP, err := c.setMediaDirection(sdpOffer, "sendonly") + if err != nil { + return err + } + req.SetBody(modifiedSDP) + } + + c.mu.Unlock() + + // Send the INVITE request + tx, err := c.Transaction(req) + if err != nil { + return err + } + defer tx.Terminate() + + resp, err := sipResponse(ctx, tx, c.c.closing.Watch(), nil) + if err != nil { + return err + } + + if resp.StatusCode != sip.StatusOK { + return &livekit.SIPStatus{Code: livekit.SIPStatusCode(resp.StatusCode)} + } + + // Send ACK for the hold INVITE + ack := sip.NewAckRequest(req, resp, nil) + if err := c.WriteRequest(ack); err != nil { + return err + } + + return nil +} + +func (c *sipOutbound) unholdCall(ctx context.Context) error { + c.mu.Lock() + + if c.invite == nil || c.inviteOk == nil { + c.mu.Unlock() + return psrpc.NewErrorf(psrpc.FailedPrecondition, "can't unhold non established call") + } + + if c.c.closing.IsBroken() { + c.mu.Unlock() + return psrpc.NewErrorf(psrpc.FailedPrecondition, "can't unhold hung up call") + } + + // Create INVITE with SDP modified for unhold (a=sendrecv) + req := sip.NewRequest(sip.INVITE, c.invite.Recipient) + c.setCSeq(req) + + // Copy headers from original INVITE + req.AppendHeader(c.invite.From()) + req.AppendHeader(c.invite.To()) + req.AppendHeader(c.invite.CallID()) + req.AppendHeader(c.contact) + req.AppendHeader(sip.NewHeader("Content-Type", "application/sdp")) + req.AppendHeader(sip.NewHeader("Allow", "INVITE, ACK, CANCEL, BYE, NOTIFY, REFER, MESSAGE, OPTIONS, INFO, SUBSCRIBE")) + + // Modify SDP to set direction to sendrecv (unhold) + sdpOffer := c.invite.Body() + if len(sdpOffer) > 0 { + modifiedSDP, err := c.setMediaDirection(sdpOffer, "sendrecv") + if err != nil { + return err + } + req.SetBody(modifiedSDP) + } + + c.mu.Unlock() + + // Send the INVITE request + tx, err := c.Transaction(req) + if err != nil { + return err + } + defer tx.Terminate() + + resp, err := sipResponse(ctx, tx, c.c.closing.Watch(), nil) + if err != nil { + return err + } + + if resp.StatusCode != sip.StatusOK { + return &livekit.SIPStatus{Code: livekit.SIPStatusCode(resp.StatusCode)} + } + + // Send ACK for the unhold INVITE + ack := sip.NewAckRequest(req, resp, nil) + if err := c.WriteRequest(ack); err != nil { + return err + } + + return nil +}