
I want to sort the products by product price. Here is my code:

products = sc.textFile("/user/cloudera/sqoop_import/products")

Get the product category ID from the 2nd column (index 1):

productsMap = products.map(lambda rec: (rec.split(",")[1], rec))

Group by category ID:

productsGroupBy = productsMap.groupByKey()

Sort each group by product price, casting the price from string to float:

for i in productsGroupBy.map(lambda rec: sorted(rec[1], key=lambda k: float(k.split(",")[4]))).collect():
    print(i)
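To check the grouping and sorting logic without a cluster, the same three steps can be mimicked in plain Python. This is only a sketch: the sample records are made up and assume the comma-separated layout implied by the code above (category ID at index 1, price at index 4), and a defaultdict stands in for groupByKey().

```python
from collections import defaultdict

# Made-up sample records (hypothetical data): id,category_id,name,description,price,image
records = [
    "1,2,Ball,,19.99,img1",
    "2,2,Bat,,9.99,img2",
    "3,5,Glove,,29.99,img3",
]

# Equivalent of productsMap: (category_id, record) pairs
pairs = [(rec.split(",")[1], rec) for rec in records]

# Equivalent of groupByKey(): category_id -> list of records
groups = defaultdict(list)
for category_id, rec in pairs:
    groups[category_id].append(rec)

# Equivalent of the map/sorted step: sort each group by its price cast to float
sorted_groups = {
    category_id: sorted(recs, key=lambda k: float(k.split(",")[4]))
    for category_id, recs in groups.items()
}

for category_id in sorted(sorted_groups):
    print(category_id, sorted_groups[category_id])
```

Every record here has a numeric price at index 4, so the float() cast succeeds; the problem described next only appears when that field is empty.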

Some of the product prices cannot be cast to float because they have Null values. Is there any way to remove the records that have Null values in this particular field? Here is a snippet of the error log:

17/05/28 00:48:25 ERROR Executor: Exception in task 2.0 in stage 1.0 (TID 6)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "", line 2, in
  File "", line 2, in
ValueError: could not convert string to float:
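The last line of that traceback can be reproduced in plain Python: float() raises ValueError when it receives an empty string, which is exactly what split(",") yields for a record whose price field is empty. The record below is a made-up example.

```python
# Hypothetical record whose price field (index 4) is empty
rec = "7,3,Cap,,,img7"
fields = rec.split(",")
print(repr(fields[4]))  # ''

error = None
try:
    float(fields[4])
except ValueError as err:
    error = err  # same exception type as the last line of the traceback
print(type(error).__name__, error)
```
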

Have you tried something like productsGroupBy = productsMap.filter(lambda line: yourProductPriceColumn != "Null").groupByKey() before entering the for loop? (Alternatively, the same filter can be applied to productsGroupBy.) Since you imported the data as strings, this filter gives you an RDD without the lines whose productPrice is "Null", so you will be able to cast from string to float. - titiro89
Thanks a lot, @titiro89! I ran the code below and got the following result: for i in productsMap.filter(lambda line: line.split(",")[4] == "").collect(): print(i) ... 685,31,TaylorMade SLDR Irons - (Steel) 4-PW, AW,,899.99,images.acmesports.sports/… - Divyojyoti Sinha
Here, product_price has shifted to index 5 instead of index 4, because the product name itself contains a comma ("4-PW, AW"). Such records should be skipped. - Divyojyoti Sinha
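A plain split(",") breaks on any product name that contains a comma: the name spills into an extra field and every later field moves one position to the right, as in the TaylorMade record. One way to skip such records is a predicate that keeps a record only if it has the expected number of fields and its price parses as a float. This is a sketch under two assumptions: a well-formed record has six fields (id, category_id, name, description, price, image), and has_valid_price is a hypothetical helper name, not a Spark function.

```python
EXPECTED_FIELDS = 6  # assumption: id, category_id, name, description, price, image


def has_valid_price(rec):
    """Return True if rec has the expected layout and a numeric price at index 4."""
    fields = rec.split(",")
    if len(fields) != EXPECTED_FIELDS:
        return False  # e.g. a comma inside the product name shifted the columns
    try:
        float(fields[4])
    except ValueError:
        return False  # empty or non-numeric price, such as "" or "null"
    return True


good = "1,2,Ball,,19.99,img1"
# Shaped like the TaylorMade record above; the truncated image URL is replaced by a placeholder
shifted = "685,31,TaylorMade SLDR Irons - (Steel) 4-PW, AW,,899.99,img"
empty_price = "7,3,Cap,,,img7"

print([has_valid_price(r) for r in (good, shifted, empty_price)])  # [True, False, False]
```

In PySpark the predicate would be applied before the map step, for example products.filter(has_valid_price), since RDD.filter accepts any function that returns a boolean.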


You can filter out the products containing null values as follows.

Reference: isNotNull(), a Column method in the Spark SQL DataFrame API.

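# Filter out products whose price field (index 4) is empty or null.
# products is an RDD of strings from sc.textFile and has no price column,
# so the DataFrame method isNotNull() cannot be used here; check the field instead.
filteredProducts = products.filter(lambda rec: rec.split(",")[4].strip().lower() not in ("", "null"))
# Create a tuple (categoryId, record)
filteredProductsMap = filteredProducts.map(lambda rec: (rec.split(",")[1], rec))
# Group by categoryId
productsGroupBy = filteredProductsMap.groupByKey()
# Sort each group by product price
for i in productsGroupBy.map(lambda rec: sorted(rec[1], key=lambda k: float(k.split(",")[4]))).collect():
    print(i)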