Events & EventHandler
When implementing a new EventHandler
, users often want to store the results of event processing within the EventHandler itself and make them available to the read side (QueryHandler) or simply respond to specific events (Reactor).
From a technical standpoint, an EventHandler is an interface that implements the methods HandleEvent
and CanHandleEvent
. The CanHandleEvent method indicates whether the EventHandler can process the event, while the HandleEvent method processes the event itself.
Here, we are presenting the best practices for implementing an EventHandler that handles both event processing and result storage. Just like with Command, Query, and Aggregate, we create a new package folder named readmodel
and create the file domain/simple/readmodel/readmodel.go
within it.
// domain/simple/readmodel/readmodel.go
package readmodel
import "github.com/gradientzero/es-cqrs-base/base"
const (
STATE_INITIALIZE string = "STATE_INITIALIZE"
STATE_READY string = "STATE_READY"
)
type ReadModel interface {
base.EventHandler
base.OnEventHandlerRegistered
GetModelByAggregateUuid(aggregateUuid string) (*Model, error)
GetModelList() ([]*Model, error)
}
type readModel struct {
EventRepository base.EventRepository
ReadModelStore ReadModelStore
CatchedEvents []base.Event
State string
}
// Make sure it implements interface
var _ ReadModel = (*readModel)(nil)
func NewReadModel(
EventRepository base.EventRepository,
ReadModelStore ReadModelStore,
) ReadModel {
rm := &readModel{}
rm.EventRepository = EventRepository
rm.ReadModelStore = ReadModelStore
rm.CatchedEvents = make([]base.Event, 0)
rm.State = STATE_INITIALIZE
return rm
}
// fullfilling ReadModel interface
func (rm *readModel) GetModelByAggregateUuid(aggregateUuid string) (*Model, error) {
model, err := rm.ReadModelStore.GetModelByAggregateUuid(aggregateUuid)
if err != nil {
return nil, err
}
return model, nil
}
func (rm *readModel) GetModelList() ([]*Model, error) {
modelList, err := rm.ReadModelStore.GetModelList()
if err != nil {
return nil, err
}
return modelList, nil
}
// fullfilling base.OnEventHandlerRegistered interface
func (rm *readModel) OnEventHandlerRegistered(enforceStrongConsistency bool) {
if enforceStrongConsistency {
if err := rm.restoreReadModel(); err != nil {
log.Error(err)
}
} else {
go rm.restoreReadModel()
}
}
func (rm *readModel) restoreReadModel() error {
// request new events unknown to this read model
unixTime := rm.ReadModelStore.GetLastEventTimestamp()
// get events from event store
evts, err := rm.EventRepository.ListEvents(
base.EventRepositoryListOptionAfter(unixTime),
base.EventRepositoryListOptionAscending(true),
base.EventRepositoryListOptionLimit(int64(2147483647)),
)
if err != nil {
return err
}
// add catched events (events catched while restoring from event store)
evts = append(evts, rm.CatchedEvents...)
// change state to ready
rm.State = STATE_READY
// check any event is relevant
for _, evt := range evts {
if base.CanHandleEvent(rm, evt) {
err = rm.HandleEvent(evt)
if err != nil {
return err
}
}
}
return nil
}
This code defines several aspects concurrently. Firstly, the custom interface ReadModel
is defined, which includes the methods GetModelByAggregateUuid
and GetModelList
.
type ReadModel interface {
base.EventHandler
base.OnEventHandlerRegistered
GetModelByAggregateUuid(aggregateUuid string) (*Model, error)
GetModelList() ([]*Model, error)
}
These methods can later be utilized by the QueryHandler
to retrieve query results. Additionally, the OnEventHandlerRegistered
interface is implemented, containing the method OnEventHandlerRegistered
. This method is invoked when the EventHandler
- which is this instance - is successfully registered in the Facade
. This callback also carries the information enforceStrongConsistency
, which indicates to this instance whether the Facade is configured with Strong Consistency
or Eventual Consistency
. Strong Consistency is typically necessary only during testing, ensuring that events are processed immediately and not in a separate Goroutine
, as is the case under normal circumstances.
The user-defined NewReadModel
method serves as the constructor for our readModel
, which implements our custom ReadModel
interface. This method accepts the EventRepository
and the ReadModelStore
as parameters. Depending on the requirements of our EventHandler, additional parameters can also be provided.
type readModel struct {
EventRepository base.EventRepository
ReadModelStore ReadModelStore
CatchedEvents []base.Event
State string
}
func NewReadModel(
EventRepository base.EventRepository,
ReadModelStore ReadModelStore,
) ReadModel {
rm := &readModel{}
rm.EventRepository = EventRepository
rm.ReadModelStore = ReadModelStore
rm.CatchedEvents = make([]base.Event, 0)
rm.State = STATE_INITIALIZE
return rm
}
Our readModel
fulfills the ReadModel
interface by implementing GetModelByAggregateUuid
and GetModelList
. These methods utilize the ReadModelStore
to provide the already denormalized models from the underlying store. Here, all the methods that are necessary for the application should be defined and implemented.
Our readModel
utilizes a dedicated ReadModelStore
, which is defined as an interface
. The ReadModelStore
is used to store the results (here of type Model
) of event processing. Typically, this Store is instantiated within the domain and subsequently passed to the readModel. This approach offers flexibility in terms of the Store implementation, enabling easy substitution. For instance, it allows us to seamlessly switch from managing our models in-memory to storing it in a database, without necessitating modifications to the readModel
. The user could directly implement the underlying model store without an interface, but this has the drawback that later QueryHandlers would have visibility into the specific store implementation rather than just the interface.
The EventRepository
is used to retrieve events from the EventStore
and can be taken from the Facade
.
The CatchedEvents
is a slice of events that are captured during the restoration of the proprietary model. This mechanism helps maintain the integrity of event processing even in scenarios involving concurrent event capture and restoration processes.
The State
is used to indicate whether this EventHandler is ready to process new events. The EventHandler is only ready to process events once the OnEventHandlerRegistered
method is processed.
// fullfilling base.OnEventHandlerRegistered interface
func (rm *readModel) OnEventHandlerRegistered(enforceStrongConsistency bool) {
if enforceStrongConsistency {
if err := rm.restoreReadModel(); err != nil {
log.Error(err)
}
} else {
go rm.restoreReadModel()
}
}
func (rm *readModel) restoreReadModel() error {
// request new events unknown to this read model
unixTime := rm.ReadModelStore.GetLastEventTimestamp()
// get events from event store
evts, err := rm.EventRepository.ListEvents(
base.EventRepositoryListOptionAfter(unixTime),
base.EventRepositoryListOptionAscending(true),
base.EventRepositoryListOptionLimit(int64(2147483647)),
)
if err != nil {
return err
}
// add catched events (events catched while restoring from event store)
evts = append(evts, rm.CatchedEvents...)
// change state to ready
rm.State = STATE_READY
// check any event is relevant
for _, evt := range evts {
if base.CanHandleEvent(rm, evt) {
err = rm.HandleEvent(evt)
if err != nil {
return err
}
}
}
return nil
}
The ReadModelStore
is an interface that is implemented by the user and is responsible for storing the results of event processing. The ReadModelStore
is typically implemented as a database, but can also be implemented as a simple in-memory store. By convention ReadModelStore
and the underlying implementation of this Store is defined in seperate files domain/simple/readmodel/store.go
for the interface and domain/simple/readmodel/store.memory.go
or domain/simple/readmodel/store.postgres.go
.
// domain/simple/readmodel/store.go
package readmodel
type ReadModelStore interface {
SetLastEventTimestamp(id int64) error
GetLastEventTimestamp() int64
SetModel(model *Model) error
GetModelByAggregateUuid(aggregateUuid string) (*Model, error)
GetModelList() ([]*Model, error)
RemoveModel(model *Model) error
}
Here's an example of an in-memory implementation of this ReadModelStore
:
// domain/simple/readmodel/store.memory.go
package readmodel
const LAST_EVENT_TIMESTAMP string = "last_event_timestamp"
type readModelStoreMemory struct {
Meta map[string]int64
ItemsByAggregateUuid map[string]*Model
}
// Make sure it implements interface
var _ ReadModelStore = (*readModelStoreMemory)(nil)
func NewReadModelStoreMemory() ReadModelStore {
rms := &readModelStoreMemory{
Meta: make(map[string]int64, 0),
ItemsByAggregateUuid: make(map[string]*Model, 0),
}
rms.Meta[LAST_EVENT_TIMESTAMP] = 0
return rms
}
func (rms *readModelStoreMemory) SetLastEventTimestamp(unixTime int64) error {
if rms.GetLastEventTimestamp() < unixTime {
rms.Meta[LAST_EVENT_TIMESTAMP] = unixTime
}
return nil
}
func (rms *readModelStoreMemory) GetLastEventTimestamp() int64 {
if val, found := rms.Meta[LAST_EVENT_TIMESTAMP]; found {
return val
}
return 0
}
func (rms *readModelStoreMemory) SetModel(model *Model) error {
rms.ItemsByAggregateUuid[model.AggregateUuid] = model
return nil
}
func (rms *readModelStoreMemory) GetModelByAggregateUuid(aggregateUuid string) (*Model, error) {
if val, found := rms.ItemsByAggregateUuid[aggregateUuid]; found {
return val, nil
}
return nil, nil
}
func (rms *readModelStoreMemory) GetModelList() ([]*Model, error) {
var modelList []*Model
for _, val := range rms.ItemsByAggregateUuid {
modelList = append(modelList, val)
}
return modelList, nil
}
func (rms *readModelStoreMemory) RemoveModel(model *Model) error {
if val, found := rms.ItemsByAggregateUuid[model.AggregateUuid]; found {
delete(rms.ItemsByAggregateUuid, val.AggregateUuid)
}
return nil
}
In general, the ReadModelStore
typically provides a way to retrieve and set the last known timestamp. This timestamp is used to fetch only new events that have not yet been processed in this particular ReadModel. In addition, ReadModelStore
manages its own model, here of type Model
. The definition of this model can be found in the file domain/simple/readmodel/model.go
.
// domain/simple/readmodel/model.go
package readmodel
type Model struct {
AggregateUuid string `json:"aggregateUuid,omitempty"`
Whatever string `json:"whatever,omitempty"`
CreatedAt int64 `json:"createdAt,omitempty"`
UpdatedAt int64 `json:"updatedAt,omitempty"`
}
Finally, we proceed to implement the EventHandler
interface itself. Following convention, this is typically done in a separate file domain/simple/readmodel/readmodel.event.go
:
// domain/simple/readmodel/readmodel.event.go
package readmodel
import (
"my/app/domain/simple/aggregate"
"github.com/gradientzero/es-cqrs-base/base"
)
// fullfilling base.EventHandler interface
func (rm *readModel) GetEventDataList() []base.EventData {
return []base.EventData{
&aggregate.DidSomethingEvent{},
// ... &aggregate.DidSomethingElseEvent{},
}
}
func (rm *readModel) HandleEvent(evt base.Event) error {
// catch events to process once ready
if rm.State == STATE_INITIALIZE {
rm.CatchedEvents = append(rm.CatchedEvents, evt)
return nil
}
switch evtData := evt.GetData().(type) {
case *aggregate.DidSomethingEvent:
model, _ := rm.ReadModelStore.GetModelByAggregateUuid(evtData.AggregateUuid)
if model == nil {
model = &Model{AggregateUuid: evtData.AggregateUuid}
}
model.Whatever = evtData.Name
model.CreatedAt = evt.GetCreatedAt()
rm.ReadModelStore.SetModel(model)
}
// update last event id
if err := rm.ReadModelStore.SetLastEventTimestamp(evt.GetCreatedAt()); err != nil {
panic(err)
}
return nil
}
Our EventHandler is now complete and can be utilized by the Facade. This example is relatively simple, but the true strength of read models becomes evident when dealing with complex models. Read models prepare data for the QueryHandlers and can represent complex structures in a denormalized form, thereby enhancing query performance and efficiency.