Aggregate
The concept of Aggregates
(or Aggregate Roots) originates from Domain-Driven Design (DDD)
and is fundamental in modeling domain entities within complex systems. An Aggregate represents a cluster of related objects that are treated as a single unit, with the Aggregate (Root) serving as the entry point for managing the internal state of the group. The Aggregate establishes a boundary that ensures consistency within this group, meaning that any changes to its internal entities must go through the Aggregate (Root), maintaining the integrity of the domain's business rules.
In Event Sourcing
, the Aggregate (Root) is the primary entity retrieved from the AggregateRepository
. This repository relies on the EventRepository
, which in turn interacts with the underlying EventStore
. The Aggregate Root is responsible for processing actions (often referred to as intentions) and generating the corresponding Events
. Additionally, it applies these Events
to itself in order to update its state. By replaying past events, the Aggregate (Root) reconstructs its current state without relying on traditional persistence mechanisms, ensuring that its behavior remains consistent with the event stream.
Aggregates are short living objects. They are created, used and discarded in a single transaction. They are not meant to be persisted.
Aggregate interface
In comby an Aggregate
(Root) is defined as an interface
as follows:
type Aggregate interface {
// GetTenantUuid returns the unique identifer of the tenant associated with the aggregate.
GetTenantUuid() string
// SetTenantUuid sets the unique identifer of the tenant associated with the aggregate.
SetTenantUuid(tenantUuid string)
// GetDomain returns the domain of the aggregate.
GetDomain() string
// GetAggregateUuid returns the unique identifer of the aggregate.
GetAggregateUuid() string
// SetAggregateUuid sets the unique identifer of the aggregate.
SetAggregateUuid(aggregateUuid string)
// GetVersion returns the current version of the aggregate.
GetVersion() int64
// SetVersion sets the current version of the aggregate.
SetVersion(version int64)
// GetUncommittedEvents returns a slice of uncommitted events associated with the aggregate.
GetUncommittedEvents() []Event
// AddUncommittedEvents adds a new uncommitted event to the aggregate.
AddUncommittedEvents(evt Event)
// ClearUncommittedEvents clears all uncommitted events from the aggregate.
ClearUncommittedEvents()
// IsDeleted returns true if the aggregate is marked as deleted.
IsDeleted() bool
// EventHandler is embedded to allow domain-specific event handling.
EventHandler
}
The Aggregate
interface defines the core contract for any aggregate in an Event Sourcing system. It provides methods for managing multi-tenancy (TenantUuid
), the aggregate's unique identifier (AggregateUuid
), handling versioning, and registering domain event handlers and accessing uncommitted events. Additionally, it includes a method to check if the aggregate is marked as deleted (IsDeleted
), ensuring proper state management and event consistency.
The interface also embedds the EventHandler
interface which is used internally to handle domain-specific event (DomainEvt
) registration. This allows any aggregate to define typed domain event handling methods, ensuring that domain events are processed correctly and consistently within the aggregate.
Specifically, it allows the user to register their typed domain event-specific methods via a helper function in the aggregate itself. The correct assignment is done by the internal EventHandler, which is implemented in the BaseAggregate
.
Base Aggregate
The framework includes a default implementation of the Aggregate
interface, called BaseAggregate
, which fully implements the required methods and serves as the foundation for developers building domain-specific aggregates. Developers must embed BaseAggregate
within their own custom Aggregate structs to inherit its functionality. By embedding BaseAggregate
into custom aggregate structs, developers can inherit essential functionality, drastically reducing boilerplate code while streamlining the development of domain aggregates.
The BaseAggregate
automates critical Event Sourcing tasks, such as:
- Versioning: Tracks the version of the aggregate.
- Event Management: Handles uncommitted changes and event processing.
- Domain Event Handling: Manages and invokes handlers for domain events.
Definition of BaseAggregate
type BaseAggregate struct {
*BaseIdentifier
TenantUuid string
AggregateUuid string
Version int64
Deleted bool
Changes []Event
domainEventHandlers []DomainEventHandlerEntry
}
The BaseAggregate
provides a comprehensive foundation for managing domain aggregates. It includes metadata management through its integration with BaseIdentifier
, which supplies Domain and Name information. Additionally, the BaseAggregate
incorporates fields such as TenantUuid and AggregateUuid to support multitenancy and uniquely identify aggregate instances.
Event handling is a core capability of the BaseAggregate
. It maintains a list of uncommitted events in the Changes field, allowing developers to track modifications to the aggregate easily. Internally, it uses the domainEventHandlers slice to register and manage handlers for domain-specific events, ensuring that these events are processed correctly.
As an implementation of the EventHandler
interface, the BaseAggregate
provides two crucial methods: GetDomainEventHandlers
and GetDomainEvents
. These methods enable the aggregate to return its registered event handlers and the list of its owned domain events, respectively. This functionality is vital for the framework's Facade
, which relies on this information to load and process domain events accurately.
By combining metadata management, event tracking, and interface implementation, the BaseAggregate
ensures that developers can focus on domain-specific logic while leveraging a robust and consistent foundation for event-sourced aggregates.
User-Defined Aggregate
Developers can easily extend the functionality of the BaseAggregate
by embedding it into their custom aggregate struct. This approach allows the developer's aggregate to inherit all the essential functionality of BaseAggregate
while providing the flexibility to define additional fields and methods specific to the domain.
A user-defined aggregate might look like this:
type MyAggregate struct {
*comby.BaseAggregate // Use a pointer to ensure proper initialization
// User-defined fields:
// References
OtherAggregateUuid string
OtherEntityUid string
// Entities
MyEntity *MyEntity
// Value Objects
Name string
Value int
}
In the context of Event Sourcing, developers are encouraged to follow conceptual rules that align with Domain-Driven Design (DDD) principles. In this example, the MyAggregate
is logically separated into three sections:
- References: Fields like OtherAggregateUuid and OtherEntityUid establish connections to related aggregates or entities.
- Entities: Encapsulated domain entities (e.g.
MyEntity
) that have their lifecycle managed by the aggregate. - Value Objects: Simple fields such as Name and Value.
This structure ensures clarity and consistency, making it easier to reason about the aggregate's role in the system while adhering to DDD best practices.
References
An Aggregate
can only maintain references to other aggregates, ensuring each aggregate remains the sole authority over its internal state. If an aggregate needs to interact with a specific entity within another aggregate, both unique identifiers — the aggregate's and the entity's — must be provided. This guarantees that the other Aggregate remains the single point of control for managing its internal state.
In the example above, OtherAggregateUuid
represents a reference to the external aggregate, while OtherEntityUid
refers to an entity within that aggregate. By convention, Uuid
is used for aggregates, and Uid
is used for entities to distinguish their roles clearly.
This also implies that MyAggregate
does not have direct access to the internal state of OtherAggregate
(or even OtherEntity
). Instead, all interactions must occur through a higher level of abstraction, such as Commands
, Queries
, or though listening to other's events.
A real-world example would be an Identity
that has a reference to an Account
. However, an Account
can have multiple Identities
, which is why Identity
and Account
are aggregates and not entities. Both aggregates can "exist" without the existence of the other.
Entities
Entities represent the core domain objects within an Aggregate, encapsulating the data and business logic for a specific entity type. In the example, MyEntity
is an entity that is part of MyAggregate
. However, entities can only exist within an aggregate; the user cannot address an entity without an aggregate.
A real-world example would be an Identity
aggregate with a related Profile
entity. In this case, the Profile
can only exist in conjunction with the Identity
— if the Identity
is deleted, the Profile
is automatically deleted as well. This strong lifecycle dependency is why Profile
is modeled as an entity rather than an aggregate. As an entity, Profile is directly managed within the boundary of the Identity aggregate, ensuring that its existence and state are tightly coupled to the Identity. Aggregates, by contrast, have independent lifecycles and are treated as separate, self-contained units within the domain.
Value Objects
Value Objects represent immutable, self-contained objects that encapsulate specific attributes or properties within an Aggregate. Unlike entities, Value Objects do not have a unique identity and are defined solely by their attributes. In the example, Name
and Value
are Value Objects that are part of MyAggregate
.
NewAggregate()
In comby, it is a convention for every aggregate to provide a NewAggregate
method. This convention is essential to ensure that the aggregate is correctly initialized with all necessary internal structures and fields. Without this method, attempting to use the aggregate struct directly would likely result in incorrect or incomplete initialization, breaking critical functionality required for Event Sourcing and Domain-Driven Design (DDD).
// background: NewAggregate must fullfill this signature
type NewAggregateFunc[T Aggregate] func() T
The NewAggregate
method acts as a factory function, encapsulating the initialization logic for both the embedded BaseAggregate
and any additional fields and domain event handlers defined by the developer. Here's an example:
func NewAggregate() *MyAggregate {
agg := &MyAggregate{}
agg.BaseAggregate = comby.NewBaseAggregate()
// Additional initialization logic can be added here if needed.
return agg
}
This pattern ensures:
- The
BaseAggregate
is properly initialized, setting up its core functionality, such as versioning, event tracking, and domain-event handling. - Developers have a central place to define any custom initialization logic, making the aggregate ready for use immediately after creation.
If you were to use the aggregate struct directly (e.g. by instantiating it with a simple &MyAggregate{}
), the embedded BaseAggregate
would not be initialized, and crucial features like event handling and metadata management would not function correctly. This could lead to runtime errors or inconsistencies in the system's behavior.
The aggregate struct MyAggregate
and its corresponding factory function (NewAggregate
) could be implemented in domain/my/aggregate/aggregate.go.
Add Domain Event Handler
In comby, domain event handlers are registered using the comby.AddDomainEventHandler
method. This approach ensures that domain event handlers are correctly linked to their respective aggregate and can be invoked during event processing.
For instance, if you have two domain events, AEventHappened
and EventB
, and your aggregate needs to handle these domain events, you would define their handlers and register them in the NewAggregate
method as follows:
func NewAggregate() *MyAggregate {
agg := &MyAggregate{}
agg.BaseAggregate = comby.NewBaseAggregate()
agg.Domain = "MyDomain"
comby.AddDomainEventHandler(agg, agg.OnEventA)
comby.AddDomainEventHandler(agg, agg.AndOnEventB)
return agg
}
OnEventA
and AndOnEventB
are strongly typed methods defined in the MyAggregate
struct using the corresponding domain events as parameter. These methods implement the logic for handling AEventHappened
and EventB
, respectively for the aggregate. In other words, they define how the aggregate should react when these domain events occur.
The comby.AddDomainEventHandler
function binds these methods to the appropriate domain events, ensuring they are called whenever the corresponding domain event is processed. Instead of directly using agg.AddDomainEventHandler
, the framework uses comby.AddDomainEventHandler
to centralize and standardize the registration process. This abstraction enforces type safety and ensures compatibility with the underlying event-handling infrastructure. For more details on this design decision, refer to the FAQ.
The Domain Event Handlers OnEventA
and AndOnEventB
are strongly typed. Both methods must have a specific signature to ensure they can be correctly invoked by the framework:
// background: `OnEventA` and `AndOnEventB` must fullfill this signature
type TypedDomainEventHandlerFunc[T DomainEvt] func(ctx context.Context, evt Event, domainEvt T) error
For example, the domain event handler OnEventA
might look like this:
type AEventHappend struct {
Value string `json:"value"`
}
func (agg *MyAggregate) OnEventA(ctx context.Context, evt comby.Event, domainEvt *AEventHappend) error {
agg.Value = domainEvt.Value
return nil
}
The AEventHappend
struct defines the schema of the domain event, which includes fields such as MyAggUuid to identify the target aggregate and Value to hold domain event-specific data. The OnEventA
method processes this event with a clear and concise signature.
It accepts three parameters:
- context.Context (ctx) - which allows for cancellation or deadline management during execution
- comby.Event (evt) - which serves as a wrapper providing metadata like version or timestamp
- strongly-typed domainEvt of type *AEventHappend - containing the payload specific to the event
This ensures that the aggregate reflects the latest changes from the event. Finally, the method returns an error
, allowing the system to handle any issues that might occur during the processing of the event. If no error occurs, the method signals successful processing by returning nil
. This design maintains the principles of event sourcing by cleanly separating event processing and state mutation.
Add Domain Intention
An Aggregate
in comby is responsible for processing user intentions and generating corresponding events. Intentions are implemented as receiver methods on the aggregate, allowing them to encapsulate business logic and ensure the state of the aggregate is appropriately modified in response to user actions.
When an intention is executed, the aggregate's current state must be checked against the requested change. If no difference exists—such as when an update to a value object matches the existing state—it is unnecessary to generate a new event. However, if the requested change introduces a difference, a new event is created using the helper function comby.ApplyDomainEvent
. This function takes the aggregate instance and the DomainEvt
object representing the intention and generates a new event that reflects the aggregate's state and the specific domain event.
comby.ApplyDomainEvent
does more than just create the event. It applies the event to the aggregate, handling all steps required for Event Sourcing, such as updating the aggregate's Version and tracking new events to be persisted in the EventStore
. Additionally, it automatically invokes the appropriate domain event handler, as defined by the developer, to update the aggregate's state based on the event's data.
By convention, the receiver method for the intention and the definition of the corresponding DomainEvt
are implemented in the same file. This organizational pattern promotes clarity and consistency, making it easier for developers to understand and maintain the codebase. For example, consider the following implementation of an intention, DoSomethingA
:
func (agg *MyAggregate) DoSomethingA(value string) error {
// business logic
if len(value) < 1 {
return fmt.Errorf("value must be set")
}
// create new event based on the current aggregate
domainEvt := &AEventHappend{
Value: value,
}
// apply event on the current aggregate
return comby.ApplyDomainEvent(agg, domainEvt, true)
}
In this example:
- The
DoSomethingA
method validates the input, ensuring the business logic constraints are met. - If the constraints are satisfied, it creates a new domain event
AEventHappend
, which encapsulates the intention. - The
comby.ApplyDomainEvent
function applies the event, ensuring all Event Sourcing mechanisms are executed, such as versioning, state mutation, and event tracking.
The generated event is then added to the aggregate and will later be persisted in the EventStore
by the Facade
.
INFO
DomainEvt
names are written in the past tense (e.g.,AEventHappend
)- intention methods use descriptive names reflecting the action (e.g.,
DoSomethingA
).
For better maintainability and adherence to best practices, it is recommended to organize aggregates and their intentions within a dedicated aggregate package inside a domain-specific package. For instance:
- The
MyAggregate
struct could be defined in domain/my/aggregate/aggregate.go. - The intention
DoSomethingA
and its corresponding domain event (AEventHappend
) and domain event handler (OnEventA
) could be implemented in domain/my/aggregate/do.something.a.go.
Writing Tests
When writing tests for aggregates, it is essential to cover the aggregate's core functionality, including event generation, state mutation, and event handling. By testing these aspects, developers can ensure that the aggregate behaves as expected and maintains consistency throughout its lifecycle.
Here is one simply example to test the Place
intention. The test checks if the aggregate correctly processes the quantity of the to place order and updates its state accordingly:
// simple/domain/order/aggregate/aggregate_test.go
package aggregate_test
import (
"testing"
"comby.io/examples/simple/domain/order/aggregate"
"github.com/gradientzero/comby/v2"
)
func TestOrderPlaceInvalidQuantity(t *testing.T) {
// create aggregate
agg := aggregate.NewAggregate()
// create order item
itemQ1 := &aggregate.Item{
ItemUid: comby.NewUuid(),
Sku: "sku",
Quantity: 1,
}
itemQ0 := &aggregate.Item{
ItemUid: comby.NewUuid(),
Sku: "sku",
Quantity: 0,
}
// execute logic
if err := agg.Place(itemQ1); err != nil {
t.Fatal(err)
}
if err := agg.Place(itemQ0); err == nil {
t.Fatalf("expected error")
}
// check generated event
evts := agg.GetUncommittedEvents()
if len(evts) != 1 {
t.Fatalf("expected one event")
}
evt := evts[0]
// technical check
if evt.GetDomainEvtName() != comby.GetTypeName(aggregate.OrderPlacedEvent{}) {
t.Fatal(evt)
}
// content check
domainEvt := evt.GetDomainEvt().(*aggregate.OrderPlacedEvent)
if domainEvt.Item.Quantity != 1 {
t.Fatalf("expected quantity 1")
}
if domainEvt.Item.Sku != "sku" {
t.Fatalf("expected sku")
}
}
Field Shadowing
If an user-defined aggregate (or struct in general) contains an embedded type and both the outer struct and the embedded type declare fields with the same name, the outer struct's fields take precedence and "shadow" the embedded type's fields, which can result in unexpected behavior.
The following fields require your attention when defining custom aggregates:
- Domain
- Name
- TenantUuid
- AggregateUuid
- Version
- Deleted
- Changes
For example: If Domain is defined as an additional field in the MyAggregate
, the developer must distinguish between his Domain field (MyAggregate) and the embedded Domain (BaseIdentifer) by explicitly using the embedded variable.
type MyAggregate struct {
*comby.BaseAggregate
Domain string // <- Domain is shadowed (a)
Version int64 // <- Version is shadowed (b)
}
...
func NewAggregate() *MyAggregate {
agg := &MyAggregate{}
agg.BaseAggregate = comby.NewBaseAggregate()
agg.Domain = "MyDomain" // <- shadowed (a), not setting the embedded Domain
agg.BaseIdentifier.Domain = "MyDomain" // <- correct
agg.Version = 100 // <- shadowed (b), not setting the embedded Version
agg.BaseAggregate.Version = 0 // <- correct
return agg
}