import glob
import logging
import os
import time
from concurrent.futures import ThreadPoolExecutor

import m3u8
import requests
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
|
|
# Raise the "urllib3" logger threshold to WARNING so that its DEBUG/INFO
# records are not emitted while segments are being downloaded.
logging.getLogger("urllib3").setLevel(logging.WARNING)
|
|
def AESDecrypt(cipher_text, key, iv):
    """Decrypt AES-CBC data and return the plaintext without PKCS#7 padding.

    Args:
        cipher_text: Encrypted bytes (e.g. the raw content of one segment).
        key: AES key bytes.
        iv: Initialisation vector bytes for CBC mode.

    Returns:
        The decrypted bytes. Input shorter than one AES block yields
        ``b''``. When the decrypted data does not end in valid PKCS#7
        padding, it is returned as decrypted (best effort).
    """
    # Ciphertext must not be padded before decryption: ``pad`` always appends
    # at least one byte, so block-aligned input would gain a whole extra
    # block that decrypts to garbage. Instead, only whole blocks are
    # decrypted; a trailing partial block (truncated data) is dropped.
    usable = len(cipher_text) - len(cipher_text) % AES.block_size
    if usable == 0:
        return b''

    aes = AES.new(key=key, mode=AES.MODE_CBC, iv=iv)
    plain_text = aes.decrypt(cipher_text[:usable])

    try:
        # Strip the PKCS#7 padding that was added before encryption.
        return unpad(padded_data=plain_text, block_size=AES.block_size)
    except ValueError:
        # The data does not end in valid padding (e.g. it was truncated);
        # keep what was decrypted rather than failing the whole merge.
        return plain_text
|
|
# HTTP request headers presenting a desktop Chrome User-Agent string.
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                  'AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36',
}
|
|
def get_ts(url, timeout=(10, 60)):
    """Download *url* and return the raw response body.

    Args:
        url: Address to fetch with an HTTP GET.
        timeout: Value passed to ``requests.get`` as ``timeout``; by default
            a ``(connect, read)`` pair in seconds, so a stalled server
            cannot block the caller forever.

    Returns:
        The response body as ``bytes``. On any ``requests`` failure
        (connection error, timeout, HTTP error status, ...) the error is
        printed and ``b''`` is returned instead.
    """
    try:
        # The module-level ``headers`` supply the User-Agent.
        # NOTE(review): verify=False disables TLS certificate verification.
        # It is kept from the original; drop it if the server's certificate
        # is valid.
        response = requests.get(url, headers=headers, timeout=timeout, verify=False)
        response.raise_for_status()
    except requests.RequestException as err:
        print(err)
        return b''
    # ``content`` is the undecoded byte payload, so no text encoding applies.
    return response.content
|
|
def save_ts(url, index):
    """Download one segment and store it as ``./video/<index>.ts``.

    Args:
        url: Address of the segment, passed to ``get_ts``.
        index: Position of the segment; it is zero-padded to five digits to
            form the file name.

    Returns:
        None. A progress line is printed for both success and failure.
    """
    filename = os.path.join('./video', str(index).zfill(5) + '.ts')

    # Download before touching the file system so that a failed download
    # (``get_ts`` returns b'') does not leave an empty file behind.
    ts = get_ts(url)
    if not ts:
        print(filename + ' download failed!')
        return

    os.makedirs('./video', exist_ok=True)
    with open(filename, 'wb') as f:
        f.write(ts)
    print(filename + ' is ok!')
|
|
if __name__ == '__main__':
    playlist = m3u8.load(uri='./data/index.m3u8')

    # Download the segments concurrently. Each one is saved under its
    # playlist index so the pieces can be merged in order afterwards.
    # Leaving the ``with`` block waits for every submitted download.
    with ThreadPoolExecutor(max_workers=10) as pool:
        for index, seg in enumerate(playlist.segments):
            pool.submit(save_ts, seg.uri, index)

    key = get_ts(playlist.keys[-1].uri)
    if not key:
        # get_ts returns b'' on failure; nothing can be decrypted without it.
        raise SystemExit('failed to download the decryption key')

    # glob returns paths in arbitrary order; sorting the zero-padded file
    # names restores the playlist order.
    files = sorted(glob.glob(os.path.join('./video', '*.ts')))

    os.makedirs('./video_de', exist_ok=True)
    # Open the output once, truncating it, so that a rerun does not append
    # to the result of a previous run.
    with open('./video_de/baitoushan.mp4', 'wb') as fw:
        for ts_path in files:
            with open(ts_path, 'rb') as fr:
                content = fr.read()
            # NOTE(review): the key bytes are also used as the IV; confirm
            # this matches the IV the playlist declares for this stream.
            fw.write(AESDecrypt(content, key=key, iv=key))
            print(ts_path + ' is ok!')
# Approach:
#
# Step 1: load the m3u8 file (the URL of the m3u8 file can also be used
# directly).
#     playlist = m3u8.load(uri='./data/index.m3u8')
#
# Step 2: extract the URLs.
#     for seg in playlist.segments:
#         print(seg.uri)
#
# Step 3: download the ts video.
#     with open('xxxxx.ts', 'wb') as f:
#         ts = get_ts(url)
#         f.write(ts)
#
# Step 4: decrypt.
#     cipher_text = pad(data_to_pad=cipher_text, block_size=AES.block_size)
#     aes = AES.new(key=key, mode=AES.MODE_CBC, iv=iv)
#     cipher_text = aes.decrypt(cipher_text)
#
# Step 5: merge.
#     files = glob.glob(os.path.join('./video', '*.ts'))
#     for file in files:
#         with open(file, 'rb') as fr, open('./video_de/baitoushan.mp4', 'ab') as fw:
#             content = fr.read()
#             fw.write(content)