Blog

Zaawansowane Wzorce Kolejek w Laravel: Batching, Chainy i Blokady Atomowe

Jeśli używasz kolejek Laravel, znasz podstawy: wysyłasz job, odpala się w tle, gotowe. Ale prawdziwe aplikacje mają bardziej złożone wymagania. Musisz przetworzyć sto plików i wiedzieć, kiedy wszystkie się zakończą. Potrzebujesz jobów zależnych od innych. Chcesz obsługiwać błędy bez utraty kontekstu. Potrzebujesz, żeby niektóre joby nigdy nie działały równolegle, inne - żeby były rate-limitowane względem zewnętrznego API. Ten artykuł omawia wzorce, które uzupełniają lukę między działającą kolejką a produkcyjnym systemem kolejkowania.

📋 Spis treści

📦 Job Batching - Grupuj, śledź, reaguj

Batching pozwala wysłać grupę jobów i reagować, gdy wszystkie się zakończą, gdy któryś się nie powiedzie, albo w obu przypadkach.

Podstawowy batch:

// app/Actions/ImportProductsAction.php

declare(strict_types=1);

namespace App\Actions;

use App\Jobs\ImportProductChunkJob;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;
use Throwable;

class ImportProductsAction
{
    public function handle(array $productChunks): string
    {
        $batch = Bus::batch(
            collect($productChunks)->map(
                fn (array $chunk) => new ImportProductChunkJob($chunk)
            )->toArray()
        )
        ->then(function (Batch $batch) {
            // Wszystkie joby zakończone pomyślnie
            logger()->info("Import zakończony: {$batch->totalJobs} produktów.");
        })
        ->catch(function (Batch $batch, Throwable $e) {
            // Pierwsze niepowodzenie - batch domyślnie kontynuuje
            logger()->error("Chunk importu nie powiódł się: {$e->getMessage()}");
        })
        ->finally(function (Batch $batch) {
            // Zawsze się odpala - sukces lub błąd
            ImportLog::updateStatus($batch->id, $batch->failedJobs > 0 ? 'partial' : 'complete');
        })
        ->name('Product Import')
        ->allowFailures() // Nie anuluj całego batcha przy jednym błędzie
        ->dispatch();

        return $batch->id;
    }
}

Sam job musi używać traitu Batchable, żeby integrować się z cyklem życia batcha:

// app/Jobs/ImportProductChunkJob.php

declare(strict_types=1);

namespace App\Jobs;

use Illuminate\Bus\Batchable;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;

class ImportProductChunkJob implements ShouldQueue
{
    use Batchable, Dispatchable, InteractsWithQueue, Queueable;

    public function __construct(public readonly array $chunk) {}

    public function handle(): void
    {
        // Sprawdź, czy batch nie został anulowany przed ciężką pracą
        if ($this->batch()?->cancelled()) {
            return;
        }

        foreach ($this->chunk as $product) {
            Product::updateOrCreate(['sku' => $product['sku']], $product);
        }
    }
}

Sprawdź postęp batcha w kontrolerze:

// app/Http/Controllers/Api/ImportController.php

declare(strict_types=1);

namespace App\Http\Controllers\Api;

use Illuminate\Support\Facades\Bus;

class ImportController extends Controller
{
    public function status(string $batchId): JsonResponse
    {
        $batch = Bus::findBatch($batchId);

        return response()->json([
            'total'      => $batch->totalJobs,
            'processed'  => $batch->processedJobs(),
            'failed'     => $batch->failedJobs,
            'progress'   => $batch->progress(), // 0100
            'finished'   => $batch->finished(),
        ]);
    }
}

To daje endpoint postępu do pollingu z frontendu.

Setup - batche wymagają tabeli job_batches:

php artisan queue:batches-table
php artisan migrate

🔗 Job Chaining - Sekwencyjne pipeline'y

Chaining uruchamia joby sekwencyjnie - następny startuje tylko gdy poprzedni zakończy się pomyślnie. Jeśli którykolwiek się nie powiedzie, reszta łańcucha jest porzucana.

// app/Actions/ProcessOrderAction.php

declare(strict_types=1);

namespace App\Actions;

use App\Jobs\ChargePaymentJob;
use App\Jobs\GenerateInvoiceJob;
use App\Jobs\SendConfirmationEmailJob;
use App\Jobs\UpdateInventoryJob;
use Illuminate\Support\Facades\Bus;

class ProcessOrderAction
{
    public function handle(int $orderId): void
    {
        Bus::chain([
            new ChargePaymentJob($orderId),
            new UpdateInventoryJob($orderId),
            new GenerateInvoiceJob($orderId),
            new SendConfirmationEmailJob($orderId),
        ])
        ->catch(function (Throwable $e) use ($orderId) {
            logger()->error("Pipeline zamówienia {$orderId} nie powiódł się: {$e->getMessage()}");
            Order::find($orderId)?->markAsFailed();
        })
        ->dispatch();
    }
}

Warunkowa kontynuacja łańcucha - PendingChain::dispatchIf():

Bus::chain([
    new ChargePaymentJob($orderId),
    new UpdateInventoryJob($orderId),
])
->dispatchIf($order->requiresShipping, new CreateShipmentJob($orderId));

Mieszanie batchów i łańcuchów - batch wewnątrz łańcucha:

Bus::chain([
    new ValidateOrderJob($orderId),
    Bus::batch([
        new ProcessPaymentJob($orderId),
        new ReserveInventoryJob($orderId),
    ])->allowFailures(),
    new SendConfirmationJob($orderId),
])->dispatch();

Łańcuch czeka na zakończenie całego batcha przed przejściem do SendConfirmationJob.

🛡️ Job Middleware - Reguły per job

Job middleware pozwala przypiąć wielokrotnie używalne zachowania bezpośrednio do joba - rate limiting, deduplikację, throttling - bez powielania logiki w handle().

WithoutOverlapping - zapobiegaj równoległemu wykonaniu:

// app/Jobs/GenerateReportJob.php

declare(strict_types=1);

namespace App\Jobs;

use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Queue\Middleware\WithoutOverlapping;

class GenerateReportJob implements ShouldQueue
{
    public function __construct(public readonly int $userId) {}

    public function middleware(): array
    {
        return [
            // Tylko jeden GenerateReportJob per user może działać naraz
            new WithoutOverlapping($this->userId),
        ];
    }

    public function handle(): void
    {
        // Generuj raport dla $this->userId
    }
}

WithoutOverlapping używa blokady Redis zakluczowanej na klasie joba + ID, które podajesz. Nakładające się joby są zwracane do kolejki i ponawiane.

Skonfiguruj wygasanie i zachowanie przy zwrocie:

public function middleware(): array
{
    return [
        (new WithoutOverlapping($this->userId))
            ->expireAfter(300)    // Blokada wygasa po 5 minutach (zapobieganie deadlockowi)
            ->releaseAfter(30)    // Zwróć do kolejki po 30 sekundach
            ->dontRelease(),      // Lub całkowicie odrzuć nakładające się joby
    ];
}

ThrottlesExceptions - cofaj się przy powtarzających się błędach:

// app/Jobs/SyncExternalApiJob.php

use Illuminate\Queue\Middleware\ThrottlesExceptions;

public function middleware(): array
{
    return [
        // Po 3 wyjątkach odczekaj 10 minut
        new ThrottlesExceptions(maxAttempts: 3, decayMinutes: 10),
    ];
}

To różni się od $tries. ThrottlesExceptions wstrzymuje joba gdy zaczyna się nie udawać, dając zewnętrznemu serwisowi czas na odbudowanie, i automatycznie wznawia.

RateLimited - respektuj limity zewnętrznego API:

// app/Providers/AppServiceProvider.php

use Illuminate\Support\Facades\RateLimiter;
use Illuminate\Cache\RateLimiting\Limit;

RateLimiter::for('external-api', function () {
    return Limit::perMinute(60); // Maks 60 wywołań API na minutę łącznie przez wszystkich workerów
});
// app/Jobs/SendToExternalApiJob.php

use Illuminate\Queue\Middleware\RateLimited;

public function middleware(): array
{
    return [new RateLimited('external-api')];
}

Wszyscy workerzy dzielą ten sam rate limiter przez Redis - jeśli masz 5 workerów, łącznie respektują limit 60/minutę, nie 300/minutę.

🔁 ShouldBeUnique i ShouldBeEncrypted

ShouldBeUnique - zapobiega wysłaniu duplikatu joba gdy jeden już jest w kolejce lub działa:

// app/Jobs/RecalculateUserStatsJob.php

declare(strict_types=1);

namespace App\Jobs;

use Illuminate\Contracts\Queue\ShouldBeUnique;
use Illuminate\Contracts\Queue\ShouldQueue;

class RecalculateUserStatsJob implements ShouldQueue, ShouldBeUnique
{
    public int $uniqueFor = 3600; // Okno unikalności: 1 godzina

    public function __construct(public readonly int $userId) {}

    public function uniqueId(): string
    {
        return (string) $this->userId; // Jeden job per user naraz
    }

    public function handle(): void
    {
        // Przelicz statystyki dla tego użytkownika
    }
}

Jeśli wysyłasz RecalculateUserStatsJob(42) gdy jeden już jest w kolejce dla usera 42, drugi dispatch jest po cichu odrzucany.

ShouldBeUniqueUntilProcessing - zwalnia blokadę unikalności zaraz gdy job startuje (nie kończy), pozwalając na ponowne wysłanie podczas gdy aktualny działa:

use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing;

class RecalculateUserStatsJob implements ShouldQueue, ShouldBeUniqueUntilProcessing

ShouldBeEncrypted - szyfruje payload joba w kolejce:

// app/Jobs/SendPasswordResetJob.php

use Illuminate\Contracts\Queue\ShouldBeEncrypted;
use Illuminate\Contracts\Queue\ShouldQueue;

class SendPasswordResetJob implements ShouldQueue, ShouldBeEncrypted
{
    public function __construct(
        public readonly string $email,
        public readonly string $token, // Niewidoczny w plain text w Redis
    ) {}
}

Używaj tego dla każdego joba niosącego wrażliwe dane - tokeny, PII, referencje płatności - które inaczej byłyby czytelne w Redis lub tabeli kolejki.

💥 Strategie błędów - Retry, Backoff, Dead Letter Queue

Podstawowa konfiguracja retry:

// app/Jobs/ProcessWebhookJob.php

declare(strict_types=1);

namespace App\Jobs;

use Illuminate\Contracts\Queue\ShouldQueue;

class ProcessWebhookJob implements ShouldQueue
{
    public int $tries = 5;           // Łączna liczba prób przed oznaczeniem jako failed
    public int $maxExceptions = 3;   // Oznacz jako failed po 3 nieobsłużonych wyjątkach

    // Tablicowy backoff - czekaj 60s, potem 180s, potem 600s między próbami
    public array $backoff = [60, 180, 600];

    public function handle(): void
    {
        // Przetwórz payload webhooka
    }

    public function failed(\Throwable $e): void
    {
        // Wywoływane gdy job wyczerpał wszystkie próby
        logger()->error("Webhook trwale nie powiódł się: {$e->getMessage()}", [
            'job' => $this::class,
        ]);

        WebhookLog::markFailed($this->webhookId, $e->getMessage());
    }
}

retryUntil() - okno ponowień oparte na czasie:

public function retryUntil(): \DateTime
{
    // Ponawiaj przez 24 godziny, potem poddaj się
    return now()->addHours(24);
}

To lepsze niż stała liczba $tries dla jobów zależnych od zewnętrznych serwisów - ponawiasz przez sensowne okno czasowe, a nie liczbę prób.

Custom Dead Letter Queue - zapisz wszystkie nieudane joby do dedykowanej tabeli:

// database/migrations/2026_04_01_create_failed_jobs_detail_table.php

Schema::create('failed_jobs_detail', function (Blueprint $table) {
    $table->id();
    $table->string('job_class');
    $table->json('payload');
    $table->text('exception');
    $table->timestamp('failed_at');
});
// app/Jobs/Concerns/LogsFailure.php

declare(strict_types=1);

namespace App\Jobs\Concerns;

use App\Models\FailedJobDetail;

trait LogsFailure
{
    public function failed(\Throwable $e): void
    {
        FailedJobDetail::create([
            'job_class' => static::class,
            'payload'   => json_encode($this),
            'exception' => $e->getMessage() . "\n" . $e->getTraceAsString(),
            'failed_at' => now(),
        ]);
    }
}

Dodaj trait do każdego joba wymagającego niestandardowego śledzenia błędów, a następnie zbuduj prosty dashboard na failed_jobs_detail.

🧪 Testowanie z Bus::fake() i Queue::fake()

Queue::fake() - asercje na wysłanych jobach:

// tests/Feature/OrderControllerTest.php

declare(strict_types=1);

use App\Jobs\ProcessOrderJob;
use App\Jobs\SendConfirmationEmailJob;
use Illuminate\Support\Facades\Queue;

it('wysyła job przetwarzania przy tworzeniu zamówienia', function () {
    Queue::fake();

    $this->postJson('/api/v1/orders', [...])
        ->assertCreated();

    Queue::assertPushed(ProcessOrderJob::class);
    Queue::assertNotPushed(SendConfirmationEmailJob::class);
});

it('wysyła na właściwą kolejkę', function () {
    Queue::fake();

    ProcessOrderJob::dispatch($order);

    Queue::assertPushedOn('orders', ProcessOrderJob::class);
});

Bus::fake() - asercje na łańcuchach i batchach:

// tests/Feature/ProcessOrderActionTest.php

declare(strict_types=1);

use App\Actions\ProcessOrderAction;
use App\Jobs\ChargePaymentJob;
use App\Jobs\GenerateInvoiceJob;
use Illuminate\Support\Facades\Bus;

it('wysyła właściwy łańcuch jobów dla zamówienia', function () {
    Bus::fake();

    app(ProcessOrderAction::class)->handle($order->id);

    Bus::assertChained([
        ChargePaymentJob::class,
        UpdateInventoryJob::class,
        GenerateInvoiceJob::class,
        SendConfirmationEmailJob::class,
    ]);
});

it('wysyła batch przy masowym imporcie', function () {
    Bus::fake();

    app(ImportProductsAction::class)->handle($chunks);

    Bus::assertBatched(function ($batch) use ($chunks) {
        return $batch->jobs->count() === count($chunks)
            && $batch->name === 'Product Import';
    });
});

Testuj hook failed() bezpośrednio:

it('loguje błąd gdy job webhooka wyczerpie retry', function () {
    $job       = new ProcessWebhookJob($webhookId);
    $exception = new \RuntimeException('Timeout zewnętrznego serwisu');

    $job->failed($exception);

    expect(WebhookLog::where('webhook_id', $webhookId)->first()->status)
        ->toBe('failed');
});

🚀 Kolejki priorytetowe w Redis z Supervisor

Workery Laravel mogą słuchać na wielu kolejkach z określonym priorytetem. Joby na kolejkach wyższego priorytetu są przetwarzane pierwsze.

Wysyłaj do konkretnej kolejki:

// Wysoki priorytet - przetwarzanie płatności
ProcessPaymentJob::dispatch($order)->onQueue('high');

// Normalny priorytet - powiadomienia
SendEmailJob::dispatch($user)->onQueue('default');

// Niski priorytet - analityka, raporty
UpdateAnalyticsJob::dispatch($event)->onQueue('low');

Konfiguracja Supervisor - procesy workerów z kolejnością priorytetów:

; /etc/supervisor/conf.d/laravel-worker.conf

[program:laravel-worker-high]
process_name=%(program_name)s_%(process_num)02d
command=php /var/www/home/artisan queue:work redis --queue=high,default,low --sleep=3 --tries=3
autostart=true
autorestart=true
numprocs=4
redirect_stderr=true
stdout_logfile=/var/log/laravel-worker-high.log

[program:laravel-worker-low]
process_name=%(program_name)s_%(process_num)02d
command=php /var/www/home/artisan queue:work redis --queue=low --sleep=3 --tries=3
autostart=true
autorestart=true
numprocs=1
redirect_stderr=true
stdout_logfile=/var/log/laravel-worker-low.log

Kolejność --queue=high,default,low oznacza, że worker sprawdza najpierw kolejkę high, potem default, potem low. Cztery workery obsługują joby wysokiego priorytetu; jeden obsługuje pracę w tle niskiego priorytetu.

Kluczowa zasada: osobne workery dla różnych priorytetów zapobiegają zagłodzeniu jobów wysokiego priorytetu. Nawet jeśli low ma zaległy backlog 10k jobów analitycznych, workery płatności nie są tym dotknięte.

✅ Podsumowanie

  • Używaj batching gdy musisz przetworzyć grupę jobów i reagować na ich zbiorowe zakończenie - importy, operacje bulk, równoległe pipeline'y
  • Używaj chaining gdy kroki muszą się wykonywać sekwencyjnie i każdy zależy od poprzedniego - przetwarzanie zamówień, wieloetapowe przepływy
  • Dodawaj job middleware (WithoutOverlapping, ThrottlesExceptions, RateLimited) żeby wyrażać reguły per job bez zaśmiecania handle()
  • Używaj ShouldBeUnique żeby zapobiec duplikatom jobów dla tej samej encji przy częstym dispatchowaniu
  • Używaj ShouldBeEncrypted dla każdego joba niosącego PII, tokeny lub wrażliwe referencje
  • Buduj Dead Letter Queue na hooku failed() żeby mieć kontekst przy trwałych błędach
  • Pisz testy z Bus::fake() do asercji łańcuchów i batchów - to najcięższe bugi do złapania bez nich
  • Używaj kolejek priorytetowych z osobnymi procesami Supervisora żeby krytyczne joby nigdy nie czekały za pracą w tle

Obserwuj mnie na LinkedIn po więcej tipów z Laravel! Chcesz deep-dive w Laravel Pulse do monitorowania zdrowia kolejek na produkcji? Daj znać w komentarzach!

Wsparcie istniejącego systemu

Potrzebujesz pomocy z działającą aplikacją?

Pomagam firmom rozwijać działające systemy, porządkować wdrożenia i dodawać nowe funkcje bez dokładania chaosu do projektu.

Komentarze (0)
Zaloguj się, aby dodać komentarz

Musisz być zalogowany, aby dodać komentarz.

Zaloguj się

Potrzebujesz kogoś, kto weźmie odpowiedzialność za kolejny krok?

Porozmawiajmy o Twoim projekcie i określmy zakres, który ma sens dla Twoich celów.