netty客户端连接多个服务端
还是上一个项目,又变了一次方案,网安说服务器暴露端口太不安全,要求用服务器上部署TCP客户端程序,主动连接下属的各个终端,终端上面跑TCP服务端程序。
那就用Netty实现客户端,去连接多个服务端。
POM文件中引入netty:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.56.Final</version>
</dependency>
netty起始类,定时任务,从数据库中读取待连接的设备列表,设备列表中有一列是ip地址,
@Slf4j
@Component
public class NettyConfig {
@Autowired
NettyClientHandler nettyClientHandler;
@Value("${terminal.port}")
private Integer port;
@PostConstruct
public void init() {
NettyClient.setHandler(nettyClientHandler);
connectDevice();
}
/**
* 初始化连接
* 查询数据库表中所有host和port不为空的设备
* netty客户端尝试连接并保存至channels内存中
*/
@Scheduled(cron = "30/60 * * * * ?")
public void connectDevice() {
log.info("剔除长时间不活动的终端");
long curt = System.currentTimeMillis();
for(Map.Entry<String, NettyConnectInfo> entry : NettyClient.channels.entrySet()) {
String ip = entry.getKey();
NettyConnectInfo info = entry.getValue();
long actt = info.getActiveTime().getTime();
//log.info("{},{}", curt, actt);
if(curt-actt > 130000) {
log.info("--{}-连接2分钟无活动,剔除!", ip);
info.getChannel().close();
NettyClient.channels.remove(ip);
}
}
log.info("连接终端");
new Thread() {
@Override
public void run() {
try {
Map<String, NettyConnectInfo> mapParam = new HashMap<String, NettyConnectInfo>();
// device from db
List<DeviceEntity> list = deviceRepository.getIps();
for(DeviceEntity it : list) {
NettyConnectInfo info = new NettyConnectInfo();
info.setPort(port);
info.setChannel(null);
info.setConnected(false);
info.setDevId(it.getIdentifierIn());
info.setDeviceInfo(it);
mapParam.put(it.getIp()+":"+String.valueOf(port), info);
}
// connect
NettyClient.getChannel(mapParam);
}
catch (Exception e) {
log.error("Netty client start fail : {}", e.getMessage());
}
}
}.start();
}
}
NettyClient类,建立socket连接:
@Slf4j
public class NettyClient {
public static Map<String, NettyConnectInfo> channels = new HashMap<>();
static EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
static Bootstrap bootstrap = null;
static NettyClientHandler handler;
public static void setHandler(NettyClientHandler h) {
handler = h;
}
/**
* 初始化Bootstrap
*/
public static final Bootstrap getBootstrap(EventLoopGroup group) {
if(bootstrap == null) {
bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(20480, 20480, 65536))
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new ByteArrayEncoder());
socketChannel.pipeline().addLast(handler);
}
});
}
return bootstrap;
}
// 获取所有连接
public static void getChannel(Map<String, NettyConnectInfo> map) {
//Map<String , ChannelFuture> result = new HashMap<>();
Bootstrap bootstrap = getBootstrap(null);
for(Map.Entry<String, NettyConnectInfo> entry : map.entrySet()) {
String ip = entry.getKey();
// 如果IP没有连接,则尝试连接
if(channels.get(ip) == null) {
String[] arr = ip.split(":");
NettyConnectInfo info = entry.getValue();
//bootstrap.remoteAddress(ip, info.getPort());
bootstrap.remoteAddress(arr[0], Integer.valueOf(arr[1]));
//log.info("...尝试连接{}:{}", arr[0], Integer.valueOf(arr[1]));
ChannelFuture future = bootstrap.connect().addListener((ChannelFuture futureListener) -> {
//final EventLoop eventLoop = futureListener.channel().eventLoop();
if (!futureListener.isSuccess()) {
log.info("与"+ip+"连接失败!");
}
else {
info.setChannel(futureListener.channel());
info.setActiveTime(new Timestamp(System.currentTimeMillis()));
channels.put(ip, info);
}
});
}
}
}
}
NettyClientHandler类,处理消息,业务处理:
@Slf4j
@ChannelHandler.Sharable
@Component
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
/**
* 建立连接时
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
try {
InetSocketAddress ipSocket = (InetSocketAddress) ctx.channel().remoteAddress();
int port = ipSocket.getPort();
String host = ipSocket.getHostString();
log.info("与设备"+host+":"+port+"连接成功!");
ctx.fireChannelActive();
}
catch (Exception e) {
log.info("channelActive: " + e.getMessage());
}
}
/**
* 关闭连接时
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
try {
InetSocketAddress ipSocket = (InetSocketAddress) ctx.channel().remoteAddress();
int port = ipSocket.getPort();
String host = ipSocket.getHostString();
log.error("与设备" + host + ":" + port + "连接断开!");
NettyClient.channels.remove(host+":"+String.valueOf(port));
final EventLoop eventLoop = ctx.channel().eventLoop();
}
catch (Exception e) {
log.info("channelInactive: " + e.getMessage());
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.info("服务端连接异常:" + ctx.channel().id().asShortText());
}
/**
* 收到报文,业务逻辑处理
*/
@Override
@Transactional
public void channelRead(ChannelHandlerContext ctx, Object msg) {
InetSocketAddress ipSocket = (InetSocketAddress) ctx.channel().remoteAddress();
int port = ipSocket.getPort();
String host = ipSocket.getHostString();
ByteBuf byteBuf = (ByteBuf) msg;
byte[] buffers = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(buffers);
byteBuf.release();
//log.info("接收到 " + host + ":" + port + "的数据");
int rplen = OpsProtocol.parseHead(buffers);
// 断帧处理
if(rplen == -1) {
byte[] prerp = NettyClient.channels.get(host + ":" + port).getBuffer();
if(prerp == null) {
log.info("错误的报文");
return;
}
else {
byte[] buffers2 = new byte[prerp.length+buffers.length];
System.arraycopy(prerp, 0, buffers2, 0, prerp.length);
System.arraycopy(buffers, 0, buffers2, prerp.length, buffers.length);
buffers = buffers2;
rplen = OpsProtocol.parseHead(buffers2);
}
}
if(rplen > buffers.length) {
NettyClient.channels.get(host + ":" + port).setBuffer(buffers);
}
int cur = 0;
while (cur < buffers.length) {
// 粘桢处理
byte[] bytes = new byte[buffers.length-cur];
System.arraycopy(buffers, cur, bytes, 0, buffers.length-cur);
StrcApduHead apduHead = new StrcApduHead();
Head head = new Head();
EnumFunName en = OpsProtocol.parseHead(bytes, head, apduHead);
//……
// 解析报文,发送报文
ctx.channel().writeAndFlush(send2);
//……
cur += apduHead.getLength();
}
NettyClient.channels.get(host + ":" + port).setActiveTime(new Timestamp(System.currentTimeMillis()));
}
}
bean,记录了每个连接的信息:
@Data
public class NettyConnectInfo {
int port;
Channel channel;
boolean connected;
Timestamp activeTime;
DeviceEntity deviceInfo;
byte[] buffer;
// ……
}
@Data
@Entity
@Table(name="device")
public class DeviceEntity {
@Column(name = "ip" )
private String ip;
// ……
}
netty基于Reactor模式的处理,一个 I/O 线程可以并发处理 N 个客户端连接和读写操作,这从根本上解决了传统同步阻塞 I/O 一连接一线程模型,架构的性能、弹性伸缩能力和可靠性都得到了极大的提升。
作者:xuruilll
来源链接:https://blog.csdn.net/xuruilll/article/details/125162304
版权声明:
1、JavaClub(https://www.javaclub.cn)以学习交流为目的,由作者投稿、网友推荐和小编整理收藏优秀的IT技术及相关内容,包括但不限于文字、图片、音频、视频、软件、程序等,其均来自互联网,本站不享有版权,版权归原作者所有。
2、本站提供的内容仅用于个人学习、研究或欣赏,以及其他非商业性或非盈利性用途,但同时应遵守著作权法及其他相关法律的规定,不得侵犯相关权利人及本网站的合法权利。
3、本网站内容原作者如不愿意在本网站刊登内容,请及时通知本站(javaclubcn@163.com),我们将第一时间核实后及时予以删除。