Aggregate
The concept of Aggregates (or Aggregate Roots) originates from Domain-Driven Design (DDD) and is fundamental in modeling domain entities within complex systems. An Aggregate represents a cluster of related objects that are treated as a single unit, with the Aggregate (Root) serving as the entry point for managing the internal state of the group. The Aggregate establishes a boundary that ensures consistency within this group: any change to its internal entities must go through the Aggregate (Root), which preserves the integrity of the domain's business rules.
In Event Sourcing, the Aggregate (Root) is the primary entity retrieved from the AggregateRepository. This repository relies on the EventRepository, which in turn interacts with the underlying EventStore. The Aggregate Root is responsible for processing actions (often referred to as intentions) and generating the corresponding Events. Additionally, it applies these Events to itself in order to update its state. By replaying past events, the Aggregate (Root) reconstructs its current state without relying on traditional persistence mechanisms, ensuring that its behavior remains consistent with the event stream.
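The replay idea can be sketched in a few lines of stand-alone Go. This is only an illustration under simplifying assumptions, not comby's implementation: Event, Counter, Apply, and Replay are hypothetical names, and the sketch rejects an event whose version does not match, which is a choice made here for clarity.

```go
package main

import "fmt"

// Event is a hypothetical, simplified event: a version plus a payload.
type Event struct {
	Version int64
	Delta   int
}

// Counter is a hypothetical aggregate whose state is derived only from events.
type Counter struct {
	Version int64
	Total   int
}

// Apply updates the state for a single event and bumps the version.
// Assumption of this sketch: a version mismatch is treated as an error.
func (c *Counter) Apply(evt Event) error {
	if evt.Version != c.Version {
		return fmt.Errorf("version mismatch: event %d, aggregate %d", evt.Version, c.Version)
	}
	c.Total += evt.Delta
	c.Version++
	return nil
}

// Replay rebuilds an aggregate from scratch by applying stored events in order.
func Replay(events []Event) (*Counter, error) {
	c := &Counter{}
	for _, evt := range events {
		if err := c.Apply(evt); err != nil {
			return nil, err
		}
	}
	return c, nil
}

func main() {
	stored := []Event{{Version: 0, Delta: 2}, {Version: 1, Delta: 3}, {Version: 2, Delta: -1}}
	c, err := Replay(stored)
	if err != nil {
		panic(err)
	}
	fmt.Println(c.Total, c.Version)
}
```

No state is loaded from a table here; the Counter's fields exist only as the result of applying the event stream in order.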
Aggregate interface
In comby, an Aggregate (Root) is defined as an interface as follows:
type Aggregate interface {
	GetTenantUuid() string
	SetTenantUuid(tenantUuid string)
	GetDomain() string
	GetAggregateUuid() string
	SetAggregateUuid(aggregateUuid string)
	GetVersion() int64
	GetUncommittedEvents() []event.Event
	ClearUncommittedEvents()
	IsDeleted() bool
	ApplyEvent(concreteAggregate Aggregate, evt event.Event, commit bool) error
	OnEventData(evtData event.EventData) error
}
The Aggregate interface defines the core contract for domain aggregates in an Event Sourcing system. It provides methods for managing multi-tenancy (TenantUuid), the aggregate's unique identifier (AggregateUuid), handling versioning, and accessing uncommitted events.
The interface also allows for applying and processing events through ApplyEvent and OnEventData, enabling the aggregate to evolve its state. Additionally, it includes a method to check if the aggregate is marked as deleted (IsDeleted), ensuring proper state management and event consistency.
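To illustrate how GetUncommittedEvents and ClearUncommittedEvents might be used together, here is a minimal sketch of a save step. Everything in it is an assumption made for illustration: changeTracker is a hypothetical subset of the interface with events simplified to strings, and memoryStore, save, and demoAggregate are not part of the framework.

```go
package main

import "fmt"

// changeTracker is a hypothetical subset of the Aggregate interface,
// with events simplified to strings.
type changeTracker interface {
	GetUncommittedEvents() []string
	ClearUncommittedEvents()
}

// memoryStore is a hypothetical in-memory event store.
type memoryStore struct {
	events []string
}

// save appends the aggregate's pending events to the store and then clears
// them, so that a second save does not persist the same events again.
func save(store *memoryStore, agg changeTracker) int {
	pending := agg.GetUncommittedEvents()
	store.events = append(store.events, pending...)
	agg.ClearUncommittedEvents()
	return len(pending)
}

// demoAggregate is a hypothetical aggregate that only tracks changes.
type demoAggregate struct {
	changes []string
}

func (a *demoAggregate) GetUncommittedEvents() []string { return a.changes }
func (a *demoAggregate) ClearUncommittedEvents()        { a.changes = nil }

func main() {
	store := &memoryStore{}
	agg := &demoAggregate{changes: []string{"created", "renamed"}}
	first := save(store, agg)
	second := save(store, agg)
	fmt.Println(first, second, len(store.events))
}
```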
Base Aggregate
The framework includes a default implementation of the Aggregate interface, called BaseAggregate, which fully implements the required methods. Developers can embed BaseAggregate within their own custom Aggregate structs to inherit its functionality. This approach allows BaseAggregate to handle essential Event Sourcing tasks automatically, such as versioning, event management, and other core methods, reducing the need for boilerplate code and simplifying the development of domain aggregates.
type BaseAggregate struct {
	TenantUuid    string
	AggregateUuid string
	Domain        string
	Version       int64
	Deleted       bool
	Changes       []event.Event
}
The most important feature of BaseAggregate is its pre-implemented ApplyEvent method. If a developer chooses not to use BaseAggregate, they must manually handle key responsibilities such as versioning, committing events, and invoking the appropriate EventData logic. In practice, all user-defined aggregates embed BaseAggregate to automatically manage these tasks, ensuring consistency and reducing the complexity of implementing Event Sourcing manually.
// ApplyEvent applies a new event to this base aggregate and to the concrete aggregate.
// Within this method we do not have access to the concrete aggregate, so if we
// want to apply the event data to it, the concrete aggregate has to be passed in.
func (agg *BaseAggregate) ApplyEvent(concreteAggregate Aggregate, evt event.Event, commit bool) error {
	// The event is expected to carry the same version as the aggregate.
	// Background: one or more events may have been generated from the same aggregate version.
	// This happens easily in asynchronous environments and needs to be dealt with.
	// Even if we enforce the event version in commands, this conflict can still occur.
	// Note: a mismatch is only logged here; the event is still applied below.
	if agg.GetVersion() != evt.GetVersion() {
		logger.Warn(
			"Inconsistency detected",
			"eventUuid", evt.GetUuid(),
			"eventVersion", evt.GetVersion(),
			"aggregateVersion", agg.GetVersion(),
			"aggregateUuid", evt.GetAggregateUuid(),
			"domain", evt.GetDomain(),
		)
	}
	// Increase the aggregate version
	agg.Version = agg.Version + 1
	// Only new events (commit == true) are tracked in Changes
	if commit {
		agg.Changes = append(agg.Changes, evt)
	}
	// Update domain-specific aggregate information
	evtData := evt.GetData()
	if evtData != nil {
		if err := concreteAggregate.OnEventData(evtData); err != nil {
			return fmt.Errorf("%s could not apply change, err %s", concreteAggregate.GetDomain(), err)
		}
	} else {
		return fmt.Errorf("%s could not apply change, unknown type %s", concreteAggregate.GetDomain(), evt.GetDataType())
	}
	return nil
}
Two key points are important to understand:
- In Go, an embedded struct cannot directly access the "parent" (enclosing) instance. To work around this, we pass the "parent" instance (referred to as concreteAggregate) as a parameter.
- When processing an Event, the actual event data is stored within the EventData. So when we talk about events like MetricAdded, we are actually talking about EventData. The Event itself is a construct of the framework. The user usually works with EventData, which is automatically packed into and unpacked from Events. The Event object provides additional contextual information such as timestamps and versioning. This method also defines the entry point for processing event data (OnEventData).
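The first point can be demonstrated with a small stand-alone sketch. The types below are hypothetical stand-ins (Base for BaseAggregate, Handler for the part of the Aggregate interface the concrete type provides, Order for a user-defined aggregate), with event data simplified to a string.

```go
package main

import "fmt"

// Handler is a hypothetical stand-in for the part of the Aggregate interface
// that the concrete type must provide.
type Handler interface {
	OnEventData(data string) error
}

// Base is a hypothetical stand-in for BaseAggregate.
type Base struct {
	Version int64
}

// ApplyEvent only sees *Base as its receiver; the enclosing struct is
// reachable solely through the explicit concrete parameter.
func (b *Base) ApplyEvent(concrete Handler, data string) error {
	b.Version++
	return concrete.OnEventData(data)
}

// Order is a hypothetical user-defined aggregate embedding Base.
type Order struct {
	Base
	Notes []string
}

// OnEventData updates the domain-specific state of the concrete aggregate.
func (o *Order) OnEventData(data string) error {
	o.Notes = append(o.Notes, data)
	return nil
}

func main() {
	o := &Order{}
	// The promoted method is called on o, but its receiver is &o.Base,
	// so o is passed again as the concrete aggregate.
	if err := o.ApplyEvent(o, "created"); err != nil {
		panic(err)
	}
	fmt.Println(o.Version, o.Notes)
}
```

Passing the aggregate to its own method looks redundant at the call site, but it is what lets the embedded type update the version and still reach the outer type's OnEventData.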
User-Defined Aggregate
To create a custom Aggregate, developers must define a struct that embeds BaseAggregate, which implements the Aggregate interface. This struct represents the domain entity that will interact with the Event Sourcing system, encapsulating the data and business logic for a specific bounded context.
type MyAggregate struct {
	// embed BaseAggregate
	base.BaseAggregate
	// from here on: user fields

	// References
	OtherAggregateUuid string
	OtherEntityUid     string
	// Entities
	MyEntity *MyEntity
	// Value Objects
	Name  string
	Value int
}
In Event Sourcing, developers must follow certain conceptual rules to maintain consistency and adhere to best practices in Domain-Driven Design (DDD). Here, the fields of MyAggregate are separated into References, Entities, and Value Objects.
References
An Aggregate can only maintain references to other aggregates, ensuring each aggregate remains the sole authority over its internal state. If an aggregate needs to interact with a specific entity within another aggregate, both unique identifiers (the aggregate's and the entity's) must be provided. This guarantees that the other Aggregate remains the single point of control for managing its internal state.
In the example above, OtherAggregateUuid represents a reference to the external aggregate, while OtherEntityUid refers to an entity within that aggregate. By convention, Uuid is used for aggregates, and Uid is used for entities to distinguish their roles clearly.
This also implies that MyAggregate does not have direct access to the internal state of OtherAggregate (or even OtherEntity). Instead, all interactions must occur through a higher level of abstraction, such as Commands, Queries, or through listening to the other aggregate's events.
A real-world example would be an Identity that has a reference to an Account. However, an Account can have multiple Identities, which is why Identity and Account are aggregates and not entities. Both aggregates can "exist" without the existence of the other.
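As a rough illustration of referencing by identifier, the sketch below lets an Identity hold only the Account's UUID and resolve it through a read-side lookup. All names are hypothetical: AccountQuery merely stands in for whatever Query mechanism an application uses and does not mirror a framework API.

```go
package main

import "fmt"

// Account is a hypothetical aggregate.
type Account struct {
	AccountUuid string
	Name        string
}

// Identity is a hypothetical aggregate that stores only the Account's
// identifier, never a pointer to the Account itself.
type Identity struct {
	IdentityUuid string
	AccountUuid  string // reference to another aggregate
}

// AccountQuery is a hypothetical read-side lookup standing in for a Query.
type AccountQuery struct {
	accounts map[string]Account
}

// ByUuid returns a copy of the account, so callers cannot mutate its state.
func (q AccountQuery) ByUuid(uuid string) (Account, error) {
	acc, ok := q.accounts[uuid]
	if !ok {
		return Account{}, fmt.Errorf("account %q not found", uuid)
	}
	return acc, nil
}

func main() {
	q := AccountQuery{accounts: map[string]Account{
		"acc-1": {AccountUuid: "acc-1", Name: "Main"},
	}}
	id := Identity{IdentityUuid: "id-1", AccountUuid: "acc-1"}
	acc, err := q.ByUuid(id.AccountUuid)
	if err != nil {
		panic(err)
	}
	fmt.Println(acc.Name)
}
```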
Entities
Entities represent the core domain objects within an Aggregate, encapsulating the data and business logic for a specific entity type. In the example, MyEntity is an entity that is part of MyAggregate. However, entities can only exist within an aggregate; the user cannot address an entity without an aggregate.
A real-world example would be an Identity aggregate with a related Profile entity. In this case, the Profile can only exist in conjunction with the Identity: if the Identity is deleted, the Profile is automatically deleted as well. This strong lifecycle dependency is why Profile is modeled as an entity rather than an aggregate. As an entity, Profile is directly managed within the boundary of the Identity aggregate, ensuring that its existence and state are tightly coupled to the Identity. Aggregates, by contrast, have independent lifecycles and are treated as separate, self-contained units within the domain.
Value Objects
Value Objects represent immutable, self-contained objects that encapsulate specific attributes or properties within an Aggregate. Unlike entities, Value Objects do not have a unique identity and are defined solely by their attributes. In the example, Name and Value are Value Objects that are part of MyAggregate.
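The two properties named above, no identity and immutability, can be sketched with a plain Go struct. Label and WithValue are hypothetical names chosen for this illustration; in MyAggregate the value objects are simple primitive fields.

```go
package main

import "fmt"

// Label is a hypothetical value object: no identifier, only attributes.
type Label struct {
	Name  string
	Value int
}

// WithValue returns a modified copy and leaves the receiver untouched,
// because the method has a value receiver.
func (l Label) WithValue(v int) Label {
	l.Value = v
	return l
}

func main() {
	a := Label{Name: "cpu", Value: 10}
	b := Label{Name: "cpu", Value: 10}
	c := a.WithValue(20)
	// Two value objects with the same attributes compare as equal.
	fmt.Println(a == b, a == c, a.Value)
}
```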
Intentions
An Aggregate holds the responsibility of processing user intentions and generating corresponding events. User intentions are implemented as receiver methods within the Aggregate. When executing an intention, the current state of the aggregate should be checked against the intention.
If the intention is to update a specific value object, but there is no difference with the current aggregate's state, it does not make sense to generate a new Event. However, if there are differences, a new Event is generated using the helper function base.NewEventFromAggregate. This function takes the aggregate instance and the intention's EventData to create a new Event containing the corresponding EventData for this particular aggregate in its current state.
Next, the intention applies the newly generated Event to the aggregate using another helper function, base.ApplyEvent. This function takes care of all necessary steps for Event Sourcing, including versioning and tracking new events to be committed in the EventStore.
Moreover, base.ApplyEvent automatically executes the underlying ApplyEvent of the embedded BaseAggregate and triggers OnEventData for the corresponding aggregate, as implemented by the user.
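The "no difference, no event" rule can be sketched as follows. This is a simplified, framework-free illustration: Item, RenamedEvent, and the unexported apply method are hypothetical, and apply merely stands in for the event creation and ApplyEvent steps described above.

```go
package main

import "fmt"

// RenamedEvent is hypothetical event data, named in past tense.
type RenamedEvent struct {
	Name string
}

// Item is a hypothetical aggregate with a simplified change list.
type Item struct {
	Name    string
	Version int64
	Changes []interface{}
}

// apply updates the state, bumps the version, and records the event data
// as an uncommitted change.
func (i *Item) apply(data interface{}) error {
	switch d := data.(type) {
	case *RenamedEvent:
		i.Name = d.Name
	default:
		return fmt.Errorf("unknown event data %T", data)
	}
	i.Version++
	i.Changes = append(i.Changes, data)
	return nil
}

// Rename is the intention: validate the input, compare it against the
// current state, and only emit an event when something actually changes.
func (i *Item) Rename(name string) error {
	if name == "" {
		return fmt.Errorf("name is required")
	}
	if name == i.Name {
		return nil // no difference, so no event
	}
	return i.apply(&RenamedEvent{Name: name})
}

func main() {
	it := &Item{}
	_ = it.Rename("first")
	_ = it.Rename("first") // same value again: nothing is recorded
	fmt.Println(len(it.Changes), it.Version)
}
```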
By convention, the intention receiver method and the definition of EventData are implemented in the same file. This design promotes clarity and consistency, making it easier for developers to comprehend and maintain the codebase. Here is one example intention, LetItHappen, for MyAggregate:
type MyAggregateHappendEvent struct {
	MyAggregateUuid string
	Name            string
	Value           int
}

func (agg *MyAggregate) LetItHappen(name string, value int) error {
	// business logic
	if len(name) < 1 {
		return fmt.Errorf("name is required")
	}
	if value <= 0 || value > 100 {
		return fmt.Errorf("value must be between 1 and 100")
	}
	// create new event based on the current aggregate
	evt := base.NewEventFromAggregate(agg, &MyAggregateHappendEvent{
		MyAggregateUuid: agg.GetAggregateUuid(),
		Name:            name,
		Value:           value,
	})
	// apply event on the current aggregate
	return agg.ApplyEvent(agg, evt, true)
}
The new Event was added to the aggregate and will be persisted by the Facade in the EventStore later. By convention, EventData names are in past tense. In this example, the intention is named LetItHappen and the corresponding EventData is named MyAggregateHappendEvent.
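The counterpart to an intention is the OnEventData method, which updates the aggregate's state from the event data. One possible shape is a type switch, sketched below. The local EventData type is a stand-in for the framework's event.EventData, MyAggregate is reduced to the fields the sketch needs, and copying Name and Value into the aggregate is an assumption about what this event should do.

```go
package main

import "fmt"

// EventData is a local stand-in for the framework's event.EventData type.
type EventData interface{}

// MyAggregateHappendEvent mirrors the event data defined above.
type MyAggregateHappendEvent struct {
	MyAggregateUuid string
	Name            string
	Value           int
}

// MyAggregate is reduced to the fields this sketch needs.
type MyAggregate struct {
	Name  string
	Value int
}

// OnEventData dispatches on the concrete event data type and copies the
// relevant values into the aggregate's state.
func (agg *MyAggregate) OnEventData(evtData EventData) error {
	switch e := evtData.(type) {
	case *MyAggregateHappendEvent:
		agg.Name = e.Name
		agg.Value = e.Value
		return nil
	default:
		return fmt.Errorf("unhandled event data type %T", evtData)
	}
}

func main() {
	agg := &MyAggregate{}
	if err := agg.OnEventData(&MyAggregateHappendEvent{Name: "demo", Value: 42}); err != nil {
		panic(err)
	}
	fmt.Println(agg.Name, agg.Value)
}
```

Because the same method runs for new events and for replayed ones, it should only assign state and leave validation to the intention.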
In addition, it is recommended to implement the aggregate and its intentions in a separate package called aggregate, which lives within a domain-specific package. For example, MyAggregate could be located in domain/my/aggregate/aggregate.go. The intention LetItHappen and the corresponding EventData could be located in domain/my/aggregate/let_it_happen.go.