0 votes

I am using Spark and Scala on JDK 1.8. I am new to Scala.

I am reading a text file (pat1.txt) that looks like this:

[screenshot of pat1.txt: comma-separated rows with a header line FIRST_NAME,LAST_NAME,HOBBY]

Now I am reading that file from my Scala code as:

import org.apache.spark.sql.SparkSession

val sqlContext = SparkSession.builder().getOrCreate()
sqlContext.read
    .format(externalEntity.getExtractfileType)
    .option("compression", externalEntity.getCompressionCodec)
    .option("header", if (externalEntity.getHasHeader.toUpperCase == "Y") "true" else "false")
    .option("inferSchema", "true")
    .option("delimiter", externalEntity.getExtractDelimiter)
    .load(externalEntity.getFilePath)
    .createOrReplaceTempView(externalEntity.getExtractName)

Then I run a query from my Scala code:

val queryResult = sqlContext.sql(myQuery)

and the output is written as:

queryResult
 .repartition(LteGenericExtractEntity.getNumberOfFiles.toInt)
 .write.format("csv")
 .option("compression", LteGenericExtractEntity.getCompressionCodec)
 .option("delimiter", LteGenericExtractEntity.getExtractDelimiter)
 .option("header", "true"")
 .save(s"${outputDirectory}/${extractFileBase}")

Now, when 'myQuery' above is:

select * from PAT1

The program generates output with an extra header row, "value", that was not part of the file. Basically, the program cannot identify the ","-separated columns in the input file, so the output contains a single column under a header named "value". The output file looks like:

[screenshot of the output file: a single column with the header "value", each row holding an entire unsplit input line]
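It seems Spark's built-in text source always produces a single string column named value, regardless of any delimiter option. A minimal sketch of what I observe (hypothetical path):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().getOrCreate()
    // The "text" source ignores delimiter options and returns one column.
    spark.read.format("text").load("/tmp/pat1.txt").printSchema()
    // root
    //  |-- value: string (nullable = true)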

If I change 'myQuery' to:

select p1.FIRST_NAME, p1.LAST_NAME,p1.HOBBY  from PAT1 p1

it throws an exception:

[screenshot of the exception: the columns FIRST_NAME, LAST_NAME, HOBBY cannot be resolved in view PAT1]

My input can be in any format (e.g. text or csv, possibly compressed), but the output must always be .csv.
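For illustration, here is the read again with hypothetical literal values substituted for the externalEntity getters; this is the configuration that produces the single "value" column:

    // Same read as above, with hypothetical literal values in place of
    // the externalEntity getters.
    sqlContext.read
        .format("text")                  // getExtractfileType
        .option("compression", "none")   // getCompressionCodec
        .option("header", "true")        // getHasHeader == "Y"
        .option("inferSchema", "true")
        .option("delimiter", ",")        // getExtractDelimiter
        .load("/tmp/pat1.txt")           // getFilePath
        .createOrReplaceTempView("PAT1") // getExtractName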

I am having a hard time understanding how to change the read part so the created view gets its columns properly. Can I get help with that?

1
I think you have to change the read format to csv for the input, e.g. sqlContext.read.format("csv") - Ramdev Sharma

1 Answer

1 vote

This looks like a csv file, but with a .txt extension. You could try the following:

  1. Read this file as csv with extra options, like spark.read.option("inferSchema", "true").option("header", "true").csv("path/to/file") (a fuller end-to-end sketch follows below).
  2. Read the file as csv as well, but name the columns yourself with toDF (note it must be the csv source, not text, otherwise the dataframe has only the single "value" column and toDF with three names fails):
    sqlContext.read.format("csv") // csv, not text, so the delimiter is honoured
          .option("compression", "none")
          .option("delimiter", ",")
          .option("header", "true")
          .load("/tmp/pat1")
          .toDF("first_name", "last_name", "hobby")