4
votes

I have a JSON file containing many fields. I read the file into a Spark Dataset in Java.

  • Spark version 2.2.0

  • Java JDK 1.8.0_121

Below is the code.

SparkSession spark = SparkSession
              .builder()
              .appName("Java Spark SQL basic example")
              .config("spark.some.config.option", "some-value")
              .master("local")
              .getOrCreate();

Dataset<Row> df = spark.read().json("jsonfile.json");

I would like to use the withColumn function with a custom UDF to add a new column.

UDF1 someudf = new UDF1<Row,String>(){
        public String call(Row fin) throws Exception{
            String some_str = fin.getAs("String");
            return some_str;
        }
    };
spark.udf().register( "some_udf", someudf, DataTypes.StringType );
df.withColumn( "procs", callUDF( "some_udf", col("columnx") ) ).show();

I get a cast error when I run the above code: java.lang.String cannot be cast to org.apache.spark.sql.Row

Questions:

1 - Is reading into a Dataset of Rows the only option? I can convert the df into a Dataset of Strings, but then I will not be able to select fields.

2 - I tried but failed to define a user-defined data type; I was not able to register the UDF with this custom type. Do I need user-defined data types here?

3 - And the main question: how can I cast from String to Row?

Part of the log is copied below:

Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.sql.Row
    at Risks.readcsv$1.call(readcsv.java:1)
    at org.apache.spark.sql.UDFRegistration$$anonfun$27.apply(UDFRegistration.scala:512)
        ... 16 more

Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$27: (string) => string)

Your help will be greatly appreciated.


2 Answers

9
votes

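You are getting that exception because a UDF is called with the value of the column you pass to it, typed according to that column's data type, not with a Row. In your case columnx is a string column, so Spark passes a String, while your UDF declares Row as its input type. Suppose we have a Dataset<Row> ds with two columns, col1 and col2, both of type String, and we want to convert the value of col2 to uppercase using a UDF.

We can register and call the UDF as below.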

spark.udf().register("toUpper", toUpper, DataTypes.StringType);
ds.select(col("*"),callUDF("toUpper", col("col2"))).show();

Or, using withColumn:

ds.withColumn("Upper",callUDF("toUpper", col("col2"))).show();

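And the UDF should look like this:

// The type parameters match the column's type (String in, String out), not Row.
private static final UDF1<String, String> toUpper = new UDF1<String, String>() {
    public String call(final String str) throws Exception {
        // A null column value arrives as null, so guard against it.
        return str == null ? null : str.toUpperCase();
    }
};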
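
As a side note on where the exception in the question comes from: the stack trace points inside the UDF's own call method (readcsv$1.call), not at register. That is consistent with Java type erasure, where the declared input type is only checked by a cast when the method is actually invoked. Below is a minimal, Spark-free sketch of that mechanism. Udf1, FakeRow and invoke are hypothetical stand-ins made up for illustration (for Spark's UDF1, Row and its internal invocation), and the assumption is that the UDF is stored untyped and handed the raw column value.

```java
import java.io.Serializable;

public class UdfCastSketch {
    // Hypothetical stand-in for Spark's UDF1<T1, R>; not the real interface.
    interface Udf1<T, R> extends Serializable {
        R call(T value) throws Exception;
    }

    // Hypothetical stand-in for org.apache.spark.sql.Row.
    static final class FakeRow {
    }

    // Declares a row as input, like the UDF in the question.
    static final Udf1<FakeRow, String> ROW_UDF = new Udf1<FakeRow, String>() {
        public String call(FakeRow row) {
            return String.valueOf(row);
        }
    };

    // Declares a String as input, matching a string column.
    static final Udf1<String, String> STRING_UDF = new Udf1<String, String>() {
        public String call(String str) {
            return str == null ? null : str.toUpperCase();
        }
    };

    // Mimics a registry that has lost the generic types and simply
    // passes the column value through as an Object.
    @SuppressWarnings("unchecked")
    static Object invoke(Udf1<?, ?> udf, Object columnValue) throws Exception {
        return ((Udf1<Object, Object>) udf).call(columnValue);
    }

    public static void main(String[] args) throws Exception {
        // Works: the value is a String and the UDF expects a String.
        System.out.println(invoke(STRING_UDF, "abc"));

        // Fails only at call time: the compiler-generated bridge method
        // casts the String argument to FakeRow.
        try {
            invoke(ROW_UDF, "abc");
        } catch (ClassCastException e) {
            System.out.println("ClassCastException at call time");
        }
    }
}
```

So there is nothing to cast from String to Row: declaring the UDF input as the column's actual type avoids the failing cast entirely.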
1
votes

This improves on what @abaghel wrote. If you use the following (non-static) import:

import org.apache.spark.sql.functions;

then callUDF has to be qualified with the class name, and the withColumn code should be as follows:

ds.withColumn("Upper",functions.callUDF("toUpper", ds.col("col2"))).show();
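
For completeness, here is a self-contained sketch that puts the pieces from both answers together. It is only an illustration under some assumptions: the class name ToUpperExample, the run method and the two in-memory sample rows are made up here (the question reads from jsonfile.json instead), and it assumes Spark SQL 2.x is on the classpath.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class ToUpperExample {
    // Builds a tiny Dataset, applies the UDF, and returns the new column's values.
    static List<String> run() {
        SparkSession spark = SparkSession.builder()
                .appName("toUpper UDF sketch")
                .master("local")
                .getOrCreate();

        // Two string columns, as in the answer above (sample data is invented).
        StructType schema = new StructType()
                .add("col1", DataTypes.StringType)
                .add("col2", DataTypes.StringType);
        List<Row> rows = Arrays.asList(
                RowFactory.create("a", "x"),
                RowFactory.create("b", "y"));
        Dataset<Row> ds = spark.createDataFrame(rows, schema);

        // The UDF input type is String because col2 is a string column.
        UDF1<String, String> toUpper = new UDF1<String, String>() {
            @Override
            public String call(String str) {
                return str == null ? null : str.toUpperCase();
            }
        };
        spark.udf().register("toUpper", toUpper, DataTypes.StringType);

        Dataset<Row> result =
                ds.withColumn("Upper", functions.callUDF("toUpper", ds.col("col2")));
        result.show();

        // Collect the values of the added column (index 2) for inspection.
        List<String> upper = new ArrayList<>();
        for (Row r : result.collectAsList()) {
            upper.add(r.getString(2));
        }
        spark.stop();
        return upper;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

With a static import (import static org.apache.spark.sql.functions.callUDF;) the functions. prefix can be dropped, which is the form the first answer uses.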