
PySpark newb here and I'm trying to organize an unwieldy RDD of historical data. I read the data in (from WASB) and need to understand its structure. I have a general idea of the schema, but because this is a large extract, I can see that not all records are consistent.

What I'm struggling with is referring to RDD elements by position so that I can try to extract some sense from the data. I can't commit to a schema right now due to the inconsistency, which means dataframes are not an option and I lose the flexible query style of a DF.

Quick summary of the environment and data in question:
Azure HDInsight Clusters, data in WASB
HDFS v. 2.7
YARN v. 2.7
Spark v1.6 (HA config with 8 worker nodes, 16 cores x 112 GB of RAM each)
Jupyter - PySpark
Data: oddly delimited "CSV" with approx. 765MM records

Read the data
ops = sc.textFile("wasb://[email protected]/raw_ops.csv")

Split on the funky delimiter
ops = ops.map(lambda s: s.split(u"\ufffd")).cache()

Show 5 records of RDD
ops.take(5)

[ [u'ALER RECOMMENDED BRAKE FLUID EXCHANGE - $139.88 ~', u'~PERFORMED DEALER RECOMMENDED BRAKE FLUID EXCHANGE USING A SPECIAL BRAKE FLUID EXCHANGE MACHINE,A PRESSURIZED SUPPLY OF MOC BRAKE FLUID', u'HIST_LD', u'2016-03-08 16:02:53', u'ARCHIVE'] ,[u'04638', u'0734140', u'2011-10-19', u'345267460', u'2', u'TIR', u'', u'0', u'685745051', u'TIRE INFLATION RULE CK ALL TIRES FOR ACCURATE PSI PER MANUFACTURER SPECIFICATION', u'HIST_LD', u'2016-03-08 16:01:39', u'ARCHIVE'] ,[u'04638', u'0734140', u'2011-10-19', u'345267460', u'1', u'PRIME ITEM', u'', u'0', u'0', u'TIRE INFLATION RULE CK ALL TIRES FOR ACCURATE PSI PER MANUFACTURER SPECIFICATION ~', u'~TIRE INFLATION RULE CK ALL TIRES FOR ACCURATE PSI PER MANUFACTURER SPECIFICATIONS AND DOCUMENT PSI ~', u'~ ~', u'~20450 SET AT 36 PSI.', u'HIST_LD', u'2016-03-08 16:01:39', u'ARCHIVE'] ,[u'12093', u'0399468', u'2011-10-19', u'345268559', u'2', u'201', u'', u'1.5', u'0', u'REPLACED GAS CAP AND SANDED FILLER NECK', u'HIST_LD', u'2016-03-08 16:07:15', u'ARCHIVE'] ,[u'12093', u'0399468', u'2011-10-19', u'345268559', u'1', u'PRIME ITEM', u'', u'0', u'0', u'REPLACED GAS CAP AND SANDED FILLER NECK ~', u'~REPLACE GAS CAP AND SAND FILLER NECK', u'HIST_LD', u'2016-03-08 16:07:15', u'ARCHIVE'] ]

I see that the 3rd column might be a date, how do I extract this value from each row in the RDD?
(Pseudo code sample looking for 2013 data here):
ops.filter(lambda x[2]: year(x[2])==2013)

I'm finding limited documentation on how to do this online - especially as it pertains to wrangling inconsistently-structured data without a decisive schema. Bottom line is: what should that line of "pseudo code" actually be?

My ultimate goal is to parse out the 2013-2015 data, partition each year into its own dataframe, and write them to Hive. Thanks for the help!


2 Answers


So this is one way to solve the problem:

from datetime import datetime

def only_pass(maybe_date):
    # Return 1 if the field parses as a YYYY-MM-DD date, else 0.
    try:
        datetime.strptime(maybe_date, "%Y-%m-%d")
        return 1
    except (ValueError, TypeError):
        return 0

only_rows_with_dates = rdd.filter(lambda row: only_pass(row[2]) == 1)
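The same try/except idea extends to the asker's original filter (keep only the 2013 rows): parse the year instead of just validating. A minimal sketch; `year_of` is a hypothetical helper name, and the commented `filter` line assumes a live RDD named `rdd`:

```python
from datetime import datetime

def year_of(maybe_date):
    # Return the parsed year, or None if the field isn't a YYYY-MM-DD date.
    try:
        return datetime.strptime(maybe_date, "%Y-%m-%d").date().year
    except (ValueError, TypeError):
        return None

# Spark usage (requires a live SparkContext):
# rows_2013 = rdd.filter(lambda row: len(row) > 2 and year_of(row[2]) == 2013)
```

The `len(row) > 2` guard matters with inconsistent records - some rows may be too short to have an element at index 2 at all.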

Best option I think at the moment is to:

  1. Split the rows in the RDD based on the delimiter

rddNew = rddOriginal.map(lambda s: s.split(u"\ufffd")).cache()

  2. Search the element you suspect is the date in question and create a new RDD for the time "partition" using RegEx skills from the "re" library

import re

rdd2013 = rddNew.filter(lambda x: re.search('^2013', x[2]))
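To get from there to the stated goal (2013-2015 each in its own dataframe, written to Hive), one hedged sketch, assuming element 2 holds the date whenever the row is well-formed; `in_year` is a hypothetical helper, and the commented Spark lines assume Spark 1.6 with a HiveContext:

```python
import re

def in_year(row, year):
    # True if the row has an element 2 that looks like a date in the given year.
    pattern = r'^%d-\d{2}-\d{2}$' % year
    return len(row) > 2 and re.match(pattern, row[2]) is not None

# Spark usage (sketch only; needs a live SparkContext / HiveContext):
# for yr in (2013, 2014, 2015):
#     subset = ops.filter(lambda row, y=yr: in_year(row, y))
#     df = subset.toDF()               # works once rows in the subset share a shape
#     df.write.saveAsTable("ops_%d" % yr)
```

Anchoring the regex with `^...$` avoids matching "2013" buried inside a free-text description field, which the sample rows show is a real possibility.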