Skip to content

Part 5: Commands

TIP

Further information about Aggregates can be found in the documentation. The starting point is: Documentation

What is a Command?

In the CQRS pattern, a command is a message that represents an intention to modify the state of the system. Commands are sent to the system and handled by a CommandHandler. The CommandHandler is responsible for validating the command, invoking the aggregate's logic, and emitting new events as a result of executing the command. Similar to events, a command is simple a structure that is processed by a CommandHandler, establishing a one-to-one relationship between the command and the CommandHandler. While a single CommandHandler can handle multiple commands, each command is always processed by only one specific CommandHandler.

Commands are created by users and sent to the facade. Unlike queries, commands are processed asynchronously, meaning the user does not immediately receive the result of the execution. However, in practice, the WaitForCmd method of the facade is often used to wait for the completion of a command. This is particularly useful in scenarios such as a REST API, where the client might need to wait for the operation to finish before proceeding.

As illustrated in the diagram, a command is dispatched by the user to the facade. The facade forwards the command to the appropriate CommandHandler, which validates the command and invokes the logic of the aggregate(s). The aggregate executes the business logic and returns any resulting events. The CommandHandler then passes these events—or an error, if one occurs—back to the facade.

The facade stores the new events in the EventStore and updates all Readmodels that react to these events. Finally, the facade returns a response to the user.

Optionally, the user can wait for the completion of the command by making a second call using comby.WaitForCmd(ctx, cmd) after dispatching the command with comby.DispatchCommand(ctx, cmd). This approach is especially useful in scenarios where immediate feedback about the command's execution status is required.

Command Handler

Like most key components in comby, our custom Command Handler (cmdHandler) is built on a base class called BaseCommandHandler. This class provides a solid foundation for implementing Command Handlers, including essential functions and structures for registering domain command handlers.

The AggregateRepository is used to load the aggregate instance and execute the aggregate's intended logic. However, not every Command Handler requires an AggregateRepository to fulfill its custom tasks. Developers are free to define their Command Handlers as needed, as long as the base class is embedded.

Here is the structure of our custom Command Handler:

go
// simple/domain/order/command/command.go
package command

import (
	"comby.io/examples/simple/domain/order/aggregate"
	"github.com/gradientzero/comby/v2"
)

type cmdHandler struct {
	*comby.BaseCommandHandler
	AggregateRepository *comby.AggregateRepository[*aggregate.Order]
}

func NewCommandHandler(
	AggregateRepository *comby.AggregateRepository[*aggregate.Order],
) *cmdHandler {
	ch := &cmdHandler{}
	ch.BaseCommandHandler = comby.NewBaseCommandHandler()
	ch.Domain = "Order"
	ch.AggregateRepository = AggregateRepository

	// register domain comannd handlers
	comby.AddDomainCommandHandler(ch, ch.PlaceOrder)
	comby.AddDomainCommandHandler(ch, ch.MarkOrderPaid)
	comby.AddDomainCommandHandler(ch, ch.MarkOrderShipped)
	comby.AddDomainCommandHandler(ch, ch.MarkOrderDelivered)
	comby.AddDomainCommandHandler(ch, ch.MarkOrderCancelled)
	comby.AddDomainCommandHandler(ch, ch.DeleteOrder)
	comby.AddDomainCommandHandler(ch, ch.UpdateOrder)
	return ch
}

Similar to aggregates, we register our domain command handlers within the Command Handler, each responsible for processing a specific domain command. For instance: The receiver method PlaceOrder processes the PlaceOrder struct, which is a domain command. UpdateOrder processes the UpdateOrder and DeleteOrder processes the DeleteOrder. The names of these domain command handlers methods can be freely chosen. What matters most is the method signature and its alignment with the associated domain command.

Domain Command Handling

Commands, like queries, are typically sent to the facade via a REST API. A command - and query - also include a request context, which contains information about the initiator and the desired intention. When using the default setup in comby, this context must be populated with the relevant details like for example: SenderTenantUuid, SenderIdentityUuid and TargetTenantUuid . The default REST API provides helper methods for this purpose, such as extracting the sender information from the Authorization Header and more. For more information, see the REST API.

It's important to note, however, that the user has the flexibility to populate the context manually—or omit it entirely—depending on the specific requirements and structure of their application.

If needed, the user can also access the request context within their domain command event handlers. However, in most cases, only the facade or its middleware requires the request context. This is typically used to perform actions such as verifying authorization (if using default) or setting a timeout for the command before it is executed. Let's start with a typical example of a domain command handler:

Place Order

The receiver method PlaceOrder takes three parameters: the context, the command, and the domain command. This fulfills the signature required for a domain command handler as defined by comby. While the method name happens to match the structure PlaceOrder, it is not mandatory to use the same name—the method name and the domain command struct name are entirely customizable.

In this case, the PlaceOrder structure represents our domain command. When a command containing this domain command structure is dispatched to the facade, this method is executed to handle it.

go
// simple/domain/order/command/place.order.go
package command

import (
	"context"
	"fmt"

	"comby.io/examples/simple/domain/order/aggregate"

	"github.com/gradientzero/comby/v2"
)

type PlaceOrderItem struct {
	Sku      string `json:"sku"`
	Quantity int64  `json:"quantity"`
}

type PlaceOrder struct {
	OrderUuid string          `json:"orderUuid"`
	Item      *PlaceOrderItem `json:"item"`
}

func (ch *cmdHandler) PlaceOrder(ctx context.Context, cmd comby.Command, domainCmd *PlaceOrder) ([]comby.Event, error) {
	// validate uuid
	if err := comby.ValidateUuid(domainCmd.OrderUuid); err != nil {
		return nil, fmt.Errorf("%s failed - orderUuid is invalid", comby.GetTypeName(domainCmd))
	}

	// retrieve existing aggregate from store
	if _agg, _ := ch.AggregateRepository.GetAggregate(ctx, domainCmd.OrderUuid); _agg != nil {
		return nil, fmt.Errorf("%s failed - order already exist", comby.GetTypeName(domainCmd))
	}

	// create new aggregate
	agg := aggregate.NewAggregate()
	agg.AggregateUuid = domainCmd.OrderUuid

	// execute logic
	if err := agg.Place(&aggregate.Item{
		Sku:      domainCmd.Item.Sku,
		Quantity: domainCmd.Item.Quantity,
	}); err != nil {
		return nil, err
	}

	// return new events
	return agg.GetUncommittedEvents(), nil
}

The PlaceOrder method itself doesn’t perform much beyond orchestrating the necessary steps. First, it checks via the AggregateRepository whether an Order aggregate with the same UUID already exists. If not, a new aggregate is created, and the Plac intention of the new aggregate object is invoked.

This Place method, defined within the aggregate structure, contains the actual business logic—in this case, adding the order item to the aggregate current object. The result of this intention could be one or more events, which can be retrieved via the GetUncommittedEvents method.

It’s important to note that at this point, nothing has been persisted to the EventStore. The intention has merely been executed, producing new events that represent the changes to the existing / new created aggregate. These changes are not visible in the system until the facade, after executing this domain command handler, persists the new events to the Event Store.

It is also important to note that multiple different aggregates and multiple intentions within a single aggregate can be invoked in a single domain command handler. The generated events from these actions are combined and returned as a single list.

However, it is strongly discouraged to call external services, such as sending an email, directly within a domain command handler.

The reason for this separation is as follows: a command is intended to invoke an aggregate's intention and modify its state, for example, setting a field in an aggregate to OrderPaid. If the domain command handler sends an email before invoking the intention, and the email service is down, the intention cannot be executed, blocking the aggregate's state truth. Such actions should be handled by a reactor, which can be used to send emails once the field has been updated. More on this: Reactors.

Update Order

The domain command handler for updating an aggregate demonstrates a convention that users can choose to adopt or ignore. The structure — our domain command: UpdateOrder — includes a key field called PatchedFields in addition to the UUID and other metadata fields. This field specifies which fields are to be updated.

In practice, a PUT/POST request with a JSON payload contains fields and their values, but it can be unreliable to distinguish whether a JSON value should be explicitly set to empty or if the field was not provided at all. This issue often depends on the underlying used API framework (e.g., chi, gin, etc.).

To remain independent of such API-specific behaviors, the PatchedFields field explicitly indicates which fields should be modified. This approach has proven effective in practice.

go
// simple/domain/order/command/update.order.go
package command

import (
	"context"
	"fmt"

	"github.com/gradientzero/comby/v2"
)

type UpdateOrder struct {
	OrderUuid     string   `json:"orderUuid"`
	Comment       string   `json:"comment,omitempty"`
	AnyOtherField string   `json:"anyOtherField,omitempty"`
	PatchedFields []string `json:"patchedFields"`
}

func (ch *cmdHandler) UpdateOrder(ctx context.Context, cmd comby.Command, domainCmd *UpdateOrder) ([]comby.Event, error) {
	// validate uuid
	if err := comby.ValidateUuid(domainCmd.OrderUuid); err != nil {
		return nil, fmt.Errorf("%s failed - orderUuid is invalid", comby.GetTypeName(domainCmd))
	}

	// retrieve existing aggregate from store
	agg, _ := ch.AggregateRepository.GetAggregate(ctx, domainCmd.OrderUuid)
	if agg == nil {
		return nil, fmt.Errorf("%s failed - order does not exist", comby.GetTypeName(domainCmd))
	}

	// prepare execution
	comment := agg.Comment
	anyOtherField := agg.AnyOtherField

	if comby.StringInSlice("comment", domainCmd.PatchedFields) {
		comment = domainCmd.Comment
	}
	if comby.StringInSlice("anyOtherField", domainCmd.PatchedFields) {
		anyOtherField = domainCmd.AnyOtherField
	}

	// execute logic
	if err := agg.Update(comment, anyOtherField); err != nil {
		return nil, err
	}

	// return new events
	return agg.GetUncommittedEvents(), nil
}

The process involves loading the existing Order aggregate, retrieving its current values, and checking if any of these values need to be updated. If changes are required, the Update intention is invoked to apply the modifications. The generated events resulting from this update are then returned. Of course, the code could be written more elegantly, but the focus here is on demonstrating the principle.

Delete Order

The final important domain command is the deletion of an aggregate. Every aggregate inherits a Deleted field, so it is not necessary to define a custom field for this purpose in your Aggregate. The Deleted field ensures that the deleted aggregate is no longer loaded from the repositories by default. However, it can still be explicitly queried if deleted aggregates need to be retrieved.

go
// simple/domain/order/command/delete.order.go
package command

import (
	"context"
	"fmt"

	"github.com/gradientzero/comby/v2"
)

type DeleteOrder struct {
	OrderUuid string `json:"orderUuid"`
}

func (ch *cmdHandler) DeleteOrder(ctx context.Context, cmd comby.Command, domainCmd *DeleteOrder) ([]comby.Event, error) {
	// validate uuid
	if err := comby.ValidateUuid(domainCmd.OrderUuid); err != nil {
		return nil, fmt.Errorf("%s failed - orderUuid is invalid", comby.GetTypeName(domainCmd))
	}

	// retrieve existing aggregate from store
	agg, _ := ch.AggregateRepository.GetAggregate(ctx, domainCmd.OrderUuid)
	if agg == nil {
		return nil, fmt.Errorf("%s failed - order not found", comby.GetTypeName(domainCmd))
	}

	// execute intention
	if err := agg.Delete(); err != nil {
		return nil, err
	}

	// return new events
	return agg.GetUncommittedEvents(), nil
}

Deleting an aggregate is straightforward in terms of implementation. The aggregate is loaded, and the Delete intention is invoked. This intention sets the Deleted field to true and returns an OrderDeletedEvent. The event is then persisted in the Event Store, marking the deletion of the aggregate in the system.

Additional Domain Commands

All other domain command handlers follow a technically similar structure. They load the respective aggregate, execute the necessary intentions, and return the generated events.

go
// simple/domain/order/command/mark.order.cancelled.go
package command

import (
	"context"
	"fmt"

	"github.com/gradientzero/comby/v2"
)

type MarkOrderCancelled struct {
	OrderUuid string `json:"orderUuid"`
	Reason    string `json:"string,omitempty"`
}

func (ch *cmdHandler) MarkOrderCancelled(ctx context.Context, cmd comby.Command, domainCmd *MarkOrderCancelled) ([]comby.Event, error) {
	// validate uuid
	if err := comby.ValidateUuid(domainCmd.OrderUuid); err != nil {
		return nil, fmt.Errorf("%s failed - orderUuid is invalid", comby.GetTypeName(domainCmd))
	}

	// retrieve existing aggregate from store
	agg, _ := ch.AggregateRepository.GetAggregate(ctx, domainCmd.OrderUuid)
	if agg == nil {
		return nil, fmt.Errorf("%s failed - order does not exist", comby.GetTypeName(domainCmd))
	}
	// execute logic
	if err := agg.MarkCancelled(domainCmd.Reason); err != nil {
		return nil, err
	}

	// return new events
	return agg.GetUncommittedEvents(), nil
}
go
// simple/domain/order/command/mark.order.paid.go
package command

import (
	"context"
	"fmt"

	"github.com/gradientzero/comby/v2"
)

type MarkOrderPaid struct {
	OrderUuid   string `json:"orderUuid"`
	PaymentUuid string `json:"paymentUuid"`
}

func (ch *cmdHandler) MarkOrderPaid(ctx context.Context, cmd comby.Command, domainCmd *MarkOrderPaid) ([]comby.Event, error) {
	// validate uuid
	if err := comby.ValidateUuid(domainCmd.OrderUuid); err != nil {
		return nil, fmt.Errorf("%s failed - orderUuid is invalid", comby.GetTypeName(domainCmd))
	}
	if err := comby.ValidateUuid(domainCmd.PaymentUuid); err != nil {
		return nil, fmt.Errorf("%s failed - paymentUuid is invalid", comby.GetTypeName(domainCmd))
	}

	// retrieve existing aggregate from store
	agg, _ := ch.AggregateRepository.GetAggregate(ctx, domainCmd.OrderUuid)
	if agg == nil {
		return nil, fmt.Errorf("%s failed - order does not exist", comby.GetTypeName(domainCmd))
	}
	// execute logic
	if err := agg.MarkPaid(domainCmd.PaymentUuid); err != nil {
		return nil, err
	}

	// return new events
	return agg.GetUncommittedEvents(), nil
}
go
// simple/domain/order/command/mark.order.shipped.go
package command

import (
	"context"
	"fmt"

	"github.com/gradientzero/comby/v2"
)

type MarkOrderShipped struct {
	OrderUuid       string `json:"orderUuid"`
	ShippingAddress string `json:"shippingAddress"`
}

func (ch *cmdHandler) MarkOrderShipped(ctx context.Context, cmd comby.Command, domainCmd *MarkOrderShipped) ([]comby.Event, error) {
	// validate uuid
	if err := comby.ValidateUuid(domainCmd.OrderUuid); err != nil {
		return nil, fmt.Errorf("%s failed - orderUuid is invalid", comby.GetTypeName(domainCmd))
	}

	// retrieve existing aggregate from store
	agg, _ := ch.AggregateRepository.GetAggregate(ctx, domainCmd.OrderUuid)
	if agg == nil {
		return nil, fmt.Errorf("%s failed - order does not exist", comby.GetTypeName(domainCmd))
	}
	// execute logic
	if err := agg.MarkShipped(domainCmd.ShippingAddress); err != nil {
		return nil, err
	}

	// return new events
	return agg.GetUncommittedEvents(), nil
}
go
// simple/domain/order/command/mark.order.delivered.go
package command

import (
	"context"
	"fmt"

	"github.com/gradientzero/comby/v2"
)

type MarkOrderDelivered struct {
	OrderUuid string `json:"orderUuid"`
}

func (ch *cmdHandler) MarkOrderDelivered(ctx context.Context, cmd comby.Command, domainCmd *MarkOrderDelivered) ([]comby.Event, error) {
	// validate uuid
	if err := comby.ValidateUuid(domainCmd.OrderUuid); err != nil {
		return nil, fmt.Errorf("%s failed - orderUuid is invalid", comby.GetTypeName(domainCmd))
	}

	// retrieve existing aggregate from store
	agg, _ := ch.AggregateRepository.GetAggregate(ctx, domainCmd.OrderUuid)
	if agg == nil {
		return nil, fmt.Errorf("%s failed - order does not exist", comby.GetTypeName(domainCmd))
	}
	// execute logic
	if err := agg.MarkDelivered(); err != nil {
		return nil, err
	}

	// return new events
	return agg.GetUncommittedEvents(), nil
}

Register in Facade

Finally, we need to register our command handler in the facade. This is done by calling the RegisterCommandHandler method. This step ensures that the facade can route commands to the correct handler for processing. Again, we could also pass the facade to the command handler to access the necessary repositories and services there. This is just an example of how to register a command handler in the facade.

go
// simple/domain/order/register.go
func Register(ctx context.Context, fc *comby.Facade) error {
    // ...
    eventRepository := fc.GetEventRepository()
    aggregateRepository := comby.NewAggregateRepository(
        eventRepository,
        aggregate.NewAggregate,
    )
    ch := command.NewCommandHandler(aggregateRepository)
    if err := comby.RegisterCommandHandler(fc, ch); err != nil {
        return err
    }
    // ...
}