
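A hands-on guide to building a video relay station with Flask: crawling m3u8 streams with dual backup to local disk and Cloudflare R2

Flask video relay station in practice: a complete workflow from m3u8 crawling to dual-backup storage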

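As the volume of digital content keeps growing, managing and storing video has become a real challenge for many independent developers and content creators. Picture this: you sell paid courses and your videos are scattered across several platforms, or you run a media account and need to organize footage from different channels. These videos are often served as m3u8 (HLS) streams, which makes them hard to manage in one place. This article builds a Flask-based video relay station from scratch that crawls m3u8 videos automatically and keeps two copies of each one: one on local disk and one in Cloudflare R2.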

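1. Environment setup and basic architecture

Before writing any code, we need to plan the overall architecture. A robust video relay station should contain the following core modules:

  • Video crawling module: parses the m3u8 file and downloads the ts segments
  • Local storage module: saves the video files to the server's local disk
  • Cloud storage module: uploads the same files to Cloudflare R2
  • Task management module: records the crawl status and storage locations
  • Web API module: exposes an API for external callers

1.1 Installing the required Python packages

First create a new Python virtual environment, then install the following dependencies: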

pip install flask flask-sqlalchemy pymysql boto3 requests beautifulsoup4 python-dotenv

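Each package has the following role:

  • flask: the core web framework
  • flask-sqlalchemy: ORM layer for database access
  • pymysql: MySQL database driver
  • boto3: the AWS SDK, used here to talk to Cloudflare R2 through its S3-compatible API
  • requests: HTTP client library, used to download the video
  • beautifulsoup4: HTML parser, used to extract video links from pages
  • python-dotenv: loads environment variables from a .env file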

1.2 Database design

We use MySQL to store crawl tasks and configuration. Create two main tables:

CREATE TABLE video_tasks (
    id INT AUTO_INCREMENT PRIMARY KEY,
    source_url VARCHAR(512) NOT NULL,
    status ENUM('pending', 'downloading', 'completed', 'failed') DEFAULT 'pending',
    local_path VARCHAR(512),
    cloud_path VARCHAR(512),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);

CREATE TABLE system_config (
    id INT AUTO_INCREMENT PRIMARY KEY,
    config_key VARCHAR(255) NOT NULL UNIQUE,
    config_value TEXT,
    description VARCHAR(512)
);
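The status column effectively describes a small state machine. Below is a minimal sketch of the transitions it implies, assuming a task only moves forward and that a failed task may be queued again. The retry rule and the names ALLOWED_TRANSITIONS and can_transition are illustrative assumptions, not part of the schema above.

```python
# Hypothetical transition table for video_tasks.status.
# Assumption: 'failed' tasks can be re-queued; 'completed' is terminal.
ALLOWED_TRANSITIONS = {
    'pending': {'downloading'},
    'downloading': {'completed', 'failed'},
    'completed': set(),
    'failed': {'pending'},
}


def can_transition(current, new):
    """Return True if a task may move from `current` to `new`."""
    return new in ALLOWED_TRANSITIONS.get(current, set())


if __name__ == '__main__':
    print(can_transition('pending', 'downloading'))
    print(can_transition('completed', 'pending'))
```

Checking the transition before each status update keeps a late or duplicated worker from moving a finished task back to an earlier state.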

2. Core Flask application

2.1 Application initialization and configuration

Create an app.py file as the application entry point:

from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from dotenv import load_dotenv
import os

load_dotenv()

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = os.getenv('DB_URI')
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

db = SQLAlchemy(app)

# Import models and routes only after db exists
# (this avoids a circular import if those modules import db from this file)
from models import VideoTask, SystemConfig
from routes import api_blueprint

app.register_blueprint(api_blueprint, url_prefix='/api')
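app.py reads the connection string from the DB_URI environment variable. Below is a minimal sketch of assembling such a URI for the pymysql driver, assuming the usual SQLAlchemy form mysql+pymysql://user:password@host:port/database. The helper name build_db_uri, the utf8mb4 charset choice, and the sample credentials are hypothetical.

```python
from urllib.parse import quote_plus


def build_db_uri(user, password, host, database, port=3306):
    """Build a SQLAlchemy URI for MySQL via pymysql.

    The user and password are percent-encoded so that characters such as
    '@' or '/' do not break URI parsing.
    """
    return (
        f"mysql+pymysql://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}:{port}/{database}?charset=utf8mb4"
    )


if __name__ == '__main__':
    # Sample values only; put the resulting string into DB_URI in .env
    print(build_db_uri('video', 'p@ss/word', 'localhost', 'video_station'))
```

Encoding the password matters in practice: an unescaped '@' in it would otherwise be read as the start of the host name.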

2.2 Core video download and processing logic

Create a video_processor.py file to handle video downloads:

import os
import requests
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import urljoin, urlparse


class VideoProcessor:
    def __init__(self, max_workers=5, timeout=30):
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.timeout = timeout  # seconds per HTTP request

    def download_m3u8(self, m3u8_url, output_dir, headers=None):
        """Download an m3u8 playlist and all of its ts segments."""
        try:
            # Download the m3u8 playlist
            response = requests.get(m3u8_url, headers=headers, timeout=self.timeout)
            response.raise_for_status()

            # Parse the playlist. Strip each line first so CRLF line endings
            # still match, and test the URL path so that segments carrying a
            # query string (seg.ts?token=...) are not skipped.
            content = response.text
            lines = [line.strip() for line in content.splitlines()]
            ts_urls = [
                line for line in lines
                if line and not line.startswith('#')
                and urlparse(line).path.endswith('.ts')
            ]

            # Create the local directory
            os.makedirs(output_dir, exist_ok=True)

            # Save the m3u8 file
            m3u8_path = os.path.join(output_dir, 'index.m3u8')
            with open(m3u8_path, 'w') as f:
                f.write(content)

            # Download all ts files in parallel; urljoin resolves relative URIs
            futures = [
                self.executor.submit(
                    self.download_ts, urljoin(m3u8_url, ts_url), output_dir, headers
                )
                for ts_url in ts_urls
            ]

            # Wait for every download and report failed segments
            results = [future.result() for future in futures]
            errors = [message for ok, message in results if not ok]
            if errors:
                return False, f"{len(errors)} segment(s) failed, first error: {errors[0]}"

            return True, m3u8_path
        except Exception as e:
            return False, str(e)

    def download_ts(self, ts_url, output_dir, headers=None):
        """Download a single ts file."""
        try:
            response = requests.get(ts_url, headers=headers, timeout=self.timeout)
            response.raise_for_status()

            filename = os.path.basename(urlparse(ts_url).path)
            filepath = os.path.join(output_dir, filename)
            with open(filepath, 'wb') as f:
                f.write(response.content)

            return True, filepath
        except Exception as e:
            return False, str(e)
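The playlist parsing step can be isolated into a pure function, which makes it easy to test without any network access. The sketch below relies on the HLS rule that every non-blank line not starting with '#' is a URI. The function names, the sample playlist, and the example.com URLs are made up for illustration.

```python
from urllib.parse import urljoin


def is_master_playlist(text):
    """A master playlist lists variant streams rather than media segments."""
    return any(
        line.strip().startswith('#EXT-X-STREAM-INF')
        for line in text.splitlines()
    )


def parse_segment_urls(text, playlist_url):
    """Return absolute URLs for every URI line of a media playlist."""
    urls = []
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith('#'):
            continue  # skip blank lines, tags and comments
        urls.append(urljoin(playlist_url, line))
    return urls


SAMPLE = (
    "#EXTM3U\r\n"
    "#EXT-X-TARGETDURATION:10\r\n"
    "#EXTINF:9.0,\r\n"
    "seg0.ts?token=abc\r\n"
    "#EXTINF:9.0,\r\n"
    "/abs/seg1.ts\r\n"
    "#EXTINF:4.0,\r\n"
    "https://cdn.example.com/seg2.ts\r\n"
    "#EXT-X-ENDLIST\r\n"
)

if __name__ == '__main__':
    for url in parse_segment_urls(SAMPLE, 'https://example.com/live/index.m3u8'):
        print(url)
```

If is_master_playlist returns True, the URI lines point at further playlists (one per quality level), so one of them has to be fetched and parsed before any segments can be downloaded.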

3. Cloudflare R2 integration

3.1 R2 configuration and initialization

Add the R2 settings to the .env file:

R2_ENDPOINT_URL=https://your-account-id.r2.cloudflarestorage.com
R2_ACCESS_KEY_ID=your-access-key
R2_SECRET_ACCESS_KEY=your-secret-key
R2_BUCKET_NAME=your-bucket-name
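Because these four values come from the environment, a missing one otherwise only surfaces at the first upload. Below is a small sketch of a startup check, assuming it is called once after load_dotenv(). The helper name missing_r2_settings is hypothetical.

```python
import os

REQUIRED_R2_SETTINGS = (
    'R2_ENDPOINT_URL',
    'R2_ACCESS_KEY_ID',
    'R2_SECRET_ACCESS_KEY',
    'R2_BUCKET_NAME',
)


def missing_r2_settings(env=None):
    """Return the names of required R2 settings that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_R2_SETTINGS if not env.get(name)]


if __name__ == '__main__':
    missing = missing_r2_settings()
    if missing:
        print('Missing R2 settings: ' + ', '.join(missing))
```

Passing a plain dict instead of os.environ makes the check easy to exercise in tests.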

Create r2_client.py to handle the interaction with Cloudflare R2:

import os
import posixpath

import boto3
from boto3.exceptions import S3UploadFailedError
from botocore.exceptions import ClientError
from dotenv import load_dotenv

load_dotenv()


class R2Client:
    def __init__(self):
        self.client = boto3.client(
            's3',
            endpoint_url=os.getenv('R2_ENDPOINT_URL'),
            aws_access_key_id=os.getenv('R2_ACCESS_KEY_ID'),
            aws_secret_access_key=os.getenv('R2_SECRET_ACCESS_KEY')
        )
        self.bucket_name = os.getenv('R2_BUCKET_NAME')

    def upload_file(self, file_path, object_name=None):
        """Upload a file to R2."""
        if object_name is None:
            object_name = os.path.basename(file_path)

        try:
            self.client.upload_file(file_path, self.bucket_name, object_name)
            return True, f"r2://{self.bucket_name}/{object_name}"
        except (ClientError, S3UploadFailedError) as e:
            # upload_file reports failed transfers as S3UploadFailedError
            return False, str(e)

    def upload_directory(self, local_dir, r2_prefix=''):
        """Upload a whole directory to R2."""
        results = []
        for root, _, files in os.walk(local_dir):
            for file in files:
                local_path = os.path.join(root, file)
                relative_path = os.path.relpath(local_path, local_dir)
                # Object keys always use forward slashes, also on Windows
                r2_path = posixpath.join(r2_prefix, relative_path.replace(os.sep, '/'))

                success, message = self.upload_file(local_path, r2_path)
                results.append({
                    'file': file,
                    'success': success,
                    'message': message
                })
        return results
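The mapping from a local file to its R2 object key is the part of upload_directory most worth checking in isolation. Below is a minimal sketch of that mapping as a standalone function, assuming POSIX-style local paths. The name build_object_key and the sample paths are illustrative.

```python
import posixpath
from pathlib import PurePath


def build_object_key(local_dir, local_path, prefix=''):
    """Map a file under local_dir to an object key below prefix.

    Raises ValueError if local_path is not inside local_dir.
    """
    relative = PurePath(local_path).relative_to(local_dir)
    # Keys use forward slashes regardless of the local separator
    return posixpath.join(prefix, *relative.parts)


if __name__ == '__main__':
    print(build_object_key('videos/42', 'videos/42/index.m3u8', 'videos/42'))
    print(build_object_key('videos/42', 'videos/42/hls/seg0.ts'))
```

Keeping the directory layout in the key means the uploaded index.m3u8 and its segments keep the same relative positions in the bucket as on disk.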

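3.2 Multipart upload optimization

For large files, multipart upload improves reliability, because a failed part can be sent again without restarting the whole transfer: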

# Method of R2Client (it uses self.client and self.bucket_name)
def multipart_upload(self, file_path, object_name=None, part_size=8 * 1024 * 1024):
    """Upload a large file to R2 in parts."""
    if object_name is None:
        object_name = os.path.basename(file_path)

    mpu_id = None  # stays None if create_multipart_upload itself fails
    try:
        # Start the multipart upload
        mpu = self.client.create_multipart_upload(
            Bucket=self.bucket_name,
            Key=object_name
        )
        mpu_id = mpu['UploadId']

        # Upload the parts
        parts = []
        with open(file_path, 'rb') as f:
            i = 1
            while True:
                data = f.read(part_size)
                if not data:
                    break

                part = self.client.upload_part(
                    Bucket=self.bucket_name,
                    Key=object_name,
                    PartNumber=i,
                    UploadId=mpu_id,
                    Body=data
                )
                parts.append({
                    'PartNumber': i,
                    'ETag': part['ETag']
                })
                i += 1

        # Complete the multipart upload
        result = self.client.complete_multipart_upload(
            Bucket=self.bucket_name,
            Key=object_name,
            UploadId=mpu_id,
            MultipartUpload={'Parts': parts}
        )
        return True, result.get('Location', object_name)
    except Exception as e:
        # Abort on error so the parts already uploaded are not left behind
        if mpu_id is not None:
            self.client.abort_multipart_upload(
                Bucket=self.bucket_name,
                Key=object_name,
                UploadId=mpu_id
            )
        return False, str(e)
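The part size determines how many upload_part calls are made. The sketch below plans the parts for a given file size, using the S3 multipart limits of at least 5 MiB per part (except the last) and at most 10,000 parts. That R2 enforces the same limits is an assumption here and should be checked against Cloudflare's documentation. The function name plan_parts is hypothetical.

```python
import math

MIN_PART_SIZE = 5 * 1024 * 1024  # S3 minimum for every part except the last
MAX_PARTS = 10_000               # S3 maximum number of parts per upload


def plan_parts(file_size, part_size=8 * 1024 * 1024):
    """Return (part_number, offset, length) tuples covering the file."""
    if part_size < MIN_PART_SIZE:
        raise ValueError('part_size is below the 5 MiB minimum')
    count = math.ceil(file_size / part_size)
    if count > MAX_PARTS:
        raise ValueError('too many parts; increase part_size')
    return [
        (n + 1, n * part_size, min(part_size, file_size - n * part_size))
        for n in range(count)
    ]


if __name__ == '__main__':
    # A 20 MiB file with 8 MiB parts needs three parts; the last is 4 MiB
    for part in plan_parts(20 * 1024 * 1024):
        print(part)
```

With the default 8 MiB part size, the 10,000-part limit caps a single upload at roughly 78 GiB, so very large files need a larger part_size.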

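4. System monitoring and error handling

4.1 Logging configuration

A good logging setup is essential for operations. Configure logging in Flask as follows: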

import logging
from logging.handlers import RotatingFileHandler


def setup_logging(app):
    # Remove Flask's default log handlers
    app.logger.handlers.clear()

    # Set the log format
    formatter = logging.Formatter(
        '%(asctime)s %(levelname)s: %(message)s [in %(pathname)s:%(lineno)d]'
    )

    # File log, rotated by size
    file_handler = RotatingFileHandler(
        'video_transfer.log',
        maxBytes=1024 * 1024 * 10,  # 10MB
        backupCount=5
    )
    file_handler.setFormatter(formatter)
    file_handler.setLevel(logging.INFO)
    app.logger.addHandler(file_handler)

    # Console log
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)
    console_handler.setLevel(logging.DEBUG)
    app.logger.addHandler(console_handler)

    app.logger.setLevel(logging.DEBUG)
    app.logger.info('Video Transfer logging initialized')

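4.2 Error-handling middleware

Create an error-handling wrapper (a decorator applied to view functions) so that exceptions are handled in one place: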

from functools import wraps

import requests
from flask import current_app, jsonify


def handle_errors(f):
    @wraps(f)
    def wrapper(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except requests.exceptions.RequestException as e:
            # Network failures while fetching remote resources
            current_app.logger.error(f"Request error: {str(e)}")
            return jsonify({
                'status': 'error',
                'message': f'Network error: {str(e)}'
            }), 500
        except Exception as e:
            # Anything else is reported as an internal server error
            current_app.logger.error(f"Unexpected error: {str(e)}")
            return jsonify({
                'status': 'error',
                'message': f'Internal server error: {str(e)}'
            }), 500
    return wrapper

4.3 Task status monitoring

We can create a simple monitoring endpoint to check the state of the system:

import shutil

from sqlalchemy import text


@app.route('/api/status')
def system_status():
    # Check the database connection
    db_status = 'ok'
    try:
        # Raw SQL strings must be wrapped in text() on SQLAlchemy 2.x
        db.session.execute(text('SELECT 1'))
    except Exception as e:
        db_status = str(e)

    # Disk usage of the root filesystem, queried once
    usage = shutil.disk_usage('/')
    local_storage = {
        'total': usage.total,
        'used': usage.used
    }

    # Task counts per status
    stats = {
        status: VideoTask.query.filter_by(status=status).count()
        for status in ('pending', 'downloading', 'completed', 'failed')
    }

    return jsonify({
        'database': db_status,
        'storage': local_storage,
        'tasks': stats
    })
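Raw byte counts are hard to act on, so a monitoring endpoint often also reports whether free space has dropped below a threshold. Below is a minimal sketch using only the standard library. The 10% default threshold and the name storage_report are assumptions chosen for illustration.

```python
import shutil


def storage_report(path='/', min_free_ratio=0.1):
    """Summarize disk usage for path and flag low free space."""
    usage = shutil.disk_usage(path)
    free_ratio = usage.free / usage.total if usage.total else 0.0
    return {
        'total': usage.total,
        'used': usage.used,
        'free': usage.free,
        'free_ratio': round(free_ratio, 4),
        'low_space': free_ratio < min_free_ratio,
    }


if __name__ == '__main__':
    print(storage_report('.'))
```

Pointing path at the directory that holds the downloaded videos, rather than at '/', gives a more relevant figure when videos live on a separate volume.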

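5. Deployment and performance optimization

5.1 Production deployment

For production, we recommend Gunicorn as the WSGI server with Nginx in front of it as a reverse proxy. The configuration below uses the gevent worker class, so gevent must be installed as well: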

pip install gunicorn gevent

Create a gunicorn.conf.py configuration file:

workers = 4
worker_class = 'gevent'
bind = '0.0.0.0:8000'
accesslog = '-'
errorlog = '-'
timeout = 120
keepalive = 5
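Since gunicorn.conf.py is an ordinary Python file, the worker count can be derived from the machine instead of hard-coded. Below is a small sketch using the (2 x cores) + 1 starting point suggested in the Gunicorn documentation. The helper name suggested_workers is hypothetical, and the right number for a download-heavy service is something to measure rather than assume.

```python
import multiprocessing


def suggested_workers(cpu_count=None):
    """Return (2 x cores) + 1, Gunicorn's documented starting point."""
    cores = cpu_count or multiprocessing.cpu_count()
    return cores * 2 + 1


# In gunicorn.conf.py this would replace the fixed `workers = 4`
workers = suggested_workers()
```

On a 4-core machine this yields 9 workers.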

Example Nginx configuration:

server {
    listen 80;
    server_name yourdomain.com;

    location / {
        proxy_pass http://127.0.0.1:8000;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    }

    location /static {
        alias /path/to/your/static/files;
    }
}

5.2 Performance optimization tips

  1. Connection pool tuning

from sqlalchemy.pool import QueuePool

# Set this before db = SQLAlchemy(app) so the options are in place
# when the engine is created
app.config['SQLALCHEMY_ENGINE_OPTIONS'] = {
    'poolclass': QueuePool,
    'pool_size': 10,
    'max_overflow': 20,
    'pool_timeout': 30,
    'pool_recycle': 3600
}

  2. Cache frequently used configuration

# Requires: pip install flask-caching
from flask_caching import Cache

cache = Cache(config={
    'CACHE_TYPE': 'SimpleCache',
    'CACHE_DEFAULT_TIMEOUT': 300
})
cache.init_app(app)


@cache.memoize(timeout=60)
def get_config_value(key):
    config = SystemConfig.query.filter_by(config_key=key).first()
    return config.config_value if config else None

  3. Asynchronous task processing

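Long-running jobs can be handed to Celery for asynchronous processing. The example uses Redis as both broker and result backend, so the celery and redis packages must be installed: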

from celery import Celery


def make_celery(app):
    celery = Celery(
        app.import_name,
        broker=app.config['CELERY_BROKER_URL'],
        backend=app.config['CELERY_RESULT_BACKEND']
    )
    celery.conf.update(app.config)

    class ContextTask(celery.Task):
        # Run every task inside the Flask application context
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery


app.config.update(
    CELERY_BROKER_URL='redis://localhost:6379/0',
    CELERY_RESULT_BACKEND='redis://localhost:6379/0'
)
celery = make_celery(app)


@celery.task(bind=True)
def process_video_task(self, task_id):
    task = VideoTask.query.get(task_id)
    if not task:
        return {'status': 'error', 'message': 'Task not found'}

    try:
        task.status = 'downloading'
        db.session.commit()

        local_dir = f"videos/{task.id}"
        processor = VideoProcessor()
        success, result = processor.download_m3u8(task.source_url, local_dir)
        if not success:
            task.status = 'failed'
            db.session.commit()
            return {'status': 'error', 'message': result}

        # Upload to R2, using the same relative path as the key prefix
        r2 = R2Client()
        upload_results = r2.upload_directory(local_dir, local_dir)
        failed = [r for r in upload_results if not r['success']]
        if failed or not upload_results:
            # Do not mark the task completed when the cloud copy is incomplete
            task.status = 'failed'
            db.session.commit()
            message = failed[0]['message'] if failed else 'No files uploaded'
            return {'status': 'error', 'message': message}

        task.status = 'completed'
        task.local_path = result
        task.cloud_path = upload_results[0]['message']  # upload path of the first file
        db.session.commit()
        return {'status': 'success', 'task_id': task.id}
    except Exception as e:
        # Discard the broken transaction before recording the failure
        db.session.rollback()
        task.status = 'failed'
        db.session.commit()
        return {'status': 'error', 'message': str(e)}
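upload_directory returns one result dict per file, and the task has to collapse that list into a single outcome. Below is a sketch of such a summary step as a pure function. It assumes the result shape used in r2_client.py (file, success, message) and stores the shared prefix rather than the first file's path. The name summarize_uploads and the sample bucket name are hypothetical.

```python
def summarize_uploads(results, bucket, prefix):
    """Collapse per-file upload results into one task-level outcome."""
    failed = [r['file'] for r in results if not r['success']]
    ok = bool(results) and not failed
    return {
        'ok': ok,
        'uploaded': len(results) - len(failed),
        'failed': failed,
        # Point at the whole video directory, not at a single file
        'cloud_path': f"r2://{bucket}/{prefix}" if ok else None,
    }


if __name__ == '__main__':
    sample = [
        {'file': 'index.m3u8', 'success': True, 'message': 'r2://demo/videos/7/index.m3u8'},
        {'file': 'seg0.ts', 'success': False, 'message': 'timeout'},
    ]
    print(summarize_uploads(sample, 'demo', 'videos/7'))
```

An empty result list is treated as a failure, since it means nothing was backed up.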