Assuming both DataFrames have the same fixed number of columns and rows, one solution is to join them by row index (in case the records have no ids) and then iterate directly over the joined DataFrame, which holds all the columns of both.
Something like this:
Schemas
DF1
root
|-- col1: double (nullable = true)
|-- col2: double (nullable = true)
|-- col3: double (nullable = true)
DF2
root
|-- col1: double (nullable = true)
|-- col2: double (nullable = true)
|-- col3: double (nullable = true)
df1
+----------+-----------+------+
|      col1|       col2|  col3|
+----------+-----------+------+
|1.20000001|       1.21|   1.2|
|    2.1111|        2.3|  22.2|
|       3.2|2.330000001| 2.333|
|    2.2444|      2.344|2.3331|
+----------+-----------+------+
df2
+------+-----+------+
|  col1| col2|  col3|
+------+-----+------+
|   1.2| 1.21|   1.2|
|2.1111|  2.3|  22.2|
|   3.2| 2.33| 2.333|
|2.2444|2.344|2.3331|
+------+-----+------+
Added row index
df1
+----------+-----------+------+---+
|      col1|       col2|  col3|row|
+----------+-----------+------+---+
|1.20000001|       1.21|   1.2|  0|
|    2.1111|        2.3|  22.2|  1|
|       3.2|2.330000001| 2.333|  2|
|    2.2444|      2.344|2.3331|  3|
+----------+-----------+------+---+
df2
+------+-----+------+---+
|  col1| col2|  col3|row|
+------+-----+------+---+
|   1.2| 1.21|   1.2|  0|
|2.1111|  2.3|  22.2|  1|
|   3.2| 2.33| 2.333|  2|
|2.2444|2.344|2.3331|  3|
+------+-----+------+---+
Combined DF
+---+----------+-----------+------+------+-----+------+
|row|      col1|       col2|  col3|  col1| col2|  col3|
+---+----------+-----------+------+------+-----+------+
|  0|1.20000001|       1.21|   1.2|   1.2| 1.21|   1.2|
|  1|    2.1111|        2.3|  22.2|2.1111|  2.3|  22.2|
|  2|       3.2|2.330000001| 2.333|   3.2| 2.33| 2.333|
|  3|    2.2444|      2.344|2.3331|2.2444|2.344|2.3331|
+---+----------+-----------+------+------+-----+------+
This is how you can do that:
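import org.apache.spark.sql.functions.monotonically_increasing_id

println("Schemas")
println("DF1")
df1.printSchema()
println("DF2")
df2.printSchema()

println("df1")
df1.show()
println("df2")
df2.show()

// Add a row index to each DataFrame.
// Caveat: monotonically_increasing_id() is unique and increasing but not
// guaranteed to be consecutive. The ids of the two DataFrames only line up
// when both have the same partitioning and the same rows per partition.
val finaldf1 = df1.withColumn("row", monotonically_increasing_id())
val finaldf2 = df2.withColumn("row", monotonically_increasing_id())

println("Added row index")
println("df1")
finaldf1.show()
println("df2")
finaldf2.show()

// Join on the row index so each row carries the columns of both DataFrames
val joinedDfs = finaldf1.join(finaldf2, "row")

println("Combined DF")
joinedDfs.show()

val tolerance = 0.001

// True when a and b differ by at most `tolerance`
def isInValidRange(a: Double, b: Double): Boolean =
  Math.abs(a - b) <= tolerance

// take() returns the rows to the driver, so the assertions run locally.
// Column 0 is the row index, 1 to 3 come from df1, 4 to 6 from df2.
joinedDfs.take(10).foreach { row =>
  val rowNumber = row.getLong(0) + 1
  assert(isInValidRange(row.getDouble(1), row.getDouble(4)), s"Col1 validation. Row $rowNumber")
  assert(isInValidRange(row.getDouble(2), row.getDouble(5)), s"Col2 validation. Row $rowNumber")
  assert(isInValidRange(row.getDouble(3), row.getDouble(6)), s"Col3 validation. Row $rowNumber")
}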
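The tolerance check can also be tried without a cluster, and written so that it collects every mismatch instead of stopping at the first failed assert. This is only a minimal sketch in plain Scala, assuming the rows are already on the driver; `Mismatch` and `findMismatches` are hypothetical names, and the values are the sample data shown above.

```scala
// Hypothetical helper type, not part of Spark: one cell that failed the check.
final case class Mismatch(row: Int, col: Int, left: Double, right: Double)

// Compare two equally shaped tables cell by cell and return every cell
// whose two values differ by more than `tolerance`.
def findMismatches(
    left: Seq[Seq[Double]],
    right: Seq[Seq[Double]],
    tolerance: Double
): Seq[Mismatch] =
  for {
    ((l, r), rowIdx) <- left.zip(right).zipWithIndex
    ((a, b), colIdx) <- l.zip(r).zipWithIndex
    if math.abs(a - b) > tolerance
  } yield Mismatch(rowIdx, colIdx, a, b)

// Sample values copied from df1 and df2 above
val df1Rows = Seq(
  Seq(1.20000001, 1.21, 1.2),
  Seq(2.1111, 2.3, 22.2),
  Seq(3.2, 2.330000001, 2.333),
  Seq(2.2444, 2.344, 2.3331)
)
val df2Rows = Seq(
  Seq(1.2, 1.21, 1.2),
  Seq(2.1111, 2.3, 22.2),
  Seq(3.2, 2.33, 2.333),
  Seq(2.2444, 2.344, 2.3331)
)

// Every difference in the sample data is far below 0.001, so this is empty
val mismatches = findMismatches(df1Rows, df2Rows, tolerance = 0.001)
mismatches.foreach(println)
```

Returning the mismatches as data makes it possible to report all failing cells at once, or to turn the result into a single assertion such as `assert(mismatches.isEmpty)`.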
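Note: asserts are not serialized, so they cannot run inside a closure that is shipped to the executors. A workaround is to use take(), which brings the rows back to the driver where the checks run locally. Keep in mind that take(10) only validates the first 10 rows, so raise the limit if the DataFrames are larger.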
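If the DataFrames are too large to bring to the driver, or have more than one partition, the same idea can be expressed with a consecutive index and a column expression. The following is a rough sketch rather than a definitive implementation: `withRowIndex` and `mismatchedRows` are hypothetical helper names, and it assumes both DataFrames have a deterministic, matching row order, because `zipWithIndex` numbers rows in the order the underlying RDD yields them.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions.{abs, col}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Hypothetical helper: append a consecutive, 0-based "row" column.
// Unlike monotonically_increasing_id(), RDD.zipWithIndex yields 0, 1, 2, ...
// regardless of how the data is partitioned.
def withRowIndex(spark: SparkSession, df: DataFrame): DataFrame = {
  val indexed = df.rdd.zipWithIndex().map { case (r, idx) =>
    Row.fromSeq(r.toSeq :+ idx)
  }
  val schema = StructType(
    df.schema.fields :+ StructField("row", LongType, nullable = false)
  )
  spark.createDataFrame(indexed, schema)
}

// Hypothetical helper: return the joined rows where at least one of the
// listed columns differs by more than `tolerance`. The comparison is a
// column expression, so it is evaluated on the executors.
// Null values make the comparison null, so such rows are not reported.
def mismatchedRows(
    spark: SparkSession,
    left: DataFrame,
    right: DataFrame,
    columns: Seq[String],
    tolerance: Double
): DataFrame = {
  val a = withRowIndex(spark, left).alias("a")
  val b = withRowIndex(spark, right).alias("b")
  val anyMismatch = columns
    .map(c => abs(col(s"a.$c") - col(s"b.$c")) > tolerance)
    .reduce(_ || _)
  a.join(b, col("a.row") === col("b.row")).filter(anyMismatch)
}
```

A call such as `mismatchedRows(spark, df1, df2, Seq("col1", "col2", "col3"), 0.001).show()` would then list only the offending rows, and an empty result means every compared value is within the tolerance.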