Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 16 additions & 13 deletions internal/ingestion/rtp/integration_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
//go:build integration
// +build integration

package rtp_test

Expand All @@ -20,6 +19,7 @@ import (
"github.com/zsiec/mirror/internal/ingestion/registry"
rtpListener "github.com/zsiec/mirror/internal/ingestion/rtp"
"github.com/zsiec/mirror/internal/ingestion/testdata"
"github.com/zsiec/mirror/internal/logger"
"github.com/zsiec/mirror/tests"
)

Expand All @@ -30,15 +30,15 @@ func TestRTPIntegration_StreamIngestion(t *testing.T) {

// Setup
ctx := context.Background()
logger := logrus.New()
logger.SetLevel(logrus.DebugLevel)
logrusLogger := logrus.New()
logrusLogger.SetLevel(logrus.DebugLevel)

// Create test Redis registry
redisClient := tests.SetupTestRedis(t)
reg := registry.NewRedisRegistry(redisClient, logger)
reg := registry.NewRedisRegistry(redisClient, logrusLogger)

// Create buffer pool
bufferPool := buffer.NewBufferPool(1024*1024, 10, logger) // 1MB buffers
bufferPool := buffer.NewBufferPool(1024*1024, 10, logrusLogger) // 1MB buffers

// Create RTP listener config
cfg := &config.RTPConfig{
Expand All @@ -55,7 +55,9 @@ func TestRTPIntegration_StreamIngestion(t *testing.T) {
Supported: []string{"h264", "hevc"},
Preferred: "hevc",
}
listener := rtpListener.NewListener(cfg, codecsCfg, reg, bufferPool, logger)

log := logger.NewLogrusAdapter(logrus.NewEntry(logrusLogger))
listener := rtpListener.NewListener(cfg, codecsCfg, reg, log)
err := listener.Start()
require.NoError(t, err)
defer listener.Stop()
Expand Down Expand Up @@ -228,7 +230,8 @@ func TestRTPIntegration_StreamIngestion(t *testing.T) {
Supported: []string{"h264", "hevc"},
Preferred: "hevc",
}
testListener := rtpListener.NewListener(testCfg, codecsCfg, reg, bufferPool, logger)

testListener := rtpListener.NewListener(testCfg, codecsCfg, reg, log)
testListener.SetTestTimeouts(500*time.Millisecond, 1*time.Second) // Cleanup every 500ms, timeout after 1s

err := testListener.Start()
Expand Down Expand Up @@ -330,14 +333,11 @@ func TestRTPIntegration_Performance(t *testing.T) {
}

// Setup
logger := logrus.New()
logrusLogger := logrus.New()

// Create test Redis registry
redisClient := tests.SetupTestRedis(t)
reg := registry.NewRedisRegistry(redisClient, logger)

// Create buffer pool
bufferPool := buffer.NewBufferPool(1024*1024, 10, logger)
reg := registry.NewRedisRegistry(redisClient, logrusLogger)

// Create RTP listener
cfg := &config.RTPConfig{
Expand All @@ -353,7 +353,10 @@ func TestRTPIntegration_Performance(t *testing.T) {
Supported: []string{"h264", "hevc"},
Preferred: "hevc",
}
listener := rtpListener.NewListener(cfg, codecsCfg, reg, bufferPool, logger)

log := logger.NewLogrusAdapter(logrus.NewEntry(logrusLogger))

listener := rtpListener.NewListener(cfg, codecsCfg, reg, log)
err := listener.Start()
require.NoError(t, err)
defer listener.Stop()
Expand Down
37 changes: 20 additions & 17 deletions internal/ingestion/srt/integration_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
//go:build integration
// +build integration

package srt_test

Expand All @@ -20,6 +19,7 @@ import (
"github.com/zsiec/mirror/internal/ingestion/registry"
"github.com/zsiec/mirror/internal/ingestion/srt"
"github.com/zsiec/mirror/internal/ingestion/testdata"
"github.com/zsiec/mirror/internal/logger"
"github.com/zsiec/mirror/tests"
)

Expand All @@ -30,15 +30,15 @@ func TestSRTIntegration_StreamIngestion(t *testing.T) {

// Setup
ctx := context.Background()
logger := logrus.New()
logger.SetLevel(logrus.DebugLevel)
logrusLogger := logrus.New()
logrusLogger.SetLevel(logrus.DebugLevel)

// Create test Redis registry
redisClient := tests.SetupTestRedis(t)
reg := registry.NewRedisRegistry(redisClient, logger)
reg := registry.NewRedisRegistry(redisClient, logrusLogger)

// Create buffer pool
bufferPool := buffer.NewBufferPool(1024*1024, 10, logger) // 1MB buffers
bufferPool := buffer.NewBufferPool(1024*1024, 10, logrusLogger) // 1MB buffers

// Create SRT listener config
cfg := &config.SRTConfig{
Expand All @@ -58,9 +58,12 @@ func TestSRTIntegration_StreamIngestion(t *testing.T) {
Supported: []string{"h264", "hevc"},
Preferred: "hevc",
}
listener := srt.NewListener(cfg, codecsCfg, reg, bufferPool, logger)
// Set faster stats update for testing (500ms instead of 5s)
listener.SetTestStatsInterval(500 * time.Millisecond)

adapter := srt.NewHaivisionAdapter()

log := logger.NewLogrusAdapter(logrus.NewEntry(logrusLogger))
listener := srt.NewListenerWithAdapter(cfg, codecsCfg, reg, adapter, log)

err := listener.Start()
require.NoError(t, err)
defer listener.Stop()
Expand Down Expand Up @@ -278,14 +281,11 @@ func TestSRTIntegration_Metrics(t *testing.T) {

// Setup
ctx := context.Background()
logger := logrus.New()
logrusLogger := logrus.New()

// Create test Redis registry
redisClient := tests.SetupTestRedis(t)
reg := registry.NewRedisRegistry(redisClient, logger)

// Create buffer pool
bufferPool := buffer.NewBufferPool(1024*1024, 10, logger)
reg := registry.NewRedisRegistry(redisClient, logrusLogger)

// Create SRT listener
cfg := &config.SRTConfig{
Expand All @@ -304,9 +304,11 @@ func TestSRTIntegration_Metrics(t *testing.T) {
Supported: []string{"h264", "hevc"},
Preferred: "hevc",
}
listener := srt.NewListener(cfg, codecsCfg, reg, bufferPool, logger)
// Set faster stats update for testing (500ms instead of 5s)
listener.SetTestStatsInterval(500 * time.Millisecond)
adapter := srt.NewHaivisionAdapter()

log := logger.NewLogrusAdapter(logrus.NewEntry(logrusLogger))
listener := srt.NewListenerWithAdapter(cfg, codecsCfg, reg, adapter, log)

err := listener.Start()
require.NoError(t, err)
defer listener.Stop()
Expand Down Expand Up @@ -354,6 +356,7 @@ func TestSRTIntegration_Metrics(t *testing.T) {
assert.Greater(t, stream.PacketsReceived, int64(0))

// Check listener metrics
assert.Equal(t, 1, listener.GetActiveSessions())
assert.Equal(t, 1, listener.GetActiveConnections())

})
}
Loading