任务调用

从最小工作进程示例中可以看到,任务采用add.delay()的方式调用,如果是直接调用add(),那么指定的任务将在调用进程上执行,而不是在任务队列中执行。

要在任务队列上调用任务,需要使用.delay()方法,或者采用.apply_async()方法。其中.delay().apply_async()的别名。.apply_async()常用有以下几种调用方式。

  • .apply_async(arg, kwarg=value),直接在任务队列上调用指定方法,并传递相应参数。
  • .apply_async(countdown=10),10秒后在任务队列上调用指定方法。
  • .apply_async(eta=now + timedelta(seconds=10)),使用eta参数定义10秒后在任务队列上调用指定方法。
  • countdown=60, expires=120,1分钟后调用指定方法,但2分钟后尚未执行就取消任务。

除了以上几种调用方式以外,.apply_async()还可以接收以下这些参数来完成其他功能。

  • link,可以接收一个函数或者函数列表,用于连续的执行任务,通常采用task.s()来定义子任务传递给这个参数。
  • link_error,定义当任务出错时需要调用的错误处理函数。
  • retry,布尔值,用于指定在任务失败时是否要重试。
  • retry_policy,设定任务重试策略,需要传递一个字典类型值。
    • max_reties,设定任务最大重试次数。
    • interval_start,设定两次重试之间间隔的秒数。
    • interval_step,设定多次重试之间时间间隔的递增倍率。
    • interval_max,设定两次重试之间时间间隔的最大值。
  • queue,设定任务使用的不同路由队列。

在很多情况下,我们经常需要连续的调用一些方法,这就需要Celery提供的一项功能:「任务签名」。跟编程语言中的函数签名概念一致,任务签名也是生成一个能够代表函数的内容供Celery调用。任务签名一般使用signature()函数定义,格式为signature('任务函数名', args=(), **kwargs)。其中args需要传递传给任务函数的参数。**kwargs中则与.apply_async()方法能够接收的参数相同。例如创建前面add()任务延迟10秒执行的签名就是这样:signature('tasks.add', args=(1, 1), countdown=10)。任务签名可以直接传递给.apply_async()方法的link参数使用。

在一般情况下,子任务比任务签名更加常用。子任务可以使用.subtask((参数), **kwargs)来创建。子任务还可以使用.subtask()的快捷方式.s(params, **kwargs)创建。

借助于子任务和link参数,可以实现任务的链式调用,例如:add.apply_async((2, 2), link=add.s(6)),前一个任务的调用结果会作为子任务的第一个参数传入子任务执行。