0
votes

Facing duplicate entry problem while inserting to the table.

I have been used Hadoop mapper for reading record from file.It success fully reads record from file.But while writing the record to mysql data base by Hadoop reducer, following error occured.

java.io.IOException: Duplicate entry '505975648' for key 'PRIMARY'

But Mysql table is remains empty.Unable to write the record to mysql table from Hadoop DBWritable reducer.

Following is error log:

WARNING: com.mysql.jdbc.exceptions.jdbc4.MySQLNonTransientConnectionException: Connection.close() has already been called. Invalid operation in this state. at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at com.mysql.jdbc.Util.handleNewInstance(Util.java:406) at com.mysql.jdbc.Util.getInstance(Util.java:381) at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:984) at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:956) at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:926) at com.mysql.jdbc.ConnectionImpl.getMutex(ConnectionImpl.java:3018) at com.mysql.jdbc.ConnectionImpl.rollback(ConnectionImpl.java:4564) at org.apache.hadoop.mapred.lib.db.DBOutputFormat$DBRecordWriter.close(DBOutputFormat.java:72) at org.apache.hadoop.mapred.ReduceTask$OldTrackingRecordWriter.close(ReduceTask.java:467) at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:539) at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:421) at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:262)

Jun 04, 2014 1:23:36 PM org.apache.hadoop.mapred.LocalJobRunner$Job run WARNING: job_local_0001 java.io.IOException: Duplicate entry '505975648' for key 'PRIMARY' at org.apache.hadoop.mapred.lib.db.DBOutputFormat$DBRecordWriter.close(DBOutputFormat.java:77) at org.apache.hadoop.mapred.ReduceTask$OldTrackingRecordWriter.close(ReduceTask.java:467) at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:531) at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:421) at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:262)

1

1 Answers

1
votes

The DBOutputFormat / DBRecordWriter does everything in a database transaction. While you may have nothing in the table right now, if you try to do two inserts in the same transaction with the same primary key, you will get this error, which is what is happening. To better trace this, you can add logging. You can do this by taking the code for the DBOutputFormat and making a new similarly named class. I called mine LoggingDBOutputFormat. Update your job code to use this new output format instead. For the new output format you change the close method to log your statements before they are executed:

    /** {@inheritDoc} */
public void close(TaskAttemptContext context) throws IOException {
  try {
      LOG.warn("Executing statement:" + statement);   

      statement.executeBatch();
    connection.commit();
  } catch (SQLException e) {
    try {
      connection.rollback();
    }
    catch (SQLException ex) {
      LOG.warn(StringUtils.stringifyException(ex));
    }
    throw new IOException(e.getMessage());
  } finally {
    try {
      statement.close();
      connection.close();
    }
    catch (SQLException ex) {
      throw new IOException(ex.getMessage());
    }
  }
}

You can then check the general log on the mysql side to see if anything was executed. Odds are you will see that your transaction was rolled back based of the error. To work around this, make sure the primary keys are unique. If updating/upserting was what you wanted instead, you can make an output/record writer that does that, but that is a different undertaking.