Message Outbox Setup and Usage

Published at 24 August 2021

The outbox module consists of three parts, a repository, a dispatcher and a relay. The repository is responsible storing and retrieving messages. The dispatcher is responsible for putting messages in the repository and hooks into the core by implementing the MessageDispatcher interface. The relay is responsible for fetching messages from the repository and committing them (mark as consumed or delete).

Step 1: Set up your database schema

Your messages should be stored in the same database as our regular message repository does. This may sound like we’re needlessly storing twice but each table has its own responsibility, and we interact with the in distinct ways.

Messages stored in the outbox table are expected to be deleted or marked as consumed while our normal messages are intended to stay around for as long as we need to rebuild our aggregates.

Check out the database table schema.

Step 2: Set up your Outbox Repository

Once you’ve set up the table schema you can bootstrap your Outbox Repository. EventSauce supplies a couple of them:

All the outbox repositories listed are compatible with documented the table schema.

Step 3: Set up the message repository

Each of the supplied outbox repository packages also supplies a transactional message repository implementation which ensures the messages are stored in both repositories (outbox and normal for rebuilding) in a single transaction.

Generically speaking, the setup looks something like:

new SomeTransactionalMessageRepository(
    connection: $databaseConnection,
    messageRepository: $messageRepository,
    outboxRepository: $outboxRepository,
);

Use this newly set up repository as your main Message Repository implementation and you’re all set on the dispatching side.

Step 4: Set up the relay

The last thing to do, is to set up the relay mechanism. The relay is responsible for reading the messages from the buffer table so they can be pushed onto the queue. The relay takes your outbox repository and a consumer. This consumer (and implementation of the Consumer interface) is tasked with forwarding the messages to your queueing mechanism of choice.

use EventSauce\MessageOutbox\OutboxRelay;

$relay = new OutboxRelay(
    $yourRepository,
    $forwardingMessageConsumer,
    $backOffStrategy, /* optional */
    $commitStrategy, /* optional */
);

The relay should run in a background process. You can hook the relay into your (CLI) framework of choice. An implementation might look something like:

use EventSauce\MessageOutbox\OutboxRelay;

class RelayOutboxMessagesCommand implements Command
{
    private bool $shouldRun = true;
    public function __construct(private OutboxRelay $relay) {}
    public function main(): int
    {
        while($this->shouldRun) {
            $numberOfMessagesDispatched = $this->relay->publishBatch(
                batchSize: 100,
                commitSize: 1,
            );
            
            if ($numberOfMessagesDispatched === 0) {
                // when no messages are relayed, sleep to prevent infrastructure hammering
                sleep(1);
            }
        }
    }
}

The commitSize can be increased to increase throughput. When doing so, it may increase the likeliness of dispatching the same message to the queue twice in case of network failure.

Outbox Message Dispatcher

The outbox package provides a MessageDipatcher implementation that forwards the messages to the outbox repository. This implementation is useful when want to leverage EventSauce’s dispatcher capabilities in a non-event-sourced context.

use EventSauce\EventSourcing\Message;
use EventSauce\MessageOutbox\OutboxMessageDispatcher;

$messageDispatcher = new OutboxMessageDispatcher($yourRepository);

$messageDispatcher->dispatch(new Message(new YourEvent(1234)));

Back-off Strategy

A back-off strategy may be specified. By default, an exponential back-off strategy is used. For more information about the available strategies and configuration options, view the readme.

Commit Strategy

Messages can be committed using two strategies:

  • Mark message as consumed
  • Delete the message
$markAsConsumed = new EventSauce\MessageOutbox\MarkMessagesConsumedOnCommit();
$deleteMessage = new EventSauce\MessageOutbox\DeleteMessageOnCommit();
Frank de Jonge

EventSauce is a project by Frank de Jonge.