1
votes

I want to aggregate responses coming from 3 different endpoints(@ServiceActivator) and persist aggregated response to DB.

I am getting following exception

org.hibernate.LazyInitializationException: failed to lazily initialize a collection of role: c.b.bean.jpa.PersonEntity.listsOfEmails, could not initialize proxy - no Session

How to make message flow transaction aware? Or I am missing somthing?

Following is code snippet,

Configuration

@Configuration
@EnableIntegration
@ComponentScan(basePackages={"integration.endpoint", "integration.sync"})
@IntegrationComponentScan(basePackages={"integration.gateway"})
public class InfrastructureConfiguration {

    @Bean
    @Description("Entry to the messaging system through the gateway.")
    public MessageChannel requestChannel(){
        return pubSubChannel();
    }

    @Bean
    @Description("Sends transformed message to outbound channel.")
    public MessageChannel invocationChannel(){
        return pubSubChannel();
    }

    @Bean
    @Description("Sends handler message to aggregator channel.")
    public MessageChannel aggregatorChannel(){
        return pubSubChannel();
    }

    @Bean
    @Description("Sends handler message to response channel.")
    public MessageChannel responseChannel(){
        return pubSubChannel();
    }

    private PublishSubscribeChannel pubSubChannel() {
        PublishSubscribeChannel pubSub = new PublishSubscribeChannel(executor());
        pubSub.setApplySequence(true);
        return pubSub;
    }

    private Executor executor() {
        return Executors.newFixedThreadPool(10);
    }
}

Starting Gateway

@MessagingGateway(name="entryGateway", defaultRequestChannel="requestChannel")
public interface IntegrationService {
    String initiateSync(AnObject obj);
}

Message Builder: It transforms the message, by fetching an entity and set that as a property to message and message is send to the channel. Later this entity used by @Autowired serives in @ServiceActivator( 3 Endpoints). This Entity is lazily initialized for its associations.

@Component
public class MessageBuilder {
    private static final Logger LOGGER = LoggerFactory.getLogger(MessageBuilder.class);

    @Autowired
    private ODao dao;

    @Transformer(inputChannel="requestChannel", outputChannel="invocationChannel")
    public OMessage buildMessage(Message<AnObject> msg){
        LOGGER.info("Transforming messages for ID [{}]", msg.getPayload().getId());
        OMessage om = new OMessage(msg.getPayload());
        om.buildMessage(dao);
        return om;
    }
}

Endpoint-1

@Component
public class Handler1 {
    private static final Logger LOGGER = LoggerFactory.getLogger(Handler1.class);

    @Autowired
    private service1 Service1;

    @Override
    @ServiceActivator(inputChannel="invocationChannel", outputChannel="aggregatorChannel")
    public ResponseMessage handle(Message<OMessage> msg) {
        OMessage om = msg.getPayload();
        ResponseMessage rm = null;
        if(map.get("toProceed")){
            LOGGER.info("Handler1 is called");
            rm = service1.getResponse(om);
        }else{
            LOGGER.info("Handler1 is not called");
        }
        return rm;
    }
}

Endpoint-2

@Component
public class Handler2 {
    private static final Logger LOGGER = LoggerFactory.getLogger(Handler2.class);

    @Autowired
    private service2 Service2;

    @Override
    @ServiceActivator(inputChannel="invocationChannel", outputChannel="aggregatorChannel")
    public ResponseMessage handle(Message<OMessage> msg) {
        OMessage om = msg.getPayload();
        ResponseMessage rm = null;
        if(map.get("toProceed")){
            LOGGER.info("Handler2 is called");
            rm = service2.getResponse(om);
        }else{
            LOGGER.info("Handler2 is not called");
        }
        return rm;
    }
}

Endpoint-3

@Component
public class Handler3 {
    private static final Logger LOGGER = LoggerFactory.getLogger(Handler3.class);

    @Autowired
    private service3 Service3;

    @Override
    @ServiceActivator(inputChannel="invocationChannel", outputChannel="aggregatorChannel")
    public ResponseMessage handle(Message<OMessage> msg) {
        OMessage om = msg.getPayload();
        ResponseMessage rm = null;
        if(map.get("toProceed")){
            LOGGER.info("Handler3 is called");
            rm = service3.getResponse(om);
        }else{
            LOGGER.info("Handler3 is not called");
        }
        return rm;
    }
}

Aggregator

@Component
public class MessageAggregator {
    private static final Logger LOGGER = LoggerFactory.getLogger(MessageAggregator.class);

    @Aggregator(inputChannel="aggregatorChannel", outputChannel="responseChannel")
    public Response aggregate(List<ResponseMessage> resMsg){
        LOGGER.info("Aggregating Responses");
        Response res = new Response();
        res.getResponse().addAll(resMsg);
        return res;
    }

    @ReleaseStrategy
    public boolean releaseChecker(List<Message<ResponseMessage>> resMsg) {
        return resMsg.size() ==3;
    }

    @CorrelationStrategy
    public ResponseMessage corelateBy(ResponseMessage resMsg) {
        LOGGER.info("CorrelationStrategy: message payload details {}", resMsg);
        return resMsg;
    }
}
1
Did you try to use @Transactional on methods where you are performing db related stuff?sol4me
Yes, but nothing changed.VirtualLogic
You want single transaction for entire message flow? And just to be sure you have transactionManager configured somewhere in your app?sol4me
Yes, a transactionManager is configured at DAO layer, I have the same question how can I make message flow transaction aware? Can you provide some code example?VirtualLogic

1 Answers

1
votes

You might fetch reference to lazy loaded domain inside a dao layer. So when it will be used later, it will be instantiated properly without proxy. For example it might be like this snippet:

public List<PersonEntity> fetchPersonsWithMails() {
    return sessionFactory.getCurrentSession()
        .createCriteria(PersonEntity.class)
        .setFetchMode("listsOfEmails", FetchMode.JOIN)
        .list();
}