1
votes

I have a requirement where i need to look for a file continuously at unix location.Once its available then i need to parse it and convert to some json format.This needs to be done using Spring integration - DSL. Following is the piece of code I got from spring site but it shows following exception:

o.s.integration.handler.LoggingHandler: org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.processFileChannel'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers 

Below is the code:

@SpringBootApplication
public class FileReadingJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(FileReadingJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public IntegrationFlow fileReadingFlow() {
         return IntegrationFlows
                  .from(s -> s.file(new File("Y://"))
                              .patternFilter("*.txt"),
                          e -> e.poller(Pollers.fixedDelay(1000)))
                  .transform(Transformers.fileToString())
                  .channel("processFileChannel")
                  .get();
        }

}

New Code:

@SpringBootApplication public class SpringIntegration {

public static void main(String[] args) {
    new SpringApplicationBuilder(SpringIntegration.class)
    .web(false)
    .run(args);
}

@Bean
public SessionFactory<LsEntry> sftpSessionFactory() {
    DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
    factory.setHost("ip");
    factory.setPort(port);
    factory.setUser("username");
    factory.setPassword("pwd");
    factory.setAllowUnknownKeys(true);
    return new CachingSessionFactory<LsEntry>(factory);
}

@Bean
public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
    SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory());
    fileSynchronizer.setDeleteRemoteFiles(false);
    fileSynchronizer.setRemoteDirectory("remote dir");
    fileSynchronizer.setFilter(new SftpSimplePatternFileListFilter("*.txt"));

    return fileSynchronizer;
}
@Bean
@InboundChannelAdapter(channel = "sftpChannel", poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))
public MessageSource ftpMessageSource() {
    SftpInboundFileSynchronizingMessageSource source =
            new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer());
    source.setLocalFilter(new AcceptOnceFileListFilter<File>());
    source.setLocalDirectory(new File("Local directory"));

    return source;
}

@Bean
@ServiceActivator(inputChannel = "fileInputChannel")
public MessageHandler handler() {
    return new MessageHandler() {


        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            System.out.println("File Name : "+message.getPayload());

        }

    };
}

@Bean public static StandardIntegrationFlow processFileFlow() { return IntegrationFlows .from("fileInputChannel").split() .handle("fileProcessor", "process").get();

    }

@Bean
@InboundChannelAdapter(value = "fileInputChannel", poller = @Poller(fixedDelay = "1000"))
public MessageSource<File> fileReadingMessageSource() {
    AcceptOnceFileListFilter<File> filters =new AcceptOnceFileListFilter<>();

    FileReadingMessageSource source = new FileReadingMessageSource();
    source.setAutoCreateDirectory(true);
    source.setDirectory(new File("Local directory"));
    source.setFilter(filters);

    return source;
}
@Bean
public FileProcessor fileProcessor() {
    return new FileProcessor();
}


 @Bean
    @ServiceActivator(inputChannel = "fileInputChannel")
    public AmqpOutboundEndpoint amqpOutbound(AmqpTemplate amqpTemplate) {
        AmqpOutboundEndpoint outbound = new AmqpOutboundEndpoint(amqpTemplate);
        outbound.setExpectReply(true);
        outbound.setRoutingKey("foo"); // default exchange - route to queue 'foo'
        return outbound;
    }



    @MessagingGateway(defaultRequestChannel = "amqpOutboundChannel")
    public interface MyGateway {
        String sendToRabbit(String data);

    }

}

FileProcessor:

public class FileProcessor {

public void process(Message<String> msg) {
    String content = msg.getPayload();
    JSONObject jsonObject ;
    Map<String, String> dataMap = new HashMap<String, String>();
    for(int i=0;i<=content.length();i++){
    String userId = content.substring(i+5,i+16);


    dataMap = new HashMap<String, String>();

    dataMap.put("username", username.trim());


    i+=290; //each record of size 290 in file
     jsonObject = new JSONObject(dataMap);
    System.out.println(jsonObject);

    }

}

}

1

1 Answers

0
votes

Your code is correct , but an exception tells you that there is need something what will read messages from the direct channel "processFileChannel".

Please, read more about different channel types in the Spring Integration Reference Manual.

EDIT

One of first class citizen in Spring Integration is MessageChannel abstraction. See EIP for more information.

The definition like .channel("processFileChannel") mean declare DirectChannel. This kind of channel means accept message on the send and perform handling directly just in send process. In the raw Java words it may sound like: call one service from another. Throw NPE if the another hasn't been autowired.

So, if you use DirectChannel for the output, you should declare somewhere a subscriber for it. I don't know what is your logic, but that is how it works and no other choice to fix Dispatcher has no subscribers for channel.

Although you can use some other MessageChannel type. But for this purpose you should read more doc, e.g. Mark Fisher's Spring Integration in Action.