一文读懂DolphinScheduler插件机制:如何轻松扩展任务类型与数据源

一文读懂DolphinScheduler插件机制:如何轻松扩展任务类型与数据源

一、整体架构概览

DolphinScheduler 的插件体系基于 Java SPI(Service Provider Interface) 机制,配合 Google AutoService 自动生成注册文件,实现了零侵入的插件化扩展

dolphinscheduler-spi ← 核心接口层(SPI基础设施) dolphinscheduler-datasource-plugin ← 数据源插件层 dolphinscheduler-task-plugin ← Task插件层 dolphinscheduler-worker ← 插件消费层(Worker执行任务)

二、SPI 基础设施

PrioritySPI (接口) └── getIdentify(): SPIIdentify ← 插件唯一标识 + 优先级 └── compareTo(Integer) ← 优先级比较

PrioritySPIFactory 是插件发现的核心引擎

通过 Java 标准 ServiceLoader 扫描 classpath for (T t : ServiceLoader.load(spiClass)) { if (map.containsKey(t.getIdentify().getName())) { resolveConflict(t); // 同名插件按优先级决策,优先级相同则抛异常 } else { map.put(t.getIdentify().getName(), t); } }

插件注册方式:每个插件模块使用 @AutoService 注解,编译时自动在 META-INF/services/ 下生成 SPI 配置文件,无需手动维护

三、数据源插件详细流程

3.1 接口层次结构

DataSourceChannelFactory (SPI入口) └── getName() ← 插件唯一名称,如 "MYSQL" └── create() ← 创建 DataSourceChannel DataSourceChannel (通道) └── createAdHocDataSourceClient() ← 创建临时连接客户端 └── createPooledDataSourceClient() ← 创建连接池客户端 DataSourceClient (基础接口) └── getConnection(): Connection PooledDataSourceClient extends DataSourceClient └── createDataSourcePool() ← 创建 HikariCP 连接池

3.2 以 MySQL 为例的完整实现链

MySQLDataSourceChannelFactory ← @AutoService 注册 └── create() → MySQLDataSourceChannel └── createPooledDataSourceClient() → MySQLPooledDataSourceClient └── extends BasePooledDataSourceClient └── createDataSourcePool() → HikariDataSource ├── setDriverClassName() ├── setJdbcUrl() ├── setUsername() / setPassword() ├── setMinimumIdle() / setMaximumPoolSize() └── setConnectionTestQuery()

四、Task 插件详细流程

4.1 接口层次结构

TaskChannelFactory (SPI入口) extends UiChannelFactory, PrioritySPI └── getName() ← 插件类型名,如 "SHELL" └── create() ← 创建 TaskChannel └── getParams() ← 返回 UI 配置参数(前端渲染用) TaskChannel (通道) └── createTask(TaskExecutionContext) → AbstractTask └── parseParameters(ParametersNode) → AbstractParameters └── getResources(parameters) → ResourceParametersHelper └── cancelApplication(boolean) AbstractTask (任务执行基类) └── init() ← 初始化 └── handle(callback) ← 执行(抽象) └── cancel() ← 取消(抽象) └── getExitStatus() ← 根据 exitCode 返回状态

4.2 以 Shell 为例的完整实现链

ShellTaskChannelFactory ← @AutoService 注册 └── getName() → "SHELL" └── getParams() → [nodeName, runFlag, ...] ← 前端UI参数 └── create() → ShellTaskChannel └── createTask(ctx) → ShellTask └── handle(callback) └── ShellCommandExecutor.run(shellActuatorBuilder) └── 执行 Shell 脚本进程

五、两种插件的关键对