现在位置:首页 » 娱乐 » 正文内容
工具将包含以下功能:

· 批量读取直播源文件
· 多线程并发检测直播源可用性
· 支持多种流媒体协议(m3u8、rtmp、http-flv等)
· 检测响应时间和可用状态
· 生成检测报告
import requests
import concurrent.futures
import time
import argparse
from urllib.parse import urlparse
import json

class LiveStreamChecker:
    def __init__(self, timeout=5, max_workers=10):
        self.timeout = timeout
        self.max_workers = max_workers
        self.results = []
        
    def check_stream(self, url):
        """检测单个直播源"""
        result = {
            'url': url,
            'status': 'unknown',
            'response_time': 0,
            'error': None
        }
        
        start_time = time.time()
        
        try:
            # 根据不同协议采用不同检测方式
            parsed_url = urlparse(url)
            
            if url.endswith('.m3u8'):
                # 检测m3u8流
                response = requests.get(url, timeout=self.timeout, stream=True)
                if response.status_code == 200:
                    result['status'] = 'active'
                else:
                    result['status'] = 'inactive'
                    result['error'] = f'HTTP {response.status_code}'
                    
            elif url.startswith('rtmp://'):
                # RTMP流检测较为复杂,这里简化处理
                result['status'] = 'unknown'
                result['error'] = 'RTMP protocol requires special handling'
                
            else:
                # 其他HTTP流
                response = requests.get(url, timeout=self.timeout, stream=True)
                if response.status_code == 200:
                    result['status'] = 'active'
                else:
                    result['status'] = 'inactive'
                    result['error'] = f'HTTP {response.status_code}'
                    
        except requests.exceptions.RequestException as e:
            result['status'] = 'error'
            result['error'] = str(e)
            
        result['response_time'] = round((time.time() - start_time) * 1000, 2)
        return result
    
    def check_batch(self, urls):
        """批量检测直播源"""
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            future_to_url = {executor.submit(self.check_stream, url): url for url in urls}
            
            for future in concurrent.futures.as_completed(future_to_url):
                url = future_to_url[future]
                try:
                    result = future.result()
                    self.results.append(result)
                    print(f"Checked: {url} - {result['status']} ({result['response_time']}ms)")
                except Exception as e:
                    print(f"{url} generated an exception: {str(e)}")
    
    def generate_report(self, output_file=None):
        """生成检测报告"""
        active_streams = [r for r in self.results if r['status'] == 'active']
        inactive_streams = [r for r in self.results if r['status'] == 'inactive']
        error_streams = [r for r in self.results if r['status'] == 'error']
        
        report = {
            'total': len(self.results),
            'active': len(active_streams),
            'inactive': len(inactive_streams),
            'errors': len(error_streams),
            'active_streams': active_streams,
            'inactive_streams': inactive_streams,
            'error_streams': error_streams
        }
        
        if output_file:
            with open(output_file, 'w', encoding='utf-8') as f:
                json.dump(report, f, ensure_ascii=False, indent=2)
                
            # 同时生成一个只有有效源的M3U文件
            with open(output_file.replace('.json', '_active.m3u'), 'w', encoding='utf-8') as f:
                f.write("#EXTM3U\n")
                for stream in active_streams:
                    f.write(f"#EXTINF:-1,{stream['url']}\n")
                    f.write(f"{stream['url']}\n")
        
        return report

def main():
    parser = argparse.ArgumentParser(description='直播源批量检测工具')
    parser.add_argument('input_file', help='输入文件路径,包含直播源URL列表')
    parser.add_argument('-o', '--output', help='输出报告文件路径', default='stream_report.json')
    parser.add_argument('-t', '--timeout', help='超时时间(秒)', type=int, default=5)
    parser.add_argument('-w', '--workers', help='并发工作线程数', type=int, default=10)
    
    args = parser.parse_args()
    
    # 读取直播源文件
    try:
        with open(args.input_file, 'r', encoding='utf-8') as f:
            urls = [line.strip() for line in f if line.strip() and not line.startswith('#')]
    except FileNotFoundError:
        print(f"错误: 文件 {args.input_file} 不存在")
        return
    
    print(f"开始检测 {len(urls)} 个直播源...")
    
    # 创建检测器并执行检测
    checker = LiveStreamChecker(timeout=args.timeout, max_workers=args.workers)
    checker.check_batch(urls)
    
    # 生成报告
    report = checker.generate_report(args.output)
    
    print(f"\n检测完成!")
    print(f"总计: {report['total']} 个源")
    print(f"有效: {report['active']} 个")
    print(f"无效: {report['inactive']} 个")
    print(f"错误: {report['errors']} 个")
    print(f"详细报告已保存至: {args.output}")

if __name__ == "__main__":
    main()
使用方法

1. 创建一个文本文件,每行一个直播源URL

2. 运行检测工具

3.查看生成的报告文件(report.json)和有效直播源文件(report_active.m3u)

TAG: m3u8