Flink 代码跑在 IDEA 里,尝试连接 Kafka?Kafaka 装在 WSL2 中,怎么连接?
连接 WSL2 里的 Kafka,关键在于解决网络连通性问题。你的 Flink 代码跑在 IDEA 里(Windows 上的 IDE,但实际执行在 WSL2 的 Java 环境),所以需要找到一个 WSL2 和 Windows 都能访问的地址来配置 Kafka。
这里给你一个最稳妥、能一次成功的完整操作流程:
1. 获取 WSL2 的正确 IP 地址(最关键一步)
千万不要用localhost,因为 WSL2 和 Windows 的网络命名空间不同,直接写localhost:9092大概率会连接失败。
在WSL2 终端里执行这个命令,拿到你的 WSL2 网卡 IP:
bash
ip addr | grep eth0
你会看到类似172.x.y.z这样的地址,记下它,这就是客户端(我们的 Flink 程序)要连接的 Kafka Broker 地址。
2. 修改你的 Kafka 配置(让它监听这个 IP)
去你的 Kafka 安装目录(比如/usr/local/kafka/config/server.properties),找到并修改advertised.listeners属性:
properties
# 将这行配置的地址改为你的 WSL2 IP advertised.listeners=PLAINTEXT://172.x.y.z:9092
advertised.listeners这个参数决定了Kafka 告诉客户端(Flink)去哪里找自己,不设对的话,即使 Flink 找到了门,Kafka 也会指错路。
然后重启 Kafka,让配置生效。
3. 在 Flink 项目里添加 Kafka 连接器依赖
在你的pom.xml的<dependencies>里加上这个。你用的是 Flink 1.15,flink-connector-kafka的版本一般是3.0.0或3.1.0,建议和你的 Flink 版本保持一致。
xml
<!-- Kafka 连接器依赖 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka</artifactId> <version>3.0.0-1.17</version> <!-- 如果 Flink 是 1.15,试试 3.0.0-1.17 或 3.1.0-1.18 --> </dependency>
4. 用 Flink 写一个消费者程序测试连接
用下面的代码,把bootstrap.servers换成你刚才记下的WSL2 IP,主题名换成你 Kafka 里有的主题。这段代码是连接 Kafka 并消费数据的最小单元。
java
import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class KafkaFlinkTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 1. 用你的 WSL2 IP 地址替换 [WSL2_IP] String bootstrapServers = "PLAINTEXT://172.x.y.z:9092"; String topicName = "你的主题名"; String consumerGroup = "flink-test-group"; // 2. 创建 Kafka Source (推荐使用较新的 KafkaSource 构建器) KafkaSource<String> source = KafkaSource.<String>builder() .setBootstrapServers(bootstrapServers) .setTopics(topicName) .setGroupId(consumerGroup) .setStartingOffsets(OffsetsInitializer.latest()) // 从最新位置开始读 .setValueOnlyDeserializer(new SimpleStringSchema()) .build(); // 3. 将 Source 添加到环境并打印输出 env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source") .print("Kafka Output"); env.execute("Flink Kafka Connector Test"); } }如果连接失败,还有个后备方案
有些版本的 WSL2 网络配置比较复杂,如果直接连 IP 还是失败,可以考虑在 Windows 端(管理员权限)配置一个端口转发,让 Windows 的localhost:9092指向 WSL2 的9092端口:
cmd
netsh interface portproxy add v4tov4 listenport=9092 listenaddress=0.0.0.0 connectport=9092 connectaddress=172.x.y.z
这样配置后,你的 Flink 程序里就可以先用localhost:9092来尝试连接了。
💡 操作顺序总结
获取 IP:在 WSL2 里执行
ip addr | grep eth0。改 Kafka 配置:修改
advertised.listeners为这个 IP,重启 Kafka。加依赖:在你的
pom.xml里添加flink-connector-kafka。运行测试代码:把
bootstrap.servers改成这个 IP,运行你的 Flink 消费者程序。
先用这个“硬核”的 IP 方式跑通,之后再慢慢优化。先去试一下,如果遇到报错,把错误信息贴给我,我帮你分析是网络问题、配置问题还是依赖版本问题!