资讯专栏INFORMATION COLUMN

爬虫框架Webmagic源码分析之Spider

邹立鹏 / 2709人阅读

摘要:获取正在运行的线程数,用于状态监控。之后初始化组件主要是初始化线程池将到中,初始化开始时间等。如果线程池中运行线程数量为,并且默认,那么就停止退出,结束爬虫。

本系列文章,针对Webmagic 0.6.1版本

一个普通爬虫启动代码

public static void main(String[] args) {
    Spider.create(new GithubRepoPageProcessor())
            从https:github.com/code4craft开始抓    
            .addUrl("https://github.com/code4craft")
            //设置Scheduler,使用Redis来管理URL队列
            .setScheduler(new RedisScheduler("localhost"))
            //设置Pipeline,将结果以json方式保存到文件
            .addPipeline(new JsonFilePipeline("D:datawebmagic"))
            //开启5个线程同时执行
            .thread(5)
            //启动爬虫
            .run();
}

1、spider可配置插拔组件:

Downloader 提供自定义的Downloader,默认为HttpClientDownloader
Pipeline 提供自定义的Pipeline,可以配置多个,多个Pipeline链式处理结果。默认为ConsolePipeline
Scheduler 提供自定义的调度器,默认为QueueScheduler
PageProcessor 页面处理组件,开发者爬虫的实现
ExecutorService 可以用于提供自己实现的线程池来监控,默认为Fixed ExecutorService
SpiderListener 页面状态监听器,提供每个页面成功和错误的回调。可配置多个。

其中有:WebMagic四大组件:Pipeline,Scheduler,Downloader和PageProcesser 。这和Python中的Scrapy的理念是一致的。但是Scrapy还有一些中间件的概念,从结构图中便可以看出区别

2、状态变量:

stat 0,初始化;1,运行中;2,已停止
pageCount 已经抓取的页面数。注意:这里统计的是GET请求的页面,POST请求的页面不在统计的范围之内。具体原因见DuplicateRemovedScheduler类
startTime:开始时间,可用于计算耗时。
emptySleepTime 最大空闲等待时间,默认30s。如果抓取队列为空,且url队列为空的最大等待时长,超过该时间,就认为爬虫抓取完成,停止运行。
threadNum : 启用的线程数,默认1.
threadPool:这是Webmagic提供的CountableThreadPool实例,内部封装了ExecutorService,CountableThreadPool 提供了额外的获取线程运行数的方法,此外为防止大量urls入池等待,提供了阻塞方式管理urls入池。(后续细说)
destroyWhenExit:默认true。是否在调用stop()时立即停止所有任务并退出。
spawUrl : 默认为true,是否抓取除了入口页面starturls之外的其他页面(targetRequests).

3、需要配置的项:

Site 全局站点配置,如UA,timeout,sleep等
PageProcessor 页面处理组件,开发者爬虫的实现
Request 配置入口页面url,可以多个。
uuid ,可选,Spider的名字,用于分析和日志。

需要注意的是:每个修改配置的方法都进行了checkIfRunning检查,如果检查当前Spider正在运行,它会抛出IllegalStateException。

所有配置方法都return this,便于链式调用,类似于builder模式。

4、运行方式:

Spider实现了Runnable接口(还有一个Webmagic自己的Task接口)。
run(),跟普通的Runnable一样,阻塞式运行,会阻塞当前线程直至Spider运行结束。
runAsync(),就是new一个Thread来运行当前Spider这个Runnable,异步运行。
start(),runAsync()的别名方法,异步运行。

5、状态相关方法

stop(),结束当前爬虫的运行,内部只是简单地修改一下状态,如果设置了destroyWhenExit=true(默认就是true)那么会立即停止所有任务并清除资源,否则并不会停止正在线程池中运行的线程,也不会销毁线程池。

getThreadAlive() 获取正在运行的线程数,用于状态监控。

6、核心代码分析

public void run() {
        checkRunningStat();
        initComponent();
        logger.info("Spider " + getUUID() + " started!");
        while (!Thread.currentThread().isInterrupted() && stat.get() == STAT_RUNNING) {
            final Request request = scheduler.poll(this);
            if (request == null) {
                if (threadPool.getThreadAlive() == 0 && exitWhenComplete) {
                    break;
                }
                // wait until new url added
                waitNewUrl();
            } else {
                threadPool.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            processRequest(request);
                            onSuccess(request);
                        } catch (Exception e) {
                            onError(request);
                            logger.error("process request " + request + " error", e);
                        } finally {
                            pageCount.incrementAndGet();
                            signalNewUrl();
                        }
                    }
                });
            }
        }
        stat.set(STAT_STOPPED);
        // release some resources
        if (destroyWhenExit) {
            close();
        }
    }

首先通过checkRunningStat()来检查并设置运行状态,如果已经在运行了,那么会抛出IllegalStateException。之后初始化组件(主要是初始化Downloader、线程池、将starturls push到Scheduler中,初始化开始时间等)。之后进入循环,从scheduler中poll出Request给线程池去执行。如果scheduler中没有request了:继而判断是否有线程在运行和是否设置了立即退出标志,如果设置了立即退出循环,否则调用waitNewUrl()等待有新的url被加入。
waitNewUrl()采用RetreentLock和Condition来进行超时阻塞,一旦阻塞时间超过emptySleepTime就返回。如果线程池中运行线程数量为0,并且exitWhenComplete=true(默认),那么就停止退出,结束爬虫。如果exitWhenComplete=false,那么需要开发者手动调用stop()来停止退出爬虫,并调用close()来清理资源。

通过processRequest来处理抓取url的整个流程,代码如下:

protected void processRequest(Request request) {
        Page page = downloader.download(request, this);
        if (page == null) {
            sleep(site.getSleepTime());
            onError(request);
            return;
        }
        // for cycle retry
        if (page.isNeedCycleRetry()) {
            extractAndAddRequests(page, true);
            sleep(site.getRetrySleepTime());
            return;
        }
        pageProcessor.process(page);
        extractAndAddRequests(page, spawnUrl);
        if (!page.getResultItems().isSkip()) {
            for (Pipeline pipeline : pipelines) {
                pipeline.process(page.getResultItems(), this);
            }
        }
        //for proxy status management
        request.putExtra(Request.STATUS_CODE, page.getStatusCode());
        sleep(site.getSleepTime());
    }

它在内部调用downloader下载页面得到Page(Page代表了一个页面),然后判断是否需要重试(needCycleRetry标志会在downloader下载页面发生异常时被设置为true,同时会把自己本身request加到targetRequests当中),如果需要,则抽取targetRequests到scheduler当中。如果都没问题,继续调用我们实现的页面处理器进行处理,之后再抽取我们在页面处理器中放入的targetRequests(即需要继续抓取的url)到scheduler当中。之后便是调用pipeline进行处理(一般做持久化操作,写到数据库、文件之类的),但是如果我们在页面处理器中为page设置了skip标志,那么就不会调用pipeline进行处理。
当然其中还包括一些重试休眠时间、继续抓取等待时间等来更好地控制爬虫抓取频率。

说完processRequest,我们回到run()继续分析,处理完之后,就是调用监听器,告诉其成功还是失败,最后抓取数加+1,然后通知新url被加入(通知waitNewUrl()可以返回继续了)。

需要说明的一点是,Spider类中的状态管理大量用到了Jdk Atomic原子包下的CAS并发原子类。

7、CountableThreadPool

前面说过Spider采用的线程池对象CountableThreadPool内部封装了ExecutorService,CountableThreadPool 提供了额外的获取线程运行数的方法,此外为防止大量urls入池等待,提供了阻塞方式管理urls入池。
阻塞方式的实现是通过ReentrantLock和它的Condition来实现的。具体代码如下:

public void execute(final Runnable runnable) {
        if (threadAlive.get() >= threadNum) {
            try {
                reentrantLock.lock();
                while (threadAlive.get() >= threadNum) {
                    try {
                        condition.await();
                    } catch (InterruptedException e) {
                    }
                }
            } finally {
                reentrantLock.unlock();
            }
        }
        threadAlive.incrementAndGet();
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    runnable.run();
                } finally {
                    try {
                        reentrantLock.lock();
                        threadAlive.decrementAndGet();
                        condition.signal();
                    } finally {
                        reentrantLock.unlock();
                    }
                }
            }
        });
    }

逻辑是这样的,如果正在运行的线程数threadAlive超过允许的线程数,就阻塞等待,直至收到某个线程结束通知。

罗嗦一句,这里的线程安全控制,主要是用到了JDK atomic包来表示状态和ReentrantLock、Condition来控制达到类似生产者消费者的阻塞机制。

关于Spider就分析到这里,后续主题待定。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/66862.html

相关文章

  • 爬虫框架WebMagic源码分析系列目录

    摘要:爬虫框架源码分析之爬虫框架源码分析之爬虫框架源码分析之爬虫框架源码分析之爬虫框架源码分析之之进阶 爬虫框架Webmagic源码分析之Spider爬虫框架WebMagic源码分析之Scheduler爬虫框架WebMagic源码分析之Downloader爬虫框架WebMagic源码分析之Selector爬虫框架WebMagic源码分析之SeleniumWebMagic之Spider进阶

    wayneli 评论0 收藏0
  • 爬虫框架WebMagic源码分析Scheduler

    摘要:包主要实现类,这是一个抽象类,实现了通用的模板方法,并在方法内部判断错误重试去重处理等。重置重复检查就是清空,获取请求总数也就是获取的。至于请求总数统计,就是返回中维护的的大小。 Scheduler是Webmagic中的url调度器,负责从Spider处理收集(push)需要抓取的url(Page的targetRequests)、并poll出将要被处理的url给Spider,同时还负责...

    TIGERB 评论0 收藏0
  • 爬虫框架WebMagic源码分析Downloader

    摘要:方法,首先判断是否有这是在中配置的,如果有,直接调用的将相应内容转化成对应编码字符串,否则智能检测响应内容的字符编码。 Downloader是负责请求url获取返回值(html、json、jsonp等)的一个组件。当然会同时处理POST重定向、Https验证、ip代理、判断失败重试等。 接口:Downloader 定义了download方法返回Page,定义了setThread方法来...

    104828720 评论0 收藏0
  • WebMagicSpider进阶

    摘要:实际运行中就发现了一个有趣的现象。爬虫抓取的速度超过了我用给它推送的速度,导致爬虫从获取不到同时此刻线程池所有线程都已停止。如何管理设置,避免返回,且没有工作线程时退出循环。退出检测循环说明结束了,手动调用来是退出调度循环,终止爬虫。 Webmagic源码分析系列文章,请看这里 从解决问题开始吧。 问题描述:由于数据库的数据量特别大,而且公司没有搞主从读写分离,导致从数据库读取数据比较...

    Zhuxy 评论0 收藏0
  • 优雅的使用WebMagic框架写Java爬虫

    摘要:优雅的使用框架,爬取唐诗别苑网的诗人诗歌数据同时在几种动态加载技术中对比作选择虽然差不多两年没有维护,但其本身是一个优秀的爬虫框架的实现,源码中有很多值得参考的地方,特别是对爬虫多线程的控制。 优雅的使用WebMagic框架,爬取唐诗别苑网的诗人诗歌数据 同时在几种动态加载技术(HtmlUnit、PhantomJS、Selenium、JavaScriptEngine)中对比作选择 We...

    leejan97 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<