Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

worker per single queue #39

Merged
merged 5 commits into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,14 @@ Subscribing with client acknowledgement option (ENV variables):

```
STOMP_CONSUMER_WIN_SIZE=819200 // number of bytes that Broker will send to client before it expects ACK
STOMP_CONSUMER_ACK_MODE=client // mode: client (ACK needs to be sent) | auto (no ACK, and window-size has to be -1 in that case)
STOMP_CONSUMER_ACK_MODE=auto // mode: client (ACK needs to be sent) | auto (no ACK, and window-size has to be -1 in that case)
```

Options for Laravel worker:

```
STOMP_CONSUMER_ALL_QUEUES = default; // which queue name(s) represent that all queues from Config should be read
STOMP_READ_MESSAGE_DB_LOG = false // write POP-ed events in DB table `stomp_event_logs`
```

You can see all other available ``.env`` variables, their defaults and usage explanation within
Expand Down
4 changes: 4 additions & 0 deletions config/asseco-stomp.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,8 @@

return [
'log_manager' => LogManager::class,

'migrations' => [
'run' => true,
],
];
15 changes: 13 additions & 2 deletions config/stomp.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@

/** If all messages should fail on timeout. Set to false in order to revert to default (looking in event payload) */
'fail_on_timeout' => env('STOMP_FAIL_ON_TIMEOUT', true),
/** Maximum time in seconds for job execution. This value must be less than send heartbeat in order to run correctly. */

/**
* Maximum time in seconds for job execution. This value must be less than send heartbeat in order to run correctly.
*/
'timeout' => env('STOMP_TIMEOUT', 45),

/**
Expand All @@ -56,6 +59,8 @@
*/
'default_queue' => env('STOMP_DEFAULT_QUEUE'),

'enable_read_events_DB_logs' => env('STOMP_READ_MESSAGE_DB_LOG', false) === true,

/**
* Use Laravel logger for outputting logs.
*/
Expand Down Expand Up @@ -89,7 +94,13 @@
/**
* Subscribe mode: auto, client.
*/
'consumer_ack_mode' => env('STOMP_CONSUMER_ACK_MODE', 'client'),
'consumer_ack_mode' => env('STOMP_CONSUMER_ACK_MODE', 'auto'),

/**
* Queue name(s) that represent that all queues should be read
* If no queue is specified, Laravel puts 'default' - so this should be entered here.
*/
'worker_queue_name_all' => explode(';', env('STOMP_CONSUMER_ALL_QUEUES', 'default;')),

/**
* Array of supported versions.
Expand Down
38 changes: 38 additions & 0 deletions migrations/2024_05_28_220001_create_stomp_event_log_table.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
<?php

use Illuminate\Database\Migrations\Migration;
use Illuminate\Database\Schema\Blueprint;
use Illuminate\Support\Facades\Schema;

return new class extends Migration
{
/**
* Run the migrations.
*
* @return void
*/
public function up()
{
Schema::create('stomp_event_logs', function (Blueprint $table) {
$table->id();
$table->string('session_id')->nullable();
$table->string('queue_name')->nullable();
$table->string('subscription_id')->nullable();
$table->string('message_id')->nullable();

$table->text('payload')->nullable();

$table->timestamp('created_at')->useCurrent();
});
}

/**
* Reverse the migrations.
*
* @return void
*/
public function down()
{
Schema::dropIfExists('stomp_event_logs');
}
};
4 changes: 2 additions & 2 deletions src/Queue/Jobs/StompJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ protected function fireLaravelJob(): void
{
if ($this->laravelJobClassExists()) {
[$class, $method] = JobName::parse($this->payload['job']);
($this->instance = $this->resolve($class))->{$method}($this, $this->payload['data']);
($this->instance = $this->resolve($class))->{$method}($this, $this->payload['data'] ?? []);
} else {
$this->log->error("$this->session [STOMP] Laravel job class does not exist!");
}
Expand Down Expand Up @@ -253,7 +253,7 @@ protected function failed($e)

try {
if (method_exists($this->instance = $this->resolve($class), 'failed')) {
$this->instance->failed($this->payload['data'], $e, $this->payload['uuid']);
$this->instance->failed($this->payload['data'] ?? [], $e, $this->payload['uuid']);
}
} catch (\Exception $e) {
$this->log->error('Exception in job failing: ' . $e->getMessage());
Expand Down
10 changes: 10 additions & 0 deletions src/Queue/Stomp/Config.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,14 @@ protected static function appName(): string
{
return Str::snake(config('app.name', 'localhost'));
}

public static function queueNamesForProcessAllQueues()
{
return self::get('worker_queue_name_all');
}

public static function shouldReadMessagesBeLoggedToDB()
{
return self::get('enable_read_events_DB_logs');
}
}
100 changes: 83 additions & 17 deletions src/Queue/StompQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use Illuminate\Queue\InvalidPayloadException;
use Illuminate\Queue\Queue;
use Illuminate\Support\Arr;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Str;
use Psr\Log\LoggerInterface;
use Stomp\Exception\ConnectionException;
Expand All @@ -33,6 +34,7 @@ class StompQueue extends Queue implements QueueInterface
const CORRELATION = 'X-Correlation-ID';

const ACK_MODE_CLIENT = 'client';
const ACK_MODE_AUTO = 'auto';

/**
* Stomp instance from stomp-php repo.
Expand All @@ -53,8 +55,15 @@ class StompQueue extends Queue implements QueueInterface
protected int $circuitBreaker = 0;
protected string $session;

/** @var null|Frame */
protected $_lastFrame = null;
protected $_ackMode = 'client';

protected string $_ackMode = '';

protected array $_queueNamesForProcessAllQueues = [''];
protected bool $_customReadQueusDefined = false;

protected bool $_readMessagesLogToDb = false;

public function __construct(ClientWrapper $stompClient)
{
Expand All @@ -65,26 +74,31 @@ public function __construct(ClientWrapper $stompClient)

$this->session = $this->client->getClient()->getSessionId();

$this->_ackMode = strtolower(Config::get('consumer_ack_mode') ?? 'client');
$this->_ackMode = strtolower(Config::get('consumer_ack_mode') ?? self::ACK_MODE_AUTO);

// specify which queue names should be considered as "All queues from Config"
// "default" & ""
$this->_queueNamesForProcessAllQueues = Config::queueNamesForProcessAllQueues();
$this->_readMessagesLogToDb = Config::shouldReadMessagesBeLoggedToDB();
}

/**
* Append queue name to topic/address to avoid random hashes in broker.
*
* @param string|null $queuesString
* @return array
*/
protected function setReadQueues(): array
protected function setReadQueues(?string $queuesString = ''): array
{
$queues = $this->parseQueues(Config::readQueues());
$queuesString = $queuesString ?: Config::readQueues();
$queues = $this->parseQueues($queuesString);

foreach ($queues as &$queue) {
$default = Config::defaultQueue();

if (!str_contains($queue, self::AMQ_QUEUE_SEPARATOR)) {
$queue .= self::AMQ_QUEUE_SEPARATOR . $default . '_' . substr(Str::uuid(), -5);
continue;
}

if (Config::get('prepend_queues')) {
$topic = Str::before($queue, self::AMQ_QUEUE_SEPARATOR);
$queueName = Str::after($queue, self::AMQ_QUEUE_SEPARATOR);
Expand Down Expand Up @@ -218,7 +232,7 @@ protected function writeToMultipleQueues(array $writeQueues, Message $payload):
$this->log->info("$this->session [STOMP] Pushing stomp payload to queue: " . print_r([
'body' => $payload->getBody(),
'headers' => $payload->getHeaders(),
'queue' => $writeQueues,
'queues' => $writeQueues,
], true));

$allEventsSent = true;
Expand Down Expand Up @@ -349,6 +363,8 @@ protected function hasEvent($job): bool
*/
public function pop($queue = null)
{
$this->setReadQueuesForWorker($queue);

$this->ackLastFrameIfNecessary();

$frame = $this->read($queue);
Expand All @@ -366,21 +382,21 @@ public function pop($queue = null)
$queueFromFrame = $this->getQueueFromFrame($frame);

if (!$queueFromFrame) {
$this->log->error("$this->session [STOMP] Wrong frame received. Expected MESSAGE, got: " . print_r($frame, true));
$this->log->warning("$this->session [STOMP] Wrong frame received. Expected MESSAGE, got: " . print_r($frame, true));
$this->_lastFrame = null;

return null;
}

$this->_lastFrame = $frame;

$this->writeMessageToDBIfNeeded($frame, $queueFromFrame);

return new StompJob($this->container, $this, $frame, $queueFromFrame);
}

protected function read($queue)
{
// This will read from queue, then push on same session ID if there are events following, then delete event which was read
// If job fails, it will be re-pushed on same session ID but with additional headers for redelivery
try {
$this->log->info("$this->session [STOMP] POP");

Expand Down Expand Up @@ -457,7 +473,11 @@ protected function reconnect(bool $subscribe = true)

try {
$this->client->getClient()->connect();
$newSessionId = $this->client->getClient()->getSessionId();

$this->log->info("$this->session [STOMP] Reconnected successfully.");
$this->log->info("$this->session [STOMP] Switching session to: $newSessionId");
$this->session = $newSessionId;
} catch (Exception $e) {
$this->circuitBreaker++;

Expand All @@ -475,7 +495,7 @@ protected function reconnect(bool $subscribe = true)
}

// By this point it should be connected, so it is safe to subscribe
if ($this->client->getClient()->isConnected() && $subscribe) {
if ($subscribe && $this->client->getClient()->isConnected()) {
$this->log->info("$this->session [STOMP] Connected, subscribing...");
$this->subscribedTo = [];
$this->subscribeToQueues();
Expand All @@ -497,23 +517,29 @@ public function disconnect()
}
}

/**
* Subscribe to queues.
*
* @return void
*/
protected function subscribeToQueues(): void
{
$winSize = Config::get('consumer_window_size');
if ($this->_ackMode != self::ACK_MODE_CLIENT) {
// New Artemis version can't work without this as it will consume only first message otherwise.
$winSize = -1;
}

foreach ($this->readQueues as $queue) {
$alreadySubscribed = in_array($queue, $this->subscribedTo);

if ($alreadySubscribed) {
continue;
}

$winSize = Config::get('consumer_window_size') ?: 8192000;
if ($this->_ackMode != self::ACK_MODE_CLIENT) {
$winSize = -1;
}
$this->log->info("$this->session [STOMP] subscribeToQueue `$queue` with ack-mode: {$this->_ackMode} & window-size: $winSize");

$this->client->subscribe($queue, null, $this->_ackMode, [
// New Artemis version can't work without this as it will consume only first message otherwise.
//'consumer-window-size' => '-1',
// we can define this if we are using ack mode = client
'consumer-window-size' => (string) $winSize,
]);
Expand All @@ -530,8 +556,48 @@ protected function subscribeToQueues(): void
public function ackLastFrameIfNecessary()
{
if ($this->_ackMode == self::ACK_MODE_CLIENT && $this->_lastFrame) {
$this->log->debug("$this->session [STOMP] ACK-ing last frame. Msg #" . $this->_lastFrame->getMessageId());
$this->client->ack($this->_lastFrame);
$this->_lastFrame = null;
}
}

/**
* Set read queues for queue worker, if queue parameter is defined
* > php artisan queue:work --queue=eloquent::live30.
*
* @param $queue
* @return void
*/
protected function setReadQueuesForWorker($queue)
{
if ($this->_customReadQueusDefined) {
// already setup
return;
}

$queue = (string) $queue;
if (!in_array($queue, $this->_queueNamesForProcessAllQueues)) {
// one or more queue
$this->readQueues = $this->setReadQueues($queue);
}

$this->_customReadQueusDefined = true;
}

protected function writeMessageToDBIfNeeded(Frame $frame, $queueFromFrame)
{
if ($this->_readMessagesLogToDb) {
DB::table('stomp_event_logs')->insert(
[
'session_id' => $this->session,
'queue_name' => $queueFromFrame,
'subscription_id' => $frame['subscription'],
'message_id' => $frame->getMessageId(),
'payload' => print_r($frame, true),
'created_at' => date('Y-m-d H:i:s.u'),
]
);
}
}
}
8 changes: 8 additions & 0 deletions src/StompServiceProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -50,5 +50,13 @@ public function boot()

return $logsEnabled ? new $logManager($app) : new NullLogger();
});

if (config('asseco-stomp.migrations.run')) {
$this->loadMigrationsFrom(__DIR__ . '/../migrations');
}

$this->publishes([
__DIR__ . '/../migrations' => database_path('migrations'),
], 'asseco-stomp');
}
}
Loading