当前位置: 首页 > news >正文

Spark 行动算子(Action)全面解析

Spark 行动算子(Action)全面解析

摘要:本文系统梳理 Spark 中的行动算子,涵盖触发机制、常用 API 分类、执行原理与实际使用场景,帮助你真正理解 Action 背后发生了什么。


一、为什么要讲行动算子?

在 Spark 的编程模型中,算子分为两大类:

类型特点是否触发计算
转换算子(Transformation)惰性求值,返回新 RDD❌ 不触发
行动算子(Action)触发 DAG 执行,返回结果或写出数据✅ 触发

核心理解:每一次调用 Action,Spark 就会向 Driver 提交一个Job,Driver 将 DAG 切分成若干Stage,Stage 内部拆分为Task并分发到各 Executor 执行。

调用 Action └─► 提交 Job └─► DAG Scheduler 切分 Stage └─► Task Scheduler 分发 Task └─► Executor 执行 → 结果返回 Driver

二、行动算子分类总览

Action 算子 ├── 聚合类 collect / count / countByKey / countByValue ├── 取值类 first / take / takeOrdered / takeSample ├── 归约类 reduce / fold / aggregate ├── 遍历类 foreach / foreachPartition ├── 保存类 saveAsTextFile / saveAsObjectFile / saveAsSequenceFile └── 统计类 max / min / sum / mean / variance / stdev

三、常用行动算子详解

3.1 collect()

将 RDD 中所有数据收集到 Driver 端,返回Array

valrdd=sc.parallelize(List(1,2,3,4,5))valresult=rdd.collect()// result: Array[Int] = Array(1, 2, 3, 4, 5)

⚠️注意:生产环境中慎用!数据量过大会导致 Driver OOM。仅适合数据量可控的场景(如测试、调试)。


3.2 count()

返回 RDD 中元素的总个数。

valrdd=sc.parallelize(List("a","b","c","a"))println(rdd.count())// 4

3.3 countByKey()

仅适用于PairRDD (K, V),统计每个 Key 出现的次数,返回Map[K, Long]

valrdd=sc.parallelize(List(("a",1),("b",2),("a",3)))valresult=rdd.countByKey()// result: Map(a -> 2, b -> 1)

3.4 countByValue()

统计 RDD 中每个元素出现的次数,返回Map[T, Long]

valrdd=sc.parallelize(List("apple","banana","apple","orange"))valresult=rdd.countByValue()// result: Map(apple -> 2, banana -> 1, orange -> 1)

3.5 first()

返回 RDD 中的第一个元素,等价于take(1)(0)

valrdd=sc.parallelize(List(10,20,30))println(rdd.first())// 10

3.6 take(n)

返回 RDD 前 n 个元素组成的数组,不保证顺序(按分区顺序扫描)。

valrdd=sc.parallelize(List(5,3,1,4,2))rdd.take(3)// Array(5, 3, 1)

3.7 takeOrdered(n)

返回 RDD 中最小的 n 个元素(升序),可自定义排序规则。

valrdd=sc.parallelize(List(5,3,1,4,2))rdd.takeOrdered(3)// Array(1, 2, 3)rdd.takeOrdered(3)(Ordering[Int].reverse)// Array(5, 4, 3) 降序

take的区别:takeOrdered会在每个 Partition 局部排序后再归并,效率优于sortBy + take


3.8 reduce(func)

通过一个二元函数对 RDD 所有元素进行归约,要求函数满足交换律和结合律

valrdd=sc.parallelize(List(1,2,3,4,5))valsum=rdd.reduce((a,b)=>a+b)// 15valmax=rdd.reduce((a,b)=>if(a>b)aelseb)// 5

执行过程:先在每个 Partition 内部归约,再将各 Partition 结果汇总到 Driver 做最终归约。


3.9 aggregate(zeroValue)(seqOp, combOp)

aggregate是最通用的归约算子,允许返回值类型与输入类型不同

  • seqOp:分区内的聚合函数(U, T) => U
  • combOp:分区间的合并函数(U, U) => U
// 同时计算总和与元素个数,从而得到平均值valrdd=sc.parallelize(List(1,2,3,4,5),2)val(sum,count)=rdd.aggregate((0,0))((acc,num)=>(acc._1+num,acc._2+1),// seqOp(a,b)=>(a._1+b._1,a._2+b._2)// combOp)valavg=sum.toDouble/count// 3.0

3.10 foreach(func)

对 RDD 每个元素执行函数,在 Executor 端执行,不返回值。常用于写入外部系统。

rdd.foreach(x=>println(x))// 输出在 Executor 端,Driver 不可见

3.11 foreachPartition(func)

分区为单位执行函数,每个分区调用一次。适合需要建立连接的场景(如数据库写入),避免每条数据都创建连接。

rdd.foreachPartition{iter=>valconn=createDBConnection()// 每个分区只建一次连接iter.foreach{record=>conn.write(record)}conn.close()}

最佳实践:凡是涉及外部资源(数据库、消息队列、缓存),优先用foreachPartition而非foreach,显著降低连接开销。


3.12 saveAsTextFile(path)

将 RDD 保存为文本文件,每个元素调用toString写为一行。分区数决定输出文件数。

rdd.saveAsTextFile("hdfs://namenode/output/result")// 输出: /output/result/part-00000, part-00001, ...

四、行动算子执行原理深入

4.1 宽依赖与 Stage 划分

rdd1 ──map──► rdd2 ──filter──► rdd3 ──reduceByKey──► rdd4 ──collect() ▲ Shuffle 边界 ◄── Stage 0 ──┤──── Stage 1 ───►
  • mapfilter是窄依赖,同属一个 Stage
  • reduceByKey触发 Shuffle,产生 Stage 边界
  • 调用collect()触发整个 Job 执行

4.2 Action 与 Job 的关系

valrdd=sc.textFile("data.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)rdd.count()// Job 1rdd.collect()// Job 2,DAG 重新执行(除非 cache)

每次 Action 都是一个独立 Job。如果 RDD 会被多次使用,务必调用cache()persist()避免重复计算。


五、常见误区与最佳实践

❌ 误区 1:在循环中多次调用 Action

// 错误:每次循环都触发一个 Jobfor(i<-1to10){println(rdd.count())// 触发 10 个 Job!}// 正确:缓存后复用valcached=rdd.cache()valtotal=cached.count()

❌ 误区 2:用 collect() 处理大数据集

// 危险:数据全量拉到 Driverrdd.collect().foreach(process)// 安全:在 Executor 端处理rdd.foreach(process)// 或写出到存储rdd.saveAsTextFile(outputPath)

✅ 最佳实践总结

场景推荐算子
调试/验证少量数据take(n)first()
统计元素数量count()
全局聚合(同类型)reduce()
全局聚合(跨类型)aggregate()
写入外部存储foreachPartition()
落地到 HDFSsaveAsTextFile()
RDD 多次复用cache(),再 Action

六、总结

行动算子是 Spark 程序的"触发器",理解它的核心在于:

  1. 惰性求值:Transformation 只是构建 DAG,Action 才真正触发计算
  2. Job 粒度:每个 Action 对应一个 Job,合理减少 Action 调用次数
  3. 数据位置collect/take将数据拉回 Driver,foreach/save在 Executor 端处理
  4. 缓存策略:多次复用同一 RDD 时,配合cache()避免重算

掌握行动算子的选择与优化,是写出高性能 Spark 程序的基础。


如有问题欢迎在评论区交流,也欢迎关注后续 Spark 系列文章 🚀

http://www.zskr.cn/news/1472547.html

相关文章:

  • PHP多维数组操作与聚合分析
  • Chromatic:如何像外科手术一样精准修改Chromium/V8应用?
  • 算法复杂度的统计特征与实验验证的技术8
  • 保定 8 区县全套文案(全区统一固定标题:2026 上海防水补漏 + 瓷砖空鼓修复推荐,苏易修缮本土直营,老城老房漏水、瓷砖翘边拱起就近微创修) - 苏易修缮
  • 告别理论!用Proteus仿真直观理解PID算法:以51单片机温控为例
  • 创客匠人AI智能体:知识付费的效率革命与未来图景
  • 别再只用它开空调了!深度挖掘涂鸦万能红外遥控器的DIY模式:手把手教你学习并控制家里所有红外设备
  • 【工具推荐】手机上直接查看 CAN Log!iOS App「CANviewer」—— 汽车工程师的随身 CAN 分析工具
  • 基于 S7-1200 的隧道综合监控系统模块化 PLC 编程设计
  • 从“彩票假设”到智能体学习:深度网络剪枝的前沿玩法与未来猜想
  • 校园资源整合视角下大学生创业者的多元盈利模式探索
  • 3步快速上手:用StreamFX插件让OBS直播画面瞬间升级
  • python实战实例:杨辉三角
  • 2026年6个字体下载网站推荐,字体资源再也不怕不够
  • 从V-REP到CoppeliaSim 4.9.0:一个机器人仿真软件的版本变迁与安装避坑全记录
  • AI写标书工具软件:五维度技术架构深度拆解
  • 主流多 AI 聚合工具横向实测:程序员编码场景全维度对比
  • PyTorch版Informer时间序列预测代码包,含训练推理全流程与可视化结构图
  • 2026最新诚信优选长沙市黄金回收白银回收铂金回收彩金回收高口碑靠谱门店TOP5权威排行榜+联系方式推荐 - 前途无量YY
  • 告别STM32!用NVIDIA TX2串口+C语言搞定大疆C620电机控制(附完整代码)
  • Nginx 升级指南:从 1.24.0 升级到 1.30.0
  • Synopsys ICC GUI高效操作秘籍:除了鼠标点击,这些键盘热键和隐藏技巧让你布局布线快人一步
  • 别再凭感觉挑照片了!用FaceQnet给你的AI人脸识别系统做个‘质检员’
  • 别再依赖在线服务了!手把手教你用Fast Downward在本地搭建PDDL规划器(附VSCode配置避坑指南)
  • 2026最新诚信优选长治市黄金回收白银回收铂金回收彩金回收高口碑靠谱门店TOP5权威排行榜+联系方式推荐 - 前途无量YY
  • 2026年靖江大平层全屋高端定制企业选型指南
  • 实战避坑:Jenkins Pipeline中多容器Pod Agent的权限与日志问题解决指南
  • 2026最新诚信优选西安市黄金回收白银回收铂金回收彩金回收高口碑靠谱门店TOP5权威排行榜+联系方式推荐 - 前途无量YY
  • CVX默认求解器太慢?手把手教你为Matlab的CVX工具箱“外挂”MOSEK加速包(含许可证激活与路径配置详解)
  • 告别理论:在STM32F407上实测FFT逆变换,单精度和双精度结果对比一目了然