当然应用层还有更多复杂的方式可以解决这个问题,这个就属于网络层的问题了,我们还是用java提供的方式来解决这个问题。Spring Boot 学习笔记分享给你,我们先看一个例子看看粘包是如何发生的。
- public class HelloWordServer {
- private int port;
- public HelloWordServer(int port) {
- this.port = port;
- }
- public void start(){
- EventLoopGroup bossGroup = new NioEventLoopGroup();
- EventLoopGroup workGroup = new NioEventLoopGroup();
- ServerBootstrap server = new ServerBootstrap().group(bossGroup,workGroup)
- .channel(NioServerSocketChannel.class)
- .childHandler(new ServerChannelInitializer());
- try {
- ChannelFuture future = server.bind(port).sync();
- future.channel().closeFuture().sync();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }finally {
- bossGroup.shutdownGracefully();
- workGroup.shutdownGracefully();
- }
- }
- public static void main(String[] args) {
- HelloWordServer server = new HelloWordServer(7788);
- server.start();
- }
- }
- public class ServerChannelInitializer extends ChannelInitializer
{ - @Override
- protected void initChannel(SocketChannel socketChannel) throws Exception {
- ChannelPipeline pipeline = socketChannel.pipeline();
- // 字符串解码 和 编码
- pipeline.addLast("decoder", new StringDecoder());
- pipeline.addLast("encoder", new StringEncoder());
- // 自己的逻辑Handler
- pipeline.addLast("handler", new HelloWordServerHandler());
- }
- }
- public class HelloWordServerHandler extends ChannelInboundHandlerAdapter {
- private int counter;
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- String body = (String)msg;
- System.out.println("server receive order : " + body + ";the counter is: " + ++counter);
- }
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- super.exceptionCaught(ctx, cause);
- }
- }
- public class HelloWorldClient {
- private int port;
- private String address;
- public HelloWorldClient(int port,String address) {
- this.port = port;
- this.address = address;
- }
- public void start(){
- EventLoopGroup group = new NioEventLoopGroup();
- Bootstrap bootstrap = new Bootstrap();
- bootstrap.group(group)
- .channel(NioSocketChannel.class)
- .handler(new ClientChannelInitializer());
- try {
- ChannelFuture future = bootstrap.connect(address,port).sync();
- future.channel().closeFuture().sync();
- } catch (Exception e) {
- e.printStackTrace();
- }finally {
- group.shutdownGracefully();
- }
- }
- public static void main(String[] args) {
- HelloWorldClient client = new HelloWorldClient(7788,"");
- client.start();
- }
- }
- public class ClientChannelInitializer extends ChannelInitializer
{ - protected void initChannel(SocketChannel socketChannel) throws Exception {
- ChannelPipeline pipeline = socketChannel.pipeline();
- pipeline.addLast("decoder", new StringDecoder());
- pipeline.addLast("encoder", new StringEncoder());
- // 客户端的逻辑
- pipeline.addLast("handler", new HelloWorldClientHandler());
- }
- }
- public class HelloWorldClientHandler extends ChannelInboundHandlerAdapter {
- private byte[] req;
- private int counter;
- public BaseClientHandler() {
- req = ("Unless required by applicable law or agreed to in writing, software\n" +
- " distributed under the License is distributed on an \"AS IS\" BASIS,\n" +
- " WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n" +
- " See the License for the specific language governing permissions and\n" +
- " limitations under the License.This connector uses the BIO implementation that requires the JSSE\n" +
- " style configuration. When using the APR/native implementation, the\n" +
- " penSSL style configuration is required as described in the APR/native\n" +
- " documentation.An Engine represents the entry point (within Catalina) that processes\n" +
- " every request. The Engine implementation for Tomcat stand alone\n" +
- " analyzes the HTTP headers included with the request, and passes them\n" +
- " on to the appropriate Host (virtual host)# Unless required by applicable law or agreed to in writing, software\n" +
- "# distributed under the License is distributed on an \"AS IS\" BASIS,\n" +
- "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n" +
- "# See the License for the specific language governing permissions and\n" +
- "# limitations under the License.# For example, set the org.apache.catalina.util.LifecycleBase logger to log\n" +
- "# each component that extends LifecycleBase changing state:\n" +
- "#org.apache.catalina.util.LifecycleBase.level = FINE"
- ).getBytes();
- }
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- ByteBuf message;
- //将上面的所有字符串作为一个消息体发送出去
- message = Unpooled.buffer(req.length);
- message.writeBytes(req);
- ctx.writeAndFlush(message);
- }
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- String buf = (String)msg;
- System.out.println("Now is : " + buf + " ; the counter is : "+ (++counter));
- }
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- ctx.close();
- }
- }
- message = Unpooled.buffer(req.length);
- message.writeBytes(req);
- ctx.writeAndFlush(message);
- for (int i = 0; i < 3; i++) {
- message = Unpooled.buffer(req.length);
- message.writeBytes(req);
- ctx.writeAndFlush(message);
- }
对于上面出现的粘包和拆包的问题,Netty已有考虑,并且有实施的方案:LineBasedFrameDecoder。另外,微信搜索Java技术栈,在后台回复:面试,可以获取我整理的 Java 系列面试题和答案。
- public class ServerChannelInitializer extends ChannelInitializer
{ - @Override
- protected void initChannel(SocketChannel socketChannel) throws Exception {
- ChannelPipeline pipeline = socketChannel.pipeline();
- pipeline.addLast(new LineBasedFrameDecoder(2048));
- // 字符串解码 和 编码
- pipeline.addLast("decoder", new StringDecoder());
- pipeline.addLast("encoder", new StringEncoder());
- // 自己的逻辑Handler
- pipeline.addLast("handler", new BaseServerHandler());
- }
- }
新增:pipeline.addLast(new LineBasedFrameDecoder(2048))。同时,我们还得对上面发送的消息进行改造BaseClientHandler:
- public class BaseClientHandler extends ChannelInboundHandlerAdapter {
- private byte[] req;
- private int counter;
- req = ("Unless required by applicable dfslaw or agreed to in writing, software" +
- " distributed under the License is distributed on an \"AS IS\" BASIS," +
- " WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied." +
- " See the License for the specific language governing permissions and" +
- " limitations under the License.This connector uses the BIO implementation that requires the JSSE" +
- " style configuration. When using the APR/native implementation, the" +
- " penSSL style configuration is required as described in the APR/native" +
- " documentation.An Engine represents the entry point (within Catalina) that processes" +
- " every request. The Engine implementation for Tomcat stand alone" +
- " analyzes the HTTP headers included with the request, and passes them" +
- " on to the appropriate Host (virtual host)# Unless required by applicable law or agreed to in writing, software" +
- "# distributed under the License is distributed on an \"AS IS\" BASIS," +
- "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied." +
- "# See the License for the specific language governing permissions and" +
- "# limitations under the License.# For example, set the org.apache.catalina.util.LifecycleBase logger to log" +
- "# each component that extends LifecycleBase changing state:" +
- "#org.apache.catalina.util.LifecycleBase.level = FINE\n"
- ).getBytes();
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- ByteBuf message;
- message = Unpooled.buffer(req.length);
- message.writeBytes(req);
- ctx.writeAndFlush(message);
- }
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- String buf = (String)msg;
- System.out.println("Now is : " + buf + " ; the counter is : "+ (++counter));
- }
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- ctx.close();
- }
- }
LineBasedFrameDecoder的工作原理是它依次遍历ByteBuf 中的可读字节,判断看是否有”\n” 或者” \r\n”,如果有,就以此位置为结束位置,从可读索引到结束位置区间的字节就组成了一行。它是以换行符为结束标志的解码器。支持携带结束符或者不携带结束符两种解码方式,同时支持配置单行的最大长度。如果连续读取到最大长度后仍然没有发现换行符,就会抛出异常,同时忽略掉之前读到的异常码流。这个对于我们确定消息最大长度的应用场景还是很有帮助。
对于上面的判断看是否有”\n” 或者” \r\n”以此作为结束的标志我们可能回想,要是没有”\n” 或者” \r\n”那还有什么别的方式可以判断消息是否结束呢。别担心,Netty对于此已经有考虑,还有别的解码器可以帮助我们解决问题,另外,关注公众号Java技术栈,在后台回复:面试,可以获取我整理的 Java 系列面试题和答案,非常齐全。
