当前位置: 首页 > news >正文

Flink编程模型与API(四)

Transformation 类算子是 Apache Flink 中用于定义数据流处理的基本构建块。它们允许对DataStream数据流进行转换和操作,包括数据转换、数据操作和数据重组,通过Transformation类算子,可以对输入数据流进行映射、过滤、聚合等操作,生成新的DataStream数据流作为输出,以满足特定的处理需求。下面分别介绍Flink中常见的Transformation类算子。

map

map用于对输入的DataStream数据流中的每个元素进行映射操作,它接受一个函数作为参数,该函数将每个输入元素转换为一个新的元素,并生成一个新的数据流作为输出。DataStream类型数据通过map函数进行数据转换后还会得到DataStream类型,其中数据格式可能会发生变化。 下图演示将输入数据集中的每个数值全部加1处理,经过map算子转换后输出到下游数据集。

flatMap

flatMap算子用于对输入的DataStream中的每个元素进行扁平化映射操作的算子,它接受一个函数作为参数,该函数将每个输入元素转换为零个或多个新的元素,并生成一个新的DataStream数据流作为输出。DataStream类型数据通过map函数进行数据转换后还会得到DataStream类型,其中数据格式可能会发生变化。

与map算子不同,flatMap算子可以生成比输入更多的元素,因此可以用于扁平化操作。下图表示通过flatMap算子对输入数据集中每行数据按照逗号分割得到新的数据流输出到下游。

Filter

keyBy

KeyBy算子用于将输入的DataStream按照指定的键或键选择器函数进行分组操作,它接受一个键选择器函数作为参数,该函数根据输入元素返回一个键,用于将数据流中的元素分组到不同的分区中,相同键的元素分配到同一个分区中,以便后续的操作可以基于键对数据进行聚合、合并或其他操作。

KeyBy算子使用时可以通过KeySelector函数来指定key键,DataStream通过KeyBy算子处理后得到的是KeyedStream对象,该对象也是DataStream。默认KeyBy算子会对数据流中指定的key键的hash值与Flink分区数(并行度)进行取模运算,从而决定该条数据后续被哪个并行度处理,如果Flink DataStream类型是POJOs类型,需要在该类型中重写hashCode方法,否则后续不能正确的将相同数据进行分组处理。

下图表示通过KeyBy算子将DataStream中的数据按照指定的key进行分组统计value总和。

Aggregations

Aggregations(聚合函数)是Flink中用于对输入数据进行聚合操作的函数集合,它们可以应用于KeyedStream上,将一组输入元素聚合为一个输出元素。

Flink提供了多种聚合函数,包括sum、min、minBy、max、maxBy,这些函数都是常见的聚合操作,作用如下:

sum:针对输入keyedStream对指定列进行sum求和操作。
min:针对输入keyedStream对指定列进行min最小值操作,结果流中其他列保持最开始第一条数据的值。
minBy:同min类似,对指定的字段进行min最小值操作minBy返回的是最小值对应的整个对象。
max:针对输入keyedStream对指定列进行max最大值操作,结果流中其他列保持最开始第一条数据的值。
maxBy:同max类似,对指定的字段进行max最大值操作,maxBy返回的是最大值对应的整个对象。
Java代码实现

Java代码和Scala代码执行后结果如下:

# sum执行结果 StationLog{sid='sid1', callOut='18600000000', callIn='18600000001', callType='success', callTime=1685343077146, duration=120} StationLog{sid='sid1', callOut='18600000000', callIn='18600000001', callType='success', callTime=1685343077146, duration=150} StationLog{sid='sid1', callOut='18600000000', callIn='18600000001', callType='success', callTime=1685343077146, duration=200} StationLog{sid='sid1', callOut='18600000000', callIn='18600000001', callType='success', callTime=1685343077146, duration=290} StationLog{sid='sid1', callOut='18600000000', callIn='18600000001', callType='success', callTime=1685343077146, duration=590} # min 执行结果 StationLog{sid='sid1', callOut='18600000000', callIn='18600000001', callType='success', callTime=1685343412282, duration=120} StationLog{sid='sid1', callOut='18600000000', callIn='18600000001', callType='success', callTime=1685343412282, duration=30} StationLog{sid='sid1', callOut='18600000000', callIn='18600000001', callType='success', callTime=1685343412282, duration=30} StationLog{sid='sid1', callOut='18600000000', callIn='18600000001', callType='success', callTime=1685343412282, duration=30} StationLog{sid='sid1', callOut='18600000000', callIn='18600000001', callType='success', callTime=1685343412282, duration=30} # minBy 执行结果 StationLog{sid='sid1', callOut='18600000000', callIn='18600000001', callType='success', callTime=1685343474909, duration=120} StationLog{sid='sid1', callOut='18600000001', callIn='18600000002', callType='fail', callTime=1685343474909, duration=30} StationLog{sid='sid1', callOut='18600000001', callIn='18600000002', callType='fail', callTime=1685343474909, duration=30} StationLog{sid='sid1', callOut='18600000001', callIn='18600000002', callType='fail', callTime=1685343474909, duration=30} StationLog{sid='sid1', callOut='18600000001', callIn='18600000002', callType='fail', callTime=1685343474909, duration=30} # max 执行结果 StationLog{sid='sid1', callOut='18600000000', callIn='18600000001', callType='success', callTime=1685343523009, duration=120} StationLog{sid='sid1', callOut='18600000000', callIn='18600000001', callType='success', callTime=1685343523009, duration=120} StationLog{sid='sid1', callOut='18600000000', callIn='18600000001', callType='success', callTime=1685343523009, duration=120} StationLog{sid='sid1', callOut='18600000000', callIn='18600000001', callType='success', callTime=1685343523009, duration=120} StationLog{sid='sid1', callOut='18600000000', callIn='18600000001', callType='success', callTime=1685343523009, duration=300} # maxBy 执行结果 StationLog{sid='sid1', callOut='18600000000', callIn='18600000001', callType='success', callTime=1685343559342, duration=120} StationLog{sid='sid1', callOut='18600000000', callIn='18600000001', callType='success', callTime=1685343559342, duration=120} StationLog{sid='sid1', callOut='18600000000', callIn='18600000001', callType='success', callTime=1685343559342, duration=120} StationLog{sid='sid1', callOut='18600000000', callIn='18600000001', callType='success', callTime=1685343559342, duration=120} StationLog{sid='sid1', callOut='18600000004', callIn='18600000005', callType='success', callTime=1685343559342, duration=300}

reduce

union

union算子是Flink流处理框架中数据流合并算子,可以将多个输入的DataStream多个数据流进行合并,并输出一个新的DataStream数据流作为结果,适用于需要将多个数据流合并为一个流的场景。

需要注意的是union合并的数据流类型必须相同,合并之后的数据流包含两个或多个流中所有元素,并且数据类型不变。下图表示将两个流进行合并得到合并后的结果流,并将结果输出到下游。

connect

connect算子将两个输入的DataStream数据流作为参数,将两个不同数据类型的DataStream数据流连接在一起,生成一个ConnectedStreams对象作为结果,与union算子不同,union只是简单的将两个类型一样的流合并在一起,而connect算子可以将不同类型的DataStream连接在一起,并且connect只能连接两个流。

connect生成的结果保留了两个输入流的类型信息,例如:dataStream1数据集为(String, Int)元祖类型,dataStream2数据集为Int类型,通过connect连接算子将两个不同数据类型的流结合在一起,其内部数据为[(String, Int), Int]的混合数据类型,保留了两个原始数据集的数据类型。

对于连接后的数据流可以使用map、flatMap、process等算子进行操作,但内部方法使用的是CoMapFunction、CoFlatMapFunction、CoProcessFunction等函数来进行处理,这些函数称作“协处理函数”,分别接收两个输入流中的元素,并生成一个新的数据流作为输出,输出结果DataStream类型保持一致。

Java代码实现

iterate

iterate算子用于实现迭代计算的算子,它允许对输入的DataStream进行多次迭代操作,直到迭代条件不满足时迭代停止,该算子适合迭代计算场景,例如:机器学习中往往会对损失函数进行判断是否到达某个精度来判断训练是否需要结束就可以使用该算子来完成。

http://www.zskr.cn/news/1444007.html

相关文章:

  • Flink的函数接口与富函数类
  • 因瓦36选购,上海三青股份有哪些优势 - mypinpai
  • Veo 2企业级工作流集成指南:如何在Adobe Premiere+Runway+Veo 2三端同步触发场景切换(含时间码精准对齐协议)
  • 3步免费解锁WeMod专业版:Wand-Enhancer完全使用指南
  • 2026年零基础无人机考证机构评测:航拍无人机培训/院校低空专业共建/零基础学无人机/低空合规加盟/低空无人机院校加盟/选择指南 - 优质品牌商家
  • Obsidian科研模板库:研究者的终极知识管理解决方案
  • 如何快速分析虚幻引擎Pak文件:5个可视化技巧
  • 2026年6月杭州门窗推荐排行榜 品牌实力实测盘点 - 优质品牌商家
  • Sora 2立体视频生成实战指南:5步完成从文本提示→深度图生成→视差校准→双目合成→HDR10+输出全流程
  • BGP配置
  • Sora 2音乐视频制作提速300%:基于FFmpeg+Whisper+Custom Diffusion的端到端流水线
  • 郑州鼎力品牌的烘干机好用吗?多少钱? - 工业品牌热点
  • 2026年荣赢科技产品性能怎么样 - mypinpai
  • [特殊字符] 2025年Java面试通关秘籍:高频核心知识点全解析(建议收藏)
  • 2026年口碑好的急件航空运输公司有哪些? - mypinpai
  • 抖音无水印批量下载终极指南:三步搞定海量视频收藏
  • 3个实战技巧揭秘PyInstaller逆向分析:从黑盒到源码的深度解析
  • 报废设备回收机构哪家性价比高?北京钜旺如何 - mypinpai
  • 别再只测单接口了!用Postman Runner给你的图书管理系统做个‘压力体检’
  • nodejs nvm 安装与使用教程
  • Sora 2视频画质突变真相:3大压缩伪影、2类运动失真、5种光照崩溃场景全曝光(工程师内部测试日志)
  • 别再用OBS了!Sora 2原生录制引擎对比测试:延迟降低63%,带宽节省41%,但90%用户忽略的License授权陷阱
  • 如何用WaveTools鸣潮工具箱彻底改变你的游戏体验:终极优化指南
  • 【孤岛划分】分布式能源接入弹性配电网模型研究【IEEE33节点】(Matlab代码实现)
  • 2026年近期安徽铜陵代理记账公司深度分析与选择指南 - 2026年企业资讯
  • 甲级防火门标准规格与选购指南
  • Jellyfin Android TV客户端:打造智能电视媒体中心的终极解决方案
  • 深圳阿曼卢梭回收权威商家大盘点:广东帕图斯回收/广东干白回收/广东康帝系列回收/广东拉图回收/广东拉塔西回收/广东拉菲回收/选择指南 - 优质品牌商家
  • 卡在 Hermes 环境配置?这篇实操教程一次性搞定
  • 2026西南叉车价格选型指南:成都叉车出租/成都载货升降平台/手动升降平台/电动升降平台/载货升降平台/中力叉车/选择指南 - 优质品牌商家