在安卓中使用netty进行通信

共有四个文件 两个服务器的 两个客户端的
1/2服务器 NettyServerBootstrap
package com.example.c2534.myapplication2.netty_from8391.server;
import com.example.c2534.myapplication2.Const.Const;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class NettyServerBootstrap {
private int port;
public NettyServerBootstrap(int port) throws InterruptedException {
this.port = port;
bind();
}
private void bind() throws InterruptedException {
EventLoopGroup boss=new NioEventLoopGroup();
EventLoopGroup worker=new NioEventLoopGroup();
ServerBootstrap bootstrap=new ServerBootstrap();
bootstrap.group(boss,worker);
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.option(ChannelOption.SO_BACKLOG, 128);
//通过NoDelay禁用Nagle,使消息立即发出去,不用等待到一定的数据量才发出去
bootstrap.option(ChannelOption.TCP_NODELAY, true);
//保持长连接状态
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline p = socketChannel.pipeline();
p.addLast(new NettyServerHandler());
}
});
ChannelFuture f= bootstrap.bind(port).sync();
if(f.isSuccess()){
System.out.println("服务器成功启动---------------");
}
}
//开启服务器端口监听
public static void main(String []args) throws InterruptedException {
NettyServerBootstrap bootstrap=new NettyServerBootstrap(Const.TCP_PORT);
}
}
2/2服务器 NettyServerHandle
package com.example.c2534.myapplication2.netty_from8391.server;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
public class NettyServerHandler extends SimpleChannelInboundHandler<Object> {
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
//channel失效,从Map中移除
//NettyChannelMap.remove((SocketChannel)ctx.channel());
System.out.println("我是服务器 侦测到一个不活跃的频道");
}
//这里是从客户端过来的消息
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object baseMsg) {
String msg1=((ByteBuf)baseMsg).toString(CharsetUtil.UTF_8).trim();
System.out.println("我是服务器 收到= "+msg1);
ByteBuf bb = Unpooled.wrappedBuffer("pong".getBytes(CharsetUtil.UTF_8));
System.out.println("我是服务器 发送= pong");
channelHandlerContext.writeAndFlush(bb);
//ReferenceCountUtil.release(baseMsg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)throws Exception {
super.exceptionCaught(ctx, cause);
System.out.println("我是服务器 出现异常了!!!");
}
}
1/2客户端 NettyClientBootstrap
package com.example.c2534.myapplication2.netty_from8391;
import com.example.c2534.myapplication2.Const.Const;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.CharsetUtil;
public class NettyClientBootstrap {
private int port= Const.TCP_PORT;
private String host= Const.HOST;
public SocketChannel socketChannel;
public void startNetty() throws InterruptedException {
System.out.println("长链接开始");
if(start()){
System.out.println("长链接成功");
ByteBuf bb = Unpooled.wrappedBuffer(("我是客户端,连接成功后我发送这一条信息".getBytes(CharsetUtil.UTF_8)));
socketChannel.writeAndFlush(bb);
}
}
public void send2server(String msg){
ByteBuf bb = Unpooled.wrappedBuffer((msg.getBytes(CharsetUtil.UTF_8)));
socketChannel.writeAndFlush(bb);
}
private Boolean start() throws InterruptedException {
EventLoopGroup eventLoopGroup=new NioEventLoopGroup();
Bootstrap bootstrap=new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.option(ChannelOption.SO_KEEPALIVE,true);
bootstrap.group(eventLoopGroup);
bootstrap.remoteAddress(host, port);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new IdleStateHandler(20, 10, 0));
socketChannel.pipeline().addLast(new NettyClientHandler());
}
});
ChannelFuture future = null ;
try {
future =bootstrap.connect(new InetSocketAddress(host,port)).sync();
if (future.isSuccess()) {
socketChannel = (SocketChannel)future.channel();
System.out.println("连接服务器 成功---------");
return true;
}else{
System.out.println("连接服务器 失败---------");
startNetty();
return false;
}
} catch (Exception e) {
System.out.println("无法连接----------------5秒后重试");
//这里最好暂停一下。不然会基本属于毫秒时间内执行很多次。
//造成重连失败
TimeUnit.SECONDS.sleep(5);
startNetty();
return false;
}
}
}
2/2客户端 NettyClientHandle
package com.example.c2534.myapplication2.netty_from8391;
import java.util.concurrent.TimeUnit;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;
public class NettyClientHandler extends SimpleChannelInboundHandler<Object> {
//设置心跳时间 开始
public static final int MIN_CLICK_DELAY_TIME = 1000*30;
private long lastClickTime =0;
//设置心跳时间 结束
//利用写空闲发送心跳检测消息
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent) evt;
switch (e.state()) {
case WRITER_IDLE:
long currentTime = System.currentTimeMillis();
if(currentTime - lastClickTime > MIN_CLICK_DELAY_TIME){
lastClickTime = System.currentTimeMillis();
ByteBuf bb = Unpooled.wrappedBuffer("ping".getBytes(CharsetUtil.UTF_8));
ctx.writeAndFlush(bb);
System.out.println("向服务器发送 ping ");
}
break;
default:
break;
}
}
}
//这里是接受服务端发送过来的消息
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object baseMsg) throws Exception {
String msg1=((ByteBuf)baseMsg).toString(CharsetUtil.UTF_8).trim();
System.out.println("接受服务端发送过来的消息= "+msg1);
ReferenceCountUtil.release(msg1);
}
NettyClientBootstrap nettyClient=new NettyClientBootstrap();
//这里是断线要进行的操作
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
System.out.println("重连了。---------");
//这里最好暂停一下。不然会基本属于毫秒时间内执行很多次。
//造成重连失败
TimeUnit.SECONDS.sleep(5);
nettyClient.startNetty();
//ctx.channel().eventLoop().schedule();
}
//这里是出现异常的话要进行的操作
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)throws Exception {
super.exceptionCaught(ctx, cause);
System.out.println("出现异常了。。。。。。。。。。。。。");
TimeUnit.SECONDS.sleep(10);
//nettyClient.startNetty(context);
cause.printStackTrace();
}
}