3
votes

I have two paths, one for a file and one for a folder. I would like to move the file into that folder on HDFS. How can I do that in Scala? I'm using Spark, too

Bonus if the same code will work for Windows paths too, just like reading/writing files on HDFS, but not required.

I have tried the following:

val fs = FileSystem.get(sc.hadoopConfiguration)
fs.moveFromLocalFile(something, something2)

And I get the following error:

Exception in thread "main" java.lang.IllegalArgumentException: Wrong FS: hdfs:/user/o/datasets/data.txt, expected: file:///

Same goes for moveToLocalFile() because they are meant to transfer files between filesystems, not within a filesystem. I have also tried fs.rename() but that did not do anything at all (no error or anything either).

I basically create files in one directory (writing to them with a stream) and once they are done they need to moved into a different directory. This different directory is monitored by Spark streaming and I have had some issues when Spark streaming tries to work with not finished files

2
Spark streaming tries to work with not finished files. You need to explicitly ignore any files starting with a period or underscore - OneCricketeer
When I create the files, in their temporary form they still have the same filename however they have size 0 (bytes) until they are finished, then they have the final size and the same name. - osk
Yes, and unless you ignore them, Spark Streaming throws errors - OneCricketeer
How can I detect the size in my program? Since the filename does not change - osk
I don't understand the question, but it seems unrelated to the original post - OneCricketeer

2 Answers

10
votes

Try the following Scala code.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path

val hadoopConf = new Configuration()
val hdfs = FileSystem.get(hadoopConf)

val srcPath = new Path(srcFilePath)
val destPath = new Path(destFilePath)

hdfs.copyFromLocalFile(srcPath, destPath)

You should also check if Spark has the HADOOP_CONF_DIR variable set in the conf/spark-env.sh file. This will make sure that Spark is going to find the Hadoop configuration settings.

The dependencies for the build.sbt file:

libraryDependencies += "org.apache.hadoop" % "hadoop-common" % "2.6.0"
libraryDependencies += "org.apache.commons" % "commons-io" % "1.3.2"
libraryDependencies += "org.apache.hadoop" % "hadoop-hdfs" % "2.6.0"

OR

you can used IOUtils from apache commons to copy data from InputStream to OutputStream

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import org.apache.commons.io.IOUtils;



val hadoopconf = new Configuration();
val fs = FileSystem.get(hadoopconf);

//Create output stream to HDFS file
val outFileStream = fs.create(new Path("hdfs://<namenode>:<port>/output_path"))

//Create input stream from local file
val inStream = fs.open(new Path("hdfs://<namenode>:<port>/input_path"))

IOUtils.copy(inStream, outFileStream)

//Close both files
inStream.close()
outFileStream.close()
1
votes
import org.apache.hadoop.fs.{FileAlreadyExistsException, FileSystem, FileUtil, Path}

val srcFileSystem: FileSystem = FileSystemUtil
  .apply(spark.sparkContext.hadoopConfiguration)
  .getFileSystem(sourceFile)
val dstFileSystem: FileSystem = FileSystemUtil
  .apply(spark.sparkContext.hadoopConfiguration)
  .getFileSystem(sourceFile)
FileUtil.copy(
  srcFileSystem,
  new Path(new URI(sourceFile)),
  dstFileSystem,
  new Path(new URI(targetFile)),
  true,
  spark.sparkContext.hadoopConfiguration)