【Netty源码解读和权威指南】第49篇:Netty实战——开发高性能im即时通讯系统

【Netty源码解读和权威指南】第49篇:Netty实战——开发高性能im即时通讯系统

上一篇【第48篇】Netty整合Spring——生产级Netty服务的最佳实践
下一篇【第50篇】Netty MQTT实战——开发物联网消息服务器


一、IM系统架构

+-----+ +-----+ +-----+ +------------------+ | 客户端 | | 客户端 | | 客户端 | | IM接入服务器 | +-----+ +-----+ +-----+ +------------------+ | | | | +---------+---------+--------------+ | +------------------+ | 消息路由层 | +------------------+ | +-------------+-------------+ | | +-------------+ +-------------+ | 用户管理 | | 消息存储 | +-------------+ +-------------+

二、核心Handler实现

@ChannelHandler.SharablepublicclassIMServerHandlerextendsChannelInboundHandlerAdapter{// userId → Channel映射privatestaticfinalConcurrentHashMap<String,Channel>userChannels=newConcurrentHashMap<>();@OverridepublicvoidchannelRead(ChannelHandlerContextctx,Objectmsg){IMMessageimMsg=(IMMessage)msg;switch(imMsg.getType()){caseLOGIN:handleLogin(ctx,imMsg);break;caseSINGLE_CHAT:handleSingleChat(ctx,imMsg);break;caseGROUP_CHAT:handleGroupChat(ctx,imMsg);break;caseHEARTBEAT:handleHeartbeat(ctx);break;}}privatevoidhandleLogin(ChannelHandlerContextctx,IMMessagemsg){userChannels.put(msg.getFromUser(),ctx.channel());ctx.channel().attr(AttributeKey.valueOf("userId")).set(msg.getFromUser());ctx.writeAndFlush(IMMessage.of(LOGIN_RESP,"OK"));System.out.println("用户登录: "+msg.getFromUser());}privatevoidhandleSingleChat(ChannelHandlerContextctx,IMMessagemsg){Channeltarget=userChannels.get(msg.getToUser());if(target!=null&&target.isActive()){target.writeAndFlush(msg);}else{ctx.writeAndFlush(IMMessage.of(ERROR,"用户不在线"));}}@OverridepublicvoidchannelInactive(ChannelHandlerContextctx){StringuserId=(String)ctx.channel().attr(AttributeKey.valueOf("userId")).get();if(userId!=null){userChannels.remove(userId);System.out.println("用户下线: "+userId);}}}

三、消息协议

publicclassIMMessage{publicenumType{LOGIN,LOGIN_RESP,SINGLE_CHAT,GROUP_CHAT,HEARTBEAT,ERROR}privateTypetype;privateStringfromUser;privateStringtoUser;privateStringcontent;privatelongtimestamp;// getters/setters...}// 编码器publicclassIMMessageEncoderextendsMessageToByteEncoder<IMMessage>{protectedvoidencode(ChannelHandlerContextctx,IMMessagemsg,ByteBufout){// type(1B) + fromLen(1B) + fromUser + toLen(1B) + toUser + contentLen(4B) + contentout.writeByte(msg.getType().ordinal());writeString(out,msg.getFromUser());writeString(out,msg.getToUser());out.writeLong(msg.getTimestamp());byte[]content=msg.getContent().getBytes();out.writeInt(content.length);out.writeBytes(content);}}

四、群聊广播

// 群聊实现:遍历channel列表广播privatestaticfinalMap<String,Set<Channel>>groupChannels=newConcurrentHashMap<>();privatevoidhandleGroupChat(ChannelHandlerContextctx,IMMessagemsg){Set<Channel>members=groupChannels.get(msg.getToUser());// toUser=群IDif(members!=null){for(Channelmember:members){if(member.isActive()&&member!=ctx.channel()){member.writeAndFlush(msg);}}}}

五、总结

功能实现方式
用户登录userId → Channel映射
单聊根据userId获取Channel直接发送
群聊遍历群成员Channel列表广播
心跳IdleStateHandler检测
下线channelInactive移除映射

上一篇【第48篇】Netty整合Spring——生产级Netty服务的最佳实践
下一篇【第50篇】Netty MQTT实战——开发物联网消息服务器