Aggregate

The concept of Aggregates (or Aggregate Roots) originates from Domain-Driven Design (DDD) and is fundamental in modeling domain entities within complex systems. An Aggregate represents a cluster of related objects that are treated as a single unit, with the Aggregate (Root) serving as the entry point for managing the internal state of the group. The Aggregate establishes a boundary that ensures consistency within this group, meaning that any changes to its internal entities must go through the Aggregate (Root), maintaining the integrity of the domain's business rules.

In Event Sourcing, the Aggregate (Root) is the primary entity retrieved from the AggregateRepository. This repository relies on the EventRepository, which in turn interacts with the underlying EventStore. The Aggregate Root is responsible for processing actions (often referred to as intentions) and generating the corresponding Events. Additionally, it applies these Events to itself in order to update its state. By replaying past events, the Aggregate (Root) reconstructs its current state without relying on traditional persistence mechanisms, ensuring that its behavior remains consistent with the event stream.
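The replay idea can be illustrated without any framework. The following is a minimal, self-contained sketch; the `event` and `counter` types and the `rehydrate` function are hypothetical and are not part of comby's API.

```go
package main

import "fmt"

// event is a hypothetical, simplified stand-in for a stored event.
type event struct {
	Version int64
	Name    string // "Incremented" or "Reset" in this toy example
	Amount  int
}

// counter is a toy aggregate whose state is derived only from events.
type counter struct {
	Version int64
	Total   int
}

// apply mutates the aggregate according to a single event.
func (c *counter) apply(e event) {
	switch e.Name {
	case "Incremented":
		c.Total += e.Amount
	case "Reset":
		c.Total = 0
	}
	c.Version = e.Version
}

// rehydrate builds a fresh aggregate and replays the event stream in order.
func rehydrate(stream []event) *counter {
	c := &counter{}
	for _, e := range stream {
		c.apply(e)
	}
	return c
}

func main() {
	stream := []event{
		{Version: 1, Name: "Incremented", Amount: 5},
		{Version: 2, Name: "Reset"},
		{Version: 3, Name: "Incremented", Amount: 2},
	}
	c := rehydrate(stream)
	fmt.Println("version:", c.Version, "total:", c.Total)
}
```

The aggregate itself is never stored: only the events are, and the current state is whatever results from applying them in order.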

Aggregates are short-lived objects. They are created, used, and discarded within a single transaction and are not meant to be persisted themselves.

Aggregate interface

In comby an Aggregate (Root) is defined as an interface as follows:

go
type Aggregate interface {
	// GetTenantUuid returns the unique identifier of the tenant associated with the aggregate.
	GetTenantUuid() string

	// SetTenantUuid sets the unique identifier of the tenant associated with the aggregate.
	SetTenantUuid(tenantUuid string)

	// GetDomain returns the domain of the aggregate.
	GetDomain() string

	// GetAggregateUuid returns the unique identifier of the aggregate.
	GetAggregateUuid() string

	// SetAggregateUuid sets the unique identifier of the aggregate.
	SetAggregateUuid(aggregateUuid string)

	// GetVersion returns the current version of the aggregate.
	GetVersion() int64

	// SetVersion sets the current version of the aggregate.
	SetVersion(version int64)

	// GetUncommittedEvents returns a slice of uncommitted events associated with the aggregate.
	GetUncommittedEvents() []Event

	// AddUncommittedEvents adds a new uncommitted event to the aggregate.
	AddUncommittedEvents(evt Event)

	// ClearUncommittedEvents clears all uncommitted events from the aggregate.
	ClearUncommittedEvents()

	// IsDeleted returns true if the aggregate is marked as deleted.
	IsDeleted() bool

	// EventHandler is embedded to allow domain-specific event handling.
	EventHandler
}

The Aggregate interface defines the core contract for any aggregate in an Event Sourcing system. It provides methods for managing multi-tenancy (TenantUuid), the aggregate's unique identifier (AggregateUuid), versioning, and access to uncommitted events, and, through the embedded EventHandler, the registration of domain event handlers. It also includes a method to check whether the aggregate is marked as deleted (IsDeleted).

The interface also embeds the EventHandler interface, which is used internally to handle domain-specific event (DomainEvt) registration. This allows any aggregate to define typed domain event handling methods, ensuring that domain events are processed correctly and consistently within the aggregate.

Specifically, it allows the user to register their typed domain event-specific methods via a helper function in the aggregate itself. The correct assignment is done by the internal EventHandler, which is implemented in the BaseAggregate.

Base Aggregate

The framework includes a default implementation of the Aggregate interface, called BaseAggregate, which fully implements the required methods and serves as the foundation for domain-specific aggregates. Developers must embed BaseAggregate in their own aggregate structs to inherit this functionality, which removes most of the boilerplate otherwise needed to build a domain aggregate.

The BaseAggregate automates critical Event Sourcing tasks, such as:

  • Versioning: Tracks the version of the aggregate.
  • Event Management: Handles uncommitted changes and event processing.
  • Domain Event Handling: Manages and invokes handlers for domain events.

Definition of BaseAggregate

go
type BaseAggregate struct {
	*BaseIdentifier
	TenantUuid          string
	AggregateUuid       string
	Version             int64
	Deleted             bool
	Changes             []Event
	domainEventHandlers []DomainEventHandlerEntry
}

The BaseAggregate provides a comprehensive foundation for managing domain aggregates. It includes metadata management through its integration with BaseIdentifier, which supplies Domain and Name information. Additionally, the BaseAggregate incorporates fields such as TenantUuid and AggregateUuid to support multitenancy and uniquely identify aggregate instances.

Event handling is a core capability of the BaseAggregate. It maintains a list of uncommitted events in the Changes field, allowing developers to track modifications to the aggregate easily. Internally, it uses the domainEventHandlers slice to register and manage handlers for domain-specific events, ensuring that these events are processed correctly.

As an implementation of the EventHandler interface, the BaseAggregate provides two crucial methods: GetDomainEventHandlers and GetDomainEvents. These methods enable the aggregate to return its registered event handlers and the list of its owned domain events, respectively. This functionality is vital for the framework's Facade, which relies on this information to load and process domain events accurately.

By combining metadata management, event tracking, and interface implementation, the BaseAggregate ensures that developers can focus on domain-specific logic while leveraging a robust and consistent foundation for event-sourced aggregates.
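The bookkeeping around uncommitted events can be pictured with a small standalone sketch. The `miniAggregate` and `miniEvent` types are hypothetical, and the assumption that the version increases by one per recorded event is made for illustration only; comby's actual BaseAggregate may behave differently.

```go
package main

import "fmt"

// miniEvent is a hypothetical stand-in for a framework event.
type miniEvent struct {
	Version int64
	Payload string
}

// miniAggregate is a hypothetical stand-in that tracks a version and the
// events that have not been persisted yet.
type miniAggregate struct {
	Version int64
	Changes []miniEvent // uncommitted events
}

// record bumps the version and remembers the event until it is persisted.
// Assumption: one version step per event.
func (a *miniAggregate) record(payload string) {
	a.Version++
	a.Changes = append(a.Changes, miniEvent{Version: a.Version, Payload: payload})
}

// commit simulates persisting the pending events and then clearing them.
func (a *miniAggregate) commit(store *[]miniEvent) {
	*store = append(*store, a.Changes...)
	a.Changes = nil
}

func main() {
	var store []miniEvent
	agg := &miniAggregate{}

	agg.record("created")
	agg.record("renamed")
	fmt.Println("pending:", len(agg.Changes), "version:", agg.Version)

	agg.commit(&store)
	fmt.Println("pending:", len(agg.Changes), "stored:", len(store))
}
```

The version survives the commit while the list of changes is emptied, which mirrors the split between GetVersion and GetUncommittedEvents/ClearUncommittedEvents in the interface.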

User-Defined Aggregate

Developers can easily extend the functionality of the BaseAggregate by embedding it into their custom aggregate struct. This approach allows the developer's aggregate to inherit all the essential functionality of BaseAggregate while providing the flexibility to define additional fields and methods specific to the domain.

A user-defined aggregate might look like this:

go
type MyAggregate struct {
	*comby.BaseAggregate // Use a pointer to ensure proper initialization

	// User-defined fields:

	// References
	OtherAggregateUuid string
	OtherEntityUid     string

	// Entities
	MyEntity *MyEntity

	// Value Objects
	Name  string
	Value int
}

In the context of Event Sourcing, developers are encouraged to follow conceptual rules that align with Domain-Driven Design (DDD) principles. In this example, the MyAggregate is logically separated into three sections:

  • References: Fields like OtherAggregateUuid and OtherEntityUid establish connections to related aggregates or entities.
  • Entities: Encapsulated domain entities (e.g. MyEntity) that have their lifecycle managed by the aggregate.
  • Value Objects: Simple fields such as Name and Value.

This structure ensures clarity and consistency, making it easier to reason about the aggregate's role in the system while adhering to DDD best practices.

References

An Aggregate may hold only references (identifiers) to other aggregates, never the aggregates themselves, so that each aggregate remains the sole authority over its internal state. If an aggregate needs to interact with a specific entity within another aggregate, both unique identifiers, the aggregate's and the entity's, must be provided. This guarantees that the other Aggregate remains the single point of control for managing its internal state.

In the example above, OtherAggregateUuid represents a reference to the external aggregate, while OtherEntityUid refers to an entity within that aggregate. By convention, Uuid is used for aggregates, and Uid is used for entities to distinguish their roles clearly.

This also implies that MyAggregate does not have direct access to the internal state of OtherAggregate (or even OtherEntity). Instead, all interactions must occur through a higher level of abstraction, such as Commands, Queries, or listening to the other aggregate's events.

A real-world example would be an Identity that has a reference to an Account. However, an Account can have multiple Identities, which is why Identity and Account are aggregates and not entities. Both aggregates can "exist" without the existence of the other.

Entities

Entities represent the core domain objects within an Aggregate, encapsulating the data and business logic for a specific entity type. In the example, MyEntity is an entity that is part of MyAggregate. However, entities can only exist within an aggregate; the user cannot address an entity without an aggregate.

A real-world example would be an Identity aggregate with a related Profile entity. In this case, the Profile can only exist in conjunction with the Identity — if the Identity is deleted, the Profile is automatically deleted as well. This strong lifecycle dependency is why Profile is modeled as an entity rather than an aggregate. As an entity, Profile is directly managed within the boundary of the Identity aggregate, ensuring that its existence and state are tightly coupled to the Identity. Aggregates, by contrast, have independent lifecycles and are treated as separate, self-contained units within the domain.
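The Identity, Account, and Profile relationships can be sketched as plain Go structs. All types and field names below are hypothetical illustrations and not comby types; the map lookup merely stands in for whatever query mechanism resolves a reference.

```go
package main

import "fmt"

// Account is an aggregate with its own, independent lifecycle.
type Account struct {
	AccountUuid string
	Name        string
}

// Profile is an entity: it has a Uid but exists only inside an Identity.
type Profile struct {
	ProfileUid string
	Nickname   string
}

// Identity is an aggregate. It points at its Account by identifier only and
// owns its Profile directly.
type Identity struct {
	IdentityUuid string
	AccountUuid  string   // reference to another aggregate
	Profile      *Profile // entity inside this aggregate's boundary
}

// accountOf resolves the reference through a lookup instead of a pointer.
func accountOf(id Identity, accounts map[string]Account) (Account, bool) {
	acc, ok := accounts[id.AccountUuid]
	return acc, ok
}

func main() {
	accounts := map[string]Account{
		"acc-1": {AccountUuid: "acc-1", Name: "ACME"},
	}
	alice := Identity{
		IdentityUuid: "id-1",
		AccountUuid:  "acc-1",
		Profile:      &Profile{ProfileUid: "p-1", Nickname: "alice"},
	}
	bob := Identity{IdentityUuid: "id-2", AccountUuid: "acc-1"}

	// Several identities can refer to the same account.
	for _, id := range []Identity{alice, bob} {
		if acc, ok := accountOf(id, accounts); ok {
			fmt.Println(id.IdentityUuid, "belongs to", acc.Name)
		}
	}
}
```

Discarding an Identity value takes its Profile with it, while the Account it referred to is untouched, which is the lifecycle difference between an entity and a referenced aggregate.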

Value Objects

Value Objects represent immutable, self-contained objects that encapsulate specific attributes or properties within an Aggregate. Unlike entities, Value Objects do not have a unique identity and are defined solely by their attributes. In the example, Name and Value are Value Objects that are part of MyAggregate.
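Name and Value in the example are plain built-in types, but a Value Object can also be a small struct. The sketch below uses a hypothetical `Money` type (not part of comby) to show the two defining traits: equality by attributes and operations that return a new value instead of mutating the receiver.

```go
package main

import (
	"errors"
	"fmt"
)

// Money is a hypothetical value object: it has no identity and is compared
// purely by its attributes.
type Money struct {
	Amount   int64 // minor units, e.g. cents
	Currency string
}

// Add returns a new Money and leaves both operands unchanged.
func (m Money) Add(other Money) (Money, error) {
	if m.Currency != other.Currency {
		return Money{}, errors.New("currency mismatch")
	}
	return Money{Amount: m.Amount + other.Amount, Currency: m.Currency}, nil
}

func main() {
	a := Money{Amount: 500, Currency: "EUR"}
	b := Money{Amount: 500, Currency: "EUR"}

	// Two values with the same attributes are interchangeable.
	fmt.Println("equal:", a == b)

	sum, err := a.Add(b)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("sum:", sum.Amount, "original:", a.Amount)
}
```

Using a value receiver means the method cannot modify the original, so the immutability is enforced by the type rather than by convention.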

NewAggregate()

In comby, it is a convention for every aggregate to provide a NewAggregate method. This convention is essential to ensure that the aggregate is correctly initialized with all necessary internal structures and fields. Without this method, attempting to use the aggregate struct directly would likely result in incorrect or incomplete initialization, breaking critical functionality required for Event Sourcing and Domain-Driven Design (DDD).

go
// background: NewAggregate must fulfill this signature
type NewAggregateFunc[T Aggregate] func() T

The NewAggregate method acts as a factory function, encapsulating the initialization logic for both the embedded BaseAggregate and any additional fields and domain event handlers defined by the developer. Here's an example:

go
func NewAggregate() *MyAggregate {
	agg := &MyAggregate{}
	agg.BaseAggregate = comby.NewBaseAggregate()
	// Additional initialization logic can be added here if needed.
	return agg
}

This pattern ensures:

  • The BaseAggregate is properly initialized, setting up its core functionality, such as versioning, event tracking, and domain-event handling.
  • Developers have a central place to define any custom initialization logic, making the aggregate ready for use immediately after creation.

If you were to use the aggregate struct directly (e.g. by instantiating it with a simple &MyAggregate{}), the embedded BaseAggregate would not be initialized, and crucial features like event handling and metadata management would not function correctly. This could lead to runtime errors or inconsistencies in the system's behavior.
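The failure mode is a general property of Go's embedded pointers and can be reproduced without comby. In this sketch, `base`, `newBase`, and `myAggregate` are hypothetical stand-ins for BaseAggregate, its constructor, and a user-defined aggregate.

```go
package main

import "fmt"

// base is a hypothetical stand-in for an embedded base type.
type base struct {
	Version int64
}

// newBase plays the role of the base type's constructor.
func newBase() *base { return &base{} }

// myAggregate embeds a pointer, which is nil unless explicitly set.
type myAggregate struct {
	*base
}

// readVersion reads the promoted field and reports whether doing so panicked.
func readVersion(agg *myAggregate) (v int64, panicked bool) {
	defer func() {
		if r := recover(); r != nil {
			panicked = true
		}
	}()
	return agg.Version, false
}

func main() {
	// Zero-value construction leaves the embedded pointer nil.
	_, panicked := readVersion(&myAggregate{})
	fmt.Println("struct literal panics:", panicked)

	// A factory sets the embedded pointer before the aggregate is used.
	_, panicked = readVersion(&myAggregate{base: newBase()})
	fmt.Println("factory-built panics:", panicked)
}
```

Accessing a promoted field through a nil embedded pointer is a nil pointer dereference at runtime, not a compile-time error, which is why a factory function is the safer convention.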

The aggregate struct MyAggregate and its corresponding factory function (NewAggregate) could be implemented in domain/my/aggregate/aggregate.go.

Add Domain Event Handler

In comby, domain event handlers are registered using the comby.AddDomainEventHandler method. This approach ensures that domain event handlers are correctly linked to their respective aggregate and can be invoked during event processing.

For instance, if your aggregate needs to handle two domain events, AEventHappend and EventB, you would define their handlers and register them in the NewAggregate method as follows:

go
func NewAggregate() *MyAggregate {
	agg := &MyAggregate{}
	agg.BaseAggregate = comby.NewBaseAggregate()
	agg.Domain = "MyDomain"
	comby.AddDomainEventHandler(agg, agg.OnEventA)
	comby.AddDomainEventHandler(agg, agg.AndOnEventB)
	return agg
}

OnEventA and AndOnEventB are strongly typed methods on MyAggregate, each taking its domain event as a parameter. They implement the aggregate's handling of AEventHappend and EventB, respectively. In other words, they define how the aggregate reacts when these domain events occur.

The comby.AddDomainEventHandler function binds these methods to the appropriate domain events, ensuring they are called whenever the corresponding domain event is processed. Instead of directly using agg.AddDomainEventHandler, the framework uses comby.AddDomainEventHandler to centralize and standardize the registration process. This abstraction enforces type safety and ensures compatibility with the underlying event-handling infrastructure. For more details on this design decision, refer to the FAQ.
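A likely reason for the package-level function, offered here as an assumption rather than taken from the FAQ, is that Go methods cannot declare their own type parameters, whereas package-level functions can. The sketch below shows the general technique with hypothetical names (`registry`, `addHandler`, `dispatch`); it is not comby's implementation.

```go
package main

import (
	"context"
	"fmt"
)

// untypedHandler is the form in which handlers are stored internally.
type untypedHandler func(ctx context.Context, evt any) error

// registry is a hypothetical store of handlers keyed by event type name.
type registry struct {
	handlers map[string]untypedHandler
}

// addHandler is a generic package-level function. T is inferred from the
// handler's parameter, and the typed handler is wrapped in an untyped one.
func addHandler[T any](r *registry, fn func(ctx context.Context, evt T) error) {
	var zero T
	r.handlers[fmt.Sprintf("%T", zero)] = func(ctx context.Context, evt any) error {
		typed, ok := evt.(T)
		if !ok {
			return fmt.Errorf("unexpected event type %T", evt)
		}
		return fn(ctx, typed)
	}
}

// dispatch looks up the handler by the event's dynamic type and invokes it.
func (r *registry) dispatch(ctx context.Context, evt any) error {
	h, ok := r.handlers[fmt.Sprintf("%T", evt)]
	if !ok {
		return fmt.Errorf("no handler for %T", evt)
	}
	return h(ctx, evt)
}

type nameChanged struct{ Name string }

type person struct{ Name string }

// onNameChanged is a strongly typed handler method.
func (p *person) onNameChanged(ctx context.Context, evt *nameChanged) error {
	p.Name = evt.Name
	return nil
}

// demo registers the handler on a fresh registry and dispatches one event.
func demo(name string) (string, error) {
	r := &registry{handlers: map[string]untypedHandler{}}
	p := &person{}
	addHandler(r, p.onNameChanged) // T is inferred as *nameChanged
	err := r.dispatch(context.Background(), &nameChanged{Name: name})
	return p.Name, err
}

func main() {
	got, err := demo("Ada")
	fmt.Println(got, err)
}
```

Because the type parameter is inferred from the method value, the call site stays as short as a method call would be while still being checked by the compiler.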

The Domain Event Handlers OnEventA and AndOnEventB are strongly typed. Both methods must have a specific signature to ensure they can be correctly invoked by the framework:

go
// background: `OnEventA` and `AndOnEventB` must fulfill this signature
type TypedDomainEventHandlerFunc[T DomainEvt] func(ctx context.Context, evt Event, domainEvt T) error

For example, the domain event handler OnEventA might look like this:

go
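// AEventHappend is the payload of the domain event.
type AEventHappend struct {
	Value string `json:"value"`
}

// OnEventA applies the event payload to the aggregate's state.
func (agg *MyAggregate) OnEventA(ctx context.Context, evt comby.Event, domainEvt *AEventHappend) error {
	// The payload is a string, so it is stored in the string field Name
	// (MyAggregate.Value is declared as int above).
	agg.Name = domainEvt.Value
	return nil
}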

The AEventHappend struct defines the schema of the domain event. In this example it carries a single field, Value, which holds the event-specific data; further fields, for example an identifier such as MyAggUuid for the target aggregate, can be added as needed. The OnEventA method processes this event with a clear and concise signature.

It accepts three parameters:

  • context.Context (ctx) - which allows for cancellation or deadline management during execution
  • comby.Event (evt) - which serves as a wrapper providing metadata like version or timestamp
  • strongly-typed domainEvt of type *AEventHappend - containing the payload specific to the event

Inside the handler, the payload is copied into the aggregate's fields so that the aggregate reflects the latest changes from the event. The method returns an error so that the system can handle any issue that occurs while processing the event; returning nil signals successful processing. This design follows the principles of event sourcing by keeping state mutation inside event handlers, separate from the intentions that decide which events to emit.

Add Domain Intention

An Aggregate in comby is responsible for processing user intentions and generating corresponding events. Intentions are implemented as receiver methods on the aggregate, allowing them to encapsulate business logic and ensure the state of the aggregate is appropriately modified in response to user actions.

When an intention is executed, the aggregate's current state must be checked against the requested change. If no difference exists—such as when an update to a value object matches the existing state—it is unnecessary to generate a new event. However, if the requested change introduces a difference, a new event is created using the helper function comby.ApplyDomainEvent. This function takes the aggregate instance and the DomainEvt object representing the intention and generates a new event that reflects the aggregate's state and the specific domain event.

comby.ApplyDomainEvent does more than just create the event. It applies the event to the aggregate, handling all steps required for Event Sourcing, such as updating the aggregate's Version and tracking new events to be persisted in the EventStore. Additionally, it automatically invokes the appropriate domain event handler, as defined by the developer, to update the aggregate's state based on the event's data.

By convention, the receiver method for the intention and the definition of the corresponding DomainEvt are implemented in the same file. This organizational pattern promotes clarity and consistency, making it easier for developers to understand and maintain the codebase. For example, consider the following implementation of an intention, DoSomethingA:

go
func (agg *MyAggregate) DoSomethingA(value string) error {

	// business logic
	if len(value) < 1 {
		return fmt.Errorf("value must be set")
	}

	// create new event based on the current aggregate
	domainEvt := &AEventHappend{
		Value:     value,
	}

	// apply event on the current aggregate
	return comby.ApplyDomainEvent(agg, domainEvt, true)
}

In this example:

  • The DoSomethingA method validates the input, ensuring the business logic constraints are met.
  • If the constraints are satisfied, it creates a new domain event AEventHappend, which encapsulates the intention.
  • The comby.ApplyDomainEvent function applies the event, ensuring all Event Sourcing mechanisms are executed, such as versioning, state mutation, and event tracking.

The generated event is then added to the aggregate and will later be persisted in the EventStore by the Facade.
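The rule that an unchanged value should not produce an event is not shown in DoSomethingA, so here is a standalone sketch of it. The `item` and `renamed` types and the `apply` helper are hypothetical and only mimic, in simplified form, what the text attributes to comby.ApplyDomainEvent.

```go
package main

import (
	"errors"
	"fmt"
)

// renamed is a hypothetical domain event.
type renamed struct{ Name string }

// item is a hypothetical aggregate with a version and pending changes.
type item struct {
	Name    string
	Version int64
	Changes []renamed
}

// apply is a toy counterpart of an "apply domain event" helper.
func (a *item) apply(evt renamed) {
	a.Name = evt.Name                  // state mutation, the handler's job
	a.Version++                        // versioning
	a.Changes = append(a.Changes, evt) // tracking for later persistence
}

// Rename is the intention: validate, compare with current state, then emit.
func (a *item) Rename(name string) error {
	if name == "" {
		return errors.New("name must be set")
	}
	if name == a.Name {
		return nil // nothing would change, so no event is generated
	}
	a.apply(renamed{Name: name})
	return nil
}

func main() {
	a := &item{}
	fmt.Println(a.Rename(""))      // rejected by validation
	fmt.Println(a.Rename("first")) // produces an event
	fmt.Println(a.Rename("first")) // no-op, same value
	fmt.Println("events:", len(a.Changes), "version:", a.Version)
}
```

Skipping the event for a no-op keeps the event stream free of entries that carry no information and avoids bumping the version unnecessarily.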

INFO

  • DomainEvt names are written in the past tense (e.g., AEventHappend).
  • Intention methods use descriptive names reflecting the action (e.g., DoSomethingA).

For better maintainability and adherence to best practices, it is recommended to organize aggregates and their intentions within a dedicated aggregate package inside a domain-specific package. For instance:

  • The MyAggregate struct could be defined in domain/my/aggregate/aggregate.go.
  • The intention DoSomethingA and its corresponding domain event (AEventHappend) and domain event handler (OnEventA) could be implemented in domain/my/aggregate/do.something.a.go.

Writing Tests

When writing tests for aggregates, it is essential to cover the aggregate's core functionality, including event generation, state mutation, and event handling. By testing these aspects, developers can ensure that the aggregate behaves as expected and maintains consistency throughout its lifecycle.

Here is a simple example that tests the Place intention. The test checks that the aggregate accepts an order item with a valid quantity, rejects one with a quantity of zero, and emits exactly one event for the accepted item:

go
// simple/domain/order/aggregate/aggregate_test.go
package aggregate_test

import (
	"testing"

	"comby.io/examples/simple/domain/order/aggregate"
	"github.com/gradientzero/comby/v2"
)

func TestOrderPlaceInvalidQuantity(t *testing.T) {
	// create aggregate
	agg := aggregate.NewAggregate()

	// create order item
	itemQ1 := &aggregate.Item{
		ItemUid:  comby.NewUuid(),
		Sku:      "sku",
		Quantity: 1,
	}
	itemQ0 := &aggregate.Item{
		ItemUid:  comby.NewUuid(),
		Sku:      "sku",
		Quantity: 0,
	}

	// execute logic
	if err := agg.Place(itemQ1); err != nil {
		t.Fatal(err)
	}
	if err := agg.Place(itemQ0); err == nil {
		t.Fatalf("expected error")
	}

	// check generated event
	evts := agg.GetUncommittedEvents()
	if len(evts) != 1 {
		t.Fatalf("expected one event")
	}
	evt := evts[0]

	// technical check
	if evt.GetDomainEvtName() != comby.GetTypeName(aggregate.OrderPlacedEvent{}) {
		t.Fatal(evt)
	}

	// content check
	domainEvt, ok := evt.GetDomainEvt().(*aggregate.OrderPlacedEvent)
	if !ok {
		t.Fatalf("unexpected domain event type %T", evt.GetDomainEvt())
	}
	if domainEvt.Item.Quantity != 1 {
		t.Fatalf("expected quantity 1")
	}
	if domainEvt.Item.Sku != "sku" {
		t.Fatalf("expected sku")
	}
}

Field Shadowing

If a user-defined aggregate (or any struct) embeds a type and both the outer struct and the embedded type declare a field with the same name, the outer struct's field takes precedence and "shadows" the embedded one, which can result in unexpected behavior.

The following fields require your attention when defining custom aggregates:

  • Domain
  • Name
  • TenantUuid
  • AggregateUuid
  • Version
  • Deleted
  • Changes

For example, if Domain is defined as an additional field in MyAggregate, the developer must distinguish between their own Domain field (MyAggregate) and the embedded Domain (BaseIdentifier) by explicitly naming the embedded type.

go
type MyAggregate struct {
	*comby.BaseAggregate
	Domain string	// <- Domain is shadowed (a)
	Version int64	// <- Version is shadowed (b)
}

...
func NewAggregate() *MyAggregate {
	agg := &MyAggregate{}
	agg.BaseAggregate = comby.NewBaseAggregate()
	agg.Domain = "MyDomain" // <- shadowed (a), not setting the embedded Domain
	agg.BaseIdentifier.Domain = "MyDomain" // <- correct
	agg.Version = 100 // <- shadowed (b), not setting the embedded Version
	agg.BaseAggregate.Version = 0 // <- correct
	return agg
}
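The shadowing rule is plain Go behavior and can be verified in isolation. In the following sketch, `identifier`, `base`, and `shadowing` are hypothetical stand-ins for BaseIdentifier, BaseAggregate, and a user-defined aggregate.

```go
package main

import "fmt"

// identifier is a hypothetical stand-in for BaseIdentifier.
type identifier struct {
	Domain string
}

// base is a hypothetical stand-in for BaseAggregate.
type base struct {
	*identifier
	Version int64
}

// shadowing redeclares two field names that already exist in embedded types.
type shadowing struct {
	*base
	Domain  string // shadows identifier.Domain
	Version int64  // shadows base.Version
}

// demo writes through the short and the explicit selectors and returns what
// each of the four fields holds afterwards.
func demo() (outerDomain, innerDomain string, outerVersion, innerVersion int64) {
	agg := &shadowing{base: &base{identifier: &identifier{}}}

	agg.Domain = "outer" // reaches shadowing.Domain only
	agg.Version = 100    // reaches shadowing.Version only

	agg.identifier.Domain = "inner" // explicit path reaches the embedded field

	return agg.Domain, agg.identifier.Domain, agg.Version, agg.base.Version
}

func main() {
	od, id, ov, iv := demo()
	fmt.Printf("Domain: outer=%q embedded=%q\n", od, id)
	fmt.Printf("Version: outer=%d embedded=%d\n", ov, iv)
}
```

The embedded Version stays at zero even though agg.Version was set to 100, which is the kind of silent divergence to watch for when naming fields in a custom aggregate.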