使用scrapy實現增量式爬取方式

實現爬蟲的增量式爬取有兩種方法,一是在獲得頁面解析的內容後判斷該內容是否已經被爬取過,二是在發送請求之前判斷要被請求的url是否已經被爬取過,前一種方法可以感知每個頁面的內容是否發生變化,能獲取頁面新增或者變化的內容,但是由於要對每個url發送請求,所以速度比較慢,而對網站服務器的壓力也比較大,後一種無法獲得頁面變化的內容,但是因為不用對已經爬取過的url發送請求,所以對服務器壓力比較小,速度比較快,適用於爬取新增網頁

下面用一個小說網站爬蟲的例子來介紹在scrapy中這兩種方式的實現

1.要爬取的信息

在scrapy中,信息通過item來封裝,這裡我定義兩個item,一個用於封裝每本小說的信息,一個用於封裝每個章節的信息

1.BookItem

class BookItem(scrapy.Item):
    _id = scrapy.Field() #小說id,用於定位章節信息,章節唯一
    novel_Name = scrapy.Field() #小說名稱
    novel_Writer = scrapy.Field()#小說作者
    novel_Type = scrapy.Field()#小說類型
    novel_Status = scrapy.Field()#小說狀態,連載或者完結
    novel_UpdateTime = scrapy.Field()#最後更新時間
    novel_Words = scrapy.Field() #總字數
    novel_ImageUrl = scrapy.Field()#封面圖片
    novel_AllClick = scrapy.Field()#總點擊
    novel_MonthClick = scrapy.Field()#月點擊
    novel_WeekClick = scrapy.Field()#周點擊
    novel_AllComm = scrapy.Field()#總推薦
    novel_MonthComm = scrapy.Field()#月推薦
    novel_WeekComm = scrapy.Field()#周推薦
    novel_Url = scrapy.Field()#小說url
    novel_Introduction = scrapy.Field()#小說簡介

2.ChapterItem

 class ChapterItem(scrapy.Item):
    chapter_Url = scrapy.Field()#章節url
    _id = scrapy.Field()#章節id
    novel_Name = scrapy.Field()#小說名稱
    chapter_Name = scrapy.Field()#章節名稱
    chapter_Content = scrapy.Field()#內容
    novel_ID = scrapy.Field()#小說id
    is_Error = scrapy.Field()#是否異常

2.解析信息

這裡我是用的是scrapy自帶的通用爬蟲模塊,隻需要指定信息解析方式,需要跟進的url就夠瞭

1.指定需要跟進的url和回調函數

  allowed_domains = ["23us.so"] #允許爬取的域名
  start_urls = ["http://www.23us.so/xiaoshuo/414.html"]#種子url
  #跟進的url
  rules=(
    Rule(LinkExtractor(allow=("xiaoshuo/\d*\.html")),callback="parse_book_message",follow=True),
    Rule(LinkExtractor(allow=("files/article/html/\d*?/\d*?.index.html")),callback="parse_book_chapter",follow=True),
    Rule(LinkExtractor(allow=("files/article/html/\d*?/\d*?/\d*?.html")),callback="parse_chapter_content",follow=True),
    Rule(LinkExtractor(allow=(".*")),follow=True),
  )

2.解析方法

1.解析書籍信息方法

#解析小說信息頁面
  def parse_book_message(self,response):
    if not response.body:
      print(response.url+"已經被爬取過瞭,跳過")
      return;
    ht = response.body.decode("utf-8")
    text = html.fromstring(ht)
    novel_Url = response.url
    novel_Name = text.xpath(".//dl[@id='content']/dd[1]/h1/text()")[0].split(" ")[0] if response.xpath(".//dl[@id='content']/dd[1]/h1/text()") else "None"
    novel_ImageUrl = text.xpath(".//a[@class='hst']/img/@src")[0] if response.xpath(".//a[@class='hst']/img/@src") else "None"
    novel_ID = int(response.url.split("/")[-1].split(".")[0]) if response.url.split("/")[-1].split(".") else "None"
    novel_Type = text.xpath(".//table[@id='at']/tr[1]/td[1]/a/text()") if response.xpath(".//table[@id='at']/tr[1]/td[1]/a/text()") else "None"
    novel_Writer = "".join(text.xpath(".//table[@id='at']/tr[1]/td[2]/text()")) if response.xpath(".//table[@id='at']/tr[1]/td[2]/text()") else "None"
    novel_Status = "".join(text.xpath(".//table[@id='at']/tr[1]/td[3]/text()")) if response.xpath(".//table[@id='at']/tr[1]/td[3]/text()") else "None"
    novel_Words = self.getNumber("".join(text.xpath(".//table[@id='at']/tr[2]/td[2]/text()"))) if response.xpath(".//table[@id='at']/tr[2]/td[2]/text()") else "None"
    novel_UpdateTime = "".join(text.xpath(".//table[@id='at']/tr[2]/td[3]/text()")) if response.xpath(".//table[@id='at']/tr[2]/td[3]/text()") else "None"
    novel_AllClick = int("".join(text.xpath(".//table[@id='at']/tr[3]/td[1]/text()"))) if response.xpath(".//table[@id='at']/tr[3]/td[1]/text()") else "None"
    novel_MonthClick = int("".join(text.xpath(".//table[@id='at']/tr[3]/td[2]/text()"))) if response.xpath(".//table[@id='at']/tr[3]/td[2]/text()") else "None"
    novel_WeekClick = int("".join(text.xpath(".//table[@id='at']/tr[3]/td[3]/text()"))) if response.xpath(".//table[@id='at']/tr[3]/td[3]/text()") else "None"
    novel_AllComm = int("".join(text.xpath(".//table[@id='at']/tr[4]/td[1]/text()"))) if response.xpath(".//table[@id='at']/tr[4]/td[1]/text()") else "None"
    novel_MonthComm = int("".join(text.xpath(".//table[@id='at']/tr[4]/td[3]/text()"))) if response.xpath(".//table[@id='at']/tr[4]/td[2]/text()") else "None"
    novel_WeekComm = int("".join(text.xpath(".//table[@id='at']/tr[4]/td[3]/text()"))) if response.xpath(".//table[@id='at']/tr[4]/td[3]/text()") else "None"
    pattern = re.compile('<p>(.*)<br')
    match = pattern.search(ht)
    novel_Introduction = "".join(match.group(1).replace("&nbsp;","")) if match else "None"
     #封裝小說信息類
    bookitem = BookItem(
          novel_Type = novel_Type[0],
          novel_Name = novel_Name,
          novel_ImageUrl = novel_ImageUrl,
          _id = novel_ID,   #小說id作為唯一標識符
          novel_Writer = novel_Writer,
          novel_Status = novel_Status,
          novel_Words = novel_Words,
          novel_UpdateTime = novel_UpdateTime,
          novel_AllClick = novel_AllClick,
          novel_MonthClick = novel_MonthClick,
          novel_WeekClick = novel_WeekClick,
          novel_AllComm = novel_AllComm,
          novel_MonthComm = novel_MonthComm,
          novel_WeekComm = novel_WeekComm,
          novel_Url = novel_Url,
          novel_Introduction = novel_Introduction,
    )
    return bookitem

2.解析章節信息

def parse_chapter_content(self,response):
    if not response.body:
      print(response.url+"已經被爬取過瞭,跳過")
      return;
    ht = response.body.decode('utf-8')
    text = html.fromstring(ht)
    soup = BeautifulSoup(ht)
    novel_ID = response.url.split("/")[-2]
    novel_Name = text.xpath(".//p[@class='fr']/following-sibling::a[3]/text()")[0]
    chapter_Name = text.xpath(".//h1[1]/text()")[0]
    '''
    chapter_Content = "".join("".join(text.xpath(".//dd[@id='contents']/text()")).split())
    if len(chapter_Content) < 25:
      chapter_Content = "".join("".join(text.xpath(".//dd[@id='contents']//*/text()")))
    pattern = re.compile('dd id="contents".*?>(.*?)</dd>')
    match = pattern.search(ht)
    chapter_Content = "".join(match.group(1).replace("&nbsp;","").split()) if match else "爬取錯誤"
    '''
    result,number = re.subn("<.*?>","",str(soup.find("dd",id='contents')))
    chapter_Content = "".join(result.split())
    print(len(chapter_Content))
    novel_ID = response.url.split("/")[-2]
    return ChapterItem(
          chapter_Url = response.url,
          _id=int(response.url.split("/")[-1].split(".")[0]),
          novel_Name=novel_Name,
          chapter_Name=chapter_Name,
          chapter_Content= chapter_Content,
          novel_ID = novel_ID,
          is_Error = len(chapter_Content) < 3000
          )

3.scrapy中實現增量式爬取的幾種方式

1.緩存

通過開啟緩存,將每個請求緩存至本地,下次爬取時,scrapy會優先從本地緩存中獲得response,這種模式下,再次請求已爬取的網頁不用從網絡中獲得響應,所以不受帶寬影響,對服務器也不會造成額外的壓力,但是無法獲取網頁變化的內容,速度也沒有第二種方式快,而且緩存的文件會占用比較大的內存,在setting.py的以下註釋用於設置緩存

#HTTPCACHE_ENABLED = True
#HTTPCACHE_EXPIRATION_SECS = 0
#HTTPCACHE_DIR = 'httpcache'
#HTTPCACHE_IGNORE_HTTP_CODES = []
#HTTPCACHE_STORAGE = 'scrapy.extensions.httpcache.FilesystemCacheStorage'

這種方式比較適合內存比較大的主機使用,我的阿裡雲是最低配的,在爬取半個晚上接近27W個章節信息後,內存就用完瞭

2.對item實現去重

本文開頭的第一種方式,實現方法是在pipelines.py中進行設置,即在持久化數據之前判斷數據是否已經存在,這裡我用的是mongodb持久化數據,邏輯如下

  #處理書信息
  def process_BookItem(self,item):
    bookItemDick = dict(item)
    try:
      self.bookColl.insert(bookItemDick)
      print("插入小說《%s》的所有信息"%item["novel_Name"])
    except Exception:
      print("小說《%s》已經存在"%item["novel_Name"])
  #處理每個章節
  def process_ChapterItem(self,item):
    try:
      self.contentColl.insert(dict(item))
      print('插入小說《%s》的章節"%s"'%(item['novel_Name'],item['chapter_Name']))
    except Exception:
      print("%s存在瞭,跳過"%item["chapter_Name"])
  def process_item(self, item, spider):
    '''
    if isinstance(item,ChaptersItem):
      self.process_ChaptersItem(item)
    '''
    if isinstance(item,BookItem):
      self.process_BookItem(item)
    if isinstance(item,ChapterItem):
      self.process_ChapterItem(item)
    return item

兩種方法判斷mongodb中是否存在已有的數據,一是先查詢後插入,二是先設置唯一索引或者主鍵再直接插入,由於mongodb的特點是插入塊,查詢慢,所以這裡直接插入,需要將唯一信息設置為”_id”列,或者設置為唯一索引,在mongodb中設置方法如下

db.集合名.ensureIndex({"要設置索引的列名":1},{"unique":1})

需要用什麼信息實現去重,就將什麼信息設置為唯一索引即可(小說章節信息由於數據量比較大,用於查詢的列最好設置索引,要不然會非常慢),這種方法對於服務器的壓力太大,而且速度比較慢,我用的是第二種方法,即對已爬取的url進行去重

3.對url實現去重

對我而言,這種方法是最好的方法,因為速度快,對網站服務器的壓力也比較小,不過網上的資料比較少,後來在文檔中發現scrapy可以自定義下載中間件,才解決瞭這個問題

文檔原文如下

class scrapy.downloadermiddlewares.DownloaderMiddleware

process_request(request, spider) 當每個request通過下載中間件時,該方法被調用。

process_request() 必須返回其中之一: 返回 None 、返回一個 Response 對象、返回一個 Request對象或raise IgnoreRequest 。

如果其返回 None ,Scrapy將繼續處理該request,執行其他的中間件的相應方法,直到合適的下載器處理函數(downloadhandler)被調用, 該request被執行(其response被下載)。

如果其返回 Response 對象,Scrapy將不會調用 任何 其他的 process_request() 或process_exception() 方法,或相應地下載函數; 其將返回該response。 已安裝的中間件的process_response() 方法則會在每個response返回時被調用。

如果其返回 Request 對象,Scrapy則停止調用process_request方法並重新調度返回的request。當新返回的request被執行後,相應地中間件鏈將會根據下載的response被調用。

如果其raise一個 IgnoreRequest 異常,則安裝的下載中間件的 process_exception()方法會被調用。如果沒有任何一個方法處理該異常,則request的errback(Request.errback)方法會被調用。如果沒有代碼處理拋出的異常,則該異常被忽略且不記錄(不同於其他異常那樣)。

所以隻需要在process_request中實現去重的邏輯就可以瞭,代碼如下

class UrlFilter(object):
  #初始化過濾器(使用mongodb過濾)
  def __init__(self):
    self.settings = get_project_settings()
    self.client = pymongo.MongoClient(
      host = self.settings['MONGO_HOST'],
      port = self.settings['MONGO_PORT'])
    self.db = self.client[self.settings['MONGO_DB']]
    self.bookColl = self.db[self.settings['MONGO_BOOK_COLL']]
    #self.chapterColl = self.db[self.settings['MONGO_CHAPTER_COLL']]
    self.contentColl = self.db[self.settings['MONGO_CONTENT_COLL']]
  def process_request(self,request,spider):
    if (self.bookColl.count({"novel_Url":request.url}) > 0) or (self.contentColl.count({"chapter_Url":request.url}) > 0):
      return http.Response(url=request.url,body=None)

但是又會有一個問題,就是有可能下次開啟時,種子url已經被爬取過瞭,爬蟲會直接關閉,後來想到一個笨方法解決瞭這個問題,即在pipeline.py裡的open_spider方法中再爬蟲開啟時刪除對種子url的緩存

def open_spider(self,spider):            
    self.bookColl.remove({"novel_Url":"http://www.23us.so/xiaoshuo/414.html"})

4.結果

目前一個晚上爬取瞭大約1000部小說35W個章節的信息,還在繼續爬取中

以上為個人經驗,希望能給大傢一個參考,也希望大傢多多支持WalkonNet。

推薦閱讀: