I'm working on POC where I have to update the account number in DB with tokens. I read the data into a dataset dsRecords(about 2M records). I have another routine that had captured distinct account numbers and got the tokens, the mapping is stored in HashMap.
Dataset<Row> applySwappedTokens(Dataset<Row> dsRecords, Map<String, String> mappedTokens){
}
Now, I must iterate through the dataset to do the following - 1. Read account number column (accountNumber) value and update (I know dataset is immutable. So, updating the dataset means creating copy of dataset with updated rows) it with the token value from mappedTokens. This can be achieved through JOIN or other operations but I am not spending effort on this because of 2nd task. 2. Read another XML blob column and find the account number and update it.
All the options I have tried so far are resulting into compile-time error or test compilation error due to non-serializable code. Most online resources are in Scala and not Java. Please help.
Spark 2.1 Java 8
Approach1 - Couldn't test due to serialization error.
Dataset<Row> output = sparkSession.sqlContext().createDataFrame(dsRecords.javaRDD().map(row -> {
return RowFactory.create(row.get(0), row.get(1), row.get(2), swapToken(row.get(3)),row.get(4));
}), dsRecords.schema());
return output;
String swapToken(Object inputToken) {
return mappedTokens.get(inputToken);//mappedToken will have to be instance field.
}
Approach2- Incomplete.
dsRecords.foreach((ForeachFunction<Row>) row -> {
Integer index = row.fieldIndex("accountNumber");
String pan = row.getString(index);
String swap = this.swapToken(pan);
//TODO: create a dataset with rows from dsRecords but swap value.
});
Approach 3 - Use UDF with map function
Create a UDF2 (that takes 2 input parameters viz. accountNumber and mappedToken and returns token). It seems UDF can only take column values
update 1 - UDF So, I implemented UDF (AFK, will post the code later): 1. Defined UDF1 ‘updateToken’ to pass the xml column value and return the updated xml value. 2. The HashMap instance ‘mappedTokens’ that has the account-token pair mapping is made static. It is accessed inside my UDF function to find the account in xml string and update with the token.
I could test my applySwappedTokens function which calls the above UDF on dataset ‘withColumn’. However when I run the spark program, I see that ‘mappedToken’ has ‘null’ data and hence, xml column gets updated with empty data. I think the static ‘mappedTokens’ is either in another JVM or driver (even in local the spark creates isolated driver, executor). It’s frustrating that there is no simple solution to iterate and update rows in spark.
Dataset<Row> processByRow(Dataset<Row> dsRecords, SparkSession sparkSession) {
sparkSession.udf().register("updateToken", updateToken, DataTypes.StringType);
return ds = dsRecords.withColumn("eventRecordTokenText", callUDF("updateToken", dsRecords.col("eventRecordTokenText")));
}
static UDF1 updateToken = new UDF1<String, String>() {
public String call(final String tokenText) throws Exception {
// xml operations here..
for (int nodeIndex = 0; nodeIndex < nList.getLength(); nodeIndex++) {
Node thisNode = nList.item(nodeIndex);
if (thisNode.getAttributes().getNamedItem("ProcessTokenValue") != null && thisNode.getAttributes()
.getNamedItem("ProcessTokenValue").getNodeValue().equalsIgnoreCase("true")) {
Node valueNode = thisNode.getAttributes().getNamedItem("Value");
String thisToken = valueNode.getNodeValue();
String newToken = mappedTokens.get(thisToken); // *returns null values from the map*
if(newToken != null && !newToken.isEmpty())
valueNode.setNodeValue(newToken);
}
}
// more xml operations here..
return output;
}
};
update 2 - iterate & update Now, I am trying row by row traversing ..
Dataset<Row> processByRow1(Dataset<Row> dsRecords, SparkSession sparkSession) {
List<MongoRecordSmall> newRows = new ArrayList<MongoRecordSmall>();
dsRecords.foreach((ForeachFunction<Row>) record -> {
String currentToken = record.getAs(AppConstants.TokenCol);
String newToken = mappedTokens.get(currentToken);
newRows.add(new MongoRecordSmall(record.getString(0), record.getString(1), newToken, record.getString(3)));
logger.error(“Size plus=“+newRows.size());
});
return sparkSession.createDataFrame(newRows, MongoRecordSmall.class);
}
This is throwing serliazation error. It seems (https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/troubleshooting/javaionotserializableexception.html) my class where the above logic exists, is being serlalized and sent to worker nodes and failing in doing so.