7
votes

I am trying to read a file on a remote machine in Apache Spark (the Scala version) over FTP. Currently, I am following an example from the Learning Spark repo of Databricks on GitHub. Using curl, I am able to download the file, so the path I use exists.

Below is a snippet of the code I am trying to execute:

var file = sc.textFile("ftp://user:pwd@192.168.1.5/brecht-d-m/map/input.nt")
var fileDF = file.toDF()
fileDF.write.parquet("out")
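
The action that triggers the failure is a simple count on the DataFrame, along the lines of:

fileDF.count()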

After executing a count on the DataFrame, I get the following stack trace (http://pastebin.com/YEq8c2Hf):

org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#1L])
+- TungstenExchange SinglePartition, None
   +- TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#4L])
      +- Project
         +- Scan ExistingRDD[_1#0]

...

Caused by: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: ftp://user:pwd@192.168.1.5/brecht-d-m/map/input.nt

From this I would assume that the file is unreachable, but that contradicts the fact that I am able to retrieve the file via curl:

curl ftp://user:pwd@192.168.1.5/brecht-d-m/map/input.nt

This prints the contents of the file to my terminal. I do not see what I am doing wrong in the Scala code. Is there an error in the code snippet above, or is the approach entirely wrong?

Thanks in advance, Brecht

Note:

  • Specifying the whole path (/home/brecht-d-m/map/input.nt) also does not work (as expected, since this does not work in curl either; "server denied you to change to the given directory"). Trying this in Spark gives an IOException saying that seek is not supported (http://pastebin.com/b9EB9ru2).

  • Reading the file locally (e.g. sc.textFile("/home/brecht-d-m/map/input.nt")) works perfectly.

  • File permissions for the specific file are set to read and write for all users.

  • The file size (15 MB) should not be a problem; Spark should be able to handle much bigger files.

  • Software versions: Scala 2.11.7, Apache Spark 1.6.0, Java 1.8.0_74, Ubuntu 14.04.4


2 Answers

11
votes

I was able to find a workaround via the code snippet below:

import org.apache.spark.SparkFiles

val dataSource = "ftp://user:pwd@192.168.1.5/brecht-d-m/map/input.nt"
sc.addFile(dataSource)
var fileName = SparkFiles.get(dataSource.split("/").last)
var file = sc.textFile(fileName)

With this I am able to download a file over FTP (using the same URL as in the first code snippet). The workaround first downloads the file (via addFile). Next, I retrieve the path to which the file was downloaded via SparkFiles.get. Finally, I use that path to load the file into an RDD.
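
From there, the rest of the original flow works as before. For completeness, a minimal sketch (assuming the SQLContext implicits are in scope for toDF, as they are by default in the spark-shell):

var fileDF = file.toDF()
fileDF.write.parquet("out")
fileDF.count()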

0
votes

I had the same requirement to fetch remote files using Scala. The current answer does not solve the problem for SFTP, which is what most companies use.

I am using the Scala code below to create an InputStream from the remote file and transform it into a String. We could just as well create an RDD (as sketched after the method below) or save the contents to the filesystem.

Sharing this in case someone has to use Scala.

Add the JSch dependency to build.sbt:

libraryDependencies += "com.jcraft" % "jsch" % "0.1.55"

Now create the method below using the JSch classes:

import com.jcraft.jsch.{ChannelSftp, JSch, JSchException}

def getRemoteFile: String = {
  val jsch = new JSch()
  try {
    // Open an SSH session to the remote host (host-key checking disabled for simplicity)
    val session = jsch.getSession("devuser", "175.1.6.60", 22)
    session.setConfig("StrictHostKeyChecking", "no")
    session.setPassword("devpassword")
    session.connect()

    // Open an SFTP channel on top of the SSH session
    val channel = session.openChannel("sftp")
    channel.connect()
    val sftpChannel = channel.asInstanceOf[ChannelSftp]

    // Stream the remote file and read it fully into a String
    val output = sftpChannel.get("/tmp/monitoring/greenplum_space_report.txt")
    val fileContents = scala.io.Source.fromInputStream(output).mkString

    sftpChannel.exit()
    session.disconnect()
    fileContents
  } catch {
    case ex: JSchException =>
      // Log the failure and return an empty String so the method still yields a value
      ex.printStackTrace()
      ""
  }
}
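
To get an RDD instead of a plain String, one possible sketch (assuming a SparkContext named sc is available, e.g. in the spark-shell):

// Split the downloaded contents into lines and parallelize them into an RDD
val report = getRemoteFile
val reportLines = sc.parallelize(report.split("\n"))
reportLines.count()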