Skip to content

Commit 6789947

Browse files
committed
[shards] shard locks+queues.
1 parent ad47973 commit 6789947

File tree

6 files changed

+210
-134
lines changed

6 files changed

+210
-134
lines changed

src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ public class NonBlockingStatsDClient implements StatsDClient {
6262
public static final int DEFAULT_PROCESSOR_WORKERS = 1;
6363
public static final int DEFAULT_SENDER_WORKERS = 1;
6464
public static final int DEFAULT_DOGSTATSD_PORT = 8125;
65+
public static final int DEFAULT_LOCK_SHARD_GRAIN = 4;
6566
public static final int SOCKET_TIMEOUT_MS = 100;
6667
public static final int SOCKET_BUFFER_BYTES = -1;
6768

@@ -171,6 +172,9 @@ protected NumberFormat initialValue() {
171172
* The number of processor worker threads assembling buffers for submission.
172173
* @param senderWorkers
173174
* The number of sender worker threads submitting buffers to the socket.
175+
* @param lockShardGrain
176+
* The granularity for the lock sharding - sharding is based of thread id
177+
* so value should not be greater than the application thread count..
174178
* @param blocking
175179
* Blocking or non-blocking implementation for statsd message queue.
176180
* @throws StatsDClientException
@@ -180,7 +184,7 @@ public NonBlockingStatsDClient(final String prefix, final int queueSize, String[
180184
final StatsDClientErrorHandler errorHandler, Callable<SocketAddress> addressLookup,
181185
final int timeout, final int bufferSize, final int maxPacketSizeBytes,
182186
String entityID, final int poolSize, final int processorWorkers,
183-
final int senderWorkers, boolean blocking)
187+
final int senderWorkers, final int lockShardGrain, boolean blocking)
184188
throws StatsDClientException {
185189
if ((prefix != null) && (!prefix.isEmpty())) {
186190
this.prefix = new StringBuilder(prefix).append(".").toString();
@@ -222,7 +226,8 @@ public NonBlockingStatsDClient(final String prefix, final int queueSize, String[
222226
clientChannel = DatagramChannel.open();
223227
}
224228

225-
statsDProcessor = createProcessor(queueSize, handler, maxPacketSizeBytes, poolSize, processorWorkers, blocking);
229+
statsDProcessor = createProcessor(queueSize, handler, maxPacketSizeBytes, poolSize,
230+
processorWorkers, lockShardGrain, blocking);
226231
statsDSender = createSender(addressLookup, handler, clientChannel,
227232
statsDProcessor.getBufferPool(), statsDProcessor.getOutboundQueue(), senderWorkers);
228233

@@ -235,11 +240,14 @@ public NonBlockingStatsDClient(final String prefix, final int queueSize, String[
235240
}
236241

237242
protected StatsDProcessor createProcessor(final int queueSize, final StatsDClientErrorHandler handler,
238-
final int maxPacketSizeBytes, final int bufferPoolSize, final int workers, boolean blocking) throws Exception {
243+
final int maxPacketSizeBytes, final int bufferPoolSize, final int workers, final int lockShardGrain,
244+
boolean blocking) throws Exception {
239245
if (blocking) {
240-
return new StatsDBlockingProcessor(queueSize, handler, maxPacketSizeBytes, bufferPoolSize, workers);
246+
return new StatsDBlockingProcessor(queueSize, handler, maxPacketSizeBytes,
247+
bufferPoolSize, workers, lockShardGrain);
241248
} else {
242-
return new StatsDNonBlockingProcessor(queueSize, handler, maxPacketSizeBytes, bufferPoolSize, workers);
249+
return new StatsDNonBlockingProcessor(queueSize, handler, maxPacketSizeBytes,
250+
bufferPoolSize, workers, lockShardGrain);
243251
}
244252
}
245253

src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ public class NonBlockingStatsDClientBuilder {
2424
public int maxPacketSizeBytes = NonBlockingStatsDClient.DEFAULT_MAX_PACKET_SIZE_BYTES;
2525
public int processorWorkers = NonBlockingStatsDClient.DEFAULT_PROCESSOR_WORKERS;
2626
public int senderWorkers = NonBlockingStatsDClient.DEFAULT_SENDER_WORKERS;
27+
public int lockShardGrain = NonBlockingStatsDClient.DEFAULT_LOCK_SHARD_GRAIN;
2728
public boolean blocking;
2829

2930
public Callable<SocketAddress> addressLookup;
@@ -77,6 +78,11 @@ public NonBlockingStatsDClientBuilder senderWorkers(int val) {
7778
return this;
7879
}
7980

81+
public NonBlockingStatsDClientBuilder lockShardGrain(int val) {
82+
lockShardGrain = val;
83+
return this;
84+
}
85+
8086
public NonBlockingStatsDClientBuilder blocking(boolean val) {
8187
blocking = val;
8288
return this;
@@ -120,11 +126,12 @@ public NonBlockingStatsDClient build() throws StatsDClientException {
120126
if (addressLookup != null) {
121127
return new NonBlockingStatsDClient(prefix, queueSize, constantTags, errorHandler,
122128
addressLookup, timeout, socketBufferSize, maxPacketSizeBytes, entityID,
123-
bufferPoolSize, processorWorkers, senderWorkers, blocking);
129+
bufferPoolSize, processorWorkers, senderWorkers, lockShardGrain, blocking);
124130
} else {
125131
return new NonBlockingStatsDClient(prefix, queueSize, constantTags, errorHandler,
126132
staticStatsDAddressResolution(hostname, port), timeout, socketBufferSize, maxPacketSizeBytes,
127-
entityID, bufferPoolSize, processorWorkers, senderWorkers, blocking);
133+
entityID, bufferPoolSize, processorWorkers, senderWorkers, lockShardGrain,
134+
blocking);
128135
}
129136
}
130137

src/main/java/com/timgroup/statsd/StatsDBlockingProcessor.java

Lines changed: 85 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package com.timgroup.statsd;
22

3+
import java.lang.Thread;
4+
35
import java.nio.ByteBuffer;
46

57
import java.util.concurrent.ArrayBlockingQueue;
@@ -8,21 +10,97 @@
810

911
public class StatsDBlockingProcessor extends StatsDProcessor {
1012

11-
private final BlockingQueue<String> messages;
13+
private final BlockingQueue<String>[] messages;
14+
private final BlockingQueue<Integer>[] processorWorkQueue;
15+
16+
private class ProcessingTask implements Runnable {
17+
private final int processorQueueId;
18+
19+
public ProcessingTask(int id) {
20+
this.processorQueueId = id;
21+
}
22+
23+
public void run() {
24+
boolean empty;
25+
ByteBuffer sendBuffer;
26+
27+
try {
28+
sendBuffer = bufferPool.borrow();
29+
} catch (final InterruptedException e) {
30+
handler.handle(e);
31+
return;
32+
}
33+
34+
while (!(processorWorkQueue[this.processorQueueId].isEmpty() && shutdown)) {
35+
36+
try {
37+
38+
if (Thread.interrupted()) {
39+
return;
40+
}
41+
42+
final int messageQueueIdx = processorWorkQueue[this.processorQueueId].poll();
43+
final String message = messages[messageQueueIdx].poll(WAIT_SLEEP_MS, TimeUnit.MILLISECONDS);
44+
if (message != null) {
45+
final byte[] data = message.getBytes(MESSAGE_CHARSET);
46+
if (sendBuffer.capacity() < data.length) {
47+
throw new InvalidMessageException(MESSAGE_TOO_LONG, message);
48+
}
49+
if (sendBuffer.remaining() < (data.length + 1)) {
50+
outboundQueue.put(sendBuffer);
51+
sendBuffer = bufferPool.borrow();
52+
}
53+
if (sendBuffer.position() > 0) {
54+
sendBuffer.put((byte) '\n');
55+
}
56+
sendBuffer.put(data);
57+
if (null == processorWorkQueue[this.processorQueueId].peek()) {
58+
outboundQueue.put(sendBuffer);
59+
sendBuffer = bufferPool.borrow();
60+
}
61+
}
62+
} catch (final InterruptedException e) {
63+
if (shutdown) {
64+
endSignal.countDown();
65+
return;
66+
}
67+
} catch (final Exception e) {
68+
handler.handle(e);
69+
}
70+
}
71+
endSignal.countDown();
72+
}
73+
}
1274

1375
StatsDBlockingProcessor(final int queueSize, final StatsDClientErrorHandler handler,
14-
final int maxPacketSizeBytes, final int poolSize, final int workers)
15-
throws Exception {
76+
final int maxPacketSizeBytes, final int poolSize, final int workers,
77+
final int lockShardGrain) throws Exception {
78+
79+
super(queueSize, handler, maxPacketSizeBytes, poolSize, workers, lockShardGrain);
80+
81+
this.messages = new ArrayBlockingQueue[lockShardGrain];
82+
for (int i = 0 ; i < lockShardGrain ; i++) {
83+
this.messages[i] = new ArrayBlockingQueue<String>(queueSize);
84+
}
1685

17-
super(queueSize, handler, maxPacketSizeBytes, poolSize, workers);
18-
this.messages = new ArrayBlockingQueue<String>(queueSize);
86+
this.processorWorkQueue = new ArrayBlockingQueue[workers];
87+
for (int i = 0 ; i < workers ; i++) {
88+
this.processorWorkQueue[i] = new ArrayBlockingQueue<Integer>(queueSize);
89+
}
1990
}
2091

2192
@Override
2293
boolean send(final String message) {
2394
try {
95+
long threadId = Thread.currentThread().getId();
96+
// modulo reduction alternative to: long shard = threadID % this.lockShardGrain;
97+
// ref: https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/
98+
int shard = (int)((threadId * (long)this.lockShardGrain) >> 32);
99+
int processQueue = (int)((threadId * (long)this.workers) >> 32);
100+
24101
if (!shutdown) {
25-
messages.put(message);
102+
messages[shard].put(message);
103+
processorWorkQueue[processQueue].put(shard);
26104
return true;
27105
}
28106
} catch (InterruptedException e) {
@@ -36,57 +114,7 @@ boolean send(final String message) {
36114
public void run() {
37115

38116
for (int i = 0 ; i < workers ; i++) {
39-
executor.submit(new Runnable() {
40-
public void run() {
41-
boolean empty;
42-
ByteBuffer sendBuffer;
43-
44-
try {
45-
sendBuffer = bufferPool.borrow();
46-
} catch (final InterruptedException e) {
47-
handler.handle(e);
48-
return;
49-
}
50-
51-
while (!(messages.isEmpty() && shutdown)) {
52-
53-
try {
54-
55-
if (Thread.interrupted()) {
56-
return;
57-
}
58-
59-
final String message = messages.poll(WAIT_SLEEP_MS, TimeUnit.MILLISECONDS);
60-
if (message != null) {
61-
final byte[] data = message.getBytes(MESSAGE_CHARSET);
62-
if (sendBuffer.capacity() < data.length) {
63-
throw new InvalidMessageException(MESSAGE_TOO_LONG, message);
64-
}
65-
if (sendBuffer.remaining() < (data.length + 1)) {
66-
outboundQueue.put(sendBuffer);
67-
sendBuffer = bufferPool.borrow();
68-
}
69-
if (sendBuffer.position() > 0) {
70-
sendBuffer.put((byte) '\n');
71-
}
72-
sendBuffer.put(data);
73-
if (null == messages.peek()) {
74-
outboundQueue.put(sendBuffer);
75-
sendBuffer = bufferPool.borrow();
76-
}
77-
}
78-
} catch (final InterruptedException e) {
79-
if (shutdown) {
80-
endSignal.countDown();
81-
return;
82-
}
83-
} catch (final Exception e) {
84-
handler.handle(e);
85-
}
86-
}
87-
endSignal.countDown();
88-
}
89-
});
117+
executor.submit(new ProcessingTask(i));
90118
}
91119

92120
boolean done = false;

0 commit comments

Comments
 (0)