Skip to content

Commit 56a3b7e

Browse files
committed
[shards] shard locks+queues.
1 parent 28055c0 commit 56a3b7e

File tree

6 files changed

+155
-30
lines changed

6 files changed

+155
-30
lines changed

src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ String tag() {
8686
public static final int DEFAULT_PROCESSOR_WORKERS = 1;
8787
public static final int DEFAULT_SENDER_WORKERS = 1;
8888
public static final int DEFAULT_DOGSTATSD_PORT = 8125;
89+
public static final int DEFAULT_LOCK_SHARD_GRAIN = 4;
8990
public static final int SOCKET_TIMEOUT_MS = 100;
9091
public static final int SOCKET_BUFFER_BYTES = -1;
9192
public static final boolean DEFAULT_ENABLE_TELEMETRY = true;
@@ -202,6 +203,9 @@ private static String format(ThreadLocal<NumberFormat> formatter, double value)
202203
* The number of processor worker threads assembling buffers for submission.
203204
* @param senderWorkers
204205
* The number of sender worker threads submitting buffers to the socket.
206+
* @param lockShardGrain
207+
* The granularity for the lock sharding - sharding is based of thread id
208+
* so value should not be greater than the application thread count..
205209
* @param blocking
206210
* Blocking or non-blocking implementation for statsd message queue.
207211
* @param enableTelemetry
@@ -215,8 +219,8 @@ public NonBlockingStatsDClient(final String prefix, final int queueSize, String[
215219
final StatsDClientErrorHandler errorHandler, Callable<SocketAddress> addressLookup,
216220
final int timeout, final int bufferSize, final int maxPacketSizeBytes,
217221
String entityID, final int poolSize, final int processorWorkers,
218-
final int senderWorkers, boolean blocking, final boolean enableTelemetry,
219-
final int telemetryFlushInterval)
222+
final int senderWorkers, final int lockShardGrain, boolean blocking,
223+
final boolean enableTelemetry, final int telemetryFlushInterval)
220224
throws StatsDClientException {
221225
if ((prefix != null) && (!prefix.isEmpty())) {
222226
this.prefix = prefix + ".";
@@ -272,7 +276,8 @@ public NonBlockingStatsDClient(final String prefix, final int queueSize, String[
272276
transportType = "udp";
273277
}
274278

275-
statsDProcessor = createProcessor(queueSize, handler, maxPacketSizeBytes, poolSize, processorWorkers, blocking);
279+
statsDProcessor = createProcessor(queueSize, handler, maxPacketSizeBytes, poolSize,
280+
processorWorkers, lockShardGrain, blocking);
276281

277282
Properties properties = new Properties();
278283
properties.load(getClass().getClassLoader().getResourceAsStream("version.properties"));
@@ -299,11 +304,14 @@ public NonBlockingStatsDClient(final String prefix, final int queueSize, String[
299304
}
300305

301306
protected StatsDProcessor createProcessor(final int queueSize, final StatsDClientErrorHandler handler,
302-
final int maxPacketSizeBytes, final int bufferPoolSize, final int workers, boolean blocking) throws Exception {
307+
final int maxPacketSizeBytes, final int bufferPoolSize, final int workers, final int lockShardGrain,
308+
boolean blocking) throws Exception {
303309
if (blocking) {
304-
return new StatsDBlockingProcessor(queueSize, handler, maxPacketSizeBytes, bufferPoolSize, workers);
310+
return new StatsDBlockingProcessor(queueSize, handler, maxPacketSizeBytes,
311+
bufferPoolSize, workers, lockShardGrain);
305312
} else {
306-
return new StatsDNonBlockingProcessor(queueSize, handler, maxPacketSizeBytes, bufferPoolSize, workers);
313+
return new StatsDNonBlockingProcessor(queueSize, handler, maxPacketSizeBytes,
314+
bufferPoolSize, workers, lockShardGrain);
307315
}
308316
}
309317

src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ public class NonBlockingStatsDClientBuilder {
2424
public int maxPacketSizeBytes = NonBlockingStatsDClient.DEFAULT_MAX_PACKET_SIZE_BYTES;
2525
public int processorWorkers = NonBlockingStatsDClient.DEFAULT_PROCESSOR_WORKERS;
2626
public int senderWorkers = NonBlockingStatsDClient.DEFAULT_SENDER_WORKERS;
27+
public int lockShardGrain = NonBlockingStatsDClient.DEFAULT_LOCK_SHARD_GRAIN;
2728
public boolean enableTelemetry = NonBlockingStatsDClient.DEFAULT_ENABLE_TELEMETRY;
2829
public int telemetryFlushInterval = Telemetry.DEFAULT_FLUSH_INTERVAL;
2930
public boolean blocking;
@@ -79,6 +80,11 @@ public NonBlockingStatsDClientBuilder senderWorkers(int val) {
7980
return this;
8081
}
8182

83+
public NonBlockingStatsDClientBuilder lockShardGrain(int val) {
84+
lockShardGrain = val;
85+
return this;
86+
}
87+
8288
public NonBlockingStatsDClientBuilder blocking(boolean val) {
8389
blocking = val;
8490
return this;
@@ -132,8 +138,8 @@ public NonBlockingStatsDClient build() throws StatsDClientException {
132138
if (addressLookup != null) {
133139
return new NonBlockingStatsDClient(prefix, queueSize, constantTags, errorHandler,
134140
addressLookup, timeout, socketBufferSize, maxPacketSizeBytes, entityID,
135-
bufferPoolSize, processorWorkers, senderWorkers, blocking, enableTelemetry,
136-
telemetryFlushInterval);
141+
bufferPoolSize, processorWorkers, senderWorkers, lockShardGrain, blocking,
142+
enableTelemetry, telemetryFlushInterval);
137143
} else {
138144
return new NonBlockingStatsDClient(prefix, queueSize, constantTags, errorHandler,
139145
staticStatsDAddressResolution(hostname, port), timeout, socketBufferSize, maxPacketSizeBytes,

src/main/java/com/timgroup/statsd/StatsDBlockingProcessor.java

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@
1111

1212
public class StatsDBlockingProcessor extends StatsDProcessor {
1313

14-
private final BlockingQueue<Message> messages;
14+
private final BlockingQueue<Message>[] messages;
15+
private final BlockingQueue<Integer>[] processorWorkQueue;
1516

1617
private class ProcessingTask extends StatsDProcessor.ProcessingTask {
1718

@@ -27,11 +28,12 @@ public void run() {
2728
return;
2829
}
2930

30-
while (!(shutdown && messages.isEmpty())) {
31+
while (!(shutdown && processorWorkQueue[this.processorQueueId].isEmpty())) {
3132

3233
try {
3334

34-
final Message message = messages.poll(WAIT_SLEEP_MS, TimeUnit.MILLISECONDS);
35+
final int messageQueueIdx = processorWorkQueue[this.processorQueueId].poll();
36+
final Message message = messages[messageQueueIdx].poll(WAIT_SLEEP_MS, TimeUnit.MILLISECONDS);
3537
if (message != null) {
3638

3739
builder.setLength(0);
@@ -84,11 +86,22 @@ public void run() {
8486
}
8587

8688
StatsDBlockingProcessor(final int queueSize, final StatsDClientErrorHandler handler,
87-
final int maxPacketSizeBytes, final int poolSize, final int workers)
88-
throws Exception {
89+
final int maxPacketSizeBytes, final int poolSize, final int workers,
90+
final int lockShardGrain) throws Exception {
91+
92+
super(queueSize, handler, maxPacketSizeBytes, poolSize, workers, lockShardGrain);
93+
94+
this.messages = new ArrayBlockingQueue[lockShardGrain];
95+
for (int i = 0 ; i < lockShardGrain ; i++) {
96+
this.messages[i] = new ArrayBlockingQueue<Message>(queueSize);
97+
}
98+
99+
this.processorWorkQueue = new ArrayBlockingQueue[workers];
100+
for (int i = 0 ; i < workers ; i++) {
101+
this.processorWorkQueue[i] = new ArrayBlockingQueue<Integer>(queueSize);
102+
}
89103

90104
super(queueSize, handler, maxPacketSizeBytes, poolSize, workers);
91-
this.messages = new ArrayBlockingQueue<>(queueSize);
92105
}
93106

94107
@Override
@@ -99,8 +112,15 @@ protected ProcessingTask createProcessingTask() {
99112
@Override
100113
protected boolean send(final Message message) {
101114
try {
115+
long threadId = Thread.currentThread().getId();
116+
// modulo reduction alternative to: long shard = threadID % this.lockShardGrain;
117+
// ref: https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/
118+
int shard = (int)((threadId * (long)this.lockShardGrain) >> 32);
119+
int processQueue = (int)((threadId * (long)this.workers) >> 32);
120+
102121
if (!shutdown) {
103-
messages.put(message);
122+
messages[shard].put(message);
123+
processorWorkQueue[processQueue].put(shard);
104124
return true;
105125
}
106126
} catch (InterruptedException e) {

src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java

Lines changed: 93 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,75 @@
1111

1212
public class StatsDNonBlockingProcessor extends StatsDProcessor {
1313

14-
private final Queue<Message> messages;
14+
private final Queue<Message>[] messages;
15+
private final Queue<Integer>[] processorWorkQueue;
16+
1517
private final int qcapacity;
16-
private final AtomicInteger qsize; // qSize will not reflect actual size, but a close estimate.
18+
private final AtomicInteger[] qsize; // qSize will not reflect actual size, but a close estimate.
19+
20+
private class ProcessingTask implements Runnable {
21+
private final int processorQueueId;
22+
23+
public ProcessingTask(int id) {
24+
this.processorQueueId = id;
25+
}
26+
27+
public void run() {
28+
boolean empty;
29+
ByteBuffer sendBuffer;
30+
31+
try {
32+
sendBuffer = bufferPool.borrow();
33+
} catch (final InterruptedException e) {
34+
handler.handle(e);
35+
return;
36+
}
37+
38+
while (!((empty = processorWorkQueue[this.processorQueueId].isEmpty()) && shutdown)) {
39+
40+
try {
41+
if (empty) {
42+
Thread.sleep(WAIT_SLEEP_MS);
43+
continue;
44+
}
45+
46+
if (Thread.interrupted()) {
47+
return;
48+
}
49+
50+
final int messageQueueIdx = processorWorkQueue[this.processorQueueId].poll();
51+
final String message = messages[messageQueueIdx].poll();
52+
if (message != null) {
53+
qsize[messageQueueIdx].decrementAndGet();
54+
final byte[] data = message.getBytes(MESSAGE_CHARSET);
55+
if (sendBuffer.capacity() < data.length) {
56+
throw new InvalidMessageException(MESSAGE_TOO_LONG, message);
57+
}
58+
if (sendBuffer.remaining() < (data.length + 1)) {
59+
outboundQueue.put(sendBuffer);
60+
sendBuffer = bufferPool.borrow();
61+
}
62+
if (sendBuffer.position() > 0) {
63+
sendBuffer.put((byte) '\n');
64+
}
65+
sendBuffer.put(data);
66+
if (null == processorWorkQueue[this.processorQueueId].peek()) {
67+
outboundQueue.put(sendBuffer);
68+
sendBuffer = bufferPool.borrow();
69+
}
70+
}
71+
} catch (final InterruptedException e) {
72+
if (shutdown) {
73+
endSignal.countDown();
74+
return;
75+
}
76+
} catch (final Exception e) {
77+
handler.handle(e);
78+
}
79+
}
80+
endSignal.countDown();
81+
}
82+
}
1783

1884
private class ProcessingTask extends StatsDProcessor.ProcessingTask {
1985

@@ -93,13 +159,23 @@ public void run() {
93159
}
94160

95161
StatsDNonBlockingProcessor(final int queueSize, final StatsDClientErrorHandler handler,
96-
final int maxPacketSizeBytes, final int poolSize, final int workers)
97-
throws Exception {
162+
final int maxPacketSizeBytes, final int poolSize, final int workers,
163+
final int lockShardGrain) throws Exception {
98164

99-
super(queueSize, handler, maxPacketSizeBytes, poolSize, workers);
100-
this.qsize = new AtomicInteger(0);
165+
super(queueSize, handler, maxPacketSizeBytes, poolSize, workers, lockShardGrain);
166+
this.qsize = new AtomicInteger[lockShardGrain];
101167
this.qcapacity = queueSize;
102-
this.messages = new ConcurrentLinkedQueue<>();
168+
this.messages = new ConcurrentLinkedQueue[lockShardGrain];
169+
for (int i = 0 ; i < lockShardGrain ; i++) {
170+
this.qsize[i] = new AtomicInteger();
171+
this.messages[i] = new ConcurrentLinkedQueue<Message>();
172+
this.qsize[i].set(0);
173+
}
174+
175+
this.processorWorkQueue = new ConcurrentLinkedQueue[workers];
176+
for (int i = 0 ; i < workers ; i++) {
177+
this.processorWorkQueue[i] = new ConcurrentLinkedQueue<Integer>();
178+
}
103179
}
104180

105181
@Override
@@ -110,9 +186,16 @@ protected ProcessingTask createProcessingTask() {
110186
@Override
111187
protected boolean send(final Message message) {
112188
if (!shutdown) {
113-
if (qsize.get() < qcapacity) {
114-
messages.offer(message);
115-
qsize.incrementAndGet();
189+
long threadId = Thread.currentThread().getId();
190+
// modulo reduction alternative to: long shard = threadID % [shard]this.lockShardGrain;
191+
// ref: https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/
192+
int shard = (int)((threadId * (long)this.lockShardGrain) >> 32);
193+
int processQueue = (int)((threadId * (long)this.workers) >> 32);
194+
195+
if (qsize[shard].get() < qcapacity) {
196+
messages[shard].offer(message);
197+
qsize[shard].incrementAndGet();
198+
processorWorkQueue[processQueue].offer(shard);
116199
return true;
117200
}
118201
}

src/main/java/com/timgroup/statsd/StatsDProcessor.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ public abstract class StatsDProcessor implements Runnable {
3535
protected final CountDownLatch endSignal;
3636

3737
protected final int workers;
38+
protected final int lockShardGrain;
3839

3940
protected volatile boolean shutdown;
4041

@@ -44,6 +45,11 @@ protected abstract class ProcessingTask implements Runnable {
4445
protected final CharsetEncoder utf8Encoder = MESSAGE_CHARSET.newEncoder()
4546
.onMalformedInput(CodingErrorAction.REPLACE)
4647
.onUnmappableCharacter(CodingErrorAction.REPLACE);
48+
protected final int processorQueueId;
49+
50+
public ProcessingTask(int id) {
51+
this.processorQueueId = id;
52+
}
4753

4854
public abstract void run();
4955

@@ -65,11 +71,13 @@ protected void writeBuilderToSendBuffer(ByteBuffer sendBuffer) {
6571

6672

6773
StatsDProcessor(final int queueSize, final StatsDClientErrorHandler handler,
68-
final int maxPacketSizeBytes, final int poolSize, final int workers)
74+
final int maxPacketSizeBytes, final int poolSize, final int workers,
75+
final int lockShardGrain)
6976
throws Exception {
7077

7178
this.handler = handler;
7279
this.workers = workers;
80+
this.lockShardGrain = lockShardGrain;
7381

7482
this.executor = Executors.newFixedThreadPool(workers);
7583
this.bufferPool = new BufferPool(poolSize, maxPacketSizeBytes, true);

src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -812,10 +812,10 @@ private static class SlowStatsDNonBlockingStatsDClient extends NonBlockingStatsD
812812
SlowStatsDNonBlockingStatsDClient(final String prefix, final int queueSize, String[] constantTags, final StatsDClientErrorHandler errorHandler,
813813
Callable<SocketAddress> addressLookup, final int timeout, final int bufferSize, final int maxPacketSizeBytes,
814814
String entityID, final int poolSize, final int processorWorkers, final int senderWorkers,
815-
boolean blocking)
815+
final int lockShardGrain, boolean blocking)
816816
throws StatsDClientException {
817817
super(prefix, queueSize, constantTags, errorHandler, addressLookup, timeout, bufferSize, maxPacketSizeBytes,
818-
entityID, poolSize, processorWorkers, senderWorkers, blocking, false, 0);
818+
entityID, poolSize, processorWorkers, senderWorkers, lockShardGrain, blocking, false, 0);
819819
lock = new CountDownLatch(1);
820820
}
821821

@@ -838,11 +838,11 @@ public SlowStatsDNonBlockingStatsDClient build() throws StatsDClientException {
838838
if (addressLookup != null) {
839839
return new SlowStatsDNonBlockingStatsDClient(prefix, queueSize, constantTags, errorHandler,
840840
addressLookup, timeout, socketBufferSize, maxPacketSizeBytes, entityID, bufferPoolSize,
841-
processorWorkers, senderWorkers, blocking);
841+
processorWorkers, senderWorkers, lockShardGrain, blocking);
842842
} else {
843843
return new SlowStatsDNonBlockingStatsDClient(prefix, queueSize, constantTags, errorHandler,
844844
staticStatsDAddressResolution(hostname, port), timeout, socketBufferSize, maxPacketSizeBytes,
845-
entityID, bufferPoolSize, processorWorkers, senderWorkers, blocking);
845+
entityID, bufferPoolSize, processorWorkers, senderWorkers, lockShardGrain, blocking);
846846
}
847847
}
848848
}

0 commit comments

Comments
 (0)