Aggregate Repository
The AggregateRepository in comby is a type-safe, generic, short-lived repository for managing aggregates in a CQRS (Command Query Responsibility Segregation) and Event Sourcing architecture. It provides operations to retrieve, list, and delete aggregates, and uses the EventRepository to apply events to aggregates dynamically. Unlike the EventRepository, an AggregateRepository is created for a specific purpose, used, and then discarded. Users must therefore instantiate a new AggregateRepository for each aggregate they want to operate on.
INFO
In the diagram above, we use the EventRepository from the facade. However, this is not strictly necessary. Depending on the use case, an EventRepository can be created independently; only the EventStore and the DomainEventProvider need to be passed to it. This is particularly useful for testing. In production environments, the EventRepository is typically accessed via the facade. See the Event Repository documentation for more information.
Integration with EventRepository
To create an AggregateRepository, you need the EventRepository and the NewAggregate function of the aggregate. The EventRepository is typically accessed through the facade, which already provides a ready-to-use EventRepository. The NewAggregate function is obtained from the aggregate itself, as every aggregate is required to provide this function.
import (
	"context"

	"github.com/gradientzero/comby/v2"
	"github.com/gradientzero/comby/v2/domain/tenant/aggregate"
)

ctx := context.Background()
// create facade with event store (error ignored for brevity)
fc, _ := comby.NewFacade()
// create tenant aggregate repository
tenantAggregateRepo := comby.NewAggregateRepository(
	fc.GetEventRepository(),
	aggregate.NewAggregate,
)
// fully loaded aggregate "myAgg" of type *aggregate.Tenant;
// AnyTenantUuid stands for the UUID of an existing tenant
myAgg, _ := tenantAggregateRepo.GetAggregate(ctx, AnyTenantUuid)
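To illustrate why the repository needs both a source of events and a NewAggregate function, here is a minimal, self-contained sketch of the general idea. It does not use comby's types: Event, Applier, Account, NewAccount, and Rebuild are hypothetical names invented for this illustration, and comby's internals may differ.

```go
package main

import "fmt"

// Event is a hypothetical, simplified domain event (not comby's event type).
type Event struct {
	AggregateUuid string
	Version       int64
	Amount        int
}

// Applier is a hypothetical constraint: anything that can fold an event into its state.
type Applier interface {
	Apply(e Event)
}

// Account is a hypothetical aggregate whose state is derived purely from events.
type Account struct {
	Uuid    string
	Version int64
	Balance int
}

// Apply updates the account state from a single event.
func (a *Account) Apply(e Event) {
	a.Uuid = e.AggregateUuid
	a.Version = e.Version
	a.Balance += e.Amount
}

// NewAccount plays the role of the aggregate's factory function.
func NewAccount() *Account { return &Account{} }

// Rebuild creates an empty aggregate via newAgg and replays the events in order.
func Rebuild[T Applier](newAgg func() T, events []Event) T {
	agg := newAgg()
	for _, e := range events {
		agg.Apply(e)
	}
	return agg
}

func main() {
	events := []Event{
		{AggregateUuid: "acc-1", Version: 1, Amount: 100},
		{AggregateUuid: "acc-1", Version: 2, Amount: -30},
		{AggregateUuid: "acc-1", Version: 3, Amount: 5},
	}
	acc := Rebuild(NewAccount, events)
	fmt.Println(acc.Balance, acc.Version) // prints "75 3"
}
```

The factory is what lets a generic function produce a correctly typed, empty aggregate before any event is applied, which is presumably why every aggregate has to provide one.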
Core Responsibilities
- Generic Type Support: The repository uses Go generics to allow type-safe handling of aggregates, ensuring compile-time correctness and flexibility for different aggregate types.
- State Reconstruction: Aggregates are retrieved by replaying the sequence of events stored in the EventStore, allowing precise reconstruction of aggregate state.
- Retrieve: Fetches an aggregate by UUID and applies events up to a specified version, if one is provided.
- List: Lists aggregates with filtering and pagination options.
- Delete: Deletes all events associated with an aggregate, effectively removing its state from the system.
- Concurrency: Implements concurrent aggregate loading for improved performance in bulk operations.
- Filter and Sort: Supports filtering by tenant UUID, pagination via offset and limit, sorting by version, and optional inclusion of deleted aggregates.
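The concurrent loading mentioned in the list above can be pictured with the following sketch. It is only an illustration of one common Go pattern (one goroutine per item, results written to fixed slots); the source does not describe how comby implements this, and LoadAll and its load callback are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// LoadAll calls load for every UUID in its own goroutine.
// Results keep the order of the input slice; the first error found is returned.
// The load callback is a hypothetical stand-in for "rebuild one aggregate".
func LoadAll[T any](uuids []string, load func(uuid string) (T, error)) ([]T, error) {
	results := make([]T, len(uuids))
	errs := make([]error, len(uuids))

	var wg sync.WaitGroup
	for i, id := range uuids {
		wg.Add(1)
		go func(i int, id string) {
			defer wg.Done()
			// Each goroutine writes only to its own index, so no mutex is needed.
			results[i], errs[i] = load(id)
		}(i, id)
	}
	wg.Wait()

	for _, err := range errs {
		if err != nil {
			return nil, err
		}
	}
	return results, nil
}

func main() {
	names, err := LoadAll([]string{"a", "b", "c"}, func(uuid string) (string, error) {
		return "loaded:" + uuid, nil
	})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(names) // prints "[loaded:a loaded:b loaded:c]"
}
```

A real implementation would likely bound the number of goroutines for very large result sets; this sketch leaves that out for brevity.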
Key Methods
Get Aggregate
Fetches a specific aggregate by its UUID. It retrieves all associated events from the EventRepository, applies them to a new instance of the aggregate, and optionally stops applying events once a specified version is reached.
GetAggregate(ctx context.Context, aggregateUuid string, opts ...AggregateRepositoryGetOption) (T, error)
Returns the aggregate instance and an error if the operation fails.
Options | Description |
---|---|
AggregateRepositoryGetOptionWithVersion(version int64) | stops applying events once the specified version is reached (inclusive). |
Example:
ctx := context.Background()
repo := NewAggregateRepository[*MyAggregate](eventRepo, func() *MyAggregate { return &MyAggregate{} })
aggregate, err := repo.GetAggregate(ctx, "some-aggregate-uuid")
if err != nil {
	log.Fatal(err)
}
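The version option can be understood as a cutoff during replay. The sketch below shows that idea with Go's functional options pattern. It is not comby's code: Event, GetOption, WithVersion, and Replay are hypothetical names, and treating version 0 as "no cutoff" is an assumption made for this illustration.

```go
package main

import "fmt"

// Event is a hypothetical, simplified event carrying a version and a state change.
type Event struct {
	Version int64
	Delta   int
}

// getOptions collects settings for a single replay; version 0 means "no cutoff"
// (an assumption of this sketch).
type getOptions struct {
	version int64
}

// GetOption is a functional option that modifies getOptions.
type GetOption func(*getOptions)

// WithVersion stops the replay after the given version has been applied (inclusive).
func WithVersion(v int64) GetOption {
	return func(o *getOptions) { o.version = v }
}

// Replay folds events (assumed to be sorted by version) into a state value
// and returns the state together with the last applied version.
func Replay(events []Event, opts ...GetOption) (int, int64) {
	cfg := getOptions{}
	for _, opt := range opts {
		opt(&cfg)
	}
	state, version := 0, int64(0)
	for _, e := range events {
		if cfg.version > 0 && e.Version > cfg.version {
			break // everything after the cutoff is ignored
		}
		state += e.Delta
		version = e.Version
	}
	return state, version
}

func main() {
	events := []Event{{1, 10}, {2, 20}, {3, 30}}

	state, version := Replay(events)
	fmt.Println(state, version) // prints "60 3"

	state, version = Replay(events, WithVersion(2))
	fmt.Println(state, version) // prints "30 2"
}
```

Replaying only up to a version is a convenient way to inspect what an aggregate looked like at an earlier point in its history.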
List Aggregates
Lists aggregates with filtering and pagination. Aggregates are identified by their unique UUIDs, and their state is reconstructed dynamically. Supports filtering by tenant UUID and can optionally include deleted aggregates.
ListAggregates(ctx context.Context, opts ...AggregateRepositoryListOption) ([]T, int64, error)
Returns a list of aggregates, the total number of aggregates found, and an error if the operation fails.
Options | Description |
---|---|
AggregateRepositoryListOptionWithTenantUuid(tenantUuid string) | filters aggregates by tenant unique identifier. |
AggregateRepositoryListOptionOffset(offset int64) | sets the offset for pagination. |
AggregateRepositoryListOptionLimit(limit int64) | sets the limit for pagination. |
AggregateRepositoryListOptionAscending(ascending bool) | enables or disables ascending sort order. |
AggregateRepositoryListOptionWithIncludeDeleted(includeDeleted bool) | controls whether deleted aggregates are included in the result list. |
Example:
ctx := context.Background()
aggregates, total, err := repo.ListAggregates(ctx,
	AggregateRepositoryListOptionWithTenantUuid("tenant-uuid"),
	AggregateRepositoryListOptionLimit(10),
	AggregateRepositoryListOptionOffset(0),
)
if err != nil {
	log.Fatal(err)
}
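How offset, limit, and the returned total relate to each other can be sketched as follows. This is a generic illustration, not comby's implementation: Paginate is a hypothetical helper, and it assumes that the total counts all matches before paging and that a limit of 0 means "no limit".

```go
package main

import "fmt"

// Paginate returns the requested page of items together with the total
// number of items before paging. A limit of 0 or less means "no limit"
// (an assumption of this sketch).
func Paginate[T any](items []T, offset, limit int64) ([]T, int64) {
	total := int64(len(items))
	if offset < 0 {
		offset = 0
	}
	if offset >= total {
		return nil, total // page starts past the end: empty page, same total
	}
	end := total
	if limit > 0 && offset+limit < total {
		end = offset + limit
	}
	return items[offset:end], total
}

func main() {
	uuids := []string{"a", "b", "c", "d", "e"}

	page, total := Paginate(uuids, 1, 2)
	fmt.Println(page, total) // prints "[b c] 5"

	page, total = Paginate(uuids, 4, 10)
	fmt.Println(page, total) // prints "[e] 5"
}
```

Returning the total alongside the page lets a caller compute how many pages exist without issuing a second query.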
Delete Aggregate
Deletes all events associated with a specific aggregate UUID. This operation removes the aggregate's state from the system without directly interacting with the aggregate instance.
DeleteAggregate(ctx context.Context, aggregateUuid string) error
Returns an error if the operation fails.
Example:
ctx := context.Background()
err := repo.DeleteAggregate(ctx, "some-aggregate-uuid")
if err != nil {
	log.Fatal(err)
}
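Because an event-sourced aggregate has no state of its own beyond its events, deleting it amounts to removing those events. The following sketch shows this against a hypothetical in-memory store; MemoryStore, DeleteByAggregate, and Count are invented for this illustration and are not part of comby.

```go
package main

import "fmt"

// Event is a hypothetical, simplified event.
type Event struct {
	AggregateUuid string
	Version       int64
}

// MemoryStore is a hypothetical in-memory event store used only for illustration.
type MemoryStore struct {
	events []Event
}

// DeleteByAggregate removes every event of the given aggregate and
// reports how many events were removed.
func (s *MemoryStore) DeleteByAggregate(uuid string) int {
	kept := s.events[:0] // filter in place, reusing the backing array
	removed := 0
	for _, e := range s.events {
		if e.AggregateUuid == uuid {
			removed++
			continue
		}
		kept = append(kept, e)
	}
	s.events = kept
	return removed
}

// Count returns the number of stored events for the given aggregate.
func (s *MemoryStore) Count(uuid string) int {
	n := 0
	for _, e := range s.events {
		if e.AggregateUuid == uuid {
			n++
		}
	}
	return n
}

func main() {
	store := &MemoryStore{events: []Event{
		{"agg-a", 1}, {"agg-b", 1}, {"agg-a", 2},
	}}
	removed := store.DeleteByAggregate("agg-a")
	fmt.Println(removed, store.Count("agg-a"), store.Count("agg-b")) // prints "2 0 1"
}
```

After the events are gone, replaying for that UUID yields nothing, so the aggregate can no longer be reconstructed.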