diff --git a/commons/src/main/java/io/aiven/commons/collections/RingBuffer.java b/commons/src/main/java/io/aiven/commons/collections/RingBuffer.java new file mode 100644 index 000000000..87fb679b2 --- /dev/null +++ b/commons/src/main/java/io/aiven/commons/collections/RingBuffer.java @@ -0,0 +1,195 @@ +/* + * Copyright 2025 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.commons.collections; + +import java.util.Objects; + +import org.apache.commons.collections4.queue.CircularFifoQueue; +import org.apache.commons.collections4.queue.SynchronizedQueue; + +/** + * Implements a ring buffer of items. Items are inserted until maximum size is reached and then the earliest items are + * removed when newer items are added. + * + * @param + * the type of item in the queue. Must support equality check. + */ +public final class RingBuffer { + /** How to handle the duplicates in the buffer. */ + public enum DuplicateHandling { + /** Allow duplicates in the buffer. */ + ALLOW, + /** Reject (do not add) duplicates to the buffer. */ + REJECT, + /** Move the duplicate entry to the tail of the buffer. */ + DELETE + } + + /** The wrapped queue. */ + private final SynchronizedQueue queue; + + private final CircularFifoQueue wrappedQueue; + + /** Flag to indicate ring buffer should always be empty. */ + private final boolean alwaysEmpty; + + /** Flag to allow duplicates in the buffer. */ + private final DuplicateHandling duplicateHandling; + + /** + * Create a Ring Buffer of a maximum size that rejects duplicates. If the size is less than or equal to 0 then the + * buffer is always empty. + * + * @param size + * The maximum size of the ring buffer + * @see DuplicateHandling#REJECT + */ + public RingBuffer(final int size) { + this(size, DuplicateHandling.REJECT); + } + + /** + * Create a Ring Buffer of specified maximum size and potentially allowing duplicates. If the size is less than or + * equal to 0 then the buffer is always empty. + * + * @param size + * The maximum size of the ring buffer + * @param duplicateHandling + * defines how to handle duplicate values in the buffer. + */ + public RingBuffer(final int size, final DuplicateHandling duplicateHandling) { + wrappedQueue = new CircularFifoQueue<>(size > 0 ? size : 1); + queue = SynchronizedQueue.synchronizedQueue(wrappedQueue); + alwaysEmpty = size <= 0; + this.duplicateHandling = duplicateHandling; + } + + @Override + public String toString() { + return String.format("RingBuffer[%s, load %s/%s]", duplicateHandling, queue.size(), wrappedQueue.maxSize()); + } + + /** + * Adds a new item if it is not already present. + * + *
    + *
  • If the buffer is always empty the item is ignored and not enqueued. + *
  • If the buffer already contains the item it is ignored and not enqueued. + *
  • If the buffer is full the oldest entry in the buffer is ejected. + *
+ * + * @param item + * Item T which is to be added to the Queue + * @return The item that was ejected. May be {@code null}. + */ + public K add(final K item) { + Objects.requireNonNull(item, "item"); + if (!alwaysEmpty && checkDuplicates(item)) { + final K result = isFull() ? queue.poll() : null; + queue.add(item); + return result; + } + return null; + } + + /** + * Removes a single instance of the item from the buffer. + * + * @param item + * the item to remove. + */ + public void remove(final K item) { + queue.remove(item); + } + + /** + * Determines if the item is in the buffer. + * + * @param item + * the item to look for. + * @return {@code true} if the item is in the buffer, {@code false} othersie. + */ + public boolean contains(final K item) { + return queue.contains(item); + } + + /** + * Returns but does not remove the head of the buffer. + * + * @return the item at the head of the buffer. May be {@code null}. + */ + public K head() { + return queue.peek(); + } + + /** + * Returns but does not remove the teal of the buffer. + * + * @return the item at the tail of the buffer. May be {@code null}. + */ + public K tail() { + final int size = wrappedQueue.size(); + return size == 0 ? null : wrappedQueue.get(size - 1); + } + + private boolean checkDuplicates(final K item) { + switch (duplicateHandling) { + case ALLOW : + return true; + case REJECT : + return !queue.contains(item); + case DELETE : + queue.remove(item); + return true; + default : + throw new IllegalStateException("Unsupported duplicate handling: " + duplicateHandling); + } + } + + /** + * Returns {@code true} if the buffer is full. + * + * @return {@code true} if the buffer is full. + */ + public boolean isFull() { + return wrappedQueue.isAtFullCapacity(); + } + + /** + * Gets the next item to be ejected. If the buffer is full this will return the oldest value in the buffer. If the + * buffer is not full this method will return {@code null}. + * + * @return A value T from the last place in the buffer, returns null if buffer is not full. + */ + public K getNextEjected() { + return isFull() ? queue.peek() : null; + } + + @Override + public boolean equals(final Object object) { + if (object == this) { + return true; + } + return super.equals(object); + } + + @SuppressWarnings("PMD.UselessOverridingMethod") + @Override + public int hashCode() { + return super.hashCode(); + } +} diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/AbstractSourceRecordIterator.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/AbstractSourceRecordIterator.java index 3ea05fdae..f0b6dd6ba 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/source/AbstractSourceRecordIterator.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/AbstractSourceRecordIterator.java @@ -26,6 +26,7 @@ import org.apache.kafka.connect.data.SchemaAndValue; +import io.aiven.commons.collections.RingBuffer; import io.aiven.kafka.connect.common.config.SourceCommonConfig; import io.aiven.kafka.connect.common.source.input.Transformer; import io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils; @@ -186,12 +187,12 @@ public AbstractSourceRecordIterator(final SourceCommonConfig sourceConfig, final final public boolean hasNext() { if (!outer.hasNext() && lastSeenNativeKey != null) { // update the buffer to contain this new objectKey - ringBuffer.enqueue(lastSeenNativeKey); + ringBuffer.add(lastSeenNativeKey); // Remove the last seen from the offsetmanager as the file has been completely processed. offsetManager.removeEntry(getOffsetManagerKey(lastSeenNativeKey)); } if (!inner.hasNext() && !outer.hasNext()) { - inner = getNativeItemStream(ringBuffer.getOldest()).map(fileMatching) + inner = getNativeItemStream(ringBuffer.getNextEjected()).map(fileMatching) .filter(taskAssignment) .filter(Optional::isPresent) .map(Optional::get) diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/RingBuffer.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/RingBuffer.java deleted file mode 100644 index c909487f0..000000000 --- a/commons/src/main/java/io/aiven/kafka/connect/common/source/RingBuffer.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Copyright 2025 Aiven Oy - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.aiven.kafka.connect.common.source; - -import org.apache.commons.collections4.queue.CircularFifoQueue; -import org.apache.commons.collections4.queue.SynchronizedQueue; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Implements a ring buffer of items. - * - * @param - * the type of item in the queue. Must support equality check. - */ -public final class RingBuffer extends SynchronizedQueue { - private static final Logger LOGGER = LoggerFactory.getLogger(RingBuffer.class); - - /** - * Flag to indicate ring buffer should always be empty. - */ - private final boolean isEmpty; - - /** - * Create a Ring Buffer of a maximum Size. If the size is less than or equal to 0 then the buffer is always empty. - * - * @param size - * The size that the linked list should be. - */ - public RingBuffer(final int size) { - // TODO explore if this should be backed by some sort of LRU cache. - super(new CircularFifoQueue<>(size > 0 ? size : 1)); - isEmpty = size <= 0; - } - - /** - * Adds a new item if it is not already present. - *
    - *
  • If the buffer is always empty the item is ignored and not enqueued.
  • - *
  • If the buffer already contains the item it is ignored and not enqueued.
  • - *
  • If the buffer is full the oldest entry in the list is removed.
  • - *
- * - * @param item - * Item T which is to be added to the Queue - */ - public void enqueue(final K item) { - if (!isEmpty && item != null && !contains(item)) { - if (isFull()) { - LOGGER.debug("Ring buffer is full"); - poll(); - } - add(item); - LOGGER.debug("Ring buffer added item {} record count {}", item, size()); - } - } - - /** - * Returns {@code true} if the buffer is full. - * - * @return {@code true} if the buffer is full. - */ - public boolean isFull() { - return ((CircularFifoQueue) decorated()).isAtFullCapacity(); - } - /** - * Get the last value in the Ring buffer - * - * @return A value T from the last place in the list, returns null if list is not full. - */ - public K getOldest() { - final K oldest = isFull() ? peek() : null; - LOGGER.debug("Ring buffer getOldest {}", oldest); - return oldest; - } - - @Override - public boolean equals(final Object object) { - if (object == this) { - return true; - } - return super.equals(object); - } - - @SuppressWarnings("PMD.UselessOverridingMethod") - @Override - public int hashCode() { - return super.hashCode(); - } -} diff --git a/commons/src/test/java/io/aiven/kafka/connect/common/source/RingBufferTest.java b/commons/src/test/java/io/aiven/kafka/connect/common/source/RingBufferTest.java index 4630c66ca..f53d19a7e 100644 --- a/commons/src/test/java/io/aiven/kafka/connect/common/source/RingBufferTest.java +++ b/commons/src/test/java/io/aiven/kafka/connect/common/source/RingBufferTest.java @@ -18,6 +18,8 @@ import static org.assertj.core.api.Assertions.assertThat; +import io.aiven.commons.collections.RingBuffer; + import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; @@ -32,12 +34,12 @@ void testRingBufferReturnsOldestEntryAndRemovesOldestEntry(final int size) { final RingBuffer buffer = new RingBuffer<>(size); for (int i = 0; i < size; i++) { - buffer.enqueue(OBJECT_KEY + i); + buffer.add(OBJECT_KEY + i); } - assertThat(buffer.getOldest()).isEqualTo("S3ObjectKey" + 0); + assertThat(buffer.getNextEjected()).isEqualTo("S3ObjectKey" + 0); // Add one more unique ObjectKey - buffer.enqueue(OBJECT_KEY); - assertThat(buffer.getOldest()).isEqualTo("S3ObjectKey" + 1); + buffer.add(OBJECT_KEY); + assertThat(buffer.getNextEjected()).isEqualTo("S3ObjectKey" + 1); } @ParameterizedTest @@ -47,11 +49,11 @@ void testRingBufferOnlyAddsEachItemOnce(final int size) { final RingBuffer buffer = new RingBuffer<>(size); for (int i = 0; i < size; i++) { // add the same objectKey every time, it should onl have one entry. - buffer.enqueue(OBJECT_KEY); + buffer.add(OBJECT_KEY); } // Buffer not filled so should return null - assertThat(buffer.getOldest()).isEqualTo(null); - assertThat(buffer.peek()).isEqualTo(OBJECT_KEY); + assertThat(buffer.getNextEjected()).isEqualTo(null); + assertThat(buffer.head()).isEqualTo(OBJECT_KEY); assertThat(buffer.contains(OBJECT_KEY)).isTrue(); } @@ -59,9 +61,9 @@ void testRingBufferOnlyAddsEachItemOnce(final int size) { void testRingBufferOfSizeOneOnlyRetainsOneEntry() { final RingBuffer buffer = new RingBuffer<>(1); - buffer.enqueue(OBJECT_KEY + 0); - assertThat(buffer.getOldest()).isEqualTo(OBJECT_KEY + 0); - buffer.enqueue(OBJECT_KEY + 1); - assertThat(buffer.getOldest()).isEqualTo(OBJECT_KEY + 1); + buffer.add(OBJECT_KEY + 0); + assertThat(buffer.getNextEjected()).isEqualTo(OBJECT_KEY + 0); + buffer.add(OBJECT_KEY + 1); + assertThat(buffer.getNextEjected()).isEqualTo(OBJECT_KEY + 1); } }