In den letzten Jahren habe ich in Projekten, die sich Microservice-Architekturen zu nutze machen, und der häufig damit verbundenen Nutzung von Domain Driven Design (DDD), immer wieder bestimmte Architekturstile bzw Patterns in der Praxis beobachten und einsetzen können. Zwei dieser Patterns sind Command Query Responsibility Segregation (CQRS) und Event Sourcing. Erstaunlicherweise sieht man beide Patterns oftmals einhergehend, so dass es fast schon den Anschein erweckt, als wären beide Patterns anatomisch untrennbar. In einer kurzen Serie von Blogbeiträgen werde ich anhand einfacher Beispiele zeigen, dass die einzelnen Patterns sehr wohl ohne einander auskommen. Allerdings werde ich auch aufzeigen, wie sie zusammen spielen können. Aufgrund meiner festen Überzeugung, dass man Patterns am besten lernt, wenn man sie zunächst einmal selbst implementiert hat, erläutere ich anhand konkreter Code-Beispiele Schritt für Schritt, wie die grundlegende Anatomie von CQRS- und Event Sourcing-Anwendungen in Java aussieht.

An dieser Stelle sei vorweg noch die Anmerkung gestattet, dass diejenigen, die weniger am Erlernen der Patterns, sondern vielmehr am Einsatz der Patterns in der Praxis interessiert sind, zunächst die gängigen Frameworks wie z.B. Axon oder Lagom, in Betracht ziehen sollten.

 

Command Query Responsibility Segregation (CQRS)

In diesem ersten Blogbeitrag widme ich mich zunächst ausschließlich der Implementierung von CQRS ohne auf Event Sourcing einzugehen. Ich erkläre, dass und wie CQRS vollkommen ohne Event Sourcing implementierbar ist. Der zweite Beitrag dreht den Spieß gewissermaßen um, bevor ich im dritten Beitrag aufzeige, wie beide Patterns gut zusammenspielen können.

Aber was ist dieses CQRS eigentlich? Es handelt sich dabei um ein Architektur-Pattern bzw. einen Architekturstil. CQRS basiert auf der Idee, dass für die Aktualisierung bzw. das Schreiben von Daten ein anderes Modell verwendet werden kann, als für das Lesen von Daten. Für Details verweise ich an dieser Stelle auf den guten Artikel von Martin Fowler zu CQRS. Dort findet man die notwendigen theoretischen Details, die ich hier nicht alle wiederholen möchte. Ich starte hier lieber direkt mit der Implementierung.

 

Commands in CQRS

Für die Implementierung von CQRS widme ich zunächst dem C in CQRS, also den Kommandos (Commands). Um das Konzept eines Kommandos zu implementieren bediene ich mich einfach einem einfachen Interface.

interface IAmACommand<R> {
}

Da Kommandos in der Regel ein Ergebnis erzeugen, besitzt das Interface einen generischen Typparameter R, der den Typ des Ergebnisses spezifiziert.

Implementierungen dieses Interfaces stellen somit später die konkreten Kommandos dar. Um diese ausführen zu können, bedarf es der Verarbeitung der in einem Kommando enthaltenen Informationen. Für die Verarbeitung sind s.g. Command Handler zuständig. Auch diese definiere ich einfach durch ein Interface:

@FunctionalInterface
interface IHandleCommands <C extends IAmACommand<R>, R> {
  R handle(C cmd);
}

Das Interface deklariert eine Methode mit dem Namen handle, die ein spezielles Kommando cmd vom Typ C verarbeiten kann. Dabei ist der Typ zunächst unspezifiziert und muss nur eine Implementierung des Interfaces IAmACommand sein. Der Rückgabetyp der Methode entspricht dem Ergebnistyps eines Kommandos.

Damit ein Command Handler ein Kommando ausführen kann, wird ein Command Dispatcher benötigt, dessen Aufgabe darin besteht, den richtigen Command Handler für ein spezifisches Kommando zu finden und die Ausführung des Kommandos an diesen zu delegieren.

@FunctionalInterface
interface IDispatchCommands {
  <C extends IAmACommand<R>, R> R dispatch(C cmd);
}

Auch in diesem Fall entspricht der Rückgabetyp der dispatch-Methode dem Ergebnistyp eines Kommandos.

Eine einfach Implementierung kann zum Beispiel wie folgt aussehen:

class InMemoryCommandDispatcher implements IDispatchCommands {

 private final Map<Class<? extends IAmACommand>, IHandleCommands<? extends IAmACommand, ?>> handlers = new HashMap<>();

 @Override
 @SuppressWarnings("unchecked")
 public <C extends IAmACommand<R>, R> R dispatch(C cmd) {
   IHandleCommands<? extends IAmACommand, ?> handler = handlers.get(cmd.getClass());
   return ((IHandleCommands<C, R>) handler).handle(cmd);
 }

 <C extends IAmACommand<R>, R> void registerHandler(Class commandType, IHandleCommands<C, R> handler) {
   handlers.put(commandType, handler);
 }

}

Dieser Ansatz erlaubt es, über die Methode registerHandler für einzelne Kommandos entsprechende Command Handler zu registrieren. Sie werden in diesem einfachen Beispiel einfach in einer Map im Speicher vorgehalten.

Damit ist das C von CQRS auf einfache Art bereits umgesetzt.

 

Eine (sehr) einfache Beispiel-Domäne

Zur Erläuterung der Funktionsweise, nehme ich ein fiktives Beispiel an, in dem Bestellungen angelegt und Bestellpositionen zu Bestellungen hinzugefügt werden können. Das wäre einfach wie folgt zu implementieren.

Zunächst einmal benötige ich zwei Klassen für eine Bestellung und Bestellposition.

class Order {

  private final UUID id;
  private final String customer;
  private final Set orderItems = new HashSet<>();

  Order(UUID id, String customer) {
    this.id = id;
    this.customer = customer;
  }

  void addItem(String itemType, int amount) {
    orderItems.add(new OrderItem(itemType, amount));
  }

  UUID getId() {
    return id;
  }

}

class OrderItem {

  private final String ean;
  private final int amount;

  OrderItem(String ean, int amount) {
    this.ean = ean;
    this.amount = amount;
  }

}

Darüber hinaus bedarf es zweier Kommandos, die einfache POJOs sind und das Kommando-Interface implementieren.

class AddOrder implements IAmACommand<UUID> {

  private final String customer;

  AddOrder(String customer) {
    this.customer = customer;
  }

  String getCustomer() {
    return customer;
  }
}

class AddOrderItem implements IAmACommand<Void> {

  private final UUID orderId;
  private final String ean;
  private final int amount;

  AddOrderItem(UUID orderId, String ean, int amount) {
    this.orderId = orderId;
    this.ean = ean;
    this.amount = amount;
  }

  UUID getOrderId() {
    return orderId;
  }

  String getEAN() {
    return ean;
  }

  int getAmount() {
    return amount;
  }

}

Die Beispiele verdeutlichen, wofür der Ergebnistyp eines Kommandos genutzt werden kann. Das Kommando AddOrder hat den Ergebnistyp UUID. Dieser dient dazu, die ID der neu angelegten Bestellung an den Aufrufer zu übermitteln. Der Aufrufer referenziert damit die Bestellung und kann anhand dieser weitere Änderungen an der Bestellung (z.B. mittels des Kommandos AddOrderItem) vornehmen.

Analog zu den Kommandos bedarf es auch zweier Command Handler. Der eine Command Handler für das Anlegen einer neuen Bestellung generiert die zuvor genannte ID für die Bestellung.

class AddOrderHandler implements IHandleCommands<AddOrder, UUID> {

  private final Orders orders;

  AddOrderHandler(Orders orders) {
    this.orders = orders;
  }

  @Override
  public UUID handle(AddOrder cmd) {
    UUID orderId = UUID.randomUUID();
    orders.add(new Order(orderId, cmd.getCustomer()));
    return orderId;
  }

}

Anschließend speichert er die neu angelegte Bestellung in einem Repository namens Orders.

interface Orders {
  void add(Order order);
  Order get(UUID orderId);
}

Der andere Command Handler für das Hinzufügen von Bestellpositionen lädt anhand der ID die Bestellung.

class AddOrderItemHandler implements IHandleCommands<AddOrderItem, Void> {

  private final Orders orders;

  AddOrderItemHandler(Orders orders) {
    this.orders = orders;
  }

  @Override
  public Void handle(AddOrderItem cmd) {
    Order order = orders.get(cmd.getOrderId());
    order.addItem(cmd.getEAN(), cmd.getAmount());
    return null;
  }

}

Anschließend ruft er die Methode addItem an der Bestellung auf, die die Zustandsveränderung vornimmt. Dies ist dahingehend zu betonen, dass nicht der Command Handler selbst den Zustand von Aggregaten verändert. Die Zustandsänderung erfolgt immer über das Aggregat selbst. Ich werde in einem der nächsten Blogbeiträge erläutern inwiefern dies für die Implementierung von Event Sourcing relevant ist.

 

Zustandsänderungen mit Kommandos

Für eine einfache Anwendung muss nun zunächst die „Infrastruktur“ initialisiert werden:

InMemoryCommandDispatcher dispatcher = new InMemoryCommandDispatcher();
Orders orders = new InMemoryOrdersRepository();
dispatcher.registerHandler(AddOrder.class, new AddOrderHandler(orders));
dispatcher.registerHandler(AddOrderItem.class, new AddOrderItemHandler(orders));

Die Infrastruktur für die Verarbeitung von Kommandos besteht im Kern aus zwei Komponenten:

  1. einem Command Dispatcher, der die Kommandos entgegen nimmt und sie an den entsprechend registrierten Command Handler routet
  2. einer Implementierung für das Repository Orders. Die Implementierung kann hier analog zum InMemoryCommandDispatcher erfolgen:
class InMemoryOrdersRepository implements Orders {

  private final Map<UUID, Order> orders = new HashMap<>();

  @Override
  public void add(Order order) {
    orders.put(order.getId(), order);
  }

  @Override
  public Order get(UUID orderId) {
    return orders.get(orderId);
  }
}

Anschließend können Kommandos über den Dispatcher abgesetzt werden, um neue Bestellungen anzulegen.

AddOrder addOrder = new AddOrder("Jon Doe");
UUID orderId = dispatcher.dispatch(addOrder);

Analog können Bestellpositionen hinzugefügt werden.

AddOrderItem addOrderItem = new AddOrderItem(orderId, "4260107220015", 2);
dispatcher.dispatch(addOrderItem);

Im Wesentlichen ist das auch schon alles, was sich hinter dem Pattern von Kommandos in einer CQRS-Architektur verbirgt.

 

Queries in CQRS

An dieser Stelle werde ich mich daher direkt den lesenden Abfragen (Queries), dem Q in CQRS, widmen. Solche Abfragen sind im Prinzip genau so leichtgewichtig implementierbar wie Kommandos. Dabei nutze ich zwar das Konzept von Ereignissen (Events) zur Trennung der Abfragen von den Kommandos, spare das Thema Event Sourcing aber wie versprochen zunächst aus.

 

Lesemodelle auf Basis von Ereignissen

Für die Trennung nutze ich das Konzept von Ereignissen und bilde es analog zu den Kommandos über ein Interface ab.

interface IAmAnEvent {
}

Die Lesemodelle, die ich für die Abfragen benötige, leite ich einfach aus den Ereignissen ab. Die Lesemodelle sind sozusagen Konsumenten oder Empfänger dieser Ereignisse. Das Konzept der Konsumenten bilde ich ebenfalls durch ein einfaches Interface ab.

interface IConsumeEvents extends Consumer<IAmAnEvent> {

  @Override
  default void accept(IAmAnEvent event) {
    EventApplier.apply(this, event, "consume");
  }

  default void consume(IAmAnEvent event) {
  }
}

Ein solcher Konsument hat zwei Standard-Methoden accept und  consume. Die Methode accept nimmt ein Ereignis als Parameter entgegen und dient als eigentliche Schnittstelle des Konsumenten. Da Java kein polymorphisches Dispatching erlaubt und ich hier nicht in epischer Breite ein Visitor-Pattern einführen möchte, nutzt die Implementierung die Klasse  EventApplier. Diese erlaubt es die für das Ereignis spezifischte Überladung der Methode consume zu ermitteln und aufzurufen. Hierdurch ist es möglich, dass ein einzelner Konsument unterschiedliche Arten von Ereignissen verarbeiten kann. Die Details der Implementierung der Klasse  EventApplier lasse ich an dieser Stelle aus. Eine Implementierung ist aber auf Github zu finden.

Ereignisse und Konsumenten erlauben nun das Erstellen von Lesemodellen. Für die Beispiel-Domäne kann ein Lesemodell beispielsweise folgendermaßen erstellt werden.

class OrdersByCustomer implements IConsumeEvents {

  private final Map<String, Set> ordersByCustomer = new HashMap<>();

  void consume(OrderCreated orderCreated) {
    ordersByCustomer.computeIfAbsent(orderCreated.getCustomer(),
        orderId -> new HashSet<>()).add(orderCreated.getOrderId());
  }

  void print() {
    ordersByCustomer.forEach((customer, orderIds) -> {
      System.out.println("Customer: " + customer);
      orderIds.forEach(orderId -> System.out.println(" - " + orderId));
    });
  }
}

Das Lesemodell hält alle Bestellungen nach Kunden sortiert vor. Eine praxisrelevante Anwendung persistiert ein solches Modell (hier insbesondere die Property  ordersByCustomer) in der Regel spezifisch für den entsprechenden Anwendungszweck – das Abfragen aller Bestellungen nach Kunden sortiert. Wie man sieht, ist an dieser Stelle kein Zugriff auf das Schreibmodell oder die Kommandos notwendig. Daher ist es auch durchaus denkbar, Lesemodelle asynchron zu aktualisieren. Es bleibt allerdings die Frage offen, wie nun die Kommandos mit den Ereignissen und den Lesemodellen zusammenspielen. Insbesondere die Herkunft des Ereignisses  OrderCreated ist bislang unklar.

 

Zusammenspiel von Commands und Queries

Ereignisse werden durch Kommandos ausgelöst. Dazu ist es notwendig das Ereignis zunächst einmal zu definieren.

class OrderCreated implements IAmAnEvent {

  private final UUID orderId;
  private final String customer;

  OrderCreated(UUID orderId, String customer) {
    this.orderId = orderId;
    this.customer = customer;
  }

  UUID getOrderId() {
    return orderId;
  }

  String getCustomer() {
    return customer;
  }
}

Anschließend muss auf der Schreibseite das Aggregat erweitert werden, um das Ereignis zu veröffentlichen.

class Order {

  private final UUID id;
  private final String customer;
  private final Set orderItems = new HashSet<>();
  private final ITransferEvents eventBus;

  Order(UUID id, String customer, ITransferEvents eventBus) {
    this.id = id;
    this.customer = customer;
    this.eventBus = eventBus;
    eventBus.send(new OrderCreated(id, customer));
  }

  // ... 

}

Die Veröffentlichung des Ereignisses erfolgt über einen EventBus wie folgt:

@FunctionalInterface
interface ITransferEvents {
  void send(IAmAnEvent event);
}
class InMemoryEventBus implements ITransferEvents {

  private final Map<Class<? extends IAmAnEvent>, Set> eventConsumer = new HashMap<>();

  @Override
  public void send(IAmAnEvent event) {
    eventConsumer.getOrDefault(event.getClass(), Collections.emptySet())
      .forEach(consumer -> consumer.accept(event));
  }

  void registerConsumer(Class<? extends IAmAnEvent> eventType, IConsumeEvents consumer) {
    eventConsumer.computeIfAbsent(eventType, key -> new HashSet<>()).add(consumer);
  }
}

Der Einfachheit halber gehe ich auch hier von einer in-memory Implementierung aus. Diese übermittelt die Ereignisse an alle Konsumenten, die über die Methode registerConsumer entsprechend registriert wurden.

Die Einzelteile können einfach über die „Infrastruktur“ miteinander verdrahtet werden.

InMemoryCommandDispatcher dispatcher = new InMemoryCommandDispatcher();
InMemoryEventBus eventBus = new InMemoryEventBus();

OrdersByCustomer ordersByCustomer = new OrdersByCustomer();
eventBus.registerConsumer(OrderCreated.class, ordersByCustomer);

Orders orders = new InMemoryOrdersRepository();
dispatcher.registerHandler(AddOrder.class, new AddOrderHandler(orders, eventBus));
dispatcher.registerHandler(AddOrderItem.class, new AddOrderItemHandler(orders));

Beim Absetzen von Kommandos aktualisiert sich das Lesemodell nun automatisch indem es alle dafür relevanten Ereignisse konsumiert.

AddOrder addOrder = new AddOrder("Jon Doe");
UUID orderId = dispatcher.dispatch(addOrder);

AddOrderItem addOrderItem = new AddOrderItem(orderId, "4260107220015", 2);
dispatcher.dispatch(addOrderItem);

dispatcher.dispatch(new AddOrder("Jane Doe"));
dispatcher.dispatch(new AddOrder("Jon Doe"));

ordersByCustomer.print();

Die Ausgabe der Methode ordersByCustomer.print und somit das Lesemodell sieht in etwa folgendermaßen aus:

Customer: Jon Doe
 - 9ed901e2-49ab-4843-887f-8e0697aef515
 - 338f7857-fe74-4b0b-be76-82b6a5944ee6
Customer: Jane Doe
 - cb57b378-784a-4dc7-8531-148e29c6a30b

Durch die Trennung der Kommandos und der Abfragen sowie dem Zusammenspiel auf Basis von Ereignissen, ist es nun möglich losgekoppelt weitere Abfragen bzw. Lesemodelle zu definieren.

 
Zusammengefasst ist das auch schon die ganze Magie hinter einer CQRS-Implementierung in Java:

  • Kommandos verändern den Zustand des Schreibmodells
  • Kommandos und Zustandsänderungen lösen Ereignisse aus
  • Lesemodelle aktualisieren sich auf Basis der Ereignisse
  • Abfragen nutzen die Lesemodelle, um die notwendigen Daten bereitzustellen

Wie versprochen erläutere ich aufbauen hierauf in meinen nächsten Beitrag die Anatomie von Event Sourcing anhand einer konkreten Implementierung in Java. Im Anschluss widme ich mich dann dem Zusammenspiel der Pattern. So, stay tuned!