当前位置: 首页 > news >正文

XXL-JOB-源码分享(1)

 自研调度组件并支持集群部署,可保证调度中心HA;

 

com.xxl.job.core.executor.XxlJobExecutor#initEmbedServer 初始化内部调度服务

查看代码
private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {// fill ip portport = port>0?port: NetUtil.findAvailablePort(9999);ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();// generate addressif (address==null || address.trim().length()==0) {String ip_port_address = IpUtil.getIpPort(ip, port);   // registry-address:default use address to registry , otherwise use ip:port if address is nulladdress = "http://{ip_port}/".replace("{ip_port}", ip_port_address);}// accessTokenif (accessToken==null || accessToken.trim().length()==0) {logger.warn(">>>>>>>>>>> xxl-job accessToken is empty. To ensure system security, please set the accessToken.");}// startembedServer = new EmbedServer();embedServer.start(address, port, appname, accessToken);}

开启服务(netty 实现)

定义业务线程池

public void start(final String address, final int port, final String appname, final String accessToken) {executorBiz = new ExecutorBizImpl();thread = new Thread(new Runnable() {@Overridepublic void run() {// paramEventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();// 自定义业务线程池ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(0,200,60L,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(2000),new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "xxl-job, EmbedServer bizThreadPool-" + r.hashCode());}},new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");}});try {// start serverServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel channel) throws Exception {channel.pipeline().addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))  // beat 3N, close if idle.addLast(new HttpServerCodec()).addLast(new HttpObjectAggregator(5 * 1024 * 1024))  // merge request & reponse to FULL.addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));}}).childOption(ChannelOption.SO_KEEPALIVE, true);// bindChannelFuture future = bootstrap.bind(port).sync();logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);// start registrystartRegistry(appname, address);// wait util stopfuture.channel().closeFuture().sync();} catch (InterruptedException e) {logger.info(">>>>>>>>>>> xxl-job remoting server stop.");} catch (Throwable e) {logger.error(">>>>>>>>>>> xxl-job remoting server error.", e);} finally {// stoptry {workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();} catch (Throwable e) {logger.error(e.getMessage(), e);}}}});thread.setDaemon(true);    // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leavethread.start();}

EmbedHttpServerHandler  内部处理器 、主要处理来自admi 服务以http方式对执行器服务的调用。主要包括心跳、执行、停止、日志等接口。

private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {// validif (HttpMethod.POST != httpMethod) {return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");}if (uri == null || uri.trim().length() == 0) {return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");}if (accessToken != null&& accessToken.trim().length() > 0&& !accessToken.equals(accessTokenReq)) {return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");}// services mappingtry {switch (uri) {case "/beat":return executorBiz.beat();case "/idleBeat":IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);return executorBiz.idleBeat(idleBeatParam);case "/run":TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);return executorBiz.run(triggerParam);case "/kill":KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);return executorBiz.kill(killParam);case "/log":LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);return executorBiz.log(logParam);default:return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found.");}} catch (Throwable e) {logger.error(e.getMessage(), e);return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e));}}

该执行器是实现为: com.xxl.job.core.biz.impl.ExecutorBizImpl

自动注册执行器地址(执行器会周期性自动注册任务,调度中心将会自动发现注册的任务并触发执行):

public void startRegistry(final String appname, final String address) {// start registryExecutorRegistryThread.getInstance().start(appname, address);}

image

http://www.zskr.cn/news/8497.html

相关文章:

  • WPF 字符竖向排列的排版格式(直排)表明控件
  • 深入解析:HSA35NV001美光固态闪存NQ482NQ470
  • YOLO实战应用 1YOLOv5 架构与模块
  • SpringBoot整合RustFS:全方位优化文件上传性能
  • windows使用es-client插件
  • AI学习日记 - 实践
  • es中的索引
  • VIVADO的IP核 DDS快速采用——生成正弦波,线性调频波
  • 深入解析:C语言---判断语句
  • YOLO进阶提升 4训练准备与数据处理
  • YOLO进阶提升 5标注与配置
  • 【学术会议前沿信息|科研必备】IEEE/EI/Scopus三检护航!人工智能+自动化控制+人文社科+遥感+地理信息+视觉领域国际会议征稿启动,硕博生速来! - 教程
  • YOLO进阶提升 3YOLOv4 改进
  • 深入解析:数据库入门实战版
  • C# Avalonia 15- Animation- AnimationPlayerTest
  • JSONArray集合根据某个字段查询对象
  • 完整教程:Qt开发经验 --- qmake执行系统命令(15)
  • 13. LangChain4j + 加入检索增加生成 RAG(知识库) - Rainbow
  • CentOS 7 源码版 PhpMyAdmin 安装指南(适配 Nginx+PHP-FPM 环境) - 教程
  • AI智能体服务优秀的平台架构设计
  • 深入解析:YARN架构解析:深入理解Hadoop资源管理核心
  • JBoltAI:破解Java企业级AI应用落地难题的利器
  • Day04 C:\Users\Lenovo\Desktop\note\code\JavaSE\Basic\src\com\David\operator Demo01-08+Doc
  • springboot创建请求处理 - 指南
  • Mapper.xml与数据库进行映射的sql语言注意事项
  • 深入解析:人工智能学习:什么是LSTM模型
  • RabbitMQ 幂等性, 顺序性 和 消息积压 - 详解
  • resultMap和自定义映射结果形式(ResultMapManage)以及ResultMap Vs ResultType
  • 嵌入式设备不能正常上网问题
  • 2、论文固定模板(背景过度结尾)