From what I gather, you are trying to add an index with consecutive values to a dataframe. Unfortunately, Spark has no built-in function that does that. You can only add an increasing index, whose values are not necessarily consecutive, with df.withColumn("index", monotonically_increasing_id()) (the older camelCase name monotonicallyIncreasingId() is deprecated).
Nonetheless, the RDD API has a zipWithIndex function that does exactly what you need. We can thus define a function that transforms the dataframe into an RDD, adds the index, and transforms it back into a dataframe.
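To make the idea concrete, here is a rough plain-Java simulation of what zipWithIndex does conceptually: elements are numbered by partition order first, then by position inside each partition. This is only a sketch and not Spark's actual code. The class ZipWithIndexSketch and the use of nested lists as stand-ins for partitions are my own assumptions for illustration.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class ZipWithIndexSketch {
    /** Pairs every element with a consecutive 0-based index, walking partitions in order. */
    public static <T> List<Map.Entry<T, Long>> zipWithIndex(List<List<T>> partitions) {
        // Step 1: the starting offset of a partition is the total size of all earlier ones.
        long[] offsets = new long[partitions.size()];
        for (int p = 1; p < partitions.size(); p++) {
            offsets[p] = offsets[p - 1] + partitions.get(p - 1).size();
        }
        // Step 2: index = partition offset + position inside the partition.
        List<Map.Entry<T, Long>> out = new ArrayList<>();
        for (int p = 0; p < partitions.size(); p++) {
            List<T> part = partitions.get(p);
            for (int i = 0; i < part.size(); i++) {
                out.add(new AbstractMap.SimpleEntry<>(part.get(i), offsets[p] + i));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Four hypothetical partitions; the last one holds two elements.
        List<List<String>> partitions = Arrays.asList(
                Arrays.asList("a"), Arrays.asList("b"),
                Arrays.asList("c"), Arrays.asList("d", "e"));
        System.out.println(zipWithIndex(partitions)); // [a=0, b=1, c=2, d=3, e=4]
    }
}
```

Step 1 is why the real zipWithIndex has to run a Spark job when the RDD has more than one partition: it needs the partition sizes before it can hand out indices.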
I'm not an expert in Spark with Java (Scala is much more compact), so it might be possible to do better. Here is how I would do it.
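    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;

    public static Dataset<Row> zipWithIndex(Dataset<Row> df, String name) {
        JavaRDD<Row> rdd = df.javaRDD().zipWithIndex().map(t -> {
            Row r = t._1();
            long index = t._2() + 1; // zipWithIndex is 0-based; +1 makes it 1-based
            // Copy the existing values and append the index as the last column.
            Object[] values = new Object[r.size() + 1];
            for (int i = 0; i < r.size(); i++) {
                values[i] = r.get(i);
            }
            values[r.size()] = index;
            // RowFactory.create is varargs, so pass an array rather than a List.
            return RowFactory.create(values);
        });
        // Extend the original schema with the new long column.
        StructType newSchema = df.schema().add(name, DataTypes.LongType, true);
        return df.sparkSession().createDataFrame(rdd, newSchema);
    }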
And here is how you would use it. Notice how the output of the built-in Spark function differs from the output of our approach.
    // monotonically_increasing_id() replaces the deprecated monotonicallyIncreasingId()
    Dataset<Row> df = spark.range(5)
        .withColumn("index1", functions.monotonically_increasing_id());
    Dataset<Row> result = zipWithIndex(df, "good_index");
    // df
    +---+-----------+
    | id|     index1|
    +---+-----------+
    |  0|          0|
    |  1| 8589934592|
    |  2|17179869184|
    |  3|25769803776|
    |  4|25769803777|
    +---+-----------+
    // result
    +---+-----------+----------+
    | id|     index1|good_index|
    +---+-----------+----------+
    |  0|          0|         1|
    |  1| 8589934592|         2|
    |  2|17179869184|         3|
    |  3|25769803776|         4|
    |  4|25769803777|         5|
    +---+-----------+----------+
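In case you wonder where the large index1 values come from: according to the Spark documentation, the current implementation puts the partition ID in the upper 31 bits of the generated ID and the record number within the partition in the lower 33 bits. Below is a small plain-Java sketch that decodes the values printed above. The class MonotonicIdLayout and its method names are mine, not part of Spark, and the bit layout is an implementation detail that could change.

```java
public class MonotonicIdLayout {
    // Assumption taken from the Spark docs: the record number uses the lower 33 bits.
    private static final int RECORD_BITS = 33;
    private static final long RECORD_MASK = (1L << RECORD_BITS) - 1;

    /** Partition ID stored in the upper bits of the generated ID. */
    public static long partitionOf(long id) {
        return id >>> RECORD_BITS;
    }

    /** Position of the row inside its partition. */
    public static long recordOf(long id) {
        return id & RECORD_MASK;
    }

    public static void main(String[] args) {
        // The index1 values from the example output.
        long[] ids = {0L, 8589934592L, 17179869184L, 25769803776L, 25769803777L};
        for (long id : ids) {
            System.out.println(id + " -> partition " + partitionOf(id)
                    + ", record " + recordOf(id));
        }
    }
}
```

For the table above this gives partitions 0, 1, 2, 3, and 3, with the last value being record 1 of partition 3. So the five rows were spread over four partitions, and the gaps between the IDs are simply the jumps from one partition to the next.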