Flink DataStream API vs Flink SQL:核心异同对比

Flink DataStream API vs Flink SQL:核心异同对比

一、编译链路对比

二、多维度对比

维度

DataStream API

Flink SQL

抽象层次

命令式(HOW:告诉引擎怎么做)

声明式(WHAT:告诉引擎做什么)

优化空间

用户手动优化逻辑

优化器自动选择最优策略

前端编译

无(直接构建 Transformation)

完整的 SQL 编译流水线(Parse → Validate → Optimize → Physical Plan)

算子可控性

完全可控(自定义 ProcessFunction 等)

受限于 SQL 语义(通过 UDF 扩展)

状态管理

手动管理状态(State API)

框架自动管理状态(Aggregate State 等)

侧输出(Side Output)

OutputTag

不支持(需用 UNION ALL + Filter 模拟)

自定义窗口 Trigger

任意 Trigger 逻辑

仅支持标准窗口语义

复杂事件处理(CEP)

CEP Library

MATCH_RECOGNIZE(SQL 标准 CEP)

类型系统

Java/Scala 类型系统

SQL 类型系统(与 Java 类型有映射关系)

Schema 演化

手动处理

依赖 Catalog,有一定自动支持

适用场景

复杂事件处理、精细化控制、自定义状态逻辑

ETL、聚合分析、多表 Join、标准化 pipeline

三、运行时行为差异

特性

DataStream

Flink SQL

代码生成

部分算子使用 CodeGen(如 Calc 节点中的表达式计算编译为字节码)

状态结构

用户自定义

框架规定的内部状态格式(如 MapState 存储聚合中间结果)

序列化

TypeSerializer 由用户类型决定

内部使用 BinaryRowData 等紧凑行式格式

Watermark

用户指定 WatermarkStrategy

通过 DDL 中 WATERMARK FOR 子句声明

uid 管理

用户手动设置,完全可控

框架自动生成,基于查询结构的确定性 hash

拓扑变更恢复

只要 uid 一致即可恢复

SQL 语句任何修改(包括列顺序)可能导致 uid 变化,无法恢复

四、何时选择 DataStream API?

场景

原因

复杂事件处理(CEP 自定义模式)

需要精细控制状态和触发逻辑

自定义窗口逻辑(如 Session Gap 动态计算)

SQL 窗口语义固定,难以扩展

异步 IO 调用外部服务

SQL 无直接对应能力

精细化状态管理(如 BroadcastState 模式)

SQL 状态由框架管理,不可自定义结构

需要 Side Output 分流

SQL 不支持多路输出到不同类型的 Sink

与非结构化数据交互

SQL 要求强 Schema

低延迟要求(逐条处理,不能攒批)

SQL 的 Mini-batch 等优化可能引入延迟

五、何时选择 Flink SQL?

场景

原因

标准 ETL(过滤、映射、聚合)

SQL 表达简洁,优化器自动优化

多表 Join

优化器自动选择 Join 策略和顺序

维表关联(Lookup Join)

SQL 内置支持,无需手写异步逻辑

快速原型验证

声明式表达,开发效率高

团队 SQL 技能强于 Java 技能

降低上手门槛

需要统一批流逻辑

同一 SQL 可在两种模式下运行

频繁变更业务逻辑

SQL 变更无需重新编译部署 Jar

六、混用架构模式:SQL 与 DataStream 混用

在实际生产中,我们常常会将Flink SQL与DataStream API 搭配使用:

  • SQL 做主体 + UDF 补充
  • DataStream 为骨架 + SQL 做聚合分析
// Table → DataStream Table resultTable = tableEnv.sqlQuery("SELECT * FROM orders WHERE amount > 100"); DataStream<Row> resultStream = tableEnv.toDataStream(resultTable); // DataStream → Table DataStream<Order> orderStream = env.addSource(...); Table orderTable = tableEnv.fromDataStream(orderStream, Schema.newBuilder() .column("orderId", DataTypes.STRING()) .column("amount", DataTypes.DECIMAL(10, 2)) .columnByExpression("proc_time", "PROCTIME()") .watermark("event_time", "event_time - INTERVAL '5' SECOND") .build()); tableEnv.createTemporaryView("orders", orderTable);

混用时的注意事项:

  • toDataStream() 时 SQL 层的优化边界在此截断,后续 DataStream 操作不再享受 SQL 优化器的优化
  • fromDataStream() 时需要明确定义 Schema 映射,特别是时间属性和水位线
  • 混用时 uid 管理变得复杂,SQL 自动生成的 uid 在拓扑变更时可能不稳定