Celery is a distributed task queue written in Python; see the official site for a fuller introduction. To use Celery we need a Broker and a Backend, and here we use Redis for both.
- GitHub: https://github.com/celery/celery
- Docs: http://docs.celeryproject.org/en/latest/
- Broker: receives and dispatches task messages
- Backend: stores task results
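To make the two roles concrete, here is a minimal conceptual sketch using only the standard library. It is not how Celery is implemented: the in-process `queue.Queue` and `dict` are stand-ins I chose for the external Broker and Backend, and `send_task` and `worker` are hypothetical names.

```python
import queue
import threading
import uuid

# Conceptual stand-ins only: Celery's real broker and backend are external
# services (Redis in this article), not in-process objects.
broker = queue.Queue()   # "Broker": carries task messages to workers
backend = {}             # "Backend": maps task id -> result


def send_task(func, *args):
    """Put a task message on the broker and return its id."""
    task_id = str(uuid.uuid4())
    broker.put((task_id, func, args))
    return task_id


def worker():
    """Consume messages from the broker and store results in the backend."""
    while True:
        message = broker.get()
        if message is None:        # sentinel: stop the worker
            broker.task_done()
            break
        task_id, func, args = message
        backend[task_id] = func(*args)
        broker.task_done()


def add(x, y):
    return x + y


thread = threading.Thread(target=worker)
thread.start()
task_id = send_task(add, 1, 2)
broker.join()            # wait until the worker has processed the message
broker.put(None)         # ask the worker to stop
thread.join()
print(backend[task_id])
```

The caller only ever sees a task id and later looks the result up by that id, which is roughly the relationship between `delay()` and `AsyncResult` described later in this article.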
Celery does not support Windows (official Windows support was dropped in the 4.x series used here).
- redis
For convenience, we use Docker to quickly start a Redis service.
➜ ~ docker pull redis
➜ ~ docker run -d --name redis -p 6379:6379 redis
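Before moving on, it can help to confirm that the container's port is actually reachable. The following is a small sketch using only the standard library; `port_is_open` is a hypothetical helper name, and it only checks that a TCP connection can be opened on the port published above (6379), not that the service behind it is really Redis.

```python
import socket


def port_is_open(host="localhost", port=6379, timeout=1.0):
    """Return True if a TCP connection to host:port can be established."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts and name resolution errors.
        return False


print(port_is_open())
```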
- celery
I have already created a virtual environment with pyenv, so we only need to switch to it.
➜ ~ pyenv activate venv
(venv) ➜ ~ pip install celery
Because both our Broker and Backend use Redis, we also need to install the redis module.
(venv) ➜ ~ pip install redis
Create a file named tasks.py to hold the task definitions.
from celery import Celery

app = Celery('tasks', broker='redis://localhost', backend='redis://localhost')


@app.task
def add(x, y):
    return x + y
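The first argument to Celery is the name of the app; broker specifies the URL of the message broker, and backend specifies the URL of the result backend.
Above we defined a task called add, which returns the sum of x and y.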
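The URLs above rely on defaults for the port and database. As a sketch of the fuller form `redis://:password@host:port/db`, the hypothetical helper `redis_url` below builds such a string. Putting the broker on database 0 and the backend on database 1 is my own assumption for illustration, not something the article's setup requires.

```python
from urllib.parse import quote


def redis_url(host="localhost", port=6379, db=0, password=None):
    """Build a URL of the form redis://:password@host:port/db."""
    # Percent-encode the password so characters like '@' do not break the URL.
    auth = ":{}@".format(quote(password, safe="")) if password else ""
    return "redis://{}{}:{}/{}".format(auth, host, port, db)


broker_url = redis_url(db=0)    # task messages (assumed database number)
backend_url = redis_url(db=1)   # task results (assumed database number)
print(broker_url)
print(backend_url)

# These strings could then be passed to the constructor, for example
# Celery('tasks', broker=broker_url, backend=backend_url).
```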
(venv) ➜ ~ celery -A tasks worker --loglevel=info
The start command must be run from the same directory as tasks.py. On success the output should look like this:
(venv) ➜ ~ celery -A tasks worker --loglevel=info
[email protected] v4.1.0 (latentcall)
Darwin-17.3.0-x86_64-i386-64bit 2018-01-22 14:33:49
[config]
.> app: tasks:0x1069cd3c8
.> transport: redis://localhost:6379//
.> results: redis://localhost/
.> concurrency: 4 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)
[queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. tasks.add
[2018-01-22 14:33:49,334: INFO/MainProcess] Connected to redis://localhost:6379//
[2018-01-22 14:33:49,350: INFO/MainProcess] mingle: searching for neighbors
[2018-01-22 14:33:50,376: INFO/MainProcess] mingle: all alone
[2018-01-22 14:33:50,394: INFO/MainProcess] [email protected] ready.
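The same-directory requirement mentioned above comes, as far as I understand, from `-A tasks` being resolved as an ordinary Python module import relative to the working directory. The sketch below illustrates that lookup with the standard library only; `module_is_importable` and the module name `hypothetical_tasks` are made up for the demonstration.

```python
import importlib
import importlib.util
import os
import sys
import tempfile


def module_is_importable(name, directory):
    """Return True if module `name` can be found when `directory` is on sys.path."""
    sys.path.insert(0, directory)
    try:
        importlib.invalidate_caches()
        return importlib.util.find_spec(name) is not None
    finally:
        sys.path.remove(directory)


with tempfile.TemporaryDirectory() as project_dir, \
        tempfile.TemporaryDirectory() as other_dir:
    # Hypothetical module name, chosen so it does not clash with real modules.
    with open(os.path.join(project_dir, "hypothetical_tasks.py"), "w") as f:
        f.write("# task definitions would go here\n")
    found_in_project = module_is_importable("hypothetical_tasks", project_dir)
    found_elsewhere = module_is_importable("hypothetical_tasks", other_dir)

print(found_in_project, found_elsewhere)
```

The module is found only when the directory containing the file is on the import path, which is why starting the worker from another directory fails to locate tasks.py.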
Open a new terminal, switch to the virtual environment, and start an interactive Python session.
>>> from tasks import add
# Calling the task without delay runs it as an ordinary function
>>> add(1,2)
3
# Once the asynchronous task has been queued, an AsyncResult instance is returned
>>> add.delay(1,2)
<AsyncResult: e167ae38-0e53-4592-ad0d-6c685f16168c>
>>> result = add.delay(1,2)
# get() fetches the result; it blocks if the task has not finished yet
>>> result.get()
3
To see the asynchronous delay in action, we add a new task called sleep to tasks.py.
from celery import Celery
import time

app = Celery('tasks', broker='redis://localhost', backend='redis://localhost')


@app.task
def add(x, y):
    return x + y


@app.task
def sleep(t):
    # Block for t seconds, then return the finish time as a string
    time.sleep(t)
    return str(time.time())
After adding the task we need to stop the celery worker and start it again. On restart, the newly added sleep task appears under [tasks].
(venv) ➜ ~ celery -A tasks worker --loglevel=info
[email protected] v4.1.0 (latentcall)
Darwin-17.3.0-x86_64-i386-64bit 2018-01-22 14:34:27
[config]
.> app: tasks:0x105647400
.> transport: redis://localhost:6379//
.> results: redis://localhost/
.> concurrency: 4 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)
[queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. tasks.add
. tasks.sleep
[2018-01-22 14:34:27,548: INFO/MainProcess] Connected to redis://localhost:6379//
[2018-01-22 14:34:27,563: INFO/MainProcess] mingle: searching for neighbors
[2018-01-22 14:34:28,588: INFO/MainProcess] mingle: all alone
[2018-01-22 14:34:28,619: INFO/MainProcess] [email protected] ready.
Enter the interactive Python session again and call the task.
>>> from tasks import sleep
# Create a task that sleeps for 10 seconds
>>> task = sleep.delay(10)
# Call get() before the task has finished, with a timeout of 1 second. The task has not completed, so a celery.exceptions.TimeoutError is raised
>>> task.get(timeout=1)
Traceback (most recent call last):
File "/Users/shengan/.pyenv/versions/venv/lib/python3.6/site-packages/celery/backends/async.py", line 256, in _wait_for_pending
on_interval=on_interval):
File "/Users/shengan/.pyenv/versions/venv/lib/python3.6/site-packages/celery/backends/async.py", line 55, in drain_events_until
raise socket.timeout()
socket.timeout
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/Users/shengan/.pyenv/versions/venv/lib/python3.6/site-packages/celery/result.py", line 194, in get
on_message=on_message,
File "/Users/shengan/.pyenv/versions/venv/lib/python3.6/site-packages/celery/backends/async.py", line 189, in wait_for_pending
for _ in self._wait_for_pending(result, **kwargs):
File "/Users/shengan/.pyenv/versions/venv/lib/python3.6/site-packages/celery/backends/async.py", line 260, in _wait_for_pending
raise TimeoutError('The operation timed out.')
celery.exceptions.TimeoutError: The operation timed out.
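If you want to know whether a long-running task has finished, you could catch the exception as in the test above and handle it accordingly, but celery provides a more convenient method.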
>>> task.ready()
False
>>> task.ready()
True
>>> task.get()
'1516603282.2424462'
For more on the AsyncResult object, see: http://docs.celeryproject.org/en/latest/reference/celery.result.html
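The ready() check shown above can be wrapped in a small polling loop. This is only a sketch: `wait_for` is a hypothetical helper, and `FakeResult` is a stand-in I wrote so the example runs without a worker. It mimics just the `ready()` and `get()` methods of AsyncResult used in this article.

```python
import time


def wait_for(result, timeout=30.0, interval=0.5):
    """Poll result.ready() until it is True or `timeout` seconds have passed.

    Returns (True, value) if the task finished in time, else (False, None).
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if result.ready():
            return True, result.get()
        time.sleep(interval)
    return False, None


class FakeResult:
    """Hypothetical stand-in for AsyncResult that becomes ready after `delay` seconds."""

    def __init__(self, value, delay):
        self._value = value
        self._done_at = time.monotonic() + delay

    def ready(self):
        return time.monotonic() >= self._done_at

    def get(self):
        return self._value


print(wait_for(FakeResult("done", delay=0.2), timeout=2.0, interval=0.05))
```

With a real task the call would look like `wait_for(sleep.delay(10), timeout=15)`. Returning a pair rather than a bare value keeps a task that legitimately returns None distinguishable from one that has not finished.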
# Project directory
(venv) ➜ ~ mkdir celery_tasks
# Celery configuration file
(venv) ➜ ~ touch celery_tasks/celery.py
# Task 1
(venv) ➜ ~ touch celery_tasks/task_1.py
# Task 2
(venv) ➜ ~ touch celery_tasks/task_2.py
- celery_tasks/celery.py
from __future__ import absolute_import
from celery import Celery

app = Celery('celery_tasks',
             broker='redis://localhost',
             backend='redis://localhost',
             include=['celery_tasks.task_1', 'celery_tasks.task_2'])

# Stored task results expire after one hour (3600 seconds)
app.conf.update(
    result_expires=3600,
)

if __name__ == '__main__':
    app.start()
- celery_tasks/task_1.py
from __future__ import absolute_import
from .celery import app


@app.task
def add(x, y):
    return x + y
- celery_tasks/task_2.py
from __future__ import absolute_import
import time
from .celery import app


@app.task
def sleep(t):
    time.sleep(t)
    return str(time.time())
- Start
(venv) ➜ ~ celery -A celery_tasks worker -l info
[email protected] v4.1.0 (latentcall)
Darwin-17.3.0-x86_64-i386-64bit 2018-01-22 14:59:33
[config]
.> app: celery_tasks:0x109e29ef0
.> transport: redis://localhost:6379//
.> results: redis://localhost/
.> concurrency: 4 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)
[queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. celery_tasks.task_1.add
. celery_tasks.task_2.sleep
[2018-01-22 14:59:34,042: INFO/MainProcess] Connected to redis://localhost:6379//
[2018-01-22 14:59:34,072: INFO/MainProcess] mingle: searching for neighbors
[2018-01-22 14:59:35,109: INFO/MainProcess] mingle: all alone
[2018-01-22 14:59:35,148: INFO/MainProcess] [email protected] ready.
Open a new window to test.
(venv) ➜ ~ python
>>> from celery_tasks import task_1,task_2
>>> task_1.add.delay(1,2)
<AsyncResult: 8621cc57-ddf1-4188-a21d-78678611c735>
>>> task_2.sleep.delay(5)
<AsyncResult: a842caa1-3162-460c-b49a-860797f7784b>
- Start
(venv) ➜ ~ celery multi start celery_demo -A celery_tasks -l info
celery multi v4.1.0 (latentcall)
> Starting nodes...
> [email protected]: OK
- Check the processes
(venv) ➜ ~ ps -ef | grep celery_demo | grep -v grep
501 30862 1 0 3:03下午 ?? 0:00.72 /Users/shengan/.pyenv/versions/3.6.3/envs/venv/bin/python -m celery worker -A celery_tasks -l info --logfile=celery_demo%I.log --pidfile=celery_demo.pid [email protected]
501 30900 30862 0 3:03下午 ?? 0:00.01 /Users/shengan/.pyenv/versions/3.6.3/envs/venv/bin/python -m celery worker -A celery_tasks -l info --logfile=celery_demo%I.log --pidfile=celery_demo.pid [email protected]
501 30901 30862 0 3:03下午 ?? 0:00.01 /Users/shengan/.pyenv/versions/3.6.3/envs/venv/bin/python -m celery worker -A celery_tasks -l info --logfile=celery_demo%I.log --pidfile=celery_demo.pid [email protected]
501 30902 30862 0 3:03下午 ?? 0:00.01 /Users/shengan/.pyenv/versions/3.6.3/envs/venv/bin/python -m celery worker -A celery_tasks -l info --logfile=celery_demo%I.log --pidfile=celery_demo.pid [email protected]
501 30903 30862 0 3:03下午 ?? 0:00.01 /Users/shengan/.pyenv/versions/3.6.3/envs/venv/bin/python -m celery worker -A celery_tasks -l info --logfile=celery_demo%I.log --pidfile=celery_demo.pid [email protected]
There is one daemon (parent) process, and the number of worker processes equals the number of CPUs.
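To see which number the default corresponds to on a given machine, the CPU count can be read from the standard library. `default_worker_count` is a hypothetical helper name; the assumption is that the prefork pool defaults to the value the OS reports, which matches the `concurrency: 4 (prefork)` line and the four child processes shown above.

```python
import os


def default_worker_count():
    """Return the number of CPUs reported by the OS (at least 1)."""
    # os.cpu_count() may return None when the count cannot be determined.
    return os.cpu_count() or 1


print(default_worker_count())
```

If a different number of worker processes is wanted, the worker command accepts a `--concurrency` (`-c`) option.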
- Restart
(venv) ➜ ~ celery multi restart celery_demo -A celery_tasks -l info
celery multi v4.1.0 (latentcall)
> Stopping nodes...
> [email protected]: TERM -> 31132
> Waiting for 1 node -> 31132.....
> [email protected]: OK
> Restarting node [email protected]: OK
> Waiting for 1 node -> None...
- Stop
(venv) ➜ ~ celery multi stop celery_demo -A celery_tasks -l info
celery multi v4.1.0 (latentcall)
> Stopping nodes...
> [email protected]: TERM -> 30862
- Stop after running tasks have finished
(venv) ➜ ~ celery multi stopwait celery_demo -A celery_tasks -l info
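Celery can run not only asynchronous tasks but also periodic (scheduled) tasks. It also integrates closely with django: the django-celery-results library stores task results in django models, and the django-celery-beat library stores the periodic task schedule in the database instead of defining it in a configuration file.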
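For reference, the configuration-based approach to periodic tasks mentioned above could look roughly like the following. This is a sketch, not taken from the article: the entry name `add-every-30-seconds`, the 30-second interval, and the arguments are assumptions, and the task name reuses `celery_tasks.task_1.add` from the project above.

```python
# Hypothetical schedule: run the add task from task_1.py every 30 seconds.
beat_schedule = {
    "add-every-30-seconds": {
        "task": "celery_tasks.task_1.add",  # task name as listed by the worker
        "schedule": 30.0,                   # interval in seconds
        "args": (1, 2),                     # positional arguments for add(x, y)
    },
}

# In celery_tasks/celery.py this dict would be assigned with
# app.conf.beat_schedule = beat_schedule, and the scheduler started
# separately with `celery -A celery_tasks beat`.
print(sorted(beat_schedule))
```

A worker must still be running alongside the beat process, since beat only queues the tasks at the scheduled times.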