Dieser Blogbeitrag ist der zweite in einer Serie über die Anatomie von Command Query Responsibility Segregation (CQRS) und Event Sourcing in Java. Im ersten Blogbeitrag habe ich bereits die Anatomie von CQRS-Architekturen anhand einer konkreten Implementierung in Java erläutert. Dabei hatte ich das Pattern Event Sourcing weitestgehend ausgeklammert. Ziel dabei war es zu zeigen, dass CQRS auch ohne Event Sourcing auskommt. In diesem zweiten Beitrag widme ich mich nun umgekehrt dem Event Sourcing, klammere dafür aber CQRS explizit aus. Das Ziel sollte klar sein – Event Sourcing bedarf keines CQRS. Wie beide Pattern zusammenspielen und warum man denken könnte, dass sie anatomisch zusammen gehören, ist Teil eines späteren Beitrags.

Analog zum ersten Beitrag erläutere ich nachfolgend wieder Schritt für Schritt anhand einer Implementierung, wie
die Anatomie von Event Sourcing aussieht.

 

Event Sourcing

Aber was ist dieses Event Sourcing eigentlich? Vereinfacht gesagt speichert Event Sourcing alle Änderungen des Zustandes einer Applikation in Form einer Sequenz von Ereignissen. Wer sich mit dem Pattern nochmal in der Theorie vertraut machen möchte, dem empfehle ich den Artikel von Martin Fowler.

Im Rahmen dieses Blogbeitrags vereinfache ich dahingehend, dass der wesentliche Unterschied von Anwendungen, die Event Sourcing nutzen, zu denen, die es nicht tun, in der Persistierung von Aggregaten liegt. Beim Event Sourcing wird nicht der aktuelle Zustand eines Aggregates persistiert, sondern die Historie an Ereignissen, die aufgetreten sind, um diesen Zustand zu erreichen.

 

Speichern des aktuellen Zustands

Im ersten Beitrag habe ich ein Bestellung mit Bestellpositionen als Beispiel für ein Aggregat genutzt. Dieses sah folgendermaßen aus.

class Order {

  private final UUID id;
  private final String customer;
  private final Set orderItems = new HashSet<>();

  Order(UUID id, String customer) {
    this.id = id;
    this.customer = customer;
  }

  void addItem(String itemType, int amount) {
    orderItems.add(new OrderItem(itemType, amount));
  }

  UUID getId() {
    return id;
  }

}

class OrderItem {

  private final String ean;
  private final int amount;

  OrderItem(String ean, int amount) {
    this.ean = ean;
    this.amount = amount;
  }

}

Um dieses Aggregat zu speichern, nutzte ich ein Repository, dass den aktuellen Zustand dieses Aggregates speichern konnte. Das nachfolgende Code-Beispiel zeigt eine verallgemeinerte Implementierung. Im Vergleich zum ersten Blogbeitrag habe ich hier lediglich ein paar Abstraktionen eingeführt. Diese sollen helfen, das Repository einfacher dahingehend anzupassen, das Event Sourcing Pattern für die Speicherung zu nutzen.

interface ICanBeIdentified {
  UUID getId();
}

class Order implements ICanBeIdentified {
  // ...
}

interface IStoreAggregates

{ void save(A aggregate); A get(UUID id); } interface Orders extends IStoreAggregates { } class InMemoryOrdersRepository implements Orders { private final Map<UUID, Order> orders = new HashMap<>(); @Override public void save(Order order) { orders.put(order.getId(), order); } @Override public Order get(UUID orderId) { return orders.get(orderId); } }

Die erste Abstraktion ist die Einführung eines Interfaces ICanBeIdentified für alle Aggregate, die eine ID besitzen. Die zweite Abstraktion ist ein Interface IStoreAggregates für Repositories.

 

Speichern von Ereignisströmen

Anstelle des aktuellen Zustands möchte ich nun einen Strom von Ereignissen speichern. Der Plan ist es, hierzu ein Repository zu implementieren, dass diese Art der Speicherung unterstützt. Dazu führe ich zunächst ein Interface ein, das es erlaubt die Aggregate zu identifizieren, die durch solche Repositories persistiert werden können.

interface ICanBeSourcedFromEvents {

  Collection getUncommittedEvents();

  void markEventsAsCommitted();

  void rebuildFromHistory(EventStream history);

}

Das Interface definiert eine Property getUncommittedEvents. Sie liefert alle Ereignisse zurück, die zu Zustandsänderungen seit der letzten Persistierung des Aggregates geführt haben. Sie erlaubt es einem Repository die Ereignisse auszulesen und entsprechend zu persistieren. Nach der Persistierung kann das Repository die Ereignisse mittels der Methode markEventsAsCommitted als persistiert markieren. Die Methode rebuildFromHistory erlaubt es, den Zustand des Aggregates aus einem Strom von Ereignissen (EventStream) wiederherzustellen.

 

Ereignisströme

Das Interface nutzt zwei neue Klassen IAmAnEvent und EventStream. Bei der Klasse IAmAnEvent handelt es sich lediglich um ein Marker-Interface für Ereignisse. Die Klasse EventStream implementiert das Konzept eines Stroms von Ereignissen. Die Implementierung sieht folgendermaßen aus:

class EventStream implements ICanBeIdentified {

  private final UUID id;
  private final List events;

  EventStream(UUID id, List events) {
    this.id = id;
    this.events = new ArrayList<>(events);
  }

  @Override
  public UUID getId() {
    return id;
  }

  List getEvents() {
    return Collections.unmodifiableList(events);
  }

  void add(IAmAnEvent event) {
    this.events.add(event);
  }

  void addAll(List events) {
    this.events.addAll(events);
  }

}

Ein Ereignisstrom hat eine ID, um ihn zu identifizieren. Außerdem besteht er aus einer Liste von Ereignissen (events).

 

Ereignisstrom-basierte Repositories

Auf Basis eines solchen Ereignisstroms lässt sich nun sehr einfach ein abstraktes Repository implementieren.

interface IStoreEventStreams {
  void save(EventStream eventStream);

  EventStream get(UUID id);
}

abstract class EventSourcingRepository

implements IStoreAggregates { private final IStoreEventStreams streamStore; EventSourcingRepository(IStoreEventStreams streamStore) { this.streamStore = streamStore; } @Override public final void save(A aggregate) { EventStream eventStream = streamStore.get(aggregate.getId()); if (eventStream == null) { eventStream = new EventStream(aggregate.getId(), aggregate.getUncommittedEvents()); } else { eventStream.addAll(aggregate.getUncommittedEvents()); } streamStore.save(eventStream); aggregate.markEventsAsCommitted(); } @Override public final A get(UUID id) { EventStream eventStream = streamStore.get(id); if (eventStream == null) { return null; } else { A aggregate = createAggregate(); aggregate.rebuildFromHistory(eventStream); return aggregate; } } @SuppressWarnings(„unchecked“) private A createAggregate() { Class aggregateType = (Class) ((ParameterizedType) getClass().getGenericSuperclass()) .getActualTypeArguments()[0]; try { return aggregateType.newInstance(); } catch (InstantiationException | IllegalAccessException e) { return null; } } } class EventSourcingOrderRepository extends EventSourcingRepository implements Orders { EventSourcingOrderRepository(IStoreEventStreams streamStore) { super(streamStore); } }

Die save-Methode zum Speichern des Aggregats ermittelt nun zunächst, ob für das Aggregat bereits ein Ereignisstrom existiert. Ist dies nicht der Fall, erzeugt sie einen Ereignisstrom mit allen noch nicht persistierten Ereignissen des Aggregats. Gibt es bereits einen Ereignisstrom, ergänzt die Methode alle Ereignisse an den vorhandenen Ereignisstrom. Anschließend speichert sie den Ereignisstrom und markiert alle Ereignisse als persistiert.
Die get-Methode zum Lesen des Aggregats, liest den persistierten Ereignisstrom anhand seiner ID ein, erzeugt eine Instanz des Aggregats und stellt über Aufruf der rebuildFromHistory-Methode den Zustand des Aggregats wieder her.

 

Ereignis-basierte Aggregate

Damit bleiben zwei Fragen offen:

  • Wie erfolgt die Wiederherstellung des Zustands aus einem Ereignisstrom?
  • Wie erzeugt das Aggregat eigentlich seine Ereignisse?

Auch dies lässt sich an einer abstrakten Implementierung eines Aggregats sehr gut erläutern. Diese kann exemplarisch wie folgt angenommen werden:

abstract class EventSourcedAggregate implements ICanBeIdentified, ICanBeSourcedFromEvents {

  private UUID id;
  private List uncommittedEvents = new ArrayList<>();

  @Override
  public final UUID getId() {
    return id;
  }

  protected void setId(UUID id) {
    this.id = id;
  }

  @Override
  public final List getUncommittedEvents() {
    return Collections.unmodifiableList(uncommittedEvents);
  }

  @Override
  public final void markEventsAsCommitted() {
    uncommittedEvents.clear();
  }

  @Override
  public final void rebuildFromHistory(final EventStream history) {
    history.getEvents().forEach(this::applyHistoryEvent);
  }

  protected final void apply(final IAmAnEvent event) {
    apply(event, true);
  }

  private void applyHistoryEvent(IAmAnEvent event) {
    apply(event, false);
  }

  private void apply(final IAmAnEvent event, final boolean isNew) {
    if (isNew) {
      uncommittedEvents.add(event);
    }
    EventApplier.apply(this, event, "handle");
  }

  private void handle(IAmAnEvent event) {
  }

}

Die Implementierung ist sehr einfach. Zunächst einmal wird zur Wiederherstellung des Zustandes in der Methode rebuildFromHistory jedes Ereignis aus dem Ereignisstrom mittels der Methode applyHistoryEvent auf das Aggregat angewendet. Um ein neues Ereignis zu erzeugen bzw. auszulösen, kann ein Aggregat die Methode apply(IAmAnEvent) nutzen. Auch in diesem Fall wird das Ereignis auf das Aggregat angewendet. Zusätzlich wird sich aber das Ereignis als „neu“ in der Liste der noch nicht persistierten Ereignisse gemerkt. Zur Anwendung der Ereignisse auf das Aggregat kommt die bereits aus dem ersten Blogbeitrag verwendete Klasse EventApplier zum Einsatz. Zur Erinnerung: Die Klasse ermöglicht es die für den Eventtyp spezifischte Überladung einer Methode aufzurufen – hier die handle(IAmAnEvent)-Methode.

 

Anwenden von Ereignissen statt Ändern des Zustands

Damit das funktioniert, sind einige Anpassungen am Aggregat Order notwendig.

class Order extends EventSourcedAggregate {

  private final Set orderItems;
  private String customer;

  Order() {
    super();
  }

  Order(UUID id, String customer) {
    apply(new OrderCreated(id, customer));
  }

  String getCustomer() {
    return customer;
  }

  void addItem(String ean, int amount) {
    apply(new OrderItemAdded(getId(), ean, amount));
  }

  void handle(OrderCreated event) {
    setId(event.getOrderId());
    orderItems = new HashSet<>();
    customer = event.getCustomer();
  }

  void handle(OrderItemAdded event) {
    orderItems.add(new OrderItem(event.getEan(), event.getAmount()));
  }

  Collection getOrderItems() {
    return orderItems;
  }
}

Zunächst wird ein weiterer Konstruktor benötigt, der es dem EventSourcingRepository in der Methode createAggregate ermöglicht, eine Instanz des Aggregats zu erzeugen. Die eigentliche Änderung liegt aber nun darin, wie Zuständsänderungen durchgeführt werden. Betroffen sind hier der Konstruktor und die Methode addItem. Beide Methoden verändern nun nicht mehr direkt den Zustand des Aggregats, sondern erzeugen jeweils ein Ereignis und wenden dieses auf das Aggregat an. Da das Aggregat von der Klasse EventSourcedAggregate ableitet, wird nun jeweils die handle-Methode für das entsprechende Ereignis aufgerufen. Die handle-Methoden führen nun die eigentlichen Zuständsänderungen durch. Das mag zunächst etwas umständlich oder ungewohnt erscheinen, aber genau dieser Mechanismus erlaubt es, den Zustand aus dem Ereignisstrom eins zu eins wiederherzustellen.

Die Implementierung ist damit abgeschlossen und alle Komponenten, die für ein einfaches Event Sourcing notwendig sind, vorhanden.

An dieser Stelle möchte ich kurz ein wenig spoilern: Wer aufmerksam bis hier her gefolgt ist, mag sich die Frage stellen, woher plötzlich die Ereignisse OrderCreated und OrderItemAdded kommen. Diese hatte ich bereits im ersten Blogbeitrag für die Trennung von Kommandos und Abfragen in CQRS eingeführt. Ich werde diesen Punkt im nächsten Blogbeitrag noch einmal aufgreifen, wenn es um das Zusammenspiel von CQRS und Event Sourcing geht.

 

Zusammenspiel der Komponenten

Jetzt betrachte ich aber zunächst das Zusammenspiel aller für das Event Sourcing notwendigen Komponenten. Dazu ist wie im nachfolgenden Beispiel einfach nur ein EventSourcingRepository zu initialisieren.

IStoreEventStreams eventStreamStore = new InMemoryEventStreamStore();
Orders orders = new EventSourcingOrderRepository(eventStreamStore);

UUID orderId = UUID.randomUUID();
Order order = new Order(orderId, "Jon Doe");
order.addItem("4260107220015", 2);
order.addItem("4260107222972", 0);
orders.save(order);

System.out.println("Stored events:");
eventStreamStore.get(orderId).getEvents().forEach(event ->
  System.out.println(" - " + event.getClass().getSimpleName())
);

Anschließend habe ich hier eine Bestellung erzeugt und mittels des Repositories gespeichert. Die Ausgabe zeigt, dass drei Ereignisse – eins für das Erzeugen der Bestellung und zwei für das Hinzufügen der Bestellpositionen – im zugehörigen Ereignisstrom gespeichert wurden.

Stored events:
 - OrderCreated
 - OrderItemAdded
 - OrderItemAdded

Natürlich lässt sich die Bestellung aus diesen Ereignissen auch wiederherstellen. Dazu lade ich sie einfach anhand ihrer ID.

Order eventSourcedOrder = orders.get(orderId);
System.out.println("Items for customer " + order.getCustomer() + " in order " + order.getId() + ":");
eventSourcedOrder.getOrderItems().forEach(item -> System.out.println(" - " + item.getAmount() + " x " + item.getEan()));

Wie sich an der Ausgabe erkennen lässt, ist die Bestellung vollständig in dem Zustand, in dem sie gespeichert wurde.

Items for customer Jon Doe in order f5340411-ea03-475b-8568-6466975b9d54:
 - 0 x 4260107222972
 - 2 x 4260107220015

Zusammengefasst ist das auch schon die ganze Magie hinter einer Implementierung von Event Sourcing in Java:

  • Ursprünglich zustandsverändernde Methoden in Aggregaten lösen stattdessen Ereignisse aus
  • Die Anwendung eines Ereignisses auf ein Aggregat verändert den Zustand
  • Die Ereignisse werden als Ereignisstrom persistiert
  • Die Wiederherstellung des Zustands erfolgt aus dem Ereignisstrom durch Anwendung der Ereignisse auf ein „leeres“ Aggregat

Wie versprochen, erläutere ich in meinem nächsten Beitrag wie Event Sourcing und CQRS zusammenspielen und warum man den Eindruck gewinnen könnte, dass sie anatomisch zusammen gehören. Das Ganze natürlich wieder Schritt für Schritt an Code-Beispielen. So, stay tuned!