I have a Spark SQL job that reads my S3 JSON file(s) into a DataFrame.

I then run 2 SQL queries on that DataFrame and found that Spark SQL reads my S3 JSON files twice before executing each query.

If the DataFrame object is not being reused, it would be very costly...

Any help is appreciated.

Here is my code fragment:

protected boolean doAggregations() throws IOException {
    SQLContext sqlContext = getSQLContext();
    DataFrame edgeDataFrame = sqlContext.read().json(sourceDataDirectory);
    edgeDataFrame.cache();

    getLogger().info("Registering and caching the table 'edgeData'");
    edgeDataFrame.registerTempTable("edgeData");

    String dateKey = DateTimeUtility.SECOND_FORMATTER.print(System.currentTimeMillis());

    for (Map.Entry<String, AggregationMetadata> entry : aggMetadataMap.entrySet()) {
        String aggName = entry.getKey();
        String resultDir = getAggregationResultDirectory(aggName, dateKey);
        String sql = entry.getValue().getSql();
        // The input file(s) are being read again and again instead of operating on the "edgeDataFrame"
        DataFrame dataFrame = sqlContext.sql(sql);
        dataFrame.write().format("json").save(resultDir);
    }
    return true;
}

1 Answer

Your JSON files were read twice because Spark did not know the schema of the JSON and SQL requires a known schema. Therefore, Spark took a two-pass approach:

  1. Discover the schema of all JSON records as the union of the schemas of each JSON record.

  2. Load the data into an appropriately-configured data structure.
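To make the two passes concrete, here is a toy model in plain Java. It is not Spark code and does not mirror Spark's internals: `TwoPassLoadSketch`, `CountingSource`, and the extra `flag` field are hypothetical names invented for this sketch. It only shows why an unknown schema costs one extra scan of the input.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model of two-pass loading; hypothetical names, not Spark internals.
final class TwoPassLoadSketch {

    // Stands in for the input files and counts how often they are scanned.
    static final class CountingSource {
        private final List<Map<String, Object>> records;
        int scans = 0;

        CountingSource(List<Map<String, Object>> records) {
            this.records = records;
        }

        List<Map<String, Object>> scan() {
            scans++;
            return records;
        }
    }

    // Pass 1: the schema is the union of the field names seen in any record.
    static Set<String> inferSchema(CountingSource source) {
        Set<String> schema = new LinkedHashSet<>();
        for (Map<String, Object> record : source.scan()) {
            schema.addAll(record.keySet());
        }
        return schema;
    }

    // Pass 2: each record becomes a row with one slot per schema field;
    // fields that a record lacks are left as null.
    static List<List<Object>> load(CountingSource source, Set<String> schema) {
        List<List<Object>> rows = new ArrayList<>();
        for (Map<String, Object> record : source.scan()) {
            List<Object> row = new ArrayList<>();
            for (String field : schema) {
                row.add(record.get(field));
            }
            rows.add(row);
        }
        return rows;
    }

    // Two made-up records; the second one has a field the first one lacks.
    static CountingSource sampleSource() {
        Map<String, Object> first = new LinkedHashMap<>();
        first.put("category", "A");
        first.put("num", 5L);
        Map<String, Object> second = new LinkedHashMap<>();
        second.put("category", "B");
        second.put("flag", Boolean.TRUE);
        List<Map<String, Object>> records = new ArrayList<>();
        records.add(first);
        records.add(second);
        return new CountingSource(records);
    }

    public static void main(String[] args) {
        CountingSource source = sampleSource();
        Set<String> schema = inferSchema(source);        // first scan
        List<List<Object>> rows = load(source, schema);  // second scan
        System.out.println("schema = " + schema);
        System.out.println("rows   = " + rows);
        System.out.println("scans  = " + source.scans);
    }
}
```

If the schema is already known, `load` can be called directly and the source is scanned only once. In Spark the analogous move is to hand the reader a schema up front through `DataFrameReader.schema(...)`; whether that fits depends on how stable your JSON layout is.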

Imagine you have the simple, one-line JSON file:

{"category" : "A", "num" : 5}

If you execute

sqlContext.read.json(path).saveAsTable("test")

in spark-shell you'll notice the two passes.

The first pass has a map phase that collects the discovered schema for each partition and a reduce phase that combines those per-partition schemas into the union schema for all partitions.
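The map and reduce steps of that first pass can be sketched in plain Java as follows. This is a simplified illustration, not Spark's implementation: the class `SchemaMergeSketch`, the type names, the sample records, and in particular the conflict rule in `widen` are assumptions made for the example.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model of per-partition schema discovery followed by a merge.
final class SchemaMergeSketch {

    // Map phase: build one schema (field name -> type name) per partition.
    static Map<String, String> schemaOfPartition(List<Map<String, Object>> partition) {
        Map<String, String> schema = new TreeMap<>();
        for (Map<String, Object> record : partition) {
            for (Map.Entry<String, Object> field : record.entrySet()) {
                schema.merge(field.getKey(), typeOf(field.getValue()), SchemaMergeSketch::widen);
            }
        }
        return schema;
    }

    // Reduce phase: merge two partition schemas into their union.
    static Map<String, String> mergeSchemas(Map<String, String> left, Map<String, String> right) {
        Map<String, String> merged = new TreeMap<>(left);
        right.forEach((name, type) -> merged.merge(name, type, SchemaMergeSketch::widen));
        return merged;
    }

    static String typeOf(Object value) {
        if (value instanceof Long || value instanceof Integer) return "long";
        if (value instanceof Double) return "double";
        if (value instanceof Boolean) return "boolean";
        return "string";
    }

    // Assumed conflict rule for this sketch: equal types are kept,
    // long and double widen to double, anything else falls back to string.
    static String widen(String a, String b) {
        if (a.equals(b)) return a;
        boolean bothNumeric = (a.equals("long") || a.equals("double"))
                && (b.equals("long") || b.equals("double"));
        return bothNumeric ? "double" : "string";
    }

    // Whole first pass: map every partition to a schema, then reduce.
    static Map<String, String> inferSchema(List<List<Map<String, Object>>> partitions) {
        return partitions.stream()
                .map(SchemaMergeSketch::schemaOfPartition)
                .reduce(new TreeMap<>(), SchemaMergeSketch::mergeSchemas);
    }

    // Helper for building a record from alternating keys and values.
    static Map<String, Object> record(Object... keysAndValues) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (int i = 0; i < keysAndValues.length; i += 2) {
            result.put((String) keysAndValues[i], keysAndValues[i + 1]);
        }
        return result;
    }

    // Two made-up partitions whose records disagree on fields and types.
    static List<List<Map<String, Object>>> samplePartitions() {
        List<Map<String, Object>> first = Arrays.asList(record("category", "A", "num", 5L));
        List<Map<String, Object>> second = Arrays.asList(
                record("category", "B", "num", 2.5),
                record("tag", "x"));
        return Arrays.asList(first, second);
    }

    public static void main(String[] args) {
        System.out.println(inferSchema(samplePartitions()));
    }
}
```

The merge only needs the small per-partition schemas, not the records themselves, which is why this pass can finish with a cheap reduce.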

For the map phase, you'll see something like:

INFO DAGScheduler: Submitting Stage 11 (file:///home/ubuntu/test_data.jsonlines MapPartitionsRDD[28] at textFile at JSONRelation.scala:114), which has no missing parents

For the reduce phase you'll see something like:

INFO SparkContext: Starting job: reduce at JsonRDD.scala:54

After that, when the schema is known, the actual loading of the JSON data will begin. This will only involve a map phase because there is no need to share information between partition processors once the schema is discovered.

You can see which type Spark assigned to each column in the Parquet writer's log lines, emitted while the table is saved:

INFO ColumnChunkPageWriteStore: written 56B for [category] BINARY: 1 values, 11B raw, 29B comp, 1 pages, encodings: [PLAIN, RLE, BIT_PACKED]
INFO ColumnChunkPageWriteStore: written 70B for [num] INT64: 1 values, 14B raw, 29B comp, 1 pages, encodings: [PLAIN, RLE, BIT_PACKED]