Part 8: Reactor
INFO
Further information about Reactors can be found in the documentation: Reactor
Reactor
Reactors serve as the missing link between commands and queries. Unlike user-triggered actions, reactors respond to events, much like regular event handlers. However, reactors can take additional actions such as triggering new commands, executing queries, or performing other operations that modify the application's state.
One of the most important practical use cases for reactors is automating tasks such as sending emails, executing predefined processes, or dispatching new commands when certain conditions from external domains are met (e.g. once a payment has been received, update the order status to ready for shipment).
In our example, we implement something similar: we send an email to the customer when an order is placed and update the status of an order when it has been paid.
A reactor is primarily a Readmodel and, as such, embeds the BaseReadmodel structure. Domain Event handlers are registered similarly to those in a Readmodel; however, they are specifically designed to perform actions or generate new domain interactions rather than merely updating the internal state.
Additionally, we pass the facade to the reactor, enabling it to access the registered email provider and the EventRepository. From the EventRepository, we construct an AggregateRepository for loading our Order aggregates.
// simple/domain/order/reactor/reactor.go
package reactor

import (
	"context"
	"fmt"

	"comby.io/examples/simple/domain/order/aggregate"
	"comby.io/examples/simple/domain/order/command"
	"github.com/gradientzero/comby/v2"
	"github.com/gradientzero/comby/v2/domain/auth"
)

var logger = comby.Logger.With("order", "reactor")

type Reactor struct {
	*comby.BaseReadmodel
	fc                  *comby.Facade
	AggregateRepository *comby.AggregateRepository[*aggregate.Order]
}

func NewReactor(
	fc *comby.Facade,
) *Reactor {
	rm := &Reactor{}
	rm.fc = fc
	rm.BaseReadmodel = comby.NewBaseReadmodel(fc.GetEventRepository())
	rm.Domain = "Order"
	rm.Name = "SimpleOrderReactor"
	rm.AggregateRepository = comby.NewAggregateRepository(
		fc.GetEventRepository(),
		aggregate.NewAggregate,
	)

	// register domain event handlers
	comby.AddDomainEventHandler(rm, rm.SendEmailWhenOrderPlaced)
	comby.AddDomainEventHandler(rm, rm.MarkOrderPaidWhenPaymentReceived)
	return rm
}

func (rm *Reactor) RestoreState(ctx context.Context, restoreFromZero bool) (comby.RestorationDoneCh, error) {
	// do not restore state in any reactor
	return nil, nil
}

func (rm *Reactor) SendEmailWhenOrderPlaced(ctx context.Context, evt comby.Event, domainEvt *aggregate.OrderPlacedEvent) error {
	// retrieve existing aggregate from store
	order, err := rm.AggregateRepository.GetAggregate(ctx, evt.GetAggregateUuid())
	if err != nil {
		return err
	}
	if order == nil {
		return nil
	}

	// skip if the facade has no email provider set up
	if rm.fc.GetEmail() == nil {
		logger.Debug("No email provider configured, skipping email", "order", evt.GetAggregateUuid())
		return nil
	}

	// recipient is hard-coded for this example
	emailTo := []string{"customer@domain.com"}
	subject := "Thanks for your Order"
	message := "Your order has been placed successfully."
	if err := rm.fc.GetEmail().SendMail(emailTo, subject, message); err != nil {
		logger.Debug("Could not send email", "emailTo", emailTo, "error", err)
		return err
	}
	logger.Debug("Successfully sent email to customer", "email", emailTo)
	return nil
}

// Assume there is another aggregate "Payment" which is not shown in this example
type PaymentReceivedEvent struct {
	PaymentUuid string `json:"paymentUuid"`
	OrderUuid   string `json:"orderUuid"`
}

func (rm *Reactor) MarkOrderPaidWhenPaymentReceived(ctx context.Context, evt comby.Event, domainEvt *PaymentReceivedEvent) error {
	// retrieve the order referenced by the payment from the store;
	// evt belongs to the Payment aggregate, so the order is identified by domainEvt.OrderUuid
	order, err := rm.AggregateRepository.GetAggregate(ctx, domainEvt.OrderUuid)
	if err != nil {
		return err
	}
	if order == nil {
		return nil
	}

	// business logic: assume simple logic here
	if order.Status != aggregate.ORDER_STATUS_PLACED {
		return fmt.Errorf("order %s is not in placed state", domainEvt.OrderUuid)
	}

	// update order status
	cmd, err := comby.NewCommand("Order", &command.MarkOrderPaid{
		OrderUuid:   domainEvt.OrderUuid,
		PaymentUuid: domainEvt.PaymentUuid,
	})
	if err != nil {
		return err
	}
	cmd.SetTenantUuid(order.GetTenantUuid())

	// initiator is this system, not a user
	reqCtx := comby.NewRequestContext()
	reqCtx.ExecuteWaitToFinish = true
	reqCtx.Attributes.Set(auth.ExecuteSkipAuthorization, true) // if using comby default
	cmd.SetReqCtx(reqCtx)

	// dispatch command to CommandBus or broker
	if _, err := rm.fc.DispatchCommand(ctx, cmd); err != nil {
		return err
	}

	// wait for command fully processed
	if err := rm.fc.WaitForCmd(ctx, cmd); err != nil {
		return err
	}
	logger.Debug("Successfully marked order as paid", "order", order.GetAggregateUuid())
	return nil
}
We register two methods: SendEmailWhenOrderPlaced and MarkOrderPaidWhenPaymentReceived. These methods react to the OrderPlacedEvent and PaymentReceivedEvent, respectively, and execute the corresponding actions. The latter event is included here artificially for demonstration purposes and would typically originate from another domain.
A key characteristic shared by all reactors is that they never restore their state. They only react to new events. This is crucial because, otherwise, during each restart or manual restoration of the reactor, all events would be processed again, potentially leading to repeated execution of actions such as sending emails or updating statuses. That is why we override the RestoreState method and return immediately without replaying any events.
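The difference between replaying history into a projection and into a reactor can be illustrated with a small, self-contained sketch. All types here (Event, Projection, Reactor, Start) are hypothetical and simplified; they only model the behavior described above and are not part of comby.

```go
package main

import "fmt"

// Event is a hypothetical, simplified event; not comby's type.
type Event struct {
	Name      string
	OrderUuid string
}

// Projection behaves like a readmodel: replaying history is safe because
// it only rebuilds in-memory state.
type Projection struct{ Orders map[string]bool }

func (p *Projection) Handle(e Event) { p.Orders[e.OrderUuid] = true }

// Reactor performs one side effect per event (here: recording a sent email),
// so replaying history would repeat that side effect.
type Reactor struct{ Sent []string }

func (r *Reactor) Handle(e Event) { r.Sent = append(r.Sent, e.OrderUuid) }

// Start simulates a (re)start. With replay=true the stored history is fed
// to the handler; with replay=false the handler only sees future events.
func Start(history []Event, handle func(Event), replay bool) {
	if !replay {
		return
	}
	for _, e := range history {
		handle(e)
	}
}

func main() {
	history := []Event{
		{Name: "OrderPlaced", OrderUuid: "o-1"},
		{Name: "OrderPlaced", OrderUuid: "o-2"},
	}

	// A projection can be restored on every start without harm.
	proj := &Projection{Orders: map[string]bool{}}
	Start(history, proj.Handle, true)
	Start(history, proj.Handle, true)
	fmt.Println("projected orders:", len(proj.Orders)) // 2

	// A reactor that skips restoration only reacts to the new event.
	reactor := &Reactor{}
	Start(history, reactor.Handle, false)
	Start(history, reactor.Handle, false)
	reactor.Handle(Event{Name: "OrderPlaced", OrderUuid: "o-3"})
	fmt.Println("emails sent:", len(reactor.Sent)) // 1

	// A naive reactor that replays history sends duplicates after two restarts.
	naive := &Reactor{}
	Start(history, naive.Handle, true)
	Start(history, naive.Handle, true)
	fmt.Println("emails sent by naive reactor:", len(naive.Sent)) // 4
}
```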
The SendEmailWhenOrderPlaced method sends an email to the customer when an OrderPlacedEvent is received. It first verifies that the order exists by loading it from the AggregateRepository, which serves as the unprojected source of truth (in production you would check the order's validity, too). It then sends the email through the email provider implementation registered in the facade.
The MarkOrderPaidWhenPaymentReceived method generates a new command to update the status of an order when an (artificial) PaymentReceivedEvent is received. Before creating the command, it verifies that the order is in the correct state to proceed. The AggregateRepository is also used here to load the order from the source of truth.
That's actually it. The reactor is now ready to be used in the application.
Register in Facade
Finally, we need to register our custom reactor in the facade. This allows the reactor to receive domain events and process them accordingly. By registering our Reactor in the facade, we ensure that it is actively listening for events and can respond to them as needed.
// simple/domain/order/register.go
func Register(ctx context.Context, fc *comby.Facade) error {
	// ...
	rr := reactor.NewReactor(fc)
	if err := comby.RegisterEventHandler(fc, rr); err != nil {
		return err
	}
	// ...
}
Use Cases
In an Event Sourcing (ES) and CQRS architecture, reactors are a natural building block for implementing the Saga Pattern, offering advantages over traditional methods that rely on transactional databases or synchronous service calls. One of the key benefits is the asynchronous and decoupled communication enabled by reactors. Because each step is triggered by an event, a network failure or service outage in one step does not block the other steps, which improves resilience and fault tolerance.
Consider an operation that typically involves two or more external services and only succeeds if all services execute successfully. In a traditional, synchronous architecture, a failure in one of the services would cause the entire operation to fail.
From an external perspective, a command initiates the process. The reactor responds to the corresponding event and starts tracking the process internally. This means the reactor can break the operation into multiple steps, implementing each step as an independent event-handling process. If a step fails, the reactor can handle the error and notify the user or the system, who can then decide how to proceed.
Typically, the user is offered additional commands to address the issue, such as resending an email or manually updating the order status. This provides a robust and flexible way to handle failures, ensuring that processes can continue without compromising the system's overall integrity.