现在位置:首页 » 娱乐 » 正文内容
这是一个一个基于Python的M3U8广告内容提取与植入程序的实现方案。这个程序可以解析M3U8文件,提取广告时段信息,并允许在指定位置植入新的广告内容。

功能概述

· 解析M3U8文件结构
· 识别和提取现有的广告标记
· 在指定位置插入新的广告内容
· 生成修改后的M3U8文件

完整代码实现

import re
import os
import argparse
from urllib.parse import urlparse, urljoin
from typing import List, Dict, Tuple

class M3U8Processor:
    def __init__(self, m3u8_content: str, base_url: str = None):
        self.m3u8_content = m3u8_content
        self.base_url = base_url
        self.segments = []
        self.tags = []
        self.ad_breaks = []  # 存储广告时段信息 (start_index, end_index)
        self._parse_content()
    
    def _parse_content(self):
        """解析M3U8内容"""
        lines = self.m3u8_content.splitlines()
        current_ad_break = None
        
        for i, line in enumerate(lines):
            line = line.strip()
            if not line:
                continue
                
            # 检测广告标记
            if line.startswith('#EXT-X-CUE-OUT') or line.startswith('#EXT-X-AD'):
                # 开始广告时段
                if current_ad_break is None:
                    current_ad_break = i
                self.tags.append(('AD_START', line, i))
            elif line.startswith('#EXT-X-CUE-IN'):
                # 结束广告时段
                if current_ad_break is not None:
                    self.ad_breaks.append((current_ad_break, i))
                    self.tags.append(('AD_END', line, i))
                    current_ad_break = None
            elif line.startswith('#EXT'):
                # 其他标签
                self.tags.append(('TAG', line, i))
            elif not line.startswith('#'):
                # 媒体段
                segment_url = line
                if self.base_url and not urlparse(line).netloc:
                    segment_url = urljoin(self.base_url, line)
                self.segments.append((segment_url, i))
    
    def extract_ads(self) -> List[Tuple[int, int, List[str]]]:
        """提取所有广告时段的内容"""
        ads = []
        for start, end in self.ad_breaks:
            ad_content = []
            # 获取广告时段的所有行
            lines = self.m3u8_content.splitlines()
            for i in range(start, end + 1):
                ad_content.append(lines[i])
            ads.append((start, end, ad_content))
        return ads
    
    def insert_ad(self, position: int, ad_content: List[str]):
        """在指定位置插入广告内容"""
        # 确定插入点
        lines = self.m3u8_content.splitlines()
        
        # 在指定位置插入广告内容
        new_lines = lines[:position] + ad_content + lines[position:]
        
        # 更新内容
        self.m3u8_content = '\n'.join(new_lines)
        
        # 重新解析内容
        self._parse_content()
    
    def replace_ad(self, ad_index: int, new_ad_content: List[str]):
        """替换指定索引的广告内容"""
        if ad_index < 0 or ad_index >= len(self.ad_breaks):
            raise ValueError("Invalid ad index")
        
        start, end = self.ad_breaks[ad_index]
        lines = self.m3u8_content.splitlines()
        
        # 替换广告内容
        new_lines = lines[:start] + new_ad_content + lines[end+1:]
        
        # 更新内容
        self.m3u8_content = '\n'.join(new_lines)
        
        # 重新解析内容
        self._parse_content()
    
    def save_to_file(self, file_path: str):
        """保存修改后的M3U8到文件"""
        with open(file_path, 'w', encoding='utf-8') as f:
            f.write(self.m3u8_content)
    
    def get_content(self) -> str:
        """获取当前M3U8内容"""
        return self.m3u8_content

def read_m3u8_file(file_path: str) -> str:
    """读取M3U8文件内容"""
    with open(file_path, 'r', encoding='utf-8') as f:
        return f.read()

def create_ad_content(ad_segments: List[str], duration: float = 30.0) -> List[str]:
    """创建广告内容"""
    ad_content = []
    # 添加广告开始标记
    ad_content.append(f'#EXT-X-CUE-OUT:{duration}')
    
    # 添加广告段
    for segment in ad_segments:
        ad_content.append(f'#EXTINF:6.0,')
        ad_content.append(segment)
    
    # 添加广告结束标记
    ad_content.append('#EXT-X-CUE-IN')
    
    return ad_content

def main():
    parser = argparse.ArgumentParser(description='M3U8广告内容提取与植入工具')
    parser.add_argument('input', help='输入M3U8文件路径')
    parser.add_argument('-o', '--output', help='输出M3U8文件路径')
    parser.add_argument('-e', '--extract', action='store_true', help='提取广告内容')
    parser.add_argument('-i', '--insert', help='在指定位置插入广告 (格式: position:ad_segment1,ad_segment2,...)')
    parser.add_argument('-r', '--replace', help='替换指定广告 (格式: index:ad_segment1,ad_segment2,...)')
    parser.add_argument('-l', '--list', action='store_true', help='列出所有广告时段')
    
    args = parser.parse_args()
    
    # 读取M3U8文件
    content = read_m3u8_file(args.input)
    base_url = 'http://example.com/'  # 根据实际情况设置基础URL
    
    processor = M3U8Processor(content, base_url)
    
    if args.extract:
        # 提取广告内容
        ads = processor.extract_ads()
        print(f"找到 {len(ads)} 个广告时段:")
        for i, (start, end, ad_content) in enumerate(ads):
            print(f"广告时段 {i+1} (位置 {start}-{end}):")
            for line in ad_content:
                print(f"  {line}")
            print()
    
    if args.list:
        # 列出广告时段
        ads = processor.extract_ads()
        print(f"找到 {len(ads)} 个广告时段:")
        for i, (start, end, ad_content) in enumerate(ads):
            print(f"广告时段 {i+1}: 位置 {start}-{end}, 长度 {end-start+1} 行")
    
    if args.insert:
        # 插入广告
        try:
            position_str, ad_segments_str = args.insert.split(':', 1)
            position = int(position_str)
            ad_segments = ad_segments_str.split(',')
            
            ad_content = create_ad_content(ad_segments)
            processor.insert_ad(position, ad_content)
            print(f"已在位置 {position} 插入广告")
        except ValueError as e:
            print(f"插入广告参数错误: {e}")
    
    if args.replace:
        # 替换广告
        try:
            index_str, ad_segments_str = args.replace.split(':', 1)
            index = int(index_str)
            ad_segments = ad_segments_str.split(',')
            
            ad_content = create_ad_content(ad_segments)
            processor.replace_ad(index - 1, ad_content)  # 用户输入从1开始,内部从0开始
            print(f"已替换广告时段 {index}")
        except ValueError as e:
            print(f"替换广告参数错误: {e}")
    
    # 保存输出文件
    if args.output:
        processor.save_to_file(args.output)
        print(f"已保存到 {args.output}")
    
    # 如果没有指定任何操作,显示基本信息
    if not any([args.extract, args.insert, args.replace, args.list, args.output]):
        ads = processor.extract_ads()
        print(f"M3U8文件解析完成:")
        print(f"  总段数: {len(processor.segments)}")
        print(f"  广告时段数: {len(ads)}")
        print(f"  使用 -h 参数查看帮助信息")

if __name__ == "__main__":
    main()
使用示例

1. 提取广告内容:
python m3u8_processor.py input.m3u8 -e

2. 列出所有广告时段:
python m3u8_processor.py input.m3u8 -l

3. 在指定位置插入广告:
python m3u8_processor.py input.m3u8 -i "10:ad1.ts,ad2.ts,ad3.ts" -o output.m3u8

4. 替换指定广告:
python m3u8_processor.py input.m3u8 -r "2:new_ad1.ts,new_ad2.ts" -o output.m3u8

TAG: m3u8