python使用pywinauto驅動微信客戶端實現公眾號爬蟲
項目地址
https://github.com/fancyerii/wechat-gongzhonghao-crawler
pywinauto簡介
pywinauto是一個python的工具,可以用於控制Windows的GUI程序。詳細的文檔可以參考這裡。
WechatAutomator類
自動化微信的代碼封裝在瞭類WechatAutomator裡,完整的代碼可以參考這裡。這裡簡要的介紹一下其中的主要方法:
init_window
這個方法完成類的初始化,它的代碼為:
def init_window(self, exe_path=r"C:\Program Files (x86)\Tencent\WeChat\WeChat.exe", turn_page_interval=3, click_url_interval=1, win_width=1000, win_height=600): app = Application(backend="uia").connect(path=exe_path) self.main_win = app.window(title=u"微信", class_name="WeChatMainWndForPC") self.main_win.set_focus() self.app = app self.visible_top = 70 self.turn_page_interval = turn_page_interval self.click_url_interval = click_url_interval self.browser = None self.win_width = win_width self.win_height = win_height # 為瞭讓移動窗口,同時使用非uia的backend,這是pywinauto的uia的一個bug self.app2 = Application().connect(path=exe_path) self.move_window()
我們首先來看函數的參數:
- exe_path
- 微信程序的地址
- turn_page_interval
- 抓取翻頁時的時間間隔,默認3s
- click_url_interval
- 在抓取一頁的url時的間隔,默認1s
- win_width
- 設置窗口的寬度
- win_height
- 設置窗口的高度,如果顯示器的分辨率較大,可以設置的更加高一些,從而一頁包含的文章數更多一些,從而翻頁少一點。註意:一定要保證窗口完全可見,也就是說win_height不能大於實際分辨率的高度!
這個函數的主要功能是構建Application對象從而通過pywinauto實現控制,這裡使用的是uia的backend,然後設置窗口的大小並且把窗口移到最左上角。因為根據so文章,pywinauto 0.6.8存在bug,隻能通過win32的backend來移到窗口,所以構造瞭self.app2然後調用move_window()函數把窗口移到最左上角。
crawl_gongzhonghao
這個函數實現瞭某個公眾號的文章抓取。它的基本控制邏輯如下:
- 首先通過搜索框根據名字搜索公眾號並且點擊它。
- 對於當前頁點擊所有的鏈接並且下載其內容。
- 使用PAGE_DOWN鍵往下翻頁
- 需要判斷是否繼續抓取
第一個是通過locate_user函數實現,後面會介紹。第二個是通過process_page函數實現,後面也會介紹。判斷是否繼續抓取的邏輯為:
- 如果翻頁超過max_pages,則停止抓取
- 如果碰到某個url曾經抓取過,那說明之前的文章都已經抓取過瞭,則停止抓取
- 如果lastest_date不是None並且一篇文章的發佈日期早於它,則停止抓取
所以我們通常會在第一次抓取的時候把max_pages設置的很大(比如100),然後通過latest_date來抓到指定的日期。而之後的抓取則設置max_pages為較小的值(比如默認的6),這樣隻要爬蟲在兩次抓取之間公眾號的更新不超過6頁,那麼就不會漏掉文章。具體的邏輯可以參考main.py,它會把抓取的文章通過http請求發給Server,並且每次抓取的時候從Server查詢抓取過的文章存放到states這個list裡states[i][“url”]就存儲瞭第i篇文章的url。
def crawl_gongzhonghao(self, account_name, articles, states, detail, max_pages=6, latest_date=None, no_item_retry=3): logger.debug(account_name) if not self.locate_user(account_name): return False last_visited_titles = set() visited_urls = set() self.turn_page_up(min(20, max_pages * 2)) pagedown_retry = 0 last_visited_titles = [] for page in range(0, max_pages): items = [] last_visited_titles = self.process_page(account_name, items, last_visited_titles, states, visited_urls, detail) articles.extend(items) if len(items) == 0: pagedown_retry += 1 if pagedown_retry >= no_item_retry: s = "break because of retry {}".format(pagedown_retry) logger.debug(s) WechatAutomator.add_to_detail(s, detail) break else: pagedown_retry = 0 if len(items) > 0 and latest_date is not None: html = items[-1][-1] pub_date = WechatAutomator.get_pubdate(html) if pub_date and pub_date < latest_date: s = "stop because {} < {}".format(pub_date, latest_date) logger.debug(s) WechatAutomator.add_to_detail(s, detail) break url_exist = False for item in items: if WechatAutomator.url_in_states(item[0], states): s = "stop because url exist {}".format(item[0]) logger.debug(s) WechatAutomator.add_to_detail(s, detail) url_exist = True break if url_exist: break self.click_right() self.main_win.type_keys("{PGDN}") time.sleep(self.turn_page_interval) self.turn_page_up(page * 2) return True
locate_user
locate_user函數的控制流程為:
- 找到左上角的搜索框並且點擊它獲得焦點
- 使用ctrl+a選中可能有的文字(之前的bug?)並且使用後退鍵刪除它們
- 輸入公眾號名稱
- 在彈出的list裡點擊這個公眾號名稱從而進入公眾號
def locate_user(self, user, retry=5): if not self.main_win: raise RuntimeError("you should call init_window first") search_btn = self.main_win.child_window(title="搜索", control_type="Edit") self.click_center(search_btn) self.main_win.type_keys("^a") self.main_win.type_keys("{BACKSPACE}") self.main_win.type_keys(user) for i in range(retry): time.sleep(1) try: search_list = self.main_win.child_window(title="搜索結果") match_result = search_list.child_window(title=user, control_type="ListItem") self.click_center(match_result) return True except: pass return False
這裡主要就是通過child_window函數進行定位,關於它的用法這裡不介紹。關於怎麼定位元素的方法可以使用Inspect.exe或者print_control_identifiers函數,具體參考這裡。
process_page
這個函數是最主要的抓取代碼,它處理當前一頁的內容,它的控制流程如下:
- 構建當前頁的tree
- 使用recursive_get函數遍歷這顆樹並且找到每篇文章對應的element
- 遍歷每一篇文章
- 如果文章的名字在上一頁出現過,則跳過
- 獲得這篇文章的坐標信息
- 如果文章不可見(rect.top >= win_rect.bottom or rect.bottom <= self.visible_top)則跳過
- 計算點擊的坐標
- 點擊文章打開新的窗口
- 在新的窗口中點擊【復制鏈接】按鈕
- 從剪貼板復制鏈接url
- 通過url下載文章內容並且parse發佈日期
邏輯比較簡單,但是有一些很trick的地方:
- 微信翻頁的實現
- 微信客戶端的翻頁和瀏覽器不同,它的內容是累加的,比如第一頁3篇文章,往下翻一頁可能變成6篇文章,再翻可能變成9篇。這個時候這9篇文章都是在tree中的,隻不過最後3篇的坐標(top和bottom)是空間的。
- 能否點擊 一篇文章對應的框(圖)可能是部分可見的,甚至它的top非常接近屏幕的最下方,這個時候可能點不瞭。如下圖所示:
與此類似的是右上角的黑色頭部(不能滾到並且會遮擋)也有一定空間,如下圖所示:
- 點擊的位置
因為這個框可能很窄(bottom-top很小)並且可能在很靠上或者靠下的位置。所以有如下代碼:
# 計算可見的高度 visible_height = min(rect.bottom, win_rect.bottom) - max(rect.top, win_rect.top+self.visible_top) # 太窄的不點擊,希望下次翻頁後能顯示更多像素從而可以點擊, # 但是如果微信的某個文章的框的高度小於10個像素,那麼這篇文章就無法被點擊 # 不過作者目前為發現這麼窄的文章 if visible_height < 10: continue # 如果某個文章的框太大,則拋出異常,目前為止為發現這樣的問題。 if rect.bottom - rect.top >= win_rect.bottom - self.visible_top: raise RuntimeError("{}-{}>={}-{}".format(rect.bottom, rect.top, win_rect.bottom, self.visible_top)) # 如果下部部分可見,那麼點擊上方是比較”安全“的 if rect.bottom >= win_rect.bottom: click_up = True # 如果下部完全可見,則點擊下方是”安全“的 else: click_up = False
完整代碼如下:
def process_page(self, account_name, items, lastpage_clicked_titles, states, visited_urls, detail): clicked_titles = set() text = self.main_win.child_window(title=account_name, control_type="Text", found_index=0) parent = text while parent: parent = parent.parent() if '會話列表' == parent.element_info.name: break paths = [0, 2, 0, 0, 0, 1, 0] for idx in paths: parent = parent.children()[idx] elems = [] self.recursive_get(parent, elems) win_rect = self.main_win.rectangle() for elem in elems: rect = elem.rectangle() if elem.element_info.name in lastpage_clicked_titles: continue if rect.top >= win_rect.bottom or rect.bottom <= self.visible_top: continue visible_height = min(rect.bottom, win_rect.bottom) - max(rect.top, win_rect.top+self.visible_top) if visible_height < 10: continue if rect.bottom - rect.top >= win_rect.bottom - self.visible_top: raise RuntimeError("{}-{}>={}-{}".format(rect.bottom, rect.top, win_rect.bottom, self.visible_top)) if rect.bottom >= win_rect.bottom: click_up = True else: click_up = False if self.is_bad_elem(elem): s = "not good elem {}".format(elem.element_info.name[0:10]) logger.debug(s) WechatAutomator.add_to_detail(s, detail) continue try: self.click_url(rect, win_rect, click_up) copy_btn = self.browser.child_window(title="復制鏈接地址") self.click_center(copy_btn, click_main=False) url = clipboard.GetData() if elem.element_info.name != '圖片': clicked_titles.add(elem.element_info.name) if url and not url in visited_urls: visited_urls.add(url) html = None try: html = requests.get(url).text except: s = "fail get {}".format(url) logger.debug(s) WechatAutomator.add_to_detail(s, detail) items.append((url, rect, elem.element_info.name, html)) except: traceback.print_exc() pass finally: if self.browser: try: self.browser.close() except: pass self.browser = None time.sleep(self.click_url_interval) return clicked_titles
以上就是python使用pywinauto驅動微信客戶端實現公眾號爬蟲的詳細內容,更多關於python 公眾號爬蟲的資料請關註WalkonNet其它相關文章!
推薦閱讀:
- mysql如何獲取時間整點
- Pygame Rect區域位置的使用(圖文)
- python playwright之元素定位示例詳解
- MYSQL SQL查詢近7天一個月的數據的操作方法
- JS實現視頻彈幕效果