3
votes

It seems that spark can't apply schema (differs from String) for DataFrame when converting it from RDD through Row object. I've tried both on Spark 1.4 and 1.5 version.

Snippet (Java API):

JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(jssc, String.class, String.class,
                StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet);

directKafkaStream.foreachRDD(rdd -> {
    rdd.foreach(x -> System.out.println("x._1() = " + x._1()));
    rdd.foreach(x -> System.out.println("x._2() = " + x._2()));

    JavaRDD<Row> rowRdd = rdd.map(x -> RowFactory.create(x._2().split("\t")));

    rowRdd.foreach(x -> System.out.println("x = " + x));

    SQLContext sqlContext = SQLContext.getOrCreate(rdd.context());

    StructField id = DataTypes.createStructField("id", DataTypes.IntegerType, true);
    StructField name = DataTypes.createStructField("name", DataTypes.StringType, true);
    List<StructField> fields = Arrays.asList(id, name);
    StructType schema = DataTypes.createStructType(fields);

    DataFrame sampleDf = sqlContext.createDataFrame(rowRdd, schema);

    sampleDf.printSchema();
    sampleDf.show();

    return null;
});

jssc.start();
jssc.awaitTermination();

It produces following output if specify DataTypes.StringType for "id" field:

x._1() = null
x._2() = 1  item1
x = [1,item1]
root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)

+---+-----+
| id| name|
+---+-----+
|  1|item1|
+---+-----+

For specified code it throws error:

x._1() = null
x._2() = 1  item1
x = [1,item1]
root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)

15/09/16 04:13:33 ERROR JobScheduler: Error running job streaming job 1442402013000 ms.0
java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer
    at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106)
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getInt(rows.scala:40)
    at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getInt(rows.scala:220)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$IntConverter$.toScalaImpl(CatalystTypeConverters.scala:358)

Similar issue was on Spark Confluence, but it marked as resolved for 1.3 ver.

1

1 Answers

4
votes

You are mixing two different things - data types and DataFrame schema. When you create Row like this:

RowFactory.create(x._2().split("\t"))

you get Row(_: String, _: String) but your schema states that you have Row(_: Integer, _: String). Since there is no automatic type conversion you get the error.

To make it work you can either cast values when you create rows or define id as a StringType and use Column.cast method after you've created a DataFrame.