I am struggling to set up failover with the TIBCO JMS provider. I know how to do this with ActiveMQ.
What I have tried is as follows:
    import java.util.List;
    import java.util.Properties;
    import java.util.UUID;

    import javax.annotation.PostConstruct;
    import javax.inject.Inject;
    import javax.jms.DeliveryMode;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageProducer;
    import javax.jms.Queue;
    import javax.jms.QueueConnection;
    import javax.jms.QueueConnectionFactory;
    import javax.jms.QueueSender;
    import javax.jms.QueueSession;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import javax.naming.Context;
    import javax.naming.InitialContext;
    import javax.naming.NamingException;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // FDPTibcoConfigDAO, FDPTibcoConfigDTO and TibcoConstant are project classes (imports omitted).
    public class TibcoJMSQueueProducer {

        private static final Logger LOGGER = LoggerFactory.getLogger(TibcoJMSQueueProducer.class);
        private static QueueConnectionFactory factory;

        private QueueConnection connection;
        private QueueSession session;

        @Inject
        private FDPTibcoConfigDAO fdpTibcoConfigDao;

        private String providerURL;
        private String userName;
        private String password;

        @PostConstruct
        public void constructProducer() {
            configure();
        }

        // Reads the most recent TIBCO config row and builds the connection factory from it.
        private void configure() {
            try {
                List<FDPTibcoConfigDTO> tibcoConfigList = fdpTibcoConfigDao.getAllTibcoConfig();
                if (!tibcoConfigList.isEmpty()) {
                    FDPTibcoConfigDTO fdpTibcoConfigDTO = tibcoConfigList.get(tibcoConfigList.size() - 1);
                    setProviderUrl(getProviderUrl(fdpTibcoConfigDTO));
                    this.userName = fdpTibcoConfigDTO.getUserName();
                    this.password = fdpTibcoConfigDTO.getPassword();
                    factory = new com.tibco.tibjms.TibjmsQueueConnectionFactory(this.providerURL);
                }
            } catch (Exception e) {
                LOGGER.error("Exiting with error", e);
                // Non-zero exit status, so the failure is not reported as success.
                System.exit(1);
            }
        }

        private void setProviderUrl(String providerURL) {
            this.providerURL = providerURL;
        }

        // Builds a URL of the form <protocol><ip><separator><port> from the config row.
        private String getProviderUrl(final FDPTibcoConfigDTO config) {
            return TibcoConstant.TCP_PROTOCOL + config.getIpAddress().getValue()
                    + TibcoConstant.COLON_SEPERATOR + config.getPort();
        }

        // Looks the queue up through JNDI; returns null if the lookup fails.
        private Object lookupQueue(String queueName) {
            Properties props = new Properties();
            Object tibcoQueue = null;
            props.setProperty(Context.INITIAL_CONTEXT_FACTORY, TibcoConstant.TIB_JMS_INITIAL_CONTEXT_FACTORY);
            props.setProperty(Context.PROVIDER_URL, this.providerURL);
            props.setProperty(TibcoConstant.TIBCO_CONNECT_ATTEMPT, "20,10");
            props.setProperty(TibcoConstant.TIBCO_RECOVER_START_UP_ERROR, "true");
            props.setProperty(TibcoConstant.TIBCO_RECOVER_RECONNECT_ATTEMPT, "20,10");
            try {
                InitialContext context = new InitialContext(props);
                tibcoQueue = context.lookup(queueName);
            } catch (NamingException e) {
                LOGGER.error("JNDI lookup failed for queue {}", queueName, e);
            }
            return tibcoQueue;
        }

        public void pushIntoQueueAsync(String message, String queueName) throws JMSException {
            connection = factory.createQueueConnection(userName, password);
            try {
                connection.start();
                session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
                Queue pushingQueue = (Queue) lookupQueue(queueName);
                QueueSender queueSender = session.createSender(pushingQueue);
                queueSender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                TextMessage sendXMLRequest = session.createTextMessage(message);
                queueSender.send(sendXMLRequest);
                // SLF4J placeholders are {}, not {0} and {1}.
                LOGGER.info("Pushing Queue {} ,Pushing Message : {}", pushingQueue.getQueueName(), sendXMLRequest.getText());
            } finally {
                // Closing the connection also closes its sessions and senders.
                connection.close();
            }
        }

        public String pushIntoQueueSync(String message, String queueName, String replyQueueName) throws JMSException {
            connection = factory.createQueueConnection(userName, password);
            try {
                connection.start();
                // One session is used for both the producer and the reply consumer.
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                Destination destination = (Destination) lookupQueue(queueName);
                MessageProducer messageProducer = session.createProducer(destination);
                TextMessage textMessage = session.createTextMessage(message);
                String correlationId = UUID.randomUUID().toString();
                // Create Reply To Queue (looked up by replyQueueName, not queueName)
                Destination replyDestination = (Destination) lookupQueue(replyQueueName);
                textMessage.setJMSReplyTo(replyDestination);
                textMessage.setJMSCorrelationID(correlationId);
                String messageSelector = "JMSCorrelationID = '" + correlationId + "'";
                MessageConsumer replyConsumer = session.createConsumer(replyDestination, messageSelector);
                messageProducer.send(textMessage, DeliveryMode.PERSISTENT, Message.DEFAULT_PRIORITY, 1800000);
                // Blocks until a reply with the matching correlation ID arrives.
                Message replyMessage = replyConsumer.receive();
                TextMessage replyTextMessage = (TextMessage) replyMessage;
                String replyText = replyTextMessage.getText();
                LOGGER.info("Pushing Queue {} ,Pushing Message : {}", queueName, message);
                return replyText;
            } finally {
                connection.close();
            }
        }

        public static QueueConnectionFactory getConnectionFactory() {
            return factory;
        }
    }
With ActiveMQ, we pass a URL such as
failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61616)?randomize=false&backup=true
as the provider URL to the ActiveMQConnectionFactory constructor to handle failover. I have read that TIBCO takes multiple comma-separated URLs instead, like this:
tcp://169.144.87.25:7222,tcp://127.0.0.1:7222
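To make the format concrete, here is a minimal sketch of how such a comma-separated URL could be assembled from several configured servers. The `FailoverUrl` class and its `build` method are hypothetical names of my own, not part of the TIBCO client API, and the sketch assumes each server is supplied as a plain `host:port` string. It only builds the string; it does not configure anything on the server side.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper, not part of the TIBCO client library.
final class FailoverUrl {
    private static final String TCP_PROTOCOL = "tcp://";

    private FailoverUrl() {
    }

    // Joins "host:port" entries into "tcp://host1:port1,tcp://host2:port2".
    static String build(List<String> hostPorts) {
        if (hostPorts == null || hostPorts.isEmpty()) {
            throw new IllegalArgumentException("at least one server is required");
        }
        return hostPorts.stream()
                .map(hostPort -> TCP_PROTOCOL + hostPort.trim())
                .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        // Prints the combined URL for the two servers used in this question.
        System.out.println(build(Arrays.asList("169.144.87.25:7222", "127.0.0.1:7222")));
    }
}
```

In my producer this would mean joining every row returned by `getAllTibcoConfig()` rather than using only the last one, if that is indeed what the provider expects.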
This is how I tested failover.
First I tested with a single IP (tcp://169.144.87.25:7222). Messages were sent and received normally (I have not posted the TibcoJMSReceiver code).
Then I tried with another IP (tcp://169.144.87.25:7222). That worked fine too.
But then I tried with
final String PROVIDER_URL="tcp://169.144.87.25:7222,tcp://127.0.0.1:7222";
I started my program and, before giving it any input, shut down the first server. With failover, the message should have been sent to the other server.
Instead, I get a session closed exception.
So am I handling failover correctly, or is there some other configuration I have to do?