
Aggregate

The concept of Aggregates (or Aggregate Roots) originates from Domain-Driven Design (DDD) and is fundamental in modeling domain entities within complex systems. An Aggregate represents a cluster of related objects that are treated as a single unit, with the Aggregate (Root) serving as the entry point for managing the internal state of the group. The Aggregate establishes a boundary that ensures consistency within this group, meaning that any changes to its internal entities must go through the Aggregate (Root), maintaining the integrity of the domain's business rules.

In Event Sourcing, the Aggregate (Root) is the primary entity retrieved from the AggregateRepository. This repository relies on the EventRepository, which in turn interacts with the underlying EventStore. The Aggregate Root is responsible for processing actions (often referred to as intentions) and generating the corresponding Events. Additionally, it applies these Events to itself in order to update its state. By replaying past events, the Aggregate (Root) reconstructs its current state without relying on traditional persistence mechanisms, ensuring that its behavior remains consistent with the event stream.
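
The replay idea described above can be illustrated with a minimal, framework-independent sketch. The types `Event`, `Item`, `NameChanged`, and `ValueChanged`, and the function `Replay`, are hypothetical names chosen for this illustration and are not part of comby.

```go
package main

import "fmt"

// Event is a simplified stand-in for a framework event (assumption).
type Event struct {
	Version int64
	Data    any
}

// NameChanged and ValueChanged are hypothetical event payloads.
type NameChanged struct{ Name string }
type ValueChanged struct{ Value int }

// Item is a hypothetical aggregate whose state is derived only from events.
type Item struct {
	Version int64
	Name    string
	Value   int
}

// Apply mutates the state according to a single event payload.
func (it *Item) Apply(evt Event) error {
	switch d := evt.Data.(type) {
	case NameChanged:
		it.Name = d.Name
	case ValueChanged:
		it.Value = d.Value
	default:
		return fmt.Errorf("unknown event data %T", evt.Data)
	}
	it.Version++
	return nil
}

// Replay rebuilds an Item from its full event history, in order.
func Replay(history []Event) (*Item, error) {
	it := &Item{}
	for _, evt := range history {
		if err := it.Apply(evt); err != nil {
			return nil, err
		}
	}
	return it, nil
}

func main() {
	history := []Event{
		{Version: 0, Data: NameChanged{Name: "first"}},
		{Version: 1, Data: ValueChanged{Value: 42}},
		{Version: 2, Data: NameChanged{Name: "second"}},
	}
	it, err := Replay(history)
	if err != nil {
		panic(err)
	}
	fmt.Println(it.Name, it.Value, it.Version)
}
```

No snapshot of the state is stored anywhere in this sketch; the state is always a function of the ordered event history.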

Aggregate interface

In comby, an Aggregate (Root) is defined by the following interface:

go
type Aggregate interface {
	GetTenantUuid() string
	SetTenantUuid(tenantUuid string)
	GetDomain() string
	GetAggregateUuid() string
	SetAggregateUuid(aggregateUuid string)
	GetVersion() int64
	GetUncommittedEvents() []event.Event
	ClearUncommittedEvents()
	IsDeleted() bool
	ApplyEvent(concreteAggregate Aggregate, evt event.Event, commit bool) error
	OnEventData(evtData event.EventData) error
}

The Aggregate interface defines the core contract for domain aggregates in an Event Sourcing system. It provides methods for managing multi-tenancy (TenantUuid), the aggregate's unique identifier (AggregateUuid), handling versioning, and accessing uncommitted events.

The interface also allows for applying and processing events through ApplyEvent and OnEventData, enabling the aggregate to evolve its state. Additionally, it includes a method to check if the aggregate is marked as deleted (IsDeleted), ensuring proper state management and event consistency.
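
To illustrate how the uncommitted-event methods are typically used together, here is a minimal sketch under stated assumptions. `Event` is a simplified stand-in, and `changeTracker`, `memoryStore`, and `demoAggregate` are hypothetical names; how comby itself persists events is not shown here.

```go
package main

import "fmt"

// Event is a simplified stand-in for the framework's event type (assumption).
type Event struct {
	AggregateUuid string
	Version       int64
}

// changeTracker is the subset of the Aggregate interface this sketch relies on.
type changeTracker interface {
	GetUncommittedEvents() []Event
	ClearUncommittedEvents()
}

// memoryStore is a hypothetical in-memory stand-in for an event store.
type memoryStore struct {
	events []Event
}

// save appends all pending events to the store, then clears them on the
// aggregate so they are not persisted twice. It returns the number saved.
func (s *memoryStore) save(agg changeTracker) int {
	pending := agg.GetUncommittedEvents()
	s.events = append(s.events, pending...)
	agg.ClearUncommittedEvents()
	return len(pending)
}

// demoAggregate is a hypothetical aggregate that only tracks its changes.
type demoAggregate struct {
	uuid    string
	version int64
	changes []Event
}

func (a *demoAggregate) GetUncommittedEvents() []Event { return a.changes }
func (a *demoAggregate) ClearUncommittedEvents()       { a.changes = nil }

// record simulates an intention that produced one new event.
func (a *demoAggregate) record() {
	a.changes = append(a.changes, Event{AggregateUuid: a.uuid, Version: a.version})
	a.version++
}

func main() {
	store := &memoryStore{}
	agg := &demoAggregate{uuid: "agg-1"}
	agg.record()
	agg.record()

	fmt.Println(store.save(agg))   // first save persists both pending events
	fmt.Println(store.save(agg))   // nothing is pending any more
	fmt.Println(len(store.events)) // total number of stored events
}
```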

Base Aggregate

The framework includes a default implementation of the Aggregate interface, called BaseAggregate, which fully implements the required methods. Developers can embed BaseAggregate within their own custom Aggregate structs to inherit its functionality. This approach allows BaseAggregate to handle essential Event Sourcing tasks automatically, such as versioning, event management, and other core methods, reducing the need for boilerplate code and simplifying the development of domain aggregates.

go
type BaseAggregate struct {
	TenantUuid    string
	AggregateUuid string
	Domain        string
	Version       int64
	Deleted       bool
	Changes       []event.Event
}

The most important feature of BaseAggregate is its pre-implemented ApplyEvent method. If a developer chooses not to use BaseAggregate, they must manually handle key responsibilities such as versioning, committing events, and invoking the appropriate EventData logic. In practice, all user-defined aggregates embed BaseAggregate to automatically manage these tasks, ensuring consistency and reducing the complexity of implementing Event Sourcing manually.

go
// ApplyEvent applies a new event to this base aggregate and to the concrete aggregate.
// The embedded BaseAggregate has no access to the concrete aggregate that embeds it,
// so the concrete aggregate must be passed in for its event data handler to be called.
func (agg *BaseAggregate) ApplyEvent(concreteAggregate Aggregate, evt event.Event, commit bool) error {

	// The event is expected to carry the same version as the aggregate.
	// Background: in asynchronous environments, several events can easily be
	// generated from the same aggregate version, which has to be dealt with.
	// Even if event versions are enforced in commands, this conflict can still occur.
	// A mismatch is logged as a warning here; the event is still applied.
	if agg.GetVersion() != evt.GetVersion() {
		logger.Warn(
			"Inconsistency detected",
			"eventUuid", evt.GetUuid(),
			"eventVersion", evt.GetVersion(),
			"aggregateVersion", agg.GetVersion(),
			"aggregateUuid", evt.GetAggregateUuid(),
			"domain", evt.GetDomain(),
		)
	}

	// Event is valid increase aggregate Version
	agg.Version = agg.Version + 1

	// Only apply new event to Changes
	if commit {
		agg.Changes = append(agg.Changes, evt)
	}

	// update domain specific aggregate information
	evtData := evt.GetData()
	if evtData != nil {
		if err := concreteAggregate.OnEventData(evtData); err != nil {
			return fmt.Errorf("%s could not apply change, err %s", concreteAggregate.GetDomain(), err)
		}
	} else {
		return fmt.Errorf("%s could not apply change, unknown type %s", concreteAggregate.GetDomain(), evt.GetDataType())
	}
	return nil
}

Three key points are important to understand:

  1. In Go, embedded instances cannot directly access the "parent" or enclosing instance. To work around this, we pass the "parent" instance (referred to as concreteAggregate) as a parameter.

  2. When processing an Event, the actual event data is stored within the EventData. So when we talk about events like MetricAdded, we are actually talking about EventData. The Event itself is a construct by the framework. The user usually works with EventData, which is automatically packed and unpacked into Events. The Event object provides additional contextual information like timestamps and versioning.

  3. This method also defines our entry point for processing events (OnEventData).
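
The first point can be shown in isolation. The following sketch uses the hypothetical types `Base`, `Thing`, and `Renamed` rather than the framework's own types; it only demonstrates why the embedding struct has to be passed to a method promoted from the embedded struct.

```go
package main

import "fmt"

// handler is the part of the concrete aggregate that Base needs to call.
type handler interface {
	OnEventData(data any) error
}

// Base plays the role of an embedded base aggregate (hypothetical).
type Base struct {
	Version int64
}

// Apply bumps the version and dispatches to the concrete type. The receiver
// b is only the embedded Base value; it has no way to reach the struct that
// embeds it, which is why the concrete value is passed explicitly.
func (b *Base) Apply(concrete handler, data any) error {
	b.Version++
	return concrete.OnEventData(data)
}

// Renamed is a hypothetical event payload.
type Renamed struct{ Name string }

// Thing embeds Base and adds its own domain state.
type Thing struct {
	Base
	Name string
}

// OnEventData updates the domain state of the concrete type.
func (t *Thing) OnEventData(data any) error {
	switch d := data.(type) {
	case *Renamed:
		t.Name = d.Name
		return nil
	}
	return fmt.Errorf("unknown event data %T", data)
}

func main() {
	thing := &Thing{}
	// Apply is promoted from Base, so thing is passed as the concrete value.
	if err := thing.Apply(thing, &Renamed{Name: "renamed"}); err != nil {
		panic(err)
	}
	fmt.Println(thing.Name, thing.Version)
}
```

As in the ApplyEvent listing, the version in this sketch is incremented before the handler runs, so a failing handler still leaves the version bumped.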

User-Defined Aggregate

To create a custom Aggregate, developers define a struct that embeds BaseAggregate, which implements the Aggregate interface. This struct represents the domain entity that will interact with the Event Sourcing system, encapsulating the data and business logic for a specific bounded context.

go
type MyAggregate struct {
	// embed BaseAggregate
	base.BaseAggregate

	// user-defined fields start here:

	// References
	OtherAggregateUuid string
	OtherEntityUid     string

	// Entities
	MyEntity *MyEntity

	// Value Objects
	Name  string
	Value int
}

In Event Sourcing, developers must follow certain conceptual rules to maintain consistency and adhere to best practices in Domain-Driven Design (DDD). Here, MyAggregate is separated into References, Entities, and Value Objects.

References

An Aggregate can only maintain references to other aggregates, ensuring each aggregate remains the sole authority over its internal state. If an aggregate needs to interact with a specific entity within another aggregate, both unique identifiers — the aggregate's and the entity's — must be provided. This guarantees that the other Aggregate remains the single point of control for managing its internal state.

In the example above, OtherAggregateUuid represents a reference to the external aggregate, while OtherEntityUid refers to an entity within that aggregate. By convention, Uuid is used for aggregates, and Uid is used for entities to distinguish their roles clearly.

This also implies that MyAggregate does not have direct access to the internal state of OtherAggregate (or even OtherEntity). Instead, all interactions must occur through a higher level of abstraction, such as Commands, Queries, or listening to the other aggregate's events.

A real-world example would be an Identity that has a reference to an Account. However, an Account can have multiple Identities, which is why Identity and Account are aggregates and not entities. Both aggregates can "exist" without the existence of the other.
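
A minimal sketch of such a reference, assuming simplified `Identity` and `Account` structs: the `accountQuery` interface and the `memoryAccounts` type are hypothetical and only stand in for whatever query abstraction an application provides.

```go
package main

import "fmt"

// Account is a hypothetical aggregate with its own lifecycle.
type Account struct {
	AccountUuid string
	Name        string
}

// Identity is a hypothetical aggregate that refers to an Account by
// identifier only; it holds no pointer into the Account's state.
type Identity struct {
	IdentityUuid string
	AccountUuid  string
}

// accountQuery stands in for a query-side abstraction (assumption).
type accountQuery interface {
	AccountName(accountUuid string) (string, bool)
}

// memoryAccounts is an in-memory implementation of accountQuery.
type memoryAccounts map[string]Account

func (m memoryAccounts) AccountName(accountUuid string) (string, bool) {
	acc, ok := m[accountUuid]
	return acc.Name, ok
}

// describe resolves the reference through the query instead of
// reaching into the other aggregate directly.
func describe(id Identity, q accountQuery) string {
	name, ok := q.AccountName(id.AccountUuid)
	if !ok {
		return fmt.Sprintf("identity %s: account %s not found", id.IdentityUuid, id.AccountUuid)
	}
	return fmt.Sprintf("identity %s belongs to account %q", id.IdentityUuid, name)
}

func main() {
	accounts := memoryAccounts{"acc-1": {AccountUuid: "acc-1", Name: "ACME"}}
	fmt.Println(describe(Identity{IdentityUuid: "id-1", AccountUuid: "acc-1"}, accounts))
	fmt.Println(describe(Identity{IdentityUuid: "id-2", AccountUuid: "acc-9"}, accounts))
}
```

Because the Identity stores only the identifier, the Account can be changed or removed without touching any Identity's internal state.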

Entities

Entities represent the core domain objects within an Aggregate, encapsulating the data and business logic for a specific entity type. In the example, MyEntity is an entity that is part of MyAggregate. However, entities can only exist within an aggregate; the user cannot address an entity without an aggregate.

A real-world example would be an Identity aggregate with a related Profile entity. In this case, the Profile can only exist in conjunction with the Identity — if the Identity is deleted, the Profile is automatically deleted as well. This strong lifecycle dependency is why Profile is modeled as an entity rather than an aggregate. As an entity, Profile is directly managed within the boundary of the Identity aggregate, ensuring that its existence and state are tightly coupled to the Identity. Aggregates, by contrast, have independent lifecycles and are treated as separate, self-contained units within the domain.
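
The lifecycle dependency can be sketched as follows. The structs and the methods `SetProfile`, `GetProfile`, and `Delete` are hypothetical and heavily simplified; in an event-sourced aggregate these changes would be driven by events rather than direct mutation.

```go
package main

import (
	"errors"
	"fmt"
)

// Profile is a hypothetical entity; it has an identifier (Uid) but is
// only reachable through the Identity aggregate that owns it.
type Profile struct {
	ProfileUid  string
	DisplayName string
}

// Identity is a hypothetical aggregate that owns at most one Profile.
type Identity struct {
	IdentityUuid string
	Deleted      bool
	profile      *Profile
}

// SetProfile creates or replaces the owned Profile.
func (i *Identity) SetProfile(profileUid, displayName string) error {
	if i.Deleted {
		return errors.New("identity is deleted")
	}
	i.profile = &Profile{ProfileUid: profileUid, DisplayName: displayName}
	return nil
}

// GetProfile returns a copy of the owned Profile so callers cannot mutate
// the aggregate's internal state from outside.
func (i *Identity) GetProfile() (Profile, bool) {
	if i.profile == nil {
		return Profile{}, false
	}
	return *i.profile, true
}

// Delete marks the Identity as deleted and drops the Profile with it.
func (i *Identity) Delete() {
	i.Deleted = true
	i.profile = nil
}

func main() {
	identity := &Identity{IdentityUuid: "id-1"}
	if err := identity.SetProfile("p-1", "Ada"); err != nil {
		panic(err)
	}
	p, ok := identity.GetProfile()
	fmt.Println(p.DisplayName, ok)

	identity.Delete()
	_, ok = identity.GetProfile()
	fmt.Println(ok) // the Profile does not outlive its Identity
}
```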

Value Objects

Value Objects represent immutable, self-contained objects that encapsulate specific attributes or properties within an Aggregate. Unlike entities, Value Objects do not have a unique identity and are defined solely by their attributes. In the example, Name and Value are Value Objects that are part of MyAggregate.
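
In the example aggregate the Value Objects are plain fields. When a Value Object groups several attributes, one possible shape in Go is a small struct with unexported fields. The `Reading` type below is a hypothetical illustration, not something the framework prescribes.

```go
package main

import (
	"errors"
	"fmt"
)

// Reading is a hypothetical value object: no identifier, unexported
// fields, and no setters, so it cannot change after construction.
type Reading struct {
	name  string
	value int
}

// NewReading validates the attributes and returns the value object.
func NewReading(name string, value int) (Reading, error) {
	if name == "" {
		return Reading{}, errors.New("name is required")
	}
	return Reading{name: name, value: value}, nil
}

func (r Reading) Name() string { return r.name }
func (r Reading) Value() int   { return r.value }

// WithValue returns a modified copy; the receiver stays untouched.
func (r Reading) WithValue(value int) Reading {
	r.value = value
	return r
}

func main() {
	first, _ := NewReading("cpu", 10)
	second, _ := NewReading("cpu", 10)
	changed := first.WithValue(20)

	fmt.Println(first == second)  // same attributes, so the values are equal
	fmt.Println(first == changed) // differs in one attribute
	fmt.Println(first.Value())    // first is unchanged by WithValue
}
```

Go compares structs field by field when all fields are comparable, which matches the idea that a Value Object is defined solely by its attributes.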

Intentions

An Aggregate holds the responsibility of processing user intentions and generating corresponding events. User intentions are implemented as receiver methods within the Aggregate. When executing an intention, the current state of the aggregate should be checked against the intention.

If the intention is to update a specific value object, but there is no difference with the current aggregate's state, it does not make sense to generate a new Event. However, if there are differences, a new Event is generated using the helper function base.NewEventFromAggregate. This function takes the aggregate instance and the intention's EventData to create a new Event containing the corresponding EventData for this particular aggregate in its current state.
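
The "no difference, no event" check can be sketched without the framework helpers. `Item`, `NameChanged`, and `ChangeName` are hypothetical names, and the `Changes` slice only stands in for the framework's uncommitted events.

```go
package main

import (
	"errors"
	"fmt"
)

// NameChanged is a hypothetical EventData payload.
type NameChanged struct {
	Name string
}

// Item is a hypothetical aggregate; Changes collects new event data
// and stands in for the framework's uncommitted events.
type Item struct {
	Name    string
	Version int64
	Changes []any
}

// ChangeName is an intention: it validates the input, compares it with
// the current state, and only records an event if something changes.
func (it *Item) ChangeName(name string) error {
	if name == "" {
		return errors.New("name is required")
	}
	if name == it.Name {
		return nil // no difference to the current state, so no event
	}
	evt := &NameChanged{Name: name}
	it.Changes = append(it.Changes, evt)
	it.Version++
	it.Name = evt.Name
	return nil
}

func main() {
	it := &Item{}
	_ = it.ChangeName("first")
	_ = it.ChangeName("first") // repeated intention, state already matches
	_ = it.ChangeName("second")
	fmt.Println(len(it.Changes), it.Version, it.Name)
}
```

Skipping the event in the unchanged case keeps the event stream free of entries that carry no state change.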

Next, the intention applies the newly generated Event to the aggregate using another helper function base.ApplyEvent. This function takes care of all necessary steps for Event Sourcing, including versioning and tracking new events to be committed in the EventStore.

Moreover, base.ApplyEvent automatically executes the underlying ApplyEvent of the embedded BaseAggregate and triggers OnEventData for the corresponding aggregate, as implemented by the user.

By convention, the intention receiver method and the definition of EventData are implemented in the same file. This design promotes clarity and consistency, making it easier for developers to comprehend and maintain the codebase. Here is one example intention LetItHappen for MyAggregate:

go
type MyAggregateHappendEvent struct {
	MyAggregateUuid string
	Name            string
	Value           int
}

func (agg *MyAggregate) LetItHappen(name string, value int) error {

	// business logic
	if len(name) < 1 {
		return fmt.Errorf("name is required")
	}
	if value <= 0 || value > 100 {
		return fmt.Errorf("value must be between 1 and 100")
	}

	// create new event based on the current aggregate
	evt := base.NewEventFromAggregate(agg, &MyAggregateHappendEvent{
		MyAggregateUuid: agg.GetAggregateUuid(),
		Name:            name,
		Value:           value,
	})

	// apply event on the current aggregate
	return agg.ApplyEvent(agg, evt, true)
}

The new Event is added to the aggregate and is later persisted to the EventStore by the Facade. By convention, EventData names are in the past tense. In this example the intention is named LetItHappen and the corresponding EventData is named MyAggregateHappendEvent.

As a best practice, implement the aggregate and its intentions in a separate package called aggregate, which lives within a domain-specific package. For example, MyAggregate could be located in domain/my/aggregate/aggregate.go, and the intention LetItHappen together with its EventData in domain/my/aggregate/let_it_happen.go.