目錄
- 一、停止線程
- 二、線程信號(hào)的傳遞
- 三、信號(hào)量
- 四、總結(jié)
一、停止線程
利用Threading庫(kù)我們可以很方便地創(chuàng)建線程,讓它按照我們的想法執(zhí)行我們想讓它執(zhí)行的事情,從而加快程序運(yùn)行的效率。然而有一點(diǎn)坑爹的是,線程創(chuàng)建之后,就交給了操作系統(tǒng)執(zhí)行,我們無(wú)法直接結(jié)束一個(gè)線程,也無(wú)法給它發(fā)送信號(hào),無(wú)法調(diào)整它的調(diào)度,也沒(méi)有其他高級(jí)操作。如果想要相關(guān)的功能,只能自己開(kāi)發(fā)。
怎么開(kāi)發(fā)呢?
我們創(chuàng)建線程的時(shí)候指定了target等于一個(gè)我們想讓它執(zhí)行的函數(shù),這個(gè)函數(shù)并不一定是全局函數(shù),實(shí)際上也可以是一個(gè)對(duì)象中的函數(shù)。如果是對(duì)象中的函數(shù),那么我們就可以在這個(gè)函數(shù)當(dāng)中獲取到對(duì)象中的其他信息,我們可以利用這一點(diǎn)來(lái)實(shí)現(xiàn)手動(dòng)控制線程的停止。
說(shuō)起來(lái)好像不太好理解,但是看下代碼真的非常簡(jiǎn)單:
import time
from threading import Thread
class TaskWithSwitch:
def __init__(self):
self._running = True
def terminate(self):
self._running = False
def run(self, n):
while self._running and n > 0:
print('Running {}'.format(n))
n -= 1
time.sleep(1)
c = TaskWithSwitch()
t = Thread(target=c.run, args=(10, ))
t.start()
c.terminate()
t.join()
如果你運(yùn)行這段代碼,會(huì)發(fā)現(xiàn)屏幕上只輸出了10,因?yàn)槲覀儗running這個(gè)字段置為False之后,下次循環(huán)的時(shí)候不再滿足循環(huán)條件,它就會(huì)自己退出了。

如果我們想要用多線程來(lái)讀取IO,由于IO可能存在堵塞,所以可能會(huì)出現(xiàn)線程一直無(wú)法返回的情況。也就是說(shuō)我們?cè)谘h(huán)內(nèi)部卡死了,這個(gè)時(shí)候單純用_running來(lái)判斷還是不夠的,我們需要在線程內(nèi)部設(shè)置計(jì)時(shí)器,防止循環(huán)內(nèi)部的卡死。
class IOTask:
def __init__(self):
self._running = True
def terminate(self):
self._running = False
def run(self, sock):
# 在socket中設(shè)置計(jì)時(shí)器
sock.settimeout(10)
while self._running:
try:
# 由于設(shè)置了計(jì)時(shí)器,所以這里不會(huì)永久等待
data = sock.recv(1024)
break
except socket.timeout:
continue
return
二、線程信號(hào)的傳遞
我們之所以如此費(fèi)勁才能控制線程的運(yùn)行,主要原因是線程的狀態(tài)是不可知的,并且我們無(wú)法直接操作它,因?yàn)樗潜徊僮飨到y(tǒng)管理的。我們運(yùn)行的主線程和創(chuàng)建出來(lái)的線程是獨(dú)立的,兩者之間并沒(méi)有從屬關(guān)系,所以想要實(shí)現(xiàn)對(duì)線程的狀態(tài)進(jìn)行控制,往往需要我們通過(guò)其他手段來(lái)實(shí)現(xiàn)。
我們來(lái)思考一個(gè)場(chǎng)景,假設(shè)我們有一個(gè)任務(wù),需要在另外一個(gè)線程運(yùn)行結(jié)束之后才能開(kāi)始執(zhí)行。要想要實(shí)現(xiàn)這一點(diǎn),就必須對(duì)線程的狀態(tài)有所感知,需要其他線程傳遞出信號(hào)來(lái)才行。我們可以使用threading中的Event工具來(lái)實(shí)現(xiàn)這一點(diǎn)。Event工具就是可以用來(lái)傳遞信號(hào)的,就好像是一個(gè)開(kāi)關(guān),當(dāng)一個(gè)線程執(zhí)行完成之后,會(huì)去啟動(dòng)這個(gè)開(kāi)關(guān)。而這個(gè)開(kāi)關(guān)控制著另外一段邏輯的運(yùn)行。
我們來(lái)看下樣例代碼:
import time
from threading import Thread, Event
def run_in_thread():
time.sleep(1)
print('Thread is running')
t = Thread(target=run_in_thread)
t.start()
print('Main thread print')
我們?cè)诰€程里面就只做了輸出一行提示符,沒(méi)有其他任何邏輯。由于我們?cè)趓un_in_thread函數(shù)當(dāng)中沉睡了1s,所以一定是先輸出Main thread print再輸出的Thread is running。假設(shè)這個(gè)線程是一個(gè)很重要的任務(wù),我們希望主線程能夠等待它運(yùn)行到一個(gè)階段再往下執(zhí)行,我們應(yīng)該怎么辦呢?
注意,這里說(shuō)的是運(yùn)行到一個(gè)階段,并不是運(yùn)行結(jié)束。運(yùn)行結(jié)束我們很好處理,可以通過(guò)join來(lái)完成。但如果不是運(yùn)行結(jié)束,而是運(yùn)行完成了某一個(gè)階段,當(dāng)然通過(guò)join也可以,但是會(huì)損害整體的效率。這個(gè)時(shí)候我們就必須要用上Event了。加上Event之后,我們?cè)賮?lái)看下代碼:
import time
from threading import Thread, Event
def run_in_thread(event):
time.sleep(1)
print('Thread is running')
# set一下event,這樣外面wait的部分就會(huì)被啟動(dòng)
event.set()
# 初始化Event
event = Event()
t = Thread(target=run_in_thread, args=(event, ))
t.start()
# event等待set
event.wait()
print('Main thread print')
整體的邏輯沒(méi)有太多的修改,主要的是增加了幾行關(guān)于Event的使用代碼。
我們?nèi)绻玫紼vent,最好在代碼當(dāng)中只使用一次。當(dāng)然通過(guò)Event中的clear方法我們可以重置Event的值,但問(wèn)題是我們沒(méi)辦法保證重置的這個(gè)邏輯會(huì)在wait之前執(zhí)行。如果是在之后執(zhí)行的,那么就會(huì)問(wèn)題,并且在debug的時(shí)候會(huì)異常痛苦,因?yàn)閎ug不是必現(xiàn)的,而是有時(shí)候會(huì)出現(xiàn)有時(shí)候不會(huì)出現(xiàn)。這種情況往往都是因?yàn)槎嗑€程的使用問(wèn)題。
所以如果要多次使用開(kāi)關(guān)和信號(hào)的話,不要使用Event,可以使用信號(hào)量。
三、信號(hào)量
Event的問(wèn)題在于如果多個(gè)線程在等待Event的發(fā)生,當(dāng)它一旦被set的時(shí)候,那么這些線程都會(huì)同時(shí)執(zhí)行。但有時(shí)候我們并不希望這樣,我們希望可以控制這些線程一個(gè)一個(gè)地運(yùn)行。如果想要做到這一點(diǎn),Event就無(wú)法滿足了,而需要使用信號(hào)量。
信號(hào)量和Event的使用方法類似,不同的是,信號(hào)量可以保證每次只會(huì)啟動(dòng)一個(gè)線程。因?yàn)檫@兩者的底層邏輯不太一致,對(duì)于Event來(lái)說(shuō),它更像是一個(gè)開(kāi)關(guān)。一旦開(kāi)關(guān)啟動(dòng),所有和這個(gè)開(kāi)關(guān)關(guān)聯(lián)的邏輯都會(huì)同時(shí)執(zhí)行。而信號(hào)量則像是許可證,只有拿到許可證的線程才能執(zhí)行工作,并且許可證一次只發(fā)一張。
想要使用信號(hào)量并不需要自己開(kāi)發(fā),thread庫(kù)當(dāng)中為我們提供了現(xiàn)成的工具——Semaphore,我們來(lái)看它的使用代碼:
# 工作線程
def worker(n, sema):
# 等待信號(hào)量
sema.acquire()
print('Working', n)
# 初始化
sema = threading.Semaphore(0)
nworkers = 10
for n in range(nworkers):
t = threading.Thread(target=worker, args=(n, sema,))
t.start()
在上面的代碼當(dāng)中我們創(chuàng)建了10個(gè)線程,雖然這些線程都被啟動(dòng)了,但是都不會(huì)執(zhí)行邏輯,因?yàn)閟ema.acquire是一個(gè)阻塞方法,沒(méi)有監(jiān)聽(tīng)到信號(hào)量是會(huì)一直掛起等待。

當(dāng)我們釋放信號(hào)量之后,線程被啟動(dòng),才開(kāi)始了執(zhí)行。我們每釋放一個(gè)信號(hào),則會(huì)多啟動(dòng)一個(gè)線程。這里面的邏輯應(yīng)該不難理解。
四、總結(jié)
在并發(fā)場(chǎng)景當(dāng)中,多線程的使用絕不是多啟動(dòng)幾個(gè)線程做不同的任務(wù)而已,我們需要線程間協(xié)作,需要同步、獲取它們的狀態(tài),這是非常不容易的。一不小心就會(huì)出現(xiàn)幽靈bug,時(shí)顯時(shí)隱,這也是并發(fā)問(wèn)題讓人頭疼的主要原因。
以上就是分析Python感知線程狀態(tài)的解決方案之Event與信號(hào)量的詳細(xì)內(nèi)容,更多關(guān)于Python 感知線程狀態(tài) Event與信號(hào)量的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
您可能感興趣的文章:- 像線程一樣管理進(jìn)程的Python multiprocessing庫(kù)
- Python爬蟲之線程池的使用
- Python多線程編程之threading模塊詳解
- 深入理解python多線程編程
- Python一些線程的玩法總結(jié)