4 votes

I am working on a project with Spark and Scala. I am new to both, but with a lot of help from Stack Overflow I have done all the data processing and stored the processed data in MySQL. Now, at the very end, I am facing a problem I don't know how to tackle. The first time I processed the data, the table was empty, and I stored the dataframe with this method:

      df.write.mode("append").jdbc(dbUrl, "tablename", dbProperties) // dbProperties is a java.util.Properties

Let's say my processed data looks like this in the database:

      id      name       eid      number_of_visits    last_visit_date
      1       John       C110     12                  2016-01-13 00:00:00
      2       Root       C111     24                  2016-04-27 00:00:00
      3       Michel     C112     8                   2016-07-12 00:00:00
      4       Jonny      C113     45                  2016-06-10 00:00:00

Now the person named 'Root' with eid 'C111' visits the office 2 more times, on '2016-08-30 00:00:00'. After processing this new data, I need to update only this person's record in the database. How do I do that? The updated table should look like this:

      id      name       eid      number_of_visits    last_visit_date
      1       John       C110     12                  2016-01-13 00:00:00
      2       Root       C111     26                  2016-08-30 00:00:00
      3       Michel     C112     8                   2016-07-12 00:00:00
      4       Jonny      C113     45                  2016-06-10 00:00:00

I have millions of rows in this table. If I load the full table into a Spark dataframe and update the desired record, it will take a long time, and it also makes no sense to load the full table when I want to update only one row. I tried the code below, but it appended a new row to the table rather than updating the existing one.

       df.write.mode("append").jdbc(dbUrl, "tablename", dbProperties)

Is there any way to do that in Spark?

I have seen the following on the Internet. Can I adapt something like it for the update?

    val numParallelInserts = 10
    val batchSize = 1000

    // Shrink to a few partitions so at most numParallelInserts connections hit the DB.
    sessions.coalesce(numParallelInserts).mapPartitionsWithIndex { (split, iter) =>
      val db = connect()                 // helper that opens a JDBC connection (autocommit off)

      val sql = "INSERT INTO sessions (id, ts) VALUES (?, ?)"
      val stmt = db.prepareStatement(sql)

      iter.grouped(batchSize).zipWithIndex.foreach { case (batch, batchIndex) =>
        batch.foreach { session =>
          stmt.setString(1, session.id)
          stmt.setString(2, TimestampFormat.print(session.ts))
          stmt.addBatch()
        }
        stmt.executeBatch()              // one round trip per batch
        db.commit()
        logInfo("Split " + (split + 1) + "/" + numParallelInserts +
          " inserted batch " + batchIndex + " with " + batch.size + " elements")
      }

      db.close()
      Iterator.empty
    }.count()                            // mapPartitionsWithIndex is lazy; force it to run
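If I adapt that pattern to my case, I imagine something like the snippet below might work, but I am not sure it is the right approach. It is a rough, untested sketch: it assumes eid has a unique index so that MySQL's INSERT ... ON DUPLICATE KEY UPDATE behaves as an upsert, and it reuses the connect() helper, numParallelInserts, and batchSize from the snippet above.

    val upsertSql =
      """insert into tablename (eid, number_of_visits, last_visit_date)
        |values (?, ?, ?)
        |on duplicate key update
        |  number_of_visits = number_of_visits + values(number_of_visits),
        |  last_visit_date  = values(last_visit_date)""".stripMargin

    df.coalesce(numParallelInserts).foreachPartition { rows =>
      val db = connect()                    // same JDBC helper as above (autocommit off)
      val stmt = db.prepareStatement(upsertSql)
      rows.grouped(batchSize).foreach { batch =>
        batch.foreach { row =>
          stmt.setString(1, row.getAs[String]("eid"))
          stmt.setInt(2, row.getAs[Int]("number_of_visits"))
          stmt.setTimestamp(3, row.getAs[java.sql.Timestamp]("last_visit_date"))
          stmt.addBatch()
        }
        stmt.executeBatch()                 // send each batch in one round trip
        db.commit()
      }
      db.close()
    }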
Have you tried with "overwrite" mode? – dsr301
"overwrite" recreates the table (losing the exact column types), deletes all the older data, and inserts only the newly processed data. – Atif Shahzad

1 Answer

3 votes

You can try using SQL to do that: store the updated (and even new) data in a temporary table, and then merge the temporary table into the main table.

One way to do that is the following; a sketch showing how to drive these steps from Spark appears after the list.

  1. Update all the matching records in the main table from the temporary table (MySQL uses join syntax in UPDATE, and the last visit date needs updating too):

    update main_table
    join temp_table on main_table.eid = temp_table.eid
    set main_table.number_of_visits = main_table.number_of_visits + temp_table.number_of_visits,
        main_table.last_visit_date  = temp_table.last_visit_date;

  2. Delete all duplicate records from the temporary table (that leaves only the genuinely new records in the temporary table):

    delete temp_table from temp_table
    join main_table on main_table.eid = temp_table.eid;

  3. Insert all the remaining records from the temporary table into the main table:

    insert into main_table select * from temp_table;

  4. Drop the temporary table

    drop table temp_table;
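You can drive all four steps from Spark itself: write only the freshly processed records to the temporary table over JDBC, then execute the statements above on the driver. A minimal sketch, assuming a dbUrl string and java.util.Properties dbProperties like the ones in your question, and a dataframe newDf holding just the newly processed rows:

    import java.sql.DriverManager

    // Stage only the freshly processed records. "overwrite" simply recreates
    // temp_table, which is fine because we drop it at the end anyway.
    newDf.write.mode("overwrite").jdbc(dbUrl, "temp_table", dbProperties)

    // Run the merge on the driver over a plain JDBC connection.
    val conn = DriverManager.getConnection(dbUrl, dbProperties)
    try {
      val stmt = conn.createStatement()

      // 1. Update the existing records in place.
      stmt.executeUpdate(
        """update main_table
          |join temp_table on main_table.eid = temp_table.eid
          |set main_table.number_of_visits = main_table.number_of_visits + temp_table.number_of_visits,
          |    main_table.last_visit_date  = temp_table.last_visit_date""".stripMargin)

      // 2. Remove the rows that were just merged, leaving only brand-new records.
      stmt.executeUpdate(
        "delete temp_table from temp_table join main_table on main_table.eid = temp_table.eid")

      // 3. Insert the brand-new records (assumes the column order matches main_table).
      stmt.executeUpdate("insert into main_table select * from temp_table")

      // 4. Clean up.
      stmt.executeUpdate("drop table temp_table")
    } finally {
      conn.close()
    }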