摘要:包主要實現類,這是一個抽象類,實現了通用的模板方法,并在方法內部判斷錯誤重試去重處理等。重置重復檢查就是清空,獲取請求總數也就是獲取的。至于請求總數統計,就是返回中維護的的大小。
Scheduler是Webmagic中的url調度器,負責從Spider處理收集(push)需要抓取的url(Page的targetRequests)、并poll出將要被處理的url給Spider,同時還負責對url判斷是否進行錯誤重試、及去重處理、以及總頁面數、剩余頁面數統計等。
主要接口:
Scheduler,定義了基本的push和poll方法。基本接口。
MonitorableScheduler,繼承自Scheduler的接口,定義了獲取剩余url請求數和總請求數的方法。便于監控。
core包主要實現類:
DuplicateRemovedScheduler,這是一個抽象類,實現了通用的push模板方法,并在push方法內部判斷錯誤重試、去重處理等。去重策略采用的是HashSetDuplicateRemover類,這個會在稍后說明。
PriorityScheduler,內置兩個優先級隊列(+,-)和一個非優先級阻塞隊列的調度器。
QueueScheduler,內置一個阻塞隊列的調度器。這是默認采用的。
URL去重策略:
DuplicateRemover:去重接口,含有判斷是否重復,重置重復檢查,獲取請求總數的方法。
HashSetDuplicateRemover:DuplicateRemover的實現類,內部維護了一個并發安全的HashSet。
先說下去重策略的具體實現。核心代碼如下:
public class HashSetDuplicateRemover implements DuplicateRemover { private Seturls = Collections.newSetFromMap(new ConcurrentHashMap ()); @Override public boolean isDuplicate(Request request, Task task) { return !urls.add(getUrl(request)); } 。。。 @Override public void resetDuplicateCheck(Task task) { urls.clear(); } @Override public int getTotalRequestsCount(Task task) { return urls.size(); } }
去重策略類很簡單,就是維護一個并發安全的HashSet。然后通過add方法是否成功來判斷是否是重復的url。重置重復檢查就是清空set,獲取請求總數也就是獲取set的size。簡單明了。但是你以為去重就這么點,那么你錯了。繼續看。
public abstract class DuplicateRemovedScheduler implements Scheduler { private DuplicateRemover duplicatedRemover = new HashSetDuplicateRemover(); @Override public void push(Request request, Task task) { if (shouldReserved(request) || noNeedToRemoveDuplicate(request) || !duplicatedRemover.isDuplicate(request, task)) { pushWhenNoDuplicate(request, task); } } protected boolean shouldReserved(Request request) { return request.getExtra(Request.CYCLE_TRIED_TIMES) != null; } protected boolean noNeedToRemoveDuplicate(Request request) { return HttpConstant.Method.POST.equalsIgnoreCase(request.getMethod()); } protected void pushWhenNoDuplicate(Request request, Task task) { } }
DuplicateRemovedScheduler是一個抽象類,提供了push的通用模板,并為子類提供了pushWhenNoDuplicate用于實現自己的策略。push方法用于同一處理去重和重試機制。
首先判斷是否需要進行錯誤重試,如果需要,那么就直接push到隊列中,否則判斷請求是否為POST方法,如果是直接加入隊列,(這里需要注意的是,POST請求的url不會被加入HashSetDuplicateRemover維護的urls集合,故而也不會被加入到最終的getTotalRequestsCount的統計中,所以最終我們獲取的統計信息只是針對GET請求的。),否則進行去重判斷。
根據不同調度器的實現,pushWhenNoDuplicate的實現方式不一樣。
在PriorityScheduler中內置兩個優先級隊列(+,-)和一個非優先級阻塞隊列的調度器,其pushWhenNoDuplicate代碼如下:
public void pushWhenNoDuplicate(Request request, Task task) { if (request.getPriority() == 0) { noPriorityQueue.add(request); } else if (request.getPriority() > 0) { priorityQueuePlus.put(request); } else { priorityQueueMinus.put(request); } }
根據Request是否設置priority屬性,以及是否為正、負來決定加入到哪個隊列中。因為這影響了后續poll的先后順序。
在QueueScheduler中內置一個阻塞隊列的調度器。其pushWhenNoDuplicate代碼如下:
public void pushWhenNoDuplicate(Request request, Task task) { queue.add(request); }
就是簡單地將其加入隊列中。
以上就是關于URL去重及push的機制,接下來說明poll思路:
在PriorityScheduler中,poll順序為plus隊列>noPriority隊列>minus隊列。
public synchronized Request poll(Task task) { Request poll = priorityQueuePlus.poll(); if (poll != null) { return poll; } poll = noPriorityQueue.poll(); if (poll != null) { return poll; } return priorityQueueMinus.poll(); }
在QueueScheduler中,簡單粗暴。
public Request poll(Task task) { return queue.poll(); }
至于url請求總數統計,就是返回HashSetDuplicateRemover中維護的urls set的大小。這里再次羅嗦一次:最終我們獲取的統計信息只是針對GET請求的。
public int getTotalRequestsCount(Task task) { return getDuplicateRemover().getTotalRequestsCount(task); }
當然extensions擴展模塊中還有些Scheduler實現,比如RedisScheduler用作集群支持,FileCacheQueueScheduler用來斷點續爬支持等。由于本系列文章是先分析核心包,后續分析擴展包,所以關于這部分,后續補充。
RedisScheduler
思路是采用set來存儲已經抓取過的url,list來存儲待抓url隊列,hash來存儲序列化數據(哈希中的鍵為url的SHA值,值為Request的json序列化字符串)。所有數據類型的鍵都是基于Spider的UUID來生成的,也就是說每個Spider實例所擁有的都是不同的。
@Override public boolean isDuplicate(Request request, Task task) { Jedis jedis = pool.getResource(); try { return jedis.sadd(getSetKey(task), request.getUrl()) > 0; } finally { pool.returnResource(jedis); } } @Override protected void pushWhenNoDuplicate(Request request, Task task) { Jedis jedis = pool.getResource(); try { jedis.rpush(getQueueKey(task), request.getUrl()); if (request.getExtras() != null) { String field = DigestUtils.shaHex(request.getUrl()); String value = JSON.toJSONString(request); jedis.hset((ITEM_PREFIX + task.getUUID()), field, value); } } finally { pool.returnResource(jedis); } } @Override public synchronized Request poll(Task task) { Jedis jedis = pool.getResource(); try { String url = jedis.lpop(getQueueKey(task)); if (url == null) { return null; } String key = ITEM_PREFIX + task.getUUID(); String field = DigestUtils.shaHex(url); byte[] bytes = jedis.hget(key.getBytes(), field.getBytes()); if (bytes != null) { Request o = JSON.parseObject(new String(bytes), Request.class); return o; } Request request = new Request(url); return request; } finally { pool.returnResource(jedis); } }
@Override public int getLeftRequestsCount(Task task) { Jedis jedis = pool.getResource(); try { Long size = jedis.llen(getQueueKey(task)); return size.intValue(); } finally { pool.returnResource(jedis); } } @Override public int getTotalRequestsCount(Task task) { Jedis jedis = pool.getResource(); try { Long size = jedis.scard(getSetKey(task)); return size.intValue(); } finally { pool.returnResource(jedis); } }
這些代碼都很好理解,只要有點redis基礎的都沒問題,這里就不再贅述了。
至于RedisPriorityScheduler就是采用有序的zset來存儲plus、min隊列,list來存儲noprioprity隊列。
FileCacheQueueScheduler
思路是維護兩個文件.cursor.txt,.urls.txt 前者由于存儲一個數字,這個數字代表了讀取.urls.txt的行數。后者用來存儲所有的urls。初始化時從兩個文件讀取內存中,并初始化urls集和queue隊列、同時初始化flush線程定時flush內容到文件中。當poll和pushWhenNoDuplicate時和原來邏輯差不多,只不過加了寫文件的步驟。
需要注意的是:FileCacheQueueScheduler實現了自己的去重規則,而不是直接使用DuplicateRemovedScheduler父類的去重規則。不過原理都一樣,都是通過Set來去重。
以上就是關于調度器的部分,下篇主題待定。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/66885.html
摘要:獲取正在運行的線程數,用于狀態監控。之后初始化組件主要是初始化線程池將到中,初始化開始時間等。如果線程池中運行線程數量為,并且默認,那么就停止退出,結束爬蟲。 本系列文章,針對Webmagic 0.6.1版本 一個普通爬蟲啟動代碼 public static void main(String[] args) { Spider.create(new GithubRepoPageP...
摘要:爬蟲框架源碼分析之爬蟲框架源碼分析之爬蟲框架源碼分析之爬蟲框架源碼分析之爬蟲框架源碼分析之之進階 爬蟲框架Webmagic源碼分析之Spider爬蟲框架WebMagic源碼分析之Scheduler爬蟲框架WebMagic源碼分析之Downloader爬蟲框架WebMagic源碼分析之Selector爬蟲框架WebMagic源碼分析之SeleniumWebMagic之Spider進階
摘要:實際運行中就發現了一個有趣的現象。爬蟲抓取的速度超過了我用給它推送的速度,導致爬蟲從獲取不到同時此刻線程池所有線程都已停止。如何管理設置,避免返回,且沒有工作線程時退出循環。退出檢測循環說明結束了,手動調用來是退出調度循環,終止爬蟲。 Webmagic源碼分析系列文章,請看這里 從解決問題開始吧。 問題描述:由于數據庫的數據量特別大,而且公司沒有搞主從讀寫分離,導致從數據庫讀取數據比較...
摘要:主要用于選擇器抽象類,實現類前面說的兩個接口,主要用于選擇器繼承。多個選擇的情形,每個選擇器各自獨立選擇,將所有結果合并。抽象類,定義了一些模板方法。這部分源碼就不做分析了。這里需要提到的一點是返回的不支持選擇,返回的對象支持選擇。 1、Selector部分:接口:Selector:定義了根據字符串選擇單個元素和選擇多個元素的方法。ElementSelector:定義了根據jsoup ...
摘要:有一個模塊其中實現了一個。但是感覺靈活性不大。接口如下它會獲得一個實例,你可以在里面進行任意的操作。本部分到此結束。 webmagic有一個selenium模塊,其中實現了一個SeleniumDownloader。但是感覺靈活性不大。所以我就自己參考實現了一個。 首先是WebDriverPool用來管理WebDriver池: import java.util.ArrayList; im...
閱讀 2044·2023-04-25 15:24
閱讀 1584·2019-08-30 12:55
閱讀 1621·2019-08-29 15:27
閱讀 476·2019-08-26 17:04
閱讀 2412·2019-08-26 10:59
閱讀 1806·2019-08-26 10:44
閱讀 2206·2019-08-22 16:15
閱讀 2594·2019-08-22 15:36