构建分布式RouterSploit:突破单节点瓶颈,实现协同渗透测试

构建分布式RouterSploit:突破单节点瓶颈,实现协同渗透测试

1. 项目概述:从单兵作战到协同渗透的跃迁

在安全测试和渗透评估领域,RouterSploit 这个名字对很多从业者来说并不陌生。它是一个专注于嵌入式设备,尤其是路由器、摄像头、IoT设备漏洞利用的框架,集成了扫描、漏洞验证和利用模块,堪称针对这类设备的“瑞士军刀”。然而,传统的 RouterSploit 使用模式存在一个明显的天花板:单节点性能瓶颈。当你面对一个包含成百上千台设备的大型内网或复杂网络环境时,单点扫描和利用不仅效率低下,而且容易因网络波动、目标响应慢而卡住整个流程,更别提多任务并发执行了。

“突破单节点限制”这个标题,直指的就是这个痛点。它描述的是一种将 RouterSploit 从一个独立的命令行工具,升级为一个可以协同工作的分布式系统的实战方法。这不仅仅是简单地多开几个终端窗口,而是涉及到任务调度、结果聚合、状态同步和资源管理等一系列工程化问题。想象一下,你可以将扫描任务分发给部署在不同网络位置的多个“探针”(Agent),由一台中心服务器(Server)统一指挥和收集数据,从而实现横向的大规模资产探测与漏洞验证。这种模式特别适合红队演练、大型企业内网安全评估以及安全服务商对客户复杂网络环境的审计。

本指南旨在为你拆解构建这样一个分布式系统的核心思路、技术选型考量以及具体的实现步骤。无论你是想提升个人渗透测试的效率,还是为团队构建一个内部的自动化评估平台,这里分享的经验和踩过的坑,或许能帮你少走不少弯路。我们将从为什么需要分布式架构讲起,一步步深入到系统设计、通信协议、安全加固和实战排错。

2. 系统架构设计与核心组件选型

构建一个分布式系统,首要任务是确定清晰的架构。我们不能简单地把 RouterSploit 复制多份然后手动运行,那和单节点没什么本质区别。我们需要的是一个中心控制、多个节点执行、结果统一回传的模型。

2.1 主流架构模式对比

在安全领域,常见的分布式架构主要有两种:中心化任务队列模式和去中心化P2P模式。

中心化任务队列模式:这是最直观、也最易于管理和实现的架构。它包含一个中心服务器(Server)和多个客户端(Agent)。Server 负责任务的创建、分发、调度和结果收集;Agent 负责接收任务、执行并返回结果。所有 Agent 都只与 Server 通信,彼此之间不直接交互。这种模式的优点是逻辑清晰,控制力强,便于状态监控和任务重试。缺点是 Server 可能成为单点故障和性能瓶颈。

去中心化P2P模式:所有节点地位平等,通过某种共识机制或消息广播来协同工作。任务可以被任何节点生成并传播,执行结果也在节点间同步。这种模式容错性高,没有单点故障。但实现复杂度陡增,需要处理数据一致性、任务去重、节点发现等棘手问题,对于我们的渗透测试场景来说,可能有些“杀鸡用牛刀”。

基于 RouterSploit 任务执行的特点——任务明确、结果独立、需要集中分析——中心化任务队列模式无疑是更合适的选择。它的简单可靠 outweighs 其对中心服务器的依赖,而且我们可以通过将 Server 部署在稳定环境、做好备份来规避单点风险。

2.2 核心组件技术选型

确定了架构,接下来就要为每个组件选择合适的技术栈。

1. 通信协议与序列化Agent 和 Server 需要频繁交换数据(任务、结果、心跳)。HTTP/HTTPS 是一个稳妥的选择,因为它穿透性好(通常只开放80/443端口),库支持成熟,且易于调试。虽然实时性不如 WebSocket,但对于渗透测试这种“任务-执行-返回”的模型来说完全够用。序列化格式推荐 JSON,可读性强,各种语言解析方便,非常适合传输结构化的任务和结果数据。如果考虑二进制数据(如 exploit 后的 shell 交互),可以将其 Base64 编码后放入 JSON 字段。

2. 任务队列与状态管理Server 需要管理待分发、执行中、已完成的任务状态。这里不需要引入复杂的消息队列中间件(如 RabbitMQ, Kafka),对于中小规模场景,一个关系型数据库(如 SQLite 或 PostgreSQL)或一个内存数据库(如 Redis)就足够了。我们可以设计几张简单的表:

  • tasks: 存储任务ID、任务类型(如扫描、利用)、目标、参数、状态(pending, running, completed, failed)、创建时间、分配给哪个 Agent 等。
  • agents: 存储在线 Agent 的ID、IP、最后心跳时间、状态(idle, busy)。
  • results: 存储任务执行的结果详情。

使用数据库可以方便地进行任务查询、统计和失败重试。如果追求极简,也可以用内存字典加文件持久化的方式,但可靠性和查询能力会差很多。

3. Agent 端执行引擎这是核心,即如何驱动 RouterSploit 执行。RouterSploit 本身是基于 Python 的,有良好的模块化接口。我们不应该通过子进程调用rsf.py命令行,而是应该直接导入其核心模块(如rsf库),在 Python 代码中调用相应的扫描器或利用模块。这样做的好处是:

  • 性能更好,避免反复启动解释器的开销。
  • 可以更精细地捕获执行状态和输出。
  • 便于进行错误处理和资源清理。

因此,Agent 最好也用 Python 编写,与 RouterSploit 天然兼容。你需要一个常驻进程,它定期(如每30秒)向 Server 轮询新任务,或者由 Server 在任务就绪时主动推送(通过长轮询或 Webhook 回调,后者实现稍复杂)。

4. Server 端 Web 框架Server 需要提供 RESTful API 供 Agent 调用(获取任务、提交结果、发送心跳),最好还能有一个简单的 Web 界面用于任务管理和结果查看。轻量级的 Python Web 框架如FlaskFastAPI是绝佳选择。它们开发快速,易于集成数据库和构建 API。FastAPI 凭借其自动生成 API 文档、异步支持等特性,近年来更受欢迎。对于前端界面,如果要求不高,可以使用简单的 HTML + JavaScript,或者利用 Flask 的模板渲染;如果想更美观,可以分离前后端,前端使用 Vue/React,通过 API 与后端交互。

注意:安全第一。所有 API 接口必须进行身份认证和授权。最简单的办法是为每个 Agent 分配一个唯一的 Token(UUID),Agent 在每次请求的 Header 中携带此 Token。Server 端验证 Token 的有效性以及该 Agent 是否有权限执行某类任务(例如,某些高风险的 exploit 模块只能由特定的、经过更严格审查的 Agent 执行)。此外,Server 与 Agent 之间的通信强烈建议使用 HTTPS,防止任务和结果在传输过程中被窃听或篡改。

2.3 一个可行的架构蓝图

综合以上考量,我们可以勾勒出这样一个系统蓝图:

  • 中心服务器 (Server):基于 FastAPI 开发,提供任务管理 API 和简易 Web 界面。使用 SQLite(轻量)或 PostgreSQL(更稳定)存储任务、Agent 和结果数据。内置一个调度器,负责将pending状态的任务分配给idle状态的 Agent。
  • 客户端代理 (Agent):一个用 Python 编写的常驻进程。核心是一个循环,定期向 Server 的/api/task/pull端点发起 GET 请求(携带 Token),获取任务。拿到任务后,在内存中加载对应的 RouterSploit 模块并执行,将执行状态和结果通过 POST 请求提交到/api/task/result。同时,定期向/api/agent/heartbeat发送心跳,告知 Server 自己还活着。
  • 通信:全部基于 HTTPS + JSON。Server 需要配置有效的 SSL 证书(可以使用 Let‘s Encrypt 获取免费证书,或在内网使用自签名证书并在 Agent 端信任)。

这个架构清晰、技术栈成熟,足以支撑起一个实用的分布式渗透测试系统。

3. 核心模块实现与关键代码解析

理论说再多,不如一行代码。接下来,我们深入到几个核心模块的实现细节。我会以 Python 为例,展示关键部分的代码逻辑和设计思路。

3.1 Server 端:任务管理与 API 设计

首先,我们搭建 Server 的核心。假设我们使用 FastAPI 和 SQLAlchemy(ORM)连接 SQLite 数据库。

数据库模型定义

# models.py from sqlalchemy import Column, Integer, String, Text, DateTime, Boolean, ForeignKey from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.sql import func import uuid Base = declarative_base() def generate_uuid(): return str(uuid.uuid4()) class Task(Base): __tablename__ = 'tasks' id = Column(String(36), primary_key=True, default=generate_uuid) task_type = Column(String(50), nullable=False) # 如:scanner, exploit module_name = Column(String(100), nullable=False) # RouterSploit 模块名,如 'scanners/cameras/axis' target = Column(String(255), nullable=False) # 目标IP或主机名 port = Column(Integer, default=None) # 可选端口 extra_args = Column(Text, default='{}') # 额外参数,JSON字符串 status = Column(String(20), default='pending') # pending, running, completed, failed assigned_to = Column(String(36), ForeignKey('agents.id'), nullable=True) # 分配给哪个Agent result_id = Column(String(36), ForeignKey('results.id'), nullable=True) # 关联的结果 created_at = Column(DateTime(timezone=True), server_default=func.now()) updated_at = Column(DateTime(timezone=True), onupdate=func.now()) class Agent(Base): __tablename__ = 'agents' id = Column(String(36), primary_key=True, default=generate_uuid) name = Column(String(100), unique=True) # Agent自定义名称 token = Column(String(64), unique=True, nullable=False) # 认证Token last_seen = Column(DateTime(timezone=True)) # 最后心跳时间 status = Column(String(20), default='idle') # idle, busy, offline ip_address = Column(String(45)) # 最后一次上报的IP created_at = Column(DateTime(timezone=True), server_default=func.now()) class Result(Base): __tablename__ = 'results' id = Column(String(36), primary_key=True, default=generate_uuid) task_id = Column(String(36), ForeignKey('tasks.id'), unique=True) output = Column(Text) # 执行输出的文本 success = Column(Boolean) # 任务是否成功 details = Column(Text, default='{}') # 结构化详情,JSON字符串 created_at = Column(DateTime(timezone=True), server_default=func.now())

核心 API 端点

# main.py (FastAPI 应用主文件) from fastapi import FastAPI, Depends, HTTPException, Header from sqlalchemy.orm import Session from . import models, schemas, crud from .database import SessionLocal, engine from .auth import verify_token models.Base.metadata.create_all(bind=engine) app = FastAPI(title="RouterSploit Distributed Server") # 依赖项:获取数据库会话 def get_db(): db = SessionLocal() try: yield db finally: db.close() # 依赖项:验证Agent Token def get_agent_from_token(db: Session = Depends(get_db), x_agent_token: str = Header(...)): agent = crud.get_agent_by_token(db, token=x_agent_token) if agent is None: raise HTTPException(status_code=403, detail="Invalid or missing agent token") # 更新最后在线时间 agent.last_seen = func.now() db.commit() return agent # Agent 拉取任务 @app.get("/api/task/pull", response_model=schemas.Task) def pull_task(agent: models.Agent = Depends(get_agent_from_token), db: Session = Depends(get_db)): # 1. 查找一个 pending 状态的任务 # 2. 将其状态改为 running,并 assigned_to 设为当前 agent.id # 3. 将 agent.status 改为 busy # 4. 返回任务详情 task = crud.assign_pending_task_to_agent(db, agent_id=agent.id) if not task: # 没有任务,可以返回空或特定状态码 raise HTTPException(status_code=404, detail="No pending task available") return task # Agent 提交结果 @app.post("/api/task/result") def submit_result(result_data: schemas.ResultCreate, agent: models.Agent = Depends(get_agent_from_token), db: Session = Depends(get_db)): # 1. 验证该任务是否确实分配给了此Agent且状态为running task = crud.get_task(db, task_id=result_data.task_id) if not task or task.assigned_to != agent.id or task.status != 'running': raise HTTPException(status_code=400, detail="Task not found or not assigned to this agent") # 2. 创建结果记录,关联到任务 result = crud.create_result(db, result_data, task_id=task.id) # 3. 更新任务状态为 completed 或 failed task.status = 'completed' if result_data.success else 'failed' task.result_id = result.id # 4. 将Agent状态改回idle agent.status = 'idle' db.commit() return {"message": "Result submitted successfully"} # Agent 心跳 @app.post("/api/agent/heartbeat") def heartbeat(agent: models.Agent = Depends(get_agent_from_token), db: Session = Depends(get_db)): # get_agent_from_token 依赖已经更新了 last_seen # 这里可以额外做一些健康状态检查或资源汇报 return {"status": "alive", "agent_id": agent.id}

实操心得:任务分配策略。上面的assign_pending_task_to_agent函数是核心调度器。一个简单的策略是“先入先出”(FIFO),但我们可以做得更智能。例如,可以根据任务类型(扫描、利用)和 Agent 的“能力标签”(比如某个 Agent 部署在特定网络区域,只适合处理该区域的目标)进行匹配。这需要在 Agent 注册或心跳时上报其能力标签(如capabilities: ["scanner", "exploit_http"]),并在 Task 表中增加对应的需求字段。初期可以简化,但预留扩展接口很重要。

3.2 Agent 端:任务执行与 RouterSploit 集成

Agent 端是真正“干活”的地方。它的核心循环可以这样设计:

# agent_main.py import requests import time import json import sys import logging from pathlib import Path # 假设我们将RouterSploit作为子模块或安装在本地的包 sys.path.insert(0, str(Path(__file__).parent.parent / 'routersploit')) # 注意:这里需要你根据RouterSploit的实际结构来导入 # 例如,对于扫描器:from rsf.scanners.cameras.axis import Exploit logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class RouterSploitAgent: def __init__(self, server_url, agent_token, agent_name): self.server_url = server_url.rstrip('/') self.headers = {'X-Agent-Token': agent_token} self.agent_name = agent_name self.session = requests.Session() # 可以配置请求重试、超时等 def send_heartbeat(self): try: resp = self.session.post(f"{self.server_url}/api/agent/heartbeat", headers=self.headers, timeout=10) resp.raise_for_status() logger.debug("Heartbeat sent") except requests.exceptions.RequestException as e: logger.error(f"Heartbeat failed: {e}") def pull_task(self): try: resp = self.session.get(f"{self.server_url}/api/task/pull", headers=self.headers, timeout=30) if resp.status_code == 404: logger.info("No pending task.") return None resp.raise_for_status() task = resp.json() logger.info(f"Pulled task: {task['id']} - {task['module_name']} on {task['target']}") return task except requests.exceptions.RequestException as e: logger.error(f"Failed to pull task: {e}") return None def execute_task(self, task): task_id = task['id'] module_name = task['module_name'] # 例如 'scanners/cameras/axis' target = task['target'] extra_args = json.loads(task.get('extra_args', '{}')) result_output = "" success = False details = {} try: # 动态导入RouterSploit模块 - 这是关键且需要小心处理的部分 # RouterSploit 3.x 的模块结构通常是:rsf.modules.exploits.routers.dlink.dir_300_320_600_615.auth_bypass # 我们需要将传入的 module_name 转换成正确的导入路径 # 这里是一个简化的示例,实际需要更复杂的映射和错误处理 module_path = f"rsf.modules.{module_name.replace('/', '.')}" module = __import__(module_path, fromlist=['Exploit']) exploit_class = module.Exploit exploit_instance = exploit_class() # 设置目标 exploit_instance.target = target if 'port' in task and task['port']: exploit_instance.port = task['port'] # 设置额外参数 for arg, value in extra_args.items(): if hasattr(exploit_instance, arg): setattr(exploit_instance, arg, value) else: logger.warning(f"Ignoring unknown argument: {arg}") # 执行检查或攻击 # 注意:RouterSploit 模块通常有 check() 和 run() 方法 logger.info(f"Running check for {module_name}...") check_result = exploit_instance.check() details['check'] = check_result if check_result: logger.info(f"Target is vulnerable. Attempting exploit...") # 注意:run() 方法可能没有返回值,或者返回成功/失败 # 我们需要捕获其输出,可能需要重定向sys.stdout import io, contextlib f = io.StringIO() with contextlib.redirect_stdout(f): exploit_instance.run() result_output = f.getvalue() # 简单判断:如果输出中包含特定成功关键词,则认为成功 # 这非常粗糙,强烈建议根据具体模块调整 success = "success" in result_output.lower() or "vulnerable" in result_output.lower() or "exploited" in result_output.lower() details['exploit_output'] = result_output else: result_output = "Target is not vulnerable according to check." success = False except ImportError as e: result_output = f"Failed to import module {module_name}: {e}" logger.error(result_output) details['error'] = str(e) except Exception as e: result_output = f"Unexpected error during execution: {e}" logger.exception("Task execution failed") details['error'] = str(e) return { "task_id": task_id, "output": result_output, "success": success, "details": details } def submit_result(self, result_data): try: resp = self.session.post(f"{self.server_url}/api/task/result", json=result_data, headers=self.headers, timeout=30) resp.raise_for_status() logger.info(f"Result for task {result_data['task_id']} submitted.") except requests.exceptions.RequestException as e: logger.error(f"Failed to submit result: {e}") def run(self): logger.info(f"Agent '{self.agent_name}' starting...") heartbeat_interval = 60 # 秒 pull_interval = 10 # 秒 last_heartbeat = 0 while True: current_time = time.time() # 发送心跳 if current_time - last_heartbeat > heartbeat_interval: self.send_heartbeat() last_heartbeat = current_time # 拉取并执行任务 task = self.pull_task() if task: logger.info(f"Executing task {task['id']}") result = self.execute_task(task) self.submit_result(result) # 任务执行后立即尝试拉取下一个,无需等待间隔 continue else: # 没有任务,休眠一段时间再试 time.sleep(pull_interval) if __name__ == "__main__": # 配置应从配置文件或环境变量读取 SERVER_URL = "https://your-server-ip:8443" AGENT_TOKEN = "your-unique-agent-token-here" AGENT_NAME = "agent-01" agent = RouterSploitAgent(SERVER_URL, AGENT_TOKEN, AGENT_NAME) agent.run()

关键细节与避坑指南

  1. 动态导入模块:这是 Agent 最复杂也最容易出错的部分。RouterSploit 的模块路径结构可能随版本变化。上述代码中的module_path构建逻辑是假设性的。你需要根据你使用的 RouterSploit 版本的实际源码结构来调整。一个更稳健的方法是维护一个“模块名到类”的映射字典,或者使用 Python 的pkgutil进行遍历发现。
  2. 输出捕获:RouterSploit 的模块通常直接打印信息到标准输出。为了获取这些输出,我们使用了contextlib.redirect_stdout将其重定向到一个字符串缓冲区。这能捕获大部分输出,但有些模块可能直接写文件或通过网络发送数据,这就需要更精细的拦截。
  3. 成功判定success字段的判定逻辑极其重要,但又很难通用。check()方法通常返回布尔值,相对可靠。但run()方法是否成功,往往需要解析其输出文本或检查其产生的副作用(如是否建立了会话)。最准确的方式是为每个重要的 exploit 模块编写单独的结果解析器。初期可以放宽标准,先收集原始输出,后期再人工或通过规则引擎分析。
  4. 错误处理与超时:一定要为每个网络请求和模块执行设置合理的超时。一个卡住的exploit.run()可能会让整个 Agent 线程挂起。考虑使用signal模块或multiprocessing为任务执行设置超时限制,超时后强制终止子进程。
  5. 资源隔离:考虑将每个任务的执行放在独立的子进程甚至 Docker 容器中。这可以防止一个任务的崩溃(或恶意负载)影响 Agent 主进程,也便于资源清理。虽然增加了复杂度,但对于生产环境是值得的。

3.3 安全加固与通信加密

分布式系统意味着攻击面扩大。我们必须加固系统。

1. 双向 TLS 认证(mTLS)除了 HTTPS,我们可以启用双向 TLS。Server 和每个 Agent 都持有自己的证书和私钥,并在连接时相互验证。这提供了比 Token 更强大的身份认证,能有效防止 Token 泄露导致的未授权访问。使用requests库的客户端和 FastAPI 的服务端配置 mTLS 需要一些额外的 SSL 上下文设置,但一旦配置好,安全性大幅提升。

2. 任务签名与防篡改Server 下发的任务可以附带一个数字签名(使用 Server 的私钥)。Agent 收到任务后,用预置的 Server 公钥验证签名,确保任务指令确实来自可信的 Server,且在传输过程中未被篡改。同样,Agent 返回的结果也可以签名。

3. 最小权限原则在 Server 上,严格区分 API 权限。例如,查询结果的 API 可能不需要 Agent Token,而拉取任务和提交结果的 API 必须验证。在数据库层面,使用不同的数据库用户,Agent 相关的 API 只能操作tasksresults表,而不能操作agents表或其他系统表。

4. Agent 自我保护Agent 进程应以非特权用户身份运行。可以配置系统级防火墙,只允许 Agent 与指定的 Server IP 和端口通信。定期更新 Agent 代码和依赖库。

4. 部署、运维与实战排错指南

系统搭建好了,如何让它稳定可靠地跑起来?这才是真正的挑战。

4.1 部署方案

Server 部署

  • 环境:选择一台有公网 IP 或在内网可达的稳定服务器(Linux)。
  • 依赖:安装 Python 3.8+、PostgreSQL/SQLite、Nginx(反向代理)。
  • 步骤
    1. 克隆你的 Server 代码库。
    2. 创建虚拟环境并安装依赖:pip install fastapi uvicorn sqlalchemy psycopg2-binary
    3. 初始化数据库(如果使用 SQLAlchemy Alembic 进行迁移管理更好)。
    4. 配置 Nginx 反向代理到 Uvicorn 运行的 FastAPI 应用(例如运行在 127.0.0.1:8000),并在 Nginx 中配置 SSL 证书。
    5. 使用 systemd 或 Supervisor 将 Uvicorn 进程托管为系统服务,实现开机自启和崩溃重启。
  • 配置管理:将数据库连接字符串、密钥、Token 等敏感信息存储在环境变量或配置文件中,切勿硬编码在代码里。

Agent 部署

  • 环境:可以在多种环境中部署——云服务器、虚拟机、甚至树莓派。关键是要能访问到目标网络。
  • 步骤
    1. 安装 Python 和 RouterSploit 依赖。建议将 RouterSploit 作为子模块(git submodule)包含在你的 Agent 项目中,以便版本控制。
    2. 为每个 Agent 生成唯一的 Token(在 Server 的agents表中预先插入记录)。
    3. 编写配置文件(如config.iniconfig.yaml),包含 Server URL、Agent Token、Agent Name 等。
    4. 同样使用 systemd 或 Supervisor 将agent_main.py托管为服务。
  • 批量部署:如果 Agent 数量多,可以考虑使用 Ansible、SaltStack 等自动化运维工具进行批量安装和配置。

4.2 监控与日志

没有监控的系统就像在黑暗中飞行。

  • Server 监控:监控 Server 的 CPU、内存、磁盘和网络流量。监控数据库连接数。FastAPI 自带/docs/redoc端点,可以查看 API 状态。
  • Agent 监控:Agent 应定期向 Server 发送心跳和状态信息(如负载、内存使用情况)。Server 的 Web 界面应能展示所有 Agent 的在线状态、最后活跃时间、当前任务。
  • 集中式日志:将 Server 和所有 Agent 的日志集中收集到一处,例如使用 ELK Stack(Elasticsearch, Logstash, Kibana)或 Grafana Loki。这对于排查分布式问题至关重要。在代码中关键位置(如任务开始/结束、错误发生)打上结构化的日志。

4.3 常见问题与排查实录

以下是我在搭建和运行类似系统时遇到的一些典型问题及解决方法:

问题1:Agent 拉取不到任务,但 Web 界面显示有 pending 任务。

  • 排查
    1. 检查 Agent 日志,看/api/task/pull接口返回什么。如果是 403,说明 Token 认证失败。
    2. 如果是 404,说明调度逻辑可能有问题。登录 Server 数据库,直接查询tasks表,确认确实有status='pending'assigned_to IS NULL的记录。
    3. 检查调度器代码。是否在分配任务时,没有原子性地更新任务状态(pending->running)?在高并发下,可能出现两个 Agent 同时读到同一个 pending 任务。解决方法是在数据库查询时使用SELECT ... FOR UPDATE(行锁)或类似的乐观锁机制。
  • 解决:在crud.assign_pending_task_to_agent函数中,使用数据库事务和行锁确保一个任务只被分配一次。

问题2:任务执行时间过长,导致 Agent 卡死,后续任务无法执行。

  • 排查
    1. 查看该任务对应的 RouterSploit 模块是什么。有些 exploit 模块在特定条件下(如网络超时、目标无响应)可能会挂起。
    2. 检查 Agent 日志,看是否在某个任务后就没有心跳或新的任务拉取了。
  • 解决
    • 设置超时:在execute_task函数中,使用multiprocessingthreading模块,将任务执行放在一个单独的进程/线程中,主进程/线程等待其完成,如果超时则强制终止。
      import multiprocessing def _run_module(args): # 实际执行任务的函数 ... def execute_task_with_timeout(task, timeout=300): pool = multiprocessing.Pool(processes=1) try: result = pool.apply_async(_run_module, (task,)) return result.get(timeout=timeout) # 等待最多timeout秒 except multiprocessing.TimeoutError: pool.terminate() pool.join() return {"output": "Task execution timeout", "success": False, "details": {}} finally: pool.close()
    • 任务细分:将耗时长的扫描任务拆分成更小的子任务(如按IP段拆分),分发给多个 Agent 并行执行。

问题3:Server 返回数据库连接池耗尽错误。

  • 排查:在高并发下,如果每个请求都创建新的数据库连接而不及时关闭,会导致连接池耗尽。
  • 解决:确保使用 FastAPI 的依赖注入Depends(get_db),它会在请求结束时自动关闭会话。检查是否有地方手动创建了 Session 而没有关闭。考虑使用数据库连接池工具,并适当调整池大小。

问题4:RouterSploit 模块导入失败,报错ModuleNotFoundError

  • 排查:这是路径问题。Agent 所在环境的 Python 路径可能没有包含 RouterSploit 的源码目录。
  • 解决
    • 确保 RouterSploit 被正确安装(pip install -e .)或将其路径添加到sys.path
    • 使用绝对路径添加:sys.path.insert(0, '/absolute/path/to/routersploit')
    • 在动态导入前,打印sys.pathmodule_path进行调试。

问题5:Web 界面显示任务成功,但结果输出为空或异常。

  • 排查
    1. 检查 Agent 提交的结果数据格式是否正确,是否包含了output字段。
    2. 检查execute_task函数中的输出捕获逻辑。某些模块的输出可能写到了stderr而非stdout,需要同时重定向。
    3. 有些模块的成功与否不能仅靠输出文本判断。例如,一个反弹 shell 的 exploit,成功标志是建立了一个网络连接。这时需要在details字段中记录更结构化的信息,如打开的端口、会话ID等。
  • 解决:完善结果解析逻辑。对于重要的模块,编写定制化的结果处理器,而不是依赖通用的文本匹配。

分布式系统的调试比单体应用复杂,关键在于日志要详尽、状态要可查。给每个任务和 Agent 都赋予唯一的 ID,并在所有相关的日志行中打印这些 ID,这样在排查问题时就能轻松地串联起整个执行链条。

最后,再分享一个部署上的小技巧:在 Server 的 Web 界面里,增加一个“一键测试”功能,可以手动触发一个针对127.0.0.1或某个已知测试目标的简单任务(如一个端口扫描),并指定某个 Agent 执行。这能快速验证从任务下发到执行再到结果回传的整个链路是否通畅,在系统初始化或出问题时非常有用。