0
votes

I am not expert in RDD and looking for some answers to get here, I was trying to perform few operations on pyspark RDD but could not achieved , specially with substring.I know I can do that by converting RDD to DF, but wondering how this was being done earlier before pre DF era ? Are companies still prefer to do work in RDD or dataframes ?

My code:

rdd= sc.textFile("Sales.txt")
##Taking only required columns and changing the data types
rdd_map = rdd.map(lambda line: (int((line.split("|")[0])),int((line.split("|")[1])),line.split("|")[4]))
##Filtering the data
rdd_filter = rdd_map.filter(lambda x: (x[0] > 43668) & ('-' in x[2]))
## Trying to perform substring
rdd_clean = rdd_filter.map(lambda x: x.substr(x[2],1,3))

Data sample:

43665|63|OLD ORDER|Sport-100 Helmet, Re|HL-U509-R
43668|87|OLD ORDER|Sport-100 Helmet, Re|HL-U509-R

Complete error message:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 50.0 failed 1 times, most recent failure: Lost task 0.0 in stage 50.0 (TID 152, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):

1
Please post the whole error message and a few exemplary of Sales.txt.cronoik
Updated the required details in original post , also would like to know other details as posted in op.1st record: 43665|63|OLD ORDER|Sport-100 Helmet, Re|HL-U509-R 43668|87|OLD ORDER|Sport-100 Helmet, Re|HL-U509-Rrommy
what's your expected result?jxc
I want to get the first 3 chars from the result, HL- , However I tried using following code was able to get it successfully after some research , rdd_clean= rdd_filter.map(lambda x: x[2][1:3]) , But I also want to get the results along with other columns in RDDrommy

1 Answers

0
votes

I think you can probably simplify some of the transformation steps using flatMap() and a list comprehension:

>>> rdd = sc.parallelize([
      '43665|63|OLD ORDER|Sport-100 Helmet, Re|HL-U509-R'
    , '43668|87|OLD ORDER|Sport-100 Helmet, Re|HL-U509-R'
])

>>> rdd_clean = rdd.flatMap(lambda x: [ (int(e[0]), int(e[1]), e[4][:3]) for e in [x.split('|')] if ('-' in e[4]) & (int(e[0]) > 43665) ])

>>> rdd_clean.collect()
[(43668, 87, 'HL-')]    

Where, I use flatMap() to setup the three-item tuple, and move the filter() and taking the sub-string of x[2] into the list comprehension. If you insist on your original method, just do:

rdd_clean = rdd_filter.map(lambda x: (x[0], x[1], x[2][:3]))