dataframe 1: crimedf

scala> crimedf.show(5,false)
+----------+-----------+-----------------------------------------------------------------------------------+-----+-----------------------+---------------------+-----------------+--------------------------+---+
|lat       |lng        |desc                                                                               |zip  |title                  |timeStamp            |twp              |addr                      |e  |
+----------+-----------+-----------------------------------------------------------------------------------+-----+-----------------------+---------------------+-----------------+--------------------------+---+
|40.2978759|-75.5812935|REINDEER CT & DEAD END;  NEW HANOVER; Station 332; 2015-12-10 @ 17:10:52;          |19525|EMS: BACK PAINS/INJURY |2015-12-10 17:40:00.0|NEW HANOVER      |REINDEER CT & DEAD END    |1  |
|40.2580614|-75.2646799|BRIAR PATH & WHITEMARSH LN;  HATFIELD TOWNSHIP; Station 345; 2015-12-10 @ 17:29:21;|19446|EMS: DIABETIC EMERGENCY|2015-12-10 17:40:00.0|HATFIELD TOWNSHIP|BRIAR PATH & WHITEMARSH LN|1  |
|40.1211818|-75.3519752|HAWS AVE; NORRISTOWN; 2015-12-10 @ 14:39:21-Station:STA27;                         |19401|Fire: GAS-ODOR/LEAK    |2015-12-10 17:40:00.0|NORRISTOWN       |HAWS AVE                  |1  |
|40.116153 |-75.343513 |AIRY ST & SWEDE ST;  NORRISTOWN; Station 308A; 2015-12-10 @ 16:47:36;              |19401|EMS: CARDIAC EMERGENCY |2015-12-10 17:40:01.0|NORRISTOWN       |AIRY ST & SWEDE ST        |1  |
|40.251492 |-75.6033497|CHERRYWOOD CT & DEAD END;  LOWER POTTSGROVE; Station 329; 2015-12-10 @ 16:56:52;   |null |EMS: DIZZINESS         |2015-12-10 17:40:01.0|LOWER POTTSGROVE |CHERRYWOOD CT & DEAD END  |1  |
+----------+-----------+-----------------------------------------------------------------------------------+-----+-----------------------+---------------------+-----------------+--------------------------+---+
only showing top 5 rows

crimedf.registerTempTable("crimedf")

dataframe 2: zipcode

scala> zipcode.show(5)
+---+----------+-----+---------+----------+--------+---+
|zip|      city|state| latitude| longitude|timezone|dst|
+---+----------+-----+---------+----------+--------+---+
|210|Portsmouth|   NH|43.005895|-71.013202|      -5|  1|
|211|Portsmouth|   NH|43.005895|-71.013202|      -5|  1|
|212|Portsmouth|   NH|43.005895|-71.013202|      -5|  1|
|213|Portsmouth|   NH|43.005895|-71.013202|      -5|  1|
|214|Portsmouth|   NH|43.005895|-71.013202|      -5|  1|
+---+----------+-----+---------+----------+--------+---+

zipcode.registerTempTable("zipcode")

My requirement is:

  1. Create a new column "problem" by extracting the substring before ":" from the column "title" of table "crimedf".

  2. Join the two tables, group by the columns "state" and "problem", and generate a count.

I get the desired output when I generate a new table from the first table and join it with the second table.

scala> val newcrimedf = sqlContext.sql("select substring_index(title,':',1) as problem, zip from crimedf")
newcrimedf: org.apache.spark.sql.DataFrame = [problem: string, zip: int]

scala> newcrimedf.show(2)
+-------+-----+
|problem|  zip|
+-------+-----+
|    EMS|19525|
|    EMS|19446|
+-------+-----+

newcrimedf.registerTempTable("newcrimedf")

sqlContext.sql("""select z.state, n.problem, count(*) as count 
from newcrimedf n 
JOIN zipcode z 
ON n.zip = z.zip 
GROUP BY z.state,n.problem 
ORDER BY count DESC""").show
+-----+-------+-----+                                                           
|state|problem|count|
+-----+-------+-----+
|   PA|    EMS|44326|
|   PA|Traffic|29297|
|   PA|   Fire|13012|
|   AL|Traffic|    1|
|   TX|    EMS|    1|
+-----+-------+-----+

How can I generate the same output directly from the original first table ("crimedf") without creating the second table "newcrimedf"?

How can I add a new column while joining? Please help.

I tried doing it, but the result is wrong. Below is what I tried:

sqlContext.sql("""select z.state, c.problem, count(*) as count from 
(select zip, substring(title,':',1) problem from crimedf) c 
JOIN zipcode z ON c.zip = z.zip 
GROUP BY z.state,c.problem ORDER BY count desc""").show
+-----+-------+-----+                                                           
|state|problem|count|
+-----+-------+-----+
|   PA|   null|86635|
|   TX|   null|    1|
|   AL|   null|    1|
+-----+-------+-----+
Your query is correct except for one mistake: you used substring instead of substring_index. :) See my answer below. - Ramesh Maharjan

1 Answer

Create a new column "problem" by extracting the substring before ":" from the column "title" of table "crimedf"

This can be achieved using the withColumn API and a simple split function (see the code below).

join the 2 tables and group the columns "state" and "problem" and generate count.

This can be achieved by using join, groupBy and a count aggregation (see the code below).

The following code should work for you:

import org.apache.spark.sql.functions.{count, split} // split and count are not in scope by default

crimedf.select("zip", "title")                    // select only the needed columns from crimedf
  .withColumn("problem", split($"title", ":")(0)) // problem = part of title before the first ":"
  .join(zipcode, Seq("zip"))                      // inner join with the zipcode dataframe on zip
  .groupBy("state", "problem")                    // group by state and problem
  .agg(count("state"))                            // count the rows in each group
  .show(false)
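To reproduce the ordering of the SQL output in the question (largest count first), the same pipeline can end with an explicit sort. The following is only a minimal sketch: it assumes Spark 2.x or later with a `SparkSession` on the classpath (the question uses the older `sqlContext`), and the object name `ProblemCountSketch`, the helper `problemCounts`, and the three-row sample tables are hypothetical and made up for illustration. The `zip`/`state` pairs in particular are not taken from the real zipcode table.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, desc, split}

object ProblemCountSketch {
  // Hypothetical helper: derives "problem" from "title", joins on "zip",
  // then counts rows per (state, problem), largest count first.
  def problemCounts(crimes: DataFrame, zips: DataFrame): DataFrame =
    crimes
      .select(col("zip"), split(col("title"), ":")(0).as("problem"))
      .join(zips, Seq("zip"))      // inner join: rows with a null zip drop out
      .groupBy("state", "problem")
      .count()                     // adds a column named "count"
      .orderBy(desc("count"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("problem-count-sketch")
      .getOrCreate()
    import spark.implicits._

    // Made-up sample rows for illustration only.
    val crimes = Seq(
      (19525, "EMS: BACK PAINS/INJURY"),
      (19446, "EMS: DIABETIC EMERGENCY"),
      (19401, "Fire: GAS-ODOR/LEAK")
    ).toDF("zip", "title")
    val zips = Seq((19525, "PA"), (19446, "PA"), (19401, "PA")).toDF("zip", "state")

    problemCounts(crimes, zips).show(false)
    spark.stop()
  }
}
```

Using `groupBy(...).count()` instead of `agg(count("state"))` names the result column `count`, which makes the `orderBy` straightforward.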

Edited

Your SQL query works perfectly and gives the same result as the API used above. You just forgot to append _index to substring:

sqlContext.sql("""select z.state, c.problem, count(*) as count from
              (select zip, substring_index(title,':',1) as problem from crimedf) c
                 JOIN zipcode z ON c.zip = z.zip
                 GROUP BY z.state,c.problem ORDER BY count desc""").show(false)
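The SQL and DataFrame versions agree because, for these titles, `substring_index(title, ':', 1)` and `split(title, ":")(0)` both yield the text before the first colon. As a rough illustration outside Spark, the plain-Scala sketch below mirrors those two behaviours on titles taken from the sample rows in the question. The object `ProblemPrefixSketch` and the helpers `beforeFirst` and `firstSplit` are hypothetical names, not Spark functions.

```scala
import java.util.regex.Pattern

object ProblemPrefixSketch {
  // Hypothetical helper mirroring substring_index(s, delim, 1):
  // text before the first delim, or the whole string if delim is absent.
  def beforeFirst(s: String, delim: String): String = {
    val i = s.indexOf(delim)
    if (i < 0) s else s.substring(0, i)
  }

  // Hypothetical helper mirroring split(s, delim)(0), treating delim as a literal.
  def firstSplit(s: String, delim: String): String =
    s.split(Pattern.quote(delim), -1)(0)

  def main(args: Array[String]): Unit = {
    val titles = Seq("EMS: BACK PAINS/INJURY", "Fire: GAS-ODOR/LEAK", "EMS: DIZZINESS")
    // Print both extractions side by side for each title.
    titles.foreach { t =>
      println(s"${beforeFirst(t, ":")} | ${firstSplit(t, ":")}")
    }
  }
}
```

By contrast, `substring` takes a start position and a length rather than a delimiter, which is consistent with the all-null "problem" column shown in the question's failed attempt.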