直播源批量检测工具设计
工具将包含以下功能:
· 批量读取直播源文件
· 多线程并发检测直播源可用性
· 支持多种流媒体协议(m3u8、rtmp、http-flv等)
· 检测响应时间和可用状态
· 生成检测报告
· 批量读取直播源文件
· 多线程并发检测直播源可用性
· 支持多种流媒体协议(m3u8、rtmp、http-flv等)
· 检测响应时间和可用状态
· 生成检测报告
import requests
import concurrent.futures
import time
import argparse
from urllib.parse import urlparse
import json
class LiveStreamChecker:
def __init__(self, timeout=5, max_workers=10):
self.timeout = timeout
self.max_workers = max_workers
self.results = []
def check_stream(self, url):
"""检测单个直播源"""
result = {
'url': url,
'status': 'unknown',
'response_time': 0,
'error': None
}
start_time = time.time()
try:
# 根据不同协议采用不同检测方式
parsed_url = urlparse(url)
if url.endswith('.m3u8'):
# 检测m3u8流
response = requests.get(url, timeout=self.timeout, stream=True)
if response.status_code == 200:
result['status'] = 'active'
else:
result['status'] = 'inactive'
result['error'] = f'HTTP {response.status_code}'
elif url.startswith('rtmp://'):
# RTMP流检测较为复杂,这里简化处理
result['status'] = 'unknown'
result['error'] = 'RTMP protocol requires special handling'
else:
# 其他HTTP流
response = requests.get(url, timeout=self.timeout, stream=True)
if response.status_code == 200:
result['status'] = 'active'
else:
result['status'] = 'inactive'
result['error'] = f'HTTP {response.status_code}'
except requests.exceptions.RequestException as e:
result['status'] = 'error'
result['error'] = str(e)
result['response_time'] = round((time.time() - start_time) * 1000, 2)
return result
def check_batch(self, urls):
"""批量检测直播源"""
with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
future_to_url = {executor.submit(self.check_stream, url): url for url in urls}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
result = future.result()
self.results.append(result)
print(f"Checked: {url} - {result['status']} ({result['response_time']}ms)")
except Exception as e:
print(f"{url} generated an exception: {str(e)}")
def generate_report(self, output_file=None):
"""生成检测报告"""
active_streams = [r for r in self.results if r['status'] == 'active']
inactive_streams = [r for r in self.results if r['status'] == 'inactive']
error_streams = [r for r in self.results if r['status'] == 'error']
report = {
'total': len(self.results),
'active': len(active_streams),
'inactive': len(inactive_streams),
'errors': len(error_streams),
'active_streams': active_streams,
'inactive_streams': inactive_streams,
'error_streams': error_streams
}
if output_file:
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(report, f, ensure_ascii=False, indent=2)
# 同时生成一个只有有效源的M3U文件
with open(output_file.replace('.json', '_active.m3u'), 'w', encoding='utf-8') as f:
f.write("#EXTM3U\n")
for stream in active_streams:
f.write(f"#EXTINF:-1,{stream['url']}\n")
f.write(f"{stream['url']}\n")
return report
def main():
parser = argparse.ArgumentParser(description='直播源批量检测工具')
parser.add_argument('input_file', help='输入文件路径,包含直播源URL列表')
parser.add_argument('-o', '--output', help='输出报告文件路径', default='stream_report.json')
parser.add_argument('-t', '--timeout', help='超时时间(秒)', type=int, default=5)
parser.add_argument('-w', '--workers', help='并发工作线程数', type=int, default=10)
args = parser.parse_args()
# 读取直播源文件
try:
with open(args.input_file, 'r', encoding='utf-8') as f:
urls = [line.strip() for line in f if line.strip() and not line.startswith('#')]
except FileNotFoundError:
print(f"错误: 文件 {args.input_file} 不存在")
return
print(f"开始检测 {len(urls)} 个直播源...")
# 创建检测器并执行检测
checker = LiveStreamChecker(timeout=args.timeout, max_workers=args.workers)
checker.check_batch(urls)
# 生成报告
report = checker.generate_report(args.output)
print(f"\n检测完成!")
print(f"总计: {report['total']} 个源")
print(f"有效: {report['active']} 个")
print(f"无效: {report['inactive']} 个")
print(f"错误: {report['errors']} 个")
print(f"详细报告已保存至: {args.output}")
if __name__ == "__main__":
main()
使用方法1. 创建一个文本文件,每行一个直播源URL
2. 运行检测工具
3.查看生成的报告文件(report.json)和有效直播源文件(report_active.m3u)
TAG: m3u8