0
votes

I have a hive table partitionned by timestamp on top of parquet files with snappy conversion. basically the paths look like :

s3:/bucketname/project/flowtime=0/
s3:/bucketname/project/flowtime=1/
s3:/bucketname/project/flowtime=2/
...

I detect some inconsistency considering this table. The problem is that because of one field that gives LongType at some parquet schemas and String at another, runing queries throws ClassCastException.

So what I am trying to do now is to read all my parquet files and check their schemas so I can recreate them. I want to map my filenames to the schema of the associate parquet. so that I can have :

filename                           | schema 
s3:/bucketname/project/flowtime    |StructField(StructField(Id,StringType,True), 
                                   |StructField(Date,StringType,True)

So I tried to use spark with Scala and function input_file_name of org.apache.spark.sql.functions which I wrap in an UDF. It works pretty fine.

val filename = (path: String) => path
val filenameUDF = udf(filename)
val df=sqlContext.parquetFile("s3a://bucketname/").select(filenameUDF(input_file_name())).toDF()
df.map(lines =>(lines.toString,sqlContext.read.parquet(lines.toString.replace("[","").replace("]","")).schema.toString)})

It is to give an RDD[(String,String)] Only it seems that the part that reads the parquet within my map thows a nullPointerException.

ERROR scheduler.TaskSetManager: Task 0 in stage 14.0 failed 4 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 14.0 failed 4 times, most recent failure: Lost task 0.3 in stage 14.0 (TID 35, CONFIDENTIAL-SERVER-NAME, executor 13): java.lang.NullPointerException
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:32)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:32)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
        at scala.collection.AbstractIterator.to(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
        at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1328)
        at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1328)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1888)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1888)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:242)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

If you have any idea why the read parquet seems to not work inside the map please let me know why, because both parts of the pair I wanna create (the filename AND the schema) seems to work fine but joining them dont.

If also, you have better ideas how to solve the inconsistency among my parquet files that makes my hive table corrupted, because I don't see another choice than to work it that way because parquet are immutable and changing hive metadata don't change the embedded parquet metadata in each file.

Thank you for your attention. Renaud

1

1 Answers

0
votes

Let me suggest you an other get and loop on yout bucket list.

first you can read and store s3 bucket name by using listStatus then loop on each path.

import java.net.URI
import org.apache.hadoop.fs._
import org.apache.hadoop.conf._
import java.io._

val file = new File("/home/.../fileName.txt")
val path = "s3:/bucketname/project/"

val fileSystem = FileSystem.get(URI.create(path), new Configuration()) 
val folders = fileSystem.listStatus(new Path(path)) 
val bw = new BufferedWriter(new FileWriter(file))

for (folder <- folders) { bw.write(folder.getPath.toString().split("/")(6) + " => " + spark.read.parquet(folder.getPath.toString()).select("myColum").schema.toString() + "\n") }

bw.close

hope it will help you.

Regards. Steven