It seems that spark can't apply schema (differs from String) for DataFrame when converting it from RDD through Row object. I've tried both on Spark 1.4 and 1.5 version.
Snippet (Java API):
JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(jssc, String.class, String.class,
StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet);
directKafkaStream.foreachRDD(rdd -> {
rdd.foreach(x -> System.out.println("x._1() = " + x._1()));
rdd.foreach(x -> System.out.println("x._2() = " + x._2()));
JavaRDD<Row> rowRdd = rdd.map(x -> RowFactory.create(x._2().split("\t")));
rowRdd.foreach(x -> System.out.println("x = " + x));
SQLContext sqlContext = SQLContext.getOrCreate(rdd.context());
StructField id = DataTypes.createStructField("id", DataTypes.IntegerType, true);
StructField name = DataTypes.createStructField("name", DataTypes.StringType, true);
List<StructField> fields = Arrays.asList(id, name);
StructType schema = DataTypes.createStructType(fields);
DataFrame sampleDf = sqlContext.createDataFrame(rowRdd, schema);
sampleDf.printSchema();
sampleDf.show();
return null;
});
jssc.start();
jssc.awaitTermination();
It produces following output if specify DataTypes.StringType for "id" field:
x._1() = null
x._2() = 1 item1
x = [1,item1]
root
|-- id: string (nullable = true)
|-- name: string (nullable = true)
+---+-----+
| id| name|
+---+-----+
| 1|item1|
+---+-----+
For specified code it throws error:
x._1() = null
x._2() = 1 item1
x = [1,item1]
root
|-- id: integer (nullable = true)
|-- name: string (nullable = true)
15/09/16 04:13:33 ERROR JobScheduler: Error running job streaming job 1442402013000 ms.0
java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer
at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106)
at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getInt(rows.scala:40)
at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getInt(rows.scala:220)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$IntConverter$.toScalaImpl(CatalystTypeConverters.scala:358)
Similar issue was on Spark Confluence, but it marked as resolved for 1.3 ver.