Flink编程模型与API(一)
针对Flink的编程模型与API进行讲解,主要基于DataStream API 进行编程,学习Flink编程方式,处理数据流程以及转换处理,本章节中涉及到的代码实现使用Java和Scala两种语言来实现。
Flink API
Stateful Stream Processing
底层的状态流处理API的抽象程度最低,而且只能用于流处理,提供了非常灵活的接口,可以用于自定义底层与状态、时间相关的操作。
DataSteam/DataSet API
这一层级的API是Flink中的核心API,这一层级中要处理的数据会被抽象成数据流(DataStream)或数据集(DataSet),然后在其上通过定义转换操作实现业务逻辑,例如:map/flatMap/window/keyby/sum/join等,这一层级API的使用风格与Java 8中的Stream使用风格十分类似。
Table API
在DataStream/DataSet API 之上是Table API ,Table API和DataStream/DataSet API不同,不是用复杂的函数定义业务流程的,而是用陈述性的语言加以描述,这样就大大降低编程难度,增强描述性。这种语言来着SQL语法,只不过以API的形式呈现出来,既然有了Table API ,那么自然可以直接使用SQL来进行描述,这就是最上层的SQL。
SQL
Flink提供的最高层级的抽象是SQL,这一层抽象在语法与表达能力上与Table API 类似,SQL抽象与Table API交互密切,同时SQL查询可以直接在Table API定义的表上执行。
总而言之,越上层的API,其描述性和可阅读性越强,越下层API,其灵活度高、表达力越强,多数时候上层API能做到的事情,下层API也能做到,反过来未必,不过这些API的底层模型是一致的,可以混合使用。
Flink架构可以处理批和流,Flink 批处理数据需要使用到Flink中的DataSet API,此API主要是支持Flink针对批数据进行操作,本质上Flink处理批数据也是看成一种特殊的流处理(有界流),所以没有必要分成批和流两套API,从Flink1.12版本往后,Dataset API 已经标记为Legacy(已过时),已被官方软弃用,官方建议使用Table API 或者SQL 来处理批数据,我们也可以使用带有Batch执行模式的DataStream API来处理批数据(DataSet和DataStream API做到了合并),在未来Flink版本中DataSet API 将会被删除。
DataStream API的学习对于理解Flink数据处理流程非常方便,上手相对来说比较容易,下面我们先从核心API层开始学习,对于底层API、Table API、SQL部分在后续章节在做介绍。
Flink编程模型
代码编写流程
我们知道DataStream的编程模型包括以下几个部分:Environment、DataSource、Transformation、DataSink、触发执行。
nvironment是编写Flink程序的基础,不同层级API编程中创建的Environment环境不同,如:Dataset 编程中需要创建ExecutionEnvironment,DataStream编程中需要创建StreamExecutionEnvironment,在Table和SQL API中需要创建TableExecutionEnvironment,使用不同语言编程导入的包也不同,在获取到对应的Environment后我们还可以进行外参数的配置,例如:并行度、容错机制设置等。
DataSource部分主要定义了数据接入功能,主要是将外部数据接入到Flink系统中并转换成DataStream对象供后续的转换使用。Transformation部分有各种各样的算子操作可以对DataStream流进行转换操作,最终将转换结果数据通过DataSink写出到外部存储介质中,例如:文件、数据库、Kafka消息系统等。
在DataStream编程中编写完成DataSink代码后并不意味着程序结束,由于Flink是基于事件驱动处理的,有一条数据时就会进行处理,所以最后一定要使用Environment.execute()来触发程序执行。
Flink数据类型
在Flink内部处理数据时,涉及到数据的网络传输、数据的序列化及反序列化,Flink需要知道操作的数据类型,为了能够在分布式计算过程中对数据的类型进行管理和判断,Flink中定义了TypeInformation来对数据类型进行描述,通过TypeInfomation能够在数据处理之前将数据类型推断出来,而不是真正在触发计算后才识别出,这样可以有效避免用户在编写Flink应用的过程出现数据类型问题。
常用的TypeInformation有BasicTypeInfo、TupleTypeInfo、CaseClassTypeInfo、PojoTypeInfo类等,针对这些常用TypeInfomation介绍如下:
Flink通过实现BasicTypeInfo数据类型,能够支持任意Java原生基本(或装箱)类型和String类型,例如:Integer,String,Double等,除了BasicTypeInfo外,类似的还有BasicArrayTypeInfo,支持Java中数组和集合类型;
通过定义TupleTypeInfo来支持Tuple类型的数据;
通过CaseClassTypeInfo支持Scala Case Class ;
PojoTypeInfo可以识别任意的POJOs类,包括Java和Scala类,POJOs可以完成复杂数据架构的定义,但是在Flink中使用POJOs数据类型需要满足以下要求:
POJOs类必须是Public修饰且独立定义,不能是内部类;
POJOs 类中必须含有默认空构造器;
POJOs类中所有的Fields必须是Public或者具有Public修饰的getter和Setter方法;
在使用Java API开发Flink应用时,通常情况下Flink都能正常进行数据类型推断进而选择合适的serializers以及comparators,但是在定义函数时如果使用到了泛型,JVM就会出现类型擦除的问题,Flink就获取不到对应的类型信息,这就需要借助类型提示(Type Hints)来告诉系统函数中传入的参数类型信息和输出类型,进而对数据类型进行推断处理。如:
Flink序列化机制
在两个进程进行远程通信时,它们需要将各种类型的数据以二进制序列的形式在网络上传输,数据发送方需要将对象转换为字节序列,进行序列化,而接收方则将字节序列恢复为各种对象,进行反序列化。对象的序列化有两个主要用途:一是将对象的字节序列永久保存到硬盘上,通常存放在文件中;二是在网络上传输对象的字节序列。序列化的好处包括减少数据在内存和硬盘中的占用空间,减少网络传输开销,精确推算内存使用情况,降低垃圾回收的频率。
Flink序列化机制负责在节点之间传输数据时对数据对象进行序列化和反序列化,确保数据的正确性和一致性。Flink提供了多种序列化器,包括Kryo、Avro和Java序列化器等,大多数情况下,用户不用担心flink的序列化框架,Flink会通过TypeInfomation在数据处理之前推断数据类型,进而使用对应的序列化器,例如:针对标准类型(int,double,long,string)直接由Flink自带的序列化器处理,其他类型默认会交给Kryo处理。但是对于Kryo仍然无法处理的类型,可以采取以下两种解决方案:
public class Student { public Integer id; public String name; public Integer age; public Student() { } public Student(Integer id, String name, Integer age) { this.id = id; this.name = name; this.age = age; } @Override public String toString() { return "Student{" + "id=" + id + ", name='" + name + '\'' + ", age=" + age + '}'; } } public class StudentSerializer extends Serializer { @Override public void write(Kryo kryo, Output output, Object o) { Student student = (Student) o; output.writeInt(student.id); output.writeString(student.name); output.writeInt(student.age); } @Override public Object read(Kryo kryo, Input input, Class aClass) { Student student = new Student(); student.id = input.readInt(); student.name = input.readString(); student.age = input.readInt(); return student; } }