I have implemented multi threading to perform some operation in jira once i consume the message from rabbitmq. I am using spring amqp (version 1.6.1)
Once the thread catches the exception, i am setting status as an error and output object i am referring in future. While sending this output object to the queue. I am facing above execption
Code :
Connection Factory:
@Configuration
@PropertySources({ @PropertySource("classpath:application.properties") })
public class RabbitMQConfiguration {
@Autowired
private Environment environment;
@Bean
public ConnectionFactory connectionFactory() {
// TODO make it possible to customize in subclasses.
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(environment.getProperty("bip.rabbitmq.url"));
connectionFactory.setUsername(environment.getProperty("bip.rabbitmq.username"));
connectionFactory.setPassword(environment.getProperty("bip.rabbitmq.password"));
return connectionFactory;
}
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
/**
* @return the admin bean that can declare queues etc.
*/
@Bean
public AmqpAdmin amqpAdmin() {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());
return rabbitAdmin;
}
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setMessageConverter(jsonMessageConverter());
return template;
}
@Bean(name = "jiraQueueListenerContainerFactory")
public SimpleRabbitListenerContainerFactory jiraQueueListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setMessageConverter(new Jackson2JsonMessageConverter());
factory.setReceiveTimeout(10L);
return factory;
}
}
MessageHandler :
@Component
@PropertySources({ @PropertySource("classpath:application.properties") })
public class JiraMessageHandler {
@Autowired
private RabbitTemplate rabbitTemplate;
private static ExecutorService executor = Executors.newFixedThreadPool(Constants.THREAD_SIZE);
private static Logger logger = LogManager.getLogger(JiraMessageHandler.class);
@RabbitListener(containerFactory = "jiraQueueListenerContainerFactory", queues = Constants.QUEUE_NAME)
public void handleMessage(HashMap<String, Object> jiraMessage) {
logger.info(jiraMessage.toString() + System.currentTimeMillis());
logger.info("Jira Message Handler");
BaseServerAdapter jiraProcessingAdapter = new JiraProcessingAdapter();
Future future = executor.submit(jiraProcessingAdapter);
JiraAdapterOutput jiraAdapterOutput = new JiraAdapterOutput();
Future future = executor.submit(jiraProcessingAdapter);
jiraAdapterOutput = (JiraAdapterOutput) future.get();
try {
jiraAdapterOutput = (JiraAdapterOutput) future.get();
if (jiraAdapterOutput.getOutputMap().get("activityStatus") == "SUCCESS") {
logger.info("Successfully Executed Jira ::: " + new Date() + "::: "
+ jiraAdapterOutput.getOutputMap().get("jiraId"));
rabbitTemplate.convertAndSend(Constants.ADAPTER_OUTPUT_QUEUE, senderMap);
}else if (jiraAdapterOutput.getOutputMap().get("activityStatus").equalsIgnoreCase("FAIL")) {
logger.info("Successfully Executed Jira ::: " + new Date() + "::: "
+ jiraAdapterOutput.getOutputMap().get("jiraId"));
sendMessageForProcessingToBIP(senderMap);
}
private boolean sendMessageForProcessingToBIP(HashMap<String, ExchangeDTO> senderMap) {
try {
rabbitTemplate.convertAndSend(Constants.WFM_ERROR_QUEUE, senderMap);
return true;
} catch (Exception e) {
**logger.info("Message sending failed, try again:::::::" + e.getMessage());**
}
return false;
}
It is showing "Application Context is closed and the ConnectionFactory can no longer create connections."
Anything wrong I am doing. I referred here as well : https://jira.spring.io/browse/AMQP-546