1
votes

I have two views in Hive:

+------------+
| Table_1    |
+------------+
| hash       |
| campaignId |
+------------+

+-----------------+
| Table_2         |
+-----------------+
| campaignId      |
| accountId       |
| parentAccountID |
+-----------------+

Now I have to fetch 'Table_1' data filtered by accountId & parentAccountID, for which I have written the following query:

SELECT /*+ MAPJOIN(T2) */ T1.hash, COUNT(T1.campaignId) num_campaigns
FROM Table_1 T1
JOIN Table_2 T2 ON T1.campaignId = T2.campaignId
WHERE (T2.accountId IN ('aid1', 'aid2') OR T2.parentAccountID IN ('aid1', 'aid2'))
GROUP BY T1.hash

This query works but is slow. Is there a better alternative to this JOIN?

I am reading Table_1 from Kafka through Spark.
Slide duration is 5 seconds.
Window duration is 2 minutes.

Table_2, meanwhile, is in an RDBMS that Spark reads through JDBC, and it has 4,500 records.

Every 5 seconds, Kafka pumps in approximately 2K records in CSV format.
I need the data to be processed within 5 seconds, but currently it takes between 8 and 16 seconds.
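For reference, the setup looks roughly like this (simplified; the broker, topic, and JDBC details below are just placeholders, and `sc`/`sqlContext` come from the existing application):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(sc, Seconds(5))                  // slide / batch interval: 5 sec

val kafkaParams = Map("metadata.broker.list" -> "broker:9092")  // placeholder broker
val lines = KafkaUtils
  .createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, Set("campaign_topic"))                    // ~2K CSV records every 5 sec
  .map(_._2)
  .window(Minutes(2), Seconds(5))                               // window 2 min, slide 5 sec

val table2 = sqlContext.read
  .jdbc("jdbc:...", "Table_2", new java.util.Properties)        // ~4,500 rows from the RDBMS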

As per suggestions:

  1. I have repartitioned Table_1 by the columns campaignId and hash.
  2. I have repartitioned Table_2 by the columns accountId and parentAccountID.
  3. I have implemented MAPJOIN.

But still no improvement.

NOTE: If I remove the window duration, the processing does finish in time, maybe because there is less data to process. But that is not the requirement.

5
(1) How would you define "slow"? (2) What volumes are we talking about? - David דודו Markovitz
I am using this in Spark Streaming, which processes data every 5 seconds, but it's taking more than 10 seconds to process each batch. (Note: the slide duration is 5 sec but the window duration is 10 sec.) Each batch has approx. 24K records. - Sandeep Kumar Roy
Do you need the 'hash' column? - David דודו Markovitz
Yes. I am grouping by it. - Sandeep Kumar Roy
Are you doing this query for every RDD (of the DStream) in every batch? Because I feel that creating a HiveContext and running the Hive query for every batch can be expensive. - KiranM

5 Answers

0
votes

With the right indexes, the following can be faster:

SELECT T1.*
FROM Table_1 T1 JOIN
     Table_2 T2
     ON T1.campaignId = T2.campaignId
WHERE T2.accountId IN ('aid1', 'aid2') 
UNION ALL
SELECT T1.*
FROM Table_1 T1 JOIN
     Table_2 T2
     ON T1.campaignId = T2.campaignId
WHERE T2.parentAccountID IN ('aid1', 'aid2') AND
      T2.accountId NOT IN ('aid1', 'aid2') ;

The first can take advantage of an index on Table_2(accountId, campaignId) and the second of an index on Table_2(parentAccountID, accountId, campaignId).

0
votes

Since this is Hive we're talking about, you need to look at more than just traditional DBMS tuning.

  • Reduce I/O. Use a compressed columnar format for your data: ORC or Parquet, not RC. Do this first; convert your tables to ORC (see the sketch after this list). Nothing else will make much of a dent unless the data is compressed and columnar.
  • Choose the proper JOIN strategy for Hive. This old 2011 paper is still relevant.
  • Bucket your tables.
  • Use a modern execution engine: Tez or Spark.
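A minimal sketch of the ORC conversion, assuming a Hive-enabled SparkSession or HiveContext named `spark`; the table names are placeholders:

// Sketch only: rewrite the lookup table as compressed, columnar ORC.
spark.sql("""
  CREATE TABLE table_2_orc
  STORED AS ORC
  AS SELECT campaignId, accountId, parentAccountID FROM table_2
""")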
0
votes

If the filtered T2 is small enough to fit in memory, try rewriting the query to move the filter into a subquery and see if the join gets executed on the mapper. Also, since you do not need any columns from T2, a LEFT SEMI JOIN can be used instead of an inner join:

set hive.cbo.enable=true; 
set hive.auto.convert.join=true;

SELECT T1.* 
FROM Table_1 T1 
LEFT SEMI JOIN 
     (select campaignId  from Table_2 T2 
        where T2.accountId IN ('aid1', 'aid2') 
           OR T2.parentAccountID IN ('aid1', 'aid2')
     ) T2 ON T1.campaignId = T2.campaignId 
;
0
votes

I would recommend using native Spark transformations rather than Hive SQL:

1. Read the data from Table_2 (RDBMS) into an RDD, keyed by campaignId, and cache it. For example:

// assuming the rows read from Table_2 (JDBC) are in rddTbl2Raw as a case class
// like Table2Row(campaignId, accountId, parentAccountID); key them by campaignId
val rddTbl2 = rddTbl2Raw.map(row => (row.campaignId, (row.accountId, row.parentAccountID))) // filter first if possible
rddTbl2.cache()

2. Now read the Table_1 stream from Kafka:

// keep only the campaigns whose accountId or parentAccountID is relevant
val rddTbl2_1 = rddTbl2.filter(x =>
  x._2._1.equals("aid1") || x._2._1.equals("aid2") ||
  x._2._2.equals("aid1") || x._2._2.equals("aid2"))

dstream.foreachRDD { rddTbl1 =>
  rddTbl1.map(x => x._2.split(",")).       // Kafka value is a CSV line
          map(x => (x(1), x(2))).          // (campaignId, hash) -- field positions assumed
          join(rddTbl2_1).                 // (campaignId, (hash, (accountId, parentAccountId)))
          map(x => (x._2._1, 1)).          // (hash, 1)
          reduceByKey(_ + _).              // campaign count per hash
          foreach(println)                 // save it if needed
}
0
votes

Ok..

Here is what I finally did.

I created a hash map of Table_2.
Then, by using a broadcast variable, I passed that data to every node.

This saved me the hassle of doing a join.
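Roughly, the idea looks like this (a simplified sketch; `table2Df`, the CSV field positions, and the account ids are illustrative, not my exact code):

// Build a hash map of Table_2 (campaignId -> (accountId, parentAccountID)) and broadcast it,
// so each batch can filter with a local lookup instead of a shuffle join.
val table2Map: Map[String, (String, String)] = table2Df.rdd
  .map(r => (r.getString(0), (r.getString(1), r.getString(2))))
  .collectAsMap()
  .toMap

val table2Bc = ssc.sparkContext.broadcast(table2Map)
val accounts = Set("aid1", "aid2")

dstream.foreachRDD { rdd =>
  rdd.map(_._2.split(","))                      // CSV line from Kafka
     .map(f => (f(1), f(2)))                    // (campaignId, hash) -- field positions assumed
     .filter { case (campaignId, _) =>
       table2Bc.value.get(campaignId)
         .exists { case (aid, pid) => accounts(aid) || accounts(pid) } }
     .map { case (_, hash) => (hash, 1) }
     .reduceByKey(_ + _)                        // campaign count per hash
     .foreach(println)
}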

Thank you all for your time and help. Happy coding :)