Event Sourcing

The framework’s Event Sourcing library allows you to set up an Event Sourcing infrastructure, with or without CQRS, based on controllers acting as the aggregate root (main command and main event listener) of each aggregate. The principle is that all of the software’s underlying commands and interactions with other systems (internal or external) are triggered by dispatching events. These events are named ‘aggregate events’ and can be any custom event that the controller dispatches through an event dispatcher, also known as the event bus. The event dispatcher then notifies every listener attached to that event.

Typically, an Event Sourcing aggregate will have Read Model and Policy listeners. These event listeners call aggregate commands, passing along any values received through the aggregate event. To keep things simple, these values are normally wrapped inside an immutable value object that provides a unified interface for handling them throughout the entire aggregate life cycle.
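The flow described above (event, listener, command, value object) can be sketched in plain PHP. This is only an illustration under stated assumptions: every class name starting with Sketch is hypothetical and is not part of LightMVC, and the event name 'read_requested' is made up for the example.

```php
<?php

// Hypothetical stand-ins for illustration only; these are NOT the LightMVC classes.
final class SketchValueObject
{
    private $properties;

    public function __construct(array $properties = [])
    {
        $this->properties = $properties;
    }

    public function getProperties(): array
    {
        return $this->properties;
    }
}

final class SketchEvent
{
    private $name;
    private $valueObject;

    public function __construct(string $name, SketchValueObject $valueObject)
    {
        $this->name = $name;
        $this->valueObject = $valueObject;
    }

    public function getName(): string
    {
        return $this->name;
    }

    public function getValueObject(): SketchValueObject
    {
        return $this->valueObject;
    }
}

final class SketchEventBus
{
    private $listeners = [];

    public function attach(string $eventName, callable $listener): void
    {
        $this->listeners[$eventName][] = $listener;
    }

    // Notifies every listener attached to the event's name.
    public function dispatch(SketchEvent $event): void
    {
        foreach ($this->listeners[$event->getName()] ?? [] as $listener) {
            $listener($event);
        }
    }
}

// A command: the imperative that does the actual work.
final class SketchReadCommand
{
    private $valueObject;

    public function __construct(SketchValueObject $valueObject)
    {
        $this->valueObject = $valueObject;
    }

    public function execute(): array
    {
        // A real command would query a data source here.
        return ['requested_id' => $this->valueObject->getProperties()['id'] ?? null];
    }
}

$bus = new SketchEventBus();
$results = [];

// A listener (Read Model role) forwards the event's values to a command.
$bus->attach('read_requested', function (SketchEvent $event) use (&$results) {
    $command = new SketchReadCommand($event->getValueObject());
    $results[] = $command->execute();
});

$bus->dispatch(new SketchEvent('read_requested', new SketchValueObject(['id' => 42])));
```

The dispatching code never calls the command directly; it only knows the event name, which is what makes the listeners replaceable.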

Since all of the aggregate’s underlying commands are called through events, logging the aggregate’s activities becomes much simpler. Auditing and monitoring are added benefits that come with Event Sourcing.

To read further on Event Sourcing, please see Martin Fowler’s definition and explanations on this subject.

Note

For more information on configuring an application’s event sourcing aggregates and the application’s event log, please see the Event Sourcing Configuration section.

Aggregates

An aggregate is defined as a collective of classes that work together in order to accomplish a specific task. When using aggregates with controllers as their aggregate root, this task, or common goal, is sending a response back for a specific request from the client.

Thus, the first step in defining an aggregate is to establish the aggregate root from within a controller’s factory() method when implementing the \Ascmvc\AscmvcControllerFactoryInterface interface, like so:

public static function factory(array &$baseConfig, EventDispatcher &$eventDispatcher, Container &$serviceManager, &$viewObject)
{
    // Setting the identifiers of this Event Dispatcher (event bus).
    // Subscribing this controller (Aggregate Root). To also subscribe the Event Sourcing Logger,
    // add the EventLogger class name to this list.
    $eventDispatcher->setIdentifiers(
        [
            SomeController::class,
        ]
    );

    // Do something else...

    $controller = new SomeController($baseConfig, $eventDispatcher);

    $sharedEventManager = $eventDispatcher->getSharedManager();

    // Attaching this controller's listener method to the shared event manager's
    // corresponding identifier (see above).
    $sharedEventManager->attach(
        SomeController::class,
        '*',
        [$controller, 'someMethodName']
    );

    return $controller;
}

By setting the event dispatcher’s identifier to the controller’s fully-qualified class name (FQCN), and by attaching the controller’s listener method (‘someMethodName’ in this example) to the shared event manager with the controller’s class name as the aggregate root’s name (first parameter of the attach() method) and a wildcard symbol as the event’s name (second parameter of the same attach() method), we are, in fact, making this controller a listener to all of the aggregate’s events. This allows the controller to determine what is left to be done before a response can be considered complete. In an asynchronous environment like Swoole, it also allows multiple parts of the aggregate to execute simultaneously, without one part having to wait for another to finish.

Each part of the aggregate is then responsible for accomplishing its own subordinate task in order to fulfill the common goal. The parts of the aggregate interact with one another by dispatching events through the event dispatcher.
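As a rough illustration of how an aggregate root might use a wildcard listener to determine what is left to be done, here is a minimal sketch. The class SketchAggregateRoot, its methods other than someMethodName, and the event names are all hypothetical; in LightMVC the listener would receive an event object rather than a name and an array.

```php
<?php

// Hypothetical aggregate root; in LightMVC this role is played by the controller.
final class SketchAggregateRoot
{
    // Completion events still expected before the response is final.
    private $pending;

    // Payloads received so far, keyed by event name.
    private $collected = [];

    public function __construct(array $expectedEvents)
    {
        $this->pending = array_fill_keys($expectedEvents, true);
    }

    // Wildcard-style listener: called for every event of the aggregate.
    // The simplified signature (name and payload) is an assumption of this sketch.
    public function someMethodName(string $eventName, array $payload = []): void
    {
        if (isset($this->pending[$eventName])) {
            unset($this->pending[$eventName]);
            $this->collected[$eventName] = $payload;
        }
    }

    public function isResponseReady(): bool
    {
        return $this->pending === [];
    }

    public function getCollected(): array
    {
        return $this->collected;
    }
}

$root = new SketchAggregateRoot(['read_completed', 'stats_completed']);

$root->someMethodName('read_requested');                  // not an expected completion: ignored
$root->someMethodName('stats_completed', ['count' => 3]); // arrives first
$stillWaiting = !$root->isResponseReady();

$root->someMethodName('read_completed', ['rows' => []]);
$ready = $root->isResponseReady();
```

Because the root only tracks which completion events are still outstanding, the order in which the parts finish does not matter, which is the property that makes this pattern attractive in an asynchronous runtime.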

Event Dispatcher

The default LightMVC event dispatcher is an instance of the \Ascmvc\EventSourcing\EventDispatcher class. It is a PSR-14 compliant event dispatcher. Therefore, you can replace this event dispatcher with any other PSR-14 compliant event dispatcher. Since the LightMVC event dispatcher is an extension of the \Zend\EventManager\EventManager, it is possible to use any of the known Zend event manager facilities.

Note

For more information on configuring an application’s event sourcing aggregates, please see the Event Sourcing Configuration section.

To dispatch an aggregate event, instantiate an aggregate value object and an aggregate event, and then use the event dispatcher’s dispatch() method to dispatch the event to the attached listeners.

// The value object can be empty.
$aggregateValueObject = new AggregateImmutableValueObject();

// The aggregate event must receive an aggregate value object,
// the name of the aggregate root, and the name of the event.
$event = new AggregateEvent(
    $aggregateValueObject,
    ProductsController::class,
    ProductsController::READ_REQUESTED
);

$this->eventDispatcher->dispatch($event);

The event dispatcher contains an instance of the \Zend\EventManager\SharedEventManager by default. This allows for the dispatching of events to other parts of the application, or for listening to events dispatched by other parts of the application.

For more information on the shared event manager, please see the Event Manager section.

Aggregate Events

The LightMVC \Ascmvc\EventSourcing\Event\AggregateEvent class is, ultimately, an extension of the \Zend\EventManager\Event class. The added facilities allow the dispatching code to define the name of the aggregate root, and to inject an aggregate value object to be shared with listeners. The framework defines two child event classes: \Ascmvc\EventSourcing\Event\ReadAggregateCompletedEvent and \Ascmvc\EventSourcing\Event\WriteAggregateCompletedEvent. These two classes are designed to make logging easier and to make dispatching to the Read Model and Policy listeners more convenient.

Aggregate Value Objects

An \Ascmvc\EventSourcing\AggregateImmutableValueObject object is an immutable value object that is designed to allow all parts of an aggregate to easily share any data through a common interface. An aggregate value object can be empty. Since this class implements the Serializable interface, it is possible to serialize its data into a string format. Finally, it allows its data to be hydrated into an array with its hydrateToArray() method.
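To make the idea of an immutable, serializable value object concrete, here is a minimal sketch. SketchImmutableValueObject is a hypothetical stand-in and not the framework’s class: only the method names getProperties() and hydrateToArray() are borrowed from this section, their behavior here is assumed, and the with() helper is invented for the example.

```php
<?php

// Hypothetical stand-in, NOT the framework's AggregateImmutableValueObject.
final class SketchImmutableValueObject
{
    // Private and never modified in place after construction.
    private $properties;

    public function __construct(array $properties = [])
    {
        $this->properties = $properties;
    }

    public function getProperties(): array
    {
        return $this->properties;
    }

    // Assumed behavior for this sketch: return the data as a plain array.
    public function hydrateToArray(): array
    {
        return $this->properties;
    }

    // Hypothetical helper: returns a modified copy and leaves the original untouched.
    public function with(string $key, $value): self
    {
        $copy = clone $this;
        $copy->properties[$key] = $value;

        return $copy;
    }

    // Serialization hooks (PHP 7.4+), used by serialize() and unserialize().
    public function __serialize(): array
    {
        return $this->properties;
    }

    public function __unserialize(array $data): void
    {
        $this->properties = $data;
    }
}

$original = new SketchImmutableValueObject(['id' => 7, 'name' => 'Lamp']);
$changed = $original->with('price', 19.99);

// Round trip through a string, for example to store the data in an event log.
$restored = unserialize(serialize($original));
```

Since no method mutates an existing instance, a value object can be shared between listeners without one listener affecting what another one sees.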

Aggregate Event Listeners

All LightMVC listeners implement the \Ascmvc\EventSourcing\EventListenerInterface interface. This interface defines one single listener method named onEvent(). This being said, one can define any custom listener method, but the LightMVC Event Sourcing implementation recommends using the default onEvent() listener method for all event listeners.

There are two main types of listeners in the LightMVC Event Sourcing implementation: the \Ascmvc\EventSourcing\ReadModel class and the \Ascmvc\EventSourcing\Policy class.

Aggregate Read Models

The \Ascmvc\EventSourcing\ReadModel class is used to call a command that reads data from a given source. The Read Model is responsible for determining what this data source is and how to access it. The Read Model is therefore the class that is responsible for implementing CQRS, if need be.

Here is an example of a Read Model that calls a read command, passing it the necessary data and the database entity manager that the command needs in order to retrieve data from a ‘products’ table in a database:

<?php

namespace Application\ReadModels;

use Application\Commands\ReadProductsCommand;
use Application\Models\Entity\Products;
use Application\Models\Traits\DoctrineTrait;
use Ascmvc\EventSourcing\Event\Event;
use Ascmvc\EventSourcing\EventDispatcher;
use Ascmvc\EventSourcing\ReadModel;

class ProductsReadModel extends ReadModel
{
    use DoctrineTrait;

    protected $id;

    protected $products;

    protected $productsRepository;

    protected function __construct(EventDispatcher $eventDispatcher, Products $products)
    {
        parent::__construct($eventDispatcher);

        $this->products = $products;
    }

    public static function getInstance(EventDispatcher $eventDispatcher)
    {
        $productsEntity = new Products();

        return new self($eventDispatcher, $productsEntity);
    }

    public function onEvent(Event $event)
    {
        // The read connection can be different from the write connection if implementing full CQRS.
        $connName = $event->getApplication()->getBaseConfig()['events']['read_conn_name'];

        $entityManager = $event->getApplication()->getServiceManager()[$connName];

        $productsCommand = new ReadProductsCommand(
            $event->getAggregateValueObject(),
            $entityManager,
            $this->eventDispatcher
        );

        // 'new' always returns an object, so the command can be executed directly.
        $productsCommand->execute();

        return;
    }
}

From within the controller’s factory() method (or any other main AscmvcEvent method), the Read Model can then be attached to the aggregate’s event bus (event dispatcher) in this way:

// Controller's factory() method

$productsReadModel = ProductsReadModel::getInstance($eventDispatcher);

$eventDispatcher->attach(
    ProductsController::READ_REQUESTED,
    [$productsReadModel, 'onEvent']
);

Thus, the Read Model will listen for any event with the name ProductsController::READ_REQUESTED from within this aggregate.

Aggregate Policies

The \Ascmvc\EventSourcing\Policy class is used to call a command that writes data to a given source. The Policy is responsible for determining what data to write, where to store it and how to access the storage. The Policy is therefore the class that is responsible for implementing CQRS, if need be.
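The Read Model example above looks up its connection through a read_conn_name key under the configuration’s events entry, and a Policy can do the same with a write_conn_name key. The sketch below shows how pointing these two keys at different connections separates reads from writes. The connection names and the array standing in for the service manager are hypothetical; only the two key names come from the examples in this section.

```php
<?php

// Hypothetical connection names; only the key names are taken from this section.
$baseConfig = [
    'events' => [
        'read_conn_name' => 'read_replica',
        'write_conn_name' => 'primary',
    ],
];

// Hypothetical registry keyed by connection name (stand-in for the service manager).
$serviceManager = [
    'read_replica' => 'entity manager for reads',
    'primary' => 'entity manager for writes',
];

// A Read Model would resolve its entity manager this way...
$readManager = $serviceManager[$baseConfig['events']['read_conn_name']];

// ...and a Policy this way.
$writeManager = $serviceManager[$baseConfig['events']['write_conn_name']];
```

Setting both keys to the same connection name would give a single data store for reads and writes, so the listeners themselves would not need to change if the storage layout does.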

Here is an example of a Policy that calls a write command by passing to it all the necessary data and the needed database entity manager in order for the command to successfully execute itself and store the data to a ‘products’ table in a database:

<?php

namespace Application\Policies;

use Application\Commands\WriteProductsCommand;
use Application\Models\Traits\DoctrineTrait;
use Ascmvc\EventSourcing\Event\Event;
use Ascmvc\EventSourcing\EventDispatcher;
use Ascmvc\EventSourcing\Policy;

class ProductsPolicy extends Policy
{
    use DoctrineTrait;

    protected $properties;

    protected $products;

    protected $productsRepository;

    public static function getInstance(EventDispatcher $eventDispatcher)
    {
        return new self($eventDispatcher);
    }

    public function onEvent(Event $event)
    {
        $connName = $event->getApplication()->getBaseConfig()['events']['write_conn_name'];

        $entityManager = $event->getApplication()->getServiceManager()[$connName];

        $argv['name'] = $event->getName();

        $productsCommand = new WriteProductsCommand(
            $event->getAggregateValueObject(),
            $entityManager,
            $this->eventDispatcher,
            $argv
        );

        $productsCommand->execute();

        return;
    }
}

From within the controller’s factory() method (or any other main AscmvcEvent method), the Policy can then be attached to the aggregate’s event bus (event dispatcher) in this way:

// Controller's factory() method

$productsPolicy = ProductsPolicy::getInstance($eventDispatcher);

// If there are many listeners to attach, one may use a
// Listener Aggregate that implements the \Zend\EventManager\ListenerAggregateInterface
// instead of attaching them one by one.
$eventDispatcher->attach(
    ProductsController::CREATE_REQUESTED,
    [$productsPolicy, 'onEvent']
);

$eventDispatcher->attach(
    ProductsController::UPDATE_REQUESTED,
    [$productsPolicy, 'onEvent']
);

$eventDispatcher->attach(
    ProductsController::DELETE_REQUESTED,
    [$productsPolicy, 'onEvent']
);

Note

To learn more about the \Zend\EventManager\ListenerAggregateInterface interface, please see the ZF documentation on Aggregate Listeners.

Thus, the Policy will listen for any of the above-mentioned events from within this aggregate.
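The idea of grouping several attachments in one place, instead of attaching listeners one by one, can be sketched as follows. This is not the \Zend\EventManager\ListenerAggregateInterface itself: SketchBus and SketchProductsListenerAggregate are hypothetical classes written only to show the pattern, and the event names are placeholders.

```php
<?php

// Hypothetical minimal bus exposing attach(eventName, listener).
final class SketchBus
{
    private $listeners = [];

    public function attach(string $eventName, callable $listener): void
    {
        $this->listeners[$eventName][] = $listener;
    }

    public function dispatch(string $eventName, array $payload = []): void
    {
        foreach ($this->listeners[$eventName] ?? [] as $listener) {
            $listener($eventName, $payload);
        }
    }
}

// Groups the attachment of one listener to several events in a single place.
final class SketchProductsListenerAggregate
{
    private $listener;
    private $eventNames;

    public function __construct(callable $listener, array $eventNames)
    {
        $this->listener = $listener;
        $this->eventNames = $eventNames;
    }

    public function attach(SketchBus $bus): void
    {
        foreach ($this->eventNames as $eventName) {
            $bus->attach($eventName, $this->listener);
        }
    }
}

$handled = [];

// Stand-in for a Policy's onEvent() method.
$policy = function (string $eventName, array $payload) use (&$handled): void {
    $handled[] = $eventName;
};

$bus = new SketchBus();

(new SketchProductsListenerAggregate(
    $policy,
    ['create_requested', 'update_requested', 'delete_requested']
))->attach($bus);

$bus->dispatch('update_requested', ['id' => 1]); // handled by the policy
$bus->dispatch('read_requested');                // no listener attached: ignored
```

The controller’s factory() method then needs a single attach() call per aggregate of listeners, which keeps the list of handled events next to the listener that handles them.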

Aggregate Commands

The \Ascmvc\EventSourcing\Command class is a very simple blueprint that defines common functionality to be used by all commands. Command classes should extend this base class and should represent an imperative that takes place within an aggregate. If one is to say “write this data about our products to the database”, one should extend the \Ascmvc\EventSourcing\Command class and name the class WriteProductsCommand within the namespace of the aggregate. Once the command has finished executing, it should dispatch a new aggregate event in order to notify listeners that the command is finished. Here is an example of what a WriteProductsCommand class could look like:

<?php

namespace Application\Commands;

use Application\Controllers\ProductsController;
use Application\Events\WriteProductsCompleted;
use Application\Models\Entity\Products;
use Application\Models\Repository\ProductsRepository;
use Ascmvc\EventSourcing\AggregateImmutableValueObject;
use Doctrine\ORM\Mapping\ClassMetadata;

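// ProductsCommand is assumed to be an application-level base class in this namespace
// that extends \Ascmvc\EventSourcing\Command.
class WriteProductsCommand extends ProductsCommand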
{
    public function execute()
    {
        $name = $this->argv['name'];

        $args = $this->aggregateValueObject->getProperties();

        $productsRepository = new ProductsRepository(
            $this->entityManager,
            new ClassMetadata(Products::class)
        );

        $values = [];

        // Map each requested event to the completed event that reports its outcome.
        $completedEvents = [
            ProductsController::CREATE_REQUESTED => ProductsController::CREATE_COMPLETED,
            ProductsController::UPDATE_REQUESTED => ProductsController::UPDATE_COMPLETED,
            ProductsController::DELETE_REQUESTED => ProductsController::DELETE_COMPLETED,
        ];

        // Ignore event names that this command does not handle.
        if (!isset($completedEvents[$name])) {
            return;
        }

        $params = ['saved' => 1];

        try {
            if ($name === ProductsController::CREATE_REQUESTED) {
                $productsRepository->save($args);
            } elseif ($name === ProductsController::UPDATE_REQUESTED) {
                $products = $this->entityManager->find(Products::class, $args['id']);

                // Keep the previous state so that listeners can compare it with the new one.
                $values['pre'] = [
                    'id' => $products->getId(),
                    'name' => $products->getName(),
                    'price' => $products->getPrice(),
                    'description' => $products->getDescription(),
                    'image' => $products->getImage(),
                ];

                $productsRepository->save($args, $products);
            } elseif ($name === ProductsController::DELETE_REQUESTED) {
                if (isset($args['id'])) {
                    $products = $this->entityManager->find(Products::class, $args['id']);
                    $productsRepository->delete($products);
                }
            }

            $values['post'] = $args;
        } catch (\Exception $e) {
            // Report the failure instead of the 'saved' flag.
            $params = ['error' => 1];
        }

        // The event is created outside of the try block, so that it always exists
        // when it is dispatched, even if the write operation failed.
        $event = new WriteProductsCompleted(
            new AggregateImmutableValueObject($values),
            ProductsController::class,
            $completedEvents[$name]
        );

        $event->setParams($params);

        $this->eventDispatcher->dispatch($event);
    }
}

This new class is then ready to be called by a \Ascmvc\EventSourcing\Policy listener once the corresponding event is dispatched by a command, whether it is the main command (controller action method) or a subordinate command.

Event Logger

The LightMVC Framework’s Event Sourcing implementation comes with the \Ascmvc\EventSourcing\EventLogger class, which logs events based on two criteria: (1) the aggregate has added the EventLogger class name to its event bus identifiers, and (2) the event class type is whitelisted (or not blacklisted). Concerning this second criterion, the logger will log all events if no classes were whitelisted or blacklisted. If at least one class is whitelisted or blacklisted, the logger will blacklist by default.

Also, it is possible to log events to a different database if a Doctrine ORM connection name is defined for it in the application’s configuration.

Note

For more information on configuring an application’s event log, please see the Event Sourcing Configuration section.

For a working example of Event Sourcing and CQRS with LightMVC, please use our skeleton application, as explained in the section on the LightMVC Skeleton Application.