CommandStore

The CommandStore interface in comby defines the contract for persisting commands in a Command Query Responsibility Segregation (CQRS) architecture. It covers storing, retrieving, updating, and deleting commands. Developers can implement this interface to connect comby to different storage backends.

Interface

The CommandStore interface groups its methods into four areas: initialization (Init), command management (Create, Get, List, Update, Delete), metadata utilities (Total, Options, String, Info), and maintenance (Reset, Close).

```go
type CommandStore interface {
	// Init initializes the command store with the provided options.
	Init(ctx context.Context, opts ...CommandStoreOption) error

	// Create adds a new command to the store.
	Create(ctx context.Context, opts ...CommandStoreCreateOption) error

	// Get retrieves a command by its unique identifier.
	Get(ctx context.Context, opts ...CommandStoreGetOption) (Command, error)

	// List retrieves a list of commands based on the provided options.
	List(ctx context.Context, opts ...CommandStoreListOption) ([]Command, int64, error)

	// Update modifies an existing command in the store.
	Update(ctx context.Context, opts ...CommandStoreUpdateOption) error

	// Delete removes a command by its unique identifier.
	Delete(ctx context.Context, opts ...CommandStoreDeleteOption) error

	// Total returns the total number of commands in the store.
	Total(ctx context.Context) int64

	// Close closes the command store connection.
	Close(ctx context.Context) error

	// Options returns the configuration options of the command store.
	Options() CommandStoreOptions

	// String returns a string representation of the command store.
	String() string

	// Info provides detailed information about the command store.
	Info(ctx context.Context) (*CommandStoreInfoModel, error)

	// Reset clears all commands from the store.
	Reset(ctx context.Context) error
}
```

Options and Methods

Init

The Init method initializes the command store with configuration options, such as read-only mode, custom attributes, or a crypto service. Call it before any other method so the store is configured for its use case.

| Option | Description |
| --- | --- |
| CommandStoreOptionWithReadOnly(readOnly bool) | Specifies whether the store is read-only. |
| CommandStoreOptionWithAttribute(key string, value any) | Sets custom attributes for the command store. |
| CommandStoreOptionWithCryptoService(cryptoService *comby.CryptoService) | Sets the crypto service for encrypting and decrypting domain data in the command store. |
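
Every method on the interface takes its parameters as variadic option values. The sketch below shows one common way such options can be applied in Go (the functional options pattern). It is an illustration only: storeOptions, storeOption, withReadOnly, withAttribute, and applyOptions are hypothetical names, and comby's actual option types may be defined differently.

```go
package main

import (
	"errors"
	"fmt"
)

// storeOptions is a hypothetical stand-in for comby's CommandStoreOptions.
type storeOptions struct {
	ReadOnly   bool
	Attributes map[string]any
}

// storeOption is a hypothetical functional option. comby's real
// CommandStoreOption type may have a different shape.
type storeOption func(*storeOptions) error

// withReadOnly mirrors the idea behind CommandStoreOptionWithReadOnly.
func withReadOnly(readOnly bool) storeOption {
	return func(o *storeOptions) error {
		o.ReadOnly = readOnly
		return nil
	}
}

// withAttribute mirrors the idea behind CommandStoreOptionWithAttribute.
func withAttribute(key string, value any) storeOption {
	return func(o *storeOptions) error {
		if key == "" {
			return errors.New("attribute key must not be empty")
		}
		o.Attributes[key] = value
		return nil
	}
}

// applyOptions shows what an Init implementation could do with its options:
// start from defaults, then apply each option in order.
func applyOptions(opts ...storeOption) (storeOptions, error) {
	o := storeOptions{Attributes: map[string]any{}}
	for _, opt := range opts {
		if err := opt(&o); err != nil {
			return storeOptions{}, err
		}
	}
	return o, nil
}

func main() {
	o, err := applyOptions(withReadOnly(true), withAttribute("region", "eu"))
	if err != nil {
		panic(err)
	}
	fmt.Println(o.ReadOnly, o.Attributes["region"])
}
```

Options applied later override earlier ones, so callers can layer defaults and overrides in a single call.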

Create

The Create method enables the addition of new commands to the store.

| Option | Description |
| --- | --- |
| CommandStoreCreateOptionWithCommand(cmd Command) | Specifies the command to be added. |
| CommandStoreCreateOptionWithAttribute(key string, value any) | Sets custom attributes for the Create option. |

Get

Commands can be fetched individually using the Get method.

| Option | Description |
| --- | --- |
| CommandStoreGetOptionWithCommandUuid(commandUuid string) | Retrieves the specific command by its UUID. |
| CommandStoreGetOptionWithAttribute(key string, value any) | Sets custom attributes for the Get option. |

List

Commands can be fetched in bulk using List, with support for filtering and pagination.

| Option | Description |
| --- | --- |
| CommandStoreListOptionWithTenantUuid(tenantUuid string) | Filters commands by tenant UUID. |
| CommandStoreListOptionWithDomain(domain string) | Filters commands by domain. |
| CommandStoreListOptionWithSessionUuid(sessionUuid string) | Filters commands by session UUID. |
| CommandStoreListOptionWithDataType(dataType string) | Filters commands by data type (the type name of the domain command, as a string). |
| CommandStoreListOptionBefore(unixTimestamp int64) | Filters commands created before the specified Unix timestamp. |
| CommandStoreListOptionAfter(unixTimestamp int64) | Filters commands created after the specified Unix timestamp. |
| CommandStoreListOptionOffset(offset int64) | Sets the offset for pagination. |
| CommandStoreListOptionLimit(limit int64) | Sets the limit for pagination. |
| CommandStoreListOptionOrderBy(orderBy string) | Sets the field to order the commands by. |
| CommandStoreListOptionAscending(ascending bool) | Sets the sorting order (ascending or descending). |
| CommandStoreListOptionWithAttribute(key string, value any) | Sets custom attributes for the List option. |
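
The offset and limit options are typically combined in a loop that reads one page at a time. The following sketch illustrates that loop against a plain slice. The command type, listPage, and collectAll are hypothetical helpers, and the sketch assumes that the int64 returned by List is the total number of matching commands, which this page does not state explicitly.

```go
package main

import "fmt"

// command is a hypothetical, reduced stand-in for comby's Command type.
type command struct {
	UUID      string
	CreatedAt int64
}

// listPage imitates a List call that uses offset and limit.
// It returns the requested page and the total number of stored commands.
func listPage(all []command, offset, limit int64) ([]command, int64) {
	total := int64(len(all))
	if offset < 0 || offset >= total || limit <= 0 {
		return nil, total
	}
	end := offset + limit
	if end > total {
		end = total
	}
	return all[offset:end], total
}

// collectAll pages through the data until every command has been read.
func collectAll(all []command, limit int64) []command {
	if limit <= 0 {
		limit = 100 // fall back to an arbitrary page size
	}
	var out []command
	for offset := int64(0); ; offset += limit {
		page, total := listPage(all, offset, limit)
		out = append(out, page...)
		if offset+limit >= total {
			return out
		}
	}
}

func main() {
	var stored []command
	for i := int64(1); i <= 5; i++ {
		stored = append(stored, command{UUID: fmt.Sprintf("cmd-%d", i), CreatedAt: i})
	}
	page, total := listPage(stored, 2, 2)
	fmt.Println(len(page), total)
	fmt.Println(len(collectAll(stored, 2)))
}
```

With a real store, the same loop would call List with the offset and limit options and stop once the offset reaches the reported total.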

Update

The Update method allows updates to existing commands.

| Option | Description |
| --- | --- |
| CommandStoreUpdateOptionWithCommand(cmd Command) | Specifies the command to be updated. |
| CommandStoreUpdateOptionWithAttribute(key string, value any) | Sets custom attributes for the Update option. |

Delete

Commands can be removed using the Delete method.

| Option | Description |
| --- | --- |
| CommandStoreDeleteOptionWithCommandUuid(commandUuid string) | Specifies the UUID of the command to delete. |
| CommandStoreDeleteOptionWithAttribute(key string, value any) | Sets custom attributes for the Delete option. |

Total

Retrieves the total count of commands in the store.

Close

Gracefully closes the connection to the command store.

Options

Returns the configuration options of the command store.

String

Returns a string representation of the command store.

Info

Returns metadata about the store, such as the type of store, number of commands, and connection details. Details are returned in the following structure:

```go
type CommandStoreInfoModel struct {
	StoreType         string `json:"storeType"`         // Type of the command store.
	LastItemCreatedAt int64  `json:"lastItemCreatedAt"` // Timestamp of the last created command.
	NumItems          int64  `json:"numItems"`          // Number of commands in the store.
	ConnectionInfo    string `json:"connectionInfo"`    // Connection details of the command store.
}
```
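
Because the struct carries JSON tags, it can be serialized directly, for example for a status endpoint or a log line. The sketch below copies the struct from above and encodes it with the standard library. The field values and the encodeInfo helper are made up for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// CommandStoreInfoModel is copied from the structure shown above.
type CommandStoreInfoModel struct {
	StoreType         string `json:"storeType"`
	LastItemCreatedAt int64  `json:"lastItemCreatedAt"`
	NumItems          int64  `json:"numItems"`
	ConnectionInfo    string `json:"connectionInfo"`
}

// encodeInfo is a hypothetical helper that renders the model as JSON.
func encodeInfo(info *CommandStoreInfoModel) (string, error) {
	b, err := json.Marshal(info)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	// Example values only; a real store fills these in Info.
	info := &CommandStoreInfoModel{
		StoreType:         "memory",
		LastItemCreatedAt: 1700000000,
		NumItems:          3,
		ConnectionInfo:    "in-memory",
	}
	out, err := encodeInfo(info)
	if err != nil {
		panic(err)
	}
	fmt.Println(out)
}
```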

Reset

Clears all commands from the store, providing a clean slate for testing or initialization.

Store Synchronization

Comby provides built-in synchronization capabilities to copy commands between CommandStore instances. This is particularly useful for:

  • Migration scenarios: Moving commands from one storage backend to another (e.g., in-memory to PostgreSQL)
  • Audit and compliance: Creating immutable copies of command history for regulatory requirements
  • Multi-region deployment: Synchronizing commands across different geographical locations
  • Command replay and debugging: Copying command history to testing environments for issue reproduction

Synchronization Function

The SyncCommandStore function copies all commands from a source CommandStore to a destination CommandStore:

```go
func SyncCommandStore(ctx context.Context, source, destination CommandStore, opts ...CommandStoreSyncOption) error
```

Synchronization Options

| Option | Description |
| --- | --- |
| CommandStoreSyncOptionWithBatchSize(batchSize int64) | Sets the batch size for pagination (default: 100). Useful for controlling memory usage with large datasets. |
| CommandStoreSyncOptionWithTenantUuid(tenantUuid string) | Filters the commands to synchronize by tenant UUID. |
| CommandStoreSyncOptionWithProgressFunc(progressFunc func(copied, total int64)) | Sets a callback function for tracking synchronization progress. |
| CommandStoreSyncOptionWithSkipOnError(skip bool) | Continues synchronization even if individual commands fail to copy. |
| CommandStoreSyncOptionWithResumeFromDest(resume bool) | Automatically resumes from the destination's last command timestamp (default: true). |
| CommandStoreSyncOptionWithStartAfterTime(timestamp int64) | Manually sets the starting timestamp for copying (overrides ResumeFromDest). |
| CommandStoreSyncOptionWithDryRun(dryRun bool) | Previews the synchronization without making actual changes. |

Usage Example

```go
import (
	"fmt"
	"log"

	postgresStore "github.com/gradientzero/comby-store-postgres"
	sqliteStore "github.com/gradientzero/comby-store-sqlite"
)

// Create source and destination command stores
sourceStore := sqliteStore.NewCommandStoreSQLite(...)
destStore := postgresStore.NewCommandStorePostgres(...)

// Initialize both stores (CommandStoreOptionWithCryptoService can be passed here if needed)
if err := sourceStore.Init(...); err != nil {
	log.Fatalf("init source store: %v", err)
}
if err := destStore.Init(...); err != nil {
	log.Fatalf("init destination store: %v", err)
}

// Synchronize with batch processing and progress reporting
err := comby.SyncCommandStore(ctx, sourceStore, destStore,
	comby.CommandStoreSyncOptionWithBatchSize(250),
	comby.CommandStoreSyncOptionWithProgressFunc(func(copied, total int64) {
		fmt.Printf("Progress: %d/%d commands synchronized\n", copied, total)
	}),
	comby.CommandStoreSyncOptionWithResumeFromDest(true),
	comby.CommandStoreSyncOptionWithSkipOnError(false),
)
if err != nil {
	log.Fatalf("sync command stores: %v", err)
}
```

Important Notes

  • Resumable Operations: By default, synchronization automatically resumes from the last command timestamp in the destination store. This makes it safe to retry failed synchronizations without duplicating commands.
  • Batch Processing: Commands are processed in batches to efficiently handle large datasets while managing memory usage.
  • Time-Ordered: Commands are synchronized in chronological order (by created_at timestamp), preserving the temporal sequence of the command stream.
  • CQRS Pattern: Command synchronization is essential for maintaining audit trails and command history in CQRS architectures.
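
The notes above (resume from the destination's last timestamp, batch processing, chronological order) can be illustrated with a small in-memory sketch. This is not comby's implementation of SyncCommandStore. The command and memStore types and the syncStores function are hypothetical, and the sketch assumes positive, unique created-at timestamps: with a strict "after" cursor, commands that share a timestamp across a batch boundary would be skipped.

```go
package main

import (
	"fmt"
	"sort"
)

// command is a hypothetical, reduced stand-in for comby's Command type.
type command struct {
	UUID      string
	CreatedAt int64
}

// memStore is a hypothetical in-memory store used only for this sketch.
type memStore struct{ items []command }

// lastCreatedAt returns the newest timestamp in the store, or 0 if empty.
func (s *memStore) lastCreatedAt() int64 {
	var last int64
	for _, c := range s.items {
		if c.CreatedAt > last {
			last = c.CreatedAt
		}
	}
	return last
}

// listAfter returns up to limit commands created strictly after ts, oldest first.
func (s *memStore) listAfter(ts int64, limit int) []command {
	sorted := append([]command(nil), s.items...)
	sort.SliceStable(sorted, func(i, j int) bool {
		return sorted[i].CreatedAt < sorted[j].CreatedAt
	})
	var out []command
	for _, c := range sorted {
		if c.CreatedAt > ts && len(out) < limit {
			out = append(out, c)
		}
	}
	return out
}

// syncStores copies commands from src to dst in batches, resuming after the
// newest command already present in dst. It returns the number copied.
func syncStores(src, dst *memStore, batchSize int, progress func(copied, total int64)) int64 {
	cursor := dst.lastCreatedAt()
	total := int64(len(src.listAfter(cursor, len(src.items))))
	var copied int64
	for {
		batch := src.listAfter(cursor, batchSize)
		if len(batch) == 0 {
			return copied
		}
		dst.items = append(dst.items, batch...)
		copied += int64(len(batch))
		cursor = batch[len(batch)-1].CreatedAt // advance past the copied batch
		if progress != nil {
			progress(copied, total)
		}
	}
}

func main() {
	src := &memStore{items: []command{{"a", 1}, {"b", 2}, {"c", 3}, {"d", 4}, {"e", 5}}}
	dst := &memStore{items: []command{{"a", 1}, {"b", 2}}}
	n := syncStores(src, dst, 2, func(copied, total int64) {
		fmt.Printf("Progress: %d/%d\n", copied, total)
	})
	fmt.Println("copied:", n, "destination size:", len(dst.items))
}
```

Running the function a second time copies nothing, because the cursor already sits at the newest timestamp in the destination. That property is what makes retries safe in this sketch.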

Extensibility and Implementation

comby provides the following default implementations of the CommandStore interface:

Users can implement the CommandStore interface to integrate with alternative storage systems, such as:

  • NoSQL databases (e.g., MongoDB, DynamoDB).
  • Distributed log systems (e.g., Apache Kafka, Pulsar).
  • Cloud-native solutions (e.g., AWS S3, Google Cloud Firestore).

To create a custom command store, implement all methods in the interface, adhering to the specified contracts for behavior and parameters.
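
As a starting point, the sketch below shows the general shape of a custom backend: a mutex-guarded map behind a small interface. It is deliberately simplified and will not satisfy comby's CommandStore as written. The miniCommandStore interface, the command type, and memoryStore are hypothetical, and the methods take plain parameters instead of comby's option values.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// command is a hypothetical, reduced stand-in for comby's Command type.
type command struct {
	UUID   string
	Domain string
}

// miniCommandStore is a hypothetical subset of the CommandStore contract.
type miniCommandStore interface {
	Create(ctx context.Context, cmd command) error
	Get(ctx context.Context, uuid string) (command, error)
	Delete(ctx context.Context, uuid string) error
	Total(ctx context.Context) int64
	Reset(ctx context.Context) error
}

var errNotFound = errors.New("command not found")

// memoryStore keeps commands in a map guarded by a read-write mutex.
type memoryStore struct {
	mu       sync.RWMutex
	commands map[string]command
}

// Compile-time check that memoryStore satisfies the interface.
var _ miniCommandStore = (*memoryStore)(nil)

func newMemoryStore() *memoryStore {
	return &memoryStore{commands: map[string]command{}}
}

func (s *memoryStore) Create(_ context.Context, cmd command) error {
	if cmd.UUID == "" {
		return errors.New("command UUID must not be empty")
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, exists := s.commands[cmd.UUID]; exists {
		return fmt.Errorf("command %q already exists", cmd.UUID)
	}
	s.commands[cmd.UUID] = cmd
	return nil
}

func (s *memoryStore) Get(_ context.Context, uuid string) (command, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	cmd, ok := s.commands[uuid]
	if !ok {
		return command{}, errNotFound
	}
	return cmd, nil
}

func (s *memoryStore) Delete(_ context.Context, uuid string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.commands[uuid]; !ok {
		return errNotFound
	}
	delete(s.commands, uuid)
	return nil
}

func (s *memoryStore) Total(_ context.Context) int64 {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return int64(len(s.commands))
}

func (s *memoryStore) Reset(_ context.Context) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.commands = map[string]command{}
	return nil
}

// runDemo creates two commands, deletes one, and reports both totals.
func runDemo() (before, after int64, err error) {
	ctx := context.Background()
	store := newMemoryStore()
	for _, c := range []command{{"cmd-1", "Account"}, {"cmd-2", "Tenant"}} {
		if err = store.Create(ctx, c); err != nil {
			return 0, 0, err
		}
	}
	before = store.Total(ctx)
	err = store.Delete(ctx, "cmd-1")
	return before, store.Total(ctx), err
}

func main() {
	before, after, err := runDemo()
	fmt.Println(before, after, err)
}
```

A full implementation would additionally cover Init, List, Update, Close, Options, String, and Info, and would read its parameters from the option values described earlier.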