Loading presentation...

Present Remotely

Send the link below via email or IM


Present to your audience

Start remote presentation

  • Invited audience members will follow you as you navigate and present
  • People invited to a presentation do not need a Prezi account
  • This link expires 10 minutes after you close the presentation
  • A maximum of 30 users can follow your presentation
  • Learn more about this feature in our knowledge base article

Do you really want to delete this prezi?

Neither you, nor the coeditors you shared it with will be able to recover it again.


DDD/CQRS/ES in practice - Axon to the rescue - GeeCON 2013

No description

Piotr Wyczesany

on 15 May 2013

Comments (0)

Please log in to add your comment.

Report abuse

Transcript of DDD/CQRS/ES in practice - Axon to the rescue - GeeCON 2013

Presentation Layer Business Logic Layer Data Access Layer Classic 3-Layered architecture Domain
Design Problem! One model complex business
logic processing data
presentation A little history lesson 1988
Bertrand Mayer
"Object-Oriented Software Construction" Command Query
Separation Each method can be either Command performs an action Query returns data coined term: what if... ...apply it on
higher level ? Presentation Layer Commands DTOs Read side Write side Event Store Events Event Bus Event Listener Denormalized
views CQRS overview Presentation Layer Write Side Read Side Commands Queries Write Model Read Model Domain-Driven Design's
building blocks Simple DTOs
prepared for views Two models
responsible for different things Classic SQL
database Event
Sourcing or Storage SQL
Views NoSQL
Solution or Storage Command Query Responsibility Segregation Event Database Basics of Event Sourcing Instead of storing system's current state in tables

we store all the history as a series of small deltas

that happend from the point 0 events small deltas == objects need to
recreate their state
based on historical events based on them
we can (re)generate
view projections Delta 1 Delta 2 Delta 3 Delta n 2. Transaction ends 1. Events are stored in Event Store 3. Events are sent to bus 4. Dispatcher sends events
to Event Handlers 5. Events are denormalised
to query projections Why? focus complex domain processing
where it happens - in Write Side data presentation and quick reads
where it is required - in Read Side scalability reads happen more frequently in that world we don't like frameworks! we like to focus on the domain frameworks usually help to solve
it's creator's problems we can quite quickly write
all the necessary code really :) but... each DDD/CQRS/ES system
has the same infrastructure code sometimes it can be tricky still has to be written,
tested and maintained No silver bullet... framework can handle
a lot of boilerplate code... ...if used wisely ...if is flexible enough ...if is not too invasive right tool for the job! Axon for the rescue open-source DDD/CQRS/ES
framework for Java... ...and probably other JVM languages http://axonframework.org/ Axon supports: Commands Aggregate modeling Repositories Event Stores Event Handling Sagas Testing Spring integration in biology: part of neuron transmits information
to other neurons Saga Commands Command Expressed
intent Required
information + classname immutable state Custom CommandGateway GatewayProxyFactory Creates Command Gateway instance based on any interface @MetaData - on arguments @Timeout - on methods Command Bus dispatches Commands to Handlers SimpleCommandBus single thread allows interceptors
CommandHandlerInterceptor maintains
Unit of Work in one JVM Disruptor Two groups of threads changing
Aggregate's state storing and
publishing Events DisruptorCommandBus limitations: only Event Sourcing only single Aggregate's
state can be changed which is a good practice :) should not cause
rollback of Unit of Work messing with Commands order when creating Aggregate,
other Commands may
be reordered only for few millis but... outperformes SimpleCommandBus by a factor 4 :) Command Handlers CommandHandler interface: // T is the Command type that it can handle
Object handle(CommandMessage<T> command, UnitOfWork uow); Handlers are subscribed to CommandBus Annotation based Handlers only one Handler is assigned to Command class AggregateAnnotationCommandHandler // on any method of an object
@CommandHandler turns any object into
CommandHandler works on many methods in class Spring support:
<axon:annotation-config/> & @Component Unit of Work Command processing is a single unit UnitOfWork Axon's
internal class all Aggregate changes
are bound to it hook point
for interceptors no need
to access it directly Transaction can be bound to Unit of Work Interceptors CommandHandlerInterceptor interface: // may take some actions before/after Command handling
Object handle(CommandMessage<?> commandMessage,
UnitOfWork unitOfWork, InterceptorChain interceptorChain)
throws Throwable; CommandDispatchInterceptor interface: // may alter the Command with metadata, or block it with Exception
CommandMessage<?> handle(CommandMessage<?> commandMessage); BeanValidationInterceptor AuditingInterceptor Distributing Command Bus allows to distribute commands across many JVMs DistributedCommandBus CommandBusConnector RoutingStrategy JGroupsConnector MetaDataRoutingStrategy AnnotationRoutingStrategy requires Command Gateway CommandGateway interface: // sends the Command and invokes given callback
<R> void send(Object command, CommandCallback<R> callback); // sends and blocks until Command returns result
<R> R sendAndWait(Object command); // sends and blocks (with given timeout) until Command returns result
<R> R sendAndWait(Object command, long timeout, TimeUnit unit); // fire and forget
void send(Object command); Domain-Driven Design's
Building Blocks Aggregates Repositories Events Describes
what happened Immutable Any class All Events from Aggregates
are wrapped in DomainEventMessage you need to
extend framework class but it helps
with Events
and Repositories Axon provides couple of interfaces and basic implementations: interface AggregateRoot
- basic operations for Repository abstract class AbstractAggregateRoot
- basic implementation // can be used with JPA interface EventSourcedAggregateRoot
- basic operations for Event Sourced Repository abstract class AbstractEventSourcedAggregateRoot
- basic implementation for Event Sourced Aggregates abstract class AbstractAnnotatedAggregateRoot
- gives support for @EventHandler on methods no framework can help you model your domain Repository interface: // loads Aggregate by id
T load(Object aggregateIdentifier); // loads Aggregate by id and checks version
T load(Object aggregateIdentifier, Long expectedVersion); // adds newly created given Aggregate
void add(T aggregate); // no delete() method - Aggregates have markDeleted() Standard Repositories Classic approach:
each change overrides old state AbstractRepository - synchronized with Unit of Work LockingRepository - prevents concurrent modifications RetryScheduler - can retry command if failed GenericJpaRepository - stores JPA compatible Aggregates Event Sourcing
Repositories handle EventSourcedAggregateRoot EventSourcingRepository EventStore actual storage mechanism AggregateFactory for creating Aggregate in zero state GenericAggregateFactory - uses empty constructor SpringPrototypeAggregateFactory - uses Spring Caching needs CachingEventSourcingRepository needs JCache works with pessimistic locking strategy Caches Aggregates Repositories public interface MyGateway {

// fire and forget
void sendCommand(MyCommand command);

// attaches meta data and wait for a result for 10 seconds
@Timeout(value = 20, unit = TimeUnit.SECONDS)
ReturnValue sendCommandAndWaitForAResult(
MyCommand command, @MetaData("userId") String userId);

// caller decides how long method will wait
void sendCommandAndWait(MyCommand command,
long timeout, TimeUnit unit)
throws TimeoutException, InterruptedException;
} @Component
public class MyCommandHandler {

public void handle(MyCommand command) {
// handle the command and do something useful

public void handle(MyOtherCommand command) {
// handle the command and do something useful
} public class MyAggregate extends AbstractAnnotatedAggregateRoot {
public MyAggregate(AggregateIdentifier identifier) {


public void doSomething(String reason) {
apply(new SomethingDoneEvent(reason));

private void handle(SomethingDoneEvent event) {
this.reason = event.getReason();
} Event Serialization Serializer interface JavaSerializer XStreamSerializer suitable if Event classes
don't change over the time XML JSON Event Store implementations Files JPA MongoDB Others File based Event Store FileSystemEventStore simplest possible stores Events
in flat files good performance easy configuration no transactions! JPA based Event Store JpaEventStore stores events in Entries supports JPA transactions serialized Event
or Snapshot some metadata DomainEventEntry SnapshotEventEntry automatically detects Hibernate MongoDB
Event Store MongoEventStore Events collection Snapshots collection transactions? StorageStrategy Document per Event Document per commit DocumentPerCommitStorageStrategy DocumentPerEventStorageStrategy easier to query
Events manually atomicity Own Event Store DomainEventStream - use for reading and writing unofficial WIP on: Greg Young's Event Store Cassandra Redis Event upcasting Event snapshotting software changes over the time Event's structure changes Events need upcasting Event in version X Event in version X + 1 can be chained Upcaster has to be written manually long-living Aggregates may have too many Events in history snapshot is a serialized Aggregate's state at a given time snapshot triggering number of Events
since last snapshot time to load
Aggregate time-based Snapshot triggering EventCountSnapshotterTrigger requires: Snapshotter interface trigger // treshold Cache AggregateSnapshotter requires: EventStore Executor // ThreadPoolExecutor Conflict resolution User A sends MyCommand
to Aggregate with version X User B sends MyOtherCommand
to Aggregate with version X a conflict but sometimes those Commands
have different intent ConflictResolver interface: public class MyConflictResolver implements ConflictResolver {

// throw new ConflictingModificationException if cannot resolve conflict
// return silently otherwise
void resolveConflicts(List<DomainEventMessage> appliedChanges,
List<DomainEventMessage> committedChanges) {
} dispatches Events
to Event Listeners Event Listeners have to be registered in Event Bus Simple Event Bus EventBus very basic implementation SimpleEventBus one thread dispatches
sequentially exception breaks
and rethrows Clustering Event Bus Bundles Event Listeners
into clusters Dispatches between
different JVMs ClusteringEventsBus ClusterSelector based on EventBusTerminal bridge between clusters ClassNamePrefixClusterSelector ClassNamePatternClusterSelector AnnotationClusterSelector some in single JVM some remotely Spring AMQP Event Listener EventListener interface // handles MyEvent received from Event Bus
void handle(EventMessage event); EventMessage is Axon's class a lot of if-else Annotated Event Handler similar to Annotated Command Handler AnnotationEventListenerAdapter turns any object into
Event Handler @Component
public class MyEventHandler {

public void handle(MyEvent event) {

public void handle(MyOtherEvent event) {
} Spring support:
<axon:annotation-config/> Asynch Event Processing improves scalability introduces
Eventual Consistency AsynchronousCluster requires Executor
// ThreadPoolExecutor SequencingPolicy FullConcurrencyPolicy SequentialPolicy SequentialPerAggregatePolicy May use
Transaction Manager Error Handling default ErrorHandler with Transaction Manager without Transaction Manager rollback
retry after 2 mins log exception
continue to next Listener Custom Error Handlers ErrorHandler interface // handle the Exception raised by given Event Listener
RetryPolicy handleError(Throwable exception,
EventMessage<?> eventMessage, EventListener eventListener); RetryPolicy // ignore failure and proceed to next Event Listener
public static RetryPolicy proceed();

// rollback and skip Event processing
public static RetryPolicy skip();

// rollback, skip and reschedule after given time
public static RetryPolicy retryAfter(int timeout, TimeUnit unit) BASE transaction Common misuse of ACID transaction: ... a bank transfer BASE transaction cannot rollback... ...needs compensating action instead Saga
in Axon Special Event Handler manages Long Living Transactions responsible for business transactions Saga life cycle Saga interface AbstractAnnotatedSaga gives support for @SagaEventHandler start saga with @StartSaga end saga with @EndSaga or calling end() method Event Handling Saga S1 listens for Event E1 from Aggregate A1
Saga S2 listens for Event E2 from Aggregate A2 AssociationValue type of identifier field name in Event corresponding value field value in Event @SagaEventHandler(associationProperty="myEventProperty") method annotations Time Handle situation when nothing happens for some time. EventScheduler schedules Event for publication SimpleEventScheduler pure JVM not persistent QuartzEventScheduler persistence misfire management clustering Saga repository Saga can live very long Has to be stored Has to be loaded by association Id SagaRepository InMemorySagaRepository
in-memory JpaSagaRepository
serialization like with Events MongoSagaRepository
single document
per saga instance Testing Event Sourced Aggregates Scenarios:
Given: a bunch of historical Events
When: a Command
Then: a bunch of new Events Given-When-Then Fixture FixtureConfiguration FixtureConfiguration fixture =
Fixtures.newGivenWhenThenFixture(MyAggregate.class); @Test
public void testFirstFixture() {
fixture.given(new MyEvent(1))
.when(new MyCommand())
.expectEvents(new MyEvent(2));
} in JUnit Testing Event Sourcing constraint Each Aggregate's change
must happen only
in methods applying Events Axon automatically detects illegal state changes during tests Testing Annotated Sagas Scenarios:
Given: a bunch of Events (from certain Aggregates)
When: an Event arrives or time elapses
Then: expect certain behavior or state Saga test fixture AnnotatedSagaTestFixture AnnotatedSagaTestFixture fixture = new AnnotatedSagaTestFixture(MySaga.class); @Test
public void testFirstFixture() {
fixture.givenAggregate(myAggregateId).published(new MyEvent())
Matchers.listWithAllOf(new MarkAsOverdueCommand()));
} in JUnit Axon Framework open-source DDD/CQRS/ES
framework for Java... Commands Aggregate modeling Repositories Event Stores Event Handling Sagas Spring integration Testing http://www.axonframework.org Stream: In particular Aggregates Repositories Events // aggregates one (or more) Entities and/or Value Objects // protects invariants
(a transaction boudnary) // loads and stores Aggregates by id // a fact that occured in the domain // building blocks // no findBySomeQuery() method - Aggregates are supposed to be loaded by id only // no update() method - Aggregates are bound to Unit of Work Event upcaster's job: Piotr Wyczesany DDD/CQRS/ES in practice - Axon to the rescue @WyczesCalvo eventuallyinconsistent.com Upcaster interface can work on
org.dom4j.Document Agenda 1. Classic 3-layered architecture 2. Little history lesson 3. Basics of Command Query Responsibility Segregation 4. Basics (really) of Domain-Driven Design 5. Basics of Event Sourcing 6. Axon to the rescue! public class SomeClient {
private MyGateway myGateway;

// create gateway of type MyGateway
public SomeFatClient(GatewayProxyFactory factory) {
this.myGateway = factory.createGateway(MyGateway.class);

// react to some GUI action
public void onSomeButtonClicked() {
myGateway.sendCommand(new MyCommand(...));
} meetup.com/DDD-KRK
Full transcript