I have implemented the following scenario:

  1. A queueChannel holding Messages in form of byte[]
  2. A MessageHandler, polling the queue channel and uploading files over sftp
  3. A Transformer, listening to errorChannel and sending extracted payload from the failed message back to the queueChannel (thought of as an error handler to handle failed messages so nothing gets lost)

If the sftp server is online, everything works as expected.

If the sftp server is down, the error message that arrives at the transformer is:

org.springframework.messaging.MessagingException: Failed to obtain pooled item; nested exception is java.lang.IllegalStateException: failed to create SFTP Session

The transformer cannot do anything with this: the failedMessage of the payload is null, so the transformer throws an exception itself and the message is lost.

How can I configure my flow so that the transformer receives the right message, with the payload of the file that failed to upload?

My Configuration:

  @Bean
  public MessageChannel toSftpChannel() {
    final QueueChannel channel = new QueueChannel();
    channel.setLoggingEnabled(true);
    return channel;
  }

  @Bean
  public MessageChannel toSplitter() {
    return new PublishSubscribeChannel();
  }

  @Bean
  @ServiceActivator(inputChannel = "toSftpChannel", poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1"))
  public MessageHandler handler() {
    final SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory());
    handler.setRemoteDirectoryExpression(new LiteralExpression(sftpRemoteDirectory));
    handler.setFileNameGenerator(message -> {
      if (message.getPayload() instanceof byte[]) {
        return (String) message.getHeaders().get("name");
      } else {
        throw new IllegalArgumentException("byte[] expected in Payload!");
      }
    });
    return handler;
  }

  @Bean
  public SessionFactory<LsEntry> sftpSessionFactory() {
    final DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);

    final Properties jschProps = new Properties();
    jschProps.put("StrictHostKeyChecking", "no");
    jschProps.put("PreferredAuthentications", "publickey,password");
    factory.setSessionConfig(jschProps);

    factory.setHost(sftpHost);
    factory.setPort(sftpPort);
    factory.setUser(sftpUser);
    if (sftpPrivateKey != null) {
      factory.setPrivateKey(sftpPrivateKey);
      factory.setPrivateKeyPassphrase(sftpPrivateKeyPassphrase);
    } else {
      factory.setPassword(sftpPasword);
    }
    factory.setAllowUnknownKeys(true);
    return new CachingSessionFactory<>(factory);
  }

  @Bean
  @Splitter(inputChannel = "toSplitter")
  public DmsDocumentMessageSplitter splitter() {
    final DmsDocumentMessageSplitter splitter = new DmsDocumentMessageSplitter();
    splitter.setOutputChannelName("toSftpChannel");
    return splitter;
  }

  @Transformer(inputChannel = "errorChannel", outputChannel = "toSftpChannel")
  public Message<?> errorChannelHandler(ErrorMessage errorMessage) throws RuntimeException {

    Message<?> failedMessage = ((MessagingException) errorMessage.getPayload())
      .getFailedMessage();
    return MessageBuilder.withPayload(failedMessage.getPayload())
                         .copyHeadersIfAbsent(failedMessage.getHeaders())
                         .build();
  }

  @MessagingGateway 
  public interface UploadGateway {

    @Gateway(requestChannel = "toSplitter")
    void upload(@Payload List<byte[]> payload, @Header("header") DmsDocumentUploadRequestHeader header);
  }

Thanks..

Update

@Bean(PollerMetadata.DEFAULT_POLLER)
@Transactional(propagation = Propagation.REQUIRED, isolation = Isolation.READ_COMMITTED)
  PollerMetadata poller() {
    return Pollers
      .fixedRate(5000)
      .maxMessagesPerPoll(1)
      .receiveTimeout(500)
      .taskExecutor(taskExecutor())
      .transactionSynchronizationFactory(transactionSynchronizationFactory())
      .get();
  }

  @Bean
  @ServiceActivator(inputChannel = "toMessageStore", poller = @Poller(PollerMetadata.DEFAULT_POLLER))
  public BridgeHandler bridge() {
    BridgeHandler bridgeHandler = new BridgeHandler();
    bridgeHandler.setOutputChannelName("toSftpChannel");
    return bridgeHandler;
  }
Comments:

Sorry, I thought you were polling an inbound channel adapter. Turn on DEBUG logging and follow the message flow. If you still can't figure it out, post the log someplace. - Gary Russell

You also probably shouldn't be using a QueueChannel if you are worried about message loss. - Gary Russell

@gary Hello Gary, thanks for your reply. I am mostly worried that a failed upload will result in file loss. As I understand it, I have to queue the files somehow/somewhere to be able to retry the upload. Or is there a better way of handling such use cases? Here is the log link - Rok Purkeljc

Please see my answer. - Gary Russell

1 Answer

The null failedMessage is a bug; reproduced, see INT-4421.

I would not recommend using a QueueChannel for this scenario. If you use a direct channel, you can configure a retry advice to attempt redeliveries. When the retries are exhausted (if so configured), the exception is thrown back to the calling thread.

Add the advice to the SftpMessageHandler's adviceChain property.
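
To illustrate what "retry, then throw back to the calling thread" means on a direct (synchronous) channel, here is a minimal, library-free sketch. It is not the Spring Integration retry advice itself: RetrySketch, sendWithRetry and UploadFailedException are hypothetical names used only for illustration, and the upload is modelled as a plain Consumer<byte[]>. The point is that the caller still holds the payload when the retries are exhausted, so nothing is lost.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical, library-free illustration of retry-then-rethrow semantics.
final class RetrySketch {

    /** Thrown to the caller once every attempt has failed; keeps the original payload. */
    static final class UploadFailedException extends RuntimeException {
        final byte[] payload;

        UploadFailedException(byte[] payload, Throwable cause) {
            super("upload failed after all retries", cause);
            this.payload = payload;
        }
    }

    /**
     * Calls the upload on the calling thread, up to maxAttempts times,
     * sleeping backOffMillis between attempts.
     */
    static void sendWithRetry(Consumer<byte[]> upload, byte[] payload,
                              int maxAttempts, long backOffMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                upload.accept(payload);
                return; // success, nothing more to do
            } catch (RuntimeException e) {
                last = e;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(backOffMillis);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // preserve the interrupt flag
                    break; // stop retrying and report the failure
                }
            }
        }
        // Retries exhausted: the failure surfaces on the caller's thread.
        throw new UploadFailedException(payload, last);
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        byte[] file = {1, 2, 3};
        try {
            // Simulated upload that always fails, as if the sftp server were down.
            sendWithRetry(p -> {
                calls.incrementAndGet();
                throw new IllegalStateException("failed to create SFTP Session");
            }, file, 3, 10L);
        } catch (UploadFailedException e) {
            System.out.println("attempts=" + calls.get()
                    + ", payload bytes kept=" + e.payload.length);
        }
    }
}
```

In the real flow the retry loop would be provided by the advice and the exception would propagate through the gateway to the code that called upload(...), which can then decide whether to store the file for a later attempt.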

EDIT

You can work around the "missing" failedMessage by inserting a bridge between the pollable channel and the sftp adapter:

@Bean
@ServiceActivator(inputChannel = "toSftpChannel", poller = @Poller(fixedDelay = "5000", maxMessagesPerPoll = "1"))
public BridgeHandler bridge() {
    BridgeHandler bridgeHandler = new BridgeHandler();
    bridgeHandler.setOutputChannelName("toRealSftpChannel");
    return bridgeHandler;
}

@Bean
@ServiceActivator(inputChannel = "toRealSftpChannel")
public MessageHandler handler() {
    final SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory());
    handler.setRemoteDirectoryExpression(new LiteralExpression("foo"));
    handler.setFileNameGenerator(message -> {
        if (message.getPayload() instanceof byte[]) {
            return (String) message.getHeaders().get("name");
        }
        else {
            throw new IllegalArgumentException("byte[] expected in Payload!");
        }
    });
    return handler;
}