CommandStore
The CommandStore interface in comby defines the contract for managing commands within a Command Query Responsibility Segregation (CQRS) architecture. It covers storing, retrieving, modifying, and deleting commands. Developers can implement this interface to integrate with different storage backends, so the same application code can run in different environments.
Interface
The CommandStore interface provides methods for initialization, command management (creation, retrieval, modification, and deletion), metadata utilities (e.g., totals, store info), and maintenance (e.g., resetting and closing connections).
type CommandStore interface {
	// Init initializes the command store with the provided options.
	Init(ctx context.Context, opts ...CommandStoreOption) error

	// Create adds a new command to the store.
	Create(ctx context.Context, opts ...CommandStoreCreateOption) error

	// Get retrieves a command by its unique identifier.
	Get(ctx context.Context, opts ...CommandStoreGetOption) (Command, error)

	// List retrieves a list of commands based on the provided options.
	List(ctx context.Context, opts ...CommandStoreListOption) ([]Command, int64, error)

	// Update modifies an existing command in the store.
	Update(ctx context.Context, opts ...CommandStoreUpdateOption) error

	// Delete removes a command by its unique identifier.
	Delete(ctx context.Context, opts ...CommandStoreDeleteOption) error

	// Total returns the total number of commands in the store.
	Total(ctx context.Context) int64

	// Close closes the command store connection.
	Close(ctx context.Context) error

	// Options returns the configuration options of the command store.
	Options() CommandStoreOptions

	// String returns a string representation of the command store.
	String() string

	// Info provides detailed information about the command store.
	Info(ctx context.Context) (*CommandStoreInfoModel, error)

	// Reset clears all commands from the store.
	Reset(ctx context.Context) error
}
Options and Methods
Init
The Init method initializes the command store with configuration options, such as read-only mode, custom attributes, or a crypto service. This step configures the store for the specific use case.
Options | Description |
---|---|
CommandStoreOptionWithReadOnly(readOnly bool) | Specifies whether the store is read-only. |
CommandStoreOptionWithAttribute(key string, value any) | Sets custom attributes for the command store. |
CommandStoreOptionWithCryptoService(cryptoService *comby.CryptoService) | Sets the crypto service for encrypting and decrypting domain data in the command store. |
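The variadic `opts ...CommandStoreOption` parameters suggest Go's functional options pattern. The following is a minimal sketch of how such options could be applied, not comby's actual implementation: `storeOptions`, `storeOption`, `withReadOnly`, `withAttribute`, and `applyOptions` are hypothetical stand-ins defined locally so the example compiles on its own.

```go
package main

import (
	"errors"
	"fmt"
)

// storeOptions is a hypothetical stand-in for comby's CommandStoreOptions.
type storeOptions struct {
	ReadOnly   bool
	Attributes map[string]any
}

// storeOption mutates storeOptions. The real CommandStoreOption type may
// be shaped differently; this signature is an assumption.
type storeOption func(*storeOptions) error

// withReadOnly marks the store as read-only.
func withReadOnly(readOnly bool) storeOption {
	return func(o *storeOptions) error {
		o.ReadOnly = readOnly
		return nil
	}
}

// withAttribute attaches one custom key/value pair.
func withAttribute(key string, value any) storeOption {
	return func(o *storeOptions) error {
		if key == "" {
			return errors.New("attribute key must not be empty")
		}
		o.Attributes[key] = value
		return nil
	}
}

// applyOptions starts from defaults and applies each option in order,
// stopping at the first option that reports an error.
func applyOptions(opts ...storeOption) (storeOptions, error) {
	o := storeOptions{Attributes: map[string]any{}}
	for _, opt := range opts {
		if err := opt(&o); err != nil {
			return o, err
		}
	}
	return o, nil
}

func main() {
	o, err := applyOptions(withReadOnly(true), withAttribute("region", "eu"))
	if err != nil {
		fmt.Println("init failed:", err)
		return
	}
	fmt.Println(o.ReadOnly, o.Attributes["region"])
}
```

With this shape, callers pass only the options they care about and the store keeps sensible defaults for the rest.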
Create
The Create method enables the addition of new commands to the store.
Options | Description |
---|---|
CommandStoreCreateOptionWithCommand(cmd Command) | Specifies the command to be added. |
CommandStoreCreateOptionWithAttribute(key string, value any) | Sets a custom attribute for the Create operation. |
Get
Commands can be fetched individually using the Get method.
Options | Description |
---|---|
CommandStoreGetOptionWithCommandUuid(commandUuid string) | Retrieves the specific command by its UUID. |
CommandStoreGetOptionWithAttribute(key string, value any) | Sets a custom attribute for the Get operation. |
List
Commands can be fetched in bulk using List, with support for filtering and pagination.
Options | Description |
---|---|
CommandStoreListOptionWithTenantUuid(tenantUuid string) | Filters commands by tenant UUID. |
CommandStoreListOptionWithDomain(domain string) | Filters commands by domain. |
CommandStoreListOptionWithSessionUuid(sessionUuid string) | Filters commands by session UUID. |
CommandStoreListOptionWithDataType(dataType string) | Filters commands by data type (the type name of the domain command, as a string). |
CommandStoreListOptionBefore(unixTimestamp int64) | Filters commands created before the specified timestamp. |
CommandStoreListOptionAfter(unixTimestamp int64) | Filters commands created after the specified timestamp. |
CommandStoreListOptionOffset(offset int64) | Sets the offset for pagination. |
CommandStoreListOptionLimit(limit int64) | Sets the limit for pagination. |
CommandStoreListOptionOrderBy(orderBy string) | Sets the field to order the commands by. |
CommandStoreListOptionAscending(ascending bool) | Sets the sorting order (ascending or descending). |
CommandStoreListOptionWithAttribute(key string, value any) | Sets a custom attribute for the List operation. |
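To illustrate how a tenant filter combines with offset and limit, here is a self-contained sketch that pages over an in-memory slice. It does not call comby: `command`, `listPage`, and `collectAll` are hypothetical names, and the sketch assumes that the `int64` returned by List is the total number of matching commands, which the interface above does not state explicitly.

```go
package main

import "fmt"

// command is a hypothetical, reduced stand-in for comby's Command.
type command struct {
	Uuid       string
	TenantUuid string
}

// listPage filters by tenant, then applies offset and limit. It returns
// the page plus the total number of matches (an assumed meaning of the
// int64 that List returns).
func listPage(all []command, tenantUuid string, offset, limit int64) ([]command, int64) {
	var matches []command
	for _, c := range all {
		if tenantUuid == "" || c.TenantUuid == tenantUuid {
			matches = append(matches, c)
		}
	}
	total := int64(len(matches))
	if offset < 0 {
		offset = 0
	}
	if offset >= total || limit <= 0 {
		return nil, total
	}
	end := offset + limit
	if end > total {
		end = total
	}
	return matches[offset:end], total
}

// collectAll pages through the data until every match has been read.
func collectAll(all []command, tenantUuid string, limit int64) []command {
	if limit <= 0 {
		limit = 100 // arbitrary fallback for the sketch
	}
	var out []command
	for offset := int64(0); ; offset += limit {
		page, total := listPage(all, tenantUuid, offset, limit)
		out = append(out, page...)
		if offset+limit >= total {
			return out
		}
	}
}

func main() {
	data := []command{
		{"c1", "t1"}, {"c2", "t2"}, {"c3", "t1"}, {"c4", "t1"}, {"c5", "t2"},
	}
	page, total := listPage(data, "t1", 0, 2)
	fmt.Println(len(page), total)
	fmt.Println(len(collectAll(data, "t1", 2)))
}
```

Returning the total alongside each page lets a caller decide when to stop without issuing a separate count query.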
Update
The Update method allows updates to existing commands.
Options | Description |
---|---|
CommandStoreUpdateOptionWithCommand(cmd Command) | Specifies the command to be updated. |
CommandStoreUpdateOptionWithAttribute(key string, value any) | Sets a custom attribute for the Update operation. |
Delete
Commands can be removed using the Delete method.
Options | Description |
---|---|
CommandStoreDeleteOptionWithCommandUuid(commandUuid string) | Specifies the UUID of the command to delete. |
CommandStoreDeleteOptionWithAttribute(key string, value any) | Sets a custom attribute for the Delete operation. |
Total
Retrieves the total count of commands in the store.
Close
Gracefully closes the connection to the command store.
Options
Returns the configuration options of the command store.
String
Returns a string representation of the command store.
Info
Returns metadata about the store, such as the type of store, number of commands, and connection details. Details are returned in the following structure:
type CommandStoreInfoModel struct {
	StoreType         string `json:"storeType"`         // Type of the command store.
	LastItemCreatedAt int64  `json:"lastItemCreatedAt"` // Timestamp of the last created command.
	NumItems          int64  `json:"numItems"`          // Number of commands in the store.
	ConnectionInfo    string `json:"connectionInfo"`    // Connection details of the command store.
}
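The JSON tags on the struct determine how the info model looks when serialized, for example in an admin endpoint or a log line. The sketch below repeats the struct locally so it runs without comby; `infoJSON` is a hypothetical helper and the field values are made-up sample data.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// CommandStoreInfoModel repeats the struct shown above so the sketch is
// self-contained.
type CommandStoreInfoModel struct {
	StoreType         string `json:"storeType"`
	LastItemCreatedAt int64  `json:"lastItemCreatedAt"`
	NumItems          int64  `json:"numItems"`
	ConnectionInfo    string `json:"connectionInfo"`
}

// infoJSON serializes the model using its JSON tags.
func infoJSON(info *CommandStoreInfoModel) (string, error) {
	b, err := json.Marshal(info)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	// Sample values for illustration only.
	info := &CommandStoreInfoModel{
		StoreType:         "memory",
		LastItemCreatedAt: 1700000000,
		NumItems:          3,
		ConnectionInfo:    "in-process",
	}
	out, err := infoJSON(info)
	if err != nil {
		fmt.Println("marshal failed:", err)
		return
	}
	fmt.Println(out)
}
```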
Reset
Clears all commands from the store, providing a clean slate for testing or initialization.
Store Synchronization
Comby provides built-in synchronization capabilities to copy commands between CommandStore instances. This is particularly useful for:
- Migration scenarios: Moving commands from one storage backend to another (e.g., in-memory to PostgreSQL)
- Audit and compliance: Creating immutable copies of command history for regulatory requirements
- Multi-region deployment: Synchronizing commands across different geographical locations
- Command replay and debugging: Copying command history to testing environments for issue reproduction
Synchronization Function
The SyncCommandStore function copies all commands from a source CommandStore to a destination CommandStore:
func SyncCommandStore(ctx context.Context, source, destination CommandStore, opts ...CommandStoreSyncOption) error
Synchronization Options
Option | Description |
---|---|
CommandStoreSyncOptionWithBatchSize(batchSize int64) | Sets the batch size for pagination (default: 100). Useful for controlling memory usage with large datasets. |
CommandStoreSyncOptionWithTenantUuid(tenantUuid string) | Filter commands to synchronize by tenant UUID. |
CommandStoreSyncOptionWithProgressFunc(progressFunc func(copied, total int64)) | Callback function for tracking synchronization progress. |
CommandStoreSyncOptionWithSkipOnError(skip bool) | Continue synchronization even if individual commands fail to copy. |
CommandStoreSyncOptionWithResumeFromDest(resume bool) | Automatically resume from the destination's last command timestamp (default: true). |
CommandStoreSyncOptionWithStartAfterTime(timestamp int64) | Manually set starting timestamp for copying (overrides ResumeFromDest). |
CommandStoreSyncOptionWithDryRun(dryRun bool) | Preview synchronization without making actual changes. |
Usage Example
import (
	postgresStore "github.com/gradientzero/comby-store-postgres"
	sqliteStore "github.com/gradientzero/comby-store-sqlite"
)

// Create source and destination command stores
sourceStore := sqliteStore.NewCommandStoreSQLite(...)
destStore := postgresStore.NewCommandStorePostgres(...)

// Initialize both stores (CommandStoreOptionWithCryptoService can be passed here if needed)
sourceStore.Init(...)
destStore.Init(...)

// Synchronize with batch processing and progress reporting
err := comby.SyncCommandStore(ctx, sourceStore, destStore,
	comby.CommandStoreSyncOptionWithBatchSize(250),
	comby.CommandStoreSyncOptionWithProgressFunc(func(copied, total int64) {
		fmt.Printf("Progress: %d/%d commands synchronized\n", copied, total)
	}),
	comby.CommandStoreSyncOptionWithResumeFromDest(true),
	comby.CommandStoreSyncOptionWithSkipOnError(false),
)
Important Notes
- Resumable Operations: By default, synchronization automatically resumes from the last command timestamp in the destination store. This makes it safe to retry failed synchronizations without duplicating commands.
- Batch Processing: Commands are processed in batches to efficiently handle large datasets while managing memory usage.
- Time-Ordered: Commands are synchronized in chronological order (by created_at timestamp), preserving the temporal sequence of the command stream.
- CQRS Pattern: Command synchronization is essential for maintaining audit trails and command history in CQRS architectures.
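The notes above (resume from the destination's last timestamp, batch processing, chronological order) can be sketched as a simple loop. This is an illustration under stated assumptions, not the code behind SyncCommandStore: `command`, `memStore`, `lastCreatedAt`, `listAfter`, and `syncSketch` are hypothetical names, and the stores are plain in-memory slices.

```go
package main

import (
	"fmt"
	"sort"
)

// command is a hypothetical, reduced stand-in for comby's Command.
type command struct {
	Uuid      string
	CreatedAt int64
}

// memStore is a hypothetical in-memory stand-in for a CommandStore.
type memStore struct {
	commands []command
}

// lastCreatedAt returns the newest timestamp in the store, or 0 if empty.
func (s *memStore) lastCreatedAt() int64 {
	var last int64
	for _, c := range s.commands {
		if c.CreatedAt > last {
			last = c.CreatedAt
		}
	}
	return last
}

// listAfter returns up to limit commands created after the given
// timestamp, oldest first. A limit of 0 or less means no limit.
func (s *memStore) listAfter(after int64, limit int) []command {
	var out []command
	for _, c := range s.commands {
		if c.CreatedAt > after {
			out = append(out, c)
		}
	}
	sort.Slice(out, func(i, j int) bool { return out[i].CreatedAt < out[j].CreatedAt })
	if limit > 0 && len(out) > limit {
		out = out[:limit]
	}
	return out
}

// syncSketch resumes from the destination's newest timestamp and copies
// newer commands batch by batch. It returns the number of commands copied.
func syncSketch(src, dst *memStore, batchSize int, progress func(copied, total int64)) int64 {
	if batchSize <= 0 {
		batchSize = 100 // mirrors the documented default batch size
	}
	cursor := dst.lastCreatedAt()
	total := int64(len(src.listAfter(cursor, 0)))
	var copied int64
	for {
		batch := src.listAfter(cursor, batchSize)
		if len(batch) == 0 {
			return copied
		}
		dst.commands = append(dst.commands, batch...)
		copied += int64(len(batch))
		cursor = batch[len(batch)-1].CreatedAt
		if progress != nil {
			progress(copied, total)
		}
	}
}

func main() {
	src := &memStore{commands: []command{{"a", 1}, {"b", 2}, {"c", 3}, {"d", 4}, {"e", 5}}}
	dst := &memStore{commands: []command{{"a", 1}, {"b", 2}}}
	n := syncSketch(src, dst, 2, func(copied, total int64) {
		fmt.Printf("Progress: %d/%d\n", copied, total)
	})
	fmt.Println("copied:", n)
}
```

A cursor based only on a timestamp would skip commands that share the exact timestamp of the last copied command when they fall on a batch boundary; the sketch ignores that case, and a real implementation would need a tie-breaker such as the command UUID.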
Extensibility and Implementation
comby provides the following default implementations of the CommandStore interface:
- In-Memory Store: Useful for testing and development, offering fast and transient command storage. Used by default if no specific store is configured.
- SQLite Store: A lightweight, file-based implementation suitable for small to medium-sized applications. It uses the modernc.org/sqlite driver, so comby remains CGo-free. Link: https://github.com/gradientzero/comby-store-sqlite
- PostgreSQL Store: A robust, production-ready implementation leveraging PostgreSQL as the backend. It uses the lib/pq driver. Link: https://github.com/gradientzero/comby-store-postgres
Users can implement the CommandStore interface to integrate with alternative storage systems, such as:
- NoSQL databases (e.g., MongoDB, DynamoDB).
- Distributed log systems (e.g., Apache Kafka, Pulsar).
- Cloud-native solutions (e.g., AWS S3, Google Cloud Firestore).
To create a custom command store, implement all methods in the interface, adhering to the specified contracts for behavior and parameters.
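As a starting point, the sketch below shows the general shape of a map-backed store guarded by a mutex. It deliberately implements only a reduced, hypothetical interface (`miniStore`) with plain parameters instead of comby's option functions, so it is not a drop-in CommandStore; `command`, `memoryStore`, `errNotFound`, and `runDemo` are likewise illustrative names, and the error behavior for duplicates and missing commands is an assumption.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// command is a hypothetical, reduced stand-in for comby's Command.
type command struct {
	Uuid   string
	Domain string
}

// miniStore is a reduced, hypothetical subset of the CommandStore contract.
type miniStore interface {
	Create(ctx context.Context, cmd command) error
	Get(ctx context.Context, uuid string) (command, error)
	Delete(ctx context.Context, uuid string) error
	Total(ctx context.Context) int64
	Reset(ctx context.Context) error
}

var errNotFound = errors.New("command not found")

// memoryStore keeps commands in a map protected by a read/write mutex.
type memoryStore struct {
	mu       sync.RWMutex
	commands map[string]command
}

// Compile-time check that memoryStore satisfies miniStore.
var _ miniStore = (*memoryStore)(nil)

func newMemoryStore() *memoryStore {
	return &memoryStore{commands: map[string]command{}}
}

func (s *memoryStore) Create(_ context.Context, cmd command) error {
	if cmd.Uuid == "" {
		return errors.New("command uuid must not be empty")
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, exists := s.commands[cmd.Uuid]; exists {
		return fmt.Errorf("command %s already exists", cmd.Uuid)
	}
	s.commands[cmd.Uuid] = cmd
	return nil
}

func (s *memoryStore) Get(_ context.Context, uuid string) (command, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	cmd, ok := s.commands[uuid]
	if !ok {
		return command{}, errNotFound
	}
	return cmd, nil
}

func (s *memoryStore) Delete(_ context.Context, uuid string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.commands[uuid]; !ok {
		return errNotFound
	}
	delete(s.commands, uuid)
	return nil
}

func (s *memoryStore) Total(_ context.Context) int64 {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return int64(len(s.commands))
}

func (s *memoryStore) Reset(_ context.Context) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.commands = map[string]command{}
	return nil
}

// runDemo creates two commands, deletes one, and returns the remaining total.
func runDemo() (int64, error) {
	ctx := context.Background()
	store := newMemoryStore()
	for _, cmd := range []command{{"c1", "order"}, {"c2", "order"}} {
		if err := store.Create(ctx, cmd); err != nil {
			return 0, err
		}
	}
	if err := store.Delete(ctx, "c1"); err != nil {
		return 0, err
	}
	return store.Total(ctx), nil
}

func main() {
	total, err := runDemo()
	fmt.Println(total, err)
}
```

A full implementation would additionally cover Init, List, Update, Close, Options, String, and Info, and would read its inputs from the option functions listed in the tables above.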