1
votes

I have implemented multi threading to perform some operation in jira once i consume the message from rabbitmq. I am using spring amqp (version 1.6.1)

Once the thread catches the exception, i am setting status as an error and output object i am referring in future. While sending this output object to the queue. I am facing above execption

Code :

Connection Factory:

@Configuration
@PropertySources({ @PropertySource("classpath:application.properties") })
public class RabbitMQConfiguration {

    @Autowired
    private Environment environment;

    @Bean
    public ConnectionFactory connectionFactory() {

        // TODO make it possible to customize in subclasses.
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(environment.getProperty("bip.rabbitmq.url"));
        connectionFactory.setUsername(environment.getProperty("bip.rabbitmq.username"));
        connectionFactory.setPassword(environment.getProperty("bip.rabbitmq.password"));
        return connectionFactory;
    }

    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    /**
     * @return the admin bean that can declare queues etc.
     */
    @Bean
    public AmqpAdmin amqpAdmin() {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());
        return rabbitAdmin;
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        template.setMessageConverter(jsonMessageConverter());
        return template;
    }

    @Bean(name = "jiraQueueListenerContainerFactory")
    public SimpleRabbitListenerContainerFactory jiraQueueListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        factory.setReceiveTimeout(10L);
        return factory;
    }

}

MessageHandler :

@Component
@PropertySources({ @PropertySource("classpath:application.properties") })

public class JiraMessageHandler {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    private static ExecutorService executor = Executors.newFixedThreadPool(Constants.THREAD_SIZE);

    private static Logger logger = LogManager.getLogger(JiraMessageHandler.class);

    @RabbitListener(containerFactory = "jiraQueueListenerContainerFactory", queues = Constants.QUEUE_NAME)
    public void handleMessage(HashMap<String, Object> jiraMessage) {

        logger.info(jiraMessage.toString() + System.currentTimeMillis());
        logger.info("Jira Message Handler");
        BaseServerAdapter jiraProcessingAdapter = new JiraProcessingAdapter();
        Future future = executor.submit(jiraProcessingAdapter);
        JiraAdapterOutput jiraAdapterOutput = new JiraAdapterOutput();
        Future future = executor.submit(jiraProcessingAdapter);
        jiraAdapterOutput = (JiraAdapterOutput) future.get();
        try {
            jiraAdapterOutput = (JiraAdapterOutput) future.get();

            if (jiraAdapterOutput.getOutputMap().get("activityStatus") == "SUCCESS") {
                logger.info("Successfully Executed Jira ::: " + new Date() + "::: "
                        + jiraAdapterOutput.getOutputMap().get("jiraId"));
                        rabbitTemplate.convertAndSend(Constants.ADAPTER_OUTPUT_QUEUE, senderMap);
            }else if (jiraAdapterOutput.getOutputMap().get("activityStatus").equalsIgnoreCase("FAIL")) {
                logger.info("Successfully Executed Jira ::: " + new Date() + "::: "
                        + jiraAdapterOutput.getOutputMap().get("jiraId"));
                        sendMessageForProcessingToBIP(senderMap);
            }

        private boolean sendMessageForProcessingToBIP(HashMap<String, ExchangeDTO> senderMap) {
        try {
            rabbitTemplate.convertAndSend(Constants.WFM_ERROR_QUEUE, senderMap);
            return true;
        } catch (Exception e) {
            **logger.info("Message sending failed, try again:::::::" + e.getMessage());**
        }
        return false;

    }

It is showing "Application Context is closed and the ConnectionFactory can no longer create connections."

Anything wrong I am doing. I referred here as well : https://jira.spring.io/browse/AMQP-546

1

1 Answers

0
votes

The code above (in handleMessage) looks incomplete - there is no catch for the try.

You must be closing the application context elsewhere before attempting the send. You can't use beans in the context after it is closed/destroyed. The JIRA you reference was to allow such access in stop() methods in other beans - i.e. destroy the connection only after all other beans have stopped.

I suggest you turn on DEBUG logging for org.springframework and figure out why the context is being closed prematurely.