
We have a use case in which each aggregate root should have its own event store. We use the following configuration, in which only one event store is currently configured:

@Configuration
@EnableDiscoveryClient
public class AxonConfig {

    private static final String DOMAIN_EVENTS_COLLECTION_NAME = "coll-capture.domainEvents";
    // private static final String DOMAIN_EVENTS_COLLECTION_NAME_TEST = "coll-capture.domainEvents-test";

    @Value("${mongodb.database}")
    private String databaseName;

    @Value("${spring.application.name}")
    private String appName;

    @Bean
    public RestTemplate restTemplate() {
        CloseableHttpClient httpClient = HttpClientBuilder.create().build();
        HttpComponentsClientHttpRequestFactory clientHttpRequestFactory =
                new HttpComponentsClientHttpRequestFactory(httpClient);

        return new RestTemplate(clientHttpRequestFactory);
    }

    @Bean
    @Profile({"uat", "prod"})
    public CommandRouter springCloudHttpBackupCommandRouter(
            DiscoveryClient discoveryClient,
            Registration localInstance,
            RestTemplate restTemplate,
            @Value("${axon.distributed.spring-cloud.fallback-url}") String messageRoutingInformationEndpoint) {
        return new SpringCloudHttpBackupCommandRouter(discoveryClient,
                localInstance,
                new AnnotationRoutingStrategy(),
                serviceInstance -> appName.equalsIgnoreCase(serviceInstance.getServiceId()),
                restTemplate,
                messageRoutingInformationEndpoint);
    }

    @Bean
    public Repository<TestEnquiry> testEnquiryRepository(EventStore eventStore) {
        return new EventSourcingRepository<>(TestEnquiry.class, eventStore);
    }

    @Bean
    public Repository<Test2Enquiry> test2enquiryRepository(EventStore eventStore) {
        return new EventSourcingRepository<>(Test2Enquiry.class, eventStore);
    }

    @Bean
    public EventStorageEngine eventStorageEngine(MongoClient client) {
        MongoTemplate mongoTemplate = new DefaultMongoTemplate(client, databaseName)
                .withDomainEventsCollection(DOMAIN_EVENTS_COLLECTION_NAME);
        return new MongoEventStorageEngine(mongoTemplate);
    }
}

Now we also want to configure a second collection, "DOMAIN_EVENTS_COLLECTION_NAME_TEST" (just as an example), in the EventStorageEngine. How can we support multiple event stores and select which collection each tracking processor should read from?

Just curious, but why do you want to have a distinct event store per aggregate? - Steven

1 Answer


If you are going the route of segregating the event streams, then combining them from an event handling perspective could become a necessity indeed. Especially when having several bounded contexts, segregating the event streams into distinct storage solutions is reasonable.

If you want to define which [message source / event store] is used by a TrackingEventProcessor, you will have to deal with the EventProcessingConfigurer. More specifically, you should invoke the EventProcessingConfigurer#registerTrackingEventProcessor(String, Function<Configuration, StreamableMessageSource<TrackedEventMessage<?>>>) method. The first String parameter is the name of the processor you want to configure as being "tracking". The second parameter defines a Function which gives you the message source to be used by this TrackingEventProcessor (TEP). It is here where you should provide the event store you want this TEP to ingest events from.
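As a rough illustration of that registration, the configuration fragment below is a minimal sketch, assuming Axon Framework 4.x with Spring. The class name, the bean qualifier "testEventStore" and the processor name "testEnquiryProcessor" are hypothetical and do not come from the question; the sketch also assumes that a second EventStore bean backed by the other collection has already been defined elsewhere.

```java
import org.axonframework.config.EventProcessingConfigurer;
import org.axonframework.eventsourcing.eventstore.EventStore;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ProcessorSourceConfig {

    // "testEventStore" is a hypothetical bean name for the event store that
    // reads from the second collection; define it to match your own setup.
    @Autowired
    public void configureProcessors(EventProcessingConfigurer configurer,
                                    @Qualifier("testEventStore") EventStore testEventStore) {
        // First argument: the name of the processor to register as tracking.
        // Second argument: a function returning the message source this
        // processor ingests events from; here, the second event store.
        configurer.registerTrackingEventProcessor(
                "testEnquiryProcessor",
                configuration -> testEventStore);
    }
}
```

Event handlers would then have to be assigned to that processor name (for example through their processing group) for them to receive events from this store.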

Combining the streams again at a later stage is of course also possible, and Axon Framework supports this through a specific StreamableMessageSource implementation: the MultiStreamableMessageSource, with which you can connect any number of StreamableMessageSources together.

Note that Axon's EmbeddedEventStore is in essence an implementation of a StreamableMessageSource. Once the MultiStreamableMessageSource has been built, you will of course have to specify it as the messageSource for your TrackingEventProcessors.
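To sketch how that combination might look, the fragment below assumes Axon Framework 4.2 or later, where MultiStreamableMessageSource is available; verify the import path against your version. The class name, the bean qualifiers "enquiryEventStore" and "testEventStore", the source names and the processor name "combinedProcessor" are all hypothetical.

```java
import org.axonframework.config.EventProcessingConfigurer;
import org.axonframework.eventsourcing.MultiStreamableMessageSource;
import org.axonframework.eventsourcing.eventstore.EventStore;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Configuration;

@Configuration
public class CombinedSourceConfig {

    // Both qualifiers are hypothetical bean names for the two event stores,
    // each assumed to be backed by its own collection.
    @Autowired
    public void configureCombinedProcessor(EventProcessingConfigurer configurer,
                                           @Qualifier("enquiryEventStore") EventStore enquiryEventStore,
                                           @Qualifier("testEventStore") EventStore testEventStore) {
        // Connect both stores into a single streamable source.
        MultiStreamableMessageSource combinedSource = MultiStreamableMessageSource.builder()
                .addMessageSource("enquiryStore", enquiryEventStore)
                .addMessageSource("testStore", testEventStore)
                // Names the source that is long-polled when no events are available.
                .longPollingSource("enquiryStore")
                .build();

        // Use the combined source as the message source of one tracking processor.
        configurer.registerTrackingEventProcessor(
                "combinedProcessor",
                configuration -> combinedSource);
    }
}
```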

As a last note, this solution can only be used with TrackingEventProcessors, as those are the only Event Processors provided by Axon that ingest a StreamableMessageSource as the source for their events.