1
votes

I am trying to create a data pipeline where the incoming data is stored as Parquet, and I create an external Hive table over it so that users can query the table and retrieve the data. I am able to save the Parquet data and read it back directly, but when I query the Hive table it does not return any rows. I used the following test setup.

--CREATE EXTERNAL HIVE TABLE
create external table emp ( id double, hire_dt timestamp, user string ) stored as parquet location '/test/emp';

Then I created a DataFrame from some sample data and saved it as Parquet.

---Create dataframe and insert DATA

val employeeDf = Seq(("1", "2018-01-01","John"),("2","2018-12-01", "Adam")).toDF("id","hire_dt","user")
val schema = List(("id", "double"), ("hire_dt", "date"), ("user", "string"))
val newCols= schema.map ( x => col(x._1).cast(x._2)) 
val newDf = employeeDf.select(newCols:_*)
newDf.write.mode("append").parquet("/test/emp")
newDf.show 

--read the contents directly from parquet 
val sqlcontext=new org.apache.spark.sql.SQLContext(sc)
sqlcontext.read.parquet("/test/emp").show 

+---+----------+----+
| id|   hire_dt|user|
+---+----------+----+
|1.0|2018-01-01|John|
|2.0|2018-12-01|Adam|
+---+----------+----+

--read from the external hive table 
spark.sql("select  id,hire_dt,user from  emp").show(false)

+---+-------+----+
|id |hire_dt|user|
+---+-------+----+
+---+-------+----+

As shown above, I can see the data if I read the Parquet files directly, but not when I query through Hive. What am I doing wrong that prevents Hive from returning the data? I thought a missing msck repair might be the reason, but running msck repair table fails with an error saying the table is not partitioned.

4 Answers

0
votes

Based on your create table statement, the table location is /test/emp, but you are writing the data to /tenants/gwm/idr/emp. So there will be no data at /test/emp.

--CREATE EXTERNAL HIVE TABLE
create external table emp ( id double, hire_dt timestamp, user string ) stored as parquet location '/test/emp';

Please re-create the external table as:

--CREATE EXTERNAL HIVE TABLE
create external table emp ( id double, hire_dt timestamp, user string ) stored as parquet location '/tenants/gwm/idr/emp';
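Whether the table location and the write path actually match can be checked from Spark itself. A minimal sketch, assuming a spark-shell session with Hive support (so `spark` is predefined) and the `emp` table and `/test/emp` path from the question:

```scala
// Assumes spark-shell started with Hive support, where `spark` is predefined,
// and that the external table `emp` from the question already exists.
import org.apache.hadoop.fs.Path

// DESCRIBE FORMATTED returns rows of (col_name, data_type, comment);
// the row whose col_name starts with "Location" holds the table's path.
val tableInfo = spark.sql("describe formatted emp")
tableInfo.filter(tableInfo("col_name").startsWith("Location")).show(false)

// List the files under the directory used in the question's write call.
// listStatus throws FileNotFoundException if the directory does not exist.
val writePath = new Path("/test/emp")
val fs = writePath.getFileSystem(spark.sparkContext.hadoopConfiguration)
fs.listStatus(writePath).foreach(status => println(status.getPath))
```

If the location printed by the first query differs from the directory that contains the Parquet files, the table is pointing at the wrong place.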

0
votes

In addition to the answer given by Ramdev, you also need to be careful to use the correct data type for date/timestamp columns, as the 'date' type is not supported by Parquet when creating a Hive table.

To handle that, change the type of column 'hire_dt' from 'date' to 'timestamp'.

Otherwise there will be a mismatch between the data you persist through Spark and what you try to read in Hive (or Hive SQL). Using 'timestamp' in both places will resolve the issue. I hope it helps.
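A mismatch of this kind can be spotted by comparing the schema stored in the Parquet files with the schema declared for the table. A rough sketch, assuming a spark-shell session with Hive support (`spark` predefined) and the path and table name from the question:

```scala
// Assumes spark-shell with Hive support and the question's path and table.
val fileSchema  = spark.read.parquet("/test/emp").schema // types written by Spark
val tableSchema = spark.table("emp").schema              // types declared in Hive

// Print each table column's type next to the type found in the files.
tableSchema.fields.foreach { tableField =>
  val tableType = tableField.dataType.simpleString
  val fileType = fileSchema.fields
    .find(_.name.equalsIgnoreCase(tableField.name))
    .map(_.dataType.simpleString)
  val status = if (fileType.contains(tableType)) "ok" else "MISMATCH"
  println(s"${tableField.name}: table=$tableType file=${fileType.getOrElse("missing")} $status")
}
```

With the question's code, the files carry hire_dt as date while the table declares it as timestamp, so that column would be the one flagged.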

0
votes

Do you have enableHiveSupport() in your SparkSession builder() statement? Are you able to connect to the Hive metastore? Try running show tables/databases in your code to see whether the tables present at your Hive location are displayed.
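The checks suggested here could look roughly like the following. This is only a sketch: the app name is arbitrary, and it assumes the code is launched through spark-shell or spark-submit (so the master is supplied externally) with the Hive classes on the classpath.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: launched via spark-shell or spark-submit, so no master is set here.
// In spark-shell, getOrCreate() returns the already running session.
val spark = SparkSession.builder()
  .appName("hive-check") // arbitrary name, for illustration only
  .enableHiveSupport()
  .getOrCreate()

// "hive" means the session uses the Hive metastore catalog;
// "in-memory" means Hive support is not active for this session.
println(spark.conf.get("spark.sql.catalogImplementation"))

// If the metastore is reachable, the emp table should appear in these listings.
spark.sql("show databases").show(false)
spark.sql("show tables").show(false)
```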

0
votes

I got this working with the change below.

import org.apache.spark.sql.types.{DoubleType, TimestampType}
val dfTransformed = employeeDf.withColumn("id", employeeDf.col("id").cast(DoubleType))
            .withColumn("hire_dt", employeeDf.col("hire_dt").cast(TimestampType))

So basically the issue was a data type mismatch, and somehow the cast in the original code doesn't seem to work. With an explicit cast, the write goes through fine and I am able to query the data back as well. Logically both do the same thing, so I am not sure why the original code is not working.

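import org.apache.spark.sql.types.{DoubleType, TimestampType}

val employeeDf = Seq(("1", "2018-01-01","John"),("2","2018-12-01", "Adam")).toDF("id","hire_dt","user")

// Cast id to double and hire_dt to timestamp so the types match the Hive table definition
val dfTransformed = employeeDf.withColumn("id", employeeDf.col("id").cast(DoubleType))
    .withColumn("hire_dt", employeeDf.col("hire_dt").cast(TimestampType))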

dfTransformed.write.mode("append").parquet("/test/emp")
dfTransformed.show 

--read the contents directly from parquet 
val sqlcontext=new org.apache.spark.sql.SQLContext(sc)
sqlcontext.read.parquet("/test/emp").show 

+---+----------+----+
| id|   hire_dt|user|
+---+----------+----+
|1.0|2018-01-01|John|
|2.0|2018-12-01|Adam|
+---+----------+----+

--read from the external hive table 
spark.sql("select  id,hire_dt,user from  emp").show(false)

+---+----------+----+
| id|   hire_dt|user|
+---+----------+----+
|1.0|2018-01-01|John|
|2.0|2018-12-01|Adam|
+---+----------+----+
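
One difference visible between the two snippets is the target type: the schema list in the question casts hire_dt to "date", while the working version casts it to TimestampType, which matches the timestamp column declared in the table. A minimal sketch of the question's list-driven cast with only that type changed (the local session setup and app name are assumptions for illustration, and this has not been run against the original cluster):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.TimestampType

// Assumption: a local session for illustration; in spark-shell,
// getOrCreate() returns the existing session instead.
val spark = SparkSession.builder().appName("cast-sketch").master("local[*]").getOrCreate()
import spark.implicits._

val employeeDf = Seq(("1", "2018-01-01", "John"), ("2", "2018-12-01", "Adam"))
  .toDF("id", "hire_dt", "user")

// Same list-driven cast as in the question, but hire_dt targets "timestamp"
// (the type declared in the Hive table) instead of "date".
val schema = List(("id", "double"), ("hire_dt", "timestamp"), ("user", "string"))
val newCols = schema.map { case (name, dataType) => col(name).cast(dataType) }
val newDf = employeeDf.select(newCols: _*)

// hire_dt should now be reported as a timestamp column.
newDf.printSchema()
```

Writing newDf with write.mode("append").parquet(...) would then store hire_dt with the same type the table expects, in the same way as the explicit withColumn casts.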