2
votes

I have a query that joins two tables. How do I optimize it to run faster?

val q = """
          | select a.value as viewedid,b.other as otherids
          | from bm.distinct_viewed_2610 a, bm.tets_2610 b
          | where FIND_IN_SET(a.value, b.other) != 0 and a.value in (
          |   select value from bm.distinct_viewed_2610)
          |""".stripMargin
val rows = hiveCtx.sql(q).repartition(100)

Table descriptions:

hive> desc distinct_viewed_2610;
OK
value                   string

hive> desc tets_2610;
OK
id                      int                                         
other                   string 

The data looks like this:

hive> select * from distinct_viewed_2610 limit 5;
OK
1033346511
1033419148
1033641547
1033663265
1033830989

and

hive> select * from tets_2610 limit 2;
OK

1033759023
103973207,1013425393,1013812066,1014099507,1014295173,1014432476,1014620707,1014710175,1014776981,1014817307,1023740250,1031023907,1031188043,1031445197

The distinct_viewed_2610 table has 1.1 million records, and I am trying to find the matching ids for them in the tets_2610 table, which has 200,000 rows, by splitting its second column.

For 100,000 records the job takes 8.5 hours to complete on two machines: one with 16 GB RAM and 16 cores, the other with 8 GB RAM and 8 cores.

Is there a way to optimize the query?

(screenshot of the Spark executors tab)

2
Partition your tables (if possible), use ORC or Parquet for better file formats, adjust your memory settings so that you can fit the smaller joined table into memory (if possible), etc. That is typically how you squeeze out Spark performance. The questions are: how many executors are you running? What settings did you give them? Is there skew in your data that made one executor take a long time? - OneCricketeer
Thanks for the reply. I need clarification on one point: 1. I read in a few documents that Spark does not check which format the data is stored in and simply accesses the data from the path. My queries on ORC and non-ORC tables seemed to confirm that, since the response times were about the same. - Sudheer Nulu
Where did you read that? Of course Spark cares about the data format. If you make a Dataset, then a columnar data format will be more efficient than a plain text file. - OneCricketeer
I have 4 executors and have attached a snapshot of my executors tab, which shows the memory allocation. - Sudheer Nulu
Well, at first glance, you are only using 12 cores on the larger machine and 4 on the other. I'm guessing 4 cores are being allocated to something else. If you want more executors, you can set 1 or 2 cores per executor. Also, it doesn't look like the memory is really being used at all. - OneCricketeer
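
Following up on the executor discussion in these comments, here is a minimal sketch of pinning the executor sizing down explicitly instead of relying on defaults. The specific values (2 cores and 3g per executor, 8 instances, 100 shuffle partitions) are assumptions for illustration only and would need tuning to the 16-core/16 GB and 8-core/8 GB machines described in the question.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

// Smaller executors usually mean more of them; values below are illustrative only.
val conf = new SparkConf()
  .setAppName("find-in-set-join")
  .set("spark.executor.cores", "2")
  .set("spark.executor.memory", "3g")
  .set("spark.executor.instances", "8")      // honored depending on the cluster manager / dynamic allocation
  .set("spark.sql.shuffle.partitions", "100")

val sc = new SparkContext(conf)
val hiveCtx = new HiveContext(sc)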

2 Answers

0
votes

Right now you are doing a Cartesian join. A Cartesian join gives you 1.1M * 200K = 220 billion rows. Only after the Cartesian join is the result filtered by where FIND_IN_SET(a.value, b.other) != 0.

Analyze your data. If the 'other' string contains 10 elements on average, then exploding it will give you about 200K * 10 = 2M rows for table b. And if, say, only 1/10 of those rows actually join, you will end up with 2M / 10 = 200K rows because of the INNER JOIN.

If these assumptions are correct, then exploding the array and joining will perform better than the Cartesian join + filter.

select distinct a.value as viewedid, b.otherids
  from bm.distinct_viewed_2610 a
       inner join (select e.otherid, b.other as otherids
                     from bm.tets_2610 b
                          lateral view explode(split(b.other, ',')) e as otherid
                  ) b on a.value = b.otherid

And you do not need this:

and a.value in (select value from bm.distinct_viewed_2610)

Sorry, I cannot test the query; please test it yourself.
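
If you prefer the DataFrame API over HiveQL, roughly the same plan could be expressed as in the untested sketch below (table and column names taken from the question; a Spark 1.x HiveContext with the split/explode functions is assumed):

import org.apache.spark.sql.functions.{col, explode, split}

// Explode the comma-separated 'other' column into one row per id,
// then do a plain equi-join instead of a Cartesian join + FIND_IN_SET filter.
val viewed = hiveCtx.table("bm.distinct_viewed_2610")
val tets   = hiveCtx.table("bm.tets_2610")

val exploded = tets.withColumn("otherid", explode(split(col("other"), ",")))

val result = viewed
  .join(exploded, viewed("value") === exploded("otherid"))
  .select(viewed("value").as("viewedid"), exploded("other").as("otherids"))
  .distinct()

Either way, the key point is that exploding turns FIND_IN_SET into a plain equi-join, which Spark can execute as a hash join instead of a Cartesian product followed by a filter.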

0
votes

If you are using the ORC format, change to Parquet; given your data, I would say choose range partitioning.

Choose a proper level of parallelism so the job executes faster.
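
As a rough illustration of the first point, rewriting the lookup table as Parquet might look like the sketch below; the output table name bm.tets_2610_parquet and the partition count are assumptions, and this targets the Spark 1.x HiveContext API used in the question.

// Rewrite the lookup table in Parquet so later scans are cheaper.
// The partition count is illustrative; tune it to the available cores.
val tets = hiveCtx.table("bm.tets_2610")

tets
  .repartition(50)
  .write
  .format("parquet")
  .saveAsTable("bm.tets_2610_parquet")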

I have answered at the following link; it may help you:

Spark doing exchange of partitions already correctly distributed

Also, please read this:

http://dev.sortable.com/spark-repartition/