Skip to content

Distributed Environment

Comby is designed to support distributed environments using Event Sourcing (ES) and Command Query Responsibility Segregation (CQRS) principles. These patterns are ideal for scalable, event-driven architectures where multiple applications or services need to communicate or synchronize state changes.

To facilitate communication in distributed setups, comby leverages the Broker interface. The Broker acts as the central component for publishing and subscribing to events across different applications. Events generated within comby are propagated through the Broker, enabling other services to react to changes in real time.

Commands in Comby can be dispatched to the Broker, enabling a flexible architecture where arbitrary instances of the application do not necessarily process commands directly. This approach is particularly useful in distributed systems where a single write instance, or a designated set of instances, is responsible for processing all commands or specific subsets of commands. By offloading command handling to a central or controlled processing instance, you ensure consistency and maintain a single source of truth for state-changing operations.

To illustrate the architecture in a distributed system, let’s consider an example with three applications: A, B, and C.

  • Application A is a comby-based application that processes only queries directly. It forwards commands to the write instance B via the broker.
  • Application B is a comby-based application capable of processing both queries and commands.
  • Application C is an external application that listens for new events via the broker and reacts with its own logic. This scenario can be easily extended to include additional instances or external applications, all communicating through the broker and consuming events as needed.

Broker

The Broker in comby serves as a messaging hub that integrates CommandBus, EventBus and hook management for efficient communication and coordination in distributed systems. It abstracts the complexity of message passing, ensuring seamless interaction between components while supporting extensibility and customization through its options.

A Hook like a callback for commands managed through the facade. Hooks can be registered for specific commands (specified by ExecuteWaitToFinish) and are invoked during the command execution process (used by WaitForCmd). This concept also works across applications using a Broker, which supports Hooks via BrokerHookBus as well. This enables seamless integration of hooks for commands in distributed environments regardless of whether the command is handled locally or by another application via the broker.

INFO

Comby includes a concrete implementation of the Broker interface using NATS, a high-performance messaging system. The NATS broker is capable of publishing events to different applications within the same ecosystem and allows external applications to subscribe to events. Link: https://github.com/gradientzero/comby-broker-nats

Key Responsibilities

The Broker manages three core messaging functionalities:

  • Commands: Facilitates the distribution of commands throughout the system and thus enables direct execution of commands in the corresponding write instance application.
  • Events: Handles the publication and subscription of events, promoting loose coupling between producers and consumers in an event-driven architecture.
  • BrokerHooks: Manages broker hooks, which are callbacks between application instances.

Configuration and Initialization

The Broker is configurable via the BrokerOptions struct, which includes settings such as the application name, read-only mode, and the specific CommandBus and EventBus to use.

The facade typically configures all necessary options for the broker. For instance, if the facade operates in read-only mode, the broker is configured accordingly. Additionally, the facade passes its buses to the broker. That said, users should at least consider customizing the application name to avoid conflicts with other comby-based applications using the same broker instance.

However, using BrokerOption functions following options are available:

  • BrokerOptionWithAppName to set the unique application name. The application name is also used to define the names of the send and receive channels used inside the broker. A broker could also be used by other applications.
  • BrokerOptionWithReadOnly by default should be in sync with Facade's read-only option. If set to true, the broker cannot publish events or subscribe to the command channel. The broker can only publish new commands and receive new events. Publishing new commands means that the broker can forward commands to the write instance.
  • BrokerOptionWithCommandBus and BrokerOptionWithEventBus by default, Facade's CommandBus and EventBus should be passed so that the broker can pass on commands and events to the facade accordingly. Other buses can also be used optionally and as required.

The Init method initializes the broker with the provided options, ensuring that all components are configured correctly before the broker is used. This is also where the channels are defined, the actual connection to the broker (e.g. NATS) is established and the subscribe goroutines are started.

Integration with the Facade

The broker is passed to the facade as an option and must be initialized beforehand. The facade uses the broker to publish events and send commands. Additionally, the facade leverages the broker to manage hooks.

This means that when a user sends a command through the broker to another instance, the user can still wait for the command's result via the broker, even though the command is executed on a different instance. By integrating the broker into the facade, seamless communication is enabled between various applications and services utilizing the comby framework, ensuring efficient and consistent interactions in distributed environments.

The Broker plays a crucial role in the comby architecture by serving as the communication layer for the Facade. It allows the Facade to interact with the command and event buses, enabling it to dispatch commands and publish events seamlessly. The addition of hooks further extends the broker's capabilities, allowing for dynamic extensions and callbacks based on specific system actions.

go
import (
    "github.com/gradientzero/comby/v2"
    "github.com/gradientzero/comby-broker-nats"
)
...
// create new concrete broker (here NATS)
concreteBroker := broker.NewBrokerNats("nats://127.0.0.1:4222")

// create facade - broker options are passed automatically by facade
fc, err := comby.NewFacade(
    comby.FacadeWithBroker(concreteBroker),
)
...

All broker options are automatically passed from the facade to the broker in the NewFacade function, so the user does not need to specify these options explicitly. Specifically, these include the application name, read-only mode, CommandBus, and EventBus.

Extensibility and Use Cases

The Broker's design allows for integration with different messaging systems, including in-memory implementations for development or testing and distributed messaging systems for production. Its flexible configuration makes it suitable for a wide range of use cases, such as:

  • Orchestrating workflows through command dispatching.
  • Enabling real-time event-driven systems with pub-sub capabilities.
  • Managing hooks for dynamic extensions or custom behaviors triggered by commands.

With its robust and modular design, the Broker ensures reliable and efficient communication between system components, forming the backbone of the comby messaging infrastructure.

Sample Code

If we start with a simple example, we have two applications: one write application and one read application. The write application processes commands, updates its own read models, and simultaneously sends the new events to the broker. The read application listens to these events and updates its own read models accordingly. The broker (in this case, NATS) acts as the central component to distribute the events.

Note: In a real-world scenario, both applications would share access to an EventStore, where both would have at least read access to perform the RestoreState operation. This operation reconstructs the application's state by replaying past events. However, in this example, both applications are in-memory to reduce complexity.

Read-only App

The read-only application does not process commands itself; instead, it sends them to the broker instance. The broker instance forwards the commands to the write instance. The write instance processes the commands and publishes the resulting events through the broker instance. The broker instance then distributes these events to the read instance.

To simplify this example and avoid authentication concerns, we configure the command and query attributes to inform comby's default authentication middleware that authentication is not required. Using the ExecuteWaitToFinish attribute of the command, we register a hook—also via the broker—that is triggered once the command is fully executed. This hook is automatically released after the command completes (or timeouts). Until the command execution is complete, we use WaitForCmd to wait for the result.

go
// broker-app-ro/main.go
package main

import (
	"context"
	"fmt"
	"net/http"
	"time"

	"github.com/danielgtaylor/huma/v2"
	"github.com/danielgtaylor/huma/v2/adapters/humago"
	broker "github.com/gradientzero/comby-broker-nats"
	"github.com/gradientzero/comby/v2"
	combyApi "github.com/gradientzero/comby/v2/api"
	combyDomain "github.com/gradientzero/comby/v2/domain"
	"github.com/gradientzero/comby/v2/domain/auth"
	tenantCommand "github.com/gradientzero/comby/v2/domain/tenant/command"
	tenantQuery "github.com/gradientzero/comby/v2/domain/tenant/query"
)

// Before executing this code, ensure to run nats-server:
// nats-server -p 4222

func main() {
	// create new concrete broker
	brokerNats := broker.NewBrokerNats("nats://127.0.0.1:4222")

	// create facade - broker options are passed automatically by facade
	fc, err := comby.NewFacade(
		comby.FacadeWithIsReadOnly(true),
		comby.FacadeWithBroker(brokerNats),
	)
	if err != nil {
		panic(err)
	}

	// register comby domains.
	if err := combyDomain.RegisterDefaults(context.Background(), fc); err != nil {
		panic(err)
	}

	// restore state
	if err := fc.RestoreState(); err != nil {
		panic(err)
	}

	// create new mux
	mux := http.NewServeMux()

	// create huma api
	humaApi := humago.New(mux, huma.DefaultConfig("My API", "1.0.0"))

	// add comby's default api
	if err := combyApi.RegisterDefaults(fc, humaApi); err != nil {
		panic(err)
	}

	ctx := context.Background()
	go func(ctx context.Context) {
		time.Sleep(3 * time.Second)

		// create command
		tenantUuid := comby.NewUuid()
		cmd, _ := comby.NewCommand("Tenant", &tenantCommand.TenantCommandCreate{
			TenantUuid: tenantUuid,
			Name:       fmt.Sprintf("T-%s", tenantUuid),
		})
		cmd.GetReqCtx().ExecuteWaitToFinish = true
		cmd.GetReqCtx().Attributes.Set(auth.ExecuteSkipAuthorization, true)

		// dispatch command to CommandBus or broker
		if _, err := fc.DispatchCommand(ctx, cmd); err != nil {
			panic(err)
		}

		// wait for command fully processed
		if err := fc.WaitForCmd(ctx, cmd); err != nil {
			panic(err)
		}

		// create query
		qry, _ := comby.NewQuery("Tenant", &tenantQuery.TenantQueryModel{
			TenantUuid: tenantUuid,
		})
		qry.GetReqCtx().Attributes.Set(auth.ExecuteSkipAuthorization, true)

		// dispatch query on this instance
		if res, err := fc.DispatchQuery(ctx, qry); err != nil {
			panic(err)
		} else {
			// ok
			comby.Logger.Info("success: cmd sent, broker event received, my projections updated, tenant info received!", "res", res)
		}
	}(ctx)

	fmt.Println(fc.PrintInfo())
	http.ListenAndServe("127.0.0.1:8081", mux)
}

Read/Write App

In contrast to the read-only application, the facade in the write application is not set to read-only. This allows the application to process commands and publish events. In a read/write instance no broker hooks are used because (instance can directly execute the commands).

go
// broker-app-rw/main.go
package main

import (
	"context"
	"fmt"
	"net/http"

	"github.com/danielgtaylor/huma/v2"
	"github.com/danielgtaylor/huma/v2/adapters/humago"
	broker "github.com/gradientzero/comby-broker-nats"
	"github.com/gradientzero/comby/v2"
	combyApi "github.com/gradientzero/comby/v2/api"
	combyDomain "github.com/gradientzero/comby/v2/domain"
)

// Before executing this code, ensure to run nats-server:
// nats-server -p 4222

func main() {
	// create new concrete broker
	brokerNats := broker.NewBrokerNats("nats://127.0.0.1:4222")

	// create facade - broker options are passed automatically by facade
	fc, err := comby.NewFacade(
		comby.FacadeWithBroker(brokerNats),
	)
	if err != nil {
		panic(err)
	}

	// register comby domains.
	// Note: This will automatically create new Tenant (as we are using in-memory)
	if err := combyDomain.RegisterDefaults(context.Background(), fc); err != nil {
		panic(err)
	}

	// restore state
	if err := fc.RestoreState(); err != nil {
		panic(err)
	}

	// create new mux
	mux := http.NewServeMux()

	// create huma api
	humaApi := humago.New(mux, huma.DefaultConfig("My API", "1.0.0"))

	// add comby's default api
	if err := combyApi.RegisterDefaults(fc, humaApi); err != nil {
		panic(err)
	}

	fmt.Println(fc.PrintInfo())
	http.ListenAndServe("127.0.0.1:8082", mux)
}

Note: If the read-only application is started first, followed by the write application, the events generated by the write instance from the seed will already appear and be distributed to the read-only application.