3 votes

I'm working on a Sqoop incremental job to load data from MySQL to HDFS. Below are the scenarios.

Scenario 1: Below are the records inserted into the sample table in MySQL.

select * from sample;
+-----+--------+--------+---------------------+
| id  | policy | salary | updated_time        |
+-----+--------+--------+---------------------+
| 100 |      1 |   4567 | 2017-08-02 01:58:28 |
| 200 |      2 |   3456 | 2017-08-02 01:58:29 |
| 300 |      3 |   2345 | 2017-08-02 01:58:29 |
+-----+--------+--------+---------------------+

Below is the table structure of the sample table in MySQL:

create table sample (id int not null primary key, policy int, salary int, updated_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP);

I'm trying to import this into HDFS by creating a Sqoop job as below:

sqoop job --create incjob -- import --connect jdbc:mysql://localhost/retail_db --username root -P --table sample --merge-key id --split-by id --target-dir /user/cloudera --append --incremental lastmodified --check-column updated_time -m 1

After executing the Sqoop job, below are the output records in HDFS.

$ hadoop fs -cat /user/cloudera/par*
100,1,4567,2017-08-02 01:58:28.0
200,2,3456,2017-08-02 01:58:29.0
300,3,2345,2017-08-02 01:58:29.0

Scenario 2: After inserting a few new records and updating some existing records in the sample table, the table looks like this (illustrative statements that would produce these changes are shown after the table).

select * from sample;
+-----+--------+--------+---------------------+
| id  | policy | salary | updated_time        |
+-----+--------+--------+---------------------+
| 100 |      6 |   5638 | 2017-08-02 02:01:09 |
| 200 |      2 |   7654 | 2017-08-02 02:01:10 |
| 300 |      3 |   2345 | 2017-08-02 01:58:29 |
| 400 |      4 |   1234 | 2017-08-02 02:01:17 |
| 500 |      5 |   6543 | 2017-08-02 02:01:18 |
+-----+--------+--------+---------------------+
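For reference, changes of roughly the following shape would produce the Scenario 2 rows. The actual statements are not shown in the question, so this is only an illustration (database name and user taken from the connect string above).

# Illustration only: updated_time is maintained automatically by the
# DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP column definition.
mysql -u root -p retail_db -e "
UPDATE sample SET policy = 6, salary = 5638 WHERE id = 100;
UPDATE sample SET salary = 7654 WHERE id = 200;
INSERT INTO sample (id, policy, salary) VALUES (400, 4, 1234), (500, 5, 6543);
"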

After running the same Sqoop job, below are the records in HDFS.

hadoop fs -cat /user/cloudera/par*
100,1,4567,2017-08-02 01:58:28.0
200,2,3456,2017-08-02 01:58:29.0
300,3,2345,2017-08-02 01:58:29.0
100,6,5638,2017-08-02 02:01:09.0
200,2,7654,2017-08-02 02:01:10.0
400,4,1234,2017-08-02 02:01:17.0
500,5,6543,2017-08-02 02:01:18.0

Here the updated records in MySQL are inserted as new records in HDFS instead of updating the existing records in HDFS. I have used both --merge-key and --append in my Sqoop job configuration. Could anyone help me resolve this issue?


2 Answers

3 votes

You are using --merge-key, --append, and lastmodified together. This is not correct.

  • --incremental append mode - Appends data to an existing dataset in HDFS. You should specify append mode when importing a table where new rows are continually being added with increasing row id values.

  • --incremental lastmodified mode - You should use this when rows of the source table may be updated, and each such update sets the value of a last-modified column to the current timestamp.

  • --merge-key - The merge tool runs a MapReduce job that takes two directories as input: a newer dataset and an older one. These are specified with --new-data and --onto respectively. The output of the MapReduce job is placed in the directory in HDFS specified by --target-dir (see the standalone sketch after this list).

  • --last-value (value) - Specifies the maximum value of the check column from the previous import. If you run Sqoop from the command line, without a saved Sqoop job, then you have to add the --last-value parameter yourself.
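For reference, here is a minimal sketch of the standalone merge tool described in the --merge-key bullet above. The directory names, jar file, and class name are assumptions for illustration (the jar and class would come from a prior sqoop codegen or import), not values from the question.

# Sketch: merge a newer incremental import onto an older full import.
# /user/cloudera/old and /user/cloudera/new are assumed example directories.
sqoop merge \
  --new-data /user/cloudera/new \
  --onto /user/cloudera/old \
  --target-dir /user/cloudera/merged \
  --jar-file sample.jar \
  --class-name sample \
  --merge-key id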

In your case there are some new records and some existing records are updated, so you need to go with lastmodified mode.

Your Sqoop command will be:

sqoop job --create incjob -- import --connect jdbc:mysql://localhost/retail_db --username root -P --table sample --merge-key id --target-dir /user/cloudera --incremental lastmodified --check-column updated_time -m 1

Since you have specified only one mapper, there is no need for --split-by.
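To sanity-check the saved job state and the merged output, something like the following should work (a sketch; the exact property names printed may vary by Sqoop version):

# Show the job definition, including the stored last-modified value that
# Sqoop will use as the lower bound on updated_time for the next run.
sqoop job --show incjob

# Re-run the incremental import after changes in MySQL; with --merge-key and
# without --append, updated rows should replace their old copies in HDFS.
sqoop job --exec incjob
hadoop fs -cat /user/cloudera/par*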

0 votes
  1. I understand that you are trying to update the existing records in HDFS whenever a change happens in the source MySQL table.
  2. You should use --append only when you don't want to update the changed records from the source table.
  3. Another approach is to move the changed records into a separate directory as delta_records and then join them with the base_records; a rough sketch follows below this list. Please see the Hortonworks link for more clarity.
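A rough sketch of that reconciliation step, assuming Hive external tables named base_records and delta_records over the two directories (the table and view names are illustrative; the question does not define them):

# Sketch: build a reconciled view that keeps only the latest row per id,
# picking the record with the greatest updated_time from base + delta.
hive -e "
CREATE VIEW reconcile_view AS
SELECT u.*
FROM (SELECT * FROM base_records
      UNION ALL
      SELECT * FROM delta_records) u
JOIN (SELECT id, MAX(updated_time) AS max_updated_time
      FROM (SELECT * FROM base_records
            UNION ALL
            SELECT * FROM delta_records) a
      GROUP BY id) latest
ON u.id = latest.id AND u.updated_time = latest.max_updated_time;
"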