1
votes

I have an application (Scala 2.10.3, Akka 2.3.1, Camel 2.13.0) which subscribes to a JMS Topic, and is notified via JMS messages when specific files are available for download. Each JMS message includes the name+path of a file which is available for collection via SFTP.

I then want to be able to fetch the files via SFTP, but only fetch files for which we have received a JMS message (to avoid problems where we might fetch a file which is in-progress of being written).

I want a solution which fits in with Akka Camel and the Consumer model. I've read through the Camel options for file2 and ftp2 which are used for SFTP endpoints, but I need help with:

  • how can I define a class/object which can be referenced in the endpointUri string via &filter=... parameter? I would want to be able to update the filter object so that every time the Consumer polls for a list of files, the updated filter list is applied.

  • how can I define a custom IdempotentRepository, to allow cache sizes larger than the default of 1000?

My SFTP Consumer Actor currently looks like this (with some values redacted ...):

class SftpConsumer extends Actor with ActorLogging with Consumer {
  val host = ...
  val username = ...
  val keyFile = ...
  def endpointUri: String = s"sftp://${host}?username=${username}&privateKeyFile=${keyFile}&idempotent=true"
1

1 Answers

3
votes

The filter and idempotentRepository parameters need to refer to objects (by name) in the registry.

For the filter, you need to create an object of a class which extends org.apache.camel.component.file.GenericFileFilter.

For the filter and/or idempotentRepository, you need to create a registry, assign the registry to the Camel context, and register these objects to the registry e.g.

// define a class which extends GenericFileFilter[T], and which
// filters for files which are explicitly allowed via include()
class MyFileFilter extends GenericFileFilter[File] {
  var include = Set[String]()
  def accept(file: GenericFile[File]) = include.contains(file.getFileName)
  def include(filename: String) = include = include + filename
  def exclude(filename: String) = include = include - filename
}

// Create a filter and a registry, add a mapping for the file filter to
// the registry, and assign the registry to the camel context
val myFileFilter = new MyFileFilter()
val simpleRegistry = new SimpleRegistry()
simpleRegistry.put("myFilter", myFileFilter )
camel.context.setRegistry(simpleRegistry);

// create a memory-based idempotent repository with a custom cache size
val cacheSize = 2500
val myRepository = MemoryIdempotentRepository.memoryIdempotentRepository(cacheSize)
simpleRegistry.put("myRepository", myRepository)

// adjust the endpointUri to include the &filter= and &idempotentRepository= parameters
def endpointUri: String = s"sftp://${host}?username=${username}...&idempotent=true&idempotentRepository=#myRepository&filter=#myFilter"