当前位置: 首页 > news >正文

[Flink] Flink 经典场景:数据流输出到多个Sink

需求描述

  • Flink 数据流的处理过程中,需要将同一数据流输出到多个输出器(Sink)。

需求分析

  • 在处理数据流时,Flink 提供了一种称为侧输出流(Side Output)的机制,可以将主数据流分割成多个不同的侧输出流。

这种机制在处理不同类型的数据时非常有用,避免了多次复制数据流带来的性能浪费。

  • 使用场景

侧输出流主要有2个作用:

  • 分隔过滤:将源数据中的不同类型的数据进行分割处理。例如,可以将不同价格类型的订单从主流中分开处理。
  • 延时数据处理:在处理延时窗口计算时,对延时到达的数据进行处理,避免数据丢失。

解决方案

案例示范

import com.alibaba.fastjson2.JSON;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;/*** @description 验证同一数据流,输出到 多个 sink (含: 侧流的方式)*/
public class FlinkJobDemo {private final static Logger log = LoggerFactory.getLogger(FlinkJobDemo.class);public static final String JOB_NAME = "DemoJob";public static void main(String[] args) throws Exception {StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); // new StreamExecutionEnvironment();————> 错误示范,会报错: `NullPointerException: No execution.target specified in your configuration file.` )// 设置运行模式 | STREAMING, BATCH , AUTOMATICenvironment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//将配置设置成全局变量//environment.getConfig().setGlobalJobParameters(jobParameterTool);//加载数据源DataStreamSource<String> dataStreamSource = environment.fromElements( new String [] { "Hello", "World", "Bdp"});OutputTag< String > thirdSideOutput = new OutputTag<String>("ThirdSideOutput", TypeInformation.of(new TypeHint< String >() {  }));//第1个输出流(主流)dataStreamSource.keyBy( in -> in ).map( in -> { return in.toLowerCase(); } ).addSink(new RichSinkFunction<String>() {@Overridepublic void open(Configuration parameters) throws Exception {}public void invoke(String input, Context context) throws Exception {System.out.println(String.format("<sink:1> ts:%d, input:%s", System.currentTimeMillis(), JSON.toJSONString( input ) ));}});SingleOutputStreamOperator< String > dataStreamSource2 =dataStreamSource.process(new ProcessFunction< String , String>() {@Overridepublic void processElement(String value, ProcessFunction<String, String>.Context context, Collector<String> collector) throws Exception {collector.collect( value );//发送到主流context.output( thirdSideOutput, value );//发送到侧流}});//第2个输出流 (主流)dataStreamSource2.addSink(new RichSinkFunction<String>() {@Overridepublic void open(Configuration parameters) throws Exception {}public void invoke(String input, Context context) throws Exception {System.out.println(String.format("<sink:2> ts:%d, input:%s", System.currentTimeMillis(), JSON.toJSONString( input ) ));}});//第3个输出流 (侧流)dataStreamSource2 //dataStreamSource (x).getSideOutput(thirdSideOutput).keyBy( in -> in ).addSink(new RichSinkFunction<String>() {@Overridepublic void open(Configuration parameters) throws Exception {}public void invoke(String input, Context context) throws Exception {System.out.println(String.format("<sink:3> ts:%d, input:%s", System.currentTimeMillis(), JSON.toJSONString( input ) ));}});environment.execute(JOB_NAME);}
}

输出:

<sink:2> ts:1757422085116, input:"Hello"
<sink:2> ts:1757422085116, input:"Bdp"
<sink:2> ts:1757422085116, input:"World"
<sink:1> ts:1757422085048, input:"hello"
<sink:1> ts:1757422085048, input:"bdp"
<sink:1> ts:1757422085048, input:"world"<sink:3> ts:1757422085623, input:"Hello"
<sink:3> ts:1757422085650, input:"World"
<sink:3> ts:1757422085724, input:"Bdp"

X 参考文献

http://www.zskr.cn/news/1310.html

相关文章:

  • 【ChipIntelli 系列】SDK详解4——Makefile 设置 单SDK多工程文件夹实现方法
  • Codeforces Round 1049 (Div. 2)
  • java学习起航喽
  • 从windows 自动进入BIOS
  • Offer发放革命:Moka软件如何将平均入职转化率提升25%
  • 常见的一些Dos命令
  • AUC和ROC
  • CSP 2025 游记
  • KVM虚拟机快照链创建,合并,删除及回滚研究
  • AI编程新范式:从Coding到Vibe Coding,你准备好了吗?
  • KD-Tree
  • yyjj
  • Laravel PHP 忘记密码如何重置(创建新管理员账号)
  • 第一章 逻辑代数基础 - Wisdom
  • golang netpoll 底层原理
  • MATLAB R2025a安装教程和资源(中文版)
  • Xmanager Power Suite使用教程 - Invinc
  • Ubuntu 安装微信
  • 主存储器和cpu的链接
  • 滑动窗口(不与单调队列结合的总结)
  • 9.9未完成
  • 202205_宁波市赛_Cr4ck2
  • 20250909 GOJ 模拟赛
  • 自我介绍
  • MQ
  • 自我介绍+软工五问
  • 三数之和-leetcode
  • 相似了
  • 最新可用Docker镜像加速站点
  • 第一周作业