Skip to content

队列

简介

在构建 Web 应用程序时,您可能会有一些任务,例如解析和存储上传的 CSV 文件,这些任务在典型的 Web 请求期间执行时间过长。幸运的是,Laravel 允许您轻松创建可以在后台处理的队列作业。通过将耗时的任务移至队列,您的应用程序可以以极快的速度响应 Web 请求,并为客户提供更好的用户体验。

Laravel 队列为各种不同的队列后端提供了统一的队列 API,例如 Amazon SQSRedis,甚至关系数据库。

Laravel 的队列配置选项存储在应用程序的 config/queue.php 配置文件中。在此文件中,您将找到框架包含的每个队列驱动程序的连接配置,包括数据库、Amazon SQSRedisBeanstalkd 驱动程序,以及一个同步驱动程序(用于开发或测试期间立即执行作业)。还包含一个 null 队列驱动程序,它会丢弃队列作业。

NOTE

Laravel Horizon 是您的 Redis 队列的漂亮仪表板和配置系统。查看完整的 Horizon 文档 以获取更多信息。

连接与队列

在开始使用 Laravel 队列之前,了解「连接」和「队列」之间的区别非常重要。在您的 config/queue.php 配置文件中,有一个 connections 配置数组。此选项定义了到后端队列服务(如 Amazon SQS、Beanstalk 或 Redis)的连接。但是,任何给定的队列连接可能有多个「队列」,可以将其视为不同的队列作业堆栈或堆。

请注意,queue 配置文件中的每个连接配置示例都包含一个 queue 属性。这是当作业发送到给定连接时将分发到的默认队列。换句话说,如果您分发作业而没有明确定义应该分发到哪个队列,则作业将被放置在连接配置的 queue 属性中定义的队列上:

php
use App\Jobs\ProcessPodcast;

// 此作业被发送到默认连接的默认队列...
ProcessPodcast::dispatch();

// 此作业被发送到默认连接的 "emails" 队列...
ProcessPodcast::dispatch()->onQueue('emails');

某些应用程序可能不需要将作业推送到多个队列,而是更愿意拥有一个简单的队列。但是,将作业推送到多个队列对于希望优先处理或分段处理作业的应用程序特别有用,因为 Laravel 队列工作进程允许您按优先级指定应该处理哪些队列。例如,如果您将作业推送到 high 队列,您可以运行一个给予它们更高处理优先级的工作进程:

shell
php artisan queue:work --queue=high,default

驱动程序说明和先决条件

数据库

要使用 database 队列驱动程序,您需要一个数据库表来保存作业。通常,这包含在 Laravel 的默认 0001_01_01_000002_create_jobs_table.php 数据库迁移中;但是,如果您的应用程序不包含此迁移,您可以使用 make:queue-table Artisan 命令创建它:

shell
php artisan make:queue-table

php artisan migrate

Redis

要使用 redis 队列驱动程序,您应该在 config/database.php 配置文件中配置 Redis 数据库连接。

WARNING

redis 队列驱动程序不支持 serializercompression Redis 选项。

Redis 集群

如果您的 Redis 队列连接使用 Redis 集群,您的队列名称必须包含一个 键哈希标签。这是必需的,以确保给定队列的所有 Redis 键都放置在同一个哈希槽中:

php
'redis' => [
    'driver' => 'redis',
    'connection' => env('REDIS_QUEUE_CONNECTION', 'default'),
    'queue' => env('REDIS_QUEUE', '{default}'),
    'retry_after' => env('REDIS_QUEUE_RETRY_AFTER', 90),
    'block_for' => null,
    'after_commit' => false,
],

阻塞

使用 Redis 队列时,您可以使用 block_for 配置选项来指定驱动程序在遍历工作进程循环并重新轮询 Redis 数据库之前应该等待作业可用的时间。

根据您的队列负载调整此值可能比持续轮询 Redis 数据库以获取新作业更有效。例如,您可以将值设置为 5,以指示驱动程序在等待作业可用时阻塞五秒钟:

php
'redis' => [
    'driver' => 'redis',
    'connection' => env('REDIS_QUEUE_CONNECTION', 'default'),
    'queue' => env('REDIS_QUEUE', 'default'),
    'retry_after' => env('REDIS_QUEUE_RETRY_AFTER', 90),
    'block_for' => 5,
    'after_commit' => false,
],

WARNING

block_for 设置为 0 将导致队列工作进程无限期阻塞,直到作业可用。这也将阻止处理 SIGTERM 等信号,直到处理完下一个作业。

其他驱动程序先决条件

列出的队列驱动程序需要以下依赖项。这些依赖项可以通过 Composer 包管理器安装:

  • Amazon SQS: aws/aws-sdk-php ~3.0
  • Beanstalkd: pda/pheanstalk ~5.0
  • Redis: predis/predis ~2.0 或 phpredis PHP 扩展
  • MongoDB: mongodb/laravel-mongodb

创建作业

生成作业类

默认情况下,应用程序的所有可队列作业都存储在 app/Jobs 目录中。如果 app/Jobs 目录不存在,当您运行 make:job Artisan 命令时会创建它:

shell
php artisan make:job ProcessPodcast

生成的类将实现 Illuminate\Contracts\Queue\ShouldQueue 接口,向 Laravel 指示该作业应该推送到队列以异步运行。

NOTE

可以使用 stub 发布 自定义作业模板。

类结构

作业类非常简单,通常只包含一个 handle 方法,当队列处理作业时会调用该方法。首先,让我们看一个示例作业类。在此示例中,我们假设我们管理一个播客发布服务,需要在发布之前处理上传的播客文件:

php
<?php

namespace App\Jobs;

use App\Models\Podcast;
use App\Services\AudioProcessor;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;

class ProcessPodcast implements ShouldQueue
{
    use Queueable;

    /**
     * 创建新的作业实例。
     */
    public function __construct(
        public Podcast $podcast,
    ) {}

    /**
     * 执行作业。
     */
    public function handle(AudioProcessor $processor): void
    {
        // 处理上传的播客...
    }
}

在此示例中,请注意我们能够将 Eloquent 模型 直接传递到队列作业的构造函数中。由于作业使用的 Queueable trait,Eloquent 模型及其加载的关系将在作业处理时被优雅地序列化和反序列化。

如果您的队列作业在其构造函数中接受 Eloquent 模型,则只有模型的标识符将被序列化到队列上。当实际处理作业时,队列系统将自动从数据库中重新检索完整的模型实例及其加载的关系。这种模型序列化方法允许将更小的作业负载发送到您的队列驱动程序。

handle 方法依赖注入

当队列处理作业时会调用 handle 方法。请注意,我们能够在作业的 handle 方法上类型提示依赖项。Laravel 服务容器 会自动注入这些依赖项。

如果您希望完全控制容器如何将依赖项注入到 handle 方法中,可以使用容器的 bindMethod 方法。bindMethod 方法接受一个回调,该回调接收作业和容器。在回调中,您可以自由地以任何方式调用 handle 方法。通常,您应该从 App\Providers\AppServiceProvider 服务提供者boot 方法中调用此方法:

php
use App\Jobs\ProcessPodcast;
use App\Services\AudioProcessor;
use Illuminate\Contracts\Foundation\Application;

$this->app->bindMethod([ProcessPodcast::class, 'handle'], function (ProcessPodcast $job, Application $app) {
    return $job->handle($app->make(AudioProcessor::class));
});

WARNING

二进制数据(如原始图像内容)应该在传递给队列作业之前通过 base64_encode 函数传递。否则,当作业被放置在队列上时,可能无法正确序列化为 JSON。

队列关系

由于当作业排队时,所有加载的 Eloquent 模型关系也会被序列化,因此序列化的作业字符串有时会变得相当大。此外,当作业被反序列化并从数据库重新检索模型关系时,它们将被完整检索。在作业排队过程中序列化模型之前应用的任何先前关系约束在作业反序列化时将不会被应用。因此,如果您希望使用给定关系的子集,应该在队列作业中重新约束该关系。

或者,为了防止关系被序列化,您可以在设置属性值时在模型上调用 withoutRelations 方法。此方法将返回没有其加载关系的模型实例:

php
/**
 * 创建新的作业实例。
 */
public function __construct(
    Podcast $podcast,
) {
    $this->podcast = $podcast->withoutRelations();
}

如果您使用 PHP 构造函数属性提升 并希望指示 Eloquent 模型不应序列化其关系,可以使用 WithoutRelations 属性:

php
use Illuminate\Queue\Attributes\WithoutRelations;

/**
 * 创建新的作业实例。
 */
public function __construct(
    #[WithoutRelations]
    public Podcast $podcast,
) {}

为方便起见,如果您希望序列化所有模型而不包含关系,可以将 WithoutRelations 属性应用于整个类,而不是将属性应用于每个模型:

php
<?php

namespace App\Jobs;

use App\Models\DistributionPlatform;
use App\Models\Podcast;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
use Illuminate\Queue\Attributes\WithoutRelations;

#[WithoutRelations]
class ProcessPodcast implements ShouldQueue
{
    use Queueable;

    /**
     * 创建新的作业实例。
     */
    public function __construct(
        public Podcast $podcast,
        public DistributionPlatform $platform,
    ) {}
}

如果作业接收的是 Eloquent 模型的集合或数组而不是单个模型,则当作业被反序列化和执行时,该集合中的模型将不会恢复其关系。这是为了防止处理大量模型的作业过度使用资源。

唯一作业

WARNING

唯一作业需要支持 的缓存驱动程序。目前,memcachedredisdynamodbdatabasefilearray 缓存驱动程序支持原子锁。

WARNING

唯一作业约束不适用于批次中的作业。

有时,您可能希望确保在任何时候队列上只有一个特定作业的实例。您可以通过在作业类上实现 ShouldBeUnique 接口来实现这一点。此接口不需要您在类上定义任何其他方法:

php
<?php

use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUnique;

class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
    // ...
}

在上面的示例中,UpdateSearchIndex 作业是唯一的。因此,如果该作业的另一个实例已经在队列上且尚未完成处理,则不会分发该作业。

在某些情况下,您可能希望定义一个使作业唯一的特定「键」,或者您可能希望指定一个超时,超过该超时后作业不再保持唯一。为此,您可以使用 UniqueFor 属性并在作业类上定义一个 uniqueId 方法:

php
<?php

namespace App\Jobs;

use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUnique;
use Illuminate\Queue\Attributes\UniqueFor;

#[UniqueFor(3600)]
class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
    /**
     * 产品实例。
     *
     * @var \App\Models\Product
     */
    public $product;

    /**
     * 获取作业的唯一 ID。
     */
    public function uniqueId(): string
    {
        return $this->product->id;
    }
}

在上面的示例中,UpdateSearchIndex 作业按产品 ID 唯一。因此,在现有作业完成处理之前,任何具有相同产品 ID 的作业新分发都将被忽略。此外,如果现有作业在一小时内未被处理,唯一锁将被释放,另一个具有相同唯一键的作业可以被分发到队列。

WARNING

如果您的应用程序从多个 Web 服务器或容器分发作业,您应该确保所有服务器都与同一个中央缓存服务器通信,以便 Laravel 可以准确确定作业是否唯一。

在处理开始之前保持作业唯一

默认情况下,唯一作业在作业完成处理或失败所有重试尝试后「解锁」。但是,可能存在您希望作业在处理之前立即解锁的情况。为此,您的作业应该实现 ShouldBeUniqueUntilProcessing 契约而不是 ShouldBeUnique 契约:

php
<?php

use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing;

class UpdateSearchIndex implements ShouldQueue, ShouldBeUniqueUntilProcessing
{
    // ...
}

唯一作业锁

在幕后,当分发 ShouldBeUnique 作业时,Laravel 会尝试使用 uniqueId 锁获取一个 。如果锁已被持有,则不会分发作业。当作业完成处理或失败所有重试尝试时,此锁将被释放。默认情况下,Laravel 将使用默认缓存驱动程序获取此锁。但是,如果您希望使用另一个驱动程序获取锁,可以定义一个 uniqueVia 方法,该方法返回应该使用的缓存驱动程序:

php
use Illuminate\Contracts\Cache\Repository;
use Illuminate\Support\Facades\Cache;

class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
    // ...

    /**
     * 获取唯一作业锁的缓存驱动程序。
     */
    public function uniqueVia(): Repository
    {
        return Cache::driver('redis');
    }
}

NOTE

如果您只需要限制作业的并发处理,请改用 WithoutOverlapping 作业中间件。

加密作业

Laravel 允许您通过 加密 确保作业数据的隐私和完整性。首先,只需将 ShouldBeEncrypted 接口添加到作业类。一旦将此接口添加到类中,Laravel 将在将作业推送到队列之前自动加密您的作业:

php
<?php

use Illuminate\Contracts\Queue\ShouldBeEncrypted;
use Illuminate\Contracts\Queue\ShouldQueue;

class UpdateSearchIndex implements ShouldQueue, ShouldBeEncrypted
{
    // ...
}

作业中间件

作业中间件允许您在队列作业执行周围包装自定义逻辑,从而减少作业本身的样板代码。例如,考虑以下 handle 方法,它利用 Laravel 的 Redis 速率限制功能,每五秒只允许处理一个作业:

php
use Illuminate\Support\Facades\Redis;

/**
 * 执行作业。
 */
public function handle(): void
{
    Redis::throttle('key')->allow(1)->every(5)->then(function () {
        // 获取锁,处理播客...
    }, function () {
        // 无法获取锁...

        return $this->release(5);
    });
}

虽然此代码有效,但 handle 方法变得杂乱,因为它被 Redis 速率限制逻辑污染了。此外,其他作业的速率限制逻辑必须复制到其他作业中。

我们可以将速率限制逻辑提取到作业中间件中,而不是在 handle 方法中处理速率限制。Laravel 作业中间件就像路由中间件一样,允许您将逻辑包装在作业执行周围。通过在作业类上定义 middleware 方法来注册作业中间件:

php
use App\Jobs\Middleware\RateLimited;

/**
 * 获取作业应该通过的中间件。
 */
public function middleware(): array
{
    return [new RateLimited('redis')];
}

作业中间件可以在任何地方创建,但通常放在 app/Jobs/Middleware 目录中。以下是速率限制中间件的示例:

php
<?php

namespace App\Jobs\Middleware;

use Closure;
use Illuminate\Support\Facades\Redis;

class RateLimited
{
    /**
     * 处理队列作业。
     *
     * @param  \Closure(object): mixed  $next
     */
    public function handle(object $job, Closure $next): void
    {
        Redis::throttle('key')
            ->allow(1)->every(5)
            ->then(function () use ($job, $next) {
                // 获取锁...

                $next($job);
            }, function () use ($job) {
                // 无法获取锁...

                $job->release(5);
            });
    }
}

如您所见,中间件接收正在处理的作业和一个应该调用的 $next 闭包以继续处理作业。

速率限制

Laravel 提供了一个 Illuminate\Queue\Middleware\RateLimited 中间件,您可以使用它来限制队列作业的速率。这允许您控制作业在给定时间内可以运行的次数。

例如,如果您想限制作业每分钟只运行 10 次,可以这样做:

php
use Illuminate\Queue\Middleware\RateLimited;

/**
 * 获取作业应该通过的中间件。
 */
public function middleware(): array
{
    return [new RateLimited('backups')];
}

RateLimited 中间件的构造函数接受一个「限制器」名称。此名称应对应于您在 AppServiceProviderboot 方法中定义的限制器:

php
use Illuminate\Support\Facades\RateLimiter;

/**
 * 引导任何应用程序服务。
 */
public function boot(): void
{
    RateLimiter::for('backups', function (object $job) {
        return $job->user->vipCustomer()
                    ? Limit::none()
                    : Limit::perMinute(10);
    });
}

如果作业超过速率限制,它将被释放回队列。您可以指定作业在再次尝试之前应该等待的秒数:

php
use Illuminate\Queue\Middleware\RateLimited;

/**
 * 获取作业应该通过的中间件。
 */
public function middleware(): array
{
    return [new RateLimited('backups', 30)]; // 等待 30 秒
}

如果作业超过速率限制,它将被释放回队列,并带有指定的延迟秒数。

防止作业重叠

Laravel 提供了一个 Illuminate\Queue\Middleware\WithoutOverlapping 中间件,允许您根据任意键防止作业重叠。这对于一次只应由一个实例处理的作业非常有用。

例如,假设您有一个更新用户统计信息的作业,您希望确保一次只有一个这样的作业在运行:

php
use Illuminate\Queue\Middleware\WithoutOverlapping;

/**
 * 获取作业应该通过的中间件。
 */
public function middleware(): array
{
    return [new WithoutOverlapping('user:'.$this->user->id)];
}

WithoutOverlapping 中间件接受一个唯一键作为参数。如果具有相同键的另一个作业已经在处理中,新作业将被释放回队列。

您还可以指定作业在释放之前应该等待锁的秒数:

php
use Illuminate\Queue\Middleware\WithoutOverlapping;

/**
 * 获取作业应该通过的中间件。
 */
public function middleware(): array
{
    return [new WithoutOverlapping('user:'.$this->user->id, 60)];
}

默认情况下,WithoutOverlapping 中间件将在作业完成处理后释放锁。但是,您可以使用 releaseAfter 方法自定义此行为:

php
use Illuminate\Queue\Middleware\WithoutOverlapping;

/**
 * 获取作业应该通过的中间件。
 */
public function middleware(): array
{
    return [(new WithoutOverlapping('user:'.$this->user->id))->releaseAfter(30)];
}

或者,您可以使用 dontRelease 方法指示中间件在作业完成后不释放锁:

php
use Illuminate\Queue\Middleware\WithoutOverlapping;

/**
 * 获取作业应该通过的中间件。
 */
public function middleware(): array
{
    return [(new WithoutOverlapping('user:'.$this->user->id))->dontRelease()];
}

限制异常

Laravel 提供了一个 Illuminate\Queue\Middleware\ThrottlesExceptions 中间件,允许您限制作业抛出异常的频率。这对于可能偶尔失败但不希望因持续重试而使系统过载的作业非常有用。

php
use Illuminate\Queue\Middleware\ThrottlesExceptions;

/**
 * 获取作业应该通过的中间件。
 */
public function middleware(): array
{
    return [new ThrottlesExceptions(3, 60)]; // 60 秒内最多 3 次异常
}

ThrottlesExceptions 中间件接受两个参数:允许的异常数量和时间窗口(秒)。如果作业在指定时间窗口内抛出超过允许数量的异常,它将被释放回队列并带有延迟。

您还可以指定异常被限制时作业应该等待的秒数:

php
use Illuminate\Queue\Middleware\ThrottlesExceptions;

/**
 * 获取作业应该通过的中间件。
 */
public function middleware(): array
{
    return [(new ThrottlesExceptions(3, 60))->backoff(300)]; // 等待 5 分钟
}

跳过作业

Laravel 提供了一个 Illuminate\Queue\Middleware\Skip 中间件,允许您根据条件跳过作业。这对于在满足某些条件时不需要处理作业的情况非常有用。

php
use Illuminate\Queue\Middleware\Skip;

/**
 * 获取作业应该通过的中间件。
 */
public function middleware(): array
{
    return [
        Skip::if($this->user->isBlocked()),
        Skip::unless($this->order->isPaid()),
    ];
}

Skip::if 方法在条件为真时跳过作业,而 Skip::unless 方法在条件为假时跳过作业。

分发作业

一旦您编写了作业类,您可以使用作业本身的 dispatch 方法分发它。传递给 dispatch 方法的参数将被传递给作业的构造函数:

php
<?php

namespace App\Http\Controllers;

use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * 存储新播客。
     */
    public function store(Request $request): RedirectResponse
    {
        $podcast = Podcast::create(/* ... */);

        // ...

        ProcessPodcast::dispatch($podcast);

        return redirect('/podcasts');
    }
}

延迟分发

如果您想延迟执行队列作业,可以使用 delay 方法。例如,让我们指定一个作业在分发后 10 分钟内不可用:

php
ProcessPodcast::dispatch($podcast)
    ->delay(now()->addMinutes(10));

您也可以使用 Delay 属性在作业类本身上指定延迟:

php
<?php

namespace App\Jobs;

use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
use Illuminate\Queue\Attributes\Delay;

#[Delay(60)]
class ProcessPodcast implements ShouldQueue
{
    use Queueable;

    // ...
}

WARNING

Amazon SQS 队列服务最大延迟时间为 15 分钟。

同步分发

如果您想立即(同步)分发作业,可以使用 dispatchSync 方法。使用此方法时,作业不会排队,而是立即在当前进程中执行:

php
<?php

namespace App\Http\Controllers;

use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * 存储新播客。
     */
    public function store(Request $request): RedirectResponse
    {
        $podcast = Podcast::create(/* ... */);

        // 创建播客...

        ProcessPodcast::dispatchSync($podcast);

        return redirect('/podcasts');
    }
}

延迟同步分发

使用延迟同步分发,您可以分发一个作业在当前进程期间处理,但在 HTTP 响应发送给用户之后。这允许您同步处理「排队」作业,而不会减慢用户的应用程序体验。要延迟同步作业的执行,请将作业分发到 deferred 连接:

php
RecordDelivery::dispatch($order)->onConnection('deferred');

deferred 连接也作为默认的 故障转移队列

类似地,background 连接在 HTTP 响应发送给用户之后处理作业;但是,作业在单独生成的 PHP 进程中处理,允许 PHP-FPM / 应用程序工作进程可用于处理另一个传入的 HTTP 请求:

php
RecordDelivery::dispatch($order)->onConnection('background');

作业与数据库事务

虽然在数据库事务中分发作业完全没问题,但您应该特别注意确保您的作业能够成功执行。在事务内分发作业时,作业可能会在父事务提交之前被工作进程处理。发生这种情况时,您在数据库事务期间对模型或数据库记录所做的任何更新可能尚未反映在数据库中。此外,在事务内创建的任何模型或数据库记录可能不存在于数据库中。

幸运的是,Laravel 提供了几种解决此问题的方法。首先,您可以在队列连接的配置数组中设置 after_commit 连接选项:

php
'redis' => [
    'driver' => 'redis',
    // ...
    'after_commit' => true,
],

after_commit 选项为 true 时,您可以在数据库事务内分发作业;但是,Laravel 将等待打开的父数据库事务提交后才实际分发作业。当然,如果当前没有打开的数据库事务,作业将立即被分发。

如果事务由于期间发生的异常而回滚,则在该事务期间分发的作业将被丢弃。

NOTE

after_commit 配置选项设置为 true 也会导致所有排队的事件监听器、邮件、通知和广播事件在所有打开的数据库事务提交后被分发。

内联指定提交分发行为

如果您没有将 after_commit 队列连接配置选项设置为 true,您仍然可以指示特定作业应该在所有打开的数据库事务提交后被分发。为此,您可以将 afterCommit 方法链接到您的分发操作:

php
use App\Jobs\ProcessPodcast;

ProcessPodcast::dispatch($podcast)->afterCommit();

同样,如果 after_commit 配置选项设置为 true,您可以指示特定作业应该立即分发,而不等待任何打开的数据库事务提交:

php
ProcessPodcast::dispatch($podcast)->beforeCommit();

作业链

作业链允许您指定一个排队作业列表,这些作业应该在主作业成功执行后按顺序运行。如果链中的一个作业失败,其余作业将不会运行。要执行排队作业链,可以使用 Bus 门面提供的 chain 方法。Laravel 的命令总线是一个较低级别的组件,排队作业分发是建立在其之上的:

php
use App\Jobs\OptimizePodcast;
use App\Jobs\ProcessPodcast;
use App\Jobs\ReleasePodcast;
use Illuminate\Support\Facades\Bus;

Bus::chain([
    new ProcessPodcast,
    new OptimizePodcast,
    new ReleasePodcast,
])->dispatch();

除了链接作业类实例外,您还可以链接闭包:

php
Bus::chain([
    new ProcessPodcast,
    new OptimizePodcast,
    function () {
        Podcast::update(/* ... */);
    },
])->dispatch();

WARNING

在作业中使用 $this->delete() 方法删除作业不会阻止链式作业被处理。只有当链中的作业失败时,链才会停止执行。

链连接和队列

如果您想指定链式作业应该使用的连接和队列,可以使用 onConnectiononQueue 方法。这些方法指定应该使用的队列连接和队列名称,除非排队作业被明确分配了不同的连接/队列:

php
Bus::chain([
    new ProcessPodcast,
    new OptimizePodcast,
    new ReleasePodcast,
])->onConnection('redis')->onQueue('podcasts')->dispatch();

向链添加作业

有时,您可能需要从链中的另一个作业内向现有作业链前置或追加作业。您可以使用 prependToChainappendToChain 方法完成此操作:

php
/**
 * 执行作业。
 */
public function handle(): void
{
    // ...

    // 前置到当前链,在当前作业之后立即运行作业...
    $this->prependToChain(new TranscribePodcast);

    // 追加到当前链,在链末尾运行作业...
    $this->appendToChain(new TranscribePodcast);
}

链失败

链接作业时,可以使用 catch 方法指定一个闭包,该闭包应该在链中的作业失败时调用。给定的回调将接收导致作业失败的 Throwable 实例:

php
use Illuminate\Support\Facades\Bus;
use Throwable;

Bus::chain([
    new ProcessPodcast,
    new OptimizePodcast,
    new ReleasePodcast,
])->catch(function (Throwable $e) {
    // 链中的作业失败了...
})->dispatch();

WARNING

由于链回调被序列化并由 Laravel 队列在稍后执行,您不应在链回调中使用 $this 变量。

自定义队列和连接

分发到特定队列

通过将作业推送到不同的队列,您可以「分类」排队的作业,甚至优先考虑分配给各种队列的工作进程数量。请记住,这不会将作业推送到队列配置文件定义的不同队列「连接」,而只是推送到单个连接内的特定队列。要指定队列,请在分发作业时使用 onQueue 方法:

php
<?php

namespace App\Http\Controllers;

use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * 存储新播客。
     */
    public function store(Request $request): RedirectResponse
    {
        $podcast = Podcast::create(/* ... */);

        // 创建播客...

        ProcessPodcast::dispatch($podcast)->onQueue('processing');

        return redirect('/podcasts');
    }
}

或者,您可以通过在作业的构造函数中调用 onQueue 方法来指定作业的队列:

php
<?php

namespace App\Jobs;

use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;

class ProcessPodcast implements ShouldQueue
{
    use Queueable;

    /**
     * 创建新的作业实例。
     */
    public function __construct()
    {
        $this->onQueue('processing');
    }
}

分发到特定连接

如果您的应用程序与多个队列连接交互,可以使用 onConnection 方法指定将作业推送到哪个连接:

php
<?php

namespace App\Http\Controllers;

use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * 存储新播客。
     */
    public function store(Request $request): RedirectResponse
    {
        $podcast = Podcast::create(/* ... */);

        // 创建播客...

        ProcessPodcast::dispatch($podcast)->onConnection('sqs');

        return redirect('/podcasts');
    }
}

您可以将 onConnectiononQueue 方法链接在一起,以指定作业的连接和队列:

php
ProcessPodcast::dispatch($podcast)
    ->onConnection('sqs')
    ->onQueue('processing');

或者,您可以通过在作业的构造函数中调用 onConnection 方法来指定作业的连接:

php
<?php

namespace App\Jobs;

use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;

class ProcessPodcast implements ShouldQueue
{
    use Queueable;

    /**
     * 创建新的作业实例。
     */
    public function __construct()
    {
        $this->onConnection('sqs');
    }
}

队列路由

您可以使用 Queue 门面的 route 方法为特定作业类定义默认连接和队列。当您希望确保某些作业始终使用特定队列而无需在作业上指定连接或队列时,这非常有用。

除了路由特定作业类外,您还可以将接口、trait 或父类传递给 route 方法。这样做时,实现该接口、使用该 trait 或扩展该父类的任何作业都将自动使用配置的连接和队列。

通常,您应该从服务提供者的 boot 方法中调用 route 方法:

php
use App\Concerns\RequiresVideo;
use App\Jobs\ProcessPodcast;
use App\Jobs\ProcessVideo;
use Illuminate\Support\Facades\Queue;

/**
 * 引导任何应用程序服务。
 */
public function boot(): void
{
    Queue::route(ProcessPodcast::class, connection: 'redis', queue: 'podcasts');
    Queue::route(RequiresVideo::class, queue: 'video');
}

当指定连接而不指定队列时,作业将被发送到默认队列:

php
Queue::route(ProcessPodcast::class, connection: 'redis');

您还可以通过将数组传递给 route 方法来一次路由多个作业类:

php
Queue::route([
    ProcessPodcast::class => ['podcasts', 'redis'], // 队列和连接
    ProcessVideo::class => 'videos', // 仅队列(使用默认连接)
]);

NOTE

队列路由仍然可以被作业按作业基础覆盖。

指定最大作业尝试次数/超时值

最大尝试次数

作业尝试是 Laravel 队列系统的核心概念,并为许多高级功能提供支持。虽然起初可能令人困惑,但在修改默认配置之前了解它们的工作原理非常重要。

当作业被分发时,它被推送到队列上。然后工作进程将其取出并尝试执行。这就是一次作业尝试。

但是,尝试并不一定意味着作业的 handle 方法被执行。尝试也可以通过以下几种方式「消耗」:

  • 作业在执行期间遇到未处理的异常。
  • 作业使用 $this->release() 手动释放回队列。
  • WithoutOverlappingRateLimited 等中间件无法获取锁并释放作业。
  • 作业超时。
  • 作业的 handle 方法运行并完成而不抛出异常。

您可能不希望无限期地尝试作业。因此,Laravel 提供了多种方法来指定作业可以被尝试多少次或多长时间。

NOTE

默认情况下,Laravel 只会尝试作业一次。如果您的作业使用 WithoutOverlappingRateLimited 等中间件,或者您手动释放作业,您可能需要通过 tries 选项增加允许的尝试次数。

指定作业可以被尝试的最大次数的一种方法是通过 Artisan 命令行上的 --tries 开关。这将应用于工作进程处理的所有作业,除非正在处理的作业指定了它可以被尝试的次数:

shell
php artisan queue:work --tries=3

如果作业超过其最大尝试次数,它将被视为「失败」作业。有关处理失败作业的更多信息,请参阅 失败作业文档。如果向 queue:work 命令提供 --tries=0,作业将被无限期重试。

您可以通过使用 Tries 属性在作业类本身上定义作业可以被尝试的最大次数来采取更细粒度的方法。如果在作业上指定了最大尝试次数,它将优先于命令行上提供的 --tries 值:

php
<?php

namespace App\Jobs;

use Illuminate\Queue\Attributes\Tries;

#[Tries(5)]
class ProcessPodcast implements ShouldQueue
{
    // ...
}

如果您需要对特定作业的最大尝试次数进行动态控制,可以在作业上定义一个 tries 方法:

php
/**
 * 确定作业可以被尝试的次数。
 */
public function tries(): int
{
    return 5;
}

基于时间的尝试

作为定义作业在失败之前可以被尝试多少次的替代方法,您可以定义一个作业不应再被尝试的时间。这允许作业在给定时间范围内被尝试任意次数。要定义作业不应再被尝试的时间,请在作业类中添加一个 retryUntil 方法。此方法应返回一个 DateTime 实例:

php
use DateTime;

/**
 * 确定作业应该超时的时间。
 */
public function retryUntil(): DateTime
{
    return now()->plus(minutes: 10);
}

如果同时定义了 retryUntiltries,Laravel 会优先使用 retryUntil 方法。

NOTE

您也可以在 排队事件监听器排队通知 上定义 Tries 属性或 retryUntil 方法。

最大异常

有时您可能希望指定作业可以被尝试多次,但如果重试是由给定数量的未处理异常触发的(而不是直接由 release 方法释放),则应该失败。为此,您可以在作业类上使用 TriesMaxExceptions 属性:

php
<?php

namespace App\Jobs;

use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
use Illuminate\Queue\Attributes\MaxExceptions;
use Illuminate\Queue\Attributes\Tries;
use Illuminate\Support\Facades\Redis;

#[Tries(25)]
#[MaxExceptions(3)]
class ProcessPodcast implements ShouldQueue
{
    use Queueable;

    /**
     * 执行作业。
     */
    public function handle(): void
    {
        Redis::throttle('key')->allow(10)->every(60)->then(function () {
            // 获取锁,处理播客...
        }, function () {
            // 无法获取锁...
            return $this->release(10);
        });
    }
}

在此示例中,如果应用程序无法获取 Redis 锁,作业将被释放十秒钟,并将继续重试最多 25 次。但是,如果作业抛出三个未处理的异常,作业将失败。

超时

通常,您大致知道排队作业应该花费多长时间。因此,Laravel 允许您指定一个「超时」值。默认情况下,超时值为 60 秒。如果作业处理的时间超过超时值指定的秒数,处理该作业的工作进程将出错退出。通常,工作进程将由 服务器上配置的进程管理器 自动重启。

可以使用 Artisan 命令行上的 --timeout 开关指定作业可以运行的最大秒数:

shell
php artisan queue:work --timeout=30

如果作业因持续超时而超过其最大尝试次数,它将被标记为失败。

您还可以使用作业类上的 Timeout 属性定义作业应该被允许运行的最大秒数。如果在作业上指定了超时,它将优先于命令行上指定的任何超时:

php
<?php

namespace App\Jobs;

use Illuminate\Queue\Attributes\Timeout;

#[Timeout(120)]
class ProcessPodcast implements ShouldQueue
{
    // ...
}

有时,IO 阻塞进程(如套接字或传出 HTTP 连接)可能不会遵守您指定的超时。因此,在使用这些功能时,您应该始终尝试使用它们的 API 指定超时。例如,使用 Guzzle 时,您应该始终指定连接和请求超时值。

WARNING

必须安装 PCNTL PHP 扩展才能指定作业超时。此外,作业的「超时」值应始终小于其 「重试后」 值。否则,作业可能在实际上完成执行或超时之前被重新尝试。

超时时失败

如果您想指示作业在超时时应该被标记为 失败,可以在作业类上使用 FailOnTimeout 属性:

php
<?php

namespace App\Jobs;

use Illuminate\Queue\Attributes\FailOnTimeout;

#[FailOnTimeout]
class ProcessPodcast implements ShouldQueue
{
    // ...
}

NOTE

默认情况下,当作业超时时,它会消耗一次尝试并被释放回队列(如果允许重试)。但是,如果您将作业配置为超时时失败,它将不会被重试,无论为 tries 设置的值如何。

SQS FIFO 和公平队列

Laravel 支持 Amazon SQS FIFO(先进先出) 队列,允许您按照发送的确切顺序处理作业,并通过消息去重确保恰好一次处理。

FIFO 队列需要一个消息组 ID 来确定哪些作业可以并行处理。具有相同组 ID 的作业按顺序处理,而具有不同组 ID 的消息可以并发处理。

Laravel 提供了一个流畅的 onGroup 方法来在分发作业时指定消息组 ID:

php
ProcessOrder::dispatch($order)
    ->onGroup("customer-{$order->customer_id}");

SQS FIFO 队列支持消息去重以确保恰好一次处理。在作业类中实现 deduplicationId 方法以提供自定义去重 ID:

php
<?php

namespace App\Jobs;

use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;

class ProcessSubscriptionRenewal implements ShouldQueue
{
    use Queueable;

    // ...

    /**
     * 获取作业的去重 ID。
     */
    public function deduplicationId(): string
    {
        return "renewal-{$this->subscription->id}";
    }
}

FIFO 监听器、邮件和通知

使用 FIFO 队列时,您还需要在监听器、邮件和通知上定义消息组。或者,您可以将这些对象的排队实例分发到非 FIFO 队列。

要为 排队事件监听器 定义消息组,请在监听器上定义一个 messageGroup 方法。您还可以选择定义一个 deduplicationId 方法:

php
<?php

namespace App\Listeners;

class SendShipmentNotification
{
    // ...

    /**
     * 获取作业的消息组。
     */
    public function messageGroup(): string
    {
        return 'shipments';
    }

    /**
     * 获取作业的去重 ID。
     */
    public function deduplicationId(): string
    {
        return "shipment-{$this->shipment->id}";
    }
}

队列故障转移

有时您可能希望指定一个备用连接,如果主连接驱动程序无法获取作业,则应该使用该连接。您可以在队列连接定义中指定一个 failover 连接:

php
'database' => [
    'driver' => 'database',
    'connection' => 'default',
    'queue' => 'default',
    'failover' => ['redis', 'sqs'],
],

在此示例中,如果 database 连接无法获取作业,Laravel 将尝试从 redis 连接获取作业。如果 redis 连接也无法获取作业,Laravel 将尝试从 sqs 连接获取作业。

错误处理

如果在处理作业时抛出异常,作业将自动释放回队列,以便再次尝试。作业将继续释放,直到达到应用程序允许的最大尝试次数。最大尝试次数由 queue:work 命令上的 --tries 开关或作业类本身确定。

手动释放作业

有时您可能希望手动将作业释放回队列,以便稍后再次尝试。您可以通过调用 release 方法来完成此操作:

php
/**
 * 执行作业。
 */
public function handle(): void
{
    // ...

    $this->release();
}

默认情况下,作业将立即释放回队列。如果您想指定作业在再次尝试之前应该等待的秒数,可以将延迟秒数传递给 release 方法:

php
$this->release(10);

手动使作业失败

有时您可能需要手动将作业标记为「失败」。您可以通过调用 fail 方法来完成此操作:

php
/**
 * 执行作业。
 */
public function handle(): void
{
    // ...

    $this->fail();
}

如果您想将作业标记为失败并提供错误消息或异常,可以将消息或异常传递给 fail 方法:

php
$this->fail('Something went wrong.');

$this->fail(new \Exception('Something went wrong.'));

作业批处理

Laravel 的作业批处理功能允许您轻松执行一批作业,然后在批处理完成后执行某些操作。在开始之前,您应该创建一个数据库迁移来构建一个表来包含有关作业批次的元信息,例如它们的完成百分比。可以使用 queue:batches-table Artisan 命令生成此迁移:

shell
php artisan queue:batches-table

php artisan migrate

定义可批处理作业

要定义可批处理作业,您应该像往常一样创建可排队作业;但是,您应该在作业类中添加 Illuminate\Bus\Batchable trait。此 trait 提供对用于检查当前批次的 batch 方法的访问:

php
<?php

namespace App\Jobs;

use Illuminate\Bus\Batchable;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

class ImportCsv implements ShouldQueue
{
    use Batchable, Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * 执行作业。
     */
    public function handle(): void
    {
        if ($this->batch()->cancelled()) {
            // 确定批次是否已取消...

            return;
        }

        // 导入 CSV 文件的一部分...
    }
}

分发批次

要分发一批作业,您应该使用 Bus 门面的 batch 方法。当然,批处理主要在与完成回调结合使用时才有用。因此,您可以使用 thencatchfinally 方法定义批次的完成回调。这些回调中的每一个在调用时都会收到一个 Illuminate\Bus\Batch 实例:

php
use App\Jobs\ImportCsv;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;
use Throwable;

$batch = Bus::batch([
    new ImportCsv(1, 100),
    new ImportCsv(101, 200),
    new ImportCsv(201, 300),
    new ImportCsv(301, 400),
    new ImportCsv(401, 500),
])->then(function (Batch $batch) {
    // 所有作业成功完成...
})->catch(function (Batch $batch, Throwable $e) {
    // 检测到第一批作业失败...
})->finally(function (Batch $batch) {
    // 批次已完成执行...
})->dispatch();

return $batch->id;

链和批次

您可以在批次内定义一组链接的作业,方法是将链接的作业放在数组内:

php
Bus::batch([
    [
        new ProcessPodcast,
        new OptimizePodcast,
        new ReleasePodcast,
    ],
    [
        new ProcessPodcast,
        new OptimizePodcast,
        new ReleasePodcast,
    ],
])->then(function (Batch $batch) {
    // ...
})->dispatch();

反之,您可以在链内定义批次:

php
use App\Jobs\ImportCsv;
use Illuminate\Support\Facades\Bus;

Bus::chain([
    new BackupDatabase,
    Bus::batch([
        new ImportCsv(1, 100),
        new ImportCsv(101, 200),
    ]),
    new NotifyUsers,
])->dispatch();

向批次添加作业

有时从批次内的作业向批次添加其他作业可能很有用。此模式非常适合批处理数千个作业,这些作业可能需要解密,例如解密后需要额外处理。您可以使用 add 方法完成此操作:

php
/**
 * 执行作业。
 */
public function handle(): void
{
    if ($this->batch()->cancelled()) {
        return;
    }

    // 处理作业...

    $this->batch()->add([
        new ProcessVideo($this->video),
        new TranscodeVideo($this->video),
    ]);
}

您还可以从批次外部添加作业:

php
use App\Jobs\ProcessVideo;
use Illuminate\Support\Facades\Bus;

$batch = Bus::find($batchId);

$batch->add([
    new ProcessVideo($video),
    new TranscodeVideo($video),
]);

检查批次

为批次分发的作业返回的 Illuminate\Bus\Batch 实例提供了各种属性和方法来检查和与给定批次交互:

php
// 批次的 UUID...
$batch->id;

// 批次的名称(如果已设置)...
$batch->name;

// 批次中的作业总数...
$batch->totalJobs;

// 尚未处理的作业数量...
$batch->pendingJobs;

// 失败的作业数量...
$batch->failedJobs;

// 已处理的作业数量...
$batch->processedJobs();

// 批次的完成百分比(0-100)...
$batch->progress();

// 批次是否已完成执行...
$batch->finished();

// 批次是否已取消...
$batch->cancelled();

// 批次是否已失败...
$batch->failed();

您可以通过其 ID 从数据库中检索批次:

php
use Illuminate\Support\Facades\Bus;

$batch = Bus::find($batchId);

取消批次

有时您可能需要取消给定批次的执行。这可以通过调用 cancel 方法完成:

php
use Illuminate\Support\Facades\Bus;

$batch = Bus::find($batchId);

$batch->cancel();

如果批次已被取消,您可以使用 cancelled 方法确定:

php
use Illuminate\Support\Facades\Bus;

$batch = Bus::find($batchId);

if ($batch->cancelled()) {
    // 批次已取消...
}

如果您想在作业中检查批次是否已被取消,可以使用 batch 方法:

php
/**
 * 执行作业。
 */
public function handle(): void
{
    if ($this->batch()->cancelled()) {
        return;
    }

    // 处理作业...
}

您还可以使用 SkipIfBatchCancelled 中间件自动跳过已取消批次的作业:

php
use Illuminate\Queue\Middleware\SkipIfBatchCancelled;

/**
 * 获取作业应该通过的中间件。
 */
public function middleware(): array
{
    return [new SkipIfBatchCancelled];
}

批次失败

当批处理作业失败时,将调用 catch 回调(如果已分配)。此回调仅为批次中第一个失败的作业调用。

允许失败

当批次中的作业失败时,Laravel 将自动将批次标记为「已取消」。如果您希望,可以禁用此行为,以便作业失败不会自动将批次标记为已取消。这可以通过在分发批次时调用 allowFailures 方法来完成:

php
$batch = Bus::batch([
    // ...
])->then(function (Batch $batch) {
    // 所有作业成功完成...
})->allowFailures()->dispatch();

您可以选择向 allowFailures 方法提供一个闭包,该闭包将在每次作业失败时执行:

php
$batch = Bus::batch([
    // ...
])->allowFailures(function (Batch $batch, $exception) {
    // 处理单个作业失败...
})->dispatch();

重试失败的批处理作业

为方便起见,Laravel 提供了一个 queue:retry-batch Artisan 命令,允许您轻松重试给定批次的所有失败作业。此命令接受应该重试失败作业的批次的 UUID:

shell
php artisan queue:retry-batch 32dbc76c-4f82-4749-b610-a639fe0099b5

修剪批次

如果不进行修剪,job_batches 表可能会非常快地积累记录。为了缓解这种情况,您应该 调度 queue:prune-batches Artisan 命令每天运行:

php
use Illuminate\Support\Facades\Schedule;

Schedule::command('queue:prune-batches')->daily();

默认情况下,所有超过 24 小时的已完成批次将被修剪。您可以在调用命令时使用 hours 选项来确定保留批次数据的时间。例如,以下命令将删除所有在 48 小时前完成的批次:

php
use Illuminate\Support\Facades\Schedule;

Schedule::command('queue:prune-batches --hours=48')->daily();

有时,您的 job_batches 表可能会积累从未成功完成的批次的批次记录,例如作业失败且该作业从未成功重试的批次。您可以使用 unfinished 选项指示 queue:prune-batches 命令修剪这些未完成的批次记录:

php
use Illuminate\Support\Facades\Schedule;

Schedule::command('queue:prune-batches --hours=48 --unfinished=72')->daily();

同样,您的 job_batches 表也可能积累已取消批次的批次记录。您可以使用 cancelled 选项指示 queue:prune-batches 命令修剪这些已取消的批次记录:

php
use Illuminate\Support\Facades\Schedule;

Schedule::command('queue:prune-batches --hours=48 --cancelled=72')->daily();

在 DynamoDB 中存储批次

Laravel 还支持在 DynamoDB 而不是关系数据库中存储批次元信息。但是,您需要手动创建一个 DynamoDB 表来存储所有批次记录。

通常,此表应命名为 job_batches,但您应该根据应用程序的 queue 配置文件中的 queue.batching.table 配置值来命名该表。

DynamoDB 批次表配置

job_batches 表应该有一个名为 application 的字符串主分区键和一个名为 id 的字符串主排序键。键的 application 部分将包含应用程序的 app 配置文件中 name 配置值定义的应用程序名称。由于应用程序名称是 DynamoDB 表键的一部分,您可以使用同一个表来存储多个 Laravel 应用程序的作业批次。

此外,如果您想利用 自动批次修剪,可以为您的表定义 ttl 属性。

DynamoDB 配置

接下来,安装 AWS SDK,以便您的 Laravel 应用程序可以与 Amazon DynamoDB 通信:

shell
composer require aws/aws-sdk-php

然后,将 queue.batching.driver 配置选项的值设置为 dynamodb。此外,您应该在 batching 配置数组中定义 keysecretregion 配置选项。这些选项将用于与 AWS 进行身份验证。使用 dynamodb 驱动程序时,不需要 queue.batching.database 配置选项:

php
'batching' => [
    'driver' => env('QUEUE_BATCHING_DRIVER', 'dynamodb'),
    'key' => env('AWS_ACCESS_KEY_ID'),
    'secret' => env('AWS_SECRET_ACCESS_KEY'),
    'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
    'table' => 'job_batches',
],

在 DynamoDB 中修剪批次

使用 DynamoDB 存储作业批次信息时,用于修剪存储在关系数据库中的批次的典型修剪命令将不起作用。相反,您可以利用 DynamoDB 的原生 TTL 功能 自动删除旧批次的记录。

如果您使用 ttl 属性定义了 DynamoDB 表,则可以定义配置参数来指示 Laravel 如何修剪批次记录。queue.batching.ttl_attribute 配置值定义了保存 TTL 的属性的名称,而 queue.batching.ttl 配置值定义了相对于记录上次更新时间,批次记录可以从 DynamoDB 表中删除的秒数:

php
'batching' => [
    'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),
    'key' => env('AWS_ACCESS_KEY_ID'),
    'secret' => env('AWS_SECRET_ACCESS_KEY'),
    'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
    'table' => 'job_batches',
    'ttl_attribute' => 'ttl',
    'ttl' => 60 * 60 * 24 * 7, // 7 天...
],

队列闭包

您可以将闭包分发到队列,而不是将作业类分发到队列。这对于需要在当前请求周期之外执行的快速、简单任务非常有用。将闭包分发到队列时,闭包的代码内容将进行加密签名,以便在传输过程中无法修改:

php
use App\Models\Podcast;

$podcast = Podcast::find(1);

dispatch(function () use ($podcast) {
    $podcast->publish();
});

要为队列闭包分配一个名称,该名称可由队列报告仪表板使用,并由 queue:work 命令显示,可以使用 name 方法:

php
dispatch(function () {
    // ...
})->name('Publish Podcast');

使用 catch 方法,您可以提供一个闭包,该闭包应该在队列闭包在耗尽所有队列的 配置重试尝试 后未能成功完成时执行:

php
use Throwable;

dispatch(function () use ($podcast) {
    $podcast->publish();
})->catch(function (Throwable $e) {
    // 此作业已失败...
});

WARNING

由于 catch 回调被序列化并由 Laravel 队列在稍后执行,您不应在 catch 回调中使用 $this 变量。

运行队列工作进程

queue:work 命令

Laravel 包含一个 Artisan 命令,它将启动一个队列工作进程并在新作业被推送到队列时处理它们。您可以使用 queue:work Artisan 命令运行工作进程。请注意,一旦 queue:work 命令启动,它将继续运行,直到手动停止或关闭终端:

shell
php artisan queue:work

NOTE

要让 queue:work 进程在后台永久运行,您应该使用进程监视器(如 Supervisor)来确保队列工作进程不会停止运行。

如果您希望在命令的输出中包含处理的作业 ID、连接名称和队列名称,可以在调用 queue:work 命令时包含 -v 标志:

shell
php artisan queue:work -v

请记住,队列工作进程是长期运行的进程,并将启动的应用程序状态存储在内存中。因此,它们在启动后不会注意到代码库中的更改。因此,在部署过程中,请确保 重启队列工作进程。此外,请记住,应用程序创建或修改的任何静态状态不会在作业之间自动重置。

或者,您可以运行 queue:listen 命令。使用 queue:listen 命令时,当您想重新加载更新的代码或重置应用程序状态时,不必手动重启工作进程;但是,此命令比 queue:work 命令效率低得多:

shell
php artisan queue:listen

运行多个队列工作进程

要为队列分配多个工作进程并并发处理作业,您应该简单地启动多个 queue:work 进程。这可以通过终端中的多个选项卡在本地完成,或者在生产中使用进程管理器的配置设置完成。使用 Supervisor 时,您可以使用 numprocs 配置值。

指定连接和队列

您还可以指定工作进程应该使用的队列连接。传递给 work 命令的连接名称应对应于 config/queue.php 配置文件中定义的连接之一:

shell
php artisan queue:work redis

默认情况下,queue:work 命令仅处理给定连接上默认队列的作业。但是,您可以通过仅处理给定连接的特定队列来进一步自定义队列工作进程。例如,如果您的所有电子邮件都在 redis 队列连接的 emails 队列中处理,您可以发出以下命令来启动一个仅处理该队列的工作进程:

shell
php artisan queue:work redis --queue=emails

处理指定数量的作业

可以使用 --once 选项指示工作进程仅从队列中处理单个作业:

shell
php artisan queue:work --once

可以使用 --max-jobs 选项指示工作进程处理给定数量的作业然后退出。此选项在与 Supervisor 结合使用时可能很有用,以便您的工作进程在处理给定数量的作业后自动重启,释放它们可能积累的任何内存:

shell
php artisan queue:work --max-jobs=1000

处理所有排队作业然后退出

可以使用 --stop-when-empty 选项指示工作进程处理所有作业然后优雅退出。如果您希望在 Docker 容器中处理 Laravel 队列并在队列为空后关闭容器,此选项可能很有用:

shell
php artisan queue:work --stop-when-empty

处理给定秒数的作业

可以使用 --max-time 选项指示工作进程处理给定秒数的作业然后退出。此选项在与 Supervisor 结合使用时可能很有用,以便您的工作进程在处理作业给定时间后自动重启,释放它们可能积累的任何内存:

shell
# 处理作业一小时然后退出...
php artisan queue:work --max-time=3600

工作进程休眠持续时间

当队列上有可用作业时,工作进程将继续处理作业,作业之间没有延迟。但是,如果没有可用作业,sleep 选项确定工作进程将「休眠」多少秒。当然,在休眠时,工作进程不会处理任何新作业:

shell
php artisan queue:work --sleep=3

维护模式和队列

当您的应用程序处于 维护模式 时,不会处理任何排队作业。一旦应用程序退出维护模式,作业将继续正常处理。

要强制您的队列工作进程即使在启用维护模式时也处理作业,可以使用 --force 选项:

shell
php artisan queue:work --force

资源考虑

守护进程队列工作进程在处理每个作业之前不会「重启」框架。因此,您应该在每次作业完成后释放任何繁重的资源。例如,如果您使用 GD 库 进行图像操作,应该在完成图像处理后使用 imagedestroy 释放内存。

队列优先级

有时您可能希望优先处理队列的方式。例如,在您的 config/queue.php 配置文件中,您可以将 redis 连接的默认 queue 设置为 low。但是,偶尔您可能希望将作业推送到 high 优先级队列,如下所示:

php
dispatch((new Job)->onQueue('high'));

要启动一个验证所有 high 队列作业在继续处理 low 队列上的任何作业之前都已处理的工作进程,请将逗号分隔的队列名称列表传递给 work 命令:

shell
php artisan queue:work --queue=high,low

队列工作进程和部署

由于队列工作进程是长期运行的进程,如果不重启它们将不会注意到代码的更改。因此,使用队列工作进程部署应用程序的最简单方法是在部署过程中重启工作进程。您可以通过发出 queue:restart 命令优雅地重启所有工作进程:

shell
php artisan queue:restart

此命令将指示所有队列工作进程在完成当前作业处理后优雅退出,以便不会丢失任何现有作业。由于队列工作进程在执行 queue:restart 命令时会退出,您应该运行进程管理器(如 Supervisor)来自动重启队列工作进程。

NOTE

队列使用 缓存 来存储重启信号,因此在使用此功能之前,您应该验证是否为应用程序正确配置了缓存驱动程序。

作业过期和超时