3
votes

I am starting out with the Axon framework and hit a bit of a roadblock.

While I can load individual aggregates using their ID, I can’t figure out how to get a list of all aggregates, or a list of all aggregate IDs.

The EventSourcingRepository class only has load() methods that return one aggregate.

Is there a way to all aggregate (IDs) or am I supposed to keep a list of all aggregate IDs outside of axon?

To keep things simple I am only using an InMemoryEventStorageEngine for now. I am using Axon 3.0.7.

2

2 Answers

3
votes

First off I'm was wondering why you would want to retrieve a complete list of all the aggregates from the Repository. The Repository interface is set such that you can load an Aggregate to handle commands or to create a new Aggregate.

Asking the question you have, I'd almost guess you're using it for querying purposes rather than command handling. This however isn't the intended use for the EventSourcingRepository.

One reason you'd want this I can think about, is that you want to implement an API call to publish a command to all Aggregates of a specific type in your application. Taking that scenario then yes, you need to store the aggregateId references yourself.

But concluding with my earlier question: why do you want to retrieve a list of aggregates through the Repository interface?

Answer Update

Regarding your comment, I've added the following to my answer:

Axon helps you to set up your application with event sourcing in mind, but also with CQRS (Command Query Responsibility Segregation). That thus means that the command and query side of your application are pulled apart.

The Aggregate Repository is the command side of your application, where you request to perform actions. It thus does not provide a list-of-aggregates, as a command is an expression of intent on a aggregate. Hence it only requires the Repository user to retrieve one aggregate or create one.

The example you've got that you need of the list of Aggregates is the query side of your application. The query side (your views/entities) is typically updated based on events (sourced through events). For any query requirement you have in your application, you'd typically introduce a separate view tailored to your needs.

In your example, that means you'd introduce a Event Handling Component, listening to your Aggregate Events, which update a Repository with query models of your aggregate.

0
votes

The EventStore passed into EventSourcingRepository implements StreamableMessageSource<M extends Message<?>> which is a means of reaching in for the aggregates.

Whilst doing it the framework way with an event handling component will probably scale better (depending on the how its used / the context), I'm pretty sure the event handling components are driven by StreamableMessageSource<M extends Message<?>> anyway. So if we wanted to skip the framework and just reach in, we could do it like this:

    List<String> aggregates(StreamableMessageSource<Message<?>> eventStore) {
        return immediatelyAvailableStream(eventStore.openStream(
                eventStore.createTailToken() /* All events in the event store */
        ))
                .filter(e -> e instanceof DomainEventMessage)
                .map(e -> (DomainEventMessage) e)
                .map(DomainEventMessage::getAggregateIdentifier)
                .distinct()
                .collect(Collectors.toList());
    }

    /*
        Note that the stream returned by BlockingStream.asStream() will block / won't terminate
        as it waits for future elements.
     */
    static <M> Stream<M> immediatelyAvailableStream(final BlockingStream<M> messageStream) {
        Iterator<M> iterator = new Iterator<M>() {
            @Override
            public boolean hasNext() {
                return messageStream.hasNextAvailable();
            }

            @Override
            public M next() {
                try {
                    return messageStream.nextAvailable();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Didn't expect to be interrupted");
                }
            }
        };

        Spliterator<M> spliterator = Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED);
        Stream stream = StreamSupport.stream(spliterator, false);
        return (Stream)stream.onClose(messageStream::close);
    }