I am trying to remove duplicates from my Dataset in Spark SQL in Java. My dataset has three columns; let's say the column names are name, timestamp, and score. The name is the String representation of the employee name, the timestamp is a long (epoch milliseconds) of the activity that the employee performed, and the score is an integer field representing the employee's score.

Now, let's say I have the following dataset:

Name --> timestamp       -->  scores 
John --> 1595239200000   -->  10  
John --> 1595242800000   -->  10
Bob  --> 1595246400000   -->  20
John --> 1595239200000   -->  10  

Note that in the above dataset the first and fourth row are the same.
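For reference, here is a minimal sketch of how this sample dataset could be built (assuming an existing SparkSession named spark):

import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

StructType schema = new StructType()
        .add("Name", DataTypes.StringType)
        .add("timestamp", DataTypes.LongType)
        .add("score", DataTypes.IntegerType);

List<Row> rows = Arrays.asList(
        RowFactory.create("John", 1595239200000L, 10),
        RowFactory.create("John", 1595242800000L, 10),
        RowFactory.create("Bob", 1595246400000L, 20),
        RowFactory.create("John", 1595239200000L, 10));

// spark is an existing SparkSession
Dataset<Row> myDataset = spark.createDataFrame(rows, schema);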

When I use the distinct() function on the above dataset, like this

myDataset.distinct()

I get this result:

Name --> timestamp       -->  scores 
John --> 1595239200000   -->  10  
John --> 1595242800000   -->  10
Bob  --> 1595246400000   -->  20

The fourth row is eliminated in this case, which is the expected behaviour.

What I want is to convert the timestamp field into yyyy-MM-dd format and then remove duplicates on the combination of that date and the Name field. In the original dataset the first, second, and fourth rows all fall on the same date, 2020-07-20, for Name = John, so I would only want to keep one row for name = 'John'.

So, after removing the duplicate rows as explained above, the resulting dataset would become

Name --> timestamp       -->  scores 
John --> 1595239200000   -->  10  
Bob  --> 1595246400000   -->  20

Note that I do not have any constraint to keep only the first timestamp for the same name. Any of the timestamps would work for me as long as they all belong to the same date.

What I have tried so far is

Dataset<Row> duplicateRemovedDataset = myDataset
                .dropDuplicates("Name", String.valueOf(functions.from_unixtime
                        (functions.col("timestamp").divide(1000), "yyyy-MM-dd")));

But this produces the following error

User class threw exception: org.apache.spark.sql.AnalysisException: 
Cannot resolve column name "from_unixtime((timestamp / 1000), yyyy-MM-dd)" among list of my column name

How should I go about doing this?

Or, in more general terms, how can I apply a custom function while calling dropDuplicates on a dataset?

2 Answers

You can create a new column with the date format you need and then drop duplicates on the columns you want, as below.

For Java

import static org.apache.spark.sql.functions.*;
Dataset<Row> resultDF = df.withColumn("date", to_date(to_timestamp(df.col("Timestamp").divide(1000)), "yyyy-MM-dd"));

resultDF.dropDuplicates("Name", "date")
        .drop("date")
        .show(false);
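Note that dropDuplicates only accepts existing column names, not expressions, which is why the from_unixtime expression passed directly to dropDuplicates in the question cannot be resolved; materialising the date with withColumn first avoids that.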

For Scala

import org.apache.spark.sql.functions._
val resultDF = df.withColumn("date", to_date(to_timestamp(col("Timestamp") / 1000), "yyyy-MM-dd"))

resultDF.dropDuplicates("Name", "date")
  .drop("date")
  .show(false)

Output:

+----+-------------+-----+
|Name|Timestamp    |score|
+----+-------------+-----+
|Bob |1595246400000|20   |
|John|1595239200000|10   |
+----+-------------+-----+
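Since the question notes that any timestamp within the same date is acceptable, another option is to group on the derived date and keep an arbitrary row per (Name, date) pair instead of calling dropDuplicates. A minimal sketch in Java, assuming the same df as above and the column names from the question:

import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// derive a date from the epoch-millisecond timestamp, then keep one row per (Name, date)
Dataset<Row> result = df
        .withColumn("date", to_date(to_timestamp(col("timestamp").divide(1000))))
        .groupBy(col("Name"), col("date"))
        .agg(first("timestamp").alias("timestamp"), first("score").alias("score"))
        .drop("date");

result.show(false);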

Try this:


val myDataset = Seq(
  ("John", "1595239200000", 10),
  ("John", "1595242800000", 10),
  ("Bob", "1595246400000", 20),
  ("John", "1595239200000", 10)
).toDF("Name", "timestamp", "score")
myDataset.show()

+----+-------------+-----+
|Name|    timestamp|score|
+----+-------------+-----+
|John|1595239200000|   10|
|John|1595242800000|   10|
| Bob|1595246400000|   20|
|John|1595239200000|   10|
+----+-------------+-----+

import org.apache.spark.sql.functions.{col, from_unixtime, to_date}

myDataset
  .withColumn("datestamp", to_date(from_unixtime(col("timestamp") / 1000)))
  .dropDuplicates("Name", "datestamp")
  .show()

+----+-------------+-----+----------+
|Name|    timestamp|score| datestamp|
+----+-------------+-----+----------+
| Bob|1595246400000|   20|2020-07-20|
|John|1595239200000|   10|2020-07-20|
+----+-------------+-----+----------+
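Since the question is in Java, an equivalent of this approach might look like the following (a sketch, assuming the column names from the question and an existing Dataset<Row> named myDataset); the helper column can be dropped afterwards if you do not want it in the result:

import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> deduped = myDataset
        // from_unixtime expects seconds, so divide the millisecond timestamp by 1000
        .withColumn("datestamp", to_date(from_unixtime(col("timestamp").divide(1000))))
        .dropDuplicates("Name", "datestamp")
        .drop("datestamp");  // remove the helper column

deduped.show();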