Einleitung

Dieser Blogbeitrag ist der dritte in einer Serie über die Anatomie von Command Query Responsibility Segregation (CQRS) und Event Sourcing in Java. Im ersten Blogbeitrag habe ich bereits die Anatomie von CQRS-Architekturen anhand einer konkreten Implementierung in Java erläutert. Dabei hatte ich das Pattern Event Sourcing weitestgehend ausgeklammert. Ziel war es, zu zeigen, dass CQRS auch ohne Event Sourcing auskommt. Im zweiten Beitrag widmete ich mich dem Event Sourcing, klammerte dafür aber CQRS explizit aus. Das Ziel sollte klar sein – Event Sourcing bedarf keines CQRS.
Wie beide Pattern zusammenspielen und warum man denken könnte, dass sie anatomisch zusammen gehören, ist Teil dieses dritten Teils. Analog zu den ersten beiden Beiträgen erläutere ich nachfolgend wieder Schritt für Schritt anhand einer Implementierung, wie CQRS und Event Sourcing zusammenspielen.

Events als Rückrad des Zusammenspiels

CQRS trennt Kommandos (Commands) und Abfragen (Queries) voneinander. Bei der Implementierung im ersten Beitrag habe ich diese Trennung durch Ereignisse (Events) umgesetzt. Jedes Kommando hat ein oder mehrere Ereignisse ausgelöst und über einen Event Bus veröffentlicht. Ein Ereignishandler hat diese Ereignisse verarbeitet und ein Lesemodell bzw. eine Eager Read Derivation aktualisiert.

Auch Event Sourcing greift natürlich auf das Konzept von Ereignissen zurück. Bei der Implementierung im zweiten Beitrag wurden diese in einem Ereignisstrom pro Aggregat gespeichert. Allerdings wurden sie nicht auf einen Event Bus veröffentlicht. Was liegt nun näher, als die Event Sourcing Lösung um einen Event Bus zu erweitern?

Kommandos als Auslöser von Ereignissen

Im zweiten Beitrag hatte ich folgendes Beispiel aufgezeigt, um das Event Sourcing zu verdeutlichen.

IStoreEventStreams eventStreamStore = new InMemoryEventStreamStore();
Orders orders = new EventSourcingOrderRepository(eventStreamStore);

UUID orderId = UUID.randomUUID();
Order order = new Order(orderId, "Jon Doe");
order.addItem("4260107220015", 2);
order.addItem("4260107222972", 0);
orders.save(order);

System.out.println("Stored events:");
eventStreamStore.get(orderId).getEvents().forEach(event ->
    System.out.println(" - " + event.getClass().getSimpleName())
);

Bestellungen (Order) und Positionen (Item) werden hier ohne Kommandos erzeugt. Alleine der Aufruf des Konstruktors oder der Methode addItem hat in der Implementierung der Klasse Order (ein EventSourcedAggregate) ein entsprechendes Ereignis gespeichert. Beim Aufruf der Methode save wurden diese durch das EventSourcingOrderRepository ausgelesen und in einen EventStreamStore gespeichert. Im ersten Beitrag habe ich Bestellungen jedoch folgendermaßen mit Kommandos erzeugt. Nachfolgendes Beispiel zeigt, wie das die Erzeugung und Veränderung von Aggregaten mittels Kommandos auch in Kombination mit dem Event Sourcing Ansatz kombiniert werden können.

IStoreEventStreams eventStreamStore = new InMemoryEventStreamStore();
Orders orders = new EventSourcingOrderRepository(eventStreamStore);

InMemoryCommandDispatcher dispatcher = new InMemoryCommandDispatcher();
dispatcher.registerHandler(AddOrder.class, new AddOrderHandler(orders));
dispatcher.registerHandler(AddOrderItem.class, new AddOrderItemHandler(orders));

UUID orderId = dispatcher.dispatch(new AddOrder("Jon Doe"));
dispatcher.dispatch(new AddOrderItem(orderId, "4260107220015", 2));
dispatcher.dispatch(new AddOrderItem(orderId, "4260107222972", 0));

System.out.println("Stored events:");
eventStreamStore.get(orderId).getEvents().forEach(event ->
    System.out.println(" - " + event.getClass().getSimpleName())
);

Die Konsolenausgabe beider Beispiele ist vollkommen identisch.
Was nun noch fehlt sind die Lesemodelle.

Lesemodelle als Konsumenten von Ereignissen

Um die Lesemodell zu aktualisieren, registriere ich nun einen Event Handler.

InMemoryEventBus eventBus = new InMemoryEventBus();
OrdersByCustomer ordersByCustomer = new OrdersByCustomer();
eventBus.registerConsumer(OrderCreated.class, ordersByCustomer);

IStoreEventStreams eventStreamStore = new InMemoryEventStreamStore();
Orders orders = new EventSourcingOrderRepository(eventStreamStore);

InMemoryCommandDispatcher dispatcher = new InMemoryCommandDispatcher();
dispatcher.registerHandler(AddOrder.class, new AddOrderHandler(orders));
dispatcher.registerHandler(AddOrderItem.class, new AddOrderItemHandler(orders));

UUID orderId = dispatcher.dispatch(new AddOrder("Jon Doe"));
dispatcher.dispatch(new AddOrderItem(orderId, "4260107220015", 2));
dispatcher.dispatch(new AddOrderItem(orderId, "4260107222972", 0));

System.out.println("Stored events:");
eventStreamStore.get(orderId).getEvents().forEach(event ->
    System.out.println(" - " + event.getClass().getSimpleName())
);

ordersByCustomer.print();

Leider hat diese Änderung keinen Effekt. Der Aufruf von ordersByCustomer.print() erzeugt keine Ausgabe. Woran liegt das? Die Veröffentlichung der Ereignisse auf den EventBus erfolgt schlichtweg nicht. Um dies zu ermöglichen, hatte ich im ersten Beitrag über einen Command Handler (AddOrderHandler) einen Event Bus in das Aggregat (Order) injected. Damit konnte das Aggregat die von ihm ausgelösten Ereignisse auf den Event Bus veröffentlichen. Das wäre an dieser Stelle auch möglich, ist aber gar nicht notwendig. Da beim Speichern eines Aggregats durch ein Repository sowieso alle neu erzeugten Ereignisse ausgelesen werden müssen, kann die Veröffentlichung der Ereignisse auch durch das Repository übernommen werden. Das hat vor allem den Vorteil, dass nicht jedes Aggregat diesen Mechanismus immer wieder implementieren muss.

abstract class EventSourcingRepository <A> implements IStoreAggregates<A> { 
  private final IStoreEventStreams streamStore; 
  private final ITransferEvents eventBus; 

  EventSourcingRepository(IStoreEventStreams streamStore, ITransferEvents eventBus) { 
    this.streamStore = streamStore; 
    this.eventBus = eventBus; 
  } 

  @Override 
  public final void save(A aggregate) { 
    EventStream eventStream = streamStore.get(aggregate.getId()); 
    List uncommittedEvents = aggregate.getUncommittedEvents(); 
    if (eventStream == null) { 
      eventStream = new EventStream(aggregate.getId(), uncommittedEvents); 
    } else { 
      eventStream.addAll(uncommittedEvents); 
    } 
    streamStore.save(eventStream); 
    uncommittedEvents.forEach(eventBus::send); 
    aggregate.markEventsAsCommitted(); 
  } 

    //... 
} 

class EventSourcingOrderRepository extends EventSourcingRepository implements Orders { 
  EventSourcingOrderRepository(IStoreEventStreams streamStore, ITransferEvents eventBus) { 
    super(streamStore, eventBus); 
  } 
} 

Mit dieser minimalen Anpassung und der Injection des Event Bus in das Repsitory, funktioniert das Zusammenspiel aller Komponenten wieder.

Orders orders = new EventSourcingOrderRepository(eventStreamStore, eventBus);

Um dies zu validieren nachfolgend die Konsolenausgabe dieser kombinierten Lösung, die der aus dem CQRS-Beispiel im ersten Beitrag entspricht.

Stored events:
- OrderCreated
- OrderItemAdded
- OrderItemAdded
Customer: Jon Doe
- eeedee8d-e61d-452e-9d2b-e253b3a7dc60

Damit sollte auch klar sein, warum für viele CQRS und Event Sourcing anatomisch zusammen gehören. Beide Ansätze zu kombinieren ist auf Basis einer ereignisbasierten Architektur trivial. Wie ich aber auch dargelegt habe, gibt es keine Notwendigkeit beide Ansätze zu kombinieren. Für viele Anwendungsfälle ist es vollkommen ausreichend allein CQRS oder Event Sourcing einzusetzen. Wann und wie es sich lohnt das eine oder andere Pattern einzusetzen erfahren Sie übrigens auch in unseren DDD Workshops mit Architekturschwerpunkt.