1.开启redis
redis-server
2.开启celery工作进程
celery -A tasks worker --loglevel=info --concurrency=4 --purge --logfile=celery.log
●tasks:项目名称
●–loglevel日志级别
●-concurrency并发数可简写为-c
●–purge启动时,清除以前积压的任务
●–logfile=celery.log,存放日志的地方
3.调用celery任务的函数
●delay():异步调用任务的方式。它不会立即返回结果,而是立即返回一个AsyncResult实例,你可以使用这个实例来检查任务的状态或等待结果
result = add.delay(args=(4, 6))
●apply_async():与delay方法不同,apply_async()允许你传递更多的选项和参数来定制任务的执行。例如,你可以设置任务的过期时间、重试策略等。
result = add.apply_async(args=(4, 6), countdown=10)
countdown/eta:设置任务开始执行之前的延迟时间。countdown 是以秒为单位的延迟时间,而 eta 是一个 datetime 对象,表示任务应该开始的确切时间。注意:这两个参数不能同时设置。
expires:设置任务的过期时间。如果在这个时间之后任务还没有被执行,那么它将被丢弃。
queue:指定任务应该被发送到的队列名称。
routing_key:用于路由任务的键。这通常与 AMQP(如 RabbitMQ)的路由功能相关。
priority:任务的优先级。不同的消息代理可能对优先级的处理方式不同。
link 或 link_error:链接到另一个任务,当当前任务成功或失败时执行。这可以用于构建任务链。
●apply():同步调用任务
result = my_task.apply(args=(arg1, arg2), kwargs={'key': 'value'})
●使用bind=True绑定任务实例:使用bind=True参数来绑定任务实例到任务函数。这样,任务函数内部就可以通过self访问到任务实例的方法和属性,如self.retry()
4.查询任务状态
id = '11214515451'
a = AsyncResult(id=id, app=app)
if a.successful():
result = a.get()
print(result)
elif a.failed():
print('任务失败')
elif a.status == 'PENDING':
print('任务等待中被执行')
elif a.status == 'RETRY':
print('任务异常后正在重试')
elif a.status == 'STARTED':
print('任务已经开始被执行')