Issue: object not serializable

How can I overcome this issue? I can read the Avro records correctly (printing them works fine), but when writing the records to Parquet I get:

object not serializable

Caused by: java.io.NotSerializableException: parquet.avro.AvroParquetWriter
Serialization stack:
    - object not serializable (class: parquet.avro.AvroParquetWriter, value: parquet.avro.AvroParquetWriter@658e7ead)

Please review and let me know the best way to do this.

Code: Converting Avro records to Parquet

  val records = sc.newAPIHadoopRDD(conf.getConfiguration,
    classOf[AvroKeyInputFormat[GenericRecord]],
    classOf[AvroKey[GenericRecord]],
    classOf[NullWritable])
    .map(x => x._1.datum) // Transforms the PairRDD to an RDD of GenericRecord

  // Build a schema
  val schema = SchemaBuilder
    .record("x").namespace("x")
    .fields
    .name("x").`type`().stringType().noDefault()
    .endRecord

  // Created on the driver, then referenced inside the foreach closure below
  val parquetWriter = new AvroParquetWriter[GenericRecord](new Path(outPath), schema)

  val parquet = new GenericRecordBuilder(schema)

  records.foreach { record =>
    val x = record.get("xyz") // Field
    parquet.set("x", x)
    parquetWriter.write(parquet.build())
  }

3 Answers

1
votes

You could start here to read the Avro data into a DataFrame: https://github.com/databricks/spark-avro

import org.apache.spark.sql.SQLContext
// import needed for the .avro method to be added
import com.databricks.spark.avro._

val sqlContext = new SQLContext(sc)

// The Avro records get converted to Spark types
val df = sqlContext.read.avro("src/test/resources/episodes.avro")

df.registerTempTable("tempTable")
// Placeholder query; use lateral view explode here if nested fields need flattening
val sat = sqlContext.sql("select * from tempTable")
sat.write.parquet("/tmp/output")
0
votes

I'm not sure why you are taking the approach you are, but I would recommend a different one. If you can get the Avro file into an RDD, which it looks like you do, and you can create a schema, then convert the RDD into a DataFrame and write the DataFrame out as Parquet.

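import org.apache.spark.sql.SaveMode

// avroRDD must be an RDD[Row] and avroSchema a Spark StructType (not an Avro Schema)
val avroDF = sqlContext.createDataFrame(avroRDD, avroSchema)
avroDF
    .write
    .mode(SaveMode.Overwrite)
    .parquet("parquet directory to write file")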
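For context on why the original approach fails: Spark serializes the function passed to `foreach` (with Java serialization by default) before shipping it to executors, so any object the function references from the driver must be serializable. The sketch below reproduces that effect in plain Scala without Spark. `FakeWriter` is a hypothetical stand-in for `AvroParquetWriter`, and `canSerialize` is a helper invented for this illustration.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object SerializationSketch {
  // Hypothetical stand-in for a writer such as AvroParquetWriter:
  // it deliberately does NOT extend Serializable.
  class FakeWriter {
    def write(s: String): Unit = ()
  }

  // Returns true if `obj` can be written with Java serialization.
  def canSerialize(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(obj); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }

  // Writer built outside the function: the closure captures it,
  // so serializing the closure also tries to serialize the writer.
  def capturingClosure(): String => Unit = {
    val writer = new FakeWriter
    (s: String) => writer.write(s)
  }

  // Writer built inside the function: nothing non-serializable is captured.
  def selfContainedClosure(): Iterator[String] => Unit =
    (records: Iterator[String]) => {
      val writer = new FakeWriter
      records.foreach(writer.write)
    }

  def main(args: Array[String]): Unit = {
    println(s"captures writer:     ${canSerialize(capturingClosure())}")
    println(s"creates writer late: ${canSerialize(selfContainedClosure())}")
  }
}
```

The second shape is roughly what creating the writer inside `foreachPartition` would look like in Spark. That route is only an assumption about how one might keep the writer-based approach: each partition would then need its own output path, and the DataFrame route above avoids the problem entirely.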
0
votes

For some of my complex JSON that has complicated structs and arrays, I use HiveQL lateral view explode. Here is an example of complex JSON that is flattened. It starts out as 10 rows; for some traces I can get 60 rows and for some I get fewer than 5. It just depends on how it explodes.
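As a rough analogy for why the row count changes, two chained `lateral view explode` clauses behave much like a nested `flatMap` over the arrays: each input row yields one output row per (outer element, inner element) pair, and a row whose array is empty yields nothing. The sketch below uses plain Scala collections rather than Spark, and the case classes `Trace`, `Stall`, and `Doc` are hypothetical simplifications, not the real schema.

```scala
object ExplodeSketch {
  // Hypothetical, heavily simplified shapes for illustration only.
  final case class Trace(id: String)
  final case class Stall(stall: Long, traces: Seq[Trace])
  final case class Doc(version: String, stalls: Seq[Stall])

  // One output row per (doc, stall, trace) combination,
  // similar in spirit to two chained lateral view explodes.
  def flatten(docs: Seq[Doc]): Seq[(String, Long, String)] =
    for {
      d <- docs
      s <- d.stalls
      t <- s.traces
    } yield (d.version, s.stall, t.id)

  val sample: Seq[Doc] = Seq(
    // Two stalls with 3 and 1 traces: contributes 4 rows.
    Doc("a", Seq(
      Stall(1L, Seq(Trace("t1"), Trace("t2"), Trace("t3"))),
      Stall(2L, Seq(Trace("t4"))))),
    // One stall with no traces: contributes 0 rows.
    Doc("b", Seq(Stall(3L, Seq.empty)))
  )

  def main(args: Array[String]): Unit =
    flatten(sample).foreach(println)
}
```

Here two input documents become four flattened rows, which is the same kind of expansion or shrinkage described above. The real Spark session follows.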

val tenj = sqlContext.read.json("file:///home/marksmith/hive/Tenfile.json")

scala> tenj.printSchema
root
 |-- DDIVersion: string (nullable = true)
 |-- EndTimestamp: string (nullable = true)
 |-- Stalls: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Stall: long (nullable = true)
 |    |    |-- StallType: string (nullable = true)
 |    |    |-- TraceTypes: struct (nullable = true)
 |    |    |    |-- ActiveTicket: struct (nullable = true)
 |    |    |    |    |-- Category: string (nullable = true)
 |    |    |    |    |-- Traces: array (nullable = true)
 |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |-- EndTime: string (nullable = true)
 |    |    |    |    |    |    |-- ID: string (nullable = true)
 |    |    |    |    |    |    |-- Source: string (nullable = true)
 |    |    |    |    |    |    |-- StartPayload: struct (nullable = true)
 |    |    |    |    |    |    |    |-- SubticketID: string (nullable = true)
 |    |    |    |    |    |    |    |-- TicketID: string (nullable = true)
 |    |    |    |    |    |    |    |-- TicketState: long (nullable = true)
 |    |    |    |    |    |    |-- StartTime: string (nullable = true)

tenj.registerTempTable("ddis")


val sat = sqlContext.sql(
    """select DDIVersion, StallsExp.stall, StallsExp.StallType, at.EndTime, at.ID,
       at.Source, at.StartPayload.SubTicketId, at.StartPayload.TicketID,
       at.StartPayload.TicketState, at.StartTime
    from ddis
      lateral view explode(Stalls) st as StallsExp
      lateral view explode(StallsExp.TraceTypes.ActiveTicket.Traces) at1 as at""")
sat: org.apache.spark.sql.DataFrame = [DDIVersion: string, stall: bigint, StallType: string, EndTime: string, ID: string, Source: string, SubTicketId: string, TicketID: string, TicketState: bigint, StartTime: string]

sat.count
res22: Long = 10

sat.show
+----------+-----+---------+--------------------+---+------+-----------+--------+-----------+--------------------+
|DDIVersion|stall|StallType|             EndTime| ID|Source|SubTicketId|TicketID|TicketState|           StartTime|
+----------+-----+---------+--------------------+---+------+-----------+--------+-----------+--------------------+
|  5.3.1.11|   15|    POPS4|2016-06-08T20:07:...|   | STALL|          0|     777|          1|2016-06-08T20:07:...|
|  5.3.1.11|   14|    POPS4|2016-06-08T20:07:...|   | STALL|          0|     384|          1|2016-06-08T20:06:...|
|  5.3.1.11|   13|    POPS4|2016-06-08T20:07:...|   | STALL|          0|  135792|          1|2016-06-08T20:06:...|
|  5.0.0.28|   26|    POPS4|2016-06-08T20:06:...|   | STALL|          0|     774|          2|2016-06-08T20:03:...|