Event Sourcing / CQRS
Event Sourcing is a technical approach used in software development to capture and persist the state of an application as a series of events. Instead of directly storing the current state of an object, Event Sourcing focuses on recording every state change as an immutable Event
.
In this pattern, events represent significant actions or changes that occur within an object - named Aggregate
or aggregate root. Each Event
carries all the relevant information necessary to reconstruct the aggregate's state at any point in time. These events are stored in an EventStore
, which acts as the single source of truth for all aggregates for the whole application's history.
To reconstruct the current state of a single aggregate, all Events in the EventStore belonging to this particular object are sequentially applied in order. By replaying the events, the aggregate's state can be determined at any specific moment in time.
Command Query Responsibility Segregation (CQRS) is a software architecture pattern that separates the read and write operations of an application's data model. It advocates using distinct models for handling read (Query
) and write (Command
) operations to achieve better scalability, performance, and maintainability. By decoupling the data models, CQRS allows optimizing each model based on its specific requirements, leading to a more efficient and flexible system design.
Both concepts were implemented in this framework and connected in the Facade
. The facade is the central interface for utilizing the framework and is responsible for dispatching commands and queries, as well as handling events and distributing events through message brokers to other services.
Overall, the framework offers the following components, which will be described in detail here:
- Facade
- Aggregates
- Stores (EventStore, CommandStore, CacheStore, DataStore)
- Repositories (EventRepository, AggregateRepository, CommandRepository)
- Data Providers (EventDataProvider, CommandDataProvider, QueryDataProvider)
- Busses (CommandBus, EventBus)
- Broker
- Handlers (EventHandler, CommandHandler, QueryHandler)
- Domain
- Helper functions
Facade
To use this framework, it is necessary to create a Facade
instance, which serves as the central interface for utilizing the framework. The facade instance itself is a struct that fullfills the Facade
interface and can be created using the NewFacade
function. The NewFacade function, as well as most other constructor methods, implement the Options Pattern
, allowing dynamic configuration.
import (
"github.com/gradientzero/es-cqrs-base/base"
)
func mySetup() {
// with default options
facade := base.NewFacade()
// using functional optionals
facade := base.NewFacade(
base.FacadeOptionWithEventStore(myEventStore),
base.FacadeOptionWithCommandStore(myCommandStore),
...
)
}
The Facade
interface offers the following methods, which are essential for operating an Event Sourcing + CQRS application:
type Facade interface {
...
AddEventHandler(eventHandlerList ...EventHandler) error
AddCommandHandler(commandHandlerList ...CommandHandler) error
AddQueryHandler(queryHandlerList ...QueryHandler) error
...
DispatchCommand(cmd Command) (*Response, error)
DispatchQuery(qry Query) (*Response, error)
}
The AddEventHandler
, AddCommandHandler
, and AddQueryHandler
methods are used to register handlers for the various types of events, commands and queries that can be processed by the facade. The DispatchCommand
and DispatchQuery
methods are used to dispatch commands and queries, respectively. The Response
struct returned by these methods contains the result of the command or query, as well as any errors that may have occurred.
To create and dispatch a new Command
, you can use the following code as an example:
cmd := base.NewCommand(&CommandDoSomething{
NewAggregateUuid: "anyAggregateUuid",
Name: "anyName",
})
res, err := facade.DispatchCommand(cmd)
// res of type `Response` with `RequestUuid` set (or `Error`)
Since Commands operate on a fire-and-forget principle, only the RequestUuid
is returned. This RequestUuid can be used to query the status of the Command. To do this, you pass the RequestUuid to the CacheStore
, which retrieves the status of the Command.
val, err := facade.GetCacheStore().Get(
base.CacheStoreGetOptionWithKey(requestUuid),
)
// val can be either a number (representing the number of newly generated events) or a string containing an error message
On the other hand, Queries are processed directly and return the desired result. To dispatch a new Query
, do the following:
qry := base.NewQuery(&QueryGetSomething{
AggregateUuid: "anyAggregateUuid",
Params: "anyParams",
})
res, err := facade.DispatchQuery(qry)
// res of type `Response` with `Item` or `Items` set (or `Error`)
Aggregate (or Aggregate Root)
The concept of Aggregate Roots comes from Domain-Driven Design (DDD) and plays a crucial role in modeling domain entities in complex systems. An Aggregate Root is a specific type of domain entity that serves as a boundary and a consistency boundary for a group of related objects.
In Event Sourcing, an Aggregate Root is the object that is being retrieved from the AggregateRepository
, which uses the EventRepository
, which finally uses the underlying EventStore
. The Aggregate Root is responsible for handling actions (also named intentions) and generating the corresponding events. The Aggregate Root is also responsible for applying the events to itself and thus reconstructing its state.
In this framework an Aggregate Root is defined as an interface as follows:
type Aggregate interface {
GetAggregateType() string
GetAggregateUuid() string
SetAggregateUuid(aggregateUuid string)
GetVersion() int64
GetUncommittedEvents() []Event
ClearUncommittedEvents()
ApplyEvent(evt Event, commit bool) error
}
The framework also provides a default implementation of the Aggregate
interface called BaseAggregate
, which already fullfills the interface methods. The BaseAggregate
struct must be embedded into a user defined Aggregate
struct. By doing so, the BaseAggregate
takes care of many essential tasks required for Event Sourcing automatically, such as versioning and various other methods.
Stores
The framework provides four stores as interfaces and in-memory implementations. The most important store is the EventStore
, which is responsible for persisting events. The CommandStore
is used to persist commands, allowing to trace which command generated which events. The CacheStore
serves two purposes: to temporarily persist the status of a command and to manage short-lived key-value pairs. The DataStore
is used to persist data or other binary formats.
EventStore
The EventStore
serves as the core component within an Event Sourcing application, playing a pivotal role in the storage and retrieval of all Events generated by aggregates. It acts as a historical ledger, preserving the sequence of events that occurred in the system over time. With the EventStore, it becomes possible to reconstruct the state of an aggregate at any desired moment.
The EventStore
is defined as an interface
, outlining the contract that must be fulfilled by any custom implementation. Users are responsible for implementing this interface, allowing them to choose the most suitable storage solution for their specific needs.
An in-memory implementation of the EventStore does already exist, referred to as eventStoreMemory
. If the user does not provide an alternative EventStore
during setup, eventStoreMemory
is used as the default storage solution. This in-memory implementation is convenient for testing and prototyping purposes. The EventStore
interface is defined as follows:
type EventStore interface {
Init(opts ...EventStoreOption) error
Create(opts ...EventStoreCreateOption) error
Get(opts ...EventStoreGetOption) (Event, error)
List(opts ...EventStoreListOption) ([]Event, error)
Update(opts ...EventStoreUpdateOption) error
Delete(opts ...EventStoreDeleteOption) error
Close() error
Options() EventStoreOptions
String() string
Reset() error
}
The Facade
interface offers the method to retrieve the registered EventStore
:
type Facade interface {
...
GetEventStore() EventStore
...
}
Both the Facade
and the user do not directly interact with the EventStore
. The EventStore manages events in their purest form, without any knowledge of the underlying contents (deserialized form of EventData). When the EventStore is queried for events, it returns them in their raw format, as they are stored in the database. The deserialization of EventData is not handled by the EventStore, but by the EventRepository
, which deserializes the EventData, making it easily accessible for the caller.
CommandStore
The inclusion of the CommandStore
in the framework is for the sake of completeness. Every Command
processed by the Facade is persisted in the CommandStore. Simultaneously, the Facade ensures that the resulting events are associated with the corresponding command, making it possible to trace the origin of the events. The CommandStore is defined as an interface and can be implemented by the user.
An in-memory implementation of the CommandStore does already exist, referred to as commandStoreMemory
. If the user does not provide an alternative CommandStore
during setup, commandStoreMemory
is used as the default storage solution. This in-memory implementation is convenient for testing and prototyping purposes. The CommandStore
interface is defined as follows:
type CommandStore interface {
Init(opts ...CommandStoreOption) error
Create(opts ...CommandStoreCreateOption) error
Get(opts ...CommandStoreGetOption) (Command, error)
List(opts ...CommandStoreListOption) ([]Command, error)
Update(opts ...CommandStoreUpdateOption) error
Delete(opts ...CommandStoreDeleteOption) error
Close() error
Options() CommandStoreOptions
String() string
Reset() error
}
The Facade
interface offers the method to retrieve the registered CommandStore
:
type Facade interface {
...
GetCommandStore() CommandStore
...
}
CacheStore
The CacheStore
is used for temporary storage. It is used by the Facade
to store the status of a Command
and can be used by users to manage short-lived key-value pairs. The CacheStore is defined as an interface and can be implemented by the user.
An in-memory implementation of the CacheStore does already exist, referred to as cacheStoreMemory
. If the user does not provide an alternative CacheStore
during setup, cacheStoreMemory
is used as the default storage solution. This in-memory implementation is convenient for testing and prototyping purposes. The CacheStore
interface is defined as follows:
type CacheStore interface {
Init(opts ...CacheStoreOption) error
Get(opts ...CacheStoreGetOption) (interface{}, error)
Set(opts ...CacheStoreSetOption) error
Delete(opts ...CacheStoreDeleteOption) error
Close() error
Options() CacheStoreOptions
String() string
Reset() error
}
The Facade
interface offers the method to retrieve the registered CacheStore
:
type Facade interface {
...
GetCacheStore() CacheStore
...
}
DataStore
The DataStore
is utilized for data storage purposes. It is defined as an interface and can be implemented by the user.
An in-memory implementation of the DataStore does already exist, referred to as dataStoreMemory
. If the user does not provide an alternative DataStore
during setup, dataStoreMemory
is used as the default storage solution. This in-memory implementation is convenient for testing and prototyping purposes. The DataStore
interface is defined as follows:
type DataStore interface {
Init(opts ...DataStoreOption) error
Get(opts ...DataStoreGetOption) ([]byte, error)
Set(opts ...DataStoreSetOption) error
Copy(opts ...DataStoreCopyOption) error
Delete(opts ...DataStoreDeleteOption) error
Close() error
Options() DataStoreOptions
String() string
Reset() error
}
The Facade
interface offers the method to retrieve the registered DataStore
:
type Facade interface {
...
GetDataStore() DataStore
...
}
Repositories
The framework provides three repositories: EventRepository
, AggregateRepository
, and CommandRepository
. Each repository is responsible for loading data from the corresponding stores and transforming it into the appropriate format. When loading events from the EventRepository, the user can access all information of the event in its entirety. On the other hand, the AggregateRepository is designed not only to load events completely but also to convert them into an aggregate representation. The CommandRepository, on the other hand, is focused on fully loading commands.
Repositories are integral components of the Facade and cannot be overridden by the user. They play a crucial role in retrieving and transforming data, ensuring seamless access to events, aggregates, and commands, and simplifying data manipulation within the application.
EventRepository
Unlike the EventStore
, which focuses solely on managing the raw events, the EventRepository
provides the capability to load the events with fully deserialized data. This feature is crucial for various uses, such as in the EventHandler
, where events no longer need to be deserialized manually. Instead, an EventHandler can confidently rely on the fact that the events to be processed are fully available with their data fully deserialized. This streamlines the processing of events and simplifies the implementation of EventHandlers, improving overall efficiency and maintainability.
The EventRepository
is defined as an interface as follows:
type EventRepository interface {
AddEvent(opts ...EventRepositoryAddOption) error
GetEvent(opts ...EventRepositoryGetOption) (Event, error)
ListEvents(opts ...EventRepositoryListOption) ([]Event, error)
}
The EventRepository
interface is implemented by the eventRepository
struct, which is included in the framework. The eventRepository
struct is a wrapper around the EventStore and provides the functionality to load events with fully deserialized data. The EventRepository is handled internally by the Facade
and unlike the EventStore
can not be overwritten by the user.
The Facade
interface offers the method to retrieve the registered EventRepository
:
type Facade interface {
...
GetEventRepository() EventRepository
...
}
AggregateRepository
The AggregateRepository
is a wrapper around the EventRepository
and provides the functionality to load aggregates with fully deserialized data. The AggregateRepository
is defined as an interface
as follows:
type AggregateRepository interface {
GetAggregate(aggregateUuid string, NewAggregate func() Aggregate) (Aggregate, error)
ListAggregates(NewAggregate func() Aggregate) ([]Aggregate, error)
}
To enable the loading of an Aggregate
, we need to pass aggregate's constructor function NewAggregate
to the AggregateRepository
. The AggregateRepository
itself lacks the knowledge and capability to directly instantiate concrete Aggregate
instances. By providing the NewAggregate
function, the AggregateRepository
can dynamically create new instances of Aggregates as needed. The NewAggregate
function acts as a factory method, responsible for generating new instances of the desired Aggregate
type when requested by the AggregateRepository
. This way, the AggregateRepository can efficiently manage and manipulate Aggregates without requiring any knowledge of their internal construction details.
The AggregateRepository
interface is fullfilled by the aggregateRepository
struct, which is an integral part of this framework. The AggregateRepository
is managed internally by the Facade
and, like the EventRepository
, cannot be overridden by the user.
One of the notable extensions to the AggregateRepository
is the ability to load aggregates with fully deserialized data, which is facilitated through the use of EventRepository
and by applying corresponding Event Sourcing methods (base.ApplyEvent
etc). This design enables the AggregateRepository
to efficiently handle the reconstruction of aggregates from events, providing seamless access to the fully deserialized data.
Indeed, the AggregateRepository
offers opportunities to implement optimizations for loading aggregates. By leveraging techniques like snapshots or caching mechanisms, loading processes can be significantly accelerated on the write side. However, these optimizations are not implemented in this framwork yet.
The Facade
interface offers the method to retrieve the registered AggregateRepository
:
type Facade interface {
...
GetAggregateRepository() AggregateRepository
...
}
CommandRepository
The CommandRepository
is a wrapper around the CommandStore
and provides the functionality to load commands with fully deserialized data. The CommandRepository
is defined as an interface
as follows:
type CommandRepository interface {
AddCommand(opts ...CommandRepositoryAddOption) error
GetCommand(opts ...CommandRepositoryGetOption) (Command, error)
ListCommands(opts ...CommandRepositoryListOption) ([]Command, error)
}
The Facade
interface offers the method to retrieve the registered CommandRepository
:
type Facade interface {
...
GetCommandRepository() CommandRepository
...
}
Data Providers
The Data Providers are components that have the knowledge of how to deserialize specific data types. They are utilized by the repositories to load data into the corresponding data types effectively. The Data Providers are an integral part of the Facade and cannot be overridden by the user. They are automatically populated with information internally when Handlers are added and can then be used by the repositories. For all three serializable or deserializable data types, there are corresponding data providers: EventDataProvider
, CommandDataProvider
, and QueryDataProvider
.
EventDataProvider
The EventDataProvider
is directly managed by the Facade
and is fed with EventData
as soon as an EventHandler
is added to the Facade. The EventDataProvider is defined as an interface and cannot be overridden by the user. The EventDataProvider interface is defined as follows:
type EventDataProvider interface {
AddEventData(evtData EventData) error
GetEventDataList() []EventData
CanProvideEventData(evt Event) bool
ProvideEventData(evt Event) error
}
The Facade
interface offers the method to retrieve the internal EventDataProvider
:
type Facade interface {
...
GetEventDataProvider() EventDataProvider
...
}
The user can access all EventData
types through the EventDataProvider
, allowing them to identify which events are known to the Facade
. This capability provides transparency into the events that the Facade is aware of, enabling users to understand the available event types and work with them accordingly.
CommandDataProvider
The CommandDataProvider
is directly managed by the Facade
and is fed with CommandData
as soon as an CommandHandler
is added to the Facade. The CommandDataProvider is defined as an interface and cannot be overridden by the user. The CommandDataProvider interface is defined as follows:
type CommandDataProvider interface {
RegisterCommandData(cmdData CommandData) error
GetCommandDataList() []CommandData
CanProvideCommandData(cmd Command) bool
ProvideCommandData(cmd Command) error
}
The Facade
interface offers the method to retrieve the internal CommandDataProvider
:
type Facade interface {
...
GetCommandDataProvider() CommandDataProvider
...
}
The user can access all CommandData
types through the CommandDataProvider
, allowing them to identify which commands are known to the Facade
. This capability provides transparency into the commands that the Facade is aware of, enabling users to understand the available command types and work with them accordingly.
QueryDataProvider
The QueryDataProvider
is directly managed by the Facade
and is fed with QueryData
as soon as an QueryHandler
is added to the Facade. The QueryDataProvider is defined as an interface and cannot be overridden by the user. The QueryDataProvider interface is defined as follows:
type QueryDataProvider interface {
AddQueryData(qryData QueryData) error
GetQueryDataList() []QueryData
CanProvideQueryData(qry Query) bool
ProvideQueryData(qry Query) error
}
The Facade
interface offers the method to retrieve the internal QueryDataProvider
:
type Facade interface {
...
GetQueryDataProvider() QueryDataProvider
...
}
The user can access all QueryData
types through the QueryDataProvider
, allowing them to identify which queries are known to the Facade
. This capability provides transparency into the queries that the Facade is aware of, enabling users to understand the available query types and work with them accordingly.
Bus
Buses are managed by the Facade
and cannot be overridden by the user. They function as queues, where new information is appended and processed one by one. In this framework, two crucial buses are the EventBus
and the CommandBus
. The Facade has the capability to hold up to 100 items (Commands and Events) in advance before rejecting new items. By utilizing buses, the Facade effectively manages the flow of commands and events, ensuring smooth and controlled processing of information in a manageable and efficient manner.
Both interfaces, CommandBus
and EventBus
, are designed in a general way, allowing them to be versatile and adaptable. This design decision also enables the Broker
interface to utilize both CommandBus and EventBus.
CommandBus
When the user sends a new Command
, it is first added to the CommandBus
and later picked up by the Facade. The framework provides an in-memory implementation of the CommandBus
called commandBusMemory
. The CommandBus
interface is defined as follows:
type CommandBus interface {
CanPublishToCommandBus() bool
PublishToCommandBus(dataBytes []byte) error
CanSubscribeToCommandBus() bool
SubscribeToCommandBus() <-chan []byte
}
Since commandBusMemory
is internally managed by the Facade
, both the CanPublishToCommandBus
and CanSubscribeToCommandBus
methods always return true
. In the PublishToCommandBus
method, the Command
(as byte array
) is added to a buffered channel
, which can hold up to 100 commands simultaneously. This allows for efficient handling and queuing of commands without blocking the main execution flow. On the other hand, in the SubscribeToCommandBus
method, the buffered channel is simply returned. The Facade utilizes this channel to retrieve the next Command
(as byte array
) in a separate Goroutine
to process it.
The Facade
interface offers the method to retrieve the internal CommandBus
:
type Facade interface {
...
GetCommandBus() CommandBus
...
}
EventBus
When a Command
is executed, one Event
(or more) is typically generated as a result. These events are stored on the internal EventBus
so that the Facade
can invoke the corresponding EventHandlers. The framework includes an in-memory implementation of the EventBus called eventBusMemory
. The EventBus
interface is defined as follows:
type EventBus interface {
CanPublishToEventBus() bool
PublishToEventBus(dataBytes []byte) error
CanSubscribeToEventBus() bool
SubscribeToEventBus() <-chan []byte
}
Since eventBusMemory
is internally managed by the Facade
, both the CanPublishToEventBus
and CanSubscribeToEventBus
methods always return true
. In the PublishToEventBus
method, the Event
(as byte array
) is added to a buffered channel
, which can hold up to 100 events simultaneously. This allows for efficient handling and queuing of events without blocking the main execution flow. On the other hand, in the SubscribeToEventBus
method, the buffered channel is simply returned. The Facade utilizes this channel to retrieve the next Event
(as byte array
) in a separate Goroutine
to process it.
The Facade
interface offers the method to retrieve the internal EventBus
:
type Facade interface {
...
GetEventBus() EventBus
...
}
EventBusWithForeignEvents
The EventBusWithForeignEvents
is an additional EventBus
used by the Broker
to collect events from foreign services that are not known to the Facade
. Currently, these events are ignored, but having this additional bus allows us to precisely distinguish between events that we can deserialize and events that are foreign to the Facade, requiring manual deserialization in user code.
For example, if we receive an event indicating that a specific user has logged out, our application could clear all cache entries related to that user, even if we don't have any other information (or don't need it) about that user.
By using the EventBusWithForeignEvents
, the framework enhances flexibility and allows users to handle events from external sources in a controlled and deliberate manner. This separation between known and foreign events ensures that the application can make informed decisions on how to handle different event types, promoting adaptability and enabling seamless integration with external services.
Broker
The Broker
is a crucial component for exchanging information between different applications and/or application instances. It enables an application to follow the CQRS principle by running with multiple read-only instances and obtaining events from a write-only instance. Additionally, read-only instances must forward commands to the write-only instance instead of processing them directly, but can still process queries independently.
The Broker
is defined as an interface as follows:
type Broker interface {
CommandBus
EventBus
}
The Facade
interface offers the method to set the Broker
:
type Facade interface {
...
SetBroker(Broker Broker) error
...
}
In case user wants to use the Broker
functionality, user must implement the Broker
interface as there is no standard implementation, unlike other components. Examples of Broker
implementations could include NATS, RabbitMQ, Kafka, and more.
In both DispatchCommand
and executeCommand
, the Facade
takes into account whether the Broker
is set or not.
If the Broker
is set in DispatchCommand
and CanPublishToCommandBus
returns true
, the Facade
uses the Broker to send the command instead of placing it on the internal CommandBus
. This approach is applied in read-only instances where the instances themselves should not process the commands.
Similarly, if the Broker
is set in executeCommand
and CanPublishToEventBus
returns true
, the Facade
additionally sends the Event
over the Broker to all listeners.
Handlers
Handlers are user-defined components that are responsible for processing commands, queries, and events. The Facade
manages all handlers internally and provides the capability to add new handlers dynamically.
EventHandler
EventHandlers play a crucial role in processing events. EventHandlers are user-defined components responsible for either reacting to events (Reactor
) or updating the internal model representation (ReadModel
). From the perspective of the Facade
, both are treated equally fullfilling the EventHandler
interface that requires users to implement specific functions. The two most crucial functions are GetEventDataList
and HandleEvent
. The GetEventDataList
function defines the events that the EventHandler can understand and process within the HandleEvent
method. The EventHandler interface is defined as follows:
type EventHandler interface {
GetEventDataList() []EventData
HandleEvent(evt Event) error
}
As soon as a new Event
arrives in the facade, the HandleEvent
method of the corresponding EventHandler
is called. The HandleEvent
method is passed the Event
as a parameter, which can then be processed accordingly. The GetEventDataList
method returns a list of EventData
structs, which define the events that the EventHandler
can process. The EventData
interface is defined as follows:
type EventData interface {
ApplyEventData(_agg Aggregate) error
}
The Event
itself contains meta information about the containing EventData
. The Event
interface also contains helper methods for serializing and deserializing the EventData
into a byte array
. The Event
interface itself is defined as follows:
type Event interface {
GetUuid() string
SetUuid(uuid string) error
GetCommandUuid() string
SetCommandUuid(commandUuid string) error
GetTenantUuid() string
SetTenantUuid(tenantUuid string) error
GetAggregateType() string
SetAggregateType(aggregateType string) error
GetAggregateUuid() string
SetAggregateUuid(aggregateUuid string) error
GetVersion() int64
SetVersion(version int64) error
GetCreatedAt() int64
SetCreatedAt(createdAt int64) error
GetDataType() string
SetDataType(_type string) error
// underlying event data - string representation
GetDataContent() string
SetDataContent(data string) error
// underlying event data - runtime representation
GetData() EventData
SetData(data EventData) error
// underlying event data - helper methods
SerializeData() ([]byte, error)
DeserializeData(data []byte, dest EventData) error
}
Internally, the Facade
associates an new Event
with the Command
using SetCommandUuid
. Additionally, the Event
automatically inherits the TenantUuid
from the Command
through SetTenantUuid
, facilitated by the Facade. The TenantUuid
is only required for optimization purposes in scenarios involving multi-tenancy. In multi-tenant applications, the TenantUuid
helps segregate and distinguish data belonging to different tenants or customers.
When a EventHandler is added to the Facade, the EventDataProvider
automatically registers all the available EventData
present in the EventHandler. This registration process enables the EventDataProvider to keep track of and manage the various EventData types that the Facade
can handle. Adding EventHandlers to the Facade is straightforward and can be done as follows:
myEventHandler := NewSimpleEventHandler()
err := facade.AddEventHandler(myEventHandler)
CommandHandler
CommandHandlers are user-defined components responsible for executing custom commands. They play a crucial role in processing commands and implementing the cross-aggregate business logic associated with them. The CommandHandler interface is defined as follows:
type CommandHandler interface {
GetCommandDataList() []CommandData
HandleCommand(cmd Command) ([]Event, error)
}
The GetCommandDataList
function defines the commands that the CommandHandler can understand and process within the HandleCommand
method. The HandleCommand
method is passed the Command
as a parameter, which can then be processed accordingly. The Command
itself contains meta information about the containing CommandData
. The Command
interface also contains helper methods for serializing and deserializing the CommandData
into a byte array
. The Command
interface itself is defined as follows:
type Command interface {
GetUuid() string
SetUuid(uuid string) error
GetTenantUuid() string
SetTenantUuid(tenantUuid string) error
GetMetaData(key string) (interface{}, error)
GetMetaDataMap() map[string]interface{}
SetMetaData(key string, value interface{}) error
GetVersion() int64
SetVersion(version int64) error
GetCreatedAt() int64
SetCreatedAt(createdAt int64) error
GetDataType() string
SetDataType(_type string) error
// underlying command data - string representation
GetDataContent() string
SetDataContent(data string) error
// underlying command data - runtime representation
GetData() CommandData
SetData(cmdData CommandData) error
// underlying command data - helper methods
SerializeData() ([]byte, error)
DeserializeData(data []byte, destCmdData CommandData) error
}
Unlike events, commands provide the flexibility to carry additional metadata. This feature is especially useful for purposes such as authorization and other specific requirements. Metadata can be persisted in a Command
as key-value pairs, allowing for the inclusion of relevant information associated with the command. By incorporating metadata within commands, the application gains the ability to pass context-specific details alongside the command, enhancing its functionality and enabling better control over command execution and processing.
As with events, the actual content of a command is stored in the CommandData
, which has the following interface:
type CommandData interface {
HandleCommandData(cmdHandler CommandHandler, cmd Command) ([]Event, error)
}
As seen in the interface definition, a command can generate multiple events. The Facade automatically writes these events to the EventBus
or sends them to other services using the Broker
. This process ensures that all relevant events associated with the command are appropriately processed and dispatched.
When a CommandHandler is added to the Facade, the CommandDataProvider
automatically registers all the available CommandData
present in the CommandHandler. This registration process enables the CommandDataProvider to keep track of and manage the various CommandData types that the Facade
can handle. Adding CommandHandlers to the Facade is straightforward and can be done as follows:
myCommandHandler := NewSimpleCommandHandler()
err := facade.AddCommandHandler(myCommandHandler)
QueryHandler
QueryHandlers are user-defined components responsible for executing custom queries. They play a crucial role in processing queries. The QueryHandler interface is defined as follows:
type QueryHandler interface {
GetQueryDataList() []QueryData
HandleQuery(qry Query) (*Response, error)
}
As soon as a new Query
arrives in the facade, the HandleQuery
method of the corresponding QueryHandler
is called. The HandleQuery
method is passed the Query
as a parameter, which can then be processed accordingly. The GetQueryDataList
method returns a list of QueryData
structs, which define the queries that the QueryHandler
can process. The QueryData
interface is defined as follows:
type QueryData interface {
HandleQueryData(qryHandler QueryHandler, qry Query) (*Response, error)
}
The Query
itself contains meta information about the containing QueryData
. The Query
interface also contains helper methods for serializing and deserializing the QueryData
into a byte array
. The Query
interface itself is defined as follows:
type Query interface {
GetUuid() string
SetUuid(uuid string) error
GetTenantUuid() string
SetTenantUuid(tenantUuid string) error
GetMetaData(key string) (interface{}, error)
GetMetaDataMap() map[string]interface{}
SetMetaData(key string, value interface{}) error
GetVersion() int64
SetVersion(version int64) error
GetCreatedAt() int64
SetCreatedAt(createdAt int64) error
GetDataType() string
SetDataType(_type string) error
// underlying query data - string representation
GetDataContent() string
SetDataContent(data string) error
// underlying query data - runtime representation
GetData() QueryData
SetData(qryData QueryData) error
// underlying query data - helper methods
SerializeData() ([]byte, error)
DeserializeData(data []byte, destQryData QueryData) error
}
Similarly to commands, queries also offer the option to store metadata
. This metadata can be used for authorization and other purposes, just like with commands. However, unlike commands, the TenantUuid
information is not automatically set for queries since queries are not stored in the event sourcing database. Although queries are not stored, there are scenarios where applications may require tenant information in their query handlers. Therefore, it is possible to manually set this information in queries if needed.
When a QueryHandler is added to the Facade, the QueryDataProvider
automatically registers all the available QueryData
present in the QueryHandler. This registration process enables the QueryDataProvider to keep track of and manage the various QueryData types that the Facade
can handle. Adding QueryHandlers to the Facade is straightforward and can be done as follows:
myQueryHandler := NewSimpleQueryHandler()
err := facade.AddQueryHandler(myQueryHandler)
Domain
In larger projects, users often want to organize individual aggregates and their handlers into packages. In this framework, these packages are referred to as domain
. The domain is an interface that can be implemented by the user. Domains serve as a way to encapsulate and group all the essential aspects for the Facade
, allowing users to register the Facade with a single command.
The domain interface is defined as follows:
type Domain interface {
GetEventHandlerList() []EventHandler
GetCommandHandlerList() []CommandHandler
GetQueryHandlerList() []QueryHandler
}
After a domain is implemented, it can be added to the Facade, and the Facade takes care of the individual handlers. The Facade is not concerned with anything beyond this, as it focuses solely on handling the registered domains and their associated handlers. The Facade
interface offers the method to add a Domain
:
type Facade interface {
...
AddDomain(domain Domain) error
...
}
Helper functions
The framework provides several helper functions that are useful for handling events, aggregates, and commands. The following functions are available:
Common
func NewUuid() string
func IsValidUuid(uuid string) bool
func StringInSlice(el string, list []string) bool
func GetTypeName(cls interface{}) string
func CanHandleEvent(evtHandler EventHandler, evt Event) bool
func CanHandleCommand(cmdHandler CommandHandler, cmd Command) bool
func CanHandleQuery(qryHandler QueryHandler, qry Query) bool
func SerializeResponse(res *Response) ([]byte, error)
func DeserializeResponse(resBytes []byte, res *Response) error
Aggregate
func NewBaseAggregate() BaseAggregate
func ApplyEvent(agg Aggregate, evt Event, commit bool) error
Event
func NewBaseEvent() *BaseEvent
func NewEvent(aggregateType, aggregateUuid string, version int64, evtData EventData) Event
func NewEventFromAggregate(agg Aggregate, evtData EventData) Event
func SerializeEvent(evt Event) ([]byte, error)
func DeserializeEvent(evtBytes []byte, evt Event) error
Command
func NewBaseCommand() *BaseCommand
func NewCommand(cmdData CommandData) Command
func NewCommandFn(cmdData CommandData, fns ...func(Command) Command) Command
func SerializeCommand(cmd Command) ([]byte, error)
func DeserializeCommand(cmdBytes []byte, cmd Command) error
Query
func NewBaseQuery() *BaseQuery
func NewQuery(qryData QueryData) Query
func NewQueryFn(qryData QueryData, fns ...func(Query) Query) Query
func SerializeQuery(qry Query) ([]byte, error)
func DeserializeQuery(qryBytes []byte, qry Query) error