I want to aggregate responses coming from 3 different endpoints(@ServiceActivator) and persist aggregated response to DB.
I am getting following exception
org.hibernate.LazyInitializationException: failed to lazily initialize a collection of role: c.b.bean.jpa.PersonEntity.listsOfEmails, could not initialize proxy - no Session
How to make message flow transaction aware? Or I am missing somthing?
Following is code snippet,
Configuration
@Configuration
@EnableIntegration
@ComponentScan(basePackages={"integration.endpoint", "integration.sync"})
@IntegrationComponentScan(basePackages={"integration.gateway"})
public class InfrastructureConfiguration {
@Bean
@Description("Entry to the messaging system through the gateway.")
public MessageChannel requestChannel(){
return pubSubChannel();
}
@Bean
@Description("Sends transformed message to outbound channel.")
public MessageChannel invocationChannel(){
return pubSubChannel();
}
@Bean
@Description("Sends handler message to aggregator channel.")
public MessageChannel aggregatorChannel(){
return pubSubChannel();
}
@Bean
@Description("Sends handler message to response channel.")
public MessageChannel responseChannel(){
return pubSubChannel();
}
private PublishSubscribeChannel pubSubChannel() {
PublishSubscribeChannel pubSub = new PublishSubscribeChannel(executor());
pubSub.setApplySequence(true);
return pubSub;
}
private Executor executor() {
return Executors.newFixedThreadPool(10);
}
}
Starting Gateway
@MessagingGateway(name="entryGateway", defaultRequestChannel="requestChannel")
public interface IntegrationService {
String initiateSync(AnObject obj);
}
Message Builder: It transforms the message, by fetching an entity and set that as a property to message and message is send to the channel. Later this entity used by @Autowired serives in @ServiceActivator( 3 Endpoints). This Entity is lazily initialized for its associations.
@Component
public class MessageBuilder {
private static final Logger LOGGER = LoggerFactory.getLogger(MessageBuilder.class);
@Autowired
private ODao dao;
@Transformer(inputChannel="requestChannel", outputChannel="invocationChannel")
public OMessage buildMessage(Message<AnObject> msg){
LOGGER.info("Transforming messages for ID [{}]", msg.getPayload().getId());
OMessage om = new OMessage(msg.getPayload());
om.buildMessage(dao);
return om;
}
}
Endpoint-1
@Component
public class Handler1 {
private static final Logger LOGGER = LoggerFactory.getLogger(Handler1.class);
@Autowired
private service1 Service1;
@Override
@ServiceActivator(inputChannel="invocationChannel", outputChannel="aggregatorChannel")
public ResponseMessage handle(Message<OMessage> msg) {
OMessage om = msg.getPayload();
ResponseMessage rm = null;
if(map.get("toProceed")){
LOGGER.info("Handler1 is called");
rm = service1.getResponse(om);
}else{
LOGGER.info("Handler1 is not called");
}
return rm;
}
}
Endpoint-2
@Component
public class Handler2 {
private static final Logger LOGGER = LoggerFactory.getLogger(Handler2.class);
@Autowired
private service2 Service2;
@Override
@ServiceActivator(inputChannel="invocationChannel", outputChannel="aggregatorChannel")
public ResponseMessage handle(Message<OMessage> msg) {
OMessage om = msg.getPayload();
ResponseMessage rm = null;
if(map.get("toProceed")){
LOGGER.info("Handler2 is called");
rm = service2.getResponse(om);
}else{
LOGGER.info("Handler2 is not called");
}
return rm;
}
}
Endpoint-3
@Component
public class Handler3 {
private static final Logger LOGGER = LoggerFactory.getLogger(Handler3.class);
@Autowired
private service3 Service3;
@Override
@ServiceActivator(inputChannel="invocationChannel", outputChannel="aggregatorChannel")
public ResponseMessage handle(Message<OMessage> msg) {
OMessage om = msg.getPayload();
ResponseMessage rm = null;
if(map.get("toProceed")){
LOGGER.info("Handler3 is called");
rm = service3.getResponse(om);
}else{
LOGGER.info("Handler3 is not called");
}
return rm;
}
}
Aggregator
@Component
public class MessageAggregator {
private static final Logger LOGGER = LoggerFactory.getLogger(MessageAggregator.class);
@Aggregator(inputChannel="aggregatorChannel", outputChannel="responseChannel")
public Response aggregate(List<ResponseMessage> resMsg){
LOGGER.info("Aggregating Responses");
Response res = new Response();
res.getResponse().addAll(resMsg);
return res;
}
@ReleaseStrategy
public boolean releaseChecker(List<Message<ResponseMessage>> resMsg) {
return resMsg.size() ==3;
}
@CorrelationStrategy
public ResponseMessage corelateBy(ResponseMessage resMsg) {
LOGGER.info("CorrelationStrategy: message payload details {}", resMsg);
return resMsg;
}
}
@Transactional
on methods where you are performing db related stuff? – sol4me