diff --git a/src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java b/src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java index 481727b6..6521f980 100644 --- a/src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java +++ b/src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java @@ -1,21 +1,19 @@ package com.timgroup.statsd; import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; -import java.util.concurrent.atomic.AtomicInteger; public class StatsDNonBlockingProcessor extends StatsDProcessor { private final Queue messages; - private final AtomicInteger qsize; // qSize will not reflect actual size, but a close estimate. private class ProcessingTask extends StatsDProcessor.ProcessingTask { @Override protected Message getMessage() throws InterruptedException { final Message message = messages.poll(); if (message != null) { - qsize.decrementAndGet(); return message; } @@ -50,8 +48,11 @@ protected boolean haveMessages() { aggregatorFlushInterval, aggregatorShards, threadFactory); - this.qsize = new AtomicInteger(0); - this.messages = new ConcurrentLinkedQueue<>(); + if (qcapacity <= 8192) { + this.messages = new ArrayBlockingQueue<>(qcapacity); + } else { + this.messages = new LinkedBlockingQueue<>(qcapacity); + } } @Override @@ -62,11 +63,7 @@ protected ProcessingTask createProcessingTask() { @Override protected boolean send(final Message message) { if (!shutdown) { - if (qsize.get() < qcapacity) { - messages.offer(message); - qsize.incrementAndGet(); - return true; - } + return messages.offer(message); } return false;