用户注册



邮箱:

密码:

用户登录


邮箱:

密码:
记住登录一个月 | 忘记密码?

发表随想


还能输入:200字
云代码 - python代码库

视频爬取,下载

2022-08-28 作者: Best乄威尔 | 举报

[python]代码库

import re,requests,openpyxl,time,bs4,threading
from lxml import etree

# User-Agent header sent with every HTTP request made by the crawler.
header = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.5112.102 Safari/537.36 Edg/104.0.1293.70',
}

# Shared crawl state, mutated by the worker threads.
i = 1         # next listing-page number to fetch (consumed by getUrl)
n = 0         # next index into mz/dz to resolve (consumed by gerAddr)
Name = []     # not referenced anywhere else in the visible code
web_url = {}  # film title -> play-page URL, filled by getUrl
addr = {}     # film title -> download address, filled by gerAddr

# Re-entrant lock guarding the counters and dictionaries above.
lock = threading.RLock()
def getUrl():
    """Fetch one listing page and record each film's play-page URL in ``web_url``.

    Each call claims the next page number from the global counter ``i``
    (page 1 is the bare listing URL, later pages are ``index_<i>.html``),
    downloads that page and stores ``title -> play-page URL`` pairs in the
    global ``web_url`` dict.  Failures are reported on stdout and otherwise
    ignored so that one bad page does not stop the crawl.
    """
    global i, web_url
    base_url = 'https://www.c345y.com/vmovie/sm/'

    # Claim the page number under the lock so concurrent workers never
    # fetch the same page.
    with lock:
        page = i
        i += 1
        print('----------------' + str(page) + '----------')

    # Keep the page URL in its own name: the failure message below needs the
    # string, not the list of hrefs extracted from the page.
    page_url = base_url if page == 1 else base_url + 'index_' + str(page) + '.html'

    try:
        res = requests.get(url=page_url, headers=header, timeout=30)
        res.close()
        time.sleep(0.5)
        res.encoding = 'utf8'
        html = etree.HTML(res.text)
        itms = html.xpath('/html/body/div[6]/div/div[2]//dl')[0]
        # NOTE(review): these expressions start with '//' and therefore match
        # against the whole document rather than only below ``itms``.
        hrefs = itms.xpath('//dl/dt/a/@href')
        names = itms.xpath('//dl/dt/a/@title')
        # ``with`` guarantees the lock is released even if an index below is
        # out of range.
        with lock:
            for j, title in enumerate(names):
                # The hrefs list is read with an offset of 4 relative to the
                # titles -- presumably four leading links carry no title
                # attribute; verify against the page markup.
                # NOTE(review): the listing is fetched from c345y.com but the
                # links are built on c234y.com -- confirm this is intentional.
                web_url[title] = 'https://www.c234y.com' + hrefs[j + 4]
    except Exception:
        print('访问' + page_url + '失败!')

def gerAddr():
    """Resolve the download address of the next film and store it in ``addr``.

    Each call claims the next index from the global counter ``n``, fetches
    the play page ``dz[n]`` and stores the ``value`` attribute of the element
    with id ``url`` under the film title ``mz[n]`` in the global ``addr``
    dict.  Failures are reported on stdout; the entry is then simply missing
    from ``addr``.
    """
    global addr, n, mz, dz

    # Claim exactly one index per call.  The counter is advanced once, inside
    # the lock, so a later network failure can neither skip an entry nor
    # release the lock a second time.
    with lock:
        idx = n
        n += 1
        print('----------------' + str(idx + 1) + '----------')
        if idx >= len(mz) or idx >= len(dz):
            # More workers were started than there are films left.
            print('访问失败')
            return
        mz1 = mz[idx]
        dz1 = dz[idx]
        print(mz1, dz1)

    try:
        res2 = requests.get(url=dz1, headers=header, timeout=30)
        res2.close()
        html2 = etree.HTML(res2.text)
        addr[mz1] = html2.xpath('//*[@id="url"]/@value')[0]
        time.sleep(0.5)
    except Exception:
        print('访问失败')

# Phase 1: crawl the listing pages -- 16 rounds of 5 concurrent getUrl workers,
# with a timestamp printed before and after.
print(time.asctime())
for _round in range(16):
    workers = []
    for _ in range(5):
        worker = threading.Thread(target=getUrl)
        worker.start()
        workers.append(worker)
    # Wait for the whole round before launching the next one.
    for worker in workers:
        worker.join()

print(time.asctime())
# Phase 2: flatten the crawled {title: play-page URL} mapping into two
# parallel lists and write them to the workbook.
mz = []  # film titles
dz = []  # play-page URLs, same order as mz
file = '影片列表.xlsx'
print(web_url.items())
for a, b in web_url.items():
    mz.append(a)
    dz.append(b)
print(mz, dz)

wb = openpyxl.load_workbook(file)
# Only the active sheet is used.  The former ``wb['Sheet1']`` lookup was
# overwritten immediately and could raise KeyError when no sheet of that
# name exists.
sheet = wb.active
sheet['A1'] = '序号'
sheet['B1'] = '影片名'
sheet['C1'] = '播放网址'
sheet['D1'] = '下载地址'
for nu, (title, play_url) in enumerate(zip(mz, dz)):
    # Show the entry that is actually being written.
    print(title, play_url)
    row = str(nu + 2)  # row 1 holds the column headers
    sheet['A' + row] = str(nu + 1)
    sheet['B' + row] = title
    sheet['C' + row] = play_url
wb.save(file)
print(len(mz))
# Number of films still to be resolved; counted down by the address phase.
flag = len(mz)
print(time.asctime())
# Phase 3: resolve download addresses, 5 concurrent gerAddr workers per round,
# until every film counted in ``flag`` has been attempted.
while flag > 0:
    workers = []
    for _ in range(5):
        worker = threading.Thread(target=gerAddr)
        worker.start()
        workers.append(worker)
    for worker in workers:
        worker.join()
    flag -= 5
print(addr.items())
print('----------------')
sheet = wb.active
# Walk every title, not just the first len(addr) rows: when some lookups
# failed, len(addr) < len(mz) and the trailing rows would otherwise never
# receive their address.  Titles without an address get None (empty cell).
for nu2, title in enumerate(mz):
    sheet['D' + str(nu2 + 2)] = addr.get(title)
time.sleep(2)
wb.save(file)
print(time.asctime())

# --下载部分---
import requests,openpyxl,threading,time,re

# User-Agent header sent with every download request.
header = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.5112.102 Safari/537.36 Edg/104.0.1293.70',
}
# Re-entrant lock guarding the shared download counter.
lock = threading.RLock()

##失败重试代码##
# file = '影片列表.xlsx'
# path = 'D:\AV\\'
# n = 0
# name=[]
# url=[]
# wb = openpyxl.load_workbook(file)
# sheet = wb.active
# sheet = wb['Sheet']
# num=[]
# with open('失败统计.txt','r',encoding='utf-8') as f:
#     lines=f.readlines()
# for line in lines:
#     nu = re.findall(r'------ (.*?)----',line)[0]
#     num.append(nu)
#     name.append(sheet['B' + str(int(nu) + 1)].value)
#     url.append(sheet['D' + str(int(nu) + 1)].value)

# Downloader setup: workbook produced by the crawl, target directory and the
# shared row counter.
file = '影片列表.xlsx'
# Same value as before (D:\AV\), but with every backslash escaped: '\A' is
# not a valid escape sequence and triggers a warning on current Python.
path = 'D:\\AV\\'
# Index of the next column entry to download; starts at 1, presumably
# because entry 0 is the header row -- verify against the sheet.
n = 1
wb = openpyxl.load_workbook(file)
# Only the active sheet is used.  The former ``wb['Sheet']`` lookup was
# overwritten immediately and could raise KeyError when no sheet of that
# name exists.
sheet = wb.active
name = sheet['B']  # column B: film titles
url = sheet['D']   # column D: download addresses
def download():
    """Download the next film listed in the workbook to ``path``.

    Each call claims the next index from the global counter ``n``, reads the
    title and download address from the module-level ``name`` / ``url``
    columns and saves the response body as ``<path><index>、 <title>.mp4``.
    Progress, success and failure are reported on stdout; nothing is raised.
    """
    global n
    with lock:
        n1 = n
        n += 1

    # More workers may be started than there are rows; stop quietly instead
    # of dying with an IndexError inside the thread.
    if n1 >= len(name) or n1 >= len(url):
        print('----已完成----')
        return None

    # The worksheet columns hold cell objects; unwrap them to plain values.
    # ``getattr`` keeps this working if the columns already contain values.
    name1 = str(getattr(name[n1], 'value', name[n1]))
    url1 = getattr(url[n1], 'value', url[n1])
    time1 = time.time()

    print('---', str(n1), '---', name1, '----开始下载', time.asctime())
    try:
        if not url1:
            # No download address was recorded for this row.
            raise ValueError('missing download address')
        res = requests.get(url=url1, headers=header, timeout=30)
        # Treat HTTP error responses as failures instead of saving an error
        # page under an .mp4 name.
        res.raise_for_status()
        with open(path + str(n1) + '、 ' + name1 + '.mp4', 'wb') as f:
            f.write(res.content)
        print('------', str(n1) + '----' + name1 + '下载----成功-----用时', time.time() - time1)
    except Exception:
        print('------', str(n1) + '----' + name1 + '下载----失败-----用时', time.time() - time1)
    ## Retry-failed-downloads variant (disabled) ##
    #----------------------------------------------------------------------------#
    # if n1>=len(num):
    #     print('----已完成----')
    #     return None
    # print('---', num[n1], '---', name1, '----开始下载', time.asctime())
    # try:
    #     res = requests.get(url=url1, headers=header, timeout=300)
    #     with open(path +num[n1] + '、 ' + name1 + '.mp4', 'wb') as f:
    #         f.write(res.content)
    #         print('------', num[n1] + '----' + name1 + '下载----成功-----用时', time.time() - time1)
    # except:
    #     print('------', num[n1] + '----' + name1 + '下载----失败-----用时', time.time() - time1)

# Run 50 rounds of 5 download workers; starts within a round are staggered
# by 0.2 s and the round is joined before the next one begins.
for k in range(50):
    workers = []
    for slot in range(5):
        if slot:
            # Pause between consecutive starts (not before the first one).
            time.sleep(0.2)
        worker = threading.Thread(target=download)
        worker.start()
        workers.append(worker)

    for worker in workers:
        worker.join()



网友评论    (发表评论)


发表评论:

评论须知:

  • 1、评论每次加2分,每天上限为30;
  • 2、请文明用语,共同创建干净的技术交流环境;
  • 3、若被发现提交非法信息,评论将会被删除,并且给予扣分处理,严重者给予封号处理;
  • 4、请勿发布广告信息或其他无关评论,否则将会删除评论并扣分,严重者给予封号处理。


扫码下载

加载中,请稍后...

输入口令后可复制整站源码

加载中,请稍后...