I need help with the following scenario:

1) Memo is the source table in Hive. It has 5493656359 records. Its description is as follows:

load_ts timestamp
memo_ban bigint
memo_id bigint
sys_creation_date timestamp
sys_update_date timestamp
operator_id bigint
application_id varchar(6)
dl_service_code varchar(5)
dl_update_stamp bigint
memo_date timestamp
memo_type varchar(4)    
memo_subscriber varchar(20)
memo_system_txt varchar(180)
memo_manual_txt varchar(2000)
memo_source varchar(1)
data_dt string
market_cd string

Partition information:
data_dt string
market_cd string

2) This is the target table:

CREATE TABLE IF NOT EXISTS memo_temprushi (
load_ts TIMESTAMP,
ban BIGINT,
attuid BIGINT, 
application VARCHAR(6),
system_note INT,
user_note INT, 
note_time INT, 
date TIMESTAMP)
PARTITIONED BY (data_dt  STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
STORED AS ORC
TBLPROPERTIES ("orc.compress"="SNAPPY");

3) This is the initial load statement from the source table Memo into the target table memo_temprushi. It loads all records up to and including 2015-12-14:

SET hive.exec.compress.output=true;
SET mapred.output.compression.type=BLOCK;
SET mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
INSERT INTO TABLE memo_temprushi PARTITION (DATA_DT)
SELECT LOAD_TS,MEMO_BAN, OPERATOR_ID, APPLICATION_ID,
CASE WHEN LENGTH(MEMO_SYSTEM_TXT)=0 THEN 0 ELSE 1 END,
CASE WHEN LENGTH(MEMO_MANUAL_TXT)=0 THEN 0 ELSE 1 END,
HOUR(MEMO_DATE), MEMO_DATE, DATA_DT 
FROM tlgmob_gold.MEMO WHERE LOAD_TS < DATE('2015-12-15');

4) For the incremental load I want to insert the rest of the records, i.e. those from 2015-12-15 onward. I'm using the following query:

INSERT INTO TABLE memo_temprushi PARTITION (DATA_DT) 
SELECT MS.LOAD_TS,MS.MEMO_BAN, MS.OPERATOR_ID, MS.APPLICATION_ID, 
CASE WHEN LENGTH(MS.MEMO_SYSTEM_TXT)=0 THEN 0 ELSE 1 END, 
CASE WHEN LENGTH(MS.MEMO_MANUAL_TXT)=0 THEN 0 ELSE 1 END, 
HOUR(MS.MEMO_DATE), MS.MEMO_DATE, MS.DATA_DT 
FROM tlgmob_gold.MEMO MS JOIN (select max(load_ts) max_load_ts from memo_temprushi) mt 
ON 1=1 
WHERE 
ms.load_ts > mt.max_load_ts;

It launches 2 jobs. At the start it gives a warning that a stage is a cross product. The first job completes quite quickly, but the second job remains stuck at reduce 33%. The log shows:

[EventFetcher for fetching Map Completion Events] org.apache.hadoop.mapreduce.task.reduce.EventFetcher: EventFetcher is interrupted.. Returning

It also shows that the number of reducers is 1. I tried to increase the number of reducers with set mapreduce.job.reduces, but it has no effect.

Thanks

1 Answer

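You can try this. The join on 1=1 is the cross product that Hive warns about, and with no join key to distribute rows on, it is the likely reason the second job ends up with a single reducer.

  • Run "select max(load_ts) max_load_ts from memo_temprushi" on its own.
  • Put the returned value directly into the WHERE condition of the incremental query and remove the join.

If that works, you can write a shell script in which the first query gets the max value and the second query then runs without the join.

Here is a sample shell script (it uses a generic orders table as the example):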

max_date=$(hive -e "select max(order_date) from orders" 2>/dev/null)

hive -e "select order_date from orders where order_date >= date_sub('$max_date', 7);"
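
Adapted to the tables in the question, the same two-step idea might look roughly like the sketch below. It is only an illustration: build_incremental_query is a made-up helper name, the CAST of the value to TIMESTAMP is my assumption about how you would want to compare it, and it assumes the dynamic-partition settings that made the initial load work are still in effect.

```shell
#!/bin/sh
# Sketch only: build_incremental_query is a hypothetical helper name.

# Print the incremental INSERT with the max load_ts inlined as a literal,
# so the query no longer needs the cross join against memo_temprushi.
build_incremental_query() {
  cat <<EOF
INSERT INTO TABLE memo_temprushi PARTITION (DATA_DT)
SELECT LOAD_TS, MEMO_BAN, OPERATOR_ID, APPLICATION_ID,
CASE WHEN LENGTH(MEMO_SYSTEM_TXT)=0 THEN 0 ELSE 1 END,
CASE WHEN LENGTH(MEMO_MANUAL_TXT)=0 THEN 0 ELSE 1 END,
HOUR(MEMO_DATE), MEMO_DATE, DATA_DT
FROM tlgmob_gold.MEMO
WHERE LOAD_TS > CAST('$1' AS TIMESTAMP);
EOF
}

# Only talk to Hive when the CLI is actually installed on this machine.
if command -v hive >/dev/null 2>&1; then
  # Step 1: fetch the max load_ts already present in the target table.
  max_load_ts=$(hive -S -e "select max(load_ts) from memo_temprushi" 2>/dev/null)

  # Step 2: run the insert without the join, unless the target is empty.
  case "$max_load_ts" in
    ''|NULL) echo "no max load_ts found, skipping incremental load" >&2 ;;
    *) hive -e "$(build_incremental_query "$max_load_ts")" ;;
  esac
fi
```

Because the value is substituted before Hive sees the statement, the second run is a plain filtered scan of MEMO. You can call build_incremental_query with a hand-typed timestamp first to inspect the generated statement before running it.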