当前位置: 首页 > news >正文

从脚本到自动化:用Python和Shell封装YARN应用管理,实现一键终止与巡检

从脚本到自动化:用Python和Shell封装YARN应用管理,实现一键终止与巡检

在大规模数据处理场景中,YARN集群往往同时运行着数百个应用实例。当某个ETL任务因代码缺陷陷入死循环,或是某个临时查询占用了生产环境关键资源时,传统的手动操作方式显得力不从心。本文将通过实战案例,展示如何构建一个智能化的YARN应用管控系统,实现三个核心能力:实时状态巡检精准条件筛选批量操作执行

1. 构建YARN应用管理的基础工具链

1.1 环境准备与API接入

YARN ResourceManager的REST API是我们所有自动化操作的基础入口点。首先需要确保操作主机能够访问ResourceManager服务,并安装必要的命令行工具:

# 验证网络连通性 ping resourcemanager.example.com telnet resourcemanager.example.com 8088 # 安装基础工具包 sudo yum install -y curl jq python3-pip # CentOS/RHEL sudo apt-get install -y curl jq python3-pip # Ubuntu/Debian pip install requests pandas

提示:生产环境建议配置Kerberos认证,可通过kinit命令预先获取票据

1.2 应用状态查询的原型实现

通过组合curljq工具,可以快速构建应用列表查询功能:

#!/bin/bash RM_HOST="resourcemanager.example.com" API_URL="http://$RM_HOST:8088/ws/v1/cluster/apps" # 获取所有RUNNING状态的应用 curl -s "$API_URL" | jq '.apps.app[] | select(.state == "RUNNING")'

对应的Python实现版本更具扩展性:

import requests def fetch_yarn_apps(state='RUNNING'): api_url = "http://resourcemanager.example.com:8088/ws/v1/cluster/apps" params = {'states': state} if state else None response = requests.get(api_url, params=params) response.raise_for_status() return response.json().get('apps', {}).get('app', []) if __name__ == '__main__': running_apps = fetch_yarn_apps() print(f"Found {len(running_apps)} running applications")

2. 高级筛选策略的实现

2.1 多维度过滤条件设计

在实际运维中,我们需要根据多种条件组合筛选目标应用。下表列出了常见的过滤维度及其实现方式:

筛选维度数据类型示例值实现方法
用户字符串etl_user.user == "etl_user"
队列字符串production.queue == "production"
运行时长整数3600 (秒).elapsedTime > 3600000
资源占用浮点数50.0 (vcore占比).allocatedVCores > 50
应用类型枚举SPARK.applicationType == "SPARK"

2.2 动态过滤器的Python实现

通过封装过滤逻辑,可以构建灵活的筛选系统:

from datetime import datetime, timedelta class AppFilter: @staticmethod def by_user(apps, username): return [app for app in apps if app['user'] == username] @staticmethod def by_runtime(apps, hours): threshold = hours * 3600 * 1000 # 转为毫秒 return [app for app in apps if app['elapsedTime'] > threshold] @staticmethod def by_resource(apps, min_vcores, min_memory): return [ app for app in apps if app['allocatedVCores'] >= min_vcores and app['allocatedMB'] >= min_memory * 1024 ] # 使用示例 apps = fetch_yarn_apps() long_running = AppFilter.by_runtime(apps, hours=2) overloaded = AppFilter.by_resource(apps, min_vcores=8, min_memory=32)

3. 安全终止机制的实现

3.1 批量终止的防护措施

直接执行批量终止操作存在风险,需要建立防护机制:

  1. 二次确认机制:显示将被终止的应用详情,要求人工确认
  2. 模拟执行模式:仅输出将要执行的操作而不实际调用API
  3. 操作白名单:限制可操作的应用类型和用户范围
  4. 操作日志记录:详细记录每次终止操作的元数据

3.2 带防护的终止脚本实现

import logging from typing import List, Dict logging.basicConfig( filename='yarn_operations.log', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) class YarnTerminator: def __init__(self, dry_run=False): self.dry_run = dry_run def safe_kill(self, app_ids: List[str], reason: str = ''): results = [] for app_id in app_ids: if self.dry_run: logging.info(f"DRY RUN: Would kill {app_id}") results.append({'id': app_id, 'status': 'simulated'}) continue try: url = f"http://resourcemanager.example.com:8088/ws/v1/cluster/apps/{app_id}/state" payload = {'state': 'KILLED'} headers = {'Content-Type': 'application/json'} response = requests.put(url, json=payload, headers=headers) if response.status_code == 200: logging.info(f"Killed {app_id} - Reason: {reason}") results.append({'id': app_id, 'status': 'success'}) else: logging.error(f"Failed to kill {app_id}: {response.text}") results.append({'id': app_id, 'status': 'failed'}) except Exception as e: logging.exception(f"Error killing {app_id}") results.append({'id': app_id, 'status': 'error'}) return results # 使用示例 terminator = YarnTerminator(dry_run=True) apps_to_kill = [app['id'] for app in overloaded] terminator.safe_kill(apps_to_kill, reason='Resource overcommit')

4. 系统集成与自动化调度

4.1 与监控系统对接

将YARN管理脚本集成到Prometheus监控系统中,可以通过Textfile Collector暴露指标:

#!/bin/bash OUTPUT_FILE="/var/lib/node_exporter/textfile_collector/yarn_metrics.prom" # 获取运行超时的应用数量 TIMEOUT_APPS=$(python3 yarn_manager.py --filter runtime=2h --count-only) cat <<EOF > "$OUTPUT_FILE" # HELP yarn_timeout_apps Number of applications running over 2 hours # TYPE yarn_timeout_apps gauge yarn_timeout_apps $TIMEOUT_APPS EOF

4.2 定时巡检的Crontab配置

设置定期执行的巡检任务:

# 每30分钟检查一次长时间运行的应用 */30 * * * * /usr/local/bin/yarn_manager.py --filter runtime=4h --notify # 每天凌晨清理测试环境残留应用 0 1 * * * /usr/local/bin/yarn_manager.py --queue test --older-than 24h --kill

4.3 完整的CLI工具封装

通过argparse模块创建功能完善的命令行工具:

import argparse def main(): parser = argparse.ArgumentParser(description='YARN Application Manager') parser.add_argument('--user', help='Filter by user') parser.add_argument('--queue', help='Filter by queue') parser.add_argument('--runtime', help='Filter by runtime (e.g. 2h, 30m)') parser.add_argument('--kill', action='store_true', help='Terminate matched apps') parser.add_argument('--dry-run', action='store_true', help='Simulate operations') parser.add_argument('--notify', action='store_true', help='Send alert notifications') args = parser.parse_args() # 应用过滤逻辑 filters = {} if args.user: filters['user'] = args.user if args.queue: filters['queue'] = args.queue if args.runtime: filters['runtime'] = parse_runtime(args.runtime) apps = fetch_yarn_apps() matched = apply_filters(apps, **filters) if args.kill: terminator = YarnTerminator(dry_run=args.dry_run) results = terminator.safe_kill([app['id'] for app in matched]) if args.notify and matched: send_notification(matched) def parse_runtime(time_str): # 实现时间字符串解析逻辑 pass

在实际项目中,这套系统将运维人员从重复的手动操作中解放出来。通过组合不同的过滤条件,可以精确控制操作范围,比如仅终止测试环境中运行超过8小时的Spark作业,或是清理特定用户提交的异常任务。

http://www.zskr.cn/news/1429221.html

相关文章:

  • 基于Arduino的防酒驾系统:从传感器到物联网的嵌入式实战
  • 2026成都花园户型装修设计榜单|一楼庭院+顶楼露台花园专属装企推荐,避坑首选 - 资讯纵览
  • 2026年新闻稿发布平台TOP10权威测评报告 - 资讯纵览
  • 2026年企业如何鉴别一家靠谱的AI搜索GEO服务商 - 品牌报告
  • DS4Windows终极指南:让PS4/PS5手柄在Windows电脑上完美运行
  • 5.30 天津黄金回收,今日大盘价无套路 - 资讯纵览
  • 步进梁加热炉炉温综合优化控制策略【附仿真】
  • 终极指南:如何快速解包Godot游戏资源文件
  • HotSpot VM源码剖析2026版开源!
  • 【信息融合】自适应集成粒子滤波算法的磁图与惯性导航融合算法【含Matlab源码 15579期】
  • 5步将键盘鼠标变专业游戏手柄:vJoy虚拟手柄完整使用指南
  • 不同国家发稿合规要求不同,平台能帮处理吗?媒介易一站式合规发稿能力解析 - 一搜百应
  • 2026年徐州企业AI获客效率提升3倍,怎么做到的?
  • Linux 基金会征集 DNS - AID 项目贡献,让 AI 代理借 DNS 通信无需新基建
  • 抖音评论区图标
  • 2026石家庄品牌首饰回收哪里快 ?添价收秒到账资质全 - 薛定谔的梨花猫
  • 2026郴州黄金奢侈品回收避坑攻略!Top5精选 郴奢汇万宝店领衔 - 小仙贝贝
  • BilibiliDown视频下载器终极使用指南:轻松保存B站高清视频的完整秘籍
  • Wireshark 深度技术解析:从原理到实战的完整指南
  • 实时BPM分析器完整指南:5分钟学会音频节拍检测技术
  • 基于PIC18F2550与DS3231的高精度实时时钟设计与实现
  • mini-cc 的 MCP 协议:给 AI 装个 USB-C 接口
  • 深入探索MuPDF mutool:PDF处理的命令行高效解决方案
  • HarmonyOS文件基础服务(Core File Kit)实战演练04-文件监听与流式读写
  • SLAM 算法横向对比与选型指南
  • Revelation光影包:终极Minecraft写实渲染技术完全指南
  • 国产开源软件盘点:替代商业软件的 10 个优秀方案与落地边界
  • VCS仿真不出波形?从Makefile到TB代码,手把手教你生成和打开FSDB文件
  • 2026年SEO现状:精分时代的AI博弈
  • 单Agent搞不定长链路?OpenClaw动态编排架构,让多智能体协作不再“各说各话”