之前有写过用单线程建立代理ip池,但是大家很快就会发现,用单线程来一个个测试代理ip实在是太慢了,跑一次要很久才能结束,完全无法忍受。所以这篇文章就是换用多线程来建立ip池,会比用单线程快很多。之所以用多线程而不是多进程,是因为测试时间主要是花费在等待网络传递数据上,处理本地计算的时间很短,用多线程能更好地发挥单核性能,而且多线程开销比多进程开销小得多。当然,单核性能会有极限,如果想再提高性能就需要多进程和多线程混用了。当然这里说的是用CPython作为解释器时候的情况,因为绝大多数人用的都是CPython,所以以下说的都是这种情况。
受限于个人学识,对多进程和多线程的理解也不是很深刻,如果以后有机会会写写关于并发编程的文章。CPython因为GIL锁的原因,多线程无法发挥多核性能,但是可以用多进程来发挥多核性能。注意GIL锁不是python语言特性,只是CPython解释器的原因。任何python线程在执行前,都必须获得GIL锁,然后每执行100条字节码,解释器就自动释放GIL锁,让别的线程执行。所以python线程只能交替执行,即使有多个线程跑在多核CPU上,也只能利用一个核。
其实程序主体在之前的文章已经写好了,我们需要的只是稍微做点改进,以适合多线程编程。我的思路是,设置一个线程专门用来爬取待测试ip,其他线程获取待测试ip进行测试。这也是分布式编程的思想。
我们首先设置一个队列,用来储存待测试ip。
thread_lock = threading.Lock() test_ip_list = Queue()
然后对之前的函数进行一些修改。
def download_page(url, timeout=10): headers=hidden_reptile.random_header() data = requests.get(url, headers=headers, timeout=timeout) return data def test_ip(test_url): while True: if test_ip_list.empty(): return ip = test_ip_list.get() proxies = { 'http': ip[0]+':'+ip[1], 'https': ip[0] + ':' + ip[1] } try_ip = ip[0] try: r=requests.get(test_url,headers=hidden_reptile.random_header(),proxies=proxies,timeout=10) if r.status_code == 200: r.encoding = 'gbk' result=re.search('\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}',r.text) result=result.group() if result[:9]==try_ip[:9]: print('%s:%s 测试通过' % (ip[0],ip[1])) thread_lock.acquire() with open('proxy_ip.txt', 'a') as f: f.write(ip[0] + ':' + ip[1] + '\n') thread_lock.release() else: print('%s:%s 携带代理失败,使用了本地IP' %(ip[0],ip[1])) else: print('%s:%s 请求码不是200' %(ip[0],ip[1])) except Exception as e: print(e) print('%s:%s 错误' %(ip[0],ip[1])) def get_proxies(page_num, ip_url_list): for ip_url in ip_url_list: for page in range(1, page_num+1): print("抓取第%d页代理IP" %page) url= ip_url.format(page) r=download_page(url) r.encoding='utf-8' pattern = re.compile('<td class="country">.*?alt="Cn" />.*?</td>.*?<td>(.*?)</td>.*?<td>(.*?)</td>', re.S) ip_list= re.findall(pattern, r.text) for ip in ip_list: test_ip_list.put(ip) time.sleep(10) print('{}抓取结束'.format(ip_url))
注意写入文件的时候需要加进程锁,因为写入的是同一个文件,不加线程锁的话可能一个线程写入到一半,就被其他线程抢了,然后写入其他东西。所有的待测试ip都来自python队列test_ip_list,对其进行操作的时候不用添加线程锁,因为它自带了线程锁。
最后,写运行部分。
if __name__ == '__main__': number_of_threads = 8 total_pages = 20 threads = [] url = ["http://www.xicidaili.com/nt/{}"] test_url = 'http://ip.tool.chinaz.com/' t = threading.Thread(target=get_proxies, args=(total_pages, url)) t.setDaemon(True) t.start() threads.append(t) time.sleep(1) for i in range(1, number_of_threads): t = threading.Thread(target=test_ip, args=(test_url,)) t.setDaemon(True) threads.append(t) t.start() for thread in threads: thread.join()
如果有其他可以爬取ip的网址可以加到url列表中,total_page是总共爬取的页数。开了第一个线程之后暂停1s,是在等待它添加待测试ip进入队列中。