淺談Python響應式類庫RxPy
一、基本概念
Reactive X中有幾個核心的概念,先來簡單介紹一下。
1.1、Observable和Observer(可觀察對象和觀察者)
首先是Observable和Observer,它們分別是可觀察對象和觀察者。Observable可以理解為一個異步的數據源,會發送一系列的值。Observer則類似於消費者,需要先訂閱Observable,然後才可以接收到其發射的值。可以說這組概念是設計模式中的觀察者模式和生產者-消費者模式的綜合體。
1.2、Operator(操作符)
另外一個非常重要的概念就是操作符瞭。操作符作用於Observable的數據流上,可以對其施加各種各樣的操作。更重要的是,操作符還可以鏈式組合起來。這樣的鏈式函數調用不僅將數據和操作分隔開來,而且代碼更加清晰可讀。一旦熟練掌握之後,你就會愛上這種感覺的。
1.3、Single(單例)
在RxJava和其變體中,還有一個比較特殊的概念叫做Single,它是一種隻會發射同一個值的Observable,說白瞭就是單例。當然如果你對Java等語言比較熟悉,那麼單例想必也很熟悉。
1.4、Subject(主體)
主體這個概念非常特殊,它既是Observable又是Observer。正是因為這個特點,所以Subject可以訂閱其他Observable,也可以將發射對象給其他Observer。在某些場景中,Subject會有很大的作用。
1.5、Scheduler(調度器)
默認情況下Reactive X隻運行在當前線程下,但是如果有需要的話,也可以用調度器來讓Reactive X運行在多線程環境下。有很多調度器和對應的操作符,可以處理多線程場景下的各種要求。
1.6、Observer和Observable
先來看看一個最簡單的例子,運行的結果會依次打印這些數字。這裡的of是一個操作符,可以根據給定的參數創建一個新的Observable。創建之後,就可以訂閱Observable,三個回調方法在對應的時機執行。一旦Observer訂閱瞭Observable,就會接收到後續Observable發射的各項值。
from rx import of ob = of(1, 2, 34, 5, 6, 7, 7) ob.subscribe( on_next=lambda i: print(f'Received: {i}'), on_error=lambda e: print(f'Error: {e}'), on_completed=lambda: print('Completed') )
這個例子看起來好像很簡單,並且看起來沒什麼用。但是當你瞭解瞭Rx的一些核心概念,就會理解到這是一個多麼強大的工具。更重要的是,Observable生成數據和訂閱的過程是異步的,如果你熟悉的話,就可以利用這個特性做很多事情。
1.7、操作符
在RxPy中另一個非常重要的概念就是操作符瞭,甚至可以說操作符就是最重要的一個概念瞭。幾乎所有的功能都可以通過組合各個操作符來實現。熟練掌握操作符就是學好RxPy的關鍵瞭。操作符之間也可以用pipe函數連接起來,構成復雜的操作鏈。
from rx import of, operators as op import rx ob = of(1, 2, 34, 5, 6, 7, 7) ob.pipe( op.map(lambda i: i ** 2), op.filter(lambda i: i >= 10) ).subscribe(lambda i: print(f'Received: {i}'))
在RxPy中有大量操作符,可以完成各種各樣的功能。我們來簡單看看其中一些常用的操作符。如果你熟悉Java8的流類庫或者其他函數式編程類庫的話,應該對這些操作符感到非常親切。
1.8、創建型操作符
首先是創建Observable的操作符,列舉瞭一些比較常用的創建型操作符。
1.9、過濾型操作符
過濾型操作符的主要作用是對Observable進行篩選和過濾。
1.10、轉換型操作符
1.11、算術操作符
1.12、Subject
Subject是一種特殊的對象,它既是Observer又是Observable。不過這個對象一般不太常用,但是假如某些用途還是很有用的。所以還是要介紹一下。下面的代碼,因為訂閱的時候第一個值已經發射出去瞭,所以隻會打印訂閱之後才發射的值。
from rx.subject import Subject, AsyncSubject, BehaviorSubject, ReplaySubject # Subject同時是Observer和Observable print('--------Subject---------') subject = Subject() subject.on_next(1) subject.subscribe(lambda i: print(i)) subject.on_next(2) subject.on_next(3) subject.on_next(4) subject.on_completed() # 2 3 4
另外還有幾個特殊的Subject,下面來介紹一下。
1.13、ReplaySubject
ReplaySubject是一個特殊的Subject,它會記錄所有發射過的值,不論什麼時候訂閱的。所以它可以用來當做緩存來使用。ReplaySubject還可以接受一個bufferSize參數,指定可以緩存的最近數據數,默認情況下是全部。
下面的代碼和上面的代碼幾乎完全一樣,但是因為使用瞭ReplaySubject,所以所有的值都會被打印。當然大傢也可以試試把訂閱語句放到其他位置,看看輸出是否會產生變化。
# ReplaySubject會緩存所有值,如果指定參數的話隻會緩存最近的幾個值 print('--------ReplaySubject---------') subject = ReplaySubject() subject.on_next(1) subject.subscribe(lambda i: print(i)) subject.on_next(2) subject.on_next(3) subject.on_next(4) subject.on_completed() # 1 2 3 4
1.14、BehaviorSubject
BehaviorSubject是一個特殊的Subject,它隻會記錄最近一次發射的值。而且在創建它的時候,必須指定一個初始值,所有訂閱它的對象都可以接收到這個初始值。當然如果訂閱的晚瞭,這個初始值同樣會被後面發射的值覆蓋,這一點要註意。
# BehaviorSubject會緩存上次發射的值,除非Observable已經關閉 print('--------BehaviorSubject---------') subject = BehaviorSubject(0) subject.on_next(1) subject.on_next(2) subject.subscribe(lambda i: print(i)) subject.on_next(3) subject.on_next(4) subject.on_completed() # 2 3 4
1.15、AsyncSubject
AsyncSubject是一個特殊的Subject,顧名思義它是一個異步的Subject,它隻會在Observer完成的時候發射數據,而且隻會發射最後一個數據。因此下面的代碼僅僅會輸出4.假如註釋掉最後一行co_completed調用,那麼什麼也不會輸出。
# AsyncSubject會緩存上次發射的值,而且僅會在Observable關閉後開始發射 print('--------AsyncSubject---------') subject = AsyncSubject() subject.on_next(1) subject.on_next(2) subject.subscribe(lambda i: print(i)) subject.on_next(3) subject.on_next(4) subject.on_completed() # 4
1.16、Scheduler
雖然RxPy算是異步的框架,但是其實它默認還是運行在單個線程之上的,因此如果使用瞭某些會阻礙線程運行的操作,那麼程序就會卡死。當然針對這些情況,我們就可以使用其他的Scheduler來調度任務,保證程序能夠高效運行。
下面的例子創建瞭一個ThreadPoolScheduler,它是基於線程池的調度器。兩個Observable用subscribe_on方法指定瞭調度器,因此它們會使用不同的線程來工作。
import rx from rx.scheduler import ThreadPoolScheduler from rx import operators as op import multiprocessing import time import threading import random def long_work(value): time.sleep(random.randint(5, 20) / 10) return value pool_schedular = ThreadPoolScheduler(multiprocessing.cpu_count()) rx.range(5).pipe( op.map(lambda i: long_work(i + 1)), op.subscribe_on(pool_schedular) ).subscribe(lambda i: print(f'Work 1: {threading.current_thread().name}, {i}')) rx.of(1, 2, 3, 4, 5).pipe( op.map(lambda i: i * 2), op.subscribe_on(pool_schedular) ).subscribe(lambda i: print(f'Work 2: {threading.current_thread().name}, {i}'))
如果你觀察過各個操作符的API的話,可以發現大部分操作符都支持可選的Scheduler參數,為操作符指定一個調度器。如果操作符上指定瞭調度器的話,會優先使用這個調度器;其次的話,會使用subscribe方法上指定的調度器;如果以上都沒有指定的話,就會使用默認的調度器。
二、應用場景
好瞭,介紹瞭一些Reactive X的知識之後,下面來看看如何來使用Reactive X。在很多應用場景下,都可以利用Reactive X來抽象數據處理,把概念簡單化。
2.1、防止重復發送
很多情況下我們都需要控制事件的發生間隔,比如有一個按鈕不小心按瞭好幾次,隻希望第一次按鈕生效。這種情況下可以使用debounce操作符,它會過濾Observable,小於指定時間間隔的數據會被過濾掉。debounce操作符會等待一段時間,直到過瞭間隔時間,才會發射最後一次的數據。如果想要過濾後面的數據,發送第一次的數據,則要使用throttle_first操作符。
下面的代碼可以比較好的演示這個操作符,快速按回車鍵發送數據,註意觀察按鍵和數據顯示之間的關系,還可以把throttle_first操作符換成debounce操作符,然後再看看輸出會發生什麼變化,還可以完全註釋掉pipe中的操作符,再看看輸出會有什麼變化。
import rx from rx import operators as op from rx.subject import Subject import datetime # debounce操作符,僅在時間間隔之外的可以發射 ob = Subject() ob.pipe( op.throttle_first(3) # op.debounce(3) ).subscribe( on_next=lambda i: print(i), on_completed=lambda: print('Completed') ) print('press enter to print, press other key to exit') while True: s = input() if s == '': ob.on_next(datetime.datetime.now().time()) else: ob.on_completed() break
2.2、操作數據流
如果需要對一些數據進行操作,那麼同樣有一大堆操作符可以滿足需求。當然這部分功能並不是Reactive X獨有的,如果你對Java 8的流類庫有所瞭解,會發現這兩者這方面的功能幾乎是完全一樣的。
下面是個簡單的例子,將兩個數據源結合起來,然後找出來其中所有的偶數。
import rx from rx import operators as op from rx.subject import Subject import datetime # 操作數據流 some_data = rx.of(1, 2, 3, 4, 5, 6, 7, 8) some_data2 = rx.from_iterable(range(10, 20)) some_data.pipe( op.merge(some_data2), op.filter(lambda i: i % 2 == 0), # op.map(lambda i: i * 2) ).subscribe(lambda i: print(i))
再或者一個利用reduce的簡單例子,求1-100的整數和。
import rx from rx import operators as op from rx.subject import Subject import datetime rx.range(1, 101).pipe( op.reduce(lambda acc, i: acc + i, 0) ).subscribe(lambda i: print(i))
以上就是淺談Python響應式類庫RxPy的詳細內容,更多關於Python響應式類庫RxPy的資料請關註WalkonNet其它相關文章!
推薦閱讀:
- JavaScript中rxjs與 Observable 兩大類操作符解析
- Rxjs map, mergeMap 和 switchMap 的區別與聯系
- RxJava構建流基本原理示例解析
- RxJava實戰之訂閱流基本原理示例解析
- Python中使用subprocess庫創建附加進程