Java,Netty,官方案例,弃包、Echo和时间服务通信代码案例分享

[复制链接]
查看5262 | 回复0 | 2022-3-14 04:08:06 | 显示全部楼层 |阅读模式
介绍
; p; @. \, g0 Q* z9 j; }$ t% l0 {( l2 E
前端时间用了不少Netty的内容,但总也是理解的不够深入,今天打开官网,从官网最简单的案例开始,这里并进行一个记录,进行一个分享。5 d3 [7 V7 [- T  B: {: Y
之前的参考:0 m5 y( f. P& K* L5 `
Netty,事件驱动,阻塞和非阻塞通信=>https://www.toutiao.com/a6946449742759019015/) Q. j, l, V4 \
NIO的ByteBuffer,Netty缓冲区ByteBuf及内部结构设计=>https://www.toutiao.com/a6950090347397726727/1 a5 g+ ^  s/ E% J$ t
Netty,Socket客户端与Netty服务器端通信,只传输字节案例=>https://www.toutiao.com/a6956565963378639368/$ v$ C5 g7 F  B, O: Y: L, |
Netty,实现HTTP服务器案例,实现简单的TCP通信案例=>https://www.toutiao.com/a6957860713264120333/
) P2 {  k$ g* FChannelInboundHandlerAdapter
/ S+ G5 g/ H6 E4 G# bChannelInboundHandlerAdapter是ChannelInboundHandler的一个实现,ChannelInboundHandler提供了各种可以重写的事件处理程序方法。
8 _7 L- ~/ K9 n/ {3 `% c+ C5 Z代码共享$ e$ d" |$ W# c  ^7 ^. N4 i0 V
0 T% g; D& X* ]7 W8 a  u) o
弃包案例' ^$ k5 X: ^, O
官方说明:世界上最简单的协议不是:'Hello, World!',而是:DISCARD,没有任何响应的情况下丢弃任何接收到的数据。
2 c6 j  @5 h5 Q9 j6 }+ Himport io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;/** * 丢弃任何传入数据 */public class DiscardServer {    private int port;    public DiscardServer(int port) {        this.port = port;    }    public void run() throws Exception {        EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)        EventLoopGroup workerGroup = new NioEventLoopGroup();        try {            ServerBootstrap b = new ServerBootstrap(); // (2)            b.group(bossGroup, workerGroup)                    .channel(NioServerSocketChannel.class) // (3)                    .childHandler(new ChannelInitializer<SocketChannel>() { // (4)                        @Override                        public void initChannel(SocketChannel ch) throws Exception {                            ch.pipeline().addLast(new DiscardServerHandler());                        }                    })                    .option(ChannelOption.SO_BACKLOG, 128)          // (5)                    .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)            // Bind and start to accept incoming connections.            ChannelFuture f = b.bind(port).sync(); // (7)            // Wait until the server socket is closed.            // In this example, this does not happen, but you can do that to gracefully            // shut down your server.            f.channel().closeFuture().sync();        } finally {            workerGroup.shutdownGracefully();            bossGroup.shutdownGracefully();        }    }    public static void main(String[] args) throws Exception {        new DiscardServer(8080).run();    }}import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;/** * ChannelInboundHandlerAdapter是ChannelInboundHandler的一个实现 * ChannelInboundHandler提供了各种可以重写的事件处理程序方法 */public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1)    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)        // 重写channelRead()事件处理程序方法        // 每当从客户机接收到新数据时,就会使用接收到的消息调用此方法        // 接收到的消息的类型为ByteBuf        // 以静默方式丢弃接收到的数据        // ByteBuf是一个引用计数对象,release()方法显式释放        ((ByteBuf) msg).release(); // (3)    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)        // I/O错误或处理程序实现因处理事件时引发的异常而引发异常时,        // exceptionCaught()事件处理程序方法将使用Throwable调用。        // Close the connection when an exception is raised.        cause.printStackTrace();        ctx.close();    }}Echo交互8 I2 o. P; {" ^! H+ s
当连接打开时发送一条消息,并将收到的任何数据回显到服务器,echo客户端通过向服务器发送第一条消息来启动echo客户端和服务器之间的乒乓通信。( ?$ E2 m7 e5 l+ z% q- _
import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.ChannelPipeline;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.logging.LogLevel;import io.netty.handler.logging.LoggingHandler;import io.netty.handler.ssl.SslContext;import io.netty.handler.ssl.SslContextBuilder;import io.netty.handler.ssl.util.SelfSignedCertificate;/** * 回显从客户端接收到的任何数据 */public final class EchoServer {    static final boolean SSL = true;    static final int PORT = 8007;    public static void main(String[] args) throws Exception {        // Configure SSL.        final SslContext sslCtx;        if (SSL) {            SelfSignedCertificate ssc = new SelfSignedCertificate();            sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();        } else {            sslCtx = null;        }        // Configure the server.        EventLoopGroup bossGroup = new NioEventLoopGroup(1);        EventLoopGroup workerGroup = new NioEventLoopGroup();        try {            ServerBootstrap b = new ServerBootstrap();            b.group(bossGroup, workerGroup)                    .channel(NioServerSocketChannel.class)                    .option(ChannelOption.SO_BACKLOG, 100)                    .handler(new LoggingHandler(LogLevel.INFO))                    .childHandler(new ChannelInitializer<SocketChannel>() {                        @Override                        public void initChannel(SocketChannel ch) throws Exception {                            ChannelPipeline p = ch.pipeline();                            if (sslCtx != null) {                                p.addLast(sslCtx.newHandler(ch.alloc()));                            }                            //p.addLast(new LoggingHandler(LogLevel.INFO));                            p.addLast(new EchoServerHandler());                        }                    });            // Start the server.            ChannelFuture f = b.bind(PORT).sync();            // Wait until the server socket is closed.            f.channel().closeFuture().sync();        } finally {            // Shut down all event loops to terminate all threads.            bossGroup.shutdownGracefully();            workerGroup.shutdownGracefully();        }    }}import io.netty.channel.ChannelHandler.Sharable;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;/** * Handler implementation for the echo server. */@Sharablepublic class EchoServerHandler extends ChannelInboundHandlerAdapter {    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) {        System.out.println(msg);        ctx.write(msg);    }    @Override    public void channelReadComplete(ChannelHandlerContext ctx) {        ctx.flush();    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {        // 引发异常时关闭连接.        cause.printStackTrace();        ctx.close();    }}import io.netty.bootstrap.Bootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.ChannelPipeline;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.ssl.SslContext;import io.netty.handler.ssl.SslContextBuilder;import io.netty.handler.ssl.util.InsecureTrustManagerFactory;/** * 当连接打开时发送一条消息,并将收到的任何数据回显到服务器。 * echo客户端通过向服务器发送第一条消息来启动echo客户端和服务器之间的乒乓通信。 */public final class EchoClient {    static final boolean SSL = true;    static final String HOST = "127.0.0.1";    static final int PORT = 8007;    static final int SIZE = 256;    public static void main(String[] args) throws Exception {        // Configure SSL.git        final SslContext sslCtx;        if (SSL) {            sslCtx = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();        } else {            sslCtx = null;        }        // Configure the client.        EventLoopGroup group = new NioEventLoopGroup();        try {            Bootstrap b = new Bootstrap();            b.group(group)                    .channel(NioSocketChannel.class)                    .option(ChannelOption.TCP_NODELAY, true)                    .handler(new ChannelInitializer<SocketChannel>() {                        @Override                        public void initChannel(SocketChannel ch) throws Exception {                            ChannelPipeline p = ch.pipeline();                            if (sslCtx != null) {                                p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));                            }                            //p.addLast(new LoggingHandler(LogLevel.INFO));                            p.addLast(new EchoClientHandler());                        }                    });            // Start the client.            ChannelFuture f = b.connect(HOST, PORT).sync();            // Wait until the connection is closed.            f.channel().closeFuture().sync();        } finally {            // Shut down the event loop to terminate all threads.            group.shutdownGracefully();        }    }}import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.Channel;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;/** * echo客户端的处理程序实现 */public class EchoClientHandler extends ChannelInboundHandlerAdapter {    private final ByteBuf firstMessage;    /**     * 创建客户端处理程序     */    public EchoClientHandler() {        firstMessage = Unpooled.buffer(EchoClient.SIZE);        for (int i = 0; i < firstMessage.capacity(); i++) {            firstMessage.writeByte((byte) i);        }    }    @Override    public void channelActive(ChannelHandlerContext ctx) {        // 第一次执行        System.out.println("已发送:" + firstMessage);        ctx.writeAndFlush(firstMessage);    }    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) {        try {            Thread.sleep(1000);        } catch (InterruptedException e) {            e.printStackTrace();        }        System.out.println("已接收:" + msg);        ctx.write(msg);    }    @Override    public void channelReadComplete(ChannelHandlerContext ctx) {        ctx.flush();    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {        // 引发异常时关闭连接        cause.printStackTrace();        ctx.close();    }}时间服务通信! y9 D! }' C  `- q' A* q
import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;/** * 编写时间服务器 */public class TimeServer {    private int port;    public TimeServer(int port) {        this.port = port;    }    public void run() throws Exception {        EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)        EventLoopGroup workerGroup = new NioEventLoopGroup();        try {            ServerBootstrap b = new ServerBootstrap(); // (2)            b.group(bossGroup, workerGroup)                    .channel(NioServerSocketChannel.class) // (3)                    .childHandler(new ChannelInitializer<SocketChannel>() { // (4)                        @Override                        public void initChannel(SocketChannel ch) throws Exception {                            ch.pipeline().addLast(new TimeServerHandler());                        }                    })                    .option(ChannelOption.SO_BACKLOG, 128)          // (5)                    .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)            // Bind and start to accept incoming connections.            ChannelFuture f = b.bind(port).sync(); // (7)            // Wait until the server socket is closed.            // In this example, this does not happen, but you can do that to gracefully            // shut down your server.            f.channel().closeFuture().sync();        } finally {            workerGroup.shutdownGracefully();            bossGroup.shutdownGracefully();        }    }    public static void main(String[] args) throws Exception {        new TimeServer(44456).run();    }}import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelFutureListener;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.util.ReferenceCountUtil;/** * ChannelInboundHandlerAdapter是ChannelInboundHandler的一个实现 * ChannelInboundHandler提供了各种可以重写的事件处理程序方法 */public class TimeServerHandler extends ChannelInboundHandlerAdapter {    @Override    public void channelActive(final ChannelHandlerContext ctx) { // (1)        final ByteBuf time = ctx.alloc().buffer(4); // (2)        time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));        final ChannelFuture f = ctx.writeAndFlush(time); // (3)        f.addListener(new ChannelFutureListener() {            @Override            public void operationComplete(ChannelFuture future) {                assert f == future;                ctx.close();            }        }); // (4)    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {        cause.printStackTrace();        ctx.close();    }}import io.netty.bootstrap.Bootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;/** * 编写时间客户端 */public class TimeClient {    public static void main(String[] args) throws Exception {        String host = "127.0.0.1";        int port = 44456;        EventLoopGroup workerGroup = new NioEventLoopGroup();        try {            Bootstrap b = new Bootstrap(); // (1)            b.group(workerGroup); // (2)            b.channel(NioSocketChannel.class); // (3)            b.option(ChannelOption.SO_KEEPALIVE, true); // (4)            b.handler(new ChannelInitializer<SocketChannel>() {                @Override                public void initChannel(SocketChannel ch) throws Exception {                    ch.pipeline().addLast(new TimeClientHandler());                }            });            // 启动客户端.            ChannelFuture f = b.connect(host, port).sync(); // (5)            // 等待连接关闭            f.channel().closeFuture().sync();        } finally {            workerGroup.shutdownGracefully();        }    }}import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import java.util.Date;public class TimeClientHandler extends ChannelInboundHandlerAdapter {    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) {        ByteBuf m = (ByteBuf) msg; // (1)        try {            long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;            System.out.println(new Date(currentTimeMillis));            ctx.close();        } finally {            m.release();        }    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {        cause.printStackTrace();        ctx.close();    }}
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

14

金钱

0

收听

0

听众
性别

新手上路

金钱
14 元