Flink 代码跑在 IDEA 里,尝试连接 Kafka?Kafaka 装在 WSL2 中,怎么连接?

Flink 代码跑在 IDEA 里,尝试连接 Kafka?Kafaka 装在 WSL2 中,怎么连接?

Flink 代码跑在 IDEA 里,尝试连接 Kafka?Kafaka 装在 WSL2 中,怎么连接?


连接 WSL2 里的 Kafka,关键在于解决网络连通性问题。你的 Flink 代码跑在 IDEA 里(Windows 上的 IDE,但实际执行在 WSL2 的 Java 环境),所以需要找到一个 WSL2 和 Windows 都能访问的地址来配置 Kafka。

这里给你一个最稳妥、能一次成功的完整操作流程:

1. 获取 WSL2 的正确 IP 地址(最关键一步)

千万不要用localhost,因为 WSL2 和 Windows 的网络命名空间不同,直接写localhost:9092大概率会连接失败。

WSL2 终端里执行这个命令,拿到你的 WSL2 网卡 IP:

bash

ip addr | grep eth0

你会看到类似172.x.y.z这样的地址,记下它,这就是客户端(我们的 Flink 程序)要连接的 Kafka Broker 地址。

2. 修改你的 Kafka 配置(让它监听这个 IP)

去你的 Kafka 安装目录(比如/usr/local/kafka/config/server.properties),找到并修改advertised.listeners属性:

properties

# 将这行配置的地址改为你的 WSL2 IP advertised.listeners=PLAINTEXT://172.x.y.z:9092

advertised.listeners这个参数决定了Kafka 告诉客户端(Flink)去哪里找自己,不设对的话,即使 Flink 找到了门,Kafka 也会指错路。

然后重启 Kafka,让配置生效。

3. 在 Flink 项目里添加 Kafka 连接器依赖

在你的pom.xml<dependencies>里加上这个。你用的是 Flink 1.15,flink-connector-kafka的版本一般是3.0.03.1.0建议和你的 Flink 版本保持一致

xml

<!-- Kafka 连接器依赖 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka</artifactId> <version>3.0.0-1.17</version> <!-- 如果 Flink 是 1.15,试试 3.0.0-1.17 或 3.1.0-1.18 --> </dependency>

4. 用 Flink 写一个消费者程序测试连接

用下面的代码,把bootstrap.servers换成你刚才记下的WSL2 IP,主题名换成你 Kafka 里有的主题。这段代码是连接 Kafka 并消费数据的最小单元。

java

import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class KafkaFlinkTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 1. 用你的 WSL2 IP 地址替换 [WSL2_IP] String bootstrapServers = "PLAINTEXT://172.x.y.z:9092"; String topicName = "你的主题名"; String consumerGroup = "flink-test-group"; // 2. 创建 Kafka Source (推荐使用较新的 KafkaSource 构建器) KafkaSource<String> source = KafkaSource.<String>builder() .setBootstrapServers(bootstrapServers) .setTopics(topicName) .setGroupId(consumerGroup) .setStartingOffsets(OffsetsInitializer.latest()) // 从最新位置开始读 .setValueOnlyDeserializer(new SimpleStringSchema()) .build(); // 3. 将 Source 添加到环境并打印输出 env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source") .print("Kafka Output"); env.execute("Flink Kafka Connector Test"); } }

如果连接失败,还有个后备方案

有些版本的 WSL2 网络配置比较复杂,如果直接连 IP 还是失败,可以考虑在 Windows 端(管理员权限)配置一个端口转发,让 Windows 的localhost:9092指向 WSL2 的9092端口:

cmd

netsh interface portproxy add v4tov4 listenport=9092 listenaddress=0.0.0.0 connectport=9092 connectaddress=172.x.y.z

这样配置后,你的 Flink 程序里就可以先用localhost:9092来尝试连接了。

💡 操作顺序总结

  1. 获取 IP:在 WSL2 里执行ip addr | grep eth0

  2. 改 Kafka 配置:修改advertised.listeners为这个 IP,重启 Kafka。

  3. 加依赖:在你的pom.xml里添加flink-connector-kafka

  4. 运行测试代码:把bootstrap.servers改成这个 IP,运行你的 Flink 消费者程序。

先用这个“硬核”的 IP 方式跑通,之后再慢慢优化。先去试一下,如果遇到报错,把错误信息贴给我,我帮你分析是网络问题、配置问题还是依赖版本问题!