
I am trying to add a new column to my Spark DataFrame. The new column should be an array whose size depends on a variable (say, salt); I will then use that column to explode the dataset for a salted join.

Currently, I am passing consecutive lit calls to the array function, but that cannot be parameterized and is poor coding practice. My current implementation looks like this:

int salt = 3;

Dataset<Row> Reference_with_Salt_Col = Reference.withColumn("salt_array", array(lit(0), lit(1), lit(2)));

I have looked at various methods, but none of them seems to solve the problem in Java. The functions.typedLit approach, though it works in Python/Scala, doesn't seem to work in Java. Passing a Java array or List directly doesn't help either; Spark throws an error on it.

I am using Spark 2.2.0 and Java 1.8.
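Why exploding a salt array keeps a salted join correct can be checked without Spark at all. The following plain-Java simulation is only an illustration of the plan described above, not Spark code; the class and method names are hypothetical, and it sticks to Java 8 to match the question. Each row on the skewed side gets one random salt in [0, salt), each row on the other side is copied once per salt value (which is what exploding salt_array does), and rows match only when both key and salt agree, so the salted join should find exactly as many matches as the plain join.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class SaltedJoinSimulation {

    // Counts the matches of a plain inner equi-join on the key alone.
    static long plainJoinCount(List<String> factKeys, List<String> refKeys) {
        long count = 0;
        for (String f : factKeys) {
            for (String r : refKeys) {
                if (f.equals(r)) {
                    count++;
                }
            }
        }
        return count;
    }

    // Counts the matches of the salted join on "key#salt".
    static long saltedJoinCount(List<String> factKeys, List<String> refKeys, int salt, long seed) {
        Random random = new Random(seed);

        // Skewed side: every row gets exactly one random salt in [0, salt).
        List<String> saltedFacts = new ArrayList<>();
        for (String f : factKeys) {
            saltedFacts.add(f + "#" + random.nextInt(salt));
        }

        // Other side: every row is replicated once per salt value 0..salt-1,
        // mirroring explode(array(lit(0), ..., lit(salt - 1))).
        List<String> explodedRefs = new ArrayList<>();
        for (String r : refKeys) {
            for (int s = 0; s < salt; s++) {
                explodedRefs.add(r + "#" + s);
            }
        }

        return plainJoinCount(saltedFacts, explodedRefs);
    }
}
```

The random salt only decides which copy of a reference row a fact row meets, never whether it meets one, which is why the match count is unchanged while a hot key's rows are spread over salt buckets.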

Try converting the Java list to a Scala Seq before calling typedLit. See this. – mck
Hi mck, I did try this but was not able to get it working, possibly due to a Scala version mismatch. I was able to use the map implementation, though; the answer below from @blackbishop is a straightforward implementation that does seem to do the work. – Mayank Aggarwal

1 Answer


You can use the array function, but first convert each element of the list into a Column with lit. For example, using map on a stream:

import org.apache.spark.sql.*;

import java.util.Arrays;
import java.util.List;

// example of input dataframe
Reference.show();

//+-----+
//|label|
//+-----+
//|    a|
//|    b|
//|    c|
//+-----+

List<Integer> salt_array = Arrays.asList(0, 1, 2);

Reference.withColumn(
        "salt_array",
        functions.array(salt_array.stream().map(functions::lit).toArray(Column[]::new))
).show();

//+-----+----------+
//|label|salt_array|
//+-----+----------+
//|    a| [0, 1, 2]|
//|    b| [0, 1, 2]|
//|    c| [0, 1, 2]|
//+-----+----------+

To generate an array that holds the range from 0 to salt - 1, you can use IntStream.rangeClosed (IntStream.range(0, salt) is equivalent) like this:

import java.util.stream.IntStream;

int salt = 3;

Dataset<Row> Reference_with_Salt_Col = Reference.withColumn(
        "salt_array",
        functions.array(IntStream.rangeClosed(0, salt - 1).mapToObj(functions::lit).toArray(Column[]::new))
);
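To tie this back to the salted join from the question, here is a minimal sketch of how the salt array might be used end to end. The class, method and column names (SaltedJoinSketch, saltArray, saltedKey, saltedJoin, key, salt, salted_key) are hypothetical; it assumes both inputs have a column called key and only uses functions that exist in Spark 2.2 (array, lit, explode, rand, floor, concat_ws). The skewed side gets one random salt per row, the smaller side is exploded over the salt array, and the two are joined on a combined "key_salt" column.

```java
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;

import java.util.stream.IntStream;

public class SaltedJoinSketch {

    // array(lit(0), lit(1), ..., lit(salt - 1)), built from the salt parameter.
    static Column saltArray(int salt) {
        return functions.array(
                IntStream.range(0, salt).mapToObj(functions::lit).toArray(Column[]::new));
    }

    // Hypothetical join column "<key>_<salt>", computed from the "key" and "salt" columns.
    static Column saltedKey() {
        return functions.concat_ws("_",
                functions.col("key"), functions.col("salt").cast("string"));
    }

    // Joins a skewed `facts` table to a smaller `reference` table on "key",
    // spreading each key over `salt` buckets (assumes both sides have a "key" column).
    static Dataset<Row> saltedJoin(Dataset<Row> facts, Dataset<Row> reference, int salt) {
        // Skewed side: each row gets one random bucket in [0, salt).
        Dataset<Row> saltedFacts = facts
                .withColumn("salt", functions.floor(functions.rand().multiply(salt)))
                .withColumn("salted_key", saltedKey())
                .drop("salt");

        // Smaller side: one copy of each row per bucket, via explode on the salt array.
        // "key" is dropped here so the joined result keeps only the facts' copy of it.
        Dataset<Row> explodedReference = reference
                .withColumn("salt", functions.explode(saltArray(salt)))
                .withColumn("salted_key", saltedKey())
                .drop("key", "salt");

        // Each fact row matches exactly the reference copy carrying its own bucket.
        return saltedFacts.join(explodedReference, "salted_key").drop("salted_key");
    }
}
```

A larger salt spreads a hot key over more tasks but multiplies the size of the exploded side by the same factor, so it is usually kept small.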