在實(shí)際開發(fā)中我們經(jīng)常會碰上一些重復(fù)性或周期性的任務(wù),比如像每天定時(shí)爬取某個(gè)網(wǎng)站的數(shù)據(jù)、一定周期定時(shí)運(yùn)行代碼訓(xùn)練模型等,類似這類的任務(wù)通常需要我們手動來進(jìn)行設(shè)定或調(diào)度,以便其能夠在我們設(shè)定好的時(shí)間內(nèi)運(yùn)行。
在 Windows 上我們可以通過計(jì)劃任務(wù)來手動實(shí)現(xiàn),而在 Linux 系統(tǒng)上往往我們會用到更多關(guān)于 crontab 的相關(guān)操作。但手動管理并不是一個(gè)很好的選擇,如果我們需要有十幾個(gè)不同的定時(shí)任務(wù)需要管理,那么每次通過人工來進(jìn)行干預(yù)未免有些笨拙,那這時(shí)候就真的是「人工智能」了。
所以將這些定時(shí)任務(wù)的調(diào)度代碼化才是能夠讓我們很好地從這種手動管理的純?nèi)肆Σ僮髦薪饷摮鰜怼?/p>
在 Python 生態(tài)中對于定時(shí)任務(wù)的一些操作主要有那么幾個(gè):
所以為了滿足能夠相對復(fù)雜的時(shí)間條件,又不需要在前期的環(huán)境搭建上花費(fèi)很多時(shí)間的前提下,選擇 APScheduler 來對我們的調(diào)度任務(wù)或定時(shí)任務(wù)進(jìn)行管理是個(gè)性價(jià)比極高的選擇。而本文主要會帶你快速上手有關(guān) APScheduler 的使用。
雖然說官方文檔上的內(nèi)容不是很多,而且所列舉的 API 不是很多,但這側(cè)面也反映了這一框架的簡單易用。所以在使用 APScheduler 之前,我們需要對這個(gè)框架的一些概念簡單了解,主要有那么以下幾個(gè):
所謂的觸發(fā)器就是用以觸發(fā)定時(shí)任務(wù)的組件,在 APScheduler 中主要是指時(shí)間觸發(fā)器,并且主要有三類時(shí)間觸發(fā)器可供使用:
任務(wù)持久化主要是用于將設(shè)定好的調(diào)度任務(wù)進(jìn)行存儲,即便是程序因?yàn)橐馔馇闆r,如斷電、電腦或服務(wù)器重啟時(shí),只要重新運(yùn)行程序時(shí),APScheduler 就會根據(jù)對存儲好的調(diào)度任務(wù)結(jié)果進(jìn)行判斷,如果出現(xiàn)已經(jīng)過期但未執(zhí)行的情況會進(jìn)行相應(yīng)的操作。
APScheduler 為我們提供了多種持久化任務(wù)的途徑,默認(rèn)是使用 memory 也就是內(nèi)存的形式,但內(nèi)存并不是持久化最好的方式。最好的方式則是通過像數(shù)據(jù)庫這樣的載體來將我們的定時(shí)任務(wù)寫入到磁盤當(dāng)中,只要磁盤沒有損壞就能將數(shù)據(jù)給恢復(fù)。
APScheduler 支持的且常用的數(shù)據(jù)庫主要有:
通常我們可以在創(chuàng)建 Scheduler 實(shí)例時(shí)創(chuàng)建,或是單獨(dú)為任務(wù)指定。配置的方式相對簡單,我們只需要指定對應(yīng)的數(shù)據(jù)庫鏈接即可。
執(zhí)行器顧名思義就是執(zhí)行我們?nèi)蝿?wù)的對象,在計(jì)算機(jī)內(nèi)通常要么是 CPU 調(diào)度任務(wù),要么是單獨(dú)維護(hù)一個(gè)線程來運(yùn)行任務(wù)。所以 APScheduler 里的執(zhí)行器通常就是 ThreadPoolExecutor 或 ProcessPoolExecutor 這樣的線程池和進(jìn)程池兩種。
當(dāng)然如果是和協(xié)程或異步相關(guān)的任務(wù)調(diào)度,還可以使用對應(yīng)的 AsyncIOExecutor、TwistedExecutor 和 GeventExecutor 三種執(zhí)行器。
調(diào)度器的選擇主要取決于你當(dāng)前的程序環(huán)境以及 APScheduler 的用途。根據(jù)用途的不同,APScheduler 又提供了以下幾種調(diào)度器:
通常情況下如果不是和 Web 項(xiàng)目或應(yīng)用集成共存,那么往往都首選 BlockingScheduler 調(diào)度器來進(jìn)行操作,它會在當(dāng)前進(jìn)程中啟動相應(yīng)的線程來進(jìn)行任務(wù)調(diào)度與處理;反之,如果是和 Web 項(xiàng)目或應(yīng)用共存,那么需要選擇 BackgroundScheduler 調(diào)度器,因?yàn)樗粫蓴_當(dāng)前應(yīng)用的線程或進(jìn)程狀況。
基于對以上的概念和組件認(rèn)識,我們就能基本上摸清 APScheduler 的運(yùn)行流程:
雖然 APScheduler 里面的概念和組件看起來有點(diǎn)多,但在使用上并不算很復(fù)雜,我們可以通過本節(jié)的示例就能夠很快使用。
選擇對應(yīng)的 scheduler
在使用之前我們需要先實(shí)例化一個(gè) scheduler 對象,所有的 scheduler 對象都被放在了 apscheduler.schedulers 模塊下,我們可以直接通過查看 API 文檔或者借助 IDE 補(bǔ)全的提示來獲取相應(yīng)的 scheduler 對象。
這里我直接選取了最基礎(chǔ)的 BlockingScheduler:
# main.py from apscheduler.schedulers.blocking import BlockingScheduler scheduler = BlockingScheduler()
配置 scheduler
對于 scheduler 的一些配置我們可以直接在實(shí)例化對象時(shí)就進(jìn)行配置,當(dāng)然也可以在創(chuàng)建實(shí)例化對象之后再進(jìn)行配置。
實(shí)例化時(shí)進(jìn)行參數(shù)配置:
# main.py from datetime import datetime from apscheduler.executors.pool import ThreadPoolExecutor from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore from apscheduler.schedulers.blocking import BlockingScheduler # 任務(wù)持久化 使用 SQLite jobstores = { 'default': SQLAlchemyJobStore(url = 'sqlite:///jobs.db') } # 執(zhí)行器配置 executors = { 'default': ThreadPoolExecutor(20), } # 關(guān)于 Job 的相關(guān)配置,見官方文檔 API job_defaults = { 'coalesce': False, 'next_run_time': datetime.now() } scheduler = BlockingScheduler( jobstores = jobstores, executors = executors, job_defaults = job_defaults, timezone = 'Asia/Shanghai' )
或是通過 scheduler.configure 方法進(jìn)行同樣的操作:
scheduler = BlockingScheduler() scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone='Asia/Shanghai')
創(chuàng)建 scheduler 對象之后,我們需要調(diào)用其下的 add_job() 或是 scheduled_job() 方法來將我們需要執(zhí)行的函數(shù)進(jìn)行注冊。前者是以傳參的形式指定對應(yīng)的函數(shù)名,而后者則是以裝飾器的形式直接對我們要執(zhí)行的函數(shù)進(jìn)行修飾。
比如我現(xiàn)在有一個(gè)輸出此時(shí)此刻時(shí)間的函數(shù) now():
from datetime import datetime def now(trigger): print(f"trigger:{trigger} -> {datetime.now()}")
然后我打算每 5 秒的時(shí)候運(yùn)行一次,那我們使用 add_job() 可以這樣寫:
if __name__ == '__main__': scheduler.add_job(now, trigger = "interval", args = ("interval",), seconds = 5) scheduler.start()
在調(diào)用 start() 方法之后調(diào)度器就會開始執(zhí)行,并在控制臺上看到對應(yīng)的結(jié)果了:
trigger:interval -> 2021-01-16 21:19:43.356674
trigger:interval -> 2021-01-16 21:19:46.679849
trigger:interval -> 2021-01-16 21:19:48.356595
當(dāng)然使用 @scheduled_job 的方式來裝飾我們的任務(wù)或許會更加自由一些,于是上面的例子就可以寫成這樣:
@scheduler.scheduled_job(trigger = "interval", args = ("interval",), seconds = 5) def now(trigger): print(f"trigger:{trigger} -> {datetime.now()}") if __name__ == '__main__': scheduler.start()
運(yùn)行之后就會在控制臺看到同樣的結(jié)果了。
不過需要注意的是,添加任務(wù)一定要在 start() 方法執(zhí)行前調(diào)用,否則會找不到任務(wù)或是拋出異常。
如果你是正在做有關(guān)的 Web 項(xiàng)目且存在一些定時(shí)任務(wù),那么得益于 APScheduler 由于多樣的調(diào)度器,我們能夠?qū)⑵浜臀覀兊捻?xiàng)目結(jié)合到一起。
如果你正在使用 Flask,那么 Flask-APScheduler 這一別人寫好的第三方包裝庫就很適合你,雖然它沒有相關(guān)的文檔,但只要你了解了前面我所介紹的有關(guān)于 APScheduler 的概念和組件,你就能很輕易地看懂這個(gè)第三方庫倉庫里的示例代碼。
如果你使用的不是 Flask 框架,那么 APScheduler 本身也提供了一些對任務(wù)或作業(yè)的增刪改查操作,我們可以自己編寫一套合適的 API。
這里我使用的是 FastAPI 這一目前流行的 Web 框架。demo 項(xiàng)目結(jié)構(gòu)如下:
temp-scheduler ├── config.py # 配置項(xiàng) ├── main.py # API 文件 └── scheduler.py # APScheduler 相關(guān)設(shè)置
這里我們需要的依賴不多,只需要簡單幾個(gè)即可:
pip install fastapi apscheduler sqlalchemy uvicorn
如果項(xiàng)目中模塊過多,那么使用一個(gè)文件或模塊來進(jìn)行統(tǒng)一管理是最好的選擇。這里的 config.py 我們主要像 Flask 的配置那樣簡單設(shè)定:
from apscheduler.executors.pool import ThreadPoolExecutor from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore from apscheduler.schedulers.blocking import BlockingScheduler class SchedulerConfig: JOBSTORES = {"default": SQLAlchemyJobStore(url="sqlite:///job.db")} EXECUTORS = {"default": ThreadPoolExecutor(20)} JOB_DEFAULTS = {"coalesce": False} @classmethod def to_dict(cls): return { "jobstores": cls.JOBSTORES, "executors": cls.EXECUTORS, "job_defaults": cls.JOB_DEFAULTS, }
在 SchedulerConfig 配置項(xiàng)中我們可以自己實(shí)現(xiàn)一個(gè) to_dict() 類方法,以便我們后續(xù)傳參時(shí)通過解包的方式直接傳入配置參數(shù)即可。
scheduler.py 模塊的設(shè)定也比較簡單,即設(shè)定對應(yīng)的 scheduler 調(diào)度器即可。由于是演示 demo 我還將要定期執(zhí)行的任務(wù)也放在了這個(gè)模塊當(dāng)中:
import logging from datetime import datetime from apscheduler.schedulers.background import BackgroundScheduler from config import SchedulerConfig scheduler = BackgroundScheduler() logger = logging.getLogger(__name__) def init_scheduler() -> None: # config scheduler scheduler.configure(**SchedulerConfig.to_dict()) logger.info("scheduler is running...") # schedule test scheduler.add_job( func=mytask, trigger="date", args=("APScheduler Initialize.",), next_run_time=datetime.now(), ) scheduler.start() def mytask(message: str) -> None: print(f"[{datetime.now()}] message: {message}")
在這一部分中:
在 main.py 模塊就主要存放著我們由 FastAPI 所構(gòu)建的相關(guān) API。如果在后續(xù)開發(fā)時(shí)存在多個(gè)接口,此時(shí)就需要將不同接口放在不同模塊文件中,以達(dá)到路由的分發(fā)與管理,類似于 Flask 的藍(lán)圖模式。
import logging import uuid from datetime import datetime from typing import Any, Dict, Optional, Sequence, Union from fastapi import FastAPI from pydantic import BaseModel from scheduler import init_scheduler, mytask, scheduler logger = logging.getLogger(__name__) app = FastAPI(title="APScheduler API") app.add_event_handler("startup", init_scheduler) class Job(BaseModel): id: Union[int, str, uuid.UUID] name: Optional[str] = None func: Optional[str] = None args: Optional[Sequence[Optional[str]]] = None kwargs: Optional[Dict[str, Any]] = None executor: Optional[str] = None misfire_grace_time: Optional[str] = None coalesce: Optional[bool] = None max_instances: Optional[int] = None next_run_time: Optional[Union[str, datetime]] = None @app.post("/add") def add_job( message: str, trigger: str, trigger_args: Optional[dict], id: Union[str, int, uuid.UUID], ): try: scheduler.add_job( func=mytask, trigger=trigger, kwargs={"message": message}, id=id, **trigger_args, ) except Exception as e: logger.exception(e.args) return {"status_code": 0, "message": "添加失敗"} return {"status_code": 1, "message": "添加成功"} @app.delete("/delete/{id}") def delete_job(id: Union[str, int, uuid.UUID]): """delete exist job by id""" try: scheduler.remove_job(job_id=id) except Exception: return dict( message="刪除失敗", status_code=0, ) return dict( message="刪除成功", status_code=1, ) @app.put("/reschedule/{id}") def reschedule_job( id: Union[str, int, uuid.UUID], trigger: str, trigger_args: Optional[dict] ): try: scheduler.reschedule_job(job_id=id, trigger=trigger, **trigger_args) except Exception as e: logger.exception(e.args) return dict( message="修改失敗", status_code=0, ) return dict( message="修改成功", status_code=1, ) @app.get("/job") def get_all_jobs(): jobs = None try: job_list = scheduler.get_jobs() if job_list: jobs = [Job(**task.__getstate__()) for task in job_list] except Exception as e: logger.exception(e.args) return dict( message="查詢失敗", status_code=0, jobs=jobs, ) return dict( message="查詢成功", status_code=1, jobs=jobs, ) @app.get("/job/{id}") def get_job_by_id(id: Union[int, str, uuid.UUID]): jobs = [] try: job = scheduler.get_job(job_id=id) if job: jobs = [Job(**job.__getstate__())] except Exception as e: logger.exception(e.args) return dict( message="查詢失敗", status_code=0, jobs=jobs, ) return dict( message="查詢成功", status_code=1, jobs=jobs, )
以上代碼看起來很多,其實(shí)核心的就那么幾點(diǎn):
FastAPI 對象 app 的初始化。這里用到的 add_event_handler() 方法就有點(diǎn)像 Flask 中的 before_first_request,會在 Web 服務(wù)請求伊始進(jìn)行操作,理解為初始化相關(guān)的操作即可。
API 接口路由。路由通過 app 對象下的對應(yīng) HTTP 方法來實(shí)現(xiàn),如 GET、POST、PUT 等。這里的裝飾器用法其實(shí)也和 Flask 很類似,就不多贅述。
scheduler 對象的增刪改查。從 scheduler.py 模塊中引入我們創(chuàng)建好的 scheduler 對象之后就可以直接用來做增刪改查的操作:
完成以上的所有操作之后,我們就可以打開控制臺,進(jìn)入到該目錄下并激活我們的虛擬環(huán)境,之后運(yùn)行:
uvicorn main:app
之后我們就能在 FastAPI 默認(rèn)的地址 http://127.0.0.1:8000/docs 中看到關(guān)于全部接口的 Swagger 文檔頁面了:
fastapi 集成的 swagger 頁面
之后我們可以直接在文檔里面或使用 Postman 來自己進(jìn)行接口測試即可。
本文介紹了有關(guān)于 APScheduler 框架的概念及其用法,并進(jìn)行了簡單的實(shí)踐。
得益于 APScheduler 的模塊化設(shè)計(jì)才可以讓我們更方便地去理解、使用它,并將其運(yùn)用到我們實(shí)際的開發(fā)過程中。
從 APScheduler 目前的 Github 倉庫代碼以及 issue 來看,作者已經(jīng)在開始重構(gòu) 4.0 版本,當(dāng)中的一些源代碼和 API 也有較大的變動,相信在 4.0 版本中將會引入更多的新特性。
但如果現(xiàn)階段你正打算使用或已經(jīng)使用 APScheduler 用于實(shí)際生產(chǎn)中,那么希望本文能對會你有所幫助。
到此這篇關(guān)于5分鐘快速掌握Python定時(shí)任務(wù)框架的實(shí)現(xiàn)的文章就介紹到這了,更多相關(guān)Python 定時(shí)任務(wù)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
標(biāo)簽:綿陽 百色 淮安 秦皇島 周口 合肥 周口 綏化
巨人網(wǎng)絡(luò)通訊聲明:本文標(biāo)題《5分鐘快速掌握Python定時(shí)任務(wù)框架的實(shí)現(xiàn)》,本文關(guān)鍵詞 5分鐘,快速,掌握,Python,定時(shí),;如發(fā)現(xiàn)本文內(nèi)容存在版權(quán)問題,煩請?zhí)峁┫嚓P(guān)信息告之我們,我們將及時(shí)溝通與處理。本站內(nèi)容系統(tǒng)采集于網(wǎng)絡(luò),涉及言論、版權(quán)與本站無關(guān)。