Skip to content

Events & EventHandler

When implementing a new EventHandler, users often want to store the results of event processing within the EventHandler itself and make them available to the read side (QueryHandler) or simply respond to specific events (Reactor).

From a technical standpoint, an EventHandler is an interface that implements the methods HandleEvent and CanHandleEvent. The CanHandleEvent method indicates whether the EventHandler can process the event, while the HandleEvent method processes the event itself.

Here, we are presenting the best practices for implementing an EventHandler that handles both event processing and result storage. Just like with Command, Query, and Aggregate, we create a new package folder named readmodel and create the file domain/simple/readmodel/readmodel.go within it.

go
// domain/simple/readmodel/readmodel.go
package readmodel

import "github.com/gradientzero/es-cqrs-base/base"

const (
	STATE_INITIALIZE     string = "STATE_INITIALIZE"
	STATE_READY          string = "STATE_READY"
)

type ReadModel interface {
	base.EventHandler
	base.OnEventHandlerRegistered
	GetModelByAggregateUuid(aggregateUuid string) (*Model, error)
	GetModelList() ([]*Model, error)
}

type readModel struct {
	EventRepository base.EventRepository
	ReadModelStore  ReadModelStore
	CatchedEvents   []base.Event
	State           string
}

// Make sure it implements interface
var _ ReadModel = (*readModel)(nil)

func NewReadModel(
	EventRepository base.EventRepository,
	ReadModelStore ReadModelStore,
) ReadModel {
	rm := &readModel{}
	rm.EventRepository = EventRepository
	rm.ReadModelStore = ReadModelStore
	rm.CatchedEvents = make([]base.Event, 0)
	rm.State = STATE_INITIALIZE
	return rm
}

// fullfilling ReadModel interface
func (rm *readModel) GetModelByAggregateUuid(aggregateUuid string) (*Model, error) {
	model, err := rm.ReadModelStore.GetModelByAggregateUuid(aggregateUuid)
	if err != nil {
		return nil, err
	}
	return model, nil
}

func (rm *readModel) GetModelList() ([]*Model, error) {
	modelList, err := rm.ReadModelStore.GetModelList()
	if err != nil {
		return nil, err
	}
	return modelList, nil
}

// fullfilling base.OnEventHandlerRegistered interface
func (rm *readModel) OnEventHandlerRegistered(enforceStrongConsistency bool) {
	if enforceStrongConsistency {
		if err := rm.restoreReadModel(); err != nil {
			log.Error(err)
		}
	} else {
		go rm.restoreReadModel()
	}
}

func (rm *readModel) restoreReadModel() error {
	// request new events unknown to this read model
	unixTime := rm.ReadModelStore.GetLastEventTimestamp()

	// get events from event store
	evts, err := rm.EventRepository.ListEvents(
		base.EventRepositoryListOptionAfter(unixTime),
		base.EventRepositoryListOptionAscending(true),
		base.EventRepositoryListOptionLimit(int64(2147483647)),
	)
	if err != nil {
		return err
	}

	// add catched events (events catched while restoring from event store)
	evts = append(evts, rm.CatchedEvents...)

	// change state to ready
	rm.State = STATE_READY

	// check any event is relevant
	for _, evt := range evts {
		if base.CanHandleEvent(rm, evt) {
			err = rm.HandleEvent(evt)
			if err != nil {
				return err
			}
		}
	}
	return nil
}

This code defines several aspects concurrently. Firstly, the custom interface ReadModel is defined, which includes the methods GetModelByAggregateUuid and GetModelList.

go
type ReadModel interface {
	base.EventHandler
	base.OnEventHandlerRegistered
	GetModelByAggregateUuid(aggregateUuid string) (*Model, error)
	GetModelList() ([]*Model, error)
}

These methods can later be utilized by the QueryHandler to retrieve query results. Additionally, the OnEventHandlerRegistered interface is implemented, containing the method OnEventHandlerRegistered. This method is invoked when the EventHandler - which is this instance - is successfully registered in the Facade. This callback also carries the information enforceStrongConsistency, which indicates to this instance whether the Facade is configured with Strong Consistency or Eventual Consistency. Strong Consistency is typically necessary only during testing, ensuring that events are processed immediately and not in a separate Goroutine, as is the case under normal circumstances.

The user-defined NewReadModel method serves as the constructor for our readModel, which implements our custom ReadModel interface. This method accepts the EventRepository and the ReadModelStore as parameters. Depending on the requirements of our EventHandler, additional parameters can also be provided.

go
type readModel struct {
	EventRepository base.EventRepository
	ReadModelStore  ReadModelStore
	CatchedEvents   []base.Event
	State           string
}

func NewReadModel(
	EventRepository base.EventRepository,
	ReadModelStore ReadModelStore,
) ReadModel {
	rm := &readModel{}
	rm.EventRepository = EventRepository
	rm.ReadModelStore = ReadModelStore
	rm.CatchedEvents = make([]base.Event, 0)
	rm.State = STATE_INITIALIZE
	return rm
}

Our readModel fulfills the ReadModel interface by implementing GetModelByAggregateUuid and GetModelList. These methods utilize the ReadModelStore to provide the already denormalized models from the underlying store. Here, all the methods that are necessary for the application should be defined and implemented.

Our readModel utilizes a dedicated ReadModelStore, which is defined as an interface. The ReadModelStore is used to store the results (here of type Model) of event processing. Typically, this Store is instantiated within the domain and subsequently passed to the readModel. This approach offers flexibility in terms of the Store implementation, enabling easy substitution. For instance, it allows us to seamlessly switch from managing our models in-memory to storing it in a database, without necessitating modifications to the readModel. The user could directly implement the underlying model store without an interface, but this has the drawback that later QueryHandlers would have visibility into the specific store implementation rather than just the interface.

The EventRepository is used to retrieve events from the EventStore and can be taken from the Facade.

The CatchedEvents is a slice of events that are captured during the restoration of the proprietary model. This mechanism helps maintain the integrity of event processing even in scenarios involving concurrent event capture and restoration processes.

The State is used to indicate whether this EventHandler is ready to process new events. The EventHandler is only ready to process events once the OnEventHandlerRegistered method is processed.

go
// fullfilling base.OnEventHandlerRegistered interface
func (rm *readModel) OnEventHandlerRegistered(enforceStrongConsistency bool) {
	if enforceStrongConsistency {
		if err := rm.restoreReadModel(); err != nil {
			log.Error(err)
		}
	} else {
		go rm.restoreReadModel()
	}
}

func (rm *readModel) restoreReadModel() error {
	// request new events unknown to this read model
	unixTime := rm.ReadModelStore.GetLastEventTimestamp()

	// get events from event store
	evts, err := rm.EventRepository.ListEvents(
		base.EventRepositoryListOptionAfter(unixTime),
		base.EventRepositoryListOptionAscending(true),
		base.EventRepositoryListOptionLimit(int64(2147483647)),
	)
	if err != nil {
		return err
	}

	// add catched events (events catched while restoring from event store)
	evts = append(evts, rm.CatchedEvents...)

	// change state to ready
	rm.State = STATE_READY

	// check any event is relevant
	for _, evt := range evts {
		if base.CanHandleEvent(rm, evt) {
			err = rm.HandleEvent(evt)
			if err != nil {
				return err
			}
		}
	}
	return nil
}

The ReadModelStore is an interface that is implemented by the user and is responsible for storing the results of event processing. The ReadModelStore is typically implemented as a database, but can also be implemented as a simple in-memory store. By convention ReadModelStore and the underlying implementation of this Store is defined in seperate files domain/simple/readmodel/store.go for the interface and domain/simple/readmodel/store.memory.go or domain/simple/readmodel/store.postgres.go.

go:
// domain/simple/readmodel/store.go
package readmodel

type ReadModelStore interface {
	SetLastEventTimestamp(id int64) error
	GetLastEventTimestamp() int64
	SetModel(model *Model) error
	GetModelByAggregateUuid(aggregateUuid string) (*Model, error)
	GetModelList() ([]*Model, error)
	RemoveModel(model *Model) error
}

Here's an example of an in-memory implementation of this ReadModelStore:

go
// domain/simple/readmodel/store.memory.go
package readmodel

const LAST_EVENT_TIMESTAMP string = "last_event_timestamp"

type readModelStoreMemory struct {
	Meta                 map[string]int64
	ItemsByAggregateUuid map[string]*Model
}

// Make sure it implements interface
var _ ReadModelStore = (*readModelStoreMemory)(nil)

func NewReadModelStoreMemory() ReadModelStore {
	rms := &readModelStoreMemory{
		Meta:                 make(map[string]int64, 0),
		ItemsByAggregateUuid: make(map[string]*Model, 0),
	}
	rms.Meta[LAST_EVENT_TIMESTAMP] = 0
	return rms
}

func (rms *readModelStoreMemory) SetLastEventTimestamp(unixTime int64) error {
	if rms.GetLastEventTimestamp() < unixTime {
		rms.Meta[LAST_EVENT_TIMESTAMP] = unixTime
	}
	return nil
}

func (rms *readModelStoreMemory) GetLastEventTimestamp() int64 {
	if val, found := rms.Meta[LAST_EVENT_TIMESTAMP]; found {
		return val
	}
	return 0
}

func (rms *readModelStoreMemory) SetModel(model *Model) error {
	rms.ItemsByAggregateUuid[model.AggregateUuid] = model
	return nil
}

func (rms *readModelStoreMemory) GetModelByAggregateUuid(aggregateUuid string) (*Model, error) {
	if val, found := rms.ItemsByAggregateUuid[aggregateUuid]; found {
		return val, nil
	}
	return nil, nil
}

func (rms *readModelStoreMemory) GetModelList() ([]*Model, error) {
	var modelList []*Model
	for _, val := range rms.ItemsByAggregateUuid {
		modelList = append(modelList, val)
	}
	return modelList, nil
}

func (rms *readModelStoreMemory) RemoveModel(model *Model) error {
	if val, found := rms.ItemsByAggregateUuid[model.AggregateUuid]; found {
		delete(rms.ItemsByAggregateUuid, val.AggregateUuid)
	}
	return nil
}

In general, the ReadModelStore typically provides a way to retrieve and set the last known timestamp. This timestamp is used to fetch only new events that have not yet been processed in this particular ReadModel. In addition, ReadModelStore manages its own model, here of type Model. The definition of this model can be found in the file domain/simple/readmodel/model.go.

go
// domain/simple/readmodel/model.go
package readmodel

type Model struct {
	AggregateUuid string `json:"aggregateUuid,omitempty"`
	Whatever      string `json:"whatever,omitempty"`
	CreatedAt     int64  `json:"createdAt,omitempty"`
	UpdatedAt     int64  `json:"updatedAt,omitempty"`
}

Finally, we proceed to implement the EventHandler interface itself. Following convention, this is typically done in a separate file domain/simple/readmodel/readmodel.event.go:

go
// domain/simple/readmodel/readmodel.event.go
package readmodel

import (
	"my/app/domain/simple/aggregate"
	"github.com/gradientzero/es-cqrs-base/base"
)

// fullfilling base.EventHandler interface
func (rm *readModel) GetEventDataList() []base.EventData {
	return []base.EventData{
		&aggregate.DidSomethingEvent{},
		// ... &aggregate.DidSomethingElseEvent{},
	}
}

func (rm *readModel) HandleEvent(evt base.Event) error {
	// catch events to process once ready
	if rm.State == STATE_INITIALIZE {
		rm.CatchedEvents = append(rm.CatchedEvents, evt)
		return nil
	}

	switch evtData := evt.GetData().(type) {
	case *aggregate.DidSomethingEvent:
		model, _ := rm.ReadModelStore.GetModelByAggregateUuid(evtData.AggregateUuid)
		if model == nil {
			model = &Model{AggregateUuid: evtData.AggregateUuid}
		}
		model.Whatever = evtData.Name
		model.CreatedAt = evt.GetCreatedAt()
		rm.ReadModelStore.SetModel(model)
	}

	// update last event id
	if err := rm.ReadModelStore.SetLastEventTimestamp(evt.GetCreatedAt()); err != nil {
		panic(err)
	}

	return nil
}

Our EventHandler is now complete and can be utilized by the Facade. This example is relatively simple, but the true strength of read models becomes evident when dealing with complex models. Read models prepare data for the QueryHandlers and can represent complex structures in a denormalized form, thereby enhancing query performance and efficiency.