1 vote

I want to join two DataFrames based on the following condition: df1.col("name") == df2.col("name") and df1.col("starttime") is greater than df2.col("starttime").

The first part of the condition is fine; I use the "equalTo" method of the Column class in Spark SQL. But for the "greater than" condition, I use the following syntax in Java:

df1.col("starttime").gt(df2.col("starttime"))

It does not work. It seems the "gt" function of Column in Spark SQL only accepts numerical values; it does not work properly when you pass a Column as its input parameter. The program finishes normally but the results are wrong: it does not find any rows in the DataFrame that satisfy my condition, although I know such rows exist.

Any idea how I should implement a comparison between two columns in Spark SQL (e.g. whether one column is greater than a column in another DataFrame)?
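For reference, here is roughly the join I am trying to express, as a minimal sketch in Java (df1 and df2 are assumed to already hold the name and starttime columns):

    import org.apache.spark.sql.Column;
    import org.apache.spark.sql.DataFrame;

    // Combine the equality and the greater-than conditions into one join expression.
    Column joinCondition = df1.col("name").equalTo(df2.col("name"))
            .and(df1.col("starttime").gt(df2.col("starttime")));

    DataFrame joined = df1.join(df2, joinCondition);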

It's not clear what you mean by "one column is greater than another column". Please add an example. – pheeleeppoo
How about using a join expression and passing a string with a Hive/SQL condition? – summerbulb
How can I use a join expression and pass a string with a SQL condition? – A.B.
My mistake. It can't be done in a string. – summerbulb

2 Answers

0 votes

Try applying .gt after first converting your columns with org.apache.spark.sql.functions.to_utc_timestamp:

rdd1.toDF("date1", ...)
    .join(rdd2.toDF("date2", ...), to_utc_timestamp('date1, tz).gt(to_utc_timestamp('date2, tz)))

where tz is the time zone ID of the timestamp strings (to_utc_timestamp takes a time zone, not a format pattern, as its second argument). Converting both columns to UTC timestamps ensures .gt compares them as timestamps.
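In Java, the same idea would look roughly like this (a sketch; it assumes the two DataFrames from the snippet above are bound to df1 and df2, and "UTC" is an assumed time zone ID; use the zone your data was recorded in):

    import static org.apache.spark.sql.functions.to_utc_timestamp;

    // Cast both string columns to timestamps normalized to UTC, then compare.
    // "UTC" here is an assumed zone; substitute the zone of your timestamps.
    DataFrame joined = df1.join(df2,
            to_utc_timestamp(df1.col("date1"), "UTC")
                    .gt(to_utc_timestamp(df2.col("date2"), "UTC")));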

0 votes

I ran the following code:

    HiveContext sqlContext = new HiveContext(sc);

    List<Event> list = new ArrayList<>();
    list.add(new Event(1, "event1", Timestamp.valueOf("2017-01-01 00:00:00"), Timestamp.valueOf("2017-01-03 00:00:00")));
    list.add(new Event(2, "event2", Timestamp.valueOf("2017-01-02 00:00:00"), Timestamp.valueOf("2017-01-03 00:00:00")));

    List<Event> list2 = new ArrayList<>();
    list2.add(new Event(1, "event11", Timestamp.valueOf("2017-01-02 00:00:00"), Timestamp.valueOf("2017-01-10 00:00:00")));
    list2.add(new Event(2, "event22", Timestamp.valueOf("2017-01-01 00:00:00"), Timestamp.valueOf("2017-01-15 00:00:00")));

    DataFrame df1 = getDF(sc, sqlContext, list);
    DataFrame df2 = getDF(sc, sqlContext, list2);

    // Join only on startTime being strictly greater, to isolate the gt behavior.
    df1.join(df2, df1.col("startTime").gt(df2.col("startTime"))).show();

And here is the result I got:

+---+------+--------------------+--------------------+---+-------+--------------------+--------------------+
| id|  name|           startTime|             endTime| id|   name|           startTime|             endTime|
+---+------+--------------------+--------------------+---+-------+--------------------+--------------------+
|  2|event2|2017-01-02 00:00:...|2017-01-03 00:00:...|  2|event22|2017-01-01 00:00:...|2017-01-15 00:00:...|
+---+------+--------------------+--------------------+---+-------+--------------------+--------------------+

Seems to me like it works as expected.

Also, the Spark source code (version 1.6 here) tells the same story: Column.gt takes Any, so passing another Column is supported.
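For completeness, the answer does not show the Event bean or the getDF helper; a plausible reconstruction, assuming a standard JavaBean and schema inference via createDataFrame, would be:

    import java.io.Serializable;
    import java.sql.Timestamp;
    import java.util.List;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.hive.HiveContext;

    // A plain JavaBean: Spark's bean-based schema inference needs a no-arg
    // constructor plus getters and setters for every field.
    public static class Event implements Serializable {
        private int id;
        private String name;
        private Timestamp startTime;
        private Timestamp endTime;

        public Event() {}

        public Event(int id, String name, Timestamp startTime, Timestamp endTime) {
            this.id = id;
            this.name = name;
            this.startTime = startTime;
            this.endTime = endTime;
        }

        public int getId() { return id; }
        public void setId(int id) { this.id = id; }
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public Timestamp getStartTime() { return startTime; }
        public void setStartTime(Timestamp startTime) { this.startTime = startTime; }
        public Timestamp getEndTime() { return endTime; }
        public void setEndTime(Timestamp endTime) { this.endTime = endTime; }
    }

    static DataFrame getDF(JavaSparkContext sc, HiveContext sqlContext, List<Event> events) {
        // createDataFrame infers the schema from the Event bean's properties.
        return sqlContext.createDataFrame(sc.parallelize(events), Event.class);
    }

This is just one way to build the DataFrames; any source with a timestamp-typed startTime column would behave the same under the join.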