基于Netty实现自定义RPC框架
基于Netty实现自定义RPC框架
1. 概述
1.1 项目简介
本项目是一个基于Netty和Spring Boot实现的轻量级RPC(远程过程调用)框架。通过该框架,开发者可以像调用本地方法一样调用远程服务,实现服务解耦和分布式部署。
1.2 技术栈
| 技术 | 版本 | 说明 |
|---|---|---|
| Java | 17 | 语言基础 |
| Spring Boot | 2.7.6 | 服务管理框架 |
| Netty | 4.x (由Spring Boot管理) | 高性能网络通信框架 |
| FastJSON | 1.2.83 | JSON序列化/反序列化 |
| CGLIB | (Spring内置) | 高性能反射调用 |
| Lombok | (Spring Boot管理) | 简化代码 |
1.3 核心特性
- ✅服务注册: 通过
@RpcService注解自动注册服务 - ✅动态代理: JDK动态代理实现透明远程调用
- ✅异步通信: Netty异步非阻塞IO模型
- ✅序列化支持: FastJSON序列化/反序列化
- ✅优雅关闭: 资源优雅释放机制
2. 架构设计
2.1 整体架构图
┌─────────────────────────────────────────────────────────────────────────┐ │ RPC 框架架构图 │ ├─────────────────────────────────────────────────────────────────────────┤ │ │ │ ┌─────────────────────┐ ┌─────────────────────┐ │ │ │ Consumer │ │ Provider │ │ │ │ (rpc-consumer) │ │ (rpc-provider) │ │ │ ├─────────────────────┤ ├─────────────────────┤ │ │ │ ClientBootstrap │ │ ServerBootstrapApp │ │ │ │ │ │ │ │ │ │ │ │ ▼ │ │ ▼ │ │ │ │ RpcClientProxy │ │ RpcServer │ │ │ │ (JDK动态代理) │ │ (Netty服务端) │ │ │ │ │ │ │ │ │ │ │ │ ▼ │ │ ▼ │ │ │ │ RpcClient │ │ RpcServerHandler │ │ │ │ (Netty客户端) │ │ (服务注册&处理) │ │ │ │ │ │ │ │ │ │ │ │ ▼ │ │ ▼ │ │ │ │ RpcClientHandler │ │ @RpcService │ │ │ │ (请求发送&响应) │ │ (服务实现类) │ │ │ └──────────┬──────────┘ └──────────┬──────────┘ │ │ │ │ │ │ │ Netty Socket │ │ │ └────────────────────────────────────┘ │ │ 网络传输层 │ └─────────────────────────────────────────────────────────────────────────┘2.2 模块划分
| 模块 | 职责 | 核心文件 |
|---|---|---|
| rpc-api | 公共接口定义 | IUserService.java,RpcRequest.java,RpcResponse.java,User.java |
| rpc-provider | 服务提供端 | RpcServer.java,RpcServerHandler.java,@RpcService,UserServiceImpl.java |
| rpc-consumer | 服务消费端 | RpcClient.java,RpcClientHandler.java,RpcClientProxy.java |
2.3 调用流程图
客户端调用 网络传输 服务端处理 │ │ │ ▼ │ │ 1. 用户调用代理对象方法 │ │ │ │ │ ▼ │ │ 2. RpcClientProxy封装RpcRequest │ │ │ │ │ ▼ │ │ 3. 序列化并发送请求 ──────────────► │ ────────────────────────────► 4. RpcServerHandler接收请求 │ │ │ │ │ ▼ │ │ 5. 反序列化RpcRequest │ │ │ │ │ ▼ │ │ 6. CGLIB反射调用服务方法 │ │ │ │ │ ▼ │ │ 7. 封装RpcResponse │ │ │ │ │ ▼ │ │ 8. 序列化并返回响应 │ │ │ ▼ │ │ 9. RpcClientHandler接收响应 ◄─────── │ ◄─────────────────────────── │ │ │ │ ▼ │ │ 10. 反序列化RpcResponse │ │ │ │ │ ▼ │ │ 11. 返回结果给调用方 │ │3. 核心组件详解
3.1 数据模型
3.1.1 RpcRequest(请求对象)
文件路径:rpc-api/src/main/java/com/weh/rpc/commom/RpcRequest.java
| 字段 | 类型 | 说明 |
|---|---|---|
requestId | String | 请求唯一标识(UUID) |
className | String | 目标服务接口全限定名 |
methodName | String | 调用方法名 |
parameterTypes | Class<?>[] | 参数类型数组 |
parameters | Object[] | 参数值数组 |
3.1.2 RpcResponse(响应对象)
文件路径:rpc-api/src/main/java/com/weh/rpc/commom/RpcResponse.java
| 字段 | 类型 | 说明 |
|---|---|---|
requestId | String | 对应请求的ID |
error | String | 错误信息(成功时为null) |
result | Object | 方法调用返回结果 |
3.1.3 User(数据传输对象)
文件路径:rpc-api/src/main/java/com/weh/rpc/pojo/User.java
| 字段 | 类型 | 说明 |
|---|---|---|
id | Integer | 用户ID |
name | String | 用户姓名 |
3.2 服务端核心组件
3.2.1 @RpcService注解
文件路径:rpc-provider/src/main/java/com/weh/anno/RpcService.java
用于标识需要暴露为RPC服务的类。服务启动时,RpcServerHandler会扫描所有带有此注解的Bean并注册到服务缓存中。
@Target(ElementType.TYPE)// 只能用于类/接口@Retention(RetentionPolicy.RUNTIME)// 运行时可获取public@interfaceRpcService{}3.2.2 RpcServerHandler(服务端处理器)
文件路径:rpc-provider/src/main/java/com/weh/handler/RpcServerHandler.java
核心职责:
- 服务注册: 实现
ApplicationContextAware,在Spring容器初始化时扫描@RpcService注解的Bean - 请求处理: 继承
SimpleChannelInboundHandler<String>,处理客户端请求 - 反射调用: 使用CGLIB的
FastClass和FastMethod进行高效方法调用
关键代码解析:
// 服务实例缓存,key为接口全限定名privatestaticfinalMap<String,Object>SERVICE_INSTANCE_MAP=newConcurrentHashMap<>();// Spring容器初始化时自动注册服务@OverridepublicvoidsetApplicationContext(ApplicationContextapplicationContext){Map<String,Object>beanMap=applicationContext.getBeansWithAnnotation(RpcService.class);for(ObjectserviceBean:beanMap.values()){// 获取服务实现的第一个接口作为keyStringinterfaceName=serviceBean.getClass().getInterfaces()[0].getName();SERVICE_INSTANCE_MAP.put(interfaceName,serviceBean);}}// 处理客户端请求@OverrideprotectedvoidchannelRead0(ChannelHandlerContextctx,Stringmsg){RpcRequestrequest=JSON.parseObject(msg,RpcRequest.class);RpcResponseresponse=newRpcResponse();response.setRequestId(request.getRequestId());try{response.setResult(getResult(request));// 反射调用}catch(Exceptione){response.setError(e.getMessage());}ctx.writeAndFlush(JSON.toJSONString(response));}注意: 使用CGLIB反射在JDK 17+环境下需要添加JVM参数:
--add-opens java.base/java.lang=ALL-UNNAMED
3.2.3 RpcServer(服务端启动类)
文件路径:rpc-provider/src/main/java/com/weh/server/RpcServer.java
Netty服务端配置:
- BossGroup: 单线程,负责接收客户端连接
- WorkerGroup: 默认CPU核数*2,负责处理业务逻辑
- 编解码器:
StringDecoder+StringEncoder(字符串编解码)
优雅关闭: 实现DisposableBean接口,Spring容器销毁时自动释放资源。
3.3 客户端核心组件
3.3.1 RpcClientProxy(动态代理)
文件路径:rpc-consumer/src/main/java/com/weh/proxy/RpcClientProxy.java
核心职责: 通过JDK动态代理实现透明的远程调用。
代理流程:
- 创建
Proxy.newProxyInstance()生成代理对象 - 在
InvocationHandler.invoke()中封装RpcRequest - 创建
RpcClient发送请求 - 接收响应并反序列化返回
publicstaticObjectcreateProxy(Class<?>serviceClass){returnProxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),newClass[]{serviceClass},(proxy,method,args)->{// 1. 封装请求RpcRequestrequest=newRpcRequest();request.setRequestId(UUID.randomUUID().toString());request.setClassName(method.getDeclaringClass().getName());request.setMethodName(method.getName());request.setParameterTypes(method.getParameterTypes());request.setParameters(args);// 2. 发送请求try(RpcClientclient=newRpcClient("127.0.0.1",8081)){Objectresponse=client.send(JSON.toJSONString(request));RpcResponserpcResponse=JSON.parseObject(response.toString(),RpcResponse.class);// 3. 处理响应if(rpcResponse.getError()!=null){thrownewRuntimeException(rpcResponse.getError());}returnJSON.parseObject(rpcResponse.getResult().toString(),method.getReturnType());}});}3.3.2 RpcClient(客户端连接)
文件路径:rpc-consumer/src/main/java/com/weh/client/RpcClient.java
关键配置:
ChannelOption.SO_KEEPALIVE: 保持长连接ChannelOption.CONNECT_TIMEOUT_MILLIS: 连接超时3秒
发送机制: 使用ExecutorService.submit()提交任务,通过Future.get()阻塞等待响应。
3.3.3 RpcClientHandler(客户端处理器)
文件路径:rpc-consumer/src/main/java/com/weh/handler/RpcClientHandler.java
线程同步机制: 实现Callable接口,通过synchronized+wait/notify实现请求-响应同步:
// 发送请求后等待响应@OverridepublicsynchronizedObjectcall()throwsException{context.writeAndFlush(requestMsg);wait();// 阻塞等待服务端响应returnresponseMsg;}// 收到响应后唤醒线程@OverrideprotectedsynchronizedvoidchannelRead0(ChannelHandlerContextctx,Stringmsg){responseMsg=msg;notify();// 唤醒等待的线程}4. 使用说明
4.1 服务端开发
步骤1: 定义服务接口(在rpc-api模块)
publicinterfaceIUserService{UsergetUserById(intid);}步骤2: 实现服务并添加注解
@RpcService// 标识为RPC服务@ServicepublicclassUserServiceImplimplementsIUserService{@OverridepublicUsergetUserById(intid){// 业务逻辑}}步骤3: 配置Netty端口
netty:host:127.0.0.1port:8081步骤4: 启动服务
运行ServerBootstrapApplication类。
4.2 客户端开发
publicclassClientBootstrap{publicstaticvoidmain(String[]args){// 创建代理对象IUserServiceuserService=(IUserService)RpcClientProxy.createProxy(IUserService.class);// 像调用本地方法一样调用远程服务Useruser=userService.getUserById(1);System.out.println(user);}}5. 项目配置
5.1 Maven依赖
父pom.xml:
<properties><spring-boot.version>2.7.6</spring-boot.version></properties><dependencies><!-- Lombok --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!-- FastJSON --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.83</version></dependency><!-- Netty --><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId></dependency></dependencies>5.2 模块依赖关系
rpc-provider └── rpc-api (依赖公共接口) rpc-consumer └── rpc-api (依赖公共接口)6. 运行验证
6.1 启动服务端
cdrpc-provider mvn spring-boot:run输出:
服务端启动成功6.2 运行客户端
cdrpc-consumer mvn exec:java-Dexec.mainClass="com.weh.ClientBootstrap"7. 代码优化建议
7.1 当前实现的局限
- 连接管理: 当前每次调用创建新连接,性能开销大
- 服务发现: 硬编码服务地址,缺乏服务注册中心
- 序列化: 仅支持JSON,可扩展Protobuf等高效序列化方案
- 负载均衡: 不支持多服务实例的负载均衡
- 异常处理: 错误处理较为简单
7.2 优化方向
| 优化项 | 方案 | 收益 |
|---|---|---|
| 连接池 | 使用Apache Commons Pool或Netty自带连接池 | 减少连接创建开销 |
| 服务注册中心 | 集成ZooKeeper或Nacos | 动态服务发现 |
| 序列化优化 | 添加Protobuf支持 | 提高传输效率 |
| 负载均衡 | 实现轮询/随机/加权算法 | 支持多实例部署 |
| 超时控制 | 添加请求超时机制 | 防止无限等待 |
8. 总结
本RPC框架基于Netty实现了核心的远程调用功能,包含以下关键技术点:
- Netty异步通信: 基于Reactor模式的高性能网络通信
- JDK动态代理: 实现透明的远程方法调用
- CGLIB反射: 高性能的方法调用实现
- JSON序列化: 简洁的数据传输格式
- Spring集成: 利用Spring容器管理服务Bean
