WxPython界面利用pubsub如何實現多線程控制

WxPython界面用pubsub實現多線程控制

用WxPython做界面時, 如果數據操作時間比較長,會使 WxPython 界面處於假死狀態,用戶體驗非常不好。

WxPython是利用pubsub來完成消息的傳送。

下面提供一個 WxPython界面利用pubsub 實現2個線程的控制的例子

實際使用時,只要修改 WorkThread1、WorkThread2 裡 run 所調用的 workproc 內容,以及 MainFrame 裡的 updateDisplay 內容即可。

在此基礎上,可以實現多線程。

Python 3.7.3
wxPython          4.0.6
Pypubsub          4.0.3

在此之前有個單線程及進度條的例子,簡單需求可以參考這個

下面提供本文的代碼

# encoding: utf-8
"""
@author: 陳年椰子
@contact: [email protected]
@version: 1.0
@project:test 
@file: wx_thread.py
@time: 2022-3-24 15:34
說明
"""
 
import wx
from pubsub import pub
from time import sleep
import time
import threading
import sys
from random import random
 
 
# 線程調用耗時長代碼
class WorkThread1(threading.Thread):
    """Background worker that periodically publishes a query summary.

    The thread starts itself on construction.  Every status string is
    published on the pubsub topic ``"update"`` as the keyword argument
    ``mstatus``: ``'workstart'`` first, then one summary line per cycle,
    then the interruption notice and finally ``'workdone'``.
    """

    def __init__(self):
        """Initialise the worker and start it immediately."""
        threading.Thread.__init__(self)
        # Daemon thread: a worker that is still waiting must not keep the
        # process alive once the GUI has gone away.
        self.daemon = True
        self.breakflag = False
        # Set by stop() so the wait between two cycles ends at once instead
        # of running for the full interval.
        self._wakeup = threading.Event()
        self.start()

    def stop(self):
        """Ask the worker to finish; returns without waiting for it."""
        self.breakflag = True
        self._wakeup.set()

    def _publish(self, mstatus):
        """Publish *mstatus* on the "update" topic from the GUI thread.

        pubsub calls its listeners synchronously in the sending thread, and
        the listener updates wx widgets, so the send itself is handed to the
        wx event loop with ``wx.CallAfter``.
        """
        wx.CallAfter(pub.sendMessage, "update", mstatus=mstatus)

    def workproc(self):
        """Run the long-running job until ``stop()`` is called.

        Returns:
            An empty string, once the loop has been interrupted.
        """
        while not self.breakflag:
            ts1 = time.strftime('%Y-%m-%d %H:%M:%S')
            t_sum = "查詢的結果"
            # Placeholder: the real database query code goes here.
            self._publish('{}:查詢最後10個數據並匯總{}'.format(ts1, t_sum))
            # Interruptible pause: returns early as soon as stop() is called.
            self._wakeup.wait(10)
        self._publish('查詢Thread中斷')
        sleep(2)
        self._publish('workdone')
        return ""

    def run(self):
        """Thread entry point: announce the start, then do the work."""
        self._publish('workstart')
        self.workproc()
 
 
 
 
# 線程調用耗時長代碼
class WorkThread2(threading.Thread):
    """Background worker that publishes a fresh random number every second.

    The thread starts itself on construction.  Every status string is
    published on the pubsub topic ``"update"`` as the keyword argument
    ``mstatus``: ``'workstart'`` first, then one line per cycle, then the
    interruption notice and finally ``'workdone'``.
    """

    def __init__(self):
        """Initialise the worker and start it immediately."""
        threading.Thread.__init__(self)
        # Daemon thread: a worker that is still waiting must not keep the
        # process alive once the GUI has gone away.
        self.daemon = True
        self.breakflag = False
        # Set by stop() so the wait between two cycles ends at once instead
        # of running for the full interval.
        self._wakeup = threading.Event()
        self.start()

    def stop(self):
        """Ask the worker to finish; returns without waiting for it."""
        self.breakflag = True
        self._wakeup.set()

    def _publish(self, mstatus):
        """Publish *mstatus* on the "update" topic from the GUI thread.

        pubsub calls its listeners synchronously in the sending thread, and
        the listener updates wx widgets, so the send itself is handed to the
        wx event loop with ``wx.CallAfter``.
        """
        wx.CallAfter(pub.sendMessage, "update", mstatus=mstatus)

    def workproc(self):
        """Run the long-running job until ``stop()`` is called.

        Returns:
            An empty string, once the loop has been interrupted.
        """
        while not self.breakflag:
            ts1 = time.strftime('%Y-%m-%d %H:%M:%S')
            t_info = "隨機數{}".format(str(random()))
            # Placeholder: the real database code goes here.
            self._publish('{}:產生{}'.format(ts1, t_info))
            # Interruptible pause: returns early as soon as stop() is called.
            self._wakeup.wait(1)
        self._publish('隨機數Thread中斷')
        sleep(2)
        self._publish('workdone')
        return ""

    def run(self):
        """Thread entry point: announce the start, then do the work."""
        self._publish('workstart')
        self.workproc()
 
 
 
class MainFrame(wx.Frame):
    """
    A simple main window: two text labels, a gauge, a menu bar and a
    status bar.  It starts/stops the two worker threads from the menu and
    shows the status strings they publish on the pubsub topic "update".
    """

    def __init__(self, *args, **kw):
        """Build the widgets and subscribe to the "update" topic.

        All positional and keyword arguments are forwarded to ``wx.Frame``.
        """
        # ensure the parent's __init__ is called
        super(MainFrame, self).__init__(*args, **kw)
        self.SetSize(size=(600, 400))

        # Worker threads; created by OnStart, None until then.
        self.work1 = None
        self.work2 = None

        # create a panel in the frame
        pnl = wx.Panel(self)

        # and put some text with a larger bold font on it
        self.st = wx.StaticText(pnl, label="分析工具 V 2022", pos=(25, 25))
        self.st2 = wx.StaticText(pnl, label="提示", pos=(25, 80))
        font = self.st.GetFont()
        font.PointSize += 2
        font = font.Bold()

        self.st.SetFont(font)
        self.st2.SetFont(font)

        # create a menu bar
        self.makeMenuBar()

        self.gauge = wx.Gauge(self, range=100, size=(500, 20))
        self.gauge.SetBezelFace(3)
        self.gauge.SetShadowWidth(3)
        # NOTE(review): st/st2 are children of pnl while the gauge and this
        # sizer belong to the frame itself -- confirm the layout is intended.
        sizer = wx.BoxSizer(wx.VERTICAL)
        sizer.Add(self.st, 0, wx.BOTTOM | wx.ALIGN_CENTER_VERTICAL, 0)
        sizer.Add(self.st2, 0, wx.BOTTOM | wx.ALIGN_CENTER_VERTICAL, 0)
        sizer.Add(self.gauge, 0, wx.BOTTOM | wx.ALIGN_CENTER_VERTICAL, 0)

        self.SetSizer(sizer)

        # and a status bar
        self.CreateStatusBar()
        self.SetStatusText("啟動完成!")

        pub.subscribe(self.updateDisplay, "update")

    def makeMenuBar(self):
        """
        A menu bar is composed of menus, which are composed of menu items.
        This method builds a set of menus and binds handlers to be called
        when the menu item is selected.
        """

        # Work menu: start / stop / exit.  The second argument of Append is
        # the label, the third the help string shown in the status bar.
        fileMenu = wx.Menu()
        self.startItem = fileMenu.Append(-1, "開始",
                                         "開始工作")
        self.stopItem = fileMenu.Append(-1, "停止",
                                        "中斷工作")
        fileMenu.AppendSeparator()
        self.exitItem = fileMenu.Append(-1, "退出",
                                        "退出")

        # Now a help menu for the about item
        helpMenu = wx.Menu()
        aboutItem = helpMenu.Append(-1, "關於",
                                    "WxPython 界面與線程通訊的例子")

        # Make the menu bar and add the two menus to it.
        self.menuBar = wx.MenuBar()
        self.menuBar.Append(fileMenu, "工作")
        self.menuBar.Append(helpMenu, "信息")

        # Give the menu bar to the frame
        self.SetMenuBar(self.menuBar)
        # Nothing is running yet, so there is nothing to stop.
        self.stopItem.Enable(False)

        self.count = 0

        # Finally, associate a handler function with the EVT_MENU event for
        # each of the menu items. That means that when that menu item is
        # activated then the associated handler function will be called.
        self.Bind(wx.EVT_MENU, self.OnStart, self.startItem)
        self.Bind(wx.EVT_MENU, self.OnStop, self.stopItem)
        self.Bind(wx.EVT_MENU, self.OnExit, self.exitItem)
        self.Bind(wx.EVT_MENU, self.OnAbout, aboutItem)

    def _stop_workers(self):
        """Signal every existing worker to stop.

        Returns:
            True if at least one worker was signalled, False if none had
            been created yet.
        """
        workers = [w for w in (self.work1, self.work2) if w is not None]
        for worker in workers:
            worker.stop()
        return bool(workers)

    def OnExit(self, event):
        """Stop the workers, close the frame and terminate the application."""
        if self._stop_workers():
            # Short grace period so the workers can observe the stop request.
            sleep(2)
        self.Close(True)
        sys.exit()

    def OnStart(self, event):
        """Create both workers; each one starts itself on construction."""
        self.work1 = WorkThread1()
        self.work2 = WorkThread2()

    def OnStop(self, event):
        """Ask both workers to stop; does nothing if none were started."""
        self._stop_workers()

    def OnAbout(self, event):
        """Display an About Dialog"""
        wx.MessageBox("分析工具 v2019",
                      "關於",
                      wx.OK | wx.ICON_INFORMATION)

    def updateDisplay(self, mstatus):
        """
        Receive a status string from a worker and update the display.

        ``mstatus`` containing "workstart" or "workdone" switches the menu
        items and the status bar; any other text is shown in the first label
        when it contains "查詢" and in the second label otherwise.
        """
        # pubsub invokes listeners in the thread that sent the message, and
        # wx widgets may only be touched from the GUI thread, so re-post the
        # call to the wx event loop when it arrives from a worker thread.
        if not wx.IsMainThread():
            wx.CallAfter(self.updateDisplay, mstatus)
            return

        if "workstart" in mstatus:
            self.SetStatusText('開始工作,代碼不提供中斷線程語句,請等待!')
            self.startItem.Enable(False)
            self.stopItem.Enable(True)
            self.exitItem.Enable(False)
        elif "workdone" in mstatus:
            self.SetStatusText('完成!')
            self.stopItem.Enable(False)
            self.startItem.Enable(True)
            self.exitItem.Enable(True)
        elif "查詢" in mstatus:
            self.st.SetLabel(mstatus)
        else:
            # Control messages are handled above, so only real progress
            # text ever reaches the hint label.
            self.st2.SetLabel(mstatus)
        # The gauge is not driven by any message yet; a progress value would
        # have to be carried in mstatus and passed to self.gauge.SetValue().
 
 
def test():
    """Create the wx application, show the main window and run the event loop."""
    application = wx.App()
    main_window = MainFrame(None, title='分析工具')
    main_window.Show()
    # Blocks until the application's event loop ends.
    application.MainLoop()
 
 
# Launch the demo GUI only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    test()

運行後,點擊「工作-開始」,2個線程開始工作,直到點擊「工作-停止」為止。

以上為個人經驗,希望能給大家一個參考,也希望大家多多支持WalkonNet。

推薦閱讀: