I want to sort the products by product price. Below are the lines of code:
products = sc.textFile("/user/cloudera/sqoop_import/products")
Getting the product category ID from the 2nd column:
productsMap = products.map(lambda rec: (rec.split(",")[1], rec))
Group by category ID:
productsGroupBy = productsMap.groupByKey()
To sort by product price, typecasting the price from string to float:
for i in productsGroupBy.map(lambda rec: sorted(rec[1], key=lambda k: float(k.split(",")[4]))).collect(): print(i)
I am not able to typecast a few of the product price values because they are null. Is there any way to remove the records that have null values in this particular field? Please find a snippet of the error log below:
17/05/28 00:48:25 ERROR Executor: Exception in task 2.0 in stage 1.0 (TID 6)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "", line 2, in
  File "", line 2, in
ValueError: could not convert string to float:
Comments:

Can you try to add

productsGroupBy = productsMap.filter(lambda line: yourProductPriceColumn != "Null").groupByKey()

before entering the for loop? (Otherwise, this filter can also be used with productsGroupBy.) Since you have imported string values, with this filtering you'll have an RDD without the lines whose product price is "Null", so you will be able to cast from string to float. – titiro89

I checked which records have an empty price field with

for i in productsMap.filter(lambda line: line.split(",")[4] == "").collect(): print(i)

and it printed records such as

... 685,31,TaylorMade SLDR Irons - (Steel) 4-PW, AW,,899.99,images.acmesports.sports/… – Divyojyoti Sinha
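Putting the comments together, here is a minimal sketch of the filtered pipeline. It is only an illustration under two assumptions: the price really is the 5th comma-separated field (as in the original code), and the missing prices show up as empty strings rather than the literal string "Null" (as in the sample record above). The variable names productsClean, productsCleanMap and productsCleanGroupBy are just illustrative, and the filter is applied to the raw products RDD of strings rather than to the (key, record) pairs in productsMap:

# Keep only the lines whose 5th field (the price column used above) is non-empty
productsClean = products.filter(lambda rec: rec.split(",")[4] != "")
# Re-key by product category ID (2nd column), as in the original code
productsCleanMap = productsClean.map(lambda rec: (rec.split(",")[1], rec))
# Group by category; every remaining price can now be cast to float
productsCleanGroupBy = productsCleanMap.groupByKey()
for group in productsCleanGroupBy.map(lambda rec: sorted(rec[1], key=lambda k: float(k.split(",")[4]))).collect():
    print(group)

Note that in the sample record the blank 5th field comes from a comma inside the product name ("... 4-PW, AW"), which also shifts the real price (899.99) into the 6th field, so this filter simply drops such records rather than recovering their price; keeping them would need a more careful parse of the line.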