diff --git a/src/main/java/com/corundumstudio/socketio/SocketIOServer.java b/src/main/java/com/corundumstudio/socketio/SocketIOServer.java index 5af4b8bd6..e683dcad7 100644 --- a/src/main/java/com/corundumstudio/socketio/SocketIOServer.java +++ b/src/main/java/com/corundumstudio/socketio/SocketIOServer.java @@ -143,39 +143,59 @@ public void start() { * @return void */ public Future startAsync() { - log.info("Session store / pubsub factory used: {}", configCopy.getStoreFactory()); + logServerInfo(); + initGroups(); + ServerBootstrap serverBootstrap = setupServerBootstrap(); - pipelineFactory.start(configCopy, namespacesHub); + InetSocketAddress addr = createInetSocketAddress(); + return bindServerBootstrap(serverBootstrap, addr); + } + + private void logServerInfo() { + log.info("Session store / pubsub factory used: {}", configCopy.getStoreFactory()); + } + + private ServerBootstrap setupServerBootstrap() { + ServerBootstrap b = new ServerBootstrap(); Class channelClass = NioServerSocketChannel.class; if (configCopy.isUseLinuxNativeEpoll()) { channelClass = EpollServerSocketChannel.class; } - - ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) - .channel(channelClass) - .childHandler(pipelineFactory); + .channel(channelClass) + .childHandler(pipelineFactory); applyConnectionOptions(b); + return b; + } + private InetSocketAddress createInetSocketAddress() { InetSocketAddress addr = new InetSocketAddress(configCopy.getPort()); if (configCopy.getHostname() != null) { addr = new InetSocketAddress(configCopy.getHostname(), configCopy.getPort()); } + return addr; + } - return b.bind(addr).addListener(new FutureListener() { + private Future bindServerBootstrap(ServerBootstrap serverBootstrap, InetSocketAddress addr) { + return serverBootstrap.bind(addr).addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { - if (future.isSuccess()) { - log.info("SocketIO server started at port: {}", configCopy.getPort()); - } else { - log.error("SocketIO server start failed at port: {}!", configCopy.getPort()); - } + handleBindResult(future); } }); } + private void handleBindResult(Future future) { + if (future.isSuccess()) { + log.info("SocketIO server started at port: {}", configCopy.getPort()); + } else { + log.error("SocketIO server start failed at port: {}!", configCopy.getPort()); + } + } + + protected void applyConnectionOptions(ServerBootstrap bootstrap) { SocketConfig config = configCopy.getSocketConfig(); bootstrap.childOption(ChannelOption.TCP_NODELAY, config.isTcpNoDelay()); diff --git a/src/main/java/com/corundumstudio/socketio/handler/EncoderHandler.java b/src/main/java/com/corundumstudio/socketio/handler/EncoderHandler.java index 950c7e7a8..2f950ca6c 100644 --- a/src/main/java/com/corundumstudio/socketio/handler/EncoderHandler.java +++ b/src/main/java/com/corundumstudio/socketio/handler/EncoderHandler.java @@ -229,45 +229,60 @@ private void handleWebsocket(final OutPacketMessage msg, ChannelHandlerContext c break; } - ByteBuf out = encoder.allocateBuffer(ctx.alloc()); - encoder.encodePacket(packet, out, ctx.alloc(), true); + handlePacket(msg, ctx, packet, writeFutureList); + handleAttachments(msg, ctx, packet, writeFutureList); + } + } - if (log.isTraceEnabled()) { - log.trace("Out message: {} sessionId: {}", out.toString(CharsetUtil.UTF_8), msg.getSessionId()); - } - if (out.isReadable() && out.readableBytes() > configuration.getMaxFramePayloadLength()) { - ByteBuf dstStart = out.readSlice(FRAME_BUFFER_SIZE); - dstStart.retain(); - WebSocketFrame start = new TextWebSocketFrame(false, 0, dstStart); - ctx.channel().write(start); - while (out.isReadable()) { - int re = Math.min(out.readableBytes(), FRAME_BUFFER_SIZE); - ByteBuf dst = out.readSlice(re); - dst.retain(); - WebSocketFrame res = new ContinuationWebSocketFrame(!out.isReadable(), 0, dst); - ctx.channel().write(res); - } - out.release(); - ctx.channel().flush(); - } else if (out.isReadable()){ - WebSocketFrame res = new TextWebSocketFrame(out); - ctx.channel().writeAndFlush(res); - } else { - out.release(); - } + private void handlePacket(final OutPacketMessage msg, ChannelHandlerContext ctx, Packet packet, ChannelFutureList writeFutureList) throws IOException { + ByteBuf out = encoder.allocateBuffer(ctx.alloc()); + encoder.encodePacket(packet, out, ctx.alloc(), true); - for (ByteBuf buf : packet.getAttachments()) { - ByteBuf outBuf = encoder.allocateBuffer(ctx.alloc()); - outBuf.writeByte(4); - outBuf.writeBytes(buf); - if (log.isTraceEnabled()) { - log.trace("Out attachment: {} sessionId: {}", ByteBufUtil.hexDump(outBuf), msg.getSessionId()); - } - writeFutureList.add(ctx.channel().writeAndFlush(new BinaryWebSocketFrame(outBuf))); + if (log.isTraceEnabled()) { + log.trace("Out message: {} sessionId: {}", out.toString(CharsetUtil.UTF_8), msg.getSessionId()); + } + + if (out.isReadable() && out.readableBytes() > configuration.getMaxFramePayloadLength()) { + handleLargePayload(ctx, out); + } else if (out.isReadable()) { + WebSocketFrame res = new TextWebSocketFrame(out); + ctx.channel().writeAndFlush(res); + } else { + out.release(); + } + } + + private void handleLargePayload(ChannelHandlerContext ctx, ByteBuf out) { + ByteBuf dstStart = out.readSlice(FRAME_BUFFER_SIZE); + dstStart.retain(); + WebSocketFrame start = new TextWebSocketFrame(false, 0, dstStart); + ctx.channel().write(start); + + while (out.isReadable()) { + int re = Math.min(out.readableBytes(), FRAME_BUFFER_SIZE); + ByteBuf dst = out.readSlice(re); + dst.retain(); + WebSocketFrame res = new ContinuationWebSocketFrame(!out.isReadable(), 0, dst); + ctx.channel().write(res); + } + + out.release(); + ctx.channel().flush(); + } + + private void handleAttachments(final OutPacketMessage msg, ChannelHandlerContext ctx, Packet packet, ChannelFutureList writeFutureList) { + for (ByteBuf buf : packet.getAttachments()) { + ByteBuf outBuf = encoder.allocateBuffer(ctx.alloc()); + outBuf.writeByte(4); + outBuf.writeBytes(buf); + if (log.isTraceEnabled()) { + log.trace("Out attachment: {} sessionId: {}", ByteBufUtil.hexDump(outBuf), msg.getSessionId()); } + writeFutureList.add(ctx.channel().writeAndFlush(new BinaryWebSocketFrame(outBuf))); } } + private void handleHTTP(OutPacketMessage msg, ChannelHandlerContext ctx, ChannelPromise promise) throws IOException { Channel channel = ctx.channel(); Attribute attr = channel.attr(WRITE_ONCE); diff --git a/src/main/java/com/corundumstudio/socketio/protocol/PacketEncoder.java b/src/main/java/com/corundumstudio/socketio/protocol/PacketEncoder.java index 075733d10..8076925f5 100644 --- a/src/main/java/com/corundumstudio/socketio/protocol/PacketEncoder.java +++ b/src/main/java/com/corundumstudio/socketio/protocol/PacketEncoder.java @@ -56,10 +56,25 @@ public ByteBuf allocateBuffer(ByteBufAllocator allocator) { return allocator.heapBuffer(); } - +/*alterdo */ public void encodeJsonP(Integer jsonpIndex, Queue packets, ByteBuf out, ByteBufAllocator allocator, int limit) throws IOException { boolean jsonpMode = jsonpIndex != null; + ByteBuf buf = buildPacketsBuffer(packets, allocator, limit); + + if (jsonpMode) { + writeJsonPHeader(out, jsonpIndex); + } + + processUtf8(buf, out, jsonpMode); + buf.release(); + + if (jsonpMode) { + writeJsonPEnd(out); + } + } + + private ByteBuf buildPacketsBuffer(Queue packets, ByteBufAllocator allocator, int limit) throws IOException { ByteBuf buf = allocateBuffer(allocator); int i = 0; @@ -69,41 +84,52 @@ public void encodeJsonP(Integer jsonpIndex, Queue packets, ByteBuf out, break; } - ByteBuf packetBuf = allocateBuffer(allocator); - encodePacket(packet, packetBuf, allocator, true); - - int packetSize = packetBuf.writerIndex(); - buf.writeBytes(toChars(packetSize)); - buf.writeBytes(B64_DELIMITER); - buf.writeBytes(packetBuf); - + ByteBuf packetBuf = buildPacketBuffer(packet, allocator); + appendPacketToBuffer(packetBuf, buf); packetBuf.release(); - i++; + appendAttachmentsToBuffer(packet, buf); - for (ByteBuf attachment : packet.getAttachments()) { - ByteBuf encodedBuf = Base64.encode(attachment, Base64Dialect.URL_SAFE); - buf.writeBytes(toChars(encodedBuf.readableBytes() + 2)); - buf.writeBytes(B64_DELIMITER); - buf.writeBytes(BINARY_HEADER); - buf.writeBytes(encodedBuf); - } + i++; } - if (jsonpMode) { - out.writeBytes(JSONP_HEAD); - out.writeBytes(toChars(jsonpIndex)); - out.writeBytes(JSONP_START); - } + return buf; + } - processUtf8(buf, out, jsonpMode); - buf.release(); + private ByteBuf buildPacketBuffer(Packet packet, ByteBufAllocator allocator) throws IOException { + ByteBuf packetBuf = allocateBuffer(allocator); + encodePacket(packet, packetBuf, allocator, true); + return packetBuf; + } - if (jsonpMode) { - out.writeBytes(JSONP_END); + private void appendPacketToBuffer(ByteBuf packetBuf, ByteBuf buf) { + int packetSize = packetBuf.writerIndex(); + buf.writeBytes(toChars(packetSize)); + buf.writeBytes(B64_DELIMITER); + buf.writeBytes(packetBuf); + } + + private void appendAttachmentsToBuffer(Packet packet, ByteBuf buf) throws IOException { + for (ByteBuf attachment : packet.getAttachments()) { + ByteBuf encodedBuf = Base64.encode(attachment, Base64Dialect.URL_SAFE); + buf.writeBytes(toChars(encodedBuf.readableBytes() + 2)); + buf.writeBytes(B64_DELIMITER); + buf.writeBytes(BINARY_HEADER); + buf.writeBytes(encodedBuf); } } + private void writeJsonPHeader(ByteBuf out, Integer jsonpIndex) { + out.writeBytes(JSONP_HEAD); + out.writeBytes(toChars(jsonpIndex)); + out.writeBytes(JSONP_START); + } + + private void writeJsonPEnd(ByteBuf out) { + out.writeBytes(JSONP_END); + } + + private void processUtf8(ByteBuf in, ByteBuf out, boolean jsonpMode) { while (in.isReadable()) { short value = (short) (in.readByte() & 0xFF); diff --git a/src/main/java/com/corundumstudio/socketio/transport/PollingTransport.java b/src/main/java/com/corundumstudio/socketio/transport/PollingTransport.java index 9b3b1f19f..22dbda147 100644 --- a/src/main/java/com/corundumstudio/socketio/transport/PollingTransport.java +++ b/src/main/java/com/corundumstudio/socketio/transport/PollingTransport.java @@ -66,53 +66,69 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception List transport = queryDecoder.parameters().get("transport"); if (transport != null && NAME.equals(transport.get(0))) { - List sid = queryDecoder.parameters().get("sid"); - List j = queryDecoder.parameters().get("j"); - List b64 = queryDecoder.parameters().get("b64"); - - String origin = req.headers().get(HttpHeaderNames.ORIGIN); - ctx.channel().attr(EncoderHandler.ORIGIN).set(origin); - - String userAgent = req.headers().get(HttpHeaderNames.USER_AGENT); - ctx.channel().attr(EncoderHandler.USER_AGENT).set(userAgent); - - if (j != null && j.get(0) != null) { - Integer index = Integer.valueOf(j.get(0)); - ctx.channel().attr(EncoderHandler.JSONP_INDEX).set(index); - } - if (b64 != null && b64.get(0) != null) { - String flag = b64.get(0); - if ("true".equals(flag)) { - flag = "1"; - } else if ("false".equals(flag)) { - flag = "0"; - } - Integer enable = Integer.valueOf(flag); - ctx.channel().attr(EncoderHandler.B64).set(enable == 1); - } - - try { - if (sid != null && sid.get(0) != null) { - final UUID sessionId = UUID.fromString(sid.get(0)); - handleMessage(req, sessionId, queryDecoder, ctx); - } else { - // first connection - ClientHead client = ctx.channel().attr(ClientHead.CLIENT).get(); - if (client != null) { - handleMessage(req, client.getSessionId(), queryDecoder, ctx); - } - } - } finally { - req.release(); - } + handleTransportParameters(ctx, req, queryDecoder); return; } } ctx.fireChannelRead(msg); } - private void handleMessage(FullHttpRequest req, UUID sessionId, QueryStringDecoder queryDecoder, ChannelHandlerContext ctx) - throws IOException { + private void handleTransportParameters(ChannelHandlerContext ctx, FullHttpRequest req, QueryStringDecoder queryDecoder) { + List sid = queryDecoder.parameters().get("sid"); + List j = queryDecoder.parameters().get("j"); + List b64 = queryDecoder.parameters().get("b64"); + + String origin = req.headers().get(HttpHeaderNames.ORIGIN); + ctx.channel().attr(EncoderHandler.ORIGIN).set(origin); + + String userAgent = req.headers().get(HttpHeaderNames.USER_AGENT); + ctx.channel().attr(EncoderHandler.USER_AGENT).set(userAgent); + + handleJParameter(ctx, j); + handleB64Parameter(ctx, b64); + + try { + handleSidParameter(ctx, req, sid, queryDecoder); + } finally { + req.release(); + } + } + + private void handleJParameter(ChannelHandlerContext ctx, List j) { + if (j != null && j.get(0) != null) { + Integer index = Integer.valueOf(j.get(0)); + ctx.channel().attr(EncoderHandler.JSONP_INDEX).set(index); + } + } + + private void handleB64Parameter(ChannelHandlerContext ctx, List b64) { + if (b64 != null && b64.get(0) != null) { + String flag = b64.get(0); + if ("true".equals(flag)) { + flag = "1"; + } else if ("false".equals(flag)) { + flag = "0"; + } + Integer enable = Integer.valueOf(flag); + ctx.channel().attr(EncoderHandler.B64).set(enable == 1); + } + } + + private void handleSidParameter(ChannelHandlerContext ctx, FullHttpRequest req, List sid, QueryStringDecoder queryDecoder) { + if (sid != null && sid.get(0) != null) { + final UUID sessionId = UUID.fromString(sid.get(0)); + handleMessage(req, sessionId, queryDecoder, ctx); + } else { + // first connection + ClientHead client = ctx.channel().attr(ClientHead.CLIENT).get(); + if (client != null) { + handleMessage(req, client.getSessionId(), queryDecoder, ctx); + } + } + } + + private void handleMessage(FullHttpRequest req, UUID sessionId, QueryStringDecoder queryDecoder, ChannelHandlerContext ctx) { + try { String origin = req.headers().get(HttpHeaderNames.ORIGIN); if (queryDecoder.parameters().containsKey("disconnect")) { ClientHead client = clientsBox.get(sessionId); @@ -128,6 +144,10 @@ private void handleMessage(FullHttpRequest req, UUID sessionId, QueryStringDecod log.error("Wrong {} method invocation for {}", req.method(), sessionId); sendError(ctx); } + } catch (IOException e) { + log.error("Exception handling message for session {}: {}", sessionId, e.getMessage(), e); + sendError(ctx); + } } private void onOptions(UUID sessionId, ChannelHandlerContext ctx, String origin) {