1
votes

WHAT IAM TRYING TO ACHIEVE I have a REST Call from UI that calls to add a User. So, the user will have to do a async queue (this is a constraint), but then wait for the response queue for a configured time and process it before the result is sent back to UI. Is the queue comes back with empty reference number, then i have to delete the user record and throw exception saying user is invalid. If the response comes back with valid reference (or if the timeout happens), then i assume it as valid and return success.

I have a application where I send off a queue message to get the referenceNumber for my User Object. And then wait for the queue response before responding back to REST Call. But, i have to wait for configured time for the queue response to come back.

UserManagerImpl

// REST CALL to persist
public User Persist(User user) {
...
...
 // Building the message for sending to QUEUE
 UserEnvelopeV1_0 userEnvelope =buildUserEnvelope(user);
// This is the place i send the queue message
userQueueClient.send(userEnvelope);
// Update Request time
updateRequestDetails(user.getUserId);
// This is the call i am going retry
boolean userValid = userRetryTemplate.doUserReferenceRetry(userId);
if (!userValid ) {
                  //remove User Object
                  throw Exception
                }
...
}

// update the request time for reference Number
private void updateRequestDetails(String userId) {
 User user = userRepository.findById(userId);
        if (user != null) {
            user.setRefRequestDateItem(DateHelper.createXMLGregorianCalendar());
            userRepository.saveAndFlush(user);
        }

public void updateReference(String userId, String referenceNumber) {

        User user = userRepository.findById(userId);
        if (user != null) {
            user.setReference(referenceNumber);
            user.setResponseDate(DateHelper.createXMLGregorianCalendar());
            userRepository.saveAndFlush(user);
        }
    }

UserQueueClient :

@Component
public class UserQueueClient {



    @JmsListener(id = "#{T(java.util.UUID).nameUUIDFromBytes('${in.res}",
            destination = "${in.res}", containerFactory = "containerFactory")
    public void receive(Message message, UserEnvelopeV1_0 envelope) throws{


        try {
            String userId = envelope.getHeader().getMessageIdentification().getUserId();
 ApplicationInformationStructure applicationInformation = envelope.getBody().getApplicationInformation();

if(CollectionUtils.isNotEmpty(applicationInformation.getApplicationInformationResult())) {
          String referenceNumber = applicationInformation.getApplicationInformationResult().getRefNumber();      

                userManager.updateReference(userId, referenceNumber);
            }

        } catch (Exception e) {
            //
        }
    }

    @Transactional(propagation = Propagation.MANDATORY)
    public void send(UserEnvelopeV1_0 sarsSoapEnvelope) throws JMSException {


        envelope.setHeader();

        Message message = sendToQueue(envelope, requestQueue, responseQueue,
                userId);

        applicationEventPublisher.publishEvent(new MessageLogEvent("USER_GET_REF_NUMBER", message, MessageType.XML,
                requestQueue, MessageDirection.SEND, true, false, new Date(), userId));

    }
}

UserRetryTemplate



@Component
public class UserRetryTemplate {


    @Value("${retry.max.attempts:5}")
    private int maxAttempts;

    @Value("${response.waiting.time.in.seconds:60}")
    private long maxDelay;

    @Autowired
    private UserRepository userRepository;

    private static final long INITIAL_INTERVAL = 2000L;

    public RetryTemplate retryTemplate() {

        // Max timeout in milliseconds
        long maxTimeout = maxDelay*1000;

        //double multiplier = (maxTimeout - INITIAL_INTERVAL)/((maxAttempts-2)*6000);

        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(maxAttempts);


        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(maxTimeout/(maxAttempts-1));

        RetryTemplate template = new RetryTemplate();
        template.setRetryPolicy(retryPolicy);
        template.setBackOffPolicy(backOffPolicy);
        return template;
    }

    public boolean doUserReferenceRetry(String userId) {
        boolean isUserReferenceValid = true;
        try {
            boolean isValidUser = retryTemplate().execute(context -> {
                logger.info("Attempted {} times", context.getRetryCount());
                User user = userRepository.findById(userId);
                logger.info("User Retry :" + user);

                if (user.getResponseDateItem() == null || user.getReferenceNumber == null) {
                    logger.info("response not yet received");
                    throw new IllegalStateException("User Response not yet received");
                }
                if (user.getReferenceNumber != null)) {
                    return true;
                }
                throw new IllegalStateException("Response not yet received");
            });
            return isUserReferenceValid ;
        } catch (IllegalArgumentException e) {

        }
        return true;
    }

}

So i implemented a logic, where i will send the queue message and do a Spring retry (for configured time) to check the database if the referenceNumber is updated in DB. Also, when the queue response comes back, I will update the DB with the referenceNumber.

But, when i implemented the above logic, the spring retry is keep on retrying till the configured time, but my Spring application is not processing any response Queues. Is there a way the Spring application can run both the processes in parallel.

The problem is if i remove the spring retry mechanism, the response queue is processing my response and updating the User record with reference number.

But when i added the retry logic, then the response queue is no longer processing my queue.

1

1 Answers

0
votes

I found belowline confusing.

"where i will send the queue message and do a Spring retry (for configured time) to check the database if the referenceNumber is updated in DB. Also, when the queue response comes back, I will update the DB with the referenceNumber."

In one line you are saying that you are waiting for reference number to be updated and in other line you are saying that you are updating database. Who is producer here ? are there two different thread ? Producer and Consumer in this case you.

If you want to block current thread for configured time Can you consider Blocking Queue with poll(long timeout, TimeUnit unit) method

poll(long timeout, TimeUnit unit) – retrieves and removes the head of the queue, waiting up to the specified wait time if necessary for an element to become available. Returns null after a timeout

Please edit question with sufficient details.