2 votes

I want to map the timestamp fields in my Dataset from values like 2018-08-17T19:58:46.000+0000 to the format 2018-08-17 19:58:46.000 (i.e. yyyy-MM-dd HH:mm:ss.SSS), and some columns to yyyy-MM-dd.

For example, I have a dataset DS1 with columns id, lastModif, created:

+------------------+----------------------------+----------------------------+
|Id                |lastModif                   |created                     |
+------------------+----------------------------+----------------------------+
|abc1              |2019-01-14T19:51:55.000+0000|2019-02-07T20:37:53.000+0000|
|AQA2              |2019-02-05T19:26:36.000+0000|2019-02-07T20:40:06.000+0000|
+------------------+----------------------------+----------------------------+ 

From DS1 above I need the lastModif column mapped to the format yyyy-MM-dd HH:mm:ss.SSS and the created column mapped to yyyy-MM-dd.
I have similar datasets DS2 and DS3 with different column mappings.
I keep a properties file from which the code fetches the mapping columns as keys and the timestamp formats as values.
In the code I keep the list of mapped columns and non-mapped columns, and select the columns:

String cols = "Id,created,lastModif";
String[] colArr = cols.split(",");
String mappedCols = "lastModif,created"; //hardcoding as of now.

List<String> mappedColList = Arrays.asList(mappedCols.split(","));
String nonMappedCols = getNonMappingCols(colArr, mappedCols.split(",")).toLowerCase();
List<String> nonMapped = Arrays.asList(nonMappedCols.split(","));

//column-mapping logic
filtered = tempDS.selectExpr(convertListToSeq(nonMapped),unix_timestamp($"lastModif","yyyy-MM-dd HH:mm:ss.SSS").cast("timestamp").as("lastModif"));
filtered.show(false);


public static Seq<String> convertListToSeq(List<String> inputList)
{
    return JavaConverters.asScalaIteratorConverter(inputList.iterator()).asScala().toSeq();
}

private static String getNonMappingCols(String[] cols, String[] mapped)
{
    String nonMappedCols = "";
    List<String> mappedList = Arrays.asList(mapped);

    for(int i=0; i<cols.length; i++)
    {
        if(!mappedList.contains(cols[i])){
            nonMappedCols += cols[i]+",";               
        }
    }
    nonMappedCols = nonMappedCols.substring(0, nonMappedCols.length()-1);

    return nonMappedCols;
}

How do I map the column to the required timestamp format?
Also, in the line tempDS.selectExpr(convertListToSeq(nonMapped),unix_timestamp($"lastModif","yyyy-MM-dd HH:mm:ss.SSS").cast("timestamp").as("lastModif")); the $"lastModif" does not compile in Java, since the $ interpolator is Scala-only syntax.
And secondly, this approach is static, i.e. it hardcodes the mapped column. How do I drive the mapping from my List<String> mappedColList?
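Since $"lastModif" is Scala syntax, the Java equivalent is org.apache.spark.sql.functions.col("lastModif") or tempDS.col("lastModif"). One way to make the mapping dynamic without hardcoding is to build the selectExpr strings from the properties file in plain Java and hand them to Spark afterwards. The sketch below is only an illustration of that idea: the buildExprs helper, the "<table>-<column>" key convention, and the assumed source pattern are my assumptions, not code from the question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class ExprBuilder {
    // Builds one Spark SQL expression per column: mapped columns are parsed with
    // to_timestamp(...) and rendered with date_format(...); the rest pass through.
    // Assumed key convention: "<table>-<column>" -> target format, e.g. "DS1-lastModif".
    static List<String> buildExprs(String table, String[] cols,
                                   List<String> mappedCols, Properties formats) {
        // assumed pattern of the raw string values, e.g. 2019-01-14T19:51:55.000+0000
        String srcFmt = "yyyy-MM-dd'T'HH:mm:ss.SSSZ";
        List<String> exprs = new ArrayList<>();
        for (String c : cols) {
            if (mappedCols.contains(c)) {
                String target = formats.getProperty(table + "-" + c);
                // double quotes delimit the string literals inside the SQL expression,
                // so the single-quoted 'T' in the pattern needs no escaping
                exprs.add("date_format(to_timestamp(" + c + ", \"" + srcFmt
                        + "\"), \"" + target + "\") AS " + c);
            } else {
                exprs.add(c); // non-mapped columns are selected unchanged
            }
        }
        return exprs;
    }
}
```

The resulting strings can then be passed as tempDS.selectExpr(convertListToSeq(exprs)). Note that date_format returns a string column; if you need an actual timestamp type, keep just the to_timestamp(...) part for that column.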

did you try using new Column("lastModif") instead of $"lastModif"? – W Almir
Yes, it gives the compilation error "unix_timestamp(Column,String) undefined". – aiman
I tried org.apache.spark.sql.functions.unix_timestamp(tempDS.col("lastModif"),"yyyy-MM-dd HH:mm:ss.SSS"); the compiler error went away, but my data is a string of the form yyyy-MM-dd'T'HH:mm:ss.SSSZ, e.g. 2019-02-07T20:37:53.000+0000, and it gets parsed to null. – aiman

2 Answers

3 votes
  1. First of all, let's clarify your input data. You mentioned that you have a Timestamp, but the values you listed look like plain String values in the format yyyy-MM-dd'T'HH:mm:ss.SSSZ. Can you confirm this conclusion?

  2. In one of your comments you said that unix_timestamp returned null in your attempt. Looking at the documentation for unix_timestamp(Column s, String p), we can see that it returns null when the data does not match the expected format:

Parameters:
s - A date, timestamp or string. If a string, the data must be in a format that can be cast to a timestamp, such as yyyy-MM-dd or yyyy-MM-dd HH:mm:ss.SSSS
fmt - A date time pattern detailing the format of s when s is a string
Returns:
A timestamp, or null if s was a string that could not be cast to a timestamp or fmt was an invalid format
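You can reproduce that mismatch outside Spark with plain SimpleDateFormat (which Spark 2.x uses internally for these functions; the pattern letters behave the same here). This snippet is only an illustration of why null comes back, not part of the Spark solution below; the parses helper name is my own:

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;

public class PatternCheck {
    // Returns true if the value parses under the given pattern, false otherwise.
    static boolean parses(String value, String pattern) {
        SimpleDateFormat fmt = new SimpleDateFormat(pattern);
        fmt.setLenient(false);
        try {
            fmt.parse(value);
            return true;
        } catch (ParseException e) {
            return false;
        }
    }
}
```

The pattern from the question, yyyy-MM-dd HH:mm:ss.SSS, fails on the literal T and the trailing +0000 offset; quoting the 'T' and adding Z for the RFC 822 offset makes the same value parse.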

  1. If your input columns really are Strings, I'd suggest the following solution, using the Spark SQL functions to_timestamp(Column s, String fmt) and date_format(Column dateExpr, String format):
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.to_timestamp;
import static org.apache.spark.sql.functions.date_format;
....
SparkSession spark = SparkSession
            .builder()
            .appName("datetime-transformation")
            .master("local[*]")
            .getOrCreate();

SomeDto someDto = SomeDto.builder()
            .id("abc1")
            .lastModif("2019-01-14T19:51:55.123+02:00")
            .created("2019-01-14T19:51:55.123+02:00")
            .build();

Dataset<Row> ds = spark.createDataset(Collections.singletonList(someDto), Encoders.bean(SomeDto.class)).toDF();

ds.printSchema();
ds.show(false);

Dataset<Row> dfm = ds
            .withColumn("lastModif", to_timestamp(col("lastModif"), "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"))
            .withColumn("created", date_format(to_timestamp(col("created"), "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"), "yyyy-MM-dd"));

dfm.printSchema();
dfm.show(false);

The output would be:

root
 |-- created: string (nullable = true)
 |-- id: string (nullable = true)
 |-- lastModif: string (nullable = true)

+-----------------------------+----+-----------------------------+
|created                      |id  |lastModif                    |
+-----------------------------+----+-----------------------------+
|2019-01-14T19:51:55.123+02:00|abc1|2019-01-14T19:51:55.123+02:00|
+-----------------------------+----+-----------------------------+

root
 |-- created: string (nullable = true)
 |-- id: string (nullable = true)
 |-- lastModif: timestamp (nullable = true)

+----------+----+-----------------------+
|created   |id  |lastModif              |
+----------+----+-----------------------+
|2019-01-14|abc1|2019-01-14 19:51:55.123|
+----------+----+-----------------------+
0 votes

This is how I made the mapping dynamic:

private static Dataset<Row> mapColumns(Properties mappings, String tableNm, String[] colArr, Dataset<Row> tempDS) throws Exception
{
    String mappedCols = "lastmodif,createdDate,endDate"; //hardcoded as of now
    Properties mappingCols = mappings;
    List<String> mapped = Arrays.asList(mappedCols.split(","));

    List<String> colsList = Arrays.asList(colArr);
    List<String> tempList = new ArrayList<>(colsList);

    Dataset<Row> filtered = tempDS.selectExpr(convertListToSeq(colsList));

    for (String column : mapped) {
        String newCol = column + "_mapped";
        String propertyKey = tableNm + "-" + column;
        String propertyValue = mappingCols.getProperty(propertyKey); //target format for this column, not applied yet

        filtered = filtered.selectExpr(convertListToSeq(colsList))
                .withColumn(newCol, functions.regexp_replace(functions.substring(filtered.col(column), 0, 23), "T", " "))
                .drop(filtered.col(column));

        tempList.remove(column);
        tempList.add(newCol);
        colsList = tempList;
    }

    filtered = filtered.selectExpr(convertListToSeq(colsList));
    filtered.show(false);
    return filtered;
}

public static Seq<String> convertListToSeq(List<String> inputList)
{
    return JavaConverters.asScalaIteratorConverter(inputList.iterator()).asScala().toSeq();
}

But the String-to-Timestamp conversion is still pending. For now I am doing a substring; this works for columns whose data looks like yyyy-MM-dd'T'HH:mm:ss.SSSZ or yyyy-MM-dd'T'HH:mm:ss.SSS+0000, but it won't work if a column holds plain yyyy-MM-dd data, and the code will break. I have raised this here: how to convert string to timestamp.
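One hedged way around the mixed-format problem is to try a list of candidate patterns per value instead of a fixed substring; in Spark the analogous trick is coalesce(to_timestamp(col, fmt1), to_timestamp(col, fmt2), ...), which keeps the first pattern that parses. The helper below sketches the idea in plain java.time; the normalize name and the two candidate patterns are my assumptions, extend the list as needed:

```java
import java.time.LocalDate;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

public class MultiFormat {
    // Tries each candidate pattern in order and returns the value rendered as
    // "yyyy-MM-dd HH:mm:ss.SSS" (or left as "yyyy-MM-dd" for date-only input);
    // returns null if nothing matches, mirroring Spark's to_timestamp behavior.
    static String normalize(String value) {
        DateTimeFormatter out = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
        try { // full timestamp with offset, e.g. 2019-02-07T20:37:53.000+0000
            return OffsetDateTime.parse(value,
                    DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ"))
                    .toLocalDateTime().format(out);
        } catch (DateTimeParseException ignored) { }
        try { // date-only input, e.g. 2019-02-07
            return LocalDate.parse(value, DateTimeFormatter.ISO_LOCAL_DATE).toString();
        } catch (DateTimeParseException ignored) { }
        return null; // no candidate pattern matched
    }
}
```

This is a per-value sketch; in the actual job you would express the same fallback chain as Spark column expressions rather than a UDF where possible, since native functions are easier for Spark to optimize.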