Skip to content

Aggregate Repository

The AggregateRepository in comby is a type-safe generic an short-lived repository for managing aggregates in a CQRS (Command Query Responsibility Segregation) and Event Sourcing architecture. It provides operations to retrieve, list, and delete aggregates while leveraging the EventRepository to apply events to aggregates dynamically. Unlike the EventRepository, the AggregateRepository is a generic, short-lived repository for managing aggregates specifically created, used and afterwards discarded. This means that users must instantiate a new AggregateRepository for each aggregate to perform operations.

INFO

In the diagram above, we use the EventRepository from the facade. However, this is not strictly necessary. Depending on the use case, an EventRepository can be created independently, requiring only the EventStore and DomainEventProvider to be passed to it. This is particularly useful for testing purposes. In production environments, however, the EventRepository is typically always accessed via the facade. See Event Respository documentation for more information.

Integration with EventRepository

To create an AggregateRepository, you need the EventRepository and the NewAggregate function of the aggregate. The EventRepository is typically accessed through the facade, which already provides a ready to use EventRepository. The NewAggregate function is obtained from the aggregate itself, as every aggregate is required to provide this function.

go
import "github.com/gradientzero/comby/v2"
import "github.com/gradientzero/comby/v2/domain/tenant/aggregate"

// create facade with event store
fc, _ := comby.NewFacade()

// create tenant aggregate repository
tenantAggregateRepo := comby.NewAggregateRepository(
    fc.GetEventRepository(),
    aggregate.NewAggregate,
)

// fully loaded aggregate "myAgg" of type *aggregate.Tenant
myAgg, _ := tenantAggregateRepo.GetAggregate(ctx, AnyTenantUuid)

Core Responsibilities

  • Generic Type Support: The repository uses Go generics to allow type-safe handling of aggregates, ensuring compile-time correctness and flexibility for different aggregate types.
  • State Reconstruction: Aggregates are retrieved by replaying a sequence of events stored in the EventStore, allowing precise reconstruction of aggregate state.
  • Retrieve: Fetches an aggregate by UUID and applies events up to a specified version if provided.
  • List: Lists aggregates with filtering and pagination options.
  • Delete: Deletes all events associated with an aggregate, effectively removing its state from the system.
  • Concurrency: Implements concurrent aggregate loading for improved performance in bulk operations.
  • Filter and Sort: Supports filtering by tenant UUID, pagination via offset and limit, sorting by version, and optional inclusion of deleted aggregates.

Key Methods

Get Aggregate

Fetches a specific aggregate by its UUID. It retrieves all associated events from the EventRepository, applies them to a new instance of the aggregate, and optionally stops applying events once a specified Version is reached.

go
GetAggregate(ctx context.Context, aggregateUuid string, opts ...AggregateRepositoryGetOption) (T, error)

Returns the aggregate instance and an error if the operation fails.

OptionsDescription
AggregateRepositoryGetOptionWithVersion(version int64)stops applying events once the specified version is reached (including).

Example:

go
ctx := context.Background()
repo := NewAggregateRepository[*MyAggregate](eventRepo, func() *MyAggregate { return &MyAggregate{} })
aggregate, err := repo.GetAggregate(ctx, "some-aggregate-uuid")
if err != nil {
    log.Fatal(err)
}

List Aggregates

Lists aggregates with advanced filtering and pagination. Aggregates are identified by their unique UUIDs, and their state is reconstructed dynamically. Supports filtering by tenant UUID and includes an option to include deleted aggregates.

go
ListAggregates(ctx context.Context, opts ...AggregateRepositoryListOption) ([]T, int64, error)

Returns a list of aggregates, the total amount of found aggregates, and an error if the operation fails.

OptionsDescription
AggregateRepositoryListOptionWithTenantUuid(tenantUuid string)filters aggregates by tenant unique identifier.
AggregateRepositoryListOptionOffset(offset int64)sets the offset for pagination.
AggregateRepositoryListOptionLimit(limit int64)sets the limit for pagination.
AggregateRepositoryListOptionAscending(ascending bool)sets the sorting order to ascending.
AggregateRepositoryListOptionWithIncludeDeleted(includeDeleted bool)includes deleted aggregates in the result list.

Example:

go
ctx := context.Background()
aggregates, total, err := repo.ListAggregates(ctx,
    AggregateRepositoryListOptionWithTenantUuid("tenant-uuid"),
    AggregateRepositoryListOptionLimit(10),
    AggregateRepositoryListOptionOffset(0),
)
if err != nil {
    log.Fatal(err)
}

Delete Aggregate

Deletes all events associated with a specific aggregate UUID. This operation removes the aggregate's state from the system without directly interacting with the aggregate instance.

go
DeleteAggregate(ctx context.Context, aggregateUuid string) error

Returns an error if the operation fails.

Example:

go
ctx := context.Background()
err := repo.DeleteAggregate(ctx, "some-aggregate-uuid")
if err != nil {
    log.Fatal(err)
}