python使用pywinauto驅動微信客戶端實現公眾號爬蟲

項目地址

https://github.com/fancyerii/wechat-gongzhonghao-crawler

pywinauto簡介

pywinauto是一個python的工具,可以用於控制Windows的GUI程序。詳細的文檔可以參考這裡。

WechatAutomator類

自動化微信的代碼封裝在瞭類WechatAutomator裡,完整的代碼可以參考這裡。這裡簡要的介紹一下其中的主要方法:

init_window

這個方法完成類的初始化,它的代碼為:

    def init_window(self, exe_path=r"C:\Program Files (x86)\Tencent\WeChat\WeChat.exe",
                    turn_page_interval=3,
                    click_url_interval=1,
                    win_width=1000,
                    win_height=600):
        app = Application(backend="uia").connect(path=exe_path)
        self.main_win = app.window(title=u"微信", class_name="WeChatMainWndForPC")
        self.main_win.set_focus()
        self.app = app
        self.visible_top = 70
        self.turn_page_interval = turn_page_interval
        self.click_url_interval = click_url_interval
        self.browser = None
        self.win_width = win_width
        self.win_height = win_height
        # 為瞭讓移動窗口,同時使用非uia的backend,這是pywinauto的uia的一個bug
        self.app2 = Application().connect(path=exe_path)
        self.move_window()

我們首先來看函數的參數:

  • exe_path
    • 微信程序的地址
  • turn_page_interval
    • 抓取翻頁時的時間間隔,默認3s
  • click_url_interval
    • 在抓取一頁的url時的間隔,默認1s
  • win_width
    • 設置窗口的寬度
  • win_height
    • 設置窗口的高度,如果顯示器的分辨率較大,可以設置的更加高一些,從而一頁包含的文章數更多一些,從而翻頁少一點。註意:一定要保證窗口完全可見,也就是說win_height不能大於實際分辨率的高度!

這個函數的主要功能是構建Application對象從而通過pywinauto實現控制,這裡使用的是uia的backend,然後設置窗口的大小並且把窗口移到最左上角。因為根據so文章,pywinauto 0.6.8存在bug,隻能通過win32的backend來移到窗口,所以構造瞭self.app2然後調用move_window()函數把窗口移到最左上角。

crawl_gongzhonghao

這個函數實現瞭某個公眾號的文章抓取。它的基本控制邏輯如下:

  • 首先通過搜索框根據名字搜索公眾號並且點擊它。
  • 對於當前頁點擊所有的鏈接並且下載其內容。
  • 使用PAGE_DOWN鍵往下翻頁
  • 需要判斷是否繼續抓取

第一個是通過locate_user函數實現,後面會介紹。第二個是通過process_page函數實現,後面也會介紹。判斷是否繼續抓取的邏輯為:

  • 如果翻頁超過max_pages,則停止抓取
  • 如果碰到某個url曾經抓取過,那說明之前的文章都已經抓取過瞭,則停止抓取
  • 如果lastest_date不是None並且一篇文章的發佈日期早於它,則停止抓取

所以我們通常會在第一次抓取的時候把max_pages設置的很大(比如100),然後通過latest_date來抓到指定的日期。而之後的抓取則設置max_pages為較小的值(比如默認的6),這樣隻要爬蟲在兩次抓取之間公眾號的更新不超過6頁,那麼就不會漏掉文章。具體的邏輯可以參考main.py,它會把抓取的文章通過http請求發給Server,並且每次抓取的時候從Server查詢抓取過的文章存放到states這個list裡states[i][“url”]就存儲瞭第i篇文章的url。

    def crawl_gongzhonghao(self, account_name, articles, states, detail,
                           max_pages=6, latest_date=None, no_item_retry=3):
        logger.debug(account_name)
        if not self.locate_user(account_name):
            return False
        last_visited_titles = set()
        visited_urls = set()
        self.turn_page_up(min(20, max_pages * 2))

        pagedown_retry = 0
        last_visited_titles = []
        for page in range(0, max_pages):
            items = []
            last_visited_titles = self.process_page(account_name, items, last_visited_titles, states, visited_urls, detail)
            articles.extend(items)

            if len(items) == 0:
                pagedown_retry += 1
                if pagedown_retry >= no_item_retry:
                    s = "break because of retry {}".format(pagedown_retry)
                    logger.debug(s)
                    WechatAutomator.add_to_detail(s, detail)
                    break
            else:
                pagedown_retry = 0

            if len(items) > 0 and latest_date is not None:
                html = items[-1][-1]
                pub_date = WechatAutomator.get_pubdate(html)
                if pub_date and pub_date < latest_date:
                    s = "stop because {} < {}".format(pub_date, latest_date)
                    logger.debug(s)
                    WechatAutomator.add_to_detail(s, detail)
                    break

            url_exist = False
            for item in items:
                if WechatAutomator.url_in_states(item[0], states):
                    s = "stop because url exist {}".format(item[0])
                    logger.debug(s)
                    WechatAutomator.add_to_detail(s, detail)
                    url_exist = True
                    break
            if url_exist:
                break

            self.click_right()
            self.main_win.type_keys("{PGDN}")
            time.sleep(self.turn_page_interval)

        self.turn_page_up(page * 2)

        return True

locate_user

locate_user函數的控制流程為:

  • 找到左上角的搜索框並且點擊它獲得焦點
  • 使用ctrl+a選中可能有的文字(之前的bug?)並且使用後退鍵刪除它們
  • 輸入公眾號名稱
  • 在彈出的list裡點擊這個公眾號名稱從而進入公眾號
    def locate_user(self, user, retry=5):
        if not self.main_win:
            raise RuntimeError("you should call init_window first")

        search_btn = self.main_win.child_window(title="搜索", control_type="Edit")
        self.click_center(search_btn)

        self.main_win.type_keys("^a")
        self.main_win.type_keys("{BACKSPACE}")
        self.main_win.type_keys(user)
        for i in range(retry):
            time.sleep(1)
            try:
                search_list = self.main_win.child_window(title="搜索結果")
                match_result = search_list.child_window(title=user, control_type="ListItem")
                self.click_center(match_result)
                return True
            except:
                pass

        return False

這裡主要就是通過child_window函數進行定位,關於它的用法這裡不介紹。關於怎麼定位元素的方法可以使用Inspect.exe或者print_control_identifiers函數,具體參考這裡。

process_page

這個函數是最主要的抓取代碼,它處理當前一頁的內容,它的控制流程如下:

  • 構建當前頁的tree
  • 使用recursive_get函數遍歷這顆樹並且找到每篇文章對應的element
  • 遍歷每一篇文章
    • 如果文章的名字在上一頁出現過,則跳過
    • 獲得這篇文章的坐標信息
    • 如果文章不可見(rect.top >= win_rect.bottom or rect.bottom <= self.visible_top)則跳過
    • 計算點擊的坐標
    • 點擊文章打開新的窗口
    • 在新的窗口中點擊【復制鏈接】按鈕
    • 從剪貼板復制鏈接url
    • 通過url下載文章內容並且parse發佈日期

邏輯比較簡單,但是有一些很trick的地方:

  • 微信翻頁的實現
    • 微信客戶端的翻頁和瀏覽器不同,它的內容是累加的,比如第一頁3篇文章,往下翻一頁可能變成6篇文章,再翻可能變成9篇。這個時候這9篇文章都是在tree中的,隻不過最後3篇的坐標(top和bottom)是空間的。
  • 能否點擊 一篇文章對應的框(圖)可能是部分可見的,甚至它的top非常接近屏幕的最下方,這個時候可能點不瞭。如下圖所示:

與此類似的是右上角的黑色頭部(不能滾到並且會遮擋)也有一定空間,如下圖所示:

  • 點擊的位置

因為這個框可能很窄(bottom-top很小)並且可能在很靠上或者靠下的位置。所以有如下代碼:

    # 計算可見的高度
    visible_height = min(rect.bottom, win_rect.bottom) - max(rect.top, win_rect.top+self.visible_top)
    # 太窄的不點擊,希望下次翻頁後能顯示更多像素從而可以點擊,
    # 但是如果微信的某個文章的框的高度小於10個像素,那麼這篇文章就無法被點擊
    # 不過作者目前為發現這麼窄的文章
    if visible_height < 10:
        continue
    
    # 如果某個文章的框太大,則拋出異常,目前為止為發現這樣的問題。
    if rect.bottom - rect.top >= win_rect.bottom - self.visible_top:
        raise RuntimeError("{}-{}>={}-{}".format(rect.bottom, rect.top,
                                                 win_rect.bottom, self.visible_top))
    # 如果下部部分可見,那麼點擊上方是比較”安全“的
    if rect.bottom >= win_rect.bottom:
        click_up = True
    # 如果下部完全可見,則點擊下方是”安全“的
    else:
        click_up = False

完整代碼如下:

    def process_page(self, account_name, items, lastpage_clicked_titles, states, visited_urls, detail):
        clicked_titles = set()
        text = self.main_win.child_window(title=account_name, control_type="Text", found_index=0)
        parent = text
        while parent:
            parent = parent.parent()
            if '會話列表' == parent.element_info.name:
                break
        paths = [0, 2, 0, 0, 0, 1, 0]
        for idx in paths:
            parent = parent.children()[idx]

        elems = []
        self.recursive_get(parent, elems)
        win_rect = self.main_win.rectangle()
        for elem in elems:
            rect = elem.rectangle()

            if elem.element_info.name in lastpage_clicked_titles:
                continue

            if rect.top >= win_rect.bottom or rect.bottom <= self.visible_top:
                continue

            visible_height = min(rect.bottom, win_rect.bottom) - max(rect.top, win_rect.top+self.visible_top)
            if visible_height < 10:
                continue

            if rect.bottom - rect.top >= win_rect.bottom - self.visible_top:
                raise RuntimeError("{}-{}>={}-{}".format(rect.bottom, rect.top,
                                                         win_rect.bottom, self.visible_top))
            if rect.bottom >= win_rect.bottom:
                click_up = True
            else:
                click_up = False
            if self.is_bad_elem(elem):
                s = "not good elem {}".format(elem.element_info.name[0:10])
                logger.debug(s)
                WechatAutomator.add_to_detail(s, detail)
                continue

            try:
                self.click_url(rect, win_rect, click_up)
                copy_btn = self.browser.child_window(title="復制鏈接地址")
                self.click_center(copy_btn, click_main=False)
                url = clipboard.GetData()
                if elem.element_info.name != '圖片':
                    clicked_titles.add(elem.element_info.name)
                if url and not url in visited_urls:
                    visited_urls.add(url)
                    html = None
                    try:
                        html = requests.get(url).text
                    except:
                        s = "fail get {}".format(url)
                        logger.debug(s)
                        WechatAutomator.add_to_detail(s, detail)

                    items.append((url, rect, elem.element_info.name, html))

            except:
                traceback.print_exc()
                pass
            finally:
                if self.browser:
                    try:
                        self.browser.close()
                    except:
                        pass
                    self.browser = None

            time.sleep(self.click_url_interval)

        return clicked_titles

以上就是python使用pywinauto驅動微信客戶端實現公眾號爬蟲的詳細內容,更多關於python 公眾號爬蟲的資料請關註WalkonNet其它相關文章!

推薦閱讀: