I am working on a project with spark and scala and I am new to both but with lot of help from stackoverflow I have done all the data processing and stored the processed data in mysql. Now at last I am facing a problem and I don't understand how to tackle it. First time when I processed the data then I stored the dataframe using this method and first time table is empty.
df.write.mode("append").jdbc("dburl", "tablename", "dbproperties");
Let say that my processed data is look like this in database.
id name eid number_of_visitis last_visit_date
1 John C110 12 2016-01-13 00:00:00
2 Root C111 24 2016-04-27 00:00:00
3 Michel C112 8 2016-07-123 00:00:00
4 Jonny C113 45 2016-06-10 00:00:00
Now person named 'Root' with eid 'C111' visit office 2 times on '2016-08-30 00:00:00' now after processing this new data I need to update only this person record in database. How I will do that. Now the updated table should look like this.
id name eid number_of_visitis last_visit_date
1 John C110 12 2016-01-13 00:00:00
2 Root C111 26 2016-08-30 00:00:00
3 Michel C112 8 2016-07-123 00:00:00
4 Jonny C113 45 2016-06-10 00:00:00
I have million of data in this table and if I load the full table in spark dataframe and update the desired record then it will take more time and also it does not make sense because why I load the full table when I want to update only one row.I tried this code but it added the new row to table rather than updating the row.
df.write.mode("append").jdbc("dburl", "tablename", "dbproperties");
Is there any way to do that in spark?
I have seen this on Internet can I do like this for update.
val numParallelInserts = 10
val batchSize = 1000
new CoalescedRDD(sessions, numParallelInserts) mapPartitionsWithSplit { (split, iter) => Iterator((split, iter)) } foreach { case (split, iter) =>
val db = connect()
val sql = "INSERT INTO sessions (id, ts) VALUES (?, ?)"
val stmt = db.prepareStatement(sql)
iter.grouped(batchSize).zipWithIndex foreach { case (batch, batchIndex) =>
batch foreach { session =>
stmt.setString(1, session.id)
stmt.setString(2, TimestampFormat.print(session.ts))
stmt.addBatch()
}
stmt.executeBatch()
db.commit();
logInfo("Split " + (split+1) + "/" + numParallelInserts + " inserted batch " + batchIndex + " with " + batch.size + " elements")
}
db.close();