I am using the MongoDB Hadoop connector to read a collection that contains embedded documents.
Sample document from the PersonaMetaData collection:
{
    "user_id" : NumberLong(2),
    "persona_created" : true,
    "persona_createdAt" : ISODate("2016-02-24T06:41:49.761Z"),
    "persona" : [{
        "persona_type" : 1,
        "created_using_algo" : "Name of the algo",
        "version_algo" : "1.0",
        "createdAt" : ISODate("2016-02-24T06:41:49.761Z"),
        "persona_items" : { "key1" : "value1", "key2" : "value2" }
    }]
}
I have created the following classes to represent the data in the collection:
class Persona_Items implements Serializable
{
private String key1;
private String key2;
// Getter/Setter and constructor
}
class Persona implements Serializable
{
int persona_type;
String created_using_algo;
String version_algo;
long createdAt;
List<Persona_Items> listPersonaItems;
// Getter/setter and constructor
}
class PersonaMetaData implements Serializable
{
long user_id;
boolean persona_created;
long persona_createdAt;
List<Persona> listPersona;
// Getter/setter and constructor
}
and then used them as follows:
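The inputConfig used below is a plain Hadoop Configuration that points the connector at the collection via its standard mongo.input.uri key; a minimal setup (host and database names are placeholders) looks like this:

import org.apache.hadoop.conf.Configuration;

Configuration inputConfig = new Configuration();
// Point the connector at the PersonaMetaData collection
inputConfig.set("mongo.input.uri", "mongodb://localhost:27017/mydb.PersonaMetaData");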
// RDD representing the complete collection
JavaPairRDD<Object, BSONObject> bsonRdd = sc.newAPIHadoopRDD(inputConfig,
com.mongodb.hadoop.MongoInputFormat.class,
Object.class, BSONObject.class);
// Get RDD of PersonaMetaData
JavaRDD<PersonaMetaData> metaDataSchemaJavaRDD =
        bsonRdd.map(new Function<Tuple2<Object, BSONObject>, PersonaMetaData>() {
            @Override
            public PersonaMetaData call(Tuple2<Object, BSONObject> objectBSONObjectTuple2)
                    throws Exception {
                // Parse the BSON object and return a new PersonaMetaData object
            }
        });
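The parsing inside call() is plain field extraction. A sketch of what that body could look like (assuming no-arg constructors and JavaBean setters named after the fields; error handling omitted):

// Needs: java.util.ArrayList, java.util.Date, java.util.List,
//        org.bson.BSONObject, org.bson.types.BasicBSONList
BSONObject doc = objectBSONObjectTuple2._2();

PersonaMetaData meta = new PersonaMetaData();
meta.setUser_id(((Number) doc.get("user_id")).longValue());
meta.setPersona_created((Boolean) doc.get("persona_created"));
// ISODate fields arrive as java.util.Date; store them as epoch millis
meta.setPersona_createdAt(((Date) doc.get("persona_createdAt")).getTime());

List<Persona> personas = new ArrayList<Persona>();
for (Object o : (BasicBSONList) doc.get("persona")) {
    BSONObject p = (BSONObject) o;
    Persona persona = new Persona();
    persona.setPersona_type(((Number) p.get("persona_type")).intValue());
    persona.setCreated_using_algo((String) p.get("created_using_algo"));
    persona.setVersion_algo((String) p.get("version_algo"));
    persona.setCreatedAt(((Date) p.get("createdAt")).getTime());

    // persona_items is a single embedded document in the sample,
    // wrapped here in a one-element list to match the bean
    BSONObject items = (BSONObject) p.get("persona_items");
    Persona_Items item = new Persona_Items();
    item.setKey1((String) items.get("key1"));
    item.setKey2((String) items.get("key2"));
    List<Persona_Items> itemList = new ArrayList<Persona_Items>();
    itemList.add(item);
    persona.setListPersonaItems(itemList);

    personas.add(persona);
}
meta.setListPersona(personas);
return meta;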
// Convert into DataFrame
DataFrame dataFrame = sqlContext.createDataFrame(metaDataSchemaJavaRDD,
        PersonaMetaData.class);
Exception:

scala.MatchError: io.abc.spark.schema.PersonaMetaData@31ff5060 (of class io.abc.spark.schema.PersonaMetaData)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:255)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:250)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:169)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:153)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:401)
    at org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1$$anonfun$apply$2.apply(SQLContext.scala:500)
    at org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1$$anonfun$apply$2.apply(SQLContext.scala:500)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1.apply(SQLContext.scala:500)
    at org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1.apply(SQLContext.scala:498)
If the classes do not contain any List fields, the same code runs without any issues.
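For instance, a flat variant of the bean like the one below (the class name and flatRdd are illustrative only; flatRdd is a JavaRDD<PersonaMetaDataFlat> built the same way as above) converts fine:

// Flat bean: only primitive fields, no List
public class PersonaMetaDataFlat implements Serializable {
    private long user_id;
    private boolean persona_created;
    private long persona_createdAt;
    // Getter/setter and constructor, same style as above
}

// This conversion succeeds
DataFrame flatFrame = sqlContext.createDataFrame(flatRdd, PersonaMetaDataFlat.class);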