当前位置: 首页 > news >正文

Flink的函数接口与富函数类

函数接口

上一小节中学习过的Flink算子方法都有对应的接口来完成业务逻辑处理,我们可以自定义类来实现这些接口完成业务逻辑编写,然后将这些类作为参数传递给Flink算子。这些实现接口在Flink中我们通常称为函数接口,常见的Flink函数接口有:MapFunction、FlatMapFunction、ReduceFunction、FilterFunction等。

无论是Java代码还是Scala代码编写都可以单独实现对应函数接口后当做参数传递给Flink算子,下面举例说明。

案例:向Socket中输入通话数据,按照指定格式输出每个通话的拨号时间和结束时间。

向Socket中输入数据格式如下:

001,186,187,busy,1000,10
002,187,186,fail,2000,20
003,186,188,busy,3000,30
004,188,186,busy,4000,40
005,188,187,busy,5000,50

Java代码

public class CommonFunctionTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); /** * Socket中的数据格式如下: * 001,186,187,busy,1000,10 * 002,187,186,fail,2000,20 * 003,186,188,busy,3000,30 * 004,188,186,busy,4000,40 * 005,188,187,busy,5000,50 */ DataStreamSource<String> ds = env.socketTextStream("node5", 9999); ds.map(new MyMapFunction()).print(); env.execute(); } private static class MyMapFunction implements MapFunction<String, String> { @Override public String map(String value) throws Exception { //value格式:001,186,187,busy,1000,10 String[] split = value.split(","); //获取通话时间,并转换成yyyy-MM-dd HH:mm:ss格式 SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String startTime = sdf.format(Long.parseLong(split[4])); //获取通话时长,通话时间加上通话时长,得到通话结束时间,转换成yyyy-MM-dd HH:mm:ss格式 String duration = split[5]; String endTime = sdf.format(Long.parseLong(split[4]) + Long.parseLong(duration)); return "基站ID:" + split[0] + ",主叫号码:" + split[1] + "," + "被叫号码:" + split[2] + ",通话类型:" + split[3] + "," + "通话开始时间:" + startTime + ",通话结束时间:" + endTime ; } } }

Scala代码

object CommonFunctionTest { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment //导入隐式转换 import org.apache.flink.api.scala._ /** * Socket中的数据格式如下: * 001,186,187,busy,1000,10 * 002,187,186,fail,2000,20 * 003,186,188,busy,3000,30 * 004,188,186,busy,4000,40 * 005,188,187,busy,5000,50 */ val ds: DataStream[String] = env.socketTextStream("node5", 9999) ds.map(new MyMapFunction()).print() env.execute() } private class MyMapFunction extends MapFunction[String, String] { override def map(value: String): String = { //value格式:001,186,187,busy,1000,10 val split: Array[String] = value.split(",") //获取通话时间,并转换成yyyy-MM-dd HH:mm:ss格式 val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") val startTime = sdf.format(split(4).toLong) //获取通话时长,通话时间加上通话时长,得到通话结束时间,转换成yyyy-MM-dd HH:mm:ss格式 val duration = split(5) val endTime = sdf.format(split(4).toLong + duration.toLong) "基站ID:" + split(0) + ",主叫号码:" + split(1) + "," + "被叫号码:" + split(2) + ",通话类型:" + split(3) + "," + "通话开始时间:" + startTime + ",通话结束时间:" + endTime } } }

富函数类

Flink中除了有函数接口之外还有功能更强大的富函数接口,富函数接口与其他常规函数接口的不同在于:可以获取运行环境的上下文,在上下文环境中可以管理状态(状态内容后续章节介绍),并拥有一些生命周期方法,所以可以实现更复杂的功能。常见的富函数接口有:RichMapFunction、RichFlatMapFunction、RichFilterFunction等。

所有RichFunction中有一个生命周期的概念,典型的生命周期方法有:

open()方法是rich function的初始化方法,当一个算子例如map或者filter被调用之前open()会被调用,一般用于初始化资源。
close()方法是生命周期中的最后一个调用的方法,做一些清理工作。
getRuntimeContext()方法提供了函数的RuntimeContext的一些信息,例如函数执行的并行度,任务的名字,以及state状态
下面通过案例来演示富函数接口的使用。

案例:读取Socket中数据,结合MySQL中电话对应的姓名来输出信息。

1) 准备mysql数据

create database mydb; use mydb; create table person_info ( phone_num varchar(255), name varchar(255), city varchar(255) )ENGINE=InnoDB DEFAULT CHARSET=utf8; insert into person_info values (186,"张三","北京"),(187,"李四","上海"),(188,"王五","深圳");

**2) 编写代码**

* **Java代码**

public class RichFunctionTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); /** * Socket中的数据格式如下: * 001,186,187,busy,1000,10 * 002,187,186,fail,2000,20 * 003,186,188,busy,3000,30 * 004,188,186,busy,4000,40 * 005,188,187,busy,5000,50 */ DataStreamSource<String> ds = env.socketTextStream("node5", 9999); ds.map(new MyRichMapFunction()).print(); env.execute(); } private static class MyRichMapFunction extends RichMapFunction<String,String> { Connection conn = null; PreparedStatement pst = null; ResultSet rst = null; // open()方法在map方法之前执行,用于初始化 @Override public void open(Configuration parameters) throws Exception { conn = DriverManager.getConnection("jdbc:mysql://node2:3306/mydb?useSSL=false","root","123456"); pst = conn.prepareStatement("select * from person_info where phone_num = ?"); } // map方法,输入一个元素,返回一个元素 @Override public String map(String value) throws Exception { //value 格式:001,186,187,busy,1000,10 String[] split = value.split(","); String sid = split[0]; String callOut = split[1];//主叫 String callIn = split[2];//被叫 String callType = split[3];//通话类型 String callTime = split[4];//通话时间 String duration = split[5];//通话时长 //mysql中获取主叫和被叫的姓名 String callOutName = ""; String callInName = ""; pst.setString(1,callOut); rst = pst.executeQuery(); while (rst.next()){ callOutName = rst.getString("name"); } pst.setString(1,callIn); rst = pst.executeQuery(); while (rst.next()){ callInName = rst.getString("name"); } return "基站ID:" + sid + ",主叫号码:" + callOut + ",主叫姓名:" + callOutName + "," + "被叫号码:" + callIn + ",被叫姓名:" + callInName + ",通话类型:" + callType + "," + "通话时间:" + callTime + ",通话时长:" + duration+"s"; } // close()方法在map方法之后执行,用于清理 @Override public void close() throws Exception { rst.close(); pst.close(); conn.close(); } } }

Scala代码

object RichFunctionTest { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment //导入隐式转换 import org.apache.flink.api.scala._ /** * Socket中的数据格式如下: * 001,186,187,busy,1000,10 * 002,187,186,fail,2000,20 * 003,186,188,busy,3000,30 * 004,188,186,busy,4000,40 * 005,188,187,busy,5000,50 */ val ds: DataStream[String] = env.socketTextStream("node5", 9999) ds.map(new MyRichMapFunction).print() env.execute() } private class MyRichMapFunction extends RichMapFunction[String, String] { private var conn: Connection = _ private var pst: PreparedStatement = _ private var rst: ResultSet = _ // open()方法在map方法之前执行,用于初始化 override def open(parameters: Configuration): Unit = { conn = DriverManager.getConnection("jdbc:mysql://node2:3306/mydb?useSSL=false", "root", "123456") pst = conn.prepareStatement("select * from person_info where phone_num = ?") } // map方法,输入一个元素,返回一个元素 override def map(value: String): String = { //value 格式:001,186,187,busy,1000,10 val split: Array[String] = value.split(",") val sid: String = split(0) val callOut: String = split(1) //主叫 val callIn: String = split(2) //被叫 val callType: String = split(3) //通话类型 val callTime: String = split(4) //通话时间 val duration: String = split(5) //通话时长 //mysql中获取主叫和被叫的姓名 var callOutName = "" var callInName = "" pst.setString(1, callOut) rst = pst.executeQuery() while (rst.next()) { callOutName = rst.getString("name") } pst.setString(1, callIn) rst = pst.executeQuery() while (rst.next()) { callInName = rst.getString("name") } s"基站ID:$sid,主叫号码:$callOut,主叫姓名:$callOutName," + s"被叫号码:$callIn,被叫姓名:$callInName,通话类型:$callType," + s"通话时间:$callTime,通话时长:$duration" } // close()方法在map方法之后执行,用于清理 override def close(): Unit = { if (rst != null) rst.close() if (pst != null) pst.close() if (conn != null) conn.close() } } }

http://www.zskr.cn/news/1443999.html

相关文章:

  • 因瓦36选购,上海三青股份有哪些优势 - mypinpai
  • Veo 2企业级工作流集成指南:如何在Adobe Premiere+Runway+Veo 2三端同步触发场景切换(含时间码精准对齐协议)
  • 3步免费解锁WeMod专业版:Wand-Enhancer完全使用指南
  • 2026年零基础无人机考证机构评测:航拍无人机培训/院校低空专业共建/零基础学无人机/低空合规加盟/低空无人机院校加盟/选择指南 - 优质品牌商家
  • Obsidian科研模板库:研究者的终极知识管理解决方案
  • 如何快速分析虚幻引擎Pak文件:5个可视化技巧
  • 2026年6月杭州门窗推荐排行榜 品牌实力实测盘点 - 优质品牌商家
  • Sora 2立体视频生成实战指南:5步完成从文本提示→深度图生成→视差校准→双目合成→HDR10+输出全流程
  • BGP配置
  • Sora 2音乐视频制作提速300%:基于FFmpeg+Whisper+Custom Diffusion的端到端流水线
  • 郑州鼎力品牌的烘干机好用吗?多少钱? - 工业品牌热点
  • 2026年荣赢科技产品性能怎么样 - mypinpai
  • [特殊字符] 2025年Java面试通关秘籍:高频核心知识点全解析(建议收藏)
  • 2026年口碑好的急件航空运输公司有哪些? - mypinpai
  • 抖音无水印批量下载终极指南:三步搞定海量视频收藏
  • 3个实战技巧揭秘PyInstaller逆向分析:从黑盒到源码的深度解析
  • 报废设备回收机构哪家性价比高?北京钜旺如何 - mypinpai
  • 别再只测单接口了!用Postman Runner给你的图书管理系统做个‘压力体检’
  • nodejs nvm 安装与使用教程
  • Sora 2视频画质突变真相:3大压缩伪影、2类运动失真、5种光照崩溃场景全曝光(工程师内部测试日志)
  • 别再用OBS了!Sora 2原生录制引擎对比测试:延迟降低63%,带宽节省41%,但90%用户忽略的License授权陷阱
  • 如何用WaveTools鸣潮工具箱彻底改变你的游戏体验:终极优化指南
  • 【孤岛划分】分布式能源接入弹性配电网模型研究【IEEE33节点】(Matlab代码实现)
  • 2026年近期安徽铜陵代理记账公司深度分析与选择指南 - 2026年企业资讯
  • 甲级防火门标准规格与选购指南
  • Jellyfin Android TV客户端:打造智能电视媒体中心的终极解决方案
  • 深圳阿曼卢梭回收权威商家大盘点:广东帕图斯回收/广东干白回收/广东康帝系列回收/广东拉图回收/广东拉塔西回收/广东拉菲回收/选择指南 - 优质品牌商家
  • 卡在 Hermes 环境配置?这篇实操教程一次性搞定
  • 2026西南叉车价格选型指南:成都叉车出租/成都载货升降平台/手动升降平台/电动升降平台/载货升降平台/中力叉车/选择指南 - 优质品牌商家
  • 2026年6月北京定制游旅行社推荐:五大专业评测家庭游防拥挤案例价格 - 品牌推荐