I am using an inbound adapter with the Java DSL to poll PDF files from an SFTP server. In my use case, after fetching a PDF file, the application fetches a config file in CSV format with the same name from the SFTP server. It then processes the original PDF using the properties defined in the config file and uploads the result back to the SFTP server using the outbound adapter.

I am having trouble fetching the config file within the handler, on the same thread, using the outbound gateway.

Here is my code:

Register Integration Flows:

  for (String client : clientsArr) {
      this.flowContext.registration(getInboundIntegrationFlow(client)).register();
  }

  this.flowContext.registration(getOutboundIntegrationFlow()).register();
  this.flowContext.registration(sftpGatewayGetIntegrationFlow()).register();

Inbound Adapter Integration Flow:


  @Autowired
  private SftpPdfMessageHandler messageHandler;

  private IntegrationFlow getInboundIntegrationFlow(String client) {

    String remoteDirectory = getRemoteDirectory(client);
    String localDirectory = getLocalDirectory(client);
    String inboundAdapterId = getInboundAdapterId(client);

    return IntegrationFlows
        .from(Sftp.inboundAdapter(sftpSessionFactory())
                .preserveTimestamp(true)
                .remoteDirectory(remoteDirectory)
                .autoCreateLocalDirectory(true)
                .localDirectory(new File(localDirectory))
                .maxFetchSize(Integer.parseInt(sftpProperties.getMaxFetchSize()))
                .filter(new SftpSimplePatternFileListFilter(sftpProperties.getRemoteFileFilter()))
                .deleteRemoteFiles(true),
            e -> e.id(inboundAdapterId)
                .autoStartup(true)
                .poller(Pollers
                    .fixedDelay(Long.parseLong(sftpProperties.getPollPeriodInSeconds()), TimeUnit.SECONDS)
                    .receiveTimeout(Long.parseLong(sftpProperties.getPollerTimeout()))
                    .maxMessagesPerPoll(Long.parseLong(sftpProperties.getMaxMessagesPerPoll()))
                ))
        .handle(inBoundHandler())
        .get();
  }

  public MessageHandler inBoundHandler() {
    return message -> {
      File file = (File) message.getPayload();
      messageHandler.handleMessage(file);
    };
  }

Outbound Adapter Integration Flow:

  private IntegrationFlow getOutboundIntegrationFlow() {

    return IntegrationFlows.from("sftpOutboundChannel")
        .handle(Sftp.outboundAdapter(sftpSessionFactory(), FileExistsMode.FAIL)
            .remoteDirectoryExpression(String.format("headers['%s']", FileHeaders.REMOTE_DIRECTORY))).get();
  }

  @Bean("sftpOutboundChannel")
  public MessageChannel sftpOutboundChannel() {
    return new DirectChannel();
  }

SFTP Message Handler:

  @Async("sftpHandlerAsyncExecutor")
  public void handleMessage(File originalFile) {

    File configFile = fetchConfigFile();

    /*
      process original file and store processed file in output file path on local directory
     */
      
    boolean success = uploadFileToSftpServer(outputFilePath, client, entity);

    if (success) {
      deleteFileFromLocal(originalFile);
    }
  }

Outbound Gateway GET Integration Flow:

  private IntegrationFlow sftpGatewayGetIntegrationFlow() {
    return IntegrationFlows.from("sftpGetInputChannel")
        .handle(Sftp.outboundGateway(sftpSessionFactory(),
            AbstractRemoteFileOutboundGateway.Command.GET, "payload")
            .options(AbstractRemoteFileOutboundGateway.Option.DELETE,
                AbstractRemoteFileOutboundGateway.Option.PRESERVE_TIMESTAMP)
            .localDirectoryExpression(String.format("headers['%s']", Constants.HEADER_LOCAL_DIRECTORY_NAME))
            .autoCreateLocalDirectory(true))
        .channel("nullChannel")
        .get();
  }

  @Bean("sftpGetInputChannel")
  public MessageChannel sftpGetInputChannel() {
    return new DirectChannel();
  }

messageHandler.handleMessage() is called asynchronously (with a ThreadPoolTaskExecutor) and internally fetches the config file using the outbound gateway. However, I couldn't find a single channel on which I can send a message and receive the reply in the same thread. I found MessagingTemplate in the Spring docs but couldn't find a way to connect it to my outbound gateway integration flow.

sftpGetMessageTemplate.sendAndReceive(sftpGetInputChannel, new GenericMessage<>("/dir/file.csv", headers)) throws a "Dispatcher has no subscribers for channel" exception when sftpGetInputChannel is a DirectChannel.

I am looking for a solution where I can fetch the required file from the server in any of the following ways:

  • Integrate MessagingTemplate with the IntegrationFlow (if possible) using an appropriate channel.
  • Chain message handlers in the inbound adapter flow so that, after polling the original file, the flow fetches the second file using the SFTP outbound gateway and then calls the final handler with both objects (original file and config file). I am trying to achieve something similar with the custom code above.
  • Any other way to use send and poller channels for the GET command in a multi-threaded environment.

The application needs to decide the directory path at runtime when using the GET command.

1 Answer

You probably need to learn what a @MessagingGateway is and how to make it interact with the channels of your outbound gateway.

See docs for more info: https://docs.spring.io/spring-integration/docs/5.3.2.RELEASE/reference/html/messaging-endpoints.html#gateway

If you really want to get the config file as a result, you should not use .channel("nullChannel"). With the gateway in hand, there will be a replyChannel header with a TemporaryReplyChannel instance populated by the gateway. Then, in your code, you just use that functional interface as an API to call.
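
A minimal sketch of such a gateway interface, assuming the flow from the question is registered and no longer ends with .channel("nullChannel"). The names SftpConfigFileGateway and fetch are hypothetical; Constants is the question's own class and is assumed to expose the header name as a compile-time String constant. Outside Spring Boot, the interface would probably also need to be picked up with @IntegrationComponentScan.

```java
import java.io.File;

import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;

// Hypothetical interface; Spring Integration creates a proxy bean for it.
// "Constants" is the question's own class (import it from your package).
@MessagingGateway
public interface SftpConfigFileGateway {

    // Sends the remote path as the payload to the question's request channel
    // and blocks the calling thread until the GET reply arrives.
    // For the GET command the reply payload is the downloaded local File.
    @Gateway(requestChannel = "sftpGetInputChannel")
    File fetch(@Payload String remoteFilePath,
               @Header(Constants.HEADER_LOCAL_DIRECTORY_NAME) String localDirectory);
}
```

With this in place, SftpPdfMessageHandler could have the interface injected and call something like sftpConfigFileGateway.fetch("/dir/file.csv", localDirectory) from inside handleMessage(), on whatever thread the @Async executor provides.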

In fact, that messaging gateway uses the MessagingTemplate.sendAndReceive() you mentioned.
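
To illustrate why the reply never comes back when the flow ends in nullChannel, here is a simplified plain-Java model of a send-and-receive call. It is not Spring's implementation: ReplyChannelModel, Msg, and both flows are hypothetical stand-ins, and a CompletableFuture plays the role of the temporary reply channel carried in the replyChannel header.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

// Simplified model only; all names here are hypothetical, not Spring classes.
public class ReplyChannelModel {

    static final String REPLY_CHANNEL = "replyChannel";

    // Stand-in for a message: a payload plus a header map.
    static final class Msg {
        final Object payload;
        final Map<String, Object> headers;

        Msg(Object payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    // Creates a one-shot reply holder, passes it along in the headers,
    // invokes the flow on the caller's thread, then waits for the reply.
    static Object sendAndReceive(Consumer<Msg> flow, Object payload, long timeoutMillis) {
        CompletableFuture<Object> reply = new CompletableFuture<>();
        Map<String, Object> headers = new HashMap<>();
        headers.put(REPLY_CHANNEL, reply);
        flow.accept(new Msg(payload, headers));
        try {
            return reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | ExecutionException e) {
            return null; // nothing was sent to the reply holder in time
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // A flow whose last step sends its result to the reply holder in the headers.
    @SuppressWarnings("unchecked")
    static final Consumer<Msg> REPLYING_FLOW = msg ->
        ((CompletableFuture<Object>) msg.headers.get(REPLY_CHANNEL))
            .complete("local copy of " + msg.payload);

    // A flow that discards its result, like routing the output to nullChannel.
    static final Consumer<Msg> DISCARDING_FLOW = msg -> { };

    public static void main(String[] args) {
        System.out.println(sendAndReceive(REPLYING_FLOW, "/dir/file.csv", 100));   // local copy of /dir/file.csv
        System.out.println(sendAndReceive(DISCARDING_FLOW, "/dir/file.csv", 100)); // null
    }
}
```

In this model the caller gets its result only when the last step of the flow writes to the reply holder it was handed, which is why the flow should end at the outbound gateway rather than at nullChannel.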