
I am trying to fetch data from a Kafka topic and push it to an HDFS location, and I am facing the following issue.

After every Kafka message, the HDFS location is updated with part files in the .c000.csv format. I have created a Hive table on top of that HDFS location, but Hive is not able to read any of the data written by Spark Structured Streaming.

Below is the file name format produced by Spark Structured Streaming:

  part-00001-abdda104-0ae2-4e8a-b2bd-3cb474081c87.c000.csv

Here is my code to insert:

    import org.apache.spark.sql.functions.col
    import org.apache.spark.sql.streaming.Trigger

    val kafkaDatademostr = spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "ttt.tt.tt.tt.com:8092")
      .option("subscribe", "demostream")
      .option("kafka.security.protocol", "SASL_PLAINTEXT").load

    // cast the Kafka value bytes to a string, then split the comma-separated value into named columns
    val interval = kafkaDatademostr.select(col("value").cast("string")).alias("csv").select("csv.*")
    val interval2 = interval.selectExpr("split(value,',')[0] as rog", "split(value,',')[1] as vol",
      "split(value,',')[2] as agh", "split(value,',')[3] as aght", "split(value,',')[4] as asd")

    // interval2.writeStream.outputMode("append").format("console").start()
    interval2.writeStream.outputMode("append").partitionBy("rog").format("csv")
      .trigger(Trigger.ProcessingTime("30 seconds"))
      .option("path", "hdfs://vvv/apps/hive/warehouse/area.db/test_kafcsv/").start()

Can someone help me understand why it is creating files like this?

If I do dfs -cat /part-00001-ad35a3b6-8485-47c8-b9d2-bab2f723d840.c000.csv I can see my values, but Hive is not able to read it due to a format issue.

Curious: Have you heard of Kafka Connect? Do you really want to write Spark code for such a simple use case of Kafka to HDFS? Also, why CSV rather than Parquet, which Hive can read better (no need to worry about quotes and commas)? - OneCricketeer
My use case is not that simple; there are some JOINs and aggregations on top of my data. I thought of starting with something this simple since I am a beginner in Spark/Kafka, but even my simple use case is not working. - BigD
Alright, well, could you show your Hive table definition and the data you're looking at? - OneCricketeer
I found something. Do we need to specify the last column as the partition column? - BigD
For an INSERT INTO, yes. For CREATE TABLE you would have PARTITIONED BY instead - OneCricketeer
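
As a rough sketch of what the last comment describes, an external table over the streaming output path could be declared with PARTITIONED BY as below. The database and table names are taken from the path in the question; the string column types and the MSCK REPAIR step are assumptions, not part of the original post.

    // Minimal sketch: an external Hive table partitioned by "rog", matching partitionBy("rog") in the stream.
    // Column types (all STRING) are assumed, since split() produces strings.
    spark.sql("""
      CREATE EXTERNAL TABLE IF NOT EXISTS area.test_kafcsv (
        vol STRING, agh STRING, aght STRING, asd STRING)
      PARTITIONED BY (rog STRING)
      ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
      STORED AS TEXTFILE
      LOCATION 'hdfs://vvv/apps/hive/warehouse/area.db/test_kafcsv/'
    """)

    // Partition directories written by the stream are not visible to Hive until they are registered:
    spark.sql("MSCK REPAIR TABLE area.test_kafcsv")

Note that because the stream uses partitionBy("rog"), the rog values end up in the directory names rather than inside the CSV files, so rog belongs in PARTITIONED BY rather than in the column list.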

1 Answer


These c000 files are temporary files into which the streaming job writes its data. As you are in append mode, the Spark executor holds that writer thread open; that is why at run time you are not able to read them through the Hive serializer, even though hadoop fs -cat works.
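
If it helps to confirm that the rows themselves are intact, a plain batch read of the same path outside the streaming query should show them. This is only a verification sketch (path and absence of a header taken from the question), not a fix for the Hive read:

    // Verification sketch: read the part files already written by the stream as a normal batch DataFrame.
    // Spark also discovers the rog=... partition directories under the root path.
    val written = spark.read
      .option("header", "false")
      .csv("hdfs://vvv/apps/hive/warehouse/area.db/test_kafcsv/")
    written.show(20, truncate = false)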