33
44import io .netty .buffer .ByteBuf ;
55import io .netty .buffer .ByteBufOutputStream ;
6+ import io .netty .buffer .PooledByteBufAllocator ;
67import io .netty .channel .ChannelHandlerContext ;
78import io .netty .handler .codec .ByteToMessageDecoder ;
9+ import io .netty .handler .codec .DecoderException ;
810import org .apache .logging .log4j .LogManager ;
911import org .apache .logging .log4j .Logger ;
1012
1416import java .util .HashMap ;
1517import java .util .List ;
1618import java .util .Map ;
19+ import java .util .concurrent .TimeUnit ;
1720import java .util .zip .Inflater ;
1821import java .util .zip .InflaterOutputStream ;
1922
2023
2124public class BeatsParser extends ByteToMessageDecoder {
2225 private final static Logger logger = LogManager .getLogger (BeatsParser .class );
26+ private final static long maxDirectMemory = io .netty .util .internal .PlatformDependent .maxDirectMemory ();
2327
2428 private Batch batch ;
2529
@@ -45,15 +49,18 @@ private enum States {
4549 private int requiredBytes = 0 ;
4650 private int sequence = 0 ;
4751 private boolean decodingCompressedBuffer = false ;
52+ private long usedDirectMemory ;
53+ private boolean closeCalled = false ;
4854
4955 @ Override
5056 protected void decode (ChannelHandlerContext ctx , ByteBuf in , List <Object > out ) throws InvalidFrameProtocolException , IOException {
51- if (!hasEnoughBytes (in )) {
52- if (decodingCompressedBuffer ){
57+ if (!hasEnoughBytes (in )) {
58+ if (decodingCompressedBuffer ) {
5359 throw new InvalidFrameProtocolException ("Insufficient bytes in compressed content to decode: " + currentState );
5460 }
5561 return ;
5662 }
63+ usedDirectMemory = ((PooledByteBufAllocator ) ctx .alloc ()).metric ().usedDirectMemory ();
5764
5865 switch (currentState ) {
5966 case READ_HEADER : {
@@ -178,6 +185,13 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t
178185
179186 case READ_COMPRESSED_FRAME : {
180187 logger .trace ("Running: READ_COMPRESSED_FRAME" );
188+
189+ if (usedDirectMemory + requiredBytes > maxDirectMemory * 0.90 ) {
190+ ctx .channel ().config ().setAutoRead (false );
191+ ctx .close ();
192+ closeCalled = true ;
193+ throw new IOException ("not enough memory to decompress this from " + ctx .channel ().id ());
194+ }
181195 inflateCompressedFrame (ctx , in , (buffer ) -> {
182196 transition (States .READ_HEADER );
183197
@@ -188,16 +202,20 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t
188202 }
189203 } finally {
190204 decodingCompressedBuffer = false ;
205+ ctx .channel ().config ().setAutoRead (false );
206+ ctx .channel ().eventLoop ().schedule (() -> {
207+ ctx .channel ().config ().setAutoRead (true );
208+ }, 5 , TimeUnit .MILLISECONDS );
191209 transition (States .READ_HEADER );
192210 }
193211 });
194212 break ;
195213 }
196214 case READ_JSON : {
197215 logger .trace ("Running: READ_JSON" );
198- ((V2Batch )batch ).addMessage (sequence , in , requiredBytes );
199- if (batch .isComplete ()) {
200- if (logger .isTraceEnabled ()) {
216+ ((V2Batch ) batch ).addMessage (sequence , in , requiredBytes );
217+ if (batch .isComplete ()) {
218+ if (logger .isTraceEnabled ()) {
201219 logger .trace ("Sending batch size: " + this .batch .size () + ", windowSize: " + batch .getBatchSize () + " , seq: " + sequence );
202220 }
203221 out .add (batch );
@@ -256,6 +274,62 @@ private void batchComplete() {
256274 batch = null ;
257275 }
258276
277+ @ Override
278+ public void channelRead (ChannelHandlerContext ctx , Object msg ) throws Exception {
279+ //System.out.println("channelRead(" + ctx.channel().isActive() + ": " + ctx.channel().id() + ":" + currentState + ":" + decodingCompressedBuffer);
280+ if (closeCalled ) {
281+ ((ByteBuf ) msg ).release ();
282+ //if(batch != null) batch.release();
283+ return ;
284+ }
285+ usedDirectMemory = ((PooledByteBufAllocator ) ctx .alloc ()).metric ().usedDirectMemory ();
286+
287+ // If we're just beginning a new frame on this channel,
288+ // don't accumulate more data for 25 ms if usage of direct memory is above 20%
289+ //
290+ // The goal here is to avoid thundering herd: many beats connecting and sending data
291+ // at the same time. As some channels progress to other states they'll use more memory
292+ // but also give it back once a full batch is read.
293+ if ((!decodingCompressedBuffer ) && (this .currentState != States .READ_COMPRESSED_FRAME )) {
294+ if (usedDirectMemory > (maxDirectMemory * 0.40 )) {
295+ ctx .channel ().config ().setAutoRead (false );
296+ //System.out.println("pausing reads on " + ctx.channel().id());
297+ ctx .channel ().eventLoop ().schedule (() -> {
298+ //System.out.println("resuming reads on " + ctx.channel().id());
299+ ctx .channel ().config ().setAutoRead (true );
300+ }, 200 , TimeUnit .MILLISECONDS );
301+ } else {
302+ //System.out.println("no need to pause reads on " + ctx.channel().id());
303+ }
304+ } else if (usedDirectMemory > maxDirectMemory * 0.90 ) {
305+ ctx .channel ().config ().setAutoRead (false );
306+ ctx .close ();
307+ closeCalled = true ;
308+ ((ByteBuf ) msg ).release ();
309+ if (batch != null ) batch .release ();
310+ throw new IOException ("about to explode, cut them all down " + ctx .channel ().id ());
311+ }
312+ super .channelRead (ctx , msg );
313+ }
314+
315+ @ Override
316+ public void exceptionCaught (ChannelHandlerContext ctx , Throwable cause ) throws Exception {
317+ System .out .println (cause .getClass ().toString () + ":" + ctx .channel ().id ().toString () + ":" + this .currentState + "|" + cause .getMessage ());
318+ if (cause instanceof DecoderException ) {
319+ ctx .channel ().config ().setAutoRead (false );
320+ if (!closeCalled ) ctx .close ();
321+ } else if (cause instanceof OutOfMemoryError ) {
322+ cause .printStackTrace ();
323+ ctx .channel ().config ().setAutoRead (false );
324+ if (!closeCalled ) ctx .close ();
325+ } else if (cause instanceof IOException ) {
326+ ctx .channel ().config ().setAutoRead (false );
327+ if (!closeCalled ) ctx .close ();
328+ } else {
329+ super .exceptionCaught (ctx , cause );
330+ }
331+ }
332+
259333 @ FunctionalInterface
260334 private interface CheckedConsumer <T > {
261335 void accept (T t ) throws IOException ;
0 commit comments