2
votes

I'm having an issue with the FTP2 component of Camel, with consumer actors living inside an Akka system.

The basic idea is to monitor an FTP directory for files, then spawn a child actor to deal with each file individually. Akka is being used to manage concurrency and reliability. The parent consumer actor polls the directory with noop=true, so that it leaves the files untouched on the server; the child consumer actor is then supposed to download the file, filtered with the 'include' Camel option. It's important that the downloads are concurrent, and it's important that the file isn't loaded into memory (hence the use of localWorkDirectory).

I've written a simple repro:

package camelrepro;

import java.io.InputStream;

import org.mockftpserver.core.command.Command;
import org.mockftpserver.core.command.ReplyCodes;
import org.mockftpserver.core.session.Session;
import org.mockftpserver.core.session.SessionKeys;
import org.mockftpserver.fake.FakeFtpServer;
import org.mockftpserver.fake.UserAccount;
import org.mockftpserver.fake.command.AbstractFakeCommandHandler;
import org.mockftpserver.fake.filesystem.FileEntry;
import org.mockftpserver.fake.filesystem.UnixFakeFileSystem;

import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.camel.CamelMessage;
import akka.camel.javaapi.UntypedConsumerActor;
import akka.testkit.JavaTestKit;

public class Main {

    public static class ParentActor extends UntypedConsumerActor {

        public ParentActor() {
            System.out.println("Parent started");
        }
        @Override
        public String getEndpointUri() {
            return "ftp://anonymous@localhost:8021?password=password&readLock=changed&initialDelay=0&delay=200&noop=true";
        }

        @Override
        public void onReceive(Object msg) throws Exception {
            if (msg instanceof CamelMessage) {
                getContext().actorOf(new Props(ChildActor.class), "0");
            } else {
                unhandled(msg);
            }
        }
    }

    public static class ChildActor extends UntypedConsumerActor {

        public ChildActor() {
            System.out.println("Child started");
        }

        @Override
        public String getEndpointUri() {
            return "ftp://anonymous@localhost:8021?password=password&readLock=changed&initialDelay=0&delay=200&include=test.txt&localWorkDirectory=/tmp";
        }

        @Override
        public void onReceive(Object msg) throws Exception {
            if (msg instanceof CamelMessage) {
                System.out.println("Child got message");
                CamelMessage camelMsg = (CamelMessage) msg;

                InputStream source = camelMsg.getBodyAs(InputStream.class, getCamelContext());
                System.out.println(source.getClass().getName());
                System.exit(0);
            } else {
                unhandled(msg);
            }
        }
    }

    public static void main(String[] args) {

        ActorSystem system = ActorSystem.create("default");

        FakeFtpServer ftpServer = new FakeFtpServer();
        UnixFakeFileSystem ftpFileSystem = new UnixFakeFileSystem();
        ftpServer.setFileSystem(ftpFileSystem);
        ftpServer.addUserAccount(new UserAccount("anonymous", "password", "/"));
        ftpServer.setServerControlPort(8021);

        // fix bug in PWD handling (either Apache FTP client or mock server depending on opinion)
        ftpServer.setCommandHandler("PWD", new AbstractFakeCommandHandler() {
            @Override
            protected void handle(Command command, Session session) {
                String currentDirectory = (String) session.getAttribute(SessionKeys.CURRENT_DIRECTORY);
                this.replyCodeForFileSystemException = ReplyCodes.READ_FILE_ERROR;
                verifyFileSystemCondition(notNullOrEmpty(currentDirectory), currentDirectory, "filesystem.currentDirectoryNotSet");
                int replyCode = ReplyCodes.PWD_OK;
                String replyText = String.format("\"%s\" OK", currentDirectory.replaceAll("\"", "\"\""));
                session.sendReply(replyCode, replyText);
            }
        });
        ftpFileSystem.add(new FileEntry("/test.txt", "hello world"));
        ftpServer.start();

        new JavaTestKit(system) {{
            getSystem().actorOf(new Props(ParentActor.class));
        }};
    }
}

Maven dependencies showing versions:

    <dependencies>
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-actor_2.10</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-remote_2.10</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-camel_2.10</artifactId>
            <version>2.1.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-testkit_2.10</artifactId>
            <version>2.1.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-ftp</artifactId>
            <version>2.10.3</version>
        </dependency>
        <dependency>
            <groupId>org.mockftpserver</groupId>
            <artifactId>MockFtpServer</artifactId>
            <version>2.4</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-io</artifactId>
            <version>1.3.2</version>
        </dependency>
        <dependency>
            <groupId>commons-net</groupId>
            <artifactId>commons-net</artifactId>
            <version>3.2</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.2</version>
        </dependency>
    </dependencies>

I expect to see BufferedInputStream written to standard out, which would confirm that the body is not a ByteArrayInputStream (i.e. the file was not loaded into memory).

But instead, I see file not found exceptions:

[ERROR] [02/15/2013 10:53:32.951] [default-akka.actor.default-dispatcher-7] [akka://default/user/$a/0] Error during type conversion from type: org.apache.camel.component.file.remote.RemoteFile to the required type: java.io.InputStream with value GenericFile[test.txt] due java.io.FileNotFoundException: /tmp/test.txt (No such file or directory)
org.apache.camel.TypeConversionException: Error during type conversion from type: org.apache.camel.component.file.remote.RemoteFile to the required type: java.io.InputStream with value GenericFile[test.txt] due java.io.FileNotFoundException: /tmp/test.txt (No such file or directory)
    at org.apache.camel.impl.converter.BaseTypeConverterRegistry.mandatoryConvertTo(BaseTypeConverterRegistry.java:162)

A couple of times, it worked, leading me to suspect it may be a race somewhere. But it almost always fails.

Any clues, ideas, suggestions?

FWIW:

uname -a: Linux 3.2.0-37-generic #58-Ubuntu SMP Thu Jan 24 15:28:10 UTC 2013 x86_64 x86_64 x86_64 GNU/Linux
java: 1.7.0_11-b21

3 Answers

3
votes

I've found the solution to the above problem.

The cause is that the child consumer's autoAck() returns true, which is the default. In that case akka-camel sends the CamelMessage fire-and-forget and immediately proceeds with cleanup. The child actor, meanwhile, doesn't actually get an open InputStream until one of the type converters called by getBodyAs() opens the file. So there is a race between the child actor opening the file via getBodyAs() and the Camel cleanup deleting the file after the message has been sent asynchronously.

So the fix is to override autoAck() to return false, and to reply to the sender with Ack.getInstance() (or new Status.Failure(<cause>) if you like) at the end of the child message handler.
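
To make the ordering problem concrete, here is a rough plain-JDK simulation of it. This is not akka-camel code: the class AckRaceSketch, the method deliver() and the latches are all hypothetical names made up for illustration. The "producer" thread stands in for Camel (it owns a temporary work file and deletes it when the exchange is done) and the single-thread executor stands in for the child actor. With autoAck=true the sketch deliberately forces the losing interleaving, which is an assumption made so the result is repeatable; in the real system the outcome depends on timing.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for the Camel/Akka interaction; none of these names exist in akka-camel.
class AckRaceSketch {

    /** Simulates one delivery; returns true if the consumer managed to read the work file. */
    static boolean deliver(boolean autoAck) {
        ExecutorService consumer = Executors.newSingleThreadExecutor();
        try {
            // The "local work file" that the producer owns and cleans up.
            Path workFile = Files.createTempFile("camel-sketch", ".txt");
            Files.write(workFile, "hello world".getBytes(StandardCharsets.UTF_8));

            CountDownLatch ack = new CountDownLatch(1);
            CountDownLatch cleanedUp = new CountDownLatch(1);

            Future<Boolean> result = consumer.submit(() -> {
                if (autoAck) {
                    // Assumption: force the unlucky ordering, i.e. the consumer
                    // opens the file only after the producer has cleaned up.
                    cleanedUp.await();
                }
                boolean ok;
                try {
                    byte[] body = Files.readAllBytes(workFile);
                    ok = "hello world".equals(new String(body, StandardCharsets.UTF_8));
                } catch (IOException e) {
                    ok = false; // the file was already deleted
                }
                ack.countDown(); // the explicit acknowledgement
                return ok;
            });

            if (!autoAck) {
                // Manual ack: the producer waits for the consumer before cleaning up.
                ack.await();
            }
            Files.deleteIfExists(workFile); // the producer's cleanup
            cleanedUp.countDown();
            return result.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            consumer.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("autoAck=true  -> read ok: " + deliver(true));
        System.out.println("autoAck=false -> read ok: " + deliver(false));
    }
}
```

Running main reports that the read fails in the fire-and-forget case and succeeds once cleanup waits for the acknowledgement. That is the same ordering guarantee that returning false from autoAck() and sending the Ack at the end of onReceive() gives the child actor.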

1
votes

Use Camel 2.10.2, as there is an issue with the FTP component in 2.10.3.

0
votes

When using localWorkDirectory=/tmp, that directory is used to store the file temporarily during routing. When the Camel Exchange is done, the file is deleted. I am not sure how this works with Akka, which delivers events asynchronously. The Akka onReceive may be called after the Camel Exchange is done, and by then the temp file has already been deleted.

In Camel you would route the file to a file location of a more permanent nature:

 from("ftp:...")
   .to("file:inbox")

And then you can have Akka consume from ("file:inbox") instead.