EventStore for Event Sourcing
Kacper Walczak · 20-08-2024
Dive into Event Sourcing internals, learn how to build Event Store.
Introduction
An Event Store is required for the Event Sourcing approach. It stores each event together with an aggregateID, which allows us to fetch all events of one aggregate in a single batch.
The aggregateID can be the ID of an Aggregate or just an Entity ID; either way, it is the key used to retrieve the history of that aggregate or entity.
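To make "retrieving the history" concrete, here is a minimal sketch of rebuilding an aggregate's current state by replaying its events. The bank-account domain and the names `AccountEvent`, `AccountState`, `applyEvent` and `rehydrate` are assumptions made up for illustration; they are not part of the store built in this article.

```typescript
// Hypothetical event shapes for illustration only.
type AccountEvent =
  | { aggregateID: string; type: 'Deposited'; data: { amount: number }; timestamp: Date }
  | { aggregateID: string; type: 'Withdrawn'; data: { amount: number }; timestamp: Date };

interface AccountState {
  balance: number;
}

// Apply a single event to the current state and return the next state.
function applyEvent(state: AccountState, event: AccountEvent): AccountState {
  switch (event.type) {
    case 'Deposited':
      return { balance: state.balance + event.data.amount };
    case 'Withdrawn':
      return { balance: state.balance - event.data.amount };
  }
}

// Rebuild one aggregate's state by replaying its events in timestamp order.
function rehydrate(aggregateID: string, allEvents: AccountEvent[]): AccountState {
  return allEvents
    .filter((e) => e.aggregateID === aggregateID)
    .sort((a, b) => a.timestamp.getTime() - b.timestamp.getTime())
    .reduce(applyEvent, { balance: 0 });
}

const history: AccountEvent[] = [
  { aggregateID: 'account-1', type: 'Deposited', data: { amount: 100 }, timestamp: new Date('2024-01-01T10:00:00Z') },
  { aggregateID: 'account-2', type: 'Deposited', data: { amount: 5 }, timestamp: new Date('2024-01-01T10:30:00Z') },
  { aggregateID: 'account-1', type: 'Withdrawn', data: { amount: 30 }, timestamp: new Date('2024-01-01T11:00:00Z') },
];

const account1 = rehydrate('account-1', history);
console.log(account1); // { balance: 70 }
```

The state is never stored directly here; it is always derived from the events, which is why fetching all events for one aggregateID in a single batch matters.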
Store elements
A store is a simple concept: it is just an abstraction over a single collection or table of Events that is mostly append-only.
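As a rough illustration of "one append-only collection", the sketch below keeps events in a plain in-memory array. The names `StoredEvent` and `InMemoryEventLog` are hypothetical and exist only for this example; a real store would persist the events in a database.

```typescript
// Hypothetical names used only for this sketch.
interface StoredEvent {
  aggregateID: string;
  type: string;
  data: Record<string, unknown>;
  timestamp: Date;
}

class InMemoryEventLog {
  // The single "collection": events are only ever pushed, never updated or removed.
  private readonly events: StoredEvent[] = [];

  append(event: StoredEvent): void {
    // Freeze a shallow copy so the stored record's top-level fields cannot be reassigned later.
    this.events.push(Object.freeze({ ...event }));
  }

  // Return one aggregate's events, oldest first.
  readByAggregate(aggregateID: string): StoredEvent[] {
    return this.events
      .filter((e) => e.aggregateID === aggregateID)
      .sort((a, b) => a.timestamp.getTime() - b.timestamp.getTime());
  }
}

const log = new InMemoryEventLog();
log.append({ aggregateID: 'order-1', type: 'OrderPlaced', data: { total: 42 }, timestamp: new Date('2024-01-01T09:00:00Z') });
log.append({ aggregateID: 'order-2', type: 'OrderPlaced', data: { total: 7 }, timestamp: new Date('2024-01-01T09:05:00Z') });
log.append({ aggregateID: 'order-1', type: 'OrderShipped', data: {}, timestamp: new Date('2024-01-02T09:00:00Z') });

const orderHistory = log.readByAggregate('order-1').map((e) => e.type);
console.log(orderHistory); // [ 'OrderPlaced', 'OrderShipped' ]
```

The class exposes no update or delete method on purpose: the only write operation is `append`, which is what "append-only" means in practice.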
Interface Event
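interface IEvent<T extends Record<string, unknown>> {
  aggregateID: string; // ID of the aggregate or entity this event belongs to
  type: string;        // event name, e.g. 'UserCreated'
  data: T;             // event payload
  timestamp: Date;     // when the event occurred; used to order the history
}
Interface EventStore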
// Callback invoked for every dispatched event of a subscribed type.
interface EventHandler<T extends Record<string, unknown>> {
  (event: IEvent<T>): void;
}
interface IEventStore<T extends Record<string, unknown>> {
  // Persist the event and notify subscribers.
  dispatch(event: IEvent<T>): Promise<void>;
  // Load the full history of one aggregate.
  getEventsForAggregate(aggregateID: string): Promise<IEvent<T>[]>;
  subscribe(eventType: string, handler: EventHandler<T>): void;
  unsubscribe(eventType: string, handler: EventHandler<T>): void;
}
MongoDB Timeseries EventStore
import { MongoClient, Collection, Db } from 'mongodb';

class MongoTimeseriesEventStore<T extends Record<string, unknown>> implements IEventStore<T> {
  private client: MongoClient;
  private db: Db;
  private collectionName: string;
  private collection: Collection<IEvent<T>>;
  // Handlers grouped by event type.
  private eventHandlers: Map<string, Set<EventHandler<T>>>;

  constructor(mongoUri: string, dbName: string, collectionName: string) {
    this.client = new MongoClient(mongoUri);
    this.db = this.client.db(dbName);
    this.collectionName = collectionName;
    this.collection = this.db.collection<IEvent<T>>(collectionName);
    this.eventHandlers = new Map();
  }

  public async connect(): Promise<void> {
    await this.client.connect();
    // A time series collection has to be created explicitly, so create it on first run.
    // Assumption: `timestamp` is the time field and `aggregateID` the meta field.
    const existing = await this.db.listCollections({ name: this.collectionName }).toArray();
    if (existing.length === 0) {
      await this.db.createCollection(this.collectionName, {
        timeseries: { timeField: 'timestamp', metaField: 'aggregateID' },
      });
    }
  }

  public async dispatch(event: IEvent<T>): Promise<void> {
    // Persist first; subscribers are notified only after the insert succeeds.
    await this.collection.insertOne(event);
    const handlers = this.eventHandlers.get(event.type);
    if (handlers) {
      handlers.forEach(handler => handler(event));
    }
  }

  public async getEventsForAggregate(aggregateID: string): Promise<IEvent<T>[]> {
    // Oldest first, so the events can be replayed in order.
    return this.collection
      .find({ aggregateID })
      .sort({ timestamp: 1 })
      .toArray();
  }

  public subscribe(eventType: string, handler: EventHandler<T>): void {
    if (!this.eventHandlers.has(eventType)) {
      this.eventHandlers.set(eventType, new Set());
    }
    this.eventHandlers.get(eventType)?.add(handler);
  }

  public unsubscribe(eventType: string, handler: EventHandler<T>): void {
    this.eventHandlers.get(eventType)?.delete(handler);
  }
}
Usage
// A type alias (not an interface) so the payload satisfies Record<string, unknown>.
type UserCreatedEvent = {
  userId: string;
  userName: string;
};
const eventStore = new MongoTimeseriesEventStore<UserCreatedEvent>(
  'mongodb://localhost:27017',
  'myDatabase',
  'eventsCollection'
);
async function main() {
  await eventStore.connect();
  // Subscribe before dispatching: handlers only receive events dispatched after they subscribe.
  eventStore.subscribe('UserCreated', (event) => {
    console.log('UserCreated event received:', event);
  });
  const event: IEvent<UserCreatedEvent> = {
    aggregateID: 'user-123',
    type: 'UserCreated',
    data: {
      userId: 'user-123',
      userName: 'Alice Bob'
    },
    timestamp: new Date()
  };
  await eventStore.dispatch(event);
  const events = await eventStore.getEventsForAggregate('user-123');
  console.log(events);
}
main().catch(console.error);
What should I use in production?
This is just the simplest approach, meant to show what an EventStore is. If you need something production-ready, go for the database at eventstore.com. For a .NET app, you can use Marten DB.
Next
In this article we have learned how and when to use an Event Store.
Check Web Architectures to learn more about different architectures.