一、具体步骤
1、导包
import os
import random
import csv
import time
import queue
import threading
import requests
from lxml import etree
2、定义几个全局的变量(方便下面的类使用)
headers = {
‘User-Agent’: ‘Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3325.162 Safari/537.36’,
}
path = os.path.join(os.path.dirname(__file__), ‘products.csv’)
3、定义一个生产者,用来抓取网页上的数据
class Procuder(threading.Thread):
“””
创建一个生产者(获取全部的url地址)
“””
def __init__(self, page_queue, data_queue, *args, **kwargs):
super(Procuder, self).__init__(*args, **kwargs)
self.page_queue = page_queue
self.data_queue = data_queue
def run(self):
while True:
if self.page_queue.empty():
break
url = self.page_queue.get()
self.parse_page(url)
def parse_page(self, url):
“””
定义一个根据url请求数据的方法
:param url:
:return:
“””
response = requests.get(url=url, headers=headers)
time.sleep(random.randrange(3))
if response.status_code == 200:
html = etree.HTML(response.text)
trs = html.xpath(‘//table[@class=”table table-hover”]/tbody/tr’)
for tr in trs:
tds = tr.xpath(‘./td’)
name = tds[0].xpath(‘./text()’)[0]
price = tds[1].xpath(‘./text()’)[0]
unit = tds[2].xpath(‘./text()’)[0]
address = tds[3].xpath(‘./text()’)[0]
date = tds[4].xpath(‘./text()’)[0]
# print(name, price, unit, address, date)
# 加入队列中
self.data_queue.put(({‘name’: name, ‘price’: price, ‘unit’: unit, ‘address’: address, ‘date’: date}))
4、定义一个消费者用来从生产者那边获取数据并下载
class Consumer(threading.Thread):
“””
定义一个消费者用来存储数据
“””
def __init__(self, page_queue, data_queue, *args, **kwargs):
super(Consumer, self).__init__(*args, **kwargs)
self.page_queue = page_queue
self.data_queue = data_queue
def run(self):
while True:
if self.page_queue.empty() and self.data_queue.empty():
break
# 从队列中获取数据
data_row = self.data_queue.get()
print(‘开始写入==>’, data_row)
with open(path, ‘a’, newline=”, encoding=’utf8′) as f:
# # 写入到本地的csv文件中
table_headers = [‘name’, ‘price’, ‘unit’, ‘address’, ‘date’]
writer = csv.DictWriter(f, fieldnames=table_headers)
# # 写入头部
# writer.writeheader()
# 一行一行写入数据
writer.writerow(data_row)
5、定义一个运行的main函数
def main():
“””
定义一个主运行方法
:return:
“””
if os.path.exists(path):
os.remove(path)
page_queue = queue.Queue(100)
data_queue = queue.Queue(1000)
# 创建爬取页面
for i in range(1, 101):
page_queue.put(“http://www.gznw.gov.cn/priceInfo/getPriceInfoByAreaId.jx?areaid=22572&page={0}”.format(str(i)))
# 创建线程
for x in range(5):
p = Procuder(page_queue, data_queue)
c = Consumer(page_queue, data_queue)
p.start()
c.start()
if __name__ == ‘__main__’:
main()
import os
import random
import csv
import time
import queue
import threading
import requests
from lxml import etree
headers = {
‘User-Agent’: ‘Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3325.162 Safari/537.36’,
}
path = os.path.join(os.path.dirname(__file__), ‘products.csv’)
class Procuder(threading.Thread):
“””
创建一个生产者(获取全部的url地址)
“””
def __init__(self, page_queue, data_queue, *args, **kwargs):
super(Procuder, self).__init__(*args, **kwargs)
self.page_queue = page_queue
self.data_queue = data_queue
def run(self):
while True:
if self.page_queue.empty():
break
url = self.page_queue.get()
self.parse_page(url)
def parse_page(self, url):
“””
定义一个根据url请求数据的方法
:param url:
:return:
“””
response = requests.get(url=url, headers=headers)
time.sleep(random.randrange(3))
if response.status_code == 200:
html = etree.HTML(response.text)
trs = html.xpath(‘//table[@class=”table table-hover”]/tbody/tr’)
for tr in trs:
tds = tr.xpath(‘./td’)
name = tds[0].xpath(‘./text()’)[0]
price = tds[1].xpath(‘./text()’)[0]
unit = tds[2].xpath(‘./text()’)[0]
address = tds[3].xpath(‘./text()’)[0]
date = tds[4].xpath(‘./text()’)[0]
# print(name, price, unit, address, date)
# 加入队列中
self.data_queue.put(({‘name’: name, ‘price’: price, ‘unit’: unit, ‘address’: address, ‘date’: date}))
class Consumer(threading.Thread):
“””
定义一个消费者用来存储数据
“””
def __init__(self, page_queue, data_queue, *args, **kwargs):
super(Consumer, self).__init__(*args, **kwargs)
self.page_queue = page_queue
self.data_queue = data_queue
def run(self):
while True:
if self.page_queue.empty() and self.data_queue.empty():
break
# 从队列中获取数据
data_row = self.data_queue.get()
print(‘开始写入==>’, data_row)
with open(path, ‘a’, newline=”, encoding=’utf8′) as f:
# # 写入到本地的csv文件中
table_headers = [‘name’, ‘price’, ‘unit’, ‘address’, ‘date’]
writer = csv.DictWriter(f, fieldnames=table_headers)
# # 写入头部
# writer.writeheader()
# 一行一行写入数据
writer.writerow(data_row)
def main():
“””
定义一个主运行方法
:return:
“””
if os.path.exists(path):
os.remove(path)
page_queue = queue.Queue(100)
data_queue = queue.Queue(1000)
# 创建爬取页面
for i in range(1, 101):
page_queue.put(“http://www.gznw.gov.cn/priceInfo/getPriceInfoByAreaId.jx?areaid=22572&page={0}”.format(str(i)))
# 创建线程
for x in range(5):
p = Procuder(page_queue, data_queue)
c = Consumer(page_queue, data_queue)
p.start()
c.start()
if __name__ == ‘__main__’:
main()