爬取视频-星辰影院视频案例

爬取视频基础

爬取流程

  1. 爬取m3u8
  2. 通过m3u8下载视频
  3. 合并视频

m3u8

样例

(以下为一个加密的m3u8文件内容示例)
#EXTM3U
#EXT-X-VERSION:3
#EXT-X-TARGETDURATION:2
#EXT-X-MEDIA-SEQUENCE:0
#EXT-X-PLAYLIST-TYPE:VOD
#EXT-X-KEY:METHOD=AES-128,URI="key.key"
#EXTINF:2.000000,
https://v11.ltdmq.com/sdv11/ts/1.ts
#EXT-X-DISCONTINUITY
#EXTINF:2.000000,
https://v11.ltdmq.com/sdv11/ts/2.ts
#EXTINF:2.000000,
https://v11.ltdmq.com/sdv11/ts/3.ts
  • 如果有#EXT-X-KEY,说明视频被加密:需要按METHOD指定的算法(样例中为AES-128)解密下载到的ts片段,密钥通过请求URI指向的地址(样例中为key.key)获取
  • 通过请求下载所有.ts结尾的文件(并解密),从上往下合并所有ts视频
  • #EXTINF:2.000000 表示紧随其后的那个ts片段的时长(单位:秒)
  • 可以通过ffmpeg合并视频;在Windows上也可以在Python中用os.system调用copy命令按二进制方式拼接,例如os.system("copy /b 1.ts+2.ts+3.ts test.mp4")

星辰影院视频案例

网站

流程

电视剧详情页 -> 每一集详情页 -> 第一层m3u8 -> 第二层m3u8 -> 下载视频 -> 合并视频

代码

(以下为完整的Python代码)
import requests
from lxml import etree
import os
import re
import aiohttp
import aiofiles
import asyncio
import subprocess

# Work relative to this script's own directory so that the relative paths used
# below ("test/", "filelist.txt", "test.mp4") land next to the script.
# abspath() guards against a bare relative __file__ (e.g. "script.py"), whose
# dirname is "" and would make os.chdir() raise.
os.chdir(os.path.dirname(os.path.abspath(__file__)))

# Browser-like User-Agent passed with the HTTP requests made by this script.
headers = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0"
    )
}

# Caps the number of download tasks that may run at the same time.
semaphore = asyncio.Semaphore(30)

def get_eps_url(url):
    """Return the URL of every episode page linked from a series detail page.

    Args:
        url: URL of the series detail page.

    Returns:
        A list of absolute episode-page URLs, in the order the links appear
        inside the element with id ``tab_con_playlist_1``.

    Raises:
        requests.RequestException: if the request fails, times out, or the
            server answers with an HTTP error status.
    """
    # Local import keeps this function self-contained.
    from urllib.parse import urljoin

    # A timeout prevents the script from hanging forever on a dead connection.
    resp = requests.get(url=url, headers=headers, timeout=30)
    resp.raise_for_status()
    # Decode with the charset detected from the body rather than the one
    # guessed from the response headers.
    resp.encoding = resp.apparent_encoding
    tree = etree.HTML(resp.text)
    hrefs = tree.xpath('//div[@id="tab_con_playlist_1"]/ul//a/@href')
    # Resolve each href against the page URL; this handles root-relative,
    # relative and already-absolute links alike.
    return [urljoin(url, href) for href in hrefs]

def get_m3u8_1(url):
    """Extract the first-level m3u8 URL embedded in an episode page.

    Args:
        url: URL of the episode page.

    Returns:
        The m3u8 URL found in the page source, with the backslash escapes
        removed.

    Raises:
        requests.RequestException: if the request fails, times out, or the
            server answers with an HTTP error status.
        IndexError: if no m3u8 URL is found in the page source (same
            exception type as before, now with a descriptive message).
    """
    resp = requests.get(url=url, headers=headers, timeout=30)
    resp.raise_for_status()
    resp.encoding = resp.apparent_encoding
    # The page source contains the playlist URL with escaped slashes, e.g.
    #   "url":"https:\/\/v11.fentvoss.com\/sdv11\/202406\/06\/x6PX25F8h83\/video\/index.m3u8"
    match = re.search(r'vod_class.*?(https.*?\.m3u8)', resp.text)
    if match is None:
        raise IndexError(f"no m3u8 URL found in page {url!r}")
    # Drop the backslashes that escape the slashes.
    return match.group(1).replace("\\", "")

def _playlist_entries(text):
    """Return the non-empty, non-comment lines of an m3u8 playlist, stripped."""
    stripped = (line.strip() for line in text.splitlines())
    return [line for line in stripped if line and not line.startswith("#")]


def get_videos(url, limit=50):
    """Follow a first-level m3u8 to the second-level one and list its segments.

    Args:
        url: URL of the first-level m3u8 playlist.
        limit: Maximum number of segment URLs to return. Defaults to 50, the
            cap the script used for testing; pass ``None`` to return all.

    Returns:
        A list of absolute segment URLs in playlist order.

    Raises:
        requests.RequestException: if a request fails, times out, or the
            server answers with an HTTP error status.
        IndexError: if the first-level playlist lists no entry.
    """
    # Local import keeps this function self-contained.
    from urllib.parse import urljoin

    # First-level playlist: its last entry points at the second-level one.
    resp = requests.get(url=url, headers=headers, timeout=30)
    resp.raise_for_status()
    entries = _playlist_entries(resp.text)
    if not entries:
        raise IndexError(f"playlist {url!r} contains no entries")
    # Resolve against the playlist's own URL instead of a hard-coded
    # directory, so this works for any episode and not just one.
    m3u8 = urljoin(url, entries[-1])

    # Second-level playlist: every non-comment line is one segment.
    resp = requests.get(url=m3u8, headers=headers, timeout=30)
    resp.raise_for_status()
    # urljoin leaves absolute segment URLs untouched and fixes relative ones.
    video_urls = [urljoin(m3u8, entry) for entry in _playlist_entries(resp.text)]
    return video_urls[:limit]

async def download_a_video(url, name):
    """Download one video segment and save it to the file ``name``.

    The module-level ``semaphore`` limits how many of these coroutines run
    their download at the same time.

    Args:
        url: URL of the segment to download.
        name: Path of the file to write the downloaded bytes to.

    Raises:
        aiohttp.ClientError: if the request fails or the server answers with
            an HTTP error status.
    """
    async with semaphore:
        async with aiohttp.ClientSession() as session:
            async with session.get(url=url, headers=headers) as resp:
                # Refuse to save an HTTP error page as if it were video data.
                resp.raise_for_status()
                data = await resp.read()
        # Open the output only after the body arrived, so a failed request
        # does not leave an empty file behind.
        async with aiofiles.open(name, mode="wb") as f:
            await f.write(data)

async def download_videos(url):
    """Download every segment in ``url`` concurrently into ``test/<index>.ts``.

    Args:
        url: List of segment URLs; the list index becomes the file name.

    Raises:
        RuntimeError: if one or more downloads failed. All downloads are
            allowed to finish first; the first failure is chained as the
            cause.
    """
    # Nothing to do for an empty list (asyncio.wait would raise on it).
    if not url:
        return

    # The target directory must exist before any file can be written.
    os.makedirs("test", exist_ok=True)

    tasks = [
        asyncio.create_task(download_a_video(video_url, f"test/{i}.ts"))
        for i, video_url in enumerate(url)
    ]
    # return_exceptions=True lets every download finish before failures are
    # reported, instead of silently dropping them.
    results = await asyncio.gather(*tasks, return_exceptions=True)
    failures = [r for r in results if isinstance(r, BaseException)]
    if failures:
        raise RuntimeError(
            f"{len(failures)} of {len(results)} downloads failed"
        ) from failures[0]

def merge_ts():
    """Concatenate the numbered segments in ``test/`` into ``test.mp4``.

    Writes ``filelist.txt`` listing the ``test/<n>.ts`` files in numeric
    order and passes it to ffmpeg's concat demuxer with stream copy.
    ffmpeg's exit status is not checked.
    """
    # Only consider files named "<number>.ts", so stray files in the
    # directory neither inflate the count nor end up in the list.
    segments = [
        name
        for name in os.listdir("test")
        if name.endswith(".ts") and name[:-3].isdecimal()
    ]
    # Numeric sort: a plain string sort would put "10.ts" before "2.ts".
    segments.sort(key=lambda name: int(name[:-3]))

    with open("filelist.txt", "w", encoding="utf-8") as f:
        f.writelines(f"file 'test/{name}'\n" for name in segments)

    # An argument list (not a single command string) works on every
    # platform without going through a shell.
    subprocess.run(
        [
            "ffmpeg",
            "-f", "concat",
            "-safe", "0",
            "-i", "filelist.txt",
            "-c", "copy",
            "test.mp4",
        ]
    )


def main():
    """Scrape one episode of the series: find it, download it, merge it.

    Raises:
        IndexError: if the series page yields no episode links.
    """
    # Main site: https://barbizon.com.cn/
    # Series detail page.
    url = "https://barbizon.com.cn/voddetail/175787.html"
    # Episode page URLs, e.g. https://barbizon.com.cn/play/175787-1-1.html
    eps = get_eps_url(url)
    if not eps:
        raise IndexError(f"no episode links found on {url}")

    # The original author's note: without the selector event loop policy the
    # script fails with "RuntimeError: Event loop is closed". The policy class
    # is only defined on Windows, so look it up instead of assuming it exists.
    policy = getattr(asyncio, "WindowsSelectorEventLoopPolicy", None)
    if policy is not None:
        asyncio.set_event_loop_policy(policy())

    # Test mode: only scrape the first episode.
    for epurl in eps[:1]:
        # Parse the episode page to get the first-level m3u8.
        m3u8 = get_m3u8_1(epurl)
        # Resolve it to the list of segment URLs.
        video_urls = get_videos(m3u8)
        # Download the segments.
        asyncio.run(download_videos(video_urls))
        # Merge the segments into a single video file.
        merge_ts()


if __name__ == "__main__":
    main()