Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
211 changes: 211 additions & 0 deletions pkg/sip/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (

"github.com/frostbyte73/core"
"github.com/icholy/digest"
psdp "github.com/pion/sdp/v3"
"github.com/pkg/errors"

"github.com/livekit/media-sdk/dtmf"
Expand Down Expand Up @@ -859,6 +860,7 @@ func (c *inboundCall) runMediaConn(offerData []byte, enc livekit.SIPMediaEncrypt
if err != nil {
return nil, err
}
c.cc.nextSDPVersion = answer.SDP.Origin.SessionVersion + 1
c.mon.SDPSize(len(answerData), false)
c.log.Debugw("SDP answer", "sdp", string(answerData))

Expand Down Expand Up @@ -1245,6 +1247,48 @@ func (c *inboundCall) transferCall(ctx context.Context, transferTo string, heade

}

func (c *inboundCall) holdCall(ctx context.Context) error {
c.log.Infow("holding inbound call")

// Disable media timeout during hold to prevent call termination
if c.media != nil {
c.media.EnableTimeout(false)
c.log.Infow("media timeout disabled for hold")
}

err := c.cc.holdCall(ctx)
if err != nil {
c.log.Infow("inbound call failed to hold", "error", err)
// Re-enable timeout if hold failed
if c.media != nil {
c.media.EnableTimeout(true)
}
return err
}

c.log.Infow("inbound call held")
return nil
}

func (c *inboundCall) unholdCall(ctx context.Context) error {
c.log.Infow("unholding inbound call")

err := c.cc.unholdCall(ctx)
if err != nil {
c.log.Infow("inbound call failed to unhold", "error", err)
return err
}

// Re-enable media timeout after unhold
if c.media != nil {
c.media.EnableTimeout(true)
c.log.Infow("media timeout re-enabled after unhold")
}

c.log.Infow("inbound call unheld")
return nil
}

func (s *Server) newInbound(log logger.Logger, id LocalTag, contact URI, invite *sip.Request, inviteTx sip.ServerTransaction, getHeaders setHeadersFunc) *sipInbound {
c := &sipInbound{
log: log,
Expand Down Expand Up @@ -1296,6 +1340,7 @@ type sipInbound struct {
ringing chan struct{}
acked core.Fuse
setHeaders setHeadersFunc
nextSDPVersion uint64
}

func (c *sipInbound) ValidateInvite() error {
Expand Down Expand Up @@ -1744,3 +1789,169 @@ func (c *sipInbound) CloseWithStatus(code sip.StatusCode, status string) {
c.drop()
}
}

func (c *sipInbound) setMediaDirection(sdpData []byte, direction string) ([]byte, error) {

if len(sdpData) == 0 {
return sdpData, nil
}

// Parse SDP using the base Parse function (works for both offers and answers)
desc, err := sdp.Parse(sdpData)
if err != nil {
return nil, fmt.Errorf("failed to parse SDP: %w", err)
}

// Modify direction attributes in each media description
for _, mediaDesc := range desc.SDP.MediaDescriptions {
if mediaDesc == nil {
continue
}

// Find and remove existing direction attributes
var newAttributes []psdp.Attribute
for _, attr := range mediaDesc.Attributes {
// Keep all attributes except direction-related ones
if attr.Key != "sendrecv" && attr.Key != "sendonly" &&
attr.Key != "recvonly" && attr.Key != "inactive" {
newAttributes = append(newAttributes, attr)
}
}

// Add the new direction attribute
newAttributes = append(newAttributes, psdp.Attribute{
Key: direction,
Value: "",
})

mediaDesc.Attributes = newAttributes
}

// Set session version to current value plus current unix timestamp
desc.SDP.Origin.SessionVersion = c.nextSDPVersion
c.nextSDPVersion += 1

// Marshal back to bytes
modifiedSDP, err := desc.SDP.Marshal()
if err != nil {
return nil, fmt.Errorf("failed to marshal modified SDP: %w", err)
}

return modifiedSDP, nil
}

func (c *sipInbound) holdCall(ctx context.Context) error {
c.mu.Lock()

if c.invite == nil || c.inviteOk == nil {
c.mu.Unlock()
return psrpc.NewErrorf(psrpc.FailedPrecondition, "can't hold non established call")
}

// Create INVITE with SDP modified for hold (a=sendonly)
req := sip.NewRequest(sip.INVITE, c.invite.Recipient)
c.setCSeq(req)

// Copy headers from original INVITE
req.AppendHeader(c.invite.From())
req.AppendHeader(c.invite.To())
req.AppendHeader(c.invite.CallID())
req.AppendHeader(c.contact)
req.AppendHeader(sip.NewHeader("Content-Type", "application/sdp"))
req.AppendHeader(sip.NewHeader("Allow", "INVITE, ACK, CANCEL, BYE, NOTIFY, REFER, MESSAGE, OPTIONS, INFO, SUBSCRIBE"))

// Modify SDP to set direction to sendonly (hold)
sdpOffer := c.inviteOk.Body()
if len(sdpOffer) > 0 {
modifiedSDP, err := c.setMediaDirection(sdpOffer, "sendonly")
if err != nil {
return err
}
req.SetBody(modifiedSDP)
}

c.swapSrcDst(req)
c.mu.Unlock()

// Send the INVITE request
tx, err := c.Transaction(req)
if err != nil {
return err
}
defer tx.Terminate()

resp, err := sipResponse(ctx, tx, c.s.closing.Watch(), nil)
if err != nil {
return err
}

if resp.StatusCode != sip.StatusOK {
return &livekit.SIPStatus{Code: livekit.SIPStatusCode(resp.StatusCode)}
}

// Send ACK for the hold INVITE
ack := sip.NewAckRequest(req, resp, nil)
if err := c.WriteRequest(ack); err != nil {
return err
}

return nil
}

func (c *sipInbound) unholdCall(ctx context.Context) error {
c.mu.Lock()

if c.invite == nil || c.inviteOk == nil {
c.mu.Unlock()
return psrpc.NewErrorf(psrpc.FailedPrecondition, "can't unhold non established call")
}

// Create INVITE with SDP modified for unhold (a=sendrecv)
req := sip.NewRequest(sip.INVITE, c.invite.Recipient)
c.setCSeq(req)

// Copy headers from original INVITE
req.AppendHeader(c.invite.From())
req.AppendHeader(c.invite.To())
req.AppendHeader(c.invite.CallID())
req.AppendHeader(c.contact)
req.AppendHeader(sip.NewHeader("Content-Type", "application/sdp"))
req.AppendHeader(sip.NewHeader("Allow", "INVITE, ACK, CANCEL, BYE, NOTIFY, REFER, MESSAGE, OPTIONS, INFO, SUBSCRIBE"))

// Modify SDP to set direction to sendrecv (unhold)
sdpOffer := c.inviteOk.Body()
if len(sdpOffer) > 0 {
modifiedSDP, err := c.setMediaDirection(sdpOffer, "sendrecv")
if err != nil {
return err
}
req.SetBody(modifiedSDP)
}

c.swapSrcDst(req)
c.mu.Unlock()

// Send the INVITE request
tx, err := c.Transaction(req)
if err != nil {
return err
}
defer tx.Terminate()

resp, err := sipResponse(ctx, tx, c.s.closing.Watch(), nil)
if err != nil {
return err
}

if resp.StatusCode != sip.StatusOK {
return &livekit.SIPStatus{Code: livekit.SIPStatusCode(resp.StatusCode)}
}

// Send ACK for the unhold INVITE
ack := sip.NewAckRequest(req, resp, nil)
if err := c.WriteRequest(ack); err != nil {
return err
}

return nil
}
Loading
Loading