diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a386bc1f6..aee517b5c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -425,3 +425,225 @@ jobs: Write-Output "Verifying $($file.FullName) with mkvinfo (verbose and hexdump):" mkvinfo.exe -v -X "$($file.FullName)" } + + run-benchmarks: + needs: + - build-jni + + env: + STREAM_NAME_PREFIX: producer-java-benchmarking-java-11-ubuntu + JNI_FOLDER: ${{ github.workspace }}/jni + STREAM_COUNT: 7 + STREAM_COUNT_START_STOP: 5 + STREAM_INTERVAL_MS: 10000 + + runs-on: ubuntu-22.04 + permissions: + id-token: write + contents: read + + steps: + - name: Checkout the repository + uses: actions/checkout@v4 + + - name: Set up JDK 11 + uses: actions/setup-java@v4 + with: + java-version: 11 + distribution: 'adopt' + cache: maven + + - name: Set up Python 3.12 + uses: actions/setup-python@v5 + with: + python-version: "3.12" + + - name: Install Python dependencies + working-directory: scripts/python/benchmarking + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + + - name: Download JNI Library + uses: actions/download-artifact@v4 + with: + name: jni-library-ubuntu-22.04 + path: jni/ + + - name: Build with Maven + run: mvn clean compile assembly:single + + - name: Configure AWS Credentials + uses: aws-actions/configure-aws-credentials@v4 + with: + role-to-assume: ${{ secrets.AWS_ROLE_TO_ASSUME }} + aws-region: ${{ secrets.AWS_REGION }} + + - name: Run the benchmarking application + run: | + set +e + + JAR_FILE=$(find target -name '*jar-with-dependencies.jar' | head -n 1) + + if [ -z "$JAR_FILE" ]; then + echo "Error: JAR file not found!" + exit 1 + fi + + echo "Using JAR file: $JAR_FILE" + java -classpath "$JAR_FILE" \ + -Daws.accessKeyId=${AWS_ACCESS_KEY_ID} \ + -Daws.secretKey=${AWS_SECRET_ACCESS_KEY} \ + -Daws.sessionToken=${AWS_SESSION_TOKEN} \ + -Djava.library.path=${JNI_FOLDER} \ + -Dkvs-stream=${STREAM_NAME_PREFIX} \ + -Dlog4j.configurationFile=log4j2.xml \ + -Dstream-count=${STREAM_COUNT} \ + -Dstream-interval-ms=${STREAM_INTERVAL_MS} \ + com.amazonaws.kinesisvideo.demoapp.DemoAppBenchmarking & + JAVA_PID=$! + BENCHMARKING_FILE="process_${JAVA_PID}_metrics.txt" + echo "BENCHMARKING_FILE=$BENCHMARKING_FILE" >> $GITHUB_ENV + + python ./scripts/python/benchmarking/capture_rss_and_cpu.py $JAVA_PID & + MONITOR_PID=$! + + # Wait for maximum of STREAM_COUNT * STREAM_INTERVAL_MS + 30 seconds + TIMEOUT=$((STREAM_COUNT * STREAM_INTERVAL_MS / 1000 + 30)) + for ((i=1; i<=TIMEOUT; i++)); do + if ! kill -0 $JAVA_PID 2>/dev/null; then + break + fi + sleep 1 + if [ $i -eq $TIMEOUT ]; then + # Time's up, kill the process + kill -9 $JAVA_PID + fi + done + + wait $JAVA_PID + EXIT_CODE=$? + wait $MONITOR_PID + + # Run the benchmarking app in start/stop mode. + java -classpath "$JAR_FILE" \ + -Daws.accessKeyId=${AWS_ACCESS_KEY_ID} \ + -Daws.secretKey=${AWS_SECRET_ACCESS_KEY} \ + -Daws.sessionToken=${AWS_SESSION_TOKEN} \ + -Djava.library.path=${JNI_FOLDER} \ + -Dkvs-stream=${STREAM_NAME_PREFIX} \ + -Dlog4j.configurationFile=log4j2.xml \ + -Dstream-count=${STREAM_COUNT_START_STOP} \ + -Dstream-interval-ms=${STREAM_INTERVAL_MS} \ + -Ddo-start-stop=true \ + com.amazonaws.kinesisvideo.demoapp.DemoAppBenchmarking & + JAVA_PID=$! + BENCHMARKING_FILE_START_STOP="process_${JAVA_PID}_metrics.txt" + echo "BENCHMARKING_FILE_START_STOP=$BENCHMARKING_FILE_START_STOP" >> $GITHUB_ENV + + python ./scripts/python/benchmarking/capture_rss_and_cpu.py $JAVA_PID & + MONITOR_PID=$! + + # Wait for maximum of expected duration + 30 seconds. + # Note: Start/stop mode runs each stream for 3 intervals, and has an extra 120 seconds of sleep to allow for the streams to stabilize. + TIMEOUT=$((STREAM_COUNT_START_STOP * STREAM_INTERVAL_MS * 3 / 1000 + 120 + 30)) + for ((i=1; i<=TIMEOUT; i++)); do + if ! kill -0 $JAVA_PID 2>/dev/null; then + break + fi + sleep 1 + if [ $i -eq $TIMEOUT ]; then + # Time's up, kill the process + kill -9 $JAVA_PID + fi + done + + wait $JAVA_PID + EXIT_CODE=$? + wait $MONITOR_PID + + set -e + + # Check if the process was forcefully killed + if [ $EXIT_CODE -eq 124 ]; then + echo "Error: Benchmarking application exceeded time limit and was forcefully terminated." + exit 1 + fi + + # Preserve original exit code + echo "Process exited with code: $EXIT_CODE" + exit $EXIT_CODE + + shell: bash + + - name: Check uploaded media (benchmarking) + working-directory: scripts/python/getmediavalidation/bin + run: | + for ((i = 0; i < STREAM_COUNT; i++)); do + stream="${STREAM_NAME_PREFIX}_${i}" + echo "Validating stream: $stream" + + python ./fetch_fragment_info.py --stream-name "$stream" --last 5m + + python ./validate_media.py --stream-name "$stream" \ + --keyframe-interval 25 \ + -fps 25 \ + --frames-path "${{ github.workspace }}/src/main/resources/data/h264/*.h264" \ + --last 5m + done + + - name: Generate memory and CPU graph + run: | + COMMIT_HASH=$(git rev-parse --short HEAD) + + DATA_FILE="${BENCHMARKING_FILE}" + + if [ ! -f "$DATA_FILE" ]; then + echo "Error: Data file $DATA_FILE not found!" + exit 1 + fi + + # Key points for each stream + KEY_POINTS_ARGS=() + + for ((i = 0; i < STREAM_COUNT; i++)); do + TIME_SECONDS=$(awk "BEGIN { printf \"%.2f\", $i * $STREAM_INTERVAL_MS / 1000 }") + LABEL="Stream $((i + 1))" + KEY_POINTS_ARGS+=(--key-points "$TIME_SECONDS" "$LABEL") + done + + python ./scripts/python/benchmarking/plot_rss_and_cpu.py "$DATA_FILE" \ + --title "Memory and CPU Usage (Benchmarking ${COMMIT_HASH})\n Ubuntu, Java 11" \ + --output "benchmarking-mem-cpu.png" \ + "${KEY_POINTS_ARGS[@]}" + + + DATA_FILE="${BENCHMARKING_FILE_START_STOP}" + + if [ ! -f "$DATA_FILE" ]; then + echo "Error: Data file $DATA_FILE not found!" + exit 1 + fi + + # Key points for each stream + KEY_POINTS_ARGS=() + + python ./scripts/python/benchmarking/plot_rss_and_cpu.py "$DATA_FILE" \ + --title "Start/Stop Memory and CPU Usage (Benchmarking ${COMMIT_HASH})\n Ubuntu, Java 11" \ + --output "benchmarking-start-stop-mem-cpu.png" \ + + shell: bash + + - name: Upload memory and CPU graph (Mac and Linux) + uses: actions/upload-artifact@v4 + with: + name: benchmarking-mem-cpu.png + path: benchmarking-mem-cpu.png + retention-days: 7 + + - name: Upload start-stop memory and CPU graph (Mac and Linux) + uses: actions/upload-artifact@v4 + with: + name: benchmarking-start-stop-mem-cpu.png + path: benchmarking-start-stop-mem-cpu.png + retention-days: 7 diff --git a/src/main/demo/com/amazonaws/kinesisvideo/demoapp/DemoAppBenchmarking.java b/src/main/demo/com/amazonaws/kinesisvideo/demoapp/DemoAppBenchmarking.java new file mode 100644 index 000000000..e85ff2e6c --- /dev/null +++ b/src/main/demo/com/amazonaws/kinesisvideo/demoapp/DemoAppBenchmarking.java @@ -0,0 +1,140 @@ +package com.amazonaws.kinesisvideo.demoapp; + +import com.amazonaws.kinesisvideo.client.IPVersionFilter; +import com.amazonaws.kinesisvideo.client.KinesisVideoClient; +import com.amazonaws.kinesisvideo.internal.client.mediasource.MediaSource; +import com.amazonaws.kinesisvideo.common.exception.KinesisVideoException; +import com.amazonaws.kinesisvideo.demoapp.auth.AuthHelper; +import com.amazonaws.kinesisvideo.java.client.KinesisVideoJavaClientFactory; +import com.amazonaws.kinesisvideo.java.mediasource.file.ImageFileMediaSource; +import com.amazonaws.kinesisvideo.java.mediasource.file.ImageFileMediaSourceConfiguration; +import com.amazonaws.regions.Regions; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Optional; + +/** + * Demo Java Producer. + */ +public final class DemoAppBenchmarking { + + private static final Logger log = LogManager.getLogger(DemoAppBenchmarking.class); + + // Use a different stream name when testing audio/video sample + private static final String STREAM_NAME = Optional.ofNullable(System.getProperty("kvs-stream")).orElse(""); + private static final int FPS_25 = 25; + private static final String IMAGE_DIR = "src/main/resources/data/h264/"; + // CHECKSTYLE:SUPPRESS:LineLength + // Need to get key frame configured properly so the output can be decoded. h264 files can be decoded using gstreamer plugin + // gst-launch-1.0 rtspsrc location="YourRtspUri" short-header=TRUE protocols=tcp ! rtph264depay ! decodebin ! videorate ! videoscale ! vtenc_h264_hw allow-frame-reordering=FALSE max-keyframe-interval=25 bitrate=1024 realtime=TRUE ! video/x-h264,stream-format=avc,alignment=au,profile=baseline,width=640,height=480,framerate=1/25 ! multifilesink location=./frame-%03d.h264 index=1 + private static final String IMAGE_FILENAME_FORMAT = "frame-%03d.h264"; + private static final int START_FILE_INDEX = 1; + private static final int END_FILE_INDEX = 375; + + private static final int STREAM_COUNT = Integer.parseInt(System.getProperty("stream-count")); + // private static final int STREAM_INTERVALED_COUNT = Integer.parseInt(System.getProperty("stream-intervaled-count")); + private static final int STREAM_INTERVAL_MS = Integer.parseInt(System.getProperty("stream-interval-ms")); + private static final boolean DO_START_STOP = Boolean.parseBoolean(System.getProperty("do-start-stop", "false")); + + private DemoAppBenchmarking() { + throw new UnsupportedOperationException(); + } + + public static void main(final String[] args) { + try { + // create Kinesis Video high level client + final KinesisVideoClient kinesisVideoClient = KinesisVideoJavaClientFactory + .createKinesisVideoClient( + Regions.US_WEST_2, + AuthHelper.getSystemPropertiesCredentialsProvider(), + null, + true, IPVersionFilter.IPV4_AND_IPV6); + + // Create an array of media sources + final MediaSource[] mediaSources = new MediaSource[STREAM_COUNT]; + + for (int i = 0; i < mediaSources.length; i++) { + // create a stream + // create a media source. this class produces the data and pushes it into + // Kinesis Video Producer lower level components + mediaSources[i] = createImageFileMediaSource(String.valueOf(i)); + + // register media source with Kinesis Video Client + kinesisVideoClient.registerMediaSource(mediaSources[i]); + + // start streaming + mediaSources[i].start(); + + // sleep for the interval + log.warn("Sleeping for {} ms", STREAM_INTERVAL_MS); + Thread.sleep(STREAM_INTERVAL_MS); + + } + + // Stop and start streams if doing start/stop + if (DO_START_STOP) { + log.warn("Starting to stop and start streams"); + for (int i = 0; i < mediaSources.length; i++) { + log.warn("Stopping stream {}", i); + kinesisVideoClient.unregisterMediaSource(mediaSources[i]); + log.warn("Sleeping for {} ms", 10); + Thread.sleep(10); + } + + log.warn("Sleeping for 300 seconds to allow streams to stabilize"); + Thread.sleep(300000); + + for (int i = 0; i < mediaSources.length; i++) { + log.warn("Starting stream {}", i); + + mediaSources[i] = createImageFileMediaSource(String.valueOf(i)); + kinesisVideoClient.registerMediaSource(mediaSources[i]); + mediaSources[i].start(); + + log.warn("Sleeping for {} ms", 10); + Thread.sleep(10); + } + + log.warn("Done stopping and starting streams"); + log.warn("Sleeping for 300 seconds to allow streams to stabilize"); + Thread.sleep(300000); + } + + // Stop the streams + for (int i = 0; i < mediaSources.length; i++) { + log.warn("unregistering stream {}", i); + kinesisVideoClient.unregisterMediaSource(mediaSources[i]); + } + + log.warn("freeing client"); + kinesisVideoClient.free(); + log.warn("done freeing client"); + + } catch (final KinesisVideoException | InterruptedException e) { + throw new RuntimeException(e); + } + } + + /** + * Create a MediaSource based on local sample H.264 frames. + * + * @return a MediaSource backed by local H264 frame files + */ + private static MediaSource createImageFileMediaSource(String streamNameSuffix) { + final ImageFileMediaSourceConfiguration configuration = + new ImageFileMediaSourceConfiguration.Builder() + .fps(FPS_25) + .dir(IMAGE_DIR) + .filenameFormat(IMAGE_FILENAME_FORMAT) + .startFileIndex(START_FILE_INDEX) + .endFileIndex(END_FILE_INDEX) + //.contentType("video/hevc") // for h265 + .allowStreamCreation(false) + .build(); + final ImageFileMediaSource mediaSource = new ImageFileMediaSource(STREAM_NAME + "_" + streamNameSuffix); + mediaSource.configure(configuration); + + return mediaSource; + } +} diff --git a/src/main/java/com/amazonaws/kinesisvideo/java/mediasource/file/ImageFileMediaSource.java b/src/main/java/com/amazonaws/kinesisvideo/java/mediasource/file/ImageFileMediaSource.java index db2ce1aef..fe58cec42 100644 --- a/src/main/java/com/amazonaws/kinesisvideo/java/mediasource/file/ImageFileMediaSource.java +++ b/src/main/java/com/amazonaws/kinesisvideo/java/mediasource/file/ImageFileMediaSource.java @@ -60,7 +60,7 @@ public class ImageFileMediaSource implements MediaSource { private ImageFileMediaSourceConfiguration imageFileMediaSourceConfiguration; private MediaSourceState mediaSourceState; private MediaSourceSink mediaSourceSink; - private ImageFrameSource imageFrameSource; + private PreloadedSampleImageFrameSource preloadedSampleImageFrameSource; public ImageFileMediaSource(@Nonnull final String streamName) { this(streamName, new CompletableFuture<>()); @@ -133,15 +133,15 @@ public void configure(final MediaSourceConfiguration configuration) { @Override public void start() throws KinesisVideoException { mediaSourceState = MediaSourceState.RUNNING; - imageFrameSource = new ImageFrameSource(imageFileMediaSourceConfiguration); - imageFrameSource.onStreamDataAvailable(new DefaultOnStreamDataAvailable(mediaSourceSink)); - imageFrameSource.start(); + preloadedSampleImageFrameSource = new PreloadedSampleImageFrameSource(imageFileMediaSourceConfiguration); + preloadedSampleImageFrameSource.onStreamDataAvailable(new DefaultOnStreamDataAvailable(mediaSourceSink)); + preloadedSampleImageFrameSource.start(); } @Override public void stop() throws KinesisVideoException { - if (imageFrameSource != null) { - imageFrameSource.stop(); + if (preloadedSampleImageFrameSource != null) { + preloadedSampleImageFrameSource.stop(); } try { diff --git a/src/main/java/com/amazonaws/kinesisvideo/java/mediasource/file/PreloadedSampleImageFrameSource.java b/src/main/java/com/amazonaws/kinesisvideo/java/mediasource/file/PreloadedSampleImageFrameSource.java new file mode 100644 index 000000000..2e16ebcc5 --- /dev/null +++ b/src/main/java/com/amazonaws/kinesisvideo/java/mediasource/file/PreloadedSampleImageFrameSource.java @@ -0,0 +1,191 @@ +package com.amazonaws.kinesisvideo.java.mediasource.file; + +import com.amazonaws.kinesisvideo.common.exception.KinesisVideoException; +import com.amazonaws.kinesisvideo.internal.mediasource.OnStreamDataAvailable; + +import com.amazonaws.kinesisvideo.producer.KinesisVideoFrame; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import javax.annotation.concurrent.NotThreadSafe; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import static com.amazonaws.kinesisvideo.producer.FrameFlags.FRAME_FLAG_KEY_FRAME; +import static com.amazonaws.kinesisvideo.producer.FrameFlags.FRAME_FLAG_NONE; +import static com.amazonaws.kinesisvideo.producer.Time.HUNDREDS_OF_NANOS_IN_A_MILLISECOND; +import static com.amazonaws.kinesisvideo.producer.Time.NANOS_IN_A_MILLISECOND; + +/** + * Frame source backed by local image files to be loaded into static memory. + */ +@NotThreadSafe +public class PreloadedSampleImageFrameSource { + public static final int METADATA_INTERVAL = 8; + private static final long FRAME_DURATION_20_MS = 20L; + private final ExecutorService executor = Executors.newFixedThreadPool(1); + private final int fps; + private final ImageFileMediaSourceConfiguration configuration; + + private OnStreamDataAvailable mkvDataAvailableCallback; + private final AtomicBoolean isRunning = new AtomicBoolean(false); + private int frameCounter; + private final Log log = LogFactory.getLog(PreloadedSampleImageFrameSource.class); + private final String metadataName = "ImageLoop"; + private int metadataCount = 0; + private long currentFrameTimestampMs; + private final long executorShutdownTimeoutSeconds = 5L; + + private static final int FPS_25 = 25; + private static final String IMAGE_DIR = "src/main/resources/data/h264/"; + private static final String IMAGE_FILENAME_FORMAT = "frame-%03d.h264"; + private static final int START_FILE_INDEX = 1; + private static final int END_FILE_INDEX = 375; + + private static final List preloadedFrames; + + // Preload image frames into memory. + static { + preloadedFrames = new ArrayList<>(); + try { + final ImageFileMediaSourceConfiguration defaultConfig = new ImageFileMediaSourceConfiguration.Builder() + .fps(FPS_25) + .dir(IMAGE_DIR) + .filenameFormat(IMAGE_FILENAME_FORMAT) + .startFileIndex(START_FILE_INDEX) + .endFileIndex(END_FILE_INDEX) + .allowStreamCreation(false) + .build(); + + for (int i = defaultConfig.getStartFileIndex(); i <= defaultConfig.getEndFileIndex(); i++) { + String filename = String.format(defaultConfig.getFilenameFormat(), i); + Path path = Paths.get(defaultConfig.getDir() + filename); + byte[] data = Files.readAllBytes(path); + preloadedFrames.add(ByteBuffer.wrap(data)); + } + } catch (IOException e) { + throw new RuntimeException("Failed to preload image frames", e); + } + } + + + public PreloadedSampleImageFrameSource(final ImageFileMediaSourceConfiguration configuration) { + this.configuration = configuration; + this.fps = configuration.getFps(); + this.currentFrameTimestampMs = configuration.getStartTimeMs(); + } + + public void start() { + if (isRunning.get()) { + throw new IllegalStateException("Frame source is already running"); + } + + isRunning.set(true); + startFrameGenerator(); + } + + public void stop() { + isRunning.set(false); + stopFrameGenerator(); + } + + public void onStreamDataAvailable(final OnStreamDataAvailable onMkvDataAvailable) { + this.mkvDataAvailableCallback = onMkvDataAvailable; + } + + private void startFrameGenerator() { + executor.execute(new Runnable() { + @Override + public void run() { + try { + generateFrameAndNotifyListener(); + } catch (final KinesisVideoException e) { + log.error("Failed to keep generating frames with Exception", e); + } + } + }); + } + + private void generateFrameAndNotifyListener() throws KinesisVideoException { + final double frameDurationMs = (double) Duration.ofSeconds(1L).toMillis() / fps; + long nextFrameTimeNs = System.nanoTime(); // to prevent time drift + + while (isRunning.get()) { + if (mkvDataAvailableCallback != null) { + mkvDataAvailableCallback.onFrameDataAvailable(createKinesisVideoFrameFromImage(frameCounter, currentFrameTimestampMs)); + if (isMetadataReady()) { + mkvDataAvailableCallback.onFragmentMetadataAvailable(metadataName + metadataCount, + Integer.toString(metadataCount++), false); + } + } + + frameCounter++; + currentFrameTimestampMs = configuration.getStartTimeMs() + Math.round(frameCounter * frameDurationMs); + nextFrameTimeNs += (long)(frameDurationMs * NANOS_IN_A_MILLISECOND); + + long sleepTimeMs = (nextFrameTimeNs - System.nanoTime()) / NANOS_IN_A_MILLISECOND; // Convert to Ms + if (sleepTimeMs > 0) { + try { + Thread.sleep(sleepTimeMs); + } catch (final InterruptedException e) { + log.error("Frame interval wait interrupted by Exception ", e); + } + } + } + } + + private boolean isMetadataReady() { + return frameCounter % METADATA_INTERVAL == 0; + } + + private KinesisVideoFrame createKinesisVideoFrameFromImage(final long index, final long timestampMs) { + final int flags = isKeyFrame() ? FRAME_FLAG_KEY_FRAME : FRAME_FLAG_NONE; + int preloadIndex = (int) (index % preloadedFrames.size()); + ByteBuffer frameData = preloadedFrames.get(preloadIndex).duplicate(); // duplicate() used so each instance can track its own position. + + return new KinesisVideoFrame( + frameCounter, + flags, + timestampMs * HUNDREDS_OF_NANOS_IN_A_MILLISECOND, + timestampMs * HUNDREDS_OF_NANOS_IN_A_MILLISECOND, + FRAME_DURATION_20_MS * HUNDREDS_OF_NANOS_IN_A_MILLISECOND, + frameData); + } + + private boolean isKeyFrame() { + return frameCounter % configuration.getFps() == 0; + } + + + private void stopFrameGenerator() { + executor.shutdown(); + try { + if (!executor.awaitTermination(this.executorShutdownTimeoutSeconds, TimeUnit.SECONDS)) { + log.warn("Executor did not terminate in time. Forcing shutdown."); + final List droppedTasks = executor.shutdownNow(); + log.warn("Number of dropped tasks: " + droppedTasks.size()); + for (final Runnable task : droppedTasks) { + log.warn("Dropped task of type: " + task.getClass().getName()); + } + } + } catch (final InterruptedException e) { + log.error("Executor shutdown interrupted with Exception ", e); + final List droppedTasks = executor.shutdownNow(); + log.warn("Number of dropped tasks: " + droppedTasks.size()); + for (final Runnable task : droppedTasks) { + log.warn("Dropped task of type: " + task.getClass().getName()); + } + Thread.currentThread().interrupt(); + } + } +}