3 votes

I'm using Spark 1.5.2 with Java, and I'm attempting to read in a Parquet file that contains data that originated from a JSON file. I'm having trouble figuring out how to read a field that originally contained nested JSON but is now a WrappedArray<WrappedArray<String>>. I've looked through the Spark documentation on reading Parquet files, but none of the examples matched what I was looking for. I did some searching and found answers that were close, but specific to Scala.

Here is an example of the original JSON:

{"page_number":1,"id_groups":[{"ids":["60537"]},{"ids":["65766","7368815"]}]}

The field I'm having an issue reading is id_groups. I read the Parquet file in and inspected the schema, which shows the field as:

StructField(id_groups,ArrayType(StructType(StructField(ids,ArrayType(StringType,true),true)),true),true)
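For readability, printSchema() renders that field as:

 |-- id_groups: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- ids: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)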

I'm guessing that I need to create a schema for that field, but I can't figure out how to do that using the Spark Java API.

This post seemed promising (it shows Scala code creating a schema for nested data), but I don't know how to replicate something similar using Java:

spark-specifying-schema-for-nested-json
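For what it's worth, here is my guess at a Java equivalent using org.apache.spark.sql.types.DataTypes. I'm not sure this is the right approach, and the page_number type is my assumption based on Spark's JSON inference (JSON integers come back as longs):

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Guess at the schema shown above. Each id_groups element is a struct
// holding one field, ids, which is an array of strings.
StructType idGroupElement = DataTypes.createStructType(new StructField[]{
    DataTypes.createStructField("ids",
        DataTypes.createArrayType(DataTypes.StringType, true), true)
});
StructType schema = DataTypes.createStructType(new StructField[]{
    DataTypes.createStructField("id_groups",
        DataTypes.createArrayType(idGroupElement, true), true),
    DataTypes.createStructField("page_number", DataTypes.LongType, true)
});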

Any suggestions on how to read the id_groups data from the Parquet file?

IntelliJ shows, while stepping through the code, that the id_groups field is a WrappedArray<WrappedArray<String>> (scala.collection.mutable.WrappedArray).

As far as I understand, you've been able to read the file, right? So the problem is how to select the nested field? - zero323
Yes, you are correct - I am unsure how to read the data from the id_groups field. I'm not sure if this would be helpful or not, but IntelliJ shows that the data in the id_groups field is a WrappedArray<WrappedArray<String>>. - LeeWallen

1 Answer

0 votes

I found a way to read the data that originated from nested JSON, but I don't particularly like the way I did it.

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;

import scala.collection.Iterator;
import scala.collection.mutable.WrappedArray;

// Read the Parquet file and use SQL to select just the nested ids field.
DataFrame parquetData = sqlContext.read().parquet("/Users/leewallen/dev/spark_data/out/ParquetData");
parquetData.registerTempTable("pd");
DataFrame idGroupsDataFrame = sqlContext.sql("select id_groups.ids from pd");

// Each row holds one id_groups value as a java.util.List of Scala
// WrappedArray<String>, one wrapped array per "ids" group.
List<String> idList = idGroupsDataFrame.javaRDD()
                                       .map((Function<Row, String>) row -> {
    List<String> ids = new ArrayList<>();
    List<WrappedArray<String>> wrappedArrayList = row.getList(0);
    java.util.Iterator<WrappedArray<String>> wrappedArrayIterator = wrappedArrayList.iterator();
    while (wrappedArrayIterator.hasNext()) {
        // Copy the Scala WrappedArray's contents into a Java list.
        WrappedArray<String> idWrappedArray = wrappedArrayIterator.next();
        Iterator<String> stringIter = idWrappedArray.iterator();
        List<String> tempIds = new ArrayList<>();
        while (stringIter.hasNext()) {
            tempIds.add(stringIter.next());
        }

        // Join the ids within a group with commas. Note that reduce(...).get()
        // assumes each group is non-empty; an empty array would throw here.
        ids.add(tempIds.stream()
                       .reduce((s1, s2) -> String.format("%s,%s", s1, s2))
                       .get());
    }

    // Join the groups with pipes.
    return ids.stream()
              .reduce((s1, s2) -> String.format("%s|%s", s1, s2))
              .get();
}).collect();

idList.forEach(System.out::println);

If the input data looks like this:

{"page_number":1,"id_groups":[{"ids":["60537"]},{"ids":["65766","7368815"]}]}

then the printed output looks like this:

60537|65766,7368815

If anyone has a better way to get the same result, then please let me know.
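One idea I haven't tried yet: use scala.collection.JavaConversions to turn each WrappedArray into a java.util.List, and collapse the loops into a single stream pipeline. A sketch (untested), assuming the same idGroupsDataFrame as above:

import java.util.List;
import java.util.stream.Collectors;

import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Row;

import scala.collection.JavaConversions;
import scala.collection.mutable.WrappedArray;

// Untested sketch: same result, but the WrappedArray conversion and the
// comma/pipe joining are collapsed into one stream pipeline.
List<String> idList = idGroupsDataFrame.javaRDD()
        .map((Function<Row, String>) row -> {
            List<WrappedArray<String>> groups = row.getList(0);
            return groups.stream()
                         // Convert each Scala WrappedArray to a Java list,
                         // then join the ids within a group with commas.
                         .map(group -> String.join(",", JavaConversions.seqAsJavaList(group)))
                         // Join the groups with pipes.
                         .collect(Collectors.joining("|"));
        }).collect();

That should produce the same comma- and pipe-joined strings without the hand-rolled iterator loops.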