
I am trying to iterate over Dataset rows in Java, take the value of a specific column in each row, use it as a key into a JSON file, and look up its value. The looked-up value then needs to be stored as a new column value in that row, for all rows.

I can see that the cluster_val obtained from the JSON file is not null, but when I try to add it as a column I get:

    Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost, executor driver): java.lang.NullPointerException

So far I have this:

    Dataset<Row> df = spark.read().format("csv").load(path);
    df.foreach((ForeachFunction<Row>) row -> {
        String df_col_val = (String) row.get(6);
        System.out.println(row.get(6));
        if (df_col_val.length() > 5) {
            df_col_val = df_col_val.substring(0, df_col_val.length() - 5 + 1); // NOT NULL
        }
        System.out.println(df_col_val);
        String cluster_val = (String) jo.get(df_col_val); // NOT NULL
        System.out.println(cluster_val);
        df.withColumn("cluster", df.col(cluster_val)); // NULL POINTER EXCEPTION. WHY?
        df.show();
    });

So mostly I need help reading the Dataset row by row and performing the subsequent operations above. I was unable to find many references online; please point me to the correct sources if possible. Also, if there is a shorthand way of doing this, let me know.

I figured out that df.col(cluster_val) is throwing the exception because no such column exists. How do you convert the string name of a column to the Column type required by the withColumn() function of Dataset?

UPDATE:

So I tried the following. Here I am trying to get the new column's value using a UDF, but it comes out null when used this way:

    Dataset<Row> df = spark.read().format("csv").option("header", "true").load(path);

    Object obj = new JSONParser().parse(new FileReader("path to json"));
    JSONObject jo = (JSONObject) obj;

    df.withColumn("cluster", functions.lit((String) jo.get(df.col(df_col_val))));
    df.show();

1 Answer


df.withColumn takes the column name as its first argument and a Column holding the value as its second. If you want to add a new column named "cluster" whose value comes from some JSON value, you can use the lit function, as lit(cluster_val), where cluster_val holds the value. Also note that withColumn returns a new Dataset rather than modifying df in place, so keep the result.
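For example, once the value has been looked up on the driver (a minimal sketch reusing the question's jo and df_col_val, which are assumed to already hold the values shown in the question):

    // withColumn returns a new Dataset; keep the result instead of discarding it.
    String cluster_val = (String) jo.get(df_col_val); // value looked up from the JSON
    Dataset<Row> withCluster = df.withColumn("cluster", functions.lit(cluster_val));
    withCluster.show();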

You have to import org.apache.spark.sql.functions to use the lit function; in Java that is either import static org.apache.spark.sql.functions.lit; or a plain import of org.apache.spark.sql.functions followed by functions.lit(...).
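Since the looked-up value differs from row to row, a lit of one driver-side value cannot fill every row. For that, the UDF route mentioned in the question's update works once the lookup data is available to the executors. Below is a minimal sketch, assuming jo was parsed with json-simple as in the question and that the key comes from the seventh CSV column (_c6 is Spark's default name for it when there is no header; substitute your real column name):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.api.java.UDF1;
    import org.apache.spark.sql.types.DataTypes;
    import static org.apache.spark.sql.functions.*;

    // Copy the JSON contents into a plain serializable map so executors can use it.
    Map<String, String> clusterMap = new HashMap<>();
    for (Object k : jo.keySet()) {
        clusterMap.put((String) k, (String) jo.get(k));
    }

    spark.udf().register("lookupCluster", (UDF1<String, String>) colVal -> {
        if (colVal == null) return null;                       // guard against empty cells
        String key = colVal.length() > 5
                ? colVal.substring(0, colVal.length() - 4)     // same trimming as in the question
                : colVal;
        return clusterMap.get(key);
    }, DataTypes.StringType);

    Dataset<Row> result = df.withColumn("cluster", callUDF("lookupCluster", col("_c6")));
    result.show();

This also avoids calling df.withColumn inside foreach: the foreach lambda runs on the executors, where the driver-side Dataset reference is not usable, which is what produced the NullPointerException in the question.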

I hope this helps.