1
votes

I have a Sftp inbound flow and I got the session information from DefaultSftpSessionFactory. But I need to implement mulitple session information dynamically which I will get from database table. That means I have multiple number of Sftp server details that I need to implement in my integration flow. Now I have done with file transfer from single source to single destination but I need to implement multiple source to multiple destination. So can any one provide some pointer on this.

This is my session Factory...Here I have a single Sftp server information but how to configure multiple server details.

    @Autowired
    private DefaultSftpSessionFactory sftpSessionFactory;

    @Bean
    public DefaultSftpSessionFactory sftpSessionFactory() {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(
                true);
        factory.setHost("111.11.12.143");
        factory.setPort(22);
        factory.setUser("sftp");
        factory.setPassword("*******");         
        return factory;
    }

and this is my Sftp Inbound flow..

    @Bean
    public IntegrationFlow sftpInboundFlow() {
    System.out.println("enter sftpInboundFlow....."
            + sftpSessionFactory.getSession());     
    return IntegrationFlows
            .from(Sftp.inboundAdapter(this.sftpSessionFactory)
                    .preserveTimestamp(true).remoteDirectory(remDir)
                    .regexFilter(".*\\.txt$")
                    .localFilenameExpression("#this.toUpperCase()")
                    .localDirectory(new File(localDir))
                    .remoteFileSeparator("/"),
                    new Consumer<SourcePollingChannelAdapterSpec>() {
                        @Override
                        public void accept(SourcePollingChannelAdapterSpec e) {
                            e.id("sftpInboundAdapter")
                                    .autoStartup(true)
                                    .poller(Pollers.fixedRate(1000)
                                            .maxMessagesPerPoll(1));
                        }
                    })
            //.channel(MessageChannels.queue("sftpInboundResultChannel"))
                    .channel(sftpInboundResultChannel())
            .get();
}

As suggested by Gary I am editing my post....

Hi Gary, I am taking the reference from Github dynamic FTP example.

Through the ChannelResolver class I need to call my above DSL class. and set the dynamic value in context property without using XML.

In my ChannelResolver class I want some thing like this

StandardEnvironment env = new StandardEnvironment();
Properties props = new Properties();
props.setProperty("inbound.host", host);    //I am getting the value of 'host' from a DB table.
PropertiesPropertySource pps = new PropertiesPropertySource("sftpprop", props);
env.getPropertySources().addLast(pps);
context.setEnvironment(env); 

And my DSL class I need to use like this.
@Value("${inbound.host}")
private String host;

So in this way can I set dynamic value for String 'host' ? 

I am editing my original post...........

In my Outbound dynamic resolver class I am doing like this


    StandardEnvironment env = new StandardEnvironment();
    Properties props = new Properties();        
    props.setProperty("outbound.host", host);
    props.setProperty("outbound.port", String.valueOf(port));
    props.setProperty("outbound.user", user);
    props.setProperty("outbound.password", password);
    props.setProperty("outbound.remote.directory", remoteDir);
    props.setProperty("outbound.local.directory", localDir);        

    PropertiesPropertySource pps = new PropertiesPropertySource("ftpprops", props);
    env.getPropertySources().addLast(pps);
    ctx.setEnvironment(env);


And this is my dsl class....

@Autowired
private DefaultSftpSessionFactory sftpSessionFactory;

@Bean
public DefaultSftpSessionFactory sftpSessionFactory(@Value("${outbound.host}") String host, @Value("${outbound.port}") int port,
        @Value("${outbound.user}") String user, @Value("${outbound.password}") String password
        ) {
    DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
    factory.setHost(host);
    factory.setPort(port);
    factory.setUser(user);
    factory.setPassword(password);      
    return factory;
}


@Bean
public IntegrationFlow fileInboundFlow(@Value("${outbound.local.directory}") String localDir)
{
    return IntegrationFlows
            .from(Files.inboundAdapter(new File(localDir)),
                    new Consumer<SourcePollingChannelAdapterSpec>() {

                        @Override
                        public void accept(SourcePollingChannelAdapterSpec e) {
                            e.autoStartup(true).poller(
                                    Pollers.fixedDelay(5000)
                                            .maxMessagesPerPoll(1));
                        }
                    })
                    .channel(sftpSendChannel())
                    .get();
}

@Bean
public IntegrationFlow sftpOutboundFlow(@Value("${outbound.remote.directory}") String remDir) {    
    return IntegrationFlows
            .from(sftpSendChannel())
            .handle(Sftp.outboundAdapter(this.sftpSessionFactory)                       
                    .useTemporaryFileName(false)
                    .remoteDirectory(remDir))
                    .get();
}

@Bean
public MessageChannel sftpSendChannel() {
    return new DirectChannel();
}

@Bean
public static PropertySourcesPlaceholderConfigurer configurer1() {
    return new PropertySourcesPlaceholderConfigurer();      
}


And this the error log from console...

Aug 03, 2015 7:50:25 PM org.apache.catalina.core.StandardContext listenerStart SEVERE: Exception sending context initialized event to listener instance of class org.springframework.web.context.ContextLoaderListener org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'sftpOutBoundDsl': Injection of autowired dependencies failed; nested exception is org.springframework.beans.factory.BeanCreationException: Could not autowire field: private org.springframework.integration.sftp.session.DefaultSftpSessionFactory com.tcs.iux.ieg.sftp.dynamic.SftpOutBoundDsl.sftpSessionFactory; nested exception is java.lang.IllegalArgumentException: Could not resolve placeholder 'outbound.host' in string value "${outbound.host}" at org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor.postProcessPropertyValues(AutowiredAnnotationBeanPostProcessor.java:334) at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.populateBean(AbstractAutowireCapableBeanFactory.java:1204) at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:538) at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:476) at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:302) at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:229) at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:298) at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:193) at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:725) at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:757) at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:480) at org.springframework.web.context.ContextLoader.configureAndRefreshWebApplicationContext(ContextLoader.java:403) at org.springframework.web.context.ContextLoader.initWebApplicationContext(ContextLoader.java:306) at org.springframework.web.context.ContextLoaderListener.contextInitialized(ContextLoaderListener.java:106) at org.apache.catalina.core.StandardContext.listenerStart(StandardContext.java:4973) at org.apache.catalina.core.StandardContext.startInternal(StandardContext.java:5467) at org.apache.catalina.util.LifecycleBase.start(LifecycleBase.java:150) at org.apache.catalina.core.ContainerBase$StartChild.call(ContainerBase.java:1559) at org.apache.catalina.core.ContainerBase$StartChild.call(ContainerBase.java:1549) at java.util.concurrent.FutureTask.run(FutureTask.java:262) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) Caused by: org.springframework.beans.factory.BeanCreationException: Could not autowire field: private org.springframework.integration.sftp.session.DefaultSftpSessionFactory com.tcs.iux.ieg.sftp.dynamic.SftpOutBoundDsl.sftpSessionFactory; nested exception is java.lang.IllegalArgumentException: Could not resolve placeholder 'outbound.host' in string value "${outbound.host}" at org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor$AutowiredFieldElement.inject(AutowiredAnnotationBeanPostProcessor.java:555) at org.springframework.beans.factory.annotation.InjectionMetadata.inject(InjectionMetadata.java:87) at org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor.postProcessPropertyValues(AutowiredAnnotationBeanPostProcessor.java:331) ... 22 more Caused by: java.lang.IllegalArgumentException: Could not resolve placeholder 'outbound.host' in string value "${outbound.host}" at org.springframework.util.PropertyPlaceholderHelper.parseStringValue(PropertyPlaceholderHelper.java:174) at org.springframework.util.PropertyPlaceholderHelper.replacePlaceholders(PropertyPlaceholderHelper.java:126) at org.springframework.core.env.AbstractPropertyResolver.doResolvePlaceholders(AbstractPropertyResolver.java:204) at org.springframework.core.env.AbstractPropertyResolver.resolveRequiredPlaceholders(AbstractPropertyResolver.java:178) at org.springframework.context.support.PropertySourcesPlaceholderConfigurer$2.resolveStringValue(PropertySourcesPlaceholderConfigurer.java:175) at org.springframework.beans.factory.support.AbstractBeanFactory.resolveEmbeddedValue(AbstractBeanFactory.java:800) at org.springframework.beans.factory.support.DefaultListableBeanFactory.doResolveDependency(DefaultListableBeanFactory.java:917) at org.springframework.beans.factory.support.DefaultListableBeanFactory.resolveDependency(DefaultListableBeanFactory.java:904) at org.springframework.beans.factory.support.ConstructorResolver.resolveAutowiredArgument(ConstructorResolver.java:815) at org.springframework.beans.factory.support.ConstructorResolver.createArgumentArray(ConstructorResolver.java:743) at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:466) at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1113) at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1008) at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:505) at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:476) at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:302) at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:229) at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:298) at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:193) at org.springframework.beans.factory.support.DefaultListableBeanFactory.findAutowireCandidates(DefaultListableBeanFactory.java:1088) at org.springframework.beans.factory.support.DefaultListableBeanFactory.doResolveDependency(DefaultListableBeanFactory.java:1006) at org.springframework.beans.factory.support.DefaultListableBeanFactory.resolveDependency(DefaultListableBeanFactory.java:904) at org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor$AutowiredFieldElement.inject(AutowiredAnnotationBeanPostProcessor.java:527) ... 24 more

1

1 Answers

1
votes

It is currently not supported.

We have an open JIRA to add support for dynamic server selection but it's unlikely to be done in time for the upcoming 4.2 release.

You could work around it by writing your own custom delegating session factory that uses some criteria (e.g. a ThreadLocal) to determine which delegate factory to use.

EDIT:

As with the XML, you need a PropertySourcesPlaceholderConfigurer bean.

You should also use factory-method injection because the @Configuration class is created too early to have the @Value injected...

@Configuration
public class FooConfig {

    @Bean
    public DefaultSftpSessionFactory factory(
            @Value("${inbound.host}") String host, 
            @Value("${inbound.port}") int port) {
        DefaultSftpSessionFactory sf = new DefaultSftpSessionFactory();
        sf.setHost(host);
        sf.setPort(port);
        return sf;
    }

    @Bean
    public PropertySourcesPlaceholderConfigurer configurer() {
        return new PropertySourcesPlaceholderConfigurer();
    }

}

.

public class Testing {

    @Test
    public void test() {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
        context.register(FooConfig.class);
        StandardEnvironment env = new StandardEnvironment();
        Properties props = new Properties();
        props.setProperty("inbound.host", "bar");
        props.setProperty("inbound.port", "23");
        PropertiesPropertySource pps = new PropertiesPropertySource("sftpprop", props);
        env.getPropertySources().addLast(pps);
        context.setEnvironment(env);
        context.refresh();
        DefaultSftpSessionFactory sessionFactory = context.getBean(DefaultSftpSessionFactory.class);
        assertEquals("bar", TestUtils.getPropertyValue(sessionFactory, "host"));
        context.close();
    }

}

By the way, the delegating session factory will be in 4.2 after all.

EDIT2:

You can avoid the early instantiation of the config class and use global @Value injection, as long as you make the PSPC bean static...

@Configuration
public class FooConfig {

    @Value("${foo}")
    public String foo;

    @Bean
    public String earlyFoo() {
        return this.foo;
    }

    @Bean
    public String foo(@Value("${foo}") String foo) {
        return foo;
    }

    @Bean
    public static PropertySourcesPlaceholderConfigurer configurer() {
        return new PropertySourcesPlaceholderConfigurer();
    }

}

In this case, earlyFoo is populated as expected.