Projection
In an application framework that follows the Event Sourcing (ES) and Command Query Responsibility Segregation (CQRS) patterns, there is a strict separation between the write and read sides. The write side handles commands and business logic, ensuring that changes are applied to the system's state, while the read side focuses on serving data, often in the form of projections or read models. This separation allows for greater flexibility and scalability, as each side can be optimized independently.
Projections, which are objects fulfilling the EventHandler interface, are built by applying domain events to form a materialized view of the system’s state. Go's generics and interfaces make it easier to define these projections, because the same code can be reused to transform domain aggregates into read models. This lets developers create tailored read models while maintaining the strict separation of concerns within the CQRS architecture.
Projections, also called read models, serve data in response to Queries in a CQRS-based architecture. Because the read side focuses on efficient data retrieval, each projection processes domain events into a queryable representation of the system’s state. When a Query is executed, it typically reads from one or more projections to return the required data quickly.
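To make the idea of "applying events to form a materialized view" concrete, here is a minimal stand-alone sketch. It does not use comby: the `Event`, `TaskView`, and `TaskProjection` types and the event names are hypothetical and exist only for illustration. The real EventHandler interface may look different.

```go
package main

import "fmt"

// Event is a hypothetical domain event; it is not a comby type.
type Event struct {
	AggregateUuid string
	Name          string // "TaskCreated", "TaskRenamed" or "TaskDeleted"
	Title         string
}

// TaskView is the read model served to queries.
type TaskView struct {
	AggregateUuid string
	Title         string
	Version       int64
}

// TaskProjection keeps one TaskView per aggregate, keyed by UUID.
type TaskProjection struct {
	views map[string]*TaskView
}

func NewTaskProjection() *TaskProjection {
	return &TaskProjection{views: make(map[string]*TaskView)}
}

// Apply folds a single event into the materialized view.
func (p *TaskProjection) Apply(evt Event) {
	switch evt.Name {
	case "TaskCreated":
		p.views[evt.AggregateUuid] = &TaskView{
			AggregateUuid: evt.AggregateUuid,
			Title:         evt.Title,
			Version:       1,
		}
	case "TaskRenamed":
		if v, ok := p.views[evt.AggregateUuid]; ok {
			v.Title = evt.Title
			v.Version++
		}
	case "TaskDeleted":
		delete(p.views, evt.AggregateUuid)
	}
}

// Get answers a query from the view without touching the write side.
func (p *TaskProjection) Get(uuid string) (*TaskView, bool) {
	v, ok := p.views[uuid]
	return v, ok
}

func main() {
	p := NewTaskProjection()
	for _, evt := range []Event{
		{AggregateUuid: "t-1", Name: "TaskCreated", Title: "Write docs"},
		{AggregateUuid: "t-1", Name: "TaskRenamed", Title: "Write better docs"},
	} {
		p.Apply(evt)
	}
	if v, ok := p.Get("t-1"); ok {
		fmt.Println(v.Title, v.Version) // prints "Write better docs 2"
	}
}
```

The view is derived entirely from the event stream, so it can be dropped and rebuilt by replaying the events.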
Simple Creation of Projections from Aggregates
In many cases, existing aggregates can be directly used to build projections without impacting the aggregates themselves or the write side of the system. This allows for a straightforward way to represent the current state of the domain in a readmodel, leveraging the data that is already available in the system.
By using this approach, you can create efficient projections while maintaining the strict separation between the read and write sides. In the next section, we will explore how custom projections can be built, offering more flexibility to tailor readmodels to specific query requirements.
projRm := comby.NewProjectionAggregate(
	fc,
	aggregate.NewAggregate,
)
if err := comby.RegisterEventHandler(fc, projRm); err != nil {
	return err
}
// next use projection somewhere (ignore error handling for now)
key := "MyKey"
val, _, _ := projRm.GetModel(key)
// register (predefined) query handler for the readmodel
projQh := comby.NewProjectionQueryHandler(projRm)
if err := comby.RegisterQueryHandler(fc, projQh); err != nil {
	return err
}
ProjectionAggregate Options
Backend Store
You can configure which ReadmodelStoreBackend the ProjectionAggregate uses for persistence:
projRm := comby.NewProjectionAggregate(
	fc,
	aggregate.NewAggregate,
	comby.ProjectionAggregateOptionWithBackend(backend),
)
if err := comby.RegisterEventHandler(fc, projRm); err != nil {
	return err
}
History Events (On-Demand)
Since v2.15.0, history is loaded on-demand from the EventStore — no configuration is needed when creating the ProjectionAggregate. Simply request history when querying:
// No history option needed during creation
projRm := comby.NewProjectionAggregate(
	fc,
	aggregate.NewAggregate,
)
if err := comby.RegisterEventHandler(fc, projRm); err != nil {
	return err
}
// Query with history included
model, found, err := projRm.GetModel(
	aggregateUuid,
	comby.ProjectionModelGetWithHistoryEvent(true),
)
if err != nil {
	return err
}
if found {
	for _, historyEvent := range model.HistoryEvents {
		fmt.Printf("Event: %s at %d\n",
			historyEvent.DomainEvtName,
			historyEvent.CreatedAt)
	}
}
Querying Lists with History
For list queries, history is also opt-in:
models, total, err := projRm.GetModelList(
	comby.ProjectionModelListOptionWithTenantUuid(tenantUuid),
	comby.ProjectionModelListOptionWithPage(1),
	comby.ProjectionModelListOptionWithPageSize(10),
	comby.ProjectionModelListWithHistoryEvent(true),
)
Performance Note
Each history request triggers an on-demand query to the EventStore. Only request history in queries when needed, and avoid requesting history on large list queries.
See Readmodel History Events for more details on the HistoryEventModel structure and best practices.
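The opt-in behaviour described in this section can be pictured with the functional options pattern. The following stand-alone sketch does not use comby; `Store`, `WithHistory`, and `EventStoreReads` are hypothetical names chosen for illustration, and the real implementation may differ. It shows that the event store is only consulted when the caller asks for history.

```go
package main

import "fmt"

// getOptions collects per-call settings (hypothetical, not comby's type).
type getOptions struct {
	withHistory bool
}

// GetOption mutates getOptions; this is the functional options pattern.
type GetOption func(*getOptions)

// WithHistory opts in to loading history for a single call.
func WithHistory(enabled bool) GetOption {
	return func(o *getOptions) { o.withHistory = enabled }
}

// Model is a simplified read model with optional history.
type Model struct {
	Uuid    string
	History []string
}

// Store stands in for a projection backed by an event store.
// EventStoreReads counts how often the expensive lookup runs.
type Store struct {
	known           map[string]bool
	events          map[string][]string
	EventStoreReads int
}

// GetModel returns the model and loads history only when requested.
func (s *Store) GetModel(uuid string, opts ...GetOption) (*Model, bool) {
	cfg := getOptions{}
	for _, opt := range opts {
		opt(&cfg)
	}
	if !s.known[uuid] {
		return nil, false
	}
	m := &Model{Uuid: uuid}
	if cfg.withHistory {
		s.EventStoreReads++ // the on-demand query happens only here
		m.History = append(m.History, s.events[uuid]...)
	}
	return m, true
}

func main() {
	s := &Store{
		known:  map[string]bool{"t-1": true},
		events: map[string][]string{"t-1": {"TaskCreated", "TaskRenamed"}},
	}
	plain, _ := s.GetModel("t-1")
	full, _ := s.GetModel("t-1", WithHistory(true))
	fmt.Println(len(plain.History), len(full.History), s.EventStoreReads) // prints "0 2 1"
}
```

In this sketch a list query with history enabled would pay the lookup once per returned model, which is why the note above advises against it for large lists.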
Projection Query Handler
When you register a projection query handler with NewProjectionQueryHandler, two query types are automatically registered:
- Get a single projection by aggregate UUID
- List projections with pagination, filtering, and ordering
Request and Response Types
Get
// Request
type ProjectionQueryGetRequest[T any] struct {
	AggregateUuid  string `json:"aggregateUuid"`
	IncludeHistory bool   `json:"includeHistory,omitempty"`
}
// Response
type ProjectionQueryGetResponse[T any] struct {
	*ProjectionModel[T] `json:"item,omitempty"`
}
List
// Request
type ProjectionQueryListRequest[T any] struct {
	TenantUuid     string `json:"tenantUuid,omitempty"`
	Page           int64  `json:"page,omitempty"`
	PageSize       int64  `json:"pageSize,omitempty"`
	OrderBy        string `json:"orderBy,omitempty"`
	Attributes     string `json:"attributes,omitempty"`
	IncludeHistory bool   `json:"includeHistory,omitempty"`
}
// Response
type ProjectionQueryListResponse[T any] struct {
	Models   []*ProjectionModel[T] `json:"models"`
	Total    int64                 `json:"total"`
	Page     int64                 `json:"page"`
	PageSize int64                 `json:"pageSize"`
}
Each ProjectionModel[T] wraps the aggregate with metadata:
type ProjectionModel[T any] struct {
	TenantUuid    string               `json:"tenantUuid,omitempty"`
	AggregateUuid string               `json:"aggregateUuid,omitempty"`
	Model         T                    `json:"model,omitempty"`
	Version       int64                `json:"version,omitempty"`
	CreatedAt     int64                `json:"createdAt,omitempty"`
	UpdatedAt     int64                `json:"updatedAt,omitempty"`
	HistoryEvents []*HistoryEventModel `json:"historyEvents,omitempty"`
}
Field Reference
| Field | Type | Description |
|---|---|---|
| TenantUuid | string | Filter by tenant (List only) |
| Page | int64 | Page number, starting at 1 (List only) |
| PageSize | int64 | Items per page (List only, default 1000) |
| OrderBy | string | Sort field (List only) |
| Attributes | string | Attribute filter expression (List only) |
| IncludeHistory | bool | Include history events in response (loads on-demand from EventStore) |
| AggregateUuid | string | UUID of the aggregate to retrieve (Get only) |
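As a rough illustration of how Page and PageSize select a slice of results, the following stand-alone sketch converts a 1-based page number into an index range. The helper `window` and its handling of out-of-range values are assumptions made for this example, not comby's documented behaviour; only the default of 1000 is taken from the field reference.

```go
package main

import "fmt"

// defaultPageSize mirrors the default stated in the field reference.
const defaultPageSize = 1000

// window converts a 1-based page and a page size into a half-open
// index range [start, end) over a result set of length total.
// Clamping invalid input is an assumption of this sketch.
func window(page, pageSize, total int64) (start, end int64) {
	if page < 1 {
		page = 1
	}
	if pageSize < 1 {
		pageSize = defaultPageSize
	}
	start = (page - 1) * pageSize
	if start > total {
		start = total
	}
	end = start + pageSize
	if end > total {
		end = total
	}
	return start, end
}

func main() {
	items := []string{"a", "b", "c", "d", "e"}
	start, end := window(2, 2, int64(len(items)))
	fmt.Println(items[start:end]) // prints "[c d]"
}
```

A page past the end yields an empty range, so callers can stop paging once fewer than PageSize items come back or once Page times PageSize reaches Total.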
Cross-Domain Projection Queries
A common pattern is to query projection data from another domain without coupling the domains at the code level. This is useful for building derived views, search functionality, or dashboards that aggregate data from multiple domains.
The key idea: use comby.NewQuery with the target domain name and a ProjectionQueryListRequest (or ProjectionQueryGetRequest) typed with the target domain's aggregate. The framework routes the query to the correct projection query handler via the domain name.
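The routing idea can be sketched without the framework. In the stand-alone example below, `Dispatcher`, `Handler`, `ListRequest`, `ListResponse`, and `QueryList` are hypothetical stand-ins, not comby types. The sketch assumes only what the paragraph above states: a handler is looked up by domain name, and the caller recovers a typed response through a type assertion.

```go
package main

import (
	"errors"
	"fmt"
)

// ListRequest and ListResponse are simplified stand-ins for the
// generic projection request and response types.
type ListRequest[T any] struct {
	PageSize int64
}

type ListResponse[T any] struct {
	Models []T
	Total  int64
}

// Handler answers a request whose concrete type is only known at runtime.
type Handler func(req any) (any, error)

// Dispatcher routes requests by domain name.
type Dispatcher struct {
	handlers map[string]Handler
}

func (d *Dispatcher) Register(domain string, h Handler) {
	if d.handlers == nil {
		d.handlers = make(map[string]Handler)
	}
	d.handlers[domain] = h
}

func (d *Dispatcher) Dispatch(domain string, req any) (any, error) {
	h, ok := d.handlers[domain]
	if !ok {
		return nil, fmt.Errorf("no handler for domain %q", domain)
	}
	return h(req)
}

// QueryList dispatches a typed list request and type-asserts the reply.
func QueryList[T any](d *Dispatcher, domain string, req *ListRequest[T]) (*ListResponse[T], error) {
	res, err := d.Dispatch(domain, req)
	if err != nil {
		return nil, err
	}
	typed, ok := res.(*ListResponse[T])
	if !ok {
		return nil, fmt.Errorf("unexpected response type %T", res)
	}
	return typed, nil
}

// Task is the aggregate type owned by the target domain.
type Task struct{ Title string }

// newTaskHandler only accepts requests typed with *Task.
func newTaskHandler(tasks []*Task) Handler {
	return func(req any) (any, error) {
		if _, ok := req.(*ListRequest[*Task]); !ok {
			return nil, errors.New("unsupported request type")
		}
		return &ListResponse[*Task]{Models: tasks, Total: int64(len(tasks))}, nil
	}
}

func main() {
	d := &Dispatcher{}
	d.Register("Task", newTaskHandler([]*Task{{Title: "Write docs"}}))
	res, err := QueryList(d, "Task", &ListRequest[*Task]{PageSize: 10})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(res.Total, res.Models[0].Title) // prints "1 Write docs"
}
```

The calling domain depends only on the aggregate type and the domain name, not on the target domain's handler code, which is the decoupling this pattern aims for.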
Step-by-Step Example
Suppose you have an existing Task domain with a registered projection, and you want to query its data from a Search domain.
1. Create the request with the target domain's aggregate type:
import (
	taskAggregate "github.com/yourorg/yourapp/domain/task/aggregate"
)
domainQry := &comby.ProjectionQueryListRequest[*taskAggregate.Task]{
	TenantUuid: tenantUuid,
	Page:       1,
	PageSize:   100,
}
2. Wrap it in a Query addressed to the target domain:
qry, err := comby.NewQuery("Task", domainQry)
if err != nil {
	return err
}
3. Set up the request context:
reqCtx := comby.NewRequestContext()
reqCtx.SenderTenantUuid = tenantUuid
reqCtx.SenderIdentityUuid = identityUuid
qry.SetReqCtx(reqCtx)
qry.SetTenantUuid(tenantUuid)
4. Dispatch the query and type-assert the response:
res, err := fc.DispatchQuery(ctx, qry)
if err != nil {
	return err
}
listRes, ok := res.(*comby.ProjectionQueryListResponse[*taskAggregate.Task])
if !ok {
	return fmt.Errorf("unexpected response type %T", res)
}
// Use listRes.Models, listRes.Total, etc.
for _, item := range listRes.Models {
	fmt.Println(item.AggregateUuid, item.Model)
}
Full Example: Search Query Handler
Below is a simplified query handler in a Search domain that queries projections from the Task domain:
package query

import (
	"context"
	"fmt"

	"github.com/gradientzero/comby/v2"
	taskAggregate "github.com/yourorg/yourapp/domain/task/aggregate"
)

type SearchRequest struct {
	TenantUuid string `json:"tenantUuid"`
	Term       string `json:"term"`
}

type SearchResponse struct {
	Results []SearchResult `json:"results"`
}

type SearchResult struct {
	AggregateUuid string `json:"aggregateUuid"`
	Domain        string `json:"domain"`
	Label         string `json:"label"`
}

func (qh *qryHandler) Search(
	ctx context.Context,
	qry comby.Query,
	domainQry *SearchRequest,
) (*SearchResponse, error) {
	// Query the Task domain's projection
	taskListQry := &comby.ProjectionQueryListRequest[*taskAggregate.Task]{
		TenantUuid: domainQry.TenantUuid,
		PageSize:   1000,
	}
	taskQry, err := comby.NewQuery("Task", taskListQry)
	if err != nil {
		return nil, fmt.Errorf("failed to create task query: %w", err)
	}
	// Set request context (reuse from incoming query)
	taskQry.SetReqCtx(qry.GetReqCtx())
	taskQry.SetTenantUuid(domainQry.TenantUuid)
	// Dispatch
	taskRes, err := qh.fc.DispatchQuery(ctx, taskQry)
	if err != nil {
		return nil, fmt.Errorf("failed to query tasks: %w", err)
	}
	// Type-assert the response
	taskListRes, ok := taskRes.(*comby.ProjectionQueryListResponse[*taskAggregate.Task])
	if !ok {
		return nil, fmt.Errorf("unexpected task query response type %T", taskRes)
	}
	// Transform results
	response := &SearchResponse{}
	for _, model := range taskListRes.Models {
		// Filtering by domainQry.Term would go here; it is omitted in this simplified example.
		response.Results = append(response.Results, SearchResult{
			AggregateUuid: model.AggregateUuid,
			Domain:        "Task",
			Label:         fmt.Sprintf("%v", model.Model),
		})
	}
	return response, nil
}
Use Attributes for Pre-Filtering
The Attributes field on ProjectionQueryListRequest lets you pass a filter expression to the projection store. Use it to narrow results before they reach your code, reducing the amount of data transferred and processed.
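If the filter values come from variables, building the expression with encoding/json avoids quoting mistakes. This sketch assumes the filter is a flat JSON object of string values, as in the example in this section. The helper name `attributesFilter` is hypothetical, and the full filter grammar accepted by the store is not covered here.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// attributesFilter marshals key/value pairs into a JSON object string.
// json.Marshal escapes special characters and sorts map keys, so the
// output is deterministic.
func attributesFilter(pairs map[string]string) (string, error) {
	b, err := json.Marshal(pairs)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	filter, err := attributesFilter(map[string]string{"status": "active"})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(filter) // prints {"status":"active"}
}
```

The resulting string can be assigned to the Attributes field of the request.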
domainQry := &comby.ProjectionQueryListRequest[*taskAggregate.Task]{
	TenantUuid: tenantUuid,
	Attributes: `{"status":"active"}`,
	PageSize:   100,
}
Skip Authorization for Internal Queries
When dispatching cross-domain queries from server-side code (e.g., inside a reactor or query handler), you may want to bypass the authorization check. Set the ExecuteSkipAuthorization attribute on the request context:
import "github.com/gradientzero/comby/v2/domain/auth"
reqCtx := comby.NewRequestContext()
reqCtx.SenderTenantUuid = tenantUuid
reqCtx.Attributes.Set(auth.ExecuteSkipAuthorization, true)
qry.SetReqCtx(reqCtx)
Only use this for trusted, internal queries where the calling code has already verified authorization.