Flink的函数接口与富函数类
函数接口
上一小节中学习过的Flink算子方法都有对应的接口来完成业务逻辑处理,我们可以自定义类来实现这些接口完成业务逻辑编写,然后将这些类的对象作为参数传递给Flink算子。这些接口在Flink中通常称为函数接口,常见的Flink函数接口有:MapFunction、FlatMapFunction、ReduceFunction、FilterFunction等。
无论使用Java还是Scala编写代码,都可以先单独实现对应的函数接口,再将其对象当做参数传递给Flink算子,下面举例说明。
案例:向Socket中输入通话数据,按照指定格式输出每个通话的拨号时间和结束时间。
向Socket中输入数据格式如下:
001,186,187,busy,1000,10
002,187,186,fail,2000,20
003,186,188,busy,3000,30
004,188,186,busy,4000,40
005,188,187,busy,5000,50
Java代码
/**
 * Reads call records from a socket and prints, for every record, the call
 * start time and the call end time in a human-readable format.
 */
public class CommonFunctionTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        /*
         * Each line read from the socket has the following format:
         *   001,186,187,busy,1000,10
         *   002,187,186,fail,2000,20
         *   003,186,188,busy,3000,30
         *   004,188,186,busy,4000,40
         *   005,188,187,busy,5000,50
         */
        DataStreamSource<String> ds = env.socketTextStream("node5", 9999);
        ds.map(new MyMapFunction()).print();
        env.execute();
    }

    /**
     * Turns one comma-separated call record into a descriptive line that
     * contains the formatted start and end time of the call.
     */
    private static class MyMapFunction implements MapFunction<String, String> {
        @Override
        public String map(String value) throws Exception {
            // value format: 001,186,187,busy,1000,10
            String[] split = value.split(",");

            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

            // Field 4 is passed to SimpleDateFormat as-is, i.e. it is interpreted
            // as epoch milliseconds.
            long startMillis = Long.parseLong(split[4]);

            // NOTE(review): the duration is taken to be in seconds (RichFunctionTest
            // prints it with an "s" suffix) - confirm. It has to be converted to
            // milliseconds before it is added to the start time; adding the raw
            // value would shift the end time by only a few milliseconds.
            long durationSeconds = Long.parseLong(split[5]);
            long endMillis = startMillis + durationSeconds * 1000L;

            String startTime = sdf.format(startMillis);
            String endTime = sdf.format(endMillis);

            return "基站ID:" + split[0] + ",主叫号码:" + split[1] + "," +
                    "被叫号码:" + split[2] + ",通话类型:" + split[3] + "," +
                    "通话开始时间:" + startTime + ",通话结束时间:" + endTime;
        }
    }
}
Scala代码
/**
 * Reads call records from a socket and prints, for every record, the call
 * start time and the call end time in a human-readable format.
 */
object CommonFunctionTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Bring the Flink Scala API implicits into scope.
    import org.apache.flink.api.scala._

    /*
     * Each line read from the socket has the following format:
     *   001,186,187,busy,1000,10
     *   002,187,186,fail,2000,20
     *   003,186,188,busy,3000,30
     *   004,188,186,busy,4000,40
     *   005,188,187,busy,5000,50
     */
    val ds: DataStream[String] = env.socketTextStream("node5", 9999)
    ds.map(new MyMapFunction()).print()
    env.execute()
  }

  /**
   * Turns one comma-separated call record into a descriptive line that
   * contains the formatted start and end time of the call.
   */
  private class MyMapFunction extends MapFunction[String, String] {
    override def map(value: String): String = {
      // value format: 001,186,187,busy,1000,10
      val split: Array[String] = value.split(",")

      val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

      // Field 4 is passed to SimpleDateFormat as-is, i.e. it is interpreted
      // as epoch milliseconds.
      val startMillis = split(4).toLong

      // NOTE(review): the duration is taken to be in seconds (the Java
      // RichFunctionTest prints it with an "s" suffix) - confirm. It has to be
      // converted to milliseconds before it is added to the start time; adding
      // the raw value would shift the end time by only a few milliseconds.
      val durationSeconds = split(5).toLong
      val endMillis = startMillis + durationSeconds * 1000L

      val startTime = sdf.format(startMillis)
      val endTime = sdf.format(endMillis)

      s"基站ID:${split(0)},主叫号码:${split(1)}," +
        s"被叫号码:${split(2)},通话类型:${split(3)}," +
        s"通话开始时间:$startTime,通话结束时间:$endTime"
    }
  }
}
富函数类
Flink中除了有函数接口之外还有功能更强大的富函数类(Rich Function),富函数类与常规函数接口的不同在于:可以获取运行环境的上下文,在上下文环境中可以管理状态(状态内容后续章节介绍),并拥有一些生命周期方法,所以可以实现更复杂的功能。常见的富函数类有:RichMapFunction、RichFlatMapFunction、RichFilterFunction等。
所有RichFunction中有一个生命周期的概念,典型的生命周期方法有:
open()方法是rich function的初始化方法,在算子(例如map或者filter)的处理方法被调用之前,open()会先被调用,一般用于初始化资源。
close()方法是生命周期中的最后一个调用的方法,做一些清理工作。
getRuntimeContext()方法提供了函数的RuntimeContext的一些信息,例如函数执行的并行度、任务的名字,以及state状态。
下面通过案例来演示富函数类的使用。
案例:读取Socket中数据,结合MySQL中电话对应的姓名来输出信息。
1) 准备mysql数据
-- Lookup data for the example: maps a phone number to a person's name and city.
create database mydb;
use mydb;

create table person_info (
  phone_num varchar(255),
  name varchar(255),
  city varchar(255)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- phone_num is a varchar column, so the numbers are written as string literals.
-- Single quotes are used because double-quoted text is parsed as an identifier
-- when the ANSI_QUOTES SQL mode is enabled.
insert into person_info values
  ('186', '张三', '北京'),
  ('187', '李四', '上海'),
  ('188', '王五', '深圳');
**2) 编写代码**
* **Java代码**
/**
 * Reads call records from a socket and enriches each record with the caller's
 * and callee's names looked up in the MySQL table person_info.
 */
public class RichFunctionTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        /*
         * Each line read from the socket has the following format:
         *   001,186,187,busy,1000,10
         *   002,187,186,fail,2000,20
         *   003,186,188,busy,3000,30
         *   004,188,186,busy,4000,40
         *   005,188,187,busy,5000,50
         */
        DataStreamSource<String> ds = env.socketTextStream("node5", 9999);
        ds.map(new MyRichMapFunction()).print();
        env.execute();
    }

    /**
     * Rich map function that opens one JDBC connection in open(), reuses a
     * single prepared statement for every record, and releases both in close().
     */
    private static class MyRichMapFunction extends RichMapFunction<String, String> {
        // Runtime-only handles: both are created in open() and released in close().
        private transient Connection conn;
        private transient PreparedStatement pst;

        /** Runs before any call to map(); sets up the connection and the lookup statement. */
        @Override
        public void open(Configuration parameters) throws Exception {
            conn = DriverManager.getConnection("jdbc:mysql://node2:3306/mydb?useSSL=false", "root", "123456");
            pst = conn.prepareStatement("select * from person_info where phone_num = ?");
        }

        /** Maps one input record to one output line enriched with the looked-up names. */
        @Override
        public String map(String value) throws Exception {
            // value format: 001,186,187,busy,1000,10
            String[] split = value.split(",");
            String sid = split[0];
            String callOut = split[1];  // calling number
            String callIn = split[2];   // called number
            String callType = split[3]; // call type
            String callTime = split[4]; // call time
            String duration = split[5]; // call duration

            // Resolve the names of both parties from MySQL.
            String callOutName = lookupName(callOut);
            String callInName = lookupName(callIn);

            return "基站ID:" + sid + ",主叫号码:" + callOut + ",主叫姓名:" + callOutName + "," +
                    "被叫号码:" + callIn + ",被叫姓名:" + callInName + ",通话类型:" + callType + "," +
                    "通话时间:" + callTime + ",通话时长:" + duration + "s";
        }

        /**
         * Returns the name stored for the given phone number, or an empty string
         * when the table has no matching row. If several rows match, the name of
         * the last row wins.
         */
        private String lookupName(String phoneNum) throws Exception {
            String name = "";
            pst.setString(1, phoneNum);
            // try-with-resources closes the result set even if reading it fails.
            try (ResultSet rs = pst.executeQuery()) {
                while (rs.next()) {
                    name = rs.getString("name");
                }
            }
            return name;
        }

        /**
         * Releases the JDBC resources. Each handle is null-checked because it is
         * still null if open() failed before assigning it, and the connection is
         * closed even when closing the statement throws.
         */
        @Override
        public void close() throws Exception {
            try {
                if (pst != null) {
                    pst.close();
                }
            } finally {
                if (conn != null) {
                    conn.close();
                }
            }
        }
    }
}
Scala代码
/**
 * Reads call records from a socket and enriches each record with the caller's
 * and callee's names looked up in the MySQL table person_info.
 */
object RichFunctionTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Bring the Flink Scala API implicits into scope.
    import org.apache.flink.api.scala._

    /*
     * Each line read from the socket has the following format:
     *   001,186,187,busy,1000,10
     *   002,187,186,fail,2000,20
     *   003,186,188,busy,3000,30
     *   004,188,186,busy,4000,40
     *   005,188,187,busy,5000,50
     */
    val ds: DataStream[String] = env.socketTextStream("node5", 9999)
    ds.map(new MyRichMapFunction).print()
    env.execute()
  }

  /**
   * Rich map function that opens one JDBC connection in open(), reuses a
   * single prepared statement for every record, and releases both in close().
   */
  private class MyRichMapFunction extends RichMapFunction[String, String] {
    // Runtime-only JDBC handles: assigned in open(), released in close().
    // They stay null until open() has run, hence the Option(...) guards in close().
    private var conn: Connection = _
    private var pst: PreparedStatement = _

    /** Runs before any call to map(); sets up the connection and the lookup statement. */
    override def open(parameters: Configuration): Unit = {
      conn = DriverManager.getConnection("jdbc:mysql://node2:3306/mydb?useSSL=false", "root", "123456")
      pst = conn.prepareStatement("select * from person_info where phone_num = ?")
    }

    /** Maps one input record to one output line enriched with the looked-up names. */
    override def map(value: String): String = {
      // value format: 001,186,187,busy,1000,10
      val split: Array[String] = value.split(",")
      val sid: String = split(0)
      val callOut: String = split(1)  // calling number
      val callIn: String = split(2)   // called number
      val callType: String = split(3) // call type
      val callTime: String = split(4) // call time
      val duration: String = split(5) // call duration

      // Resolve the names of both parties from MySQL.
      val callOutName = lookupName(callOut)
      val callInName = lookupName(callIn)

      // The trailing "s" matches the output of the Java version of this example.
      s"基站ID:$sid,主叫号码:$callOut,主叫姓名:$callOutName," +
        s"被叫号码:$callIn,被叫姓名:$callInName,通话类型:$callType," +
        s"通话时间:$callTime,通话时长:${duration}s"
    }

    /**
     * Returns the name stored for the given phone number, or an empty string
     * when the table has no matching row. If several rows match, the name of
     * the last row wins.
     */
    private def lookupName(phoneNum: String): String = {
      pst.setString(1, phoneNum)
      val rs: ResultSet = pst.executeQuery()
      try {
        var name = ""
        while (rs.next()) {
          name = rs.getString("name")
        }
        name
      } finally {
        // Close the result set even if reading it fails.
        rs.close()
      }
    }

    /**
     * Releases the JDBC resources. The connection is closed even when closing
     * the statement throws.
     */
    override def close(): Unit = {
      try {
        Option(pst).foreach(_.close())
      } finally {
        Option(conn).foreach(_.close())
      }
    }
  }
}