Part 8: Reactor

INFO

Further information about Reactors can be found in the documentation: Reactor

Reactor

Reactors serve as the missing link between commands and queries. Unlike user-triggered actions, reactors respond to events, much like regular event handlers. However, reactors can take additional actions such as triggering new commands, executing queries, or performing other operations that modify the application's state.

One of the most important practical use cases for reactors is automating tasks such as sending emails, executing predefined processes, or dispatching new commands when certain conditions from external domains are met (e.g. once a payment is received, the order status is updated to ready for shipment).

In our example, we implement something similar: we send an email to the customer when an order is placed and update the status of an order when it has been paid.

A reactor is primarily a Readmodel and, as such, embeds the BaseReadmodel structure. Domain Event handlers are registered similarly to those in a Readmodel; however, they are specifically designed to perform actions or generate new domain interactions rather than merely updating the internal state.

Additionally, we pass the facade to the reactor, enabling it to access the registered email provider and the EventRepository. Using these, we construct an AggregateRepository for managing our aggregates.

go
// simple/domain/order/reactor/reactor.go
package reactor

import (
	"context"
	"fmt"

	"comby.io/examples/simple/domain/order/aggregate"
	"comby.io/examples/simple/domain/order/command"
	"github.com/gradientzero/comby/v2"
	"github.com/gradientzero/comby/v2/domain/auth"
)

var logger = comby.Logger.With("order", "reactor")

type Reactor struct {
	*comby.BaseReadmodel
	fc                  *comby.Facade
	AggregateRepository *comby.AggregateRepository[*aggregate.Order]
}

func NewReactor(
	fc *comby.Facade,
) *Reactor {
	rm := &Reactor{}
	rm.fc = fc
	rm.BaseReadmodel = comby.NewBaseReadmodel(fc.GetEventRepository())
	rm.Domain = "Order"
	rm.Name = "SimpleOrderReactor"
	rm.AggregateRepository = comby.NewAggregateRepository(
		fc.GetEventRepository(),
		aggregate.NewAggregate,
	)

	// register domain event handlers
	comby.AddDomainEventHandler(rm, rm.SendEmailWhenOrderPlaced)
	comby.AddDomainEventHandler(rm, rm.MarkOrderPaidWhenPaymentReceived)

	return rm
}

func (rm *Reactor) RestoreState(ctx context.Context, restoreFromZero bool) (comby.RestorationDoneCh, error) {
	// do not restore state in any reactor
	return nil, nil
}

func (rm *Reactor) SendEmailWhenOrderPlaced(ctx context.Context, evt comby.Event, domainEvt *aggregate.OrderPlacedEvent) error {
	// retrieve existing aggregate from store
	order, err := rm.AggregateRepository.GetAggregate(ctx, evt.GetAggregateUuid())
	if err != nil {
		logger.Debug("Could not load order", "error", err)
	}
	if order == nil {
		return nil
	}

	// ensure facade has email provider set up
	if rm.fc.GetEmail() == nil {
		logger.Debug("No email provider registered, skipping email")
		return nil
	}

	emailTo := []string{"customer@domain.com"}
	subject := "Thanks for your Order"
	message := "Your order has been placed successfully."
	if err := rm.fc.GetEmail().SendMail(emailTo, subject, message); err != nil {
		logger.Debug("Could not send email", "emailTo", emailTo, "error", err)
		return err
	}

	logger.Debug("Successfully sent email to customer", "email", emailTo)
	return nil
}

// Assume there is another aggregate "Payment" which is not shown in this example
type PaymentReceivedEvent struct {
	PaymentUuid string `json:"paymentUuid"`
	OrderUuid   string `json:"orderUuid"`
}

func (rm *Reactor) MarkOrderPaidWhenPaymentReceived(ctx context.Context, evt comby.Event, domainEvt *PaymentReceivedEvent) error {
	// retrieve existing aggregate from store; assuming the event belongs to the
	// Payment aggregate, the order is identified by the event payload
	order, err := rm.AggregateRepository.GetAggregate(ctx, domainEvt.OrderUuid)
	if err != nil {
		logger.Debug("Could not load order", "error", err)
	}
	if order == nil {
		return nil
	}

	// business logic: assume simple logic here
	if order.Status != aggregate.ORDER_STATUS_PLACED {
		return fmt.Errorf("order is not in status placed")
	}

	// update order status
	cmd, err := comby.NewCommand("Order", &command.MarkOrderPaid{
		OrderUuid:   domainEvt.OrderUuid,
		PaymentUuid: domainEvt.PaymentUuid,
	})
	if err != nil {
		return err
	}
	cmd.SetTenantUuid(order.GetTenantUuid())

	// initiator is this system, not a user
	reqCtx := comby.NewRequestContext()
	reqCtx.ExecuteWaitToFinish = true
	reqCtx.Attributes.Set(auth.ExecuteSkipAuthorization, true) // if using comby default
	cmd.SetReqCtx(reqCtx)

	// dispatch command to CommandBus or broker
	if _, err := rm.fc.DispatchCommand(ctx, cmd); err != nil {
		return err
	}

	// wait for command fully processed
	if err := rm.fc.WaitForCmd(ctx, cmd); err != nil {
		return err
	}

	logger.Debug("Successfully marked order as paid", "order", order.GetAggregateUuid())
	return nil
}

We register two methods: SendEmailWhenOrderPlaced and MarkOrderPaidWhenPaymentReceived. These methods react to the OrderPlacedEvent and PaymentReceivedEvent, respectively, and execute the corresponding actions. The latter event is included here artificially for demonstration purposes and would typically originate from another domain.

A key characteristic shared by all reactors is that they never restore their state. They only react to new events. This is crucial because, otherwise, every restart or manual restoration of the reactor would process all events again, potentially repeating actions such as sending emails or updating statuses. That is why we override the RestoreState method and leave it empty.
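To make the replay problem concrete, here is a minimal, self-contained sketch. It does not use the comby API: the Event, Mailer, and Reactor types and the Restore method are hypothetical stand-ins, and the replay flag only mimics the difference between a restoring readmodel and a reactor with an empty RestoreState.

```go
package main

import "fmt"

// Event is a hypothetical, minimal stand-in for a stored domain event.
type Event struct {
	Name      string
	OrderUuid string
}

// Mailer records outgoing emails so the side effect is observable.
type Mailer struct{ Sent []string }

func (m *Mailer) Send(to string) { m.Sent = append(m.Sent, to) }

// Reactor reacts to events with a side effect (sending an email).
type Reactor struct {
	mailer *Mailer
}

// Handle sends one email per OrderPlaced event.
func (r *Reactor) Handle(evt Event) {
	if evt.Name == "OrderPlaced" {
		r.mailer.Send("customer-of-" + evt.OrderUuid)
	}
}

// Restore replays the stored history through the handler when replay is true.
// A reactor would use replay=false, which corresponds to an empty RestoreState.
func (r *Reactor) Restore(history []Event, replay bool) {
	if !replay {
		return
	}
	for _, evt := range history {
		r.Handle(evt)
	}
}

func main() {
	// These events were already handled once, so the emails are already out.
	history := []Event{
		{Name: "OrderPlaced", OrderUuid: "o1"},
		{Name: "OrderPlaced", OrderUuid: "o2"},
	}

	replaying := &Reactor{mailer: &Mailer{}}
	replaying.Restore(history, true)
	fmt.Println("emails re-sent with replay:", len(replaying.mailer.Sent))

	skipping := &Reactor{mailer: &Mailer{}}
	skipping.Restore(history, false)
	fmt.Println("emails re-sent without replay:", len(skipping.mailer.Sent))
}
```

With replay enabled, every restart would send each customer the same email again. Skipping restoration avoids this, at the cost that events arriving while the reactor is not running are not handled by this mechanism.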

The SendEmailWhenOrderPlaced method sends an email to the customer when an OrderPlacedEvent is received. It first verifies that the order exists by loading it from the aggregate, which serves as the unprojected source of truth (in production you would also check its validity), and then uses the email provider implementation registered in the facade.

The MarkOrderPaidWhenPaymentReceived method generates a new command to update the status of an order when the artificial PaymentReceivedEvent is received. Before creating the command, it verifies that the order is in the correct state to proceed. The AggregateRepository is also used here to load the order from the source of truth.

That's actually it. The reactor is now ready to be used in the application.

Register in Facade

Finally, we need to register our custom reactor in the facade. This allows the reactor to receive domain events and process them accordingly. By registering our Reactor in the facade, we ensure that it is actively listening for events and can respond to them as needed.

go
// simple/domain/order/register.go
func Register(ctx context.Context, fc *comby.Facade) error {
    // ...
    rr := reactor.NewReactor(fc)
    if err := comby.RegisterEventHandler(fc, rr); err != nil {
        return err
    }
    // ...
}

Use Cases

In an Event Sourcing (ES) and CQRS architecture, reactors play a crucial role in enhancing the implementation of the Saga Pattern, offering significant advantages over traditional methods that rely on transactional databases or synchronous service calls. One of the key benefits is the asynchronous and decoupled communication enabled by reactors. By triggering each step through events, reactors ensure resilience and fault tolerance, as operations are not blocked by network failures or service downtime.

Consider an operation that typically involves two or more external services and only succeeds if all services execute successfully. In a traditional, synchronous architecture, a failure in one of the services would cause the entire operation to fail.

From an external perspective, a command initiates the process. The reactor responds to the corresponding event and begins its internal representation of the process. This means the reactor can break the operation into multiple steps, implementing each step as an independent event-handling process. If a step fails, the reactor can manage the error and notify the user or the system. This approach allows the user or system to determine how to proceed with the error.

Typically, the user is offered additional commands to address the issue, such as resending an email or manually updating the order status. This provides a robust and flexible way to handle failures, ensuring that processes can continue without compromising the system's overall integrity.