Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 195 additions & 0 deletions commons/src/main/java/io/aiven/commons/collections/RingBuffer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
/*
* Copyright 2025 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.commons.collections;

import java.util.Objects;

import org.apache.commons.collections4.queue.CircularFifoQueue;
import org.apache.commons.collections4.queue.SynchronizedQueue;

/**
* Implements a ring buffer of items. Items are inserted until maximum size is reached and then the earliest items are
* removed when newer items are added.
*
* @param <K>
* the type of item in the queue. Must support equality check.
*/
public final class RingBuffer<K> {
/** How to handle the duplicates in the buffer. */
public enum DuplicateHandling {
/** Allow duplicates in the buffer. */
ALLOW,
/** Reject (do not add) duplicates to the buffer. */
REJECT,
/** Move the duplicate entry to the tail of the buffer. */
DELETE
}

/** The wrapped queue. */
private final SynchronizedQueue<K> queue;

private final CircularFifoQueue<K> wrappedQueue;

/** Flag to indicate ring buffer should always be empty. */
private final boolean alwaysEmpty;

/** Flag to allow duplicates in the buffer. */
private final DuplicateHandling duplicateHandling;

/**
* Create a Ring Buffer of a maximum size that rejects duplicates. If the size is less than or equal to 0 then the
* buffer is always empty.
*
* @param size
* The maximum size of the ring buffer
* @see DuplicateHandling#REJECT
*/
public RingBuffer(final int size) {
this(size, DuplicateHandling.REJECT);
}

/**
* Create a Ring Buffer of specified maximum size and potentially allowing duplicates. If the size is less than or
* equal to 0 then the buffer is always empty.
*
* @param size
* The maximum size of the ring buffer
* @param duplicateHandling
* defines how to handle duplicate values in the buffer.
*/
public RingBuffer(final int size, final DuplicateHandling duplicateHandling) {
wrappedQueue = new CircularFifoQueue<>(size > 0 ? size : 1);
queue = SynchronizedQueue.synchronizedQueue(wrappedQueue);
alwaysEmpty = size <= 0;
this.duplicateHandling = duplicateHandling;
}

@Override
public String toString() {
return String.format("RingBuffer[%s, load %s/%s]", duplicateHandling, queue.size(), wrappedQueue.maxSize());
}

/**
* Adds a new item if it is not already present.
*
* <ul>
* <li>If the buffer is always empty the item is ignored and not enqueued.
* <li>If the buffer already contains the item it is ignored and not enqueued.
* <li>If the buffer is full the oldest entry in the buffer is ejected.
* </ul>
*
* @param item
* Item T which is to be added to the Queue
* @return The item that was ejected. May be {@code null}.
*/
public K add(final K item) {
Objects.requireNonNull(item, "item");
if (!alwaysEmpty && checkDuplicates(item)) {
final K result = isFull() ? queue.poll() : null;
queue.add(item);
return result;
}
return null;
}

/**
* Removes a single instance of the item from the buffer.
*
* @param item
* the item to remove.
*/
public void remove(final K item) {
queue.remove(item);
}

/**
* Determines if the item is in the buffer.
*
* @param item
* the item to look for.
* @return {@code true} if the item is in the buffer, {@code false} othersie.
*/
public boolean contains(final K item) {
return queue.contains(item);
}

/**
* Returns but does not remove the head of the buffer.
*
* @return the item at the head of the buffer. May be {@code null}.
*/
public K head() {
return queue.peek();
}

/**
* Returns but does not remove the teal of the buffer.
*
* @return the item at the tail of the buffer. May be {@code null}.
*/
public K tail() {
final int size = wrappedQueue.size();
return size == 0 ? null : wrappedQueue.get(size - 1);
}

private boolean checkDuplicates(final K item) {
switch (duplicateHandling) {
case ALLOW :
return true;
case REJECT :
return !queue.contains(item);
case DELETE :
queue.remove(item);
return true;
default :
throw new IllegalStateException("Unsupported duplicate handling: " + duplicateHandling);
}
}

/**
* Returns {@code true} if the buffer is full.
*
* @return {@code true} if the buffer is full.
*/
public boolean isFull() {
return wrappedQueue.isAtFullCapacity();
}

/**
* Gets the next item to be ejected. If the buffer is full this will return the oldest value in the buffer. If the
* buffer is not full this method will return {@code null}.
*
* @return A value T from the last place in the buffer, returns null if buffer is not full.
*/
public K getNextEjected() {
return isFull() ? queue.peek() : null;
}

@Override
public boolean equals(final Object object) {
if (object == this) {
return true;
}
return super.equals(object);
}

@SuppressWarnings("PMD.UselessOverridingMethod")
@Override
public int hashCode() {
return super.hashCode();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import org.apache.kafka.connect.data.SchemaAndValue;

import io.aiven.commons.collections.RingBuffer;
import io.aiven.kafka.connect.common.config.SourceCommonConfig;
import io.aiven.kafka.connect.common.source.input.Transformer;
import io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils;
Expand Down Expand Up @@ -186,12 +187,12 @@ public AbstractSourceRecordIterator(final SourceCommonConfig sourceConfig, final
final public boolean hasNext() {
if (!outer.hasNext() && lastSeenNativeKey != null) {
// update the buffer to contain this new objectKey
ringBuffer.enqueue(lastSeenNativeKey);
ringBuffer.add(lastSeenNativeKey);
// Remove the last seen from the offsetmanager as the file has been completely processed.
offsetManager.removeEntry(getOffsetManagerKey(lastSeenNativeKey));
}
if (!inner.hasNext() && !outer.hasNext()) {
inner = getNativeItemStream(ringBuffer.getOldest()).map(fileMatching)
inner = getNativeItemStream(ringBuffer.getNextEjected()).map(fileMatching)
.filter(taskAssignment)
.filter(Optional::isPresent)
.map(Optional::get)
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import static org.assertj.core.api.Assertions.assertThat;

import io.aiven.commons.collections.RingBuffer;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
Expand All @@ -32,12 +34,12 @@ void testRingBufferReturnsOldestEntryAndRemovesOldestEntry(final int size) {

final RingBuffer<String> buffer = new RingBuffer<>(size);
for (int i = 0; i < size; i++) {
buffer.enqueue(OBJECT_KEY + i);
buffer.add(OBJECT_KEY + i);
}
assertThat(buffer.getOldest()).isEqualTo("S3ObjectKey" + 0);
assertThat(buffer.getNextEjected()).isEqualTo("S3ObjectKey" + 0);
// Add one more unique ObjectKey
buffer.enqueue(OBJECT_KEY);
assertThat(buffer.getOldest()).isEqualTo("S3ObjectKey" + 1);
buffer.add(OBJECT_KEY);
assertThat(buffer.getNextEjected()).isEqualTo("S3ObjectKey" + 1);
}

@ParameterizedTest
Expand All @@ -47,21 +49,21 @@ void testRingBufferOnlyAddsEachItemOnce(final int size) {
final RingBuffer<String> buffer = new RingBuffer<>(size);
for (int i = 0; i < size; i++) {
// add the same objectKey every time, it should onl have one entry.
buffer.enqueue(OBJECT_KEY);
buffer.add(OBJECT_KEY);
}
// Buffer not filled so should return null
assertThat(buffer.getOldest()).isEqualTo(null);
assertThat(buffer.peek()).isEqualTo(OBJECT_KEY);
assertThat(buffer.getNextEjected()).isEqualTo(null);
assertThat(buffer.head()).isEqualTo(OBJECT_KEY);
assertThat(buffer.contains(OBJECT_KEY)).isTrue();
}

@Test
void testRingBufferOfSizeOneOnlyRetainsOneEntry() {

final RingBuffer<String> buffer = new RingBuffer<>(1);
buffer.enqueue(OBJECT_KEY + 0);
assertThat(buffer.getOldest()).isEqualTo(OBJECT_KEY + 0);
buffer.enqueue(OBJECT_KEY + 1);
assertThat(buffer.getOldest()).isEqualTo(OBJECT_KEY + 1);
buffer.add(OBJECT_KEY + 0);
assertThat(buffer.getNextEjected()).isEqualTo(OBJECT_KEY + 0);
buffer.add(OBJECT_KEY + 1);
assertThat(buffer.getNextEjected()).isEqualTo(OBJECT_KEY + 1);
}
}
Loading