Skip to content

Child process queue workers #450

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Dec 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions config/nativephp.php
Original file line number Diff line number Diff line change
Expand Up @@ -114,4 +114,12 @@
],
],
],

'queue_workers' => [
'default' => [
'queues' => ['default'],
'memory_limit' => 128,
'timeout' => 60,
],
],
];
12 changes: 12 additions & 0 deletions src/Contracts/QueueWorker.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php

namespace Native\Laravel\Contracts;

use Native\Laravel\DTOs\QueueConfig;

interface QueueWorker
{
public function up(QueueConfig $config): void;

public function down(string $alias): void;
}
35 changes: 35 additions & 0 deletions src/DTOs/QueueConfig.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
<?php

namespace Native\Laravel\DTOs;

class QueueConfig
{
/**
* @param array<int, string> $queuesToConsume
*/
public function __construct(
public readonly string $alias,
public readonly array $queuesToConsume,
public readonly int $memoryLimit,
public readonly int $timeout,
) {}

/**
* @return array<int, self>
*/
public static function fromConfigArray(array $config): array
{
return array_map(
function (array|string $worker, string $alias) {
return new self(
$alias,
$worker['queues'] ?? ['default'],
$worker['memory_limit'] ?? 128,
$worker['timeout'] ?? 60,
);
},
$config,
array_keys($config),
);
}
}
4 changes: 2 additions & 2 deletions src/Events/ChildProcess/ErrorReceived.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
namespace Native\Laravel\Events\ChildProcess;

use Illuminate\Broadcasting\Channel;
use Illuminate\Contracts\Broadcasting\ShouldBroadcast;
use Illuminate\Contracts\Broadcasting\ShouldBroadcastNow;
use Illuminate\Foundation\Events\Dispatchable;
use Illuminate\Queue\SerializesModels;

class ErrorReceived implements ShouldBroadcast
class ErrorReceived implements ShouldBroadcastNow
{
use Dispatchable, SerializesModels;

Expand Down
4 changes: 2 additions & 2 deletions src/Events/ChildProcess/MessageReceived.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
namespace Native\Laravel\Events\ChildProcess;

use Illuminate\Broadcasting\Channel;
use Illuminate\Contracts\Broadcasting\ShouldBroadcast;
use Illuminate\Contracts\Broadcasting\ShouldBroadcastNow;
use Illuminate\Foundation\Events\Dispatchable;
use Illuminate\Queue\SerializesModels;

class MessageReceived implements ShouldBroadcast
class MessageReceived implements ShouldBroadcastNow
{
use Dispatchable, SerializesModels;

Expand Down
4 changes: 2 additions & 2 deletions src/Events/ChildProcess/ProcessExited.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
namespace Native\Laravel\Events\ChildProcess;

use Illuminate\Broadcasting\Channel;
use Illuminate\Contracts\Broadcasting\ShouldBroadcast;
use Illuminate\Contracts\Broadcasting\ShouldBroadcastNow;
use Illuminate\Foundation\Events\Dispatchable;
use Illuminate\Queue\SerializesModels;

class ProcessExited implements ShouldBroadcast
class ProcessExited implements ShouldBroadcastNow
{
use Dispatchable, SerializesModels;

Expand Down
4 changes: 2 additions & 2 deletions src/Events/ChildProcess/ProcessSpawned.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
namespace Native\Laravel\Events\ChildProcess;

use Illuminate\Broadcasting\Channel;
use Illuminate\Contracts\Broadcasting\ShouldBroadcast;
use Illuminate\Contracts\Broadcasting\ShouldBroadcastNow;
use Illuminate\Foundation\Events\Dispatchable;
use Illuminate\Queue\SerializesModels;

class ProcessSpawned implements ShouldBroadcast
class ProcessSpawned implements ShouldBroadcastNow
{
use Dispatchable, SerializesModels;

Expand Down
29 changes: 29 additions & 0 deletions src/Facades/QueueWorker.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
<?php

namespace Native\Laravel\Facades;

use Illuminate\Support\Facades\Facade;
use Native\Laravel\Contracts\QueueWorker as QueueWorkerContract;
use Native\Laravel\DTOs\QueueConfig;
use Native\Laravel\Fakes\QueueWorkerFake;

/**
* @method static void up(QueueConfig $config)
* @method static void down(string $alias)
*/
class QueueWorker extends Facade
{
public static function fake()
{
return tap(static::getFacadeApplication()->make(QueueWorkerFake::class), function ($fake) {
static::swap($fake);
});
}

protected static function getFacadeAccessor(): string
{
self::clearResolvedInstance(QueueWorkerContract::class);

return QueueWorkerContract::class;
}
}
61 changes: 61 additions & 0 deletions src/Fakes/QueueWorkerFake.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
<?php

namespace Native\Laravel\Fakes;

use Closure;
use Native\Laravel\Contracts\QueueWorker as QueueWorkerContract;
use Native\Laravel\DTOs\QueueConfig;
use PHPUnit\Framework\Assert as PHPUnit;

class QueueWorkerFake implements QueueWorkerContract
{
/**
* @var array<int, QueueConfig>
*/
public array $ups = [];

/**
* @var array<int, string>
*/
public array $downs = [];

public function up(QueueConfig $config): void
{
$this->ups[] = $config;
}

public function down(string $alias): void
{
$this->downs[] = $alias;
}

public function assertUp(Closure $callback): void
{
$hit = empty(
array_filter(
$this->ups,
fn (QueueConfig $up) => $callback($up) === true
)
) === false;

PHPUnit::assertTrue($hit);
}

public function assertDown(string|Closure $alias): void
{
if (is_callable($alias) === false) {
PHPUnit::assertContains($alias, $this->downs);

return;
}

$hit = empty(
array_filter(
$this->downs,
fn (string $down) => $alias($down) === true
)
) === false;

PHPUnit::assertTrue($hit);
}
}
20 changes: 20 additions & 0 deletions src/NativeServiceProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
use Native\Laravel\Contracts\ChildProcess as ChildProcessContract;
use Native\Laravel\Contracts\GlobalShortcut as GlobalShortcutContract;
use Native\Laravel\Contracts\PowerMonitor as PowerMonitorContract;
use Native\Laravel\Contracts\QueueWorker as QueueWorkerContract;
use Native\Laravel\Contracts\WindowManager as WindowManagerContract;
use Native\Laravel\DTOs\QueueConfig;
use Native\Laravel\Events\EventWatcher;
use Native\Laravel\Exceptions\Handler;
use Native\Laravel\GlobalShortcut as GlobalShortcutImplementation;
Expand Down Expand Up @@ -73,6 +75,10 @@ public function packageRegistered()
return $app->make(PowerMonitorImplementation::class);
});

$this->app->bind(QueueWorkerContract::class, function (Foundation $app) {
return $app->make(QueueWorker::class);
});

if (config('nativephp-internal.running')) {
$this->app->singleton(
\Illuminate\Contracts\Debug\ExceptionHandler::class,
Expand Down Expand Up @@ -112,6 +118,11 @@ protected function configureApp()

config(['session.driver' => 'file']);
config(['queue.default' => 'database']);

// XXX: This logic may need to change when we ditch the internal web server
if (! $this->app->runningInConsole()) {
$this->fireUpQueueWorkers();
}
}

protected function rewriteStoragePath()
Expand Down Expand Up @@ -210,4 +221,13 @@ protected function configureDisks(): void
]);
}
}

protected function fireUpQueueWorkers(): void
{
$queueConfigs = QueueConfig::fromConfigArray(config('nativephp.queue_workers'));

foreach ($queueConfigs as $queueConfig) {
$this->app->make(QueueWorkerContract::class)->up($queueConfig);
}
}
}
47 changes: 47 additions & 0 deletions src/QueueWorker.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
<?php

namespace Native\Laravel;

use Native\Laravel\Contracts\ChildProcess as ChildProcessContract;
use Native\Laravel\Contracts\QueueWorker as QueueWorkerContract;
use Native\Laravel\DTOs\QueueConfig;

class QueueWorker implements QueueWorkerContract
{
public function __construct(
private readonly ChildProcessContract $childProcess,
) {}

public function up(string|QueueConfig $config): void
{
if (is_string($config) && config()->has("nativephp.queue_workers.{$config}")) {
$config = QueueConfig::fromConfigArray([
$config => config("nativephp.queue_workers.{$config}"),
])[0];
}

if (! $config instanceof QueueConfig) {
throw new \InvalidArgumentException("Invalid queue configuration alias [$config]");
}

$this->childProcess->php(
[
'-d',
"memory_limit={$config->memoryLimit}M",
'artisan',
'queue:work',
"--name={$config->alias}",
'--queue='.implode(',', $config->queuesToConsume),
"--memory={$config->memoryLimit}",
"--timeout={$config->timeout}",
],
$config->alias,
persistent: true,
);
}

public function down(string $alias): void
{
$this->childProcess->stop($alias);
}
}
66 changes: 66 additions & 0 deletions tests/DTOs/QueueWorkerTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
<?php

use Illuminate\Support\Arr;
use Native\Laravel\DTOs\QueueConfig;

test('the factory method generates an array of config objects for several formats', function (array $config) {
$configObject = QueueConfig::fromConfigArray($config);

expect($configObject)->toBeArray();
expect($configObject)->toHaveCount(count($config));

foreach ($config as $alias => $worker) {
if (is_string($worker)) {
expect(
Arr::first(
array_filter($configObject, fn (QueueConfig $config) => $config->alias === $worker))
)->queuesToConsume->toBe(['default']
);

expect(Arr::first(array_filter($configObject, fn (QueueConfig $config) => $config->alias === $worker)))->memoryLimit->toBe(128);
expect(Arr::first(array_filter($configObject, fn (QueueConfig $config) => $config->alias === $worker)))->timeout->toBe(60);

continue;
}

expect(
Arr::first(
array_filter($configObject, fn (QueueConfig $config) => $config->alias === $alias))
)->queuesToConsume->toBe($worker['queues'] ?? ['default']
);

expect(Arr::first(array_filter($configObject, fn (QueueConfig $config) => $config->alias === $alias)))->memoryLimit->toBe($worker['memory_limit'] ?? 128);
expect(Arr::first(array_filter($configObject, fn (QueueConfig $config) => $config->alias === $alias)))->timeout->toBe($worker['timeout'] ?? 60);
}
})->with([
[
'queue_workers' => [
'some_worker' => [
'queues' => ['default'],
'memory_limit' => 64,
'timeout' => 60,
],
],
],
[
'queue_workers' => [
'some_worker' => [],
'another_worker' => [],
],
],
[
'queue_workers' => [
'some_worker' => [
],
'another_worker' => [
'queues' => ['default', 'another'],
],
'yet_another_worker' => [
'memory_limit' => 256,
],
'one_more_worker' => [
'timeout' => 120,
],
],
],
]);
Loading
Loading