
Projection

In an application framework that follows the Event Sourcing (ES) and Command Query Responsibility Segregation (CQRS) patterns, there is a strict separation between the write and read sides. The write side handles commands and business logic, ensuring that changes are applied to the system's state, while the read side focuses on serving data, often in the form of projections or read models. This separation allows for greater flexibility and scalability, as each side can be optimized independently.

Projections, which are objects fulfilling the EventHandler interface, are built by applying domain events to form a materialized view of the system’s state. Go's generics and interfaces make it easier to define these projections, because the same code can be reused to transform domain aggregates into read models. This lets developers create tailored read models efficiently while maintaining the strict separation of concerns within the CQRS architecture.

Projections, or Readmodels, are typically used to serve data in response to Queries in a CQRS-based architecture. The write side emits domain events, and the read side focuses on efficient data retrieval: projections process those events to form a queryable representation of the system’s state. When a Query is executed, it typically reads from one or more projections to return the required data quickly.
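
To make the idea of "applying events to form a materialized view" concrete, here is a minimal, self-contained sketch. It does not use comby and is not how comby implements projections; `Event`, `Applier`, `Projection`, and `Account` are hypothetical names chosen for illustration. It only shows the general technique: a generic container keeps one model per aggregate UUID and folds each incoming event into it.

```go
package main

import "fmt"

// Event is a hypothetical, simplified domain event (not comby's event type).
type Event struct {
	AggregateUuid string
	Name          string
	Amount        int
}

// Applier is a hypothetical constraint: a model that can fold an event into its state.
type Applier interface {
	Apply(evt Event)
}

// Projection keeps one model per aggregate UUID, which acts as the materialized view.
type Projection[T Applier] struct {
	newModel func() T
	models   map[string]T
}

// NewProjection takes a factory so the projection can create models on demand.
func NewProjection[T Applier](newModel func() T) *Projection[T] {
	return &Projection[T]{newModel: newModel, models: map[string]T{}}
}

// Handle applies one event to the model it belongs to, creating the model on first use.
func (p *Projection[T]) Handle(evt Event) {
	m, ok := p.models[evt.AggregateUuid]
	if !ok {
		m = p.newModel()
		p.models[evt.AggregateUuid] = m
	}
	m.Apply(evt)
}

// Get returns the current read model for an aggregate, if one exists.
func (p *Projection[T]) Get(uuid string) (T, bool) {
	m, ok := p.models[uuid]
	return m, ok
}

// Account is an example model that doubles as a read model.
type Account struct{ Balance int }

// Apply updates the balance according to the event name.
func (a *Account) Apply(evt Event) {
	switch evt.Name {
	case "Deposited":
		a.Balance += evt.Amount
	case "Withdrawn":
		a.Balance -= evt.Amount
	}
}

func main() {
	proj := NewProjection(func() *Account { return &Account{} })
	proj.Handle(Event{AggregateUuid: "a1", Name: "Deposited", Amount: 50})
	proj.Handle(Event{AggregateUuid: "a1", Name: "Withdrawn", Amount: 20})
	if acc, ok := proj.Get("a1"); ok {
		fmt.Println(acc.Balance) // 30
	}
}
```

The factory function plays a similar role to the `aggregate.NewAggregate` argument used in the examples on this page: the projection does not need to know the concrete model type, only how to create an empty one.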

Simple Creation of Projections from Aggregates

In many cases, existing aggregates can be directly used to build projections without impacting the aggregates themselves or the write side of the system. This allows for a straightforward way to represent the current state of the domain in a readmodel, leveraging the data that is already available in the system.

By using this approach, you can create efficient projections while maintaining the strict separation between the read and write sides. In the next section, we will explore how custom projections can be built, offering more flexibility to tailor readmodels to specific query requirements.

go
projRm := comby.NewProjectionAggregate(
	fc,
	aggregate.NewAggregate,
)
if err := comby.RegisterEventHandler(fc, projRm); err != nil {
	return err
}

// Later, read a model from the projection (error handling omitted for brevity).
// GetModel returns the model, a found flag, and an error.
key := "MyKey"
val, _, _ := projRm.GetModel(key)

go
// register (predefined) query handler for the readmodel
projQh := comby.NewProjectionQueryHandler(projRm)
if err := comby.RegisterQueryHandler(fc, projQh); err != nil {
	return err
}

ProjectionAggregate Options

Backend Store

You can configure which ReadmodelStoreBackend the ProjectionAggregate uses for persistence:

go
projRm := comby.NewProjectionAggregate(
	fc,
	aggregate.NewAggregate,
	comby.ProjectionAggregateOptionWithBackend(backend),
)
if err := comby.RegisterEventHandler(fc, projRm); err != nil {
	return err
}

History Events (On-Demand)

Since v2.15.0, history is loaded on demand from the EventStore, so no configuration is needed when creating the ProjectionAggregate. Simply request history when querying:

go
// No history option needed during creation
projRm := comby.NewProjectionAggregate(
	fc,
	aggregate.NewAggregate,
)
if err := comby.RegisterEventHandler(fc, projRm); err != nil {
	return err
}

// Query with history included
model, found, err := projRm.GetModel(
	aggregateUuid,
	comby.ProjectionModelGetWithHistoryEvent(true),
)
if found {
	for _, historyEvent := range model.HistoryEvents {
		fmt.Printf("Event: %s at %d\n",
			historyEvent.DomainEvtName,
			historyEvent.CreatedAt)
	}
}

Querying Lists with History

For list queries, history is also opt-in:

go
models, total, err := projRm.GetModelList(
	comby.ProjectionModelListOptionWithTenantUuid(tenantUuid),
	comby.ProjectionModelListOptionWithPage(1),
	comby.ProjectionModelListOptionWithPageSize(10),
	comby.ProjectionModelListWithHistoryEvent(true),
)

Performance Note

Each history request triggers an on-demand query to the EventStore. Only request history in queries when needed, and avoid requesting history on large list queries.
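
The cost described above can be illustrated with a small, self-contained sketch. It does not use comby; `EventStore`, `Model`, and `List` are hypothetical stand-ins, and the assumption that a list query issues one EventStore lookup per returned model is made purely for illustration. The point is that opt-in history keeps the common path free of extra lookups.

```go
package main

import "fmt"

// EventStore is a hypothetical stand-in that counts how often it is queried.
type EventStore struct {
	Calls  int
	events map[string][]string
}

// EventsFor simulates one on-demand history query for a single aggregate.
func (s *EventStore) EventsFor(uuid string) []string {
	s.Calls++
	return s.events[uuid]
}

// Model is a simplified read model with optional history.
type Model struct {
	AggregateUuid string
	History       []string
}

// List returns one model per UUID and loads history only when requested.
// Assumption for this sketch: one EventStore query per model.
func List(store *EventStore, uuids []string, withHistory bool) []Model {
	models := make([]Model, 0, len(uuids))
	for _, id := range uuids {
		m := Model{AggregateUuid: id}
		if withHistory {
			m.History = store.EventsFor(id)
		}
		models = append(models, m)
	}
	return models
}

func main() {
	store := &EventStore{events: map[string][]string{
		"a": {"Created"},
		"b": {"Created", "Renamed"},
	}}

	List(store, []string{"a", "b"}, false)
	fmt.Println("calls without history:", store.Calls) // 0

	List(store, []string{"a", "b"}, true)
	fmt.Println("calls with history:", store.Calls) // 2
}
```

Under this assumption the number of extra lookups grows with the page size, which is why history on large list queries is best avoided.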

See Readmodel History Events for more details on the HistoryEventModel structure and best practices.

Projection Query Handler

When you register a projection query handler with NewProjectionQueryHandler, two query types are automatically registered:

  • Get a single projection by aggregate UUID
  • List projections with pagination, filtering, and ordering

Request and Response Types

Get

go
// Request
type ProjectionQueryGetRequest[T any] struct {
	AggregateUuid  string `json:"aggregateUuid"`
	IncludeHistory bool   `json:"includeHistory,omitempty"`
}

// Response
type ProjectionQueryGetResponse[T any] struct {
	*ProjectionModel[T] `json:"item,omitempty"`
}

List

go
// Request
type ProjectionQueryListRequest[T any] struct {
	TenantUuid     string `json:"tenantUuid,omitempty"`
	Page           int64  `json:"page,omitempty"`
	PageSize       int64  `json:"pageSize,omitempty"`
	OrderBy        string `json:"orderBy,omitempty"`
	Attributes     string `json:"attributes,omitempty"`
	IncludeHistory bool   `json:"includeHistory,omitempty"`
}

// Response
type ProjectionQueryListResponse[T any] struct {
	Models   []*ProjectionModel[T] `json:"models"`
	Total    int64                 `json:"total"`
	Page     int64                 `json:"page"`
	PageSize int64                 `json:"pageSize"`
}

Each ProjectionModel[T] wraps the aggregate with metadata:

go
type ProjectionModel[T any] struct {
	TenantUuid    string               `json:"tenantUuid,omitempty"`
	AggregateUuid string               `json:"aggregateUuid,omitempty"`
	Model         T                    `json:"model,omitempty"`
	Version       int64                `json:"version,omitempty"`
	CreatedAt     int64                `json:"createdAt,omitempty"`
	UpdatedAt     int64                `json:"updatedAt,omitempty"`
	HistoryEvents []*HistoryEventModel `json:"historyEvents,omitempty"`
}
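
To see what these tags mean for the JSON a client receives, the following sketch re-declares the struct locally and marshals one instance with the standard library. It is illustrative only: `Task` is a hypothetical aggregate, and `HistoryEventModel` is left as an empty placeholder because its real fields are documented elsewhere.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// HistoryEventModel is an empty placeholder for this sketch.
type HistoryEventModel struct{}

// ProjectionModel is a local copy of the struct shown above, for illustration only.
type ProjectionModel[T any] struct {
	TenantUuid    string               `json:"tenantUuid,omitempty"`
	AggregateUuid string               `json:"aggregateUuid,omitempty"`
	Model         T                    `json:"model,omitempty"`
	Version       int64                `json:"version,omitempty"`
	CreatedAt     int64                `json:"createdAt,omitempty"`
	UpdatedAt     int64                `json:"updatedAt,omitempty"`
	HistoryEvents []*HistoryEventModel `json:"historyEvents,omitempty"`
}

// Task is a hypothetical aggregate used as the model type.
type Task struct {
	Title string `json:"title"`
}

// encode marshals a projection model to its JSON string form.
func encode(m ProjectionModel[*Task]) (string, error) {
	b, err := json.Marshal(m)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	out, err := encode(ProjectionModel[*Task]{
		AggregateUuid: "task-1",
		Model:         &Task{Title: "Write docs"},
		Version:       2,
	})
	if err != nil {
		fmt.Println("marshal failed:", err)
		return
	}
	// Zero-valued fields and the nil history slice are omitted:
	// {"aggregateUuid":"task-1","model":{"title":"Write docs"},"version":2}
	fmt.Println(out)
}
```

Because of `omitempty`, the `historyEvents` key is absent from the output when no history was loaded, so clients should treat a missing key the same as an empty list.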

Field Reference

| Field | Type | Description |
| --- | --- | --- |
| TenantUuid | string | Filter by tenant (List only) |
| Page | int64 | Page number, starting at 1 (List only) |
| PageSize | int64 | Items per page (List only, default 1000) |
| OrderBy | string | Sort field (List only) |
| Attributes | string | Attribute filter expression (List only) |
| IncludeHistory | bool | Include history events in response (loads on-demand from EventStore) |
| AggregateUuid | string | UUID of the aggregate to retrieve (Get only) |
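
As a rough illustration of how a 1-based `Page` and a `PageSize` translate into a window over a result set, consider the sketch below. It is not taken from comby: `pageWindow` is a hypothetical helper, and the handling of out-of-range input (treating a page below 1 as page 1 and a non-positive page size as the documented default of 1000) is an assumption made for this example.

```go
package main

import "fmt"

// defaultPageSize mirrors the documented default for PageSize.
const defaultPageSize int64 = 1000

// pageWindow converts a 1-based page and a page size into a [start, end) window
// over total items. Fallbacks for invalid input are assumptions of this sketch.
func pageWindow(page, pageSize, total int64) (start, end int64) {
	if page < 1 {
		page = 1
	}
	if pageSize <= 0 {
		pageSize = defaultPageSize
	}
	start = (page - 1) * pageSize
	if start > total {
		start = total
	}
	end = start + pageSize
	if end > total {
		end = total
	}
	return start, end
}

func main() {
	start, end := pageWindow(3, 10, 25)
	fmt.Println(start, end) // 20 25
}
```

A page past the end of the data yields an empty window (`start == end`), which a caller can use to detect that there are no more results.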

Cross-Domain Projection Queries

A common pattern is to query projection data from another domain without coupling the domains at the code level. This is useful for building derived views, search functionality, or dashboards that aggregate data from multiple domains.

The key idea: use comby.NewQuery with the target domain name and a ProjectionQueryListRequest (or ProjectionQueryGetRequest) typed with the target domain's aggregate. The framework routes the query to the correct projection query handler via the domain name.
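
The routing idea can be sketched without comby. In the example below, `Dispatcher`, `Handler`, `ListRequest`, and `ListResponse` are hypothetical, heavily simplified stand-ins and not the framework's real types. The caller knows only the domain name and the shared aggregate type; it never references the target domain's handler directly, and it recovers the typed result with a type assertion.

```go
package main

import "fmt"

// ListRequest and ListResponse are hypothetical stand-ins for generic query types.
type ListRequest[T any] struct{ PageSize int64 }
type ListResponse[T any] struct{ Models []T }

// Handler answers queries for one domain.
type Handler func(req any) (any, error)

// Dispatcher routes a query to a handler by domain name.
type Dispatcher struct{ handlers map[string]Handler }

func NewDispatcher() *Dispatcher {
	return &Dispatcher{handlers: map[string]Handler{}}
}

// Register binds a handler to a domain name.
func (d *Dispatcher) Register(domain string, h Handler) { d.handlers[domain] = h }

// Dispatch looks up the handler for the domain and forwards the request.
func (d *Dispatcher) Dispatch(domain string, req any) (any, error) {
	h, ok := d.handlers[domain]
	if !ok {
		return nil, fmt.Errorf("no handler registered for domain %q", domain)
	}
	return h(req)
}

// Task is a hypothetical aggregate shared between domains.
type Task struct{ Title string }

// QueryTasks shows the caller's side: address the domain by name, then type-assert.
func QueryTasks(d *Dispatcher) ([]*Task, error) {
	res, err := d.Dispatch("Task", &ListRequest[*Task]{PageSize: 10})
	if err != nil {
		return nil, err
	}
	listRes, ok := res.(*ListResponse[*Task])
	if !ok {
		return nil, fmt.Errorf("unexpected response type %T", res)
	}
	return listRes.Models, nil
}

func main() {
	d := NewDispatcher()
	d.Register("Task", func(req any) (any, error) {
		return &ListResponse[*Task]{Models: []*Task{{Title: "Write docs"}}}, nil
	})

	tasks, err := QueryTasks(d)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(len(tasks), tasks[0].Title) // 1 Write docs
}
```

The type assertion is the price of this decoupling: the dispatcher returns an untyped value, so the caller must check the concrete response type and handle a mismatch explicitly.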

Step-by-Step Example

Suppose you have an existing Task domain with a registered projection, and you want to query its data from a Search domain.

1. Create the request with the target domain's aggregate type:

go
import (
	taskAggregate "github.com/yourorg/yourapp/domain/task/aggregate"
)

domainQry := &comby.ProjectionQueryListRequest[*taskAggregate.Task]{
	TenantUuid: tenantUuid,
	Page:       1,
	PageSize:   100,
}

2. Wrap it in a Query addressed to the target domain:

go
qry, err := comby.NewQuery("Task", domainQry)
if err != nil {
	return err
}

3. Set up the request context:

go
reqCtx := comby.NewRequestContext()
reqCtx.SenderTenantUuid = tenantUuid
reqCtx.SenderIdentityUuid = identityUuid
qry.SetReqCtx(reqCtx)
qry.SetTenantUuid(tenantUuid)

4. Dispatch the query and type-assert the response:

go
res, err := fc.DispatchQuery(ctx, qry)
if err != nil {
	return err
}

listRes, ok := res.(*comby.ProjectionQueryListResponse[*taskAggregate.Task])
if !ok {
	return fmt.Errorf("unexpected response type")
}

// Use listRes.Models, listRes.Total, etc.
for _, item := range listRes.Models {
	fmt.Println(item.AggregateUuid, item.Model)
}

Full Example: Search Query Handler

Below is a simplified query handler in a Search domain that queries projections from the Task domain:

go
package query

import (
	"context"
	"fmt"

	"github.com/gradientzero/comby/v2"
	taskAggregate "github.com/yourorg/yourapp/domain/task/aggregate"
)

type SearchRequest struct {
	TenantUuid string `json:"tenantUuid"`
	Term       string `json:"term"`
}

type SearchResponse struct {
	Results []SearchResult `json:"results"`
}

type SearchResult struct {
	AggregateUuid string `json:"aggregateUuid"`
	Domain        string `json:"domain"`
	Label         string `json:"label"`
}

func (qh *qryHandler) Search(
	ctx context.Context,
	qry comby.Query,
	domainQry *SearchRequest,
) (*SearchResponse, error) {

	// Query the Task domain's projection
	taskListQry := &comby.ProjectionQueryListRequest[*taskAggregate.Task]{
		TenantUuid: domainQry.TenantUuid,
		PageSize:   1000,
	}
	taskQry, err := comby.NewQuery("Task", taskListQry)
	if err != nil {
		return nil, fmt.Errorf("failed to create task query: %w", err)
	}

	// Set request context (reuse from incoming query)
	taskQry.SetReqCtx(qry.GetReqCtx())
	taskQry.SetTenantUuid(domainQry.TenantUuid)

	// Dispatch
	taskRes, err := qh.fc.DispatchQuery(ctx, taskQry)
	if err != nil {
		return nil, fmt.Errorf("failed to query tasks: %w", err)
	}

	// Type-assert the response
	taskListRes, ok := taskRes.(*comby.ProjectionQueryListResponse[*taskAggregate.Task])
	if !ok {
		return nil, fmt.Errorf("unexpected task query response type")
	}

	// Filter and transform results
	response := &SearchResponse{}
	for _, model := range taskListRes.Models {
		// apply search term filtering on the results
		response.Results = append(response.Results, SearchResult{
			AggregateUuid: model.AggregateUuid,
			Domain:        "Task",
			Label:         fmt.Sprintf("%v", model.Model),
		})
	}

	return response, nil
}
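
The handler above leaves the search-term filtering as a comment. One possible way to fill that step is sketched below as a standalone program; `filterByTerm` is a hypothetical helper, and the matching rule (case-insensitive substring match on the label, with an empty term matching everything) is an assumption rather than something prescribed by the framework.

```go
package main

import (
	"fmt"
	"strings"
)

// SearchResult is a local copy of the result type from the handler above.
type SearchResult struct {
	AggregateUuid string
	Domain        string
	Label         string
}

// filterByTerm keeps results whose label contains term, ignoring case.
// An empty (or whitespace-only) term keeps all results.
func filterByTerm(results []SearchResult, term string) []SearchResult {
	term = strings.ToLower(strings.TrimSpace(term))
	if term == "" {
		return results
	}
	filtered := make([]SearchResult, 0, len(results))
	for _, r := range results {
		if strings.Contains(strings.ToLower(r.Label), term) {
			filtered = append(filtered, r)
		}
	}
	return filtered
}

func main() {
	results := []SearchResult{
		{AggregateUuid: "1", Domain: "Task", Label: "Write docs"},
		{AggregateUuid: "2", Domain: "Task", Label: "Fix login bug"},
	}
	for _, r := range filterByTerm(results, "DOCS") {
		fmt.Println(r.AggregateUuid, r.Label) // 1 Write docs
	}
}
```

Filtering in the handler only sees the page that was fetched, so for large data sets it is usually better to narrow the results in the projection store first.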

Use Attributes for Pre-Filtering

The Attributes field on ProjectionQueryListRequest lets you pass a filter expression to the projection store. Use it to narrow results before they reach your code, reducing the amount of data transferred and processed.

go
domainQry := &comby.ProjectionQueryListRequest[*taskAggregate.Task]{
	TenantUuid: tenantUuid,
	Attributes: `{"status":"active"}`,
	PageSize:   100,
}
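
Hand-writing the filter string is easy to get wrong once values contain quotes or other special characters. Assuming the filter expression is a flat JSON object, as in the example above, it can be built with the standard library instead. `buildAttributes` is a hypothetical helper; which keys and operators the projection store actually supports is not covered here.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// buildAttributes encodes key/value filters as a JSON object string.
// json.Marshal sorts map keys, so the output is deterministic.
func buildAttributes(filters map[string]string) (string, error) {
	b, err := json.Marshal(filters)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	attrs, err := buildAttributes(map[string]string{"status": "active"})
	if err != nil {
		fmt.Println("failed to encode attributes:", err)
		return
	}
	fmt.Println(attrs) // {"status":"active"}
}
```

The resulting string can then be assigned to the `Attributes` field of the list request.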

Skip Authorization for Internal Queries

When dispatching cross-domain queries from server-side code (e.g., inside a reactor or query handler), you may want to bypass the authorization check. Set the ExecuteSkipAuthorization attribute on the request context:

go
import "github.com/gradientzero/comby/v2/domain/auth"

reqCtx := comby.NewRequestContext()
reqCtx.SenderTenantUuid = tenantUuid
reqCtx.Attributes.Set(auth.ExecuteSkipAuthorization, true)
qry.SetReqCtx(reqCtx)

Only use this for trusted, internal queries where the calling code has already verified authorization.