用户注册



邮箱:

密码:

用户登录


邮箱:

密码:
记住登录一个月 | 忘记密码?

发表随想


还能输入:200字
云代码 - python代码库

协程初步完善版本

2019-08-02 作者:云代码会员 | 举报

[python]代码库

import time
import random
import asyncio
import aiohttp
from aiohttp_requests import requests
from queue import Queue
import random
import json
# import pymssql
from concurrent import futures
import time
import asyncio
from queue import Queue
from threading import Thread
class myasync():
    """Bounded pool that runs coroutines on an event loop in a worker thread.

    A fresh event loop is started in a daemon thread. ``add`` submits a
    coroutine to that loop from the calling thread, blocking while the pool
    is already at capacity; ``join`` blocks until every tracked task is done.

    The instance is meant to be driven from a single caller thread:
    ``self.tasks`` is not protected by a lock.
    """

    def __init__(self, pool):
        """Start the background loop.

        Args:
            pool: Maximum number of tasks tracked at once; must be >= 1.

        Raises:
            ValueError: If ``pool`` is smaller than 1 (``add`` could never
                make progress with such a limit).
        """
        if pool < 1:
            raise ValueError('pool must be at least 1, got %r' % (pool,))
        self.pool = pool
        # Created before the thread starts so the object is fully usable as
        # soon as the loop is running.
        self.tasks = []
        self.new_loop = asyncio.new_event_loop()
        self.thread = Thread(target=self.start_loop,
                             args=(self.new_loop,), daemon=True)
        self.thread.start()

    def start_loop(self, loop):
        """Thread target: install ``loop`` for this thread and run it."""
        asyncio.set_event_loop(loop)
        loop.run_forever()

    def add(self, coroutine):
        """Schedule ``coroutine`` on the background loop.

        Blocks while the pool is full, waking up as soon as any tracked task
        completes. Always returns ``True`` once the coroutine is submitted.
        """
        while not self.pan():
            # The timeout keeps the wait interruptible and re-checks the
            # pool periodically, as the previous 1-second polling did.
            futures.wait(self.tasks, timeout=1,
                         return_when=futures.FIRST_COMPLETED)
        task = asyncio.run_coroutine_threadsafe(coroutine, self.new_loop)
        self.tasks.append(task)
        return True

    def pan(self,):
        """Return ``True`` if there is room for another task.

        When the pool is full, tasks that are done (finished, failed or
        cancelled) are dropped from ``self.tasks`` to free their slots.
        """
        if len(self.tasks) < self.pool:
            return True
        # Future.done() covers results, exceptions and cancellation; matching
        # on the repr text missed cancelled futures.
        self.tasks[:] = [task for task in self.tasks if not task.done()]
        return len(self.tasks) < self.pool

    def join(self,):
        """Block until every currently tracked task is done; return ``True``."""
        pending = list(self.tasks)
        while pending:
            _, not_done = futures.wait(pending, timeout=1)
            pending = list(not_done)
        return True

    def close(self,):
        """Stop the background loop, wait for its thread, and close the loop.

        Safe to call more than once. After closing, ``add`` can no longer be
        used. Must not be called from a coroutine running on this loop.
        """
        loop = getattr(self, 'new_loop', None)
        if loop is None or loop.is_closed():
            return
        thread = getattr(self, 'thread', None)
        if thread is not None and thread.is_alive():
            # A running loop cannot be closed; ask it to stop first.
            loop.call_soon_threadsafe(loop.stop)
            thread.join(timeout=5)
        if not loop.is_running():
            loop.close()

    def __del__(self,):
        # Best-effort cleanup; never let finalisation raise (attributes may
        # be missing if __init__ failed, or torn down at interpreter exit).
        try:
            self.close()
        except Exception:
            pass
  
 
 
class proxies():
    """Holds one proxy address and re-fetches it after a minimum interval.

    ``miao`` is the number of seconds a fetched address is reused before
    ``getip`` requests a new one.
    """

    def __init__(self, miao):
        """Record the reuse interval and fetch the first proxy address."""
        self.miao = miao
        self.tm = time.time()
        self.ip = self.get_ip()

    def getip(self,):
        """Return the cached proxy, fetching a new one once it is too old."""
        now = time.time()
        if now - self.tm <= self.miao:
            # Still inside the reuse window.
            return self.ip
        self.tm = time.time()
        self.ip = self.get_ip()
        return self.ip

    def get_ip(self,):
        """Request one proxy from the provider, log it and return it.

        The address is printed, appended to ``IP.txt`` (one per line) and
        returned with an ``http://`` prefix.
        """
        import requests
        url = 'http://http.tiqu.qingjuhe.cn/getip?num=1&type=1&pack=17534&port=1&lb=1&pb=4&regions='
        address = requests.get(url).text.strip()
        httpip = 'http://' + address
        print(httpip)
        print('=' * 50 + 'IP' + '=' * 50)
        # The context manager closes the file; no explicit close is needed.
        with open('IP.txt', 'ab+') as log_file:
            log_file.write(httpip.encode('utf-8'))
            log_file.write('\n'.encode('utf-8'))
        return httpip
 
# Shared proxy provider. NOTE: constructing it performs a network request at
# import time (proxies.__init__ calls get_ip()).
myip=proxies(5)# Limits real IP fetches: the same IP is returned for calls within 5 seconds


# ip=myip.getip()
# Proxy currently passed to requests.get in to_url; None means no proxy until
# the first request error replaces it with myip.getip().
ip=None
 
# Number of to_url() calls started so far.
num = 0
# NOTE(review): stopset is not referenced anywhere else in the visible source.
stopset = set()
# URLs whose keywords were fetched and accepted by get_data().
okset = set()
# Database
# NOTE(review): the disabled connection line below embeds credentials.
# conn = pymssql.connect("127.0.0.1", "sa", "zxc1230.", "GJC_Key")
# cursor = conn.cursor()
async def to_url(url, max_tries=5):
    """Fetch ``url`` and return its keywords as a SQL ``VALUES`` fragment.

    Each attempt requests ``url`` through the current module-level proxy
    ``ip``. A usable response has status 200 and contains ``window.null``;
    its JSON payload's ``keyWordDTOs[*]['keywords']`` values are
    de-duplicated, single quotes are doubled, and each keyword is wrapped as
    ``('keyword')``.

    Args:
        url: Address to request.
        max_tries: Maximum number of attempts before giving up.

    Returns:
        The comma-joined ``('kw1'),('kw2')`` string, or ``None`` when no
        attempt produced a usable response.

    Side effects:
        Increments the module-level ``num`` counter and, after a failed
        request, replaces the module-level ``ip`` with ``myip.getip()``.
    """
    global num, ip
    num += 1
    headers = {'Referer': 'https://www.aliexpress.com/'}
    for _attempt in range(max_tries):
        try:
            response = await requests.get(url=url, headers=headers, proxy=ip)
            status = response.status
            # Reading the body can fail like the request itself, so it is
            # covered by the same retry handling.
            text = await response.text()
        except Exception as EX:
            print(repr(EX))
            print('-' * 50 + 'RequestError' + '-' * 50)
            # NOTE(review): getip() may do a blocking HTTP request, which
            # stalls the event loop while it runs.
            ip = myip.getip()
            continue

        if 'window.null' not in text or status != 200:
            # The response is not the expected payload.
            print(text)
            print('-' * 50 + 'Error' + '-' * 50)
            continue

        # Extract the keywords.
        try:
            payload = json.loads(text.replace('window.null=', ''))
            keywords = {entry['keywords'] for entry in payload['keyWordDTOs']}
        except (KeyError, TypeError, ValueError) as KE:
            # ValueError covers malformed JSON; previously it escaped the
            # retry loop and silently killed the task.
            print(repr(KE))
            continue
        return ','.join(
            "('" + key.replace("'", "''") + "')" for key in keywords)
    return None
# Produce new URLs and record the fetched data.
async def get_data(url):
    """Fetch the keywords for ``url`` and queue the follow-up URLs.

    When ``to_url`` yields a non-empty result, ``url`` is added to the
    module-level ``okset`` and every URL produced by ``nurl(url)`` is
    appended to the module-level ``URLlist``. Nothing happens otherwise.
    """
    global URLlist, okset
    keywords = await to_url(url)
    if not keywords:
        return

    # Statement for the database write, which is currently disabled.
    sql = 'insert into OneKey(key_key) values%s' % str(keywords)
    # stored = await write_data(sql)
    stored = True
    if not stored:
        print(keywords)
        print('-' * 50 + 'SqlError' + '-' * 50)
    else:
        okset.add(url)
        print('ok___' + str(len(okset)))

    follow_ups = await nurl(url)
    URLlist.extend(follow_ups)
# Write to the database.
async def write_data(sql):
    """Execute ``sql`` and commit it; return whether that succeeded.

    Uses the module-level ``cursor`` and ``conn``. Any exception (including
    those names being undefined) is printed with a banner and reported as
    ``False``.
    """
    try:
        cursor.execute(sql)
        conn.commit()
        return True
    except Exception as err:
        print(repr(err))
        print('-' * 50 + 'SqlError' + '-' * 50)
        return False
async def nurl(URL):
    """Return the set of follow-up URLs derived from ``URL``.

    The part of ``URL`` before its first ``&`` is extended by one character
    (``a``-``z`` and ``0``-``9``), followed by ``&`` and the current time in
    milliseconds, giving 36 URLs.
    """
    # A random 20-120 second delay used to run here; it is disabled.
    # Form the new links.
    stem = URL.split('&')[0]
    suffixes = 'abcdefghijklmnopqrstuvwxyz0123456789'
    return {
        stem + suffix + '&' + str(int(time.time() * 1000))
        for suffix in suffixes
    }
if __name__ == '__main__':
    # Coroutine pool; 500 is the limit on tasks tracked at once.
    mnc = myasync(500)
    template = 'https://connectkeyword.aliexpress.com/lenoIframeJson.htm?keyword=%s&_=%d'
    # Seed URLs: one per letter a-z, then one per digit 0-9.
    URLlist = [template % (chr(code), int(time.time() * 1000))
               for code in range(97, 123)]
    URLlist += [template % (str(digit), int(time.time() * 1000))
                for digit in range(0, 10)]
    print(len(URLlist))
    # Process the queue in rounds: get_data appends the next round's URLs to
    # the fresh URLlist while the current batch is running.
    while URLlist:
        batch, URLlist = URLlist, []
        for url in batch:
            mnc.add(get_data(url))
        mnc.join()  # wait for the coroutines of this round to finish
        print(len(URLlist))











分享到:
更多

网友评论    (发表评论)


发表评论:

评论须知:

  • 1、评论每次加2分,每天上限为30;
  • 2、请文明用语,共同创建干净的技术交流环境;
  • 3、若被发现提交非法信息,评论将会被删除,并且给予扣分处理,严重者给予封号处理;
  • 4、请勿发布广告信息或其他无关评论,否则将会删除评论并扣分,严重者给予封号处理。