Skip to content

Commit a4097d9

Browse files
committed
AI: Extract audio jitter buffer to class AudioPacketCache
1 parent cb0122f commit a4097d9

File tree

2 files changed

+129
-71
lines changed

2 files changed

+129
-71
lines changed

trunk/src/app/srs_app_rtc_source.cpp

Lines changed: 107 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -1205,7 +1205,7 @@ srs_error_t SrsRtcRtpBuilder::on_video(SrsSharedPtrMessage* msg)
12051205

12061206
// If merge Nalus, we package all NALUs(samples) as one NALU, in a RTP or FUA packet.
12071207
vector<SrsRtpPacket*> pkts;
1208-
// auto free when exit
1208+
// TODO: FIXME: Should rename to pkts_disposer.
12091209
SrsUniquePtr<vector<SrsRtpPacket*>> pkts_ptr(&pkts, free_packets);
12101210

12111211
if (merge_nalus && nn_samples > 1) {
@@ -1593,12 +1593,107 @@ bool SrsRtcFrameBuilderVideoFrameDetector::is_lost_sn(uint16_t received)
15931593
return lost_sn_ == received;
15941594
}
15951595

1596+
// Create an empty cache: nothing buffered, sequence number and process time both zero.
SrsRtcFrameBuilderAudioPacketCache::SrsRtcFrameBuilderAudioPacketCache()
    : last_audio_seq_num_(0), last_audio_process_time_ms_(0)
{
}
1601+
1602+
// Free every packet that is still held in the jitter buffer.
SrsRtcFrameBuilderAudioPacketCache::~SrsRtcFrameBuilderAudioPacketCache()
{
    this->clear_all();
}
1606+
1607+
// Push one received audio RTP packet through the jitter buffer.
//
// A copy of src is buffered, keyed by its RTP sequence number. Buffered packets are then
// released, oldest first in RTP sequence order, into ready_packets until a gap or the end
// of the sliding window is reached. These checks are skipped, and everything buffered is
// flushed, once the buffer holds AUDIO_JITTER_BUFFER_SIZE packets or nothing was released
// for more than MAX_AUDIO_WAIT_MS.
//
// @param src The received packet. It is only read; the cache stores its own copy.
// @param ready_packets Output. Released packets are appended here and are no longer tracked
//      by the cache, so the caller is responsible for freeing them.
// @return srs_success; there is no failing path in the current implementation.
srs_error_t SrsRtcFrameBuilderAudioPacketCache::process_packet(SrsRtpPacket* src, std::vector<SrsRtpPacket*>& ready_packets)
{
    srs_error_t err = srs_success;

    uint16_t seq = src->header.get_sequence();
    int64_t now = srs_get_system_time() / 1000;

    // (Re)start the sequence baseline only for a fresh stream: nothing is buffered and nothing
    // was released for longer than the maximum wait. This also covers the very first packet,
    // because the constructor leaves last_audio_process_time_ms_ at 0. Resetting whenever the
    // buffer is merely empty would happen on every packet of a healthy stream (the buffer is
    // drained on each call), so gaps and late packets would never be detected.
    if (audio_buffer_.empty() && (now - last_audio_process_time_ms_) > MAX_AUDIO_WAIT_MS) {
        last_audio_seq_num_ = seq - 1;
        last_audio_process_time_ms_ = now;
    }

    // Drop packets at or before the last released sequence number: they are either late or
    // a duplicate of a packet that was already handed out.
    if (srs_rtp_seq_distance(last_audio_seq_num_, seq) <= 0) {
        srs_warn("Discard late audio packet, seq=%u, last_seq=%u", seq, last_audio_seq_num_);
        return err;
    }

    // Store a copy in the jitter buffer. A sequence number that is already buffered is a
    // duplicate; keep the first copy instead of overwriting (and leaking) it.
    if (audio_buffer_.find(seq) == audio_buffer_.end()) {
        audio_buffer_[seq] = src->copy();
    }

    // Try to process packets in the sliding window
    bool force_process = audio_buffer_.size() >= AUDIO_JITTER_BUFFER_SIZE ||
        (now - last_audio_process_time_ms_) > MAX_AUDIO_WAIT_MS;
    uint16_t window_end = last_audio_seq_num_ + SLIDING_WINDOW_SIZE;

    while (!audio_buffer_.empty()) {
        // Pick the oldest buffered packet in RTP order. The map is sorted numerically, so
        // after a 65535 -> 0 wrap the oldest packet is the first key not below the last
        // released sequence number; only when no such key exists has the sequence wrapped
        // and the smallest key is the oldest.
        std::map<uint16_t, SrsRtpPacket*>::iterator it = audio_buffer_.lower_bound(last_audio_seq_num_);
        if (it == audio_buffer_.end()) {
            it = audio_buffer_.begin();
        }
        uint16_t next_seq = it->first;

        // Check if the packet is within our sliding window
        if (!force_process) {
            if (srs_rtp_seq_distance(last_audio_seq_num_, next_seq) < 0) {
                // Packet is before the window start (shouldn't happen normally), release it
                // anyway as it's already late.
                srs_warn("Late audio packet, seq=%u, expected>=%u", next_seq, last_audio_seq_num_);
            } else if (srs_rtp_seq_distance(next_seq, window_end) < 0) {
                // Packet is beyond the window end, stop processing.
                srs_warn("Audio packet beyond window end, seq=%u, window_end=%u", next_seq, window_end);
                break;
            } else if (srs_rtp_seq_distance(last_audio_seq_num_, next_seq) > 1) {
                // There's a gap; while the wait time is not exceeded, wait for the missing packets.
                if ((now - last_audio_process_time_ms_) <= MAX_AUDIO_WAIT_MS) {
                    break;
                }
                // Truncate to 16 bits so the expected value wraps like a real sequence number.
                srs_warn("Audio packet loss, expected=%u, got=%u", (uint16_t)(last_audio_seq_num_ + 1), next_seq);
            }
        }

        // Take the packet from buffer; ownership moves to ready_packets.
        SrsRtpPacket* pkt = it->second;
        audio_buffer_.erase(it);

        // Update last sequence number
        last_audio_seq_num_ = next_seq;
        last_audio_process_time_ms_ = now;

        // Add to ready packets for processing
        ready_packets.push_back(pkt);

        // Update window end for next iteration
        window_end = last_audio_seq_num_ + SLIDING_WINDOW_SIZE;
    }

    // Only report when the buffer is getting close to its capacity; the forced flush itself
    // happens at the start of a later call once the size limit is reached.
    if (audio_buffer_.size() >= AUDIO_JITTER_BUFFER_SIZE * 0.8) {
        srs_warn("Audio jitter buffer nearly full, size=%zu", audio_buffer_.size());
    }

    return err;
}
1679+
1680+
void SrsRtcFrameBuilderAudioPacketCache::clear_all()
1681+
{
1682+
std::map<uint16_t, SrsRtpPacket*>::iterator it;
1683+
for (it = audio_buffer_.begin(); it != audio_buffer_.end(); ++it) {
1684+
SrsRtpPacket* pkt = it->second;
1685+
srs_freep(pkt);
1686+
}
1687+
audio_buffer_.clear();
1688+
}
1689+
15961690
SrsRtcFrameBuilder::SrsRtcFrameBuilder(ISrsStreamBridge* bridge)
15971691
{
15981692
bridge_ = bridge;
15991693
is_first_audio_ = true;
16001694
audio_transcoder_ = NULL;
16011695
video_codec_ = SrsVideoCodecIdAVC;
1696+
audio_cache_ = new SrsRtcFrameBuilderAudioPacketCache();
16021697
video_cache_ = new SrsRtcFrameBuilderVideoPacketCache();
16031698
frame_detector_ = new SrsRtcFrameBuilderVideoFrameDetector(video_cache_);
16041699
sync_state_ = -1;
@@ -1608,6 +1703,7 @@ SrsRtcFrameBuilder::SrsRtcFrameBuilder(ISrsStreamBridge* bridge)
16081703
SrsRtcFrameBuilder::~SrsRtcFrameBuilder()
16091704
{
16101705
srs_freep(audio_transcoder_);
1706+
srs_freep(audio_cache_);
16111707
srs_freep(video_cache_);
16121708
srs_freep(frame_detector_);
16131709
srs_freep(obs_whip_vps_);
@@ -1648,6 +1744,7 @@ srs_error_t SrsRtcFrameBuilder::on_publish()
16481744

16491745
void SrsRtcFrameBuilder::on_unpublish()
16501746
{
1747+
audio_cache_->clear_all();
16511748
}
16521749

16531750
srs_error_t SrsRtcFrameBuilder::on_rtp(SrsRtpPacket *pkt)
@@ -1687,76 +1784,21 @@ srs_error_t SrsRtcFrameBuilder::packet_audio(SrsRtpPacket* src)
16871784
{
16881785
srs_error_t err = srs_success;
16891786

1690-
uint16_t seq = src->header.get_sequence();
1691-
int64_t now = srs_get_system_time() / 1000;
1692-
1693-
// Initialize if this is the first packet
1694-
if (audio_buffer_.empty()) {
1695-
last_audio_seq_num_ = seq - 1;
1696-
last_audio_process_time_ms_ = now;
1697-
}
1787+
std::vector<SrsRtpPacket*> ready_packets;
1788+
SrsUniquePtr<vector<SrsRtpPacket*>> pkts_disposer(&ready_packets, free_packets);
16981789

1699-
// Check if packet is too old (already processed)
1700-
if (srs_rtp_seq_distance(last_audio_seq_num_, seq) < 0) {
1701-
// Packet is older than what we've already processed, discard it
1702-
srs_warn("Discard late audio packet, seq=%u, last_seq=%u", seq, last_audio_seq_num_);
1703-
return err;
1790+
// Use audio cache to process packet through jitter buffer
1791+
if ((err = audio_cache_->process_packet(src, ready_packets)) != srs_success) {
1792+
return srs_error_wrap(err, "audio cache process");
17041793
}
17051794

1706-
// Store packet in jitter buffer
1707-
audio_buffer_[seq] = src->copy();
1795+
// Process all ready packets in order
1796+
for (size_t i = 0; i < ready_packets.size(); ++i) {
1797+
SrsRtpPacket* pkt = ready_packets[i];
17081798

1709-
// Try to process packets in the sliding window
1710-
bool force_process = audio_buffer_.size() >= AUDIO_JITTER_BUFFER_SIZE ||
1711-
(now - last_audio_process_time_ms_) > MAX_AUDIO_WAIT_MS;
1712-
uint16_t window_end = last_audio_seq_num_ + SLIDING_WINDOW_SIZE;
1713-
1714-
while (!audio_buffer_.empty()) {
1715-
auto it = audio_buffer_.begin();
1716-
uint16_t next_seq = it->first;
1717-
1718-
// Check if the packet is within our sliding window
1719-
if (!force_process) {
1720-
// If packet is before window start (shouldn't happen normally)
1721-
if (srs_rtp_seq_distance(last_audio_seq_num_, next_seq) < 0) {
1722-
// Process it anyway as it's already late
1723-
srs_warn("Late audio packet, seq=%u, expected>=%u", next_seq, last_audio_seq_num_);
1724-
} else if (srs_rtp_seq_distance(next_seq, window_end) < 0) {
1725-
// If packet is beyond window end, stop processing
1726-
srs_warn("Audio packet beyond window end, seq=%u, window_end=%u", next_seq, window_end);
1727-
break;
1728-
} else if (srs_rtp_seq_distance(last_audio_seq_num_, next_seq) > 1) {// If there's a gap and we haven't exceeded wait time, wait for missing packets
1729-
// If there's a gap and we haven't exceeded wait time, wait for missing packets
1730-
if ((now - last_audio_process_time_ms_) <= MAX_AUDIO_WAIT_MS) {
1731-
break;
1732-
}
1733-
srs_warn("Audio packet loss, expected=%u, got=%u", last_audio_seq_num_ + 1, next_seq);
1734-
}
1735-
}
1736-
1737-
// Process the packet
1738-
SrsRtpPacket* pkt = it->second;
1739-
audio_buffer_.erase(it);
1740-
1741-
// Update last sequence number
1742-
last_audio_seq_num_ = next_seq;
1743-
1744-
// Process the packet
17451799
if ((err = transcode_audio(pkt)) != srs_success) {
1746-
srs_freep(pkt);
17471800
return srs_error_wrap(err, "transcode audio");
17481801
}
1749-
1750-
srs_freep(pkt);
1751-
last_audio_process_time_ms_ = now;
1752-
1753-
// Update window end for next iteration
1754-
window_end = last_audio_seq_num_ + SLIDING_WINDOW_SIZE;
1755-
}
1756-
1757-
// If buffer is getting too full, force process oldest packets
1758-
if (audio_buffer_.size() >= AUDIO_JITTER_BUFFER_SIZE * 0.8) {
1759-
srs_warn("Audio jitter buffer nearly full, size=%zu", audio_buffer_.size());
17601802
}
17611803

17621804
return err;
@@ -1788,6 +1830,7 @@ srs_error_t SrsRtcFrameBuilder::transcode_audio(SrsRtpPacket *pkt)
17881830
is_first_audio_ = false;
17891831
}
17901832

1833+
// TODO: FIXME: Should use SrsUniquePtr to dispose it automatically.
17911834
std::vector<SrsAudioFrame*> out_pkts;
17921835
SrsRtpRawPayload *payload = dynamic_cast<SrsRtpRawPayload*>(pkt->payload());
17931836

trunk/src/app/srs_app_rtc_source.hpp

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -385,6 +385,27 @@ class SrsRtcFrameBuilderVideoFrameDetector
385385
bool is_lost_sn(uint16_t received);
386386
};
387387

388+
// Audio packet cache for RTP packet jitter buffer management
389+
class SrsRtcFrameBuilderAudioPacketCache
390+
{
391+
private:
392+
// Audio jitter buffer, map sequence number to packet
393+
std::map<uint16_t, SrsRtpPacket*> audio_buffer_;
394+
// Last processed sequence number
395+
uint16_t last_audio_seq_num_;
396+
// Last time we processed the jitter buffer
397+
int64_t last_audio_process_time_ms_;
398+
public:
399+
SrsRtcFrameBuilderAudioPacketCache();
400+
virtual ~SrsRtcFrameBuilderAudioPacketCache();
401+
public:
402+
// Process audio packet through jitter buffer
403+
// Returns packets ready for transcoding in order
404+
srs_error_t process_packet(SrsRtpPacket* src, std::vector<SrsRtpPacket*>& ready_packets);
405+
// Clear all cached packets
406+
void clear_all();
407+
};
408+
388409
// Collect and build WebRTC RTP packets to AV frames.
389410
class SrsRtcFrameBuilder
390411
{
@@ -395,15 +416,9 @@ class SrsRtcFrameBuilder
395416
SrsAudioTranscoder *audio_transcoder_;
396417
SrsVideoCodecId video_codec_;
397418
private:
419+
SrsRtcFrameBuilderAudioPacketCache* audio_cache_;
398420
SrsRtcFrameBuilderVideoPacketCache* video_cache_;
399421
SrsRtcFrameBuilderVideoFrameDetector* frame_detector_;
400-
private:
401-
// Audio jitter buffer, map sequence number to packet
402-
std::map<uint16_t, SrsRtpPacket*> audio_buffer_;
403-
// Last processed sequence number
404-
uint16_t last_audio_seq_num_;
405-
// Last time we processed the jitter buffer
406-
int64_t last_audio_process_time_ms_;
407422
private:
408423
// The state for timestamp sync state. -1 for init. 0 not sync. 1 sync.
409424
int sync_state_;

0 commit comments

Comments
 (0)