前言
在文章《webmagic核心设计和运行机制分析》中已经提到WebMagic内部是通过生产者/消费者模式来实现的,本篇我们就分析一下WebMagic的源代码,先从爬虫入口类main方法开始。
爬虫入口类main方法
public static void main(String[] args) {
Spider.create(new GithubRepoPageProcessor())
//从https://github.com/code4craft开始抓
.addUrl("https://github.com/code4craft")
//设置Scheduler,使用Redis来管理URL队列
.setScheduler(new RedisScheduler("localhost"))
//设置Pipeline,将结果以json方式保存到文件
.addPipeline(new JsonFilePipeline("D:\\data\\webmagic"))
//开启5个线程同时执行
.thread(5)
//启动爬虫
.run();
}
通过官方给出的创建爬虫入口类的样例代码可以看到,启动爬虫是调用Spider.run()
方法。
Spider类源码分析
1. Spider.run()
方法
checkRunningStat()
检查运行状态,不是很重要,跳过。
@Override
public void run() {
// 检查运行状态
checkRunningStat();
// 初始化组件
initComponent();
logger.info("Spider {} started!",getUUID());
// 死循环从Scheduler中拉取Request(Request中封装了url)
while (!Thread.currentThread().isInterrupted() && stat.get() == STAT_RUNNING) {
final Request request = scheduler.poll(this);
if (request == null) {
if (threadPool.getThreadAlive() == 0 && exitWhenComplete) {
break;
}
// wait until new url added
// 当Scheduler中不存在Request时,线程等待
waitNewUrl();
} else {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
// 处理Request,核心方法
processRequest(request);
// 调用监听器onSuccess()方法
onSuccess(request);
} catch (Exception e) {
// 调用监听器onError()方法
onError(request);
logger.error("process request " + request + " error", e);
} finally {
pageCount.incrementAndGet();
// 唤醒线程
signalNewUrl();
}
}
});
}
}
stat.set(STAT_STOPPED);
// release some resources
if (destroyWhenExit) {
close();
}
logger.info("Spider {} closed! {} pages downloaded.", getUUID(), pageCount.get());
}
2. initComponent()
初始化组件:设置默认Downloader实现,初始化线程池
//初始化组件
protected void initComponent() {
if (downloader == null) {
// 默认使用HttpClientDownloader
this.downloader = new HttpClientDownloader();
}
if (pipelines.isEmpty()) {
pipelines.add(new ConsolePipeline());
}
downloader.setThread(threadNum);
// 初始化线程池
if (threadPool == null || threadPool.isShutdown()) {
if (executorService != null && !executorService.isShutdown()) {
threadPool = new CountableThreadPool(threadNum, executorService);
} else {
threadPool = new CountableThreadPool(threadNum);
}
}
if (startRequests != null) {
for (Request request : startRequests) {
addRequest(request);
}
startRequests.clear();
}
startTime = new Date();
}
3. waitNewUrl()
/ signalNewUrl()
:配合Scheduler对象实现生产者/消费者模式
// 线程等待
private void waitNewUrl() {
newUrlLock.lock();
try {
//double check
if (threadPool.getThreadAlive() == 0 && exitWhenComplete) {
return;
}
newUrlCondition.await(emptySleepTime, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
logger.warn("waitNewUrl - interrupted, error {}", e);
} finally {
newUrlLock.unlock();
}
}
// 线程唤醒
private void signalNewUrl() {
try {
newUrlLock.lock();
newUrlCondition.signalAll();
} finally {
newUrlLock.unlock();
}
}
在Spider类属性中包含了默认Scheduler实现类QueueScheduler的对象scheduler,而在QueueScheduler类中默认使用内存阻塞队列来存储Request。
// Spider类属性
protected Scheduler scheduler = new QueueScheduler();
public class QueueScheduler extends DuplicateRemovedScheduler implements MonitorableScheduler {
// 默认使用内存阻塞队列来存储Request对象
private BlockingQueue<Request> queue = new LinkedBlockingQueue<Request>();
@Override
public void pushWhenNoDuplicate(Request request, Task task) {
queue.add(request);
}
@Override
public Request poll(Task task) {
return queue.poll();
}
@Override
public int getLeftRequestsCount(Task task) {
return queue.size();
}
@Override
public int getTotalRequestsCount(Task task) {
return getDuplicateRemover().getTotalRequestsCount(task);
}
}
看到这里,有人可能会疑惑当阻塞队列中没有Request对象时,线程会卡死在Spider.run()
方法中waitNewUrl()
上。
其实只要我们看Spider.addUrl()
和Spider.addRequest()
两个方法的源码,就会发现其中调用了线程唤醒方法Spider.signalNewUrl()
,而在爬虫入口类中必然会调addUrl()
或addRequest()
其中一个来设置起始url,所以不存在线程运行卡死的情况。
public Spider addUrl(String... urls) {
for (String url : urls) {
addRequest(new Request(url));
}
// 调用线程唤醒方法
signalNewUrl();
return this;
}
public Spider addRequest(Request... requests) {
for (Request request : requests) {
addRequest(request);
}
// 调用线程唤醒方法
signalNewUrl();
return this;
}
private void addRequest(Request request) {
if (site.getDomain() == null && request != null && request.getUrl() != null) {
site.setDomain(UrlUtils.getDomain(request.getUrl()));
}
// 将request推送到scheduler内存阻塞队列中去
scheduler.push(request, this);
}
这样就构成了一个典型的生产者/消费者模式代码实现。
4. processRequest()
:爬虫业务逻辑的核心方法
首先调用Downloader下载网页,再调用自定义的PageProcessor解析网页文本并从中提取出目标数据,最后调用自定义的Pipeline持久化目标数据。
private void processRequest(Request request) {
// 调用Downloader对象下载网页
Page page = downloader.download(request, this);
if (page.isDownloadSuccess()){
// 下载成功
onDownloadSuccess(request, page);
} else {
onDownloaderFail(request);
}
}
private void onDownloadSuccess(Request request, Page page) {
if (site.getAcceptStatCode().contains(page.getStatusCode())){
// 调用自定义的PageProcessor对象(通过入口类设置)解析封装网页的Page对象
pageProcessor.process(page);
// 添加后续需要爬取的url字符串,推送Request到Scheduler中去
extractAndAddRequests(page, spawnUrl);
if (!page.getResultItems().isSkip()) {
for (Pipeline pipeline : pipelines) {
// 调用自定义的Pipeline对象(通过入口类设置)持久化目标数据,通过ResultItems对象封装传递
pipeline.process(page.getResultItems(), this);
}
}
} else {
logger.info("page status code error, page {} , code: {}", request.getUrl(), page.getStatusCode());
}
sleep(site.getSleepTime());
return;
}
private void onDownloaderFail(Request request) {
if (site.getCycleRetryTimes() == 0) {
sleep(site.getSleepTime());
} else {
// for cycle retry
// 下载失败后重试
doCycleRetry(request);
}
}
protected void extractAndAddRequests(Page page, boolean spawnUrl) {
// 添加target Request
if (spawnUrl && CollectionUtils.isNotEmpty(page.getTargetRequests())) {
for (Request request : page.getTargetRequests()) {
// 推送Request到Scheduler内存阻塞队列中去
addRequest(request);
}
}
}
5. onSuccess()
/ onError()
:监听Request处理成功或失败的情况
只有需要自定义SpiderListener
监听器时才会使用到,这里不是重点,不再赘述,具体使用方法可参考源码 https://github.com/xiawq87/sp… 中的实现。
小结
通过上述源码分析的过程,让我们可以大体了解WebMagic内部生产者/消费者模式的实现方式。