<sup id="u4we4"></sup>
  • celery筆記一之celery介紹、啟動和運行結果跟蹤-環球熱文

    來源:博客園

    本文首發于公眾號:Hunter后端原文鏈接:celery筆記一之celery介紹、啟動和運行結果跟蹤


    【資料圖】

    本篇筆記內容如下:

    celery 介紹celery 準備celery 啟動和異步任務的運行運行結果跟蹤1、celery 介紹

    celery 大致有兩種應用場景,一種是異步任務,一種是定時任務。

    比如說在一個接口請求中,某個函數執行所需的時間過長,而前端頁面并不是立刻需要在接口中獲取處理結果,可以將這個函數作為異步任務,先返回給前端處理中的信息,在后臺單獨運行這個函數,這就是異步任務。

    另一個比如說某個函數需要每天晚上運行一遍,不可能人天天守著后臺手動執行一遍這個函數,那么就可以用 celery 來實現這個定時的周期任務。

    接下來介紹一下 celery 的組成:

    task

    這個任務就是我們前面舉的例子的異步任務或者是定時任務,即為 task,我們可以定義這些任務,然后發送到 broker

    broker

    broker 可以理解成消息中間件,用于獲取異步或者定時任務,形成一個或多個消息隊列,然后發送給 worker 處理這些消息

    broker 的形式可以是 Redis,RabbitMQ 或者其他,這里我們使用 Redis 作為消息中間件

    worker

    worker 是處理消息的程序,獲取 broker 中的消息,然后在 worker 中執行,然后根據配置決定將處理結果發送到 backend

    result_backend

    在 worker 處理完消息之后會有 return 或者沒有返回結果,都會根據配置將結果發送出來,可以配置成發送到 redis 中,也可以將之存儲到 database 中

    beat

    主要用于調用定時任務,根據設定好的定時任務,比如每天晚上十點執行某個函數,beat 則會在相應的時間將這個 task 發送給 broker,然后 worker 獲取任務進行處理

    定時任務除了說的每天晚上十點這種周期任務,也可以是間隔任務,比如說每隔多少秒,多少分鐘執行一次

    注意:異步任務的發送是不經過 beat 處理,直接發送給 broker 的

    在上面的結構中,broker 需要將相應的服務比如 redis 運行起來,而 worker 和 beat 需要在手動用程序運行,而且每次更改了定時策略之后需要重新啟動 beat 和 worker 才能生效。

    2、celery 準備

    接下來我們實現一個最簡單的異步任務,在執行異步任務前,我們做如下的準備工作

    1.安裝依賴

    我們需要安裝一下 celery 和 redis 的依賴:

    pip3 install celery==5.1.2 -i https://mirrors.aliyun.com/pypi/simple/pip3 install redis==3.5.3 -i https://mirrors.aliyun.com/pypi/simple/
    2.消息中間件

    這里我們用到的消息中間件是 redis,可以去官網下載一個 redis 啟動,也可以使用 docker 來執行安裝。

    我在之前的 docker 系列筆記中有介紹過如何拉取鏡像和運行容器,我們這里直接使用 docker 來運行:

    docker run -itd -p 6379:6379 redis:latest
    3.異步任務準備

    我們準備一個最簡單的 add() 函數,放在 tasks.py 文件中:

    # tasks.pyfrom celery import Celeryapp = Celery("tasks", broker="redis://localhost/0", backend="redis://localhost/1")@app.taskdef add(x, y):     return x + y

    在這段代碼里,我們引入 Celery 模塊,并將其實例化為 app,且配置了 broker 參數,表示消息隊列都會被放在 redis 的第一個數據庫下

    指定的 backend 參數則表示函數運行的結果被放在 redis 的第二個數據庫下

    然后用 @app.task 修飾 add 函數,表示它是 app 下的 task 任務

    以上,我們的準備工作就完成了,接下來嘗試運行這個異步任務

    3、celery 啟動和異步任務的運行

    說是 celery 的啟動,其實是 worker 的啟動,中間件是 redis,已經在前面的步驟中啟動了。

    我們在 tasks.py 所在的文件夾下執行下面的命令:

    celery -A tasks worker -l INFO

    在這里,tasks 是我們任務所在的文件名,worker 表示啟動的是 worker 程序

    -l INFO 則會在控制臺打印出 worker 接收到的消息詳情,如果不執行,則信息流不會被打印出來

    執行了上面的程序后,可以看到控制臺會輸出下面這種信息:

    -------------- celery@localhost v5.1.2 (sun-harmonics)--- ***** ----- -- ******* ---- Darwin-21.4.0-x86_64-i386-64bit 2022-07-17 23:56:09- *** --- * --- - ** ---------- [config]- ** ---------- .> app:         tasks:0x7fc8ddf3df98- ** ---------- .> transport:   redis://localhost:6379/0- ** ---------- .> results:     disabled://- *** --- * --- .> concurrency: 12 (prefork)-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)--- ***** -----  -------------- [queues]                .> celery           exchange=celery(direct) key=celery                [tasks]  . tasks.add[2022-07-17 23:56:09,685: INFO/MainProcess] Connected to redis://localhost:6379/0[2022-07-17 23:56:09,699: INFO/MainProcess] mingle: searching for neighbors[2022-07-17 23:56:10,737: INFO/MainProcess] mingle: all alone[2022-07-17 23:56:10,780: INFO/MainProcess] celery@localhost ready.

    則表示 worker 啟動成功

    執行異步任務

    在另一個 shell 窗口,進入 python 的交互界面,輸入以下命令:

    from tasks import addres = add.delay(1,2)

    add 是我們需要執行的異步任務的函數名

    delay 是異步任務執行的特有方法,這個其實是 apply_async() 函數的簡便寫法,不帶任何參數,apply_async() 除了可以實現異步任務的功能,還可以指定多少時間后執行,比如說二十秒后執行,這個在后面的筆記中我們再介紹。

    而異步任務的返回我們這里用 res 來定義,它是一個包含了這個任務所有執行信息對象,有任務狀態(是否執行成功),有返回結果(add() 函數的return),還有這個 task 特有的標識 id等信息

    至此,我們的一個異步任務的執行就完成了,我們可以在下一步查看它的運行結果等信息。

    4、運行結果跟蹤

    接下來,我們在 tasks.py 中建立下面幾個函數,來測試我們對結果的跟蹤:

    # tasks.pyimport timefrom celery import Celeryapp = Celery("tasks", broker="redis://localhost/0", backend="redis://localhost/1")@app.taskdef add(x, y):    return x + y@app.taskdef div(x, y):    return x / y@app.taskdef test_not_finished():    time.sleep(30)    return True

    然后重新運行 worker:

    celery -A tasks worker -l INFO

    然后引入和執行函數:

    from tasks import add, div, test_not_finished

    獲取延時任務的結果

    res = add.delay(1, 2)print(res.result)# 也可以使用 get() print(res.get())

    get() 函數也可以加個超時的設置:

    res.get(timeout=2)

    但是這樣需要注意,因為如果超時了還未獲取到結果,程序就會報錯

    判斷函數運行是否完成

    print(res.ready())

    打印出的結果為 True 則表示函數運行完成

    我們可以測試函數為完成的狀態:

    res2 = test_not_finished.delay()

    在這個函數里,我們設置了 30s 的休眠,所以在 30s 內我們打印結果可以看到 res2.ready() 是為 False 的:

    print(res2.ready())

    獲取task id

    每個被執行的 task 都有各自對應的 id 作為它們的唯一鍵:

    print(res.id)

    查看任務執行的狀態

    # 任務執行是否失敗,返回 布爾型數據is_failed = res.failed()# 任務執行是否成功,返回布爾型數據is_successful = res.successful()# 執行的任務所處的狀態state = res.state# state 的值會在 PENDING,STARTED,SUCCESS,RETRY,FAILURE 這幾種狀態中,分別是 待處理中,任務已經開始,成功,重試中,失敗

    報錯處理

    如果執行的延時任務在程序中報錯,比如我們定義的 div() 函數,我們傳入的除數為 0 的話,在程序中是會報錯的,我們使用 get() 來獲取結果的話程序是會報錯的:

    res3 = div.delay(3, 0)res3.get()# 返回會報錯

    但是我們可以使用 propagate=False 參數來忽略程序的報錯:

    res3.get(propagate=False)

    這樣我們獲取的就不是程序報錯,而是程序報錯的信息作為結果返回

    使用 res3.state 發現返回的結果是 FAILURE

    當延時任務在程序中報錯,它的返回值就不會是正確的,我們可以通過 res3.traceback 是否有值來判斷函數運行過程中是有報錯:

    if res3.traceback:    print("延時任務報錯")else:    print("程序正常運行,可以獲取返回值")

    result資源釋放

    因為 backend 會使用資源來保存和傳輸結果,為了確保資源被釋放,所以在執行完異步任務后,你必須對每一個結果調用 get() 或者 forget() 函數

    result.get() 函數獲取結果

    result.forget() 在 backend 刪掉該數據

    在官方文檔上,意思是 get() 和 forget() 方法都可以釋放資源,但是經過我測試,貌似只有 forget() 函數會釋放資源

    查看是否資源被釋放也很簡單,登錄到對應的 backend,我這里是 redis,使用 redis-cli 或者通過 docker 進入 redis:

    select 1keys*

    查看相應的 task id 是否還在列表就可以知道該資源是否被釋放

    如果不想手動釋放資源,可以在配置里設置一個過期時間,那么結果就會在指定時間段后被釋放:

    app.conf.update(result_expires=60)

    這個我們可以在后面的配置里再詳細介紹。

    以上就是本篇筆記全部內容,下一篇筆記我們將介紹如何建立一個 celery 項目、配置的幾種方法及一些基本的配置。

    如果想獲取更多后端相關文章,可掃碼關注閱讀:

    標簽:

    推薦

    財富更多》

    動態更多》

    熱點

    欧美xxxxxxxxxx,91热久久免费频精品动漫99,午夜拍拍福利视频蜜桃视频,91aaa免费观看在线观看资源
    <sup id="u4we4"></sup>
  • 主站蜘蛛池模板: 欧洲多毛裸体XXXXX| 狠狠色噜噜狠狠狠狠97不卡 | 丰满肥臀风间由美系列| 色与欲影视天天看综合网| 性一交一乱一伦一| 伊人色在线观看| 2021国产精品自在拍在线播放| 最近的2019中文字幕hd| 国产一级毛片高清视频完整版| 一级毛片60分钟在线播放久草高清在线 | 欧美激情videossex护士| 夫妇野外交换hd中文小说| 亚洲线精品一区二区三区| 1024手机看片基地| 日韩在线|中文| 四虎影视884aa·com| avtt天堂网手机版亚洲| 欧美丰满白嫩bbxx| 国产精品亚洲专区无码WEB| 久久精品国产亚洲AV麻豆网站| 美女免费视频黄的| 国内精品久久久久精品| 亚洲欧美中文日韩在线| 麻豆国产一区二区在线观看| 成人狠狠色综合| 亚洲激情视频图片| 57pao国产成视频免费播放| 日韩免费观看的一级毛片| 午夜无码人妻av大片色欲| 78成人精品电影在线播放 | 中文字幕影片免费在线观看| 特级全黄一级毛片视频| 国产无套粉嫩白浆在线观看| 中国成人在线视频| 欧美日韩在线影院| 国产三级精品三级在线观看| avidolzhd| 日本欧美视频在线观看| 俄罗斯一级成人毛片| 国产福利免费视频| 婷婷五月综合色中文字幕|