
I am quite a newbie to Spark and Scala ;)

Code summary:

Reading data from CSV files --> creating a simple inner join on the two files --> writing the data to a Hive table --> submitting the job on the cluster

Can you please help me identify what went wrong? The code is not really complex. The job runs fine on the cluster, but when I try to view the data written to the Hive table, I run into the exception below.

hive> select * from Customers limit 10;

Failed with exception java.io.IOException:java.io.IOException: hdfs://m01.itversity.com:9000/user/itv000666/warehouse/updatedcustomers.db/customers/part-00000-348a54cf-aa0c-45b4-ac49-3a881ae39702_00000.c000.csv not a SequenceFile

import org.apache.log4j.Logger
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}

object LapeyreSparkDemo extends App {

  // Getting Spark ready
  val sparkConf = new SparkConf()
  sparkConf.set("spark.app.name", "Spark for Lapeyre")

  // Creating the Spark session
  val spark = SparkSession.builder()
    .config(sparkConf)
    .enableHiveSupport()
    .config("spark.sql.warehouse.dir", "/user/itv000666/warehouse")
    .getOrCreate()
  Logger.getLogger(getClass.getName).info("Spark Session Created Successfully")

  // Reading
  Logger.getLogger(getClass.getName).info("Data loading in DF started")
  val ordersSchema = "orderid Int, customerName String, orderDate String, custId Int, " +
    "orderStatus String, age String, amount Int"

  val orders2019Df = spark.read
    .format("csv")
    .option("header", true)
    .schema(ordersSchema)
    .option("path", "/user/itv000666/lapeyrePoc/orders2019.csv")
    .load()

  // Rename the 2019 columns so they do not collide with the 2020 columns after the join
  val newOrder = orders2019Df
    .withColumnRenamed("custId", "oldCustId")
    .withColumnRenamed("customerName", "oldCustomerName")

  val orders2020Df = spark.read
    .format("csv")
    .option("header", true)
    .schema(ordersSchema)
    .option("path", "/user/itv000666/lapeyrePoc/orders2020.csv")
    .load()

  Logger.getLogger(getClass.getName).info("Data loading in DF complete")

  // Processing
  Logger.getLogger(getClass.getName).info("Processing Started")
  val joinCondition = newOrder.col("oldCustId") === orders2020Df.col("custId")
  val joinType = "inner"
  val joinData = newOrder.join(orders2020Df, joinCondition, joinType)
    .select("custId", "customerName")

  // Writing
  spark.sql("create database if not exists updatedCustomers")

  joinData.write
    .format("csv")
    .mode(SaveMode.Overwrite)
    .bucketBy(4, "custId")
    .sortBy("custId")
    .saveAsTable("updatedCustomers.Customers")

  // Stopping the Spark session
  spark.stop()
}

Please let me know if more information is required. Thanks in advance.

Can you add the result of describe extended to the question? Maybe it'll help. DESCRIBE [DATABASE] [FORMATTED|EXTENDED] object_name - Filip
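For reference, a minimal sketch of how that check could be run from the same Spark session (the table name updatedCustomers.Customers is taken from the code above):

// Inspect how the metastore registered the table; the "InputFormat" and
// "SerDe Library" rows show which file format Hive expects on read.
spark.sql("DESCRIBE FORMATTED updatedCustomers.Customers").show(100, truncate = false)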

1 Answer


This is the culprit:

joinData.write
  .format("csv")

Use this instead, and it worked:

joinData.write
  .format("Hive")

Since I am writing data to a Hive table (in ORC format), the format should be "Hive" and not "csv".
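Putting it together, a minimal sketch of the corrected write (assuming the same joinData DataFrame and database as in the question; bucketBy and sortBy are dropped, per the note below):

// Write the joined data through the Hive-format path; the table's storage
// format (ORC in this environment) then comes from Hive, not from Spark's CSV writer.
joinData.write
  .format("Hive")
  .mode(SaveMode.Overwrite)
  .saveAsTable("updatedCustomers.Customers")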

Also, do not forget to enable Hive support while creating the Spark session. Note that in Spark 2, bucketBy and sortBy are not supported for this kind of write; maybe they are in Spark 3.
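For completeness, a minimal sketch of a session with Hive support enabled (the app name and warehouse path are the ones from the question):

import org.apache.spark.sql.SparkSession

// enableHiveSupport() is required so that saveAsTable goes through the Hive metastore.
val spark = SparkSession.builder()
  .appName("Spark for Lapeyre")
  .config("spark.sql.warehouse.dir", "/user/itv000666/warehouse")
  .enableHiveSupport()
  .getOrCreate()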