I am trying to iterate over the rows of a Dataset in Java, read a specific column's value from each row, use that value as a key into a JSON file, and fetch the value stored under that key. The fetched value then needs to be stored as a new column on that row, for all rows.
I can see that my cluster_val obtained from the JSON file is not NULL, but when I try to add it as a column I get:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost, executor driver): java.lang.NullPointerException
So far I have this:
Dataset<Row> df = spark.read().format("csv").load(path);
df.foreach((ForeachFunction<Row>) row -> {
    String df_col_val = (String) row.get(6);
    System.out.println(row.get(6));
    if (df_col_val.length() > 5) {
        df_col_val = df_col_val.substring(0, df_col_val.length() - 5 + 1); // NOT NULL
    }
    System.out.println(df_col_val);
    String cluster_val = (String) jo.get(df_col_val); // NOT NULL
    System.out.println(cluster_val);
    df.withColumn("cluster", df.col(cluster_val)); // NULL POINTER EXCEPTION. WHY?
    df.show();
});
So mostly I need help reading a Dataset row by row and performing the subsequent operations as above. I have been unable to find many references online; please point me to the right sources if possible. Also, if there is a shorthand way of doing this, let me know.
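As far as I can tell, one simple way to loop over rows locally is to collect them to the driver first. A minimal sketch, assuming the data is small enough to fit in driver memory and the CSV has no header (so Spark names the seventh column _c6):

import java.util.List;
import org.apache.spark.sql.Row;

// Pull all rows back to the driver and iterate over them locally.
// Only reasonable for small data; everything lands on one machine.
List<Row> rows = df.collectAsList();
for (Row row : rows) {
    String df_col_val = row.getString(6); // seventh column, i.e. _c6
    System.out.println(df_col_val);
}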
So I figured out that df.col(cluster_val) is throwing the exception because no column with that name exists. How do you convert the string name of a column into the Column type required by the withColumn() function of Dataset?
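From the Dataset javadoc, the plain name-to-Column conversion seems to be functions.col(String) (or equivalently df.col(String)). A sketch, where "_c6" and the new column name "cluster_src" are just placeholders of mine:

import org.apache.spark.sql.functions;

// Builds a Column expression from an existing column's name;
// "_c6" is Spark's default name for the seventh column of a headerless CSV.
Dataset<Row> df2 = df.withColumn("cluster_src", functions.col("_c6"));

Though in my case cluster_val is a looked-up value, not the name of an existing column, which is presumably why df.col(cluster_val) blows up.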
UPDATE:
So I tried the following, trying to get the new column's value using a udf, but it is null when done this way:
Dataset<Row> df = spark.read().format("csv").option("header", "true").load(path);
Object obj = new JSONParser().parse(new FileReader("path to json"));
JSONObject jo = (JSONObject) obj;
df.withColumn("cluster", functions.lit((String) jo.get(df.col(df_col_val)))));
df.show();
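I suspect the null comes from jo.get(df.col(df_col_val)): df.col(...) returns a Column expression object, not the row's actual string value, so the JSONObject lookup never matches any key and lit() ends up wrapping null. Also, withColumn() returns a new Dataset rather than modifying df in place, so the result has to be reassigned. A rough sketch of the shape I think should work, copying the JSON into a plain serializable Map and doing the per-row lookup in a UDF (the names lookupCluster, clusterMap, and the column name "my_col" are placeholders of mine):

import java.util.HashMap;
import java.util.Map;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;

// Copy the parsed JSON into a plain serializable map so the UDF closure
// does not capture the JSONObject itself.
Map<String, String> clusterMap = new HashMap<>();
for (Object key : jo.keySet()) {
    clusterMap.put((String) key, (String) jo.get(key));
}

// The UDF receives each row's value, applies the same trimming as before,
// and looks the key up in the map on the executor side.
spark.udf().register("lookupCluster", (UDF1<String, String>) value -> {
    if (value == null) return null;
    String key = value.length() > 5 ? value.substring(0, value.length() - 4) : value;
    return clusterMap.get(key);
}, DataTypes.StringType);

// withColumn returns a NEW Dataset; reassign it or the column is lost.
df = df.withColumn("cluster", functions.callUDF("lookupCluster", df.col("my_col")));
df.show();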