10
votes

I'm new to PySpark. Below is the JSON format of my messages from Kafka.

{
    "header": {
        "platform":"atm",
        "version":"2.0"
    }
    "details":[
        {
            "abc":"3",
            "def":"4"
        },
        {
            "abc":"5",
            "def":"6"
        },
        {
            "abc":"7",
            "def":"8"
        }
    ]
}

How can I read through the values of all "abc" and "def" in details and add them to a new list like this: [(1,2),(3,4),(5,6),(7,8)]? The new list will be used to create a Spark DataFrame. How can I do this in PySpark? I tried the code below.

parsed = messages.map(lambda (k,v): json.loads(v))
list = []
summed = parsed.map(lambda detail:list.append((String(['mcc']), String(['mid']), String(['dsrc']))))
output = summed.collect()
print output

It produces the error 'too many values to unpack'.

The error message below appears at the summed.collect() statement:

16/09/12 12:46:10 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
16/09/12 12:46:10 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
16/09/12 12:46:10 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
16/09/12 12:46:10 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "", line 1, in
ValueError: too many values to unpack

What are the results of: messages.take(3), parsed = messages.map(lambda (k,v): json.loads(v)), and parsed.take(3)? – Yaron
In the parsed = messages.map(lambda (k,v): json.loads(v)) statement I am getting the error "too many values to unpack", because "details" has a list of JSON strings. – anusha
Please provide the result of messages.take(3), and please also provide the error you receive (you can edit your question and add this info). – Yaron
Hi Yaron, please find the result of messages.take(3) below: >>> messages.take(3) [u'{', u' "header": {', u' "platform":"atm",'] I have updated the error message in the question. Thanks for your help! – anusha
The list is not part of the RDD. The code is fundamentally wrong. You should take the correct entries from the details JSON object, process them, and collect them to the driver at the very end. – Fokko Driesprong

4 Answers

8
votes

First of all, the JSON is invalid: a , is missing after the header object.

That being said, let's take this JSON:

{"header":{"platform":"atm","version":"2.0"},"details":[{"abc":"3","def":"4"},{"abc":"5","def":"6"},{"abc":"7","def":"8"}]}

This can be processed by:

>>> df = sqlContext.jsonFile('test.json')
>>> df.first()
Row(details=[Row(abc='3', def='4'), Row(abc='5', def='6'), Row(abc='7', def='8')], header=Row(platform='atm', version='2.0'))

>>> df = df.flatMap(lambda row: row['details'])
PythonRDD[38] at RDD at PythonRDD.scala:43

>>> df.collect()
[Row(abc='3', def='4'), Row(abc='5', def='6'), Row(abc='7', def='8')]

>>> df.map(lambda entry: (int(entry['abc']), int(entry['def']))).collect()
[(3, 4), (5, 6), (7, 8)]

Hope this helps!
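On newer Spark versions (2.x+), where sqlContext.jsonFile and DataFrame.flatMap are no longer available, a rough equivalent is the sketch below. It assumes a SparkSession named spark and the same single-line test.json as above; this is not the original answer's code, just one way to do the same thing with the current DataFrame API.

from pyspark.sql.functions import explode, col

# Read the one-line JSON file into a DataFrame
df = spark.read.json('test.json')

# Explode the "details" array so each element becomes its own row,
# then cast the string fields to integers
pairs = (df
         .select(explode(col('details')).alias('d'))
         .select(col('d.abc').cast('int'), col('d.def').cast('int')))

# Collect back to a list of (abc, def) tuples
result = [(row[0], row[1]) for row in pairs.collect()]
print(result)  # [(3, 4), (5, 6), (7, 8)]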

6
votes
import pyspark
from pyspark import SparkConf, SparkContext

# You can configure the SparkContext

conf = SparkConf()
conf.set('spark.local.dir', '/remote/data/match/spark')
conf.set('spark.sql.shuffle.partitions', '2100')
SparkContext.setSystemProperty('spark.executor.memory', '10g')
SparkContext.setSystemProperty('spark.driver.memory', '10g')
sc = SparkContext(appName='mm_exp', conf=conf)
sqlContext = pyspark.SQLContext(sc)

data = sqlContext.read.json('file.json')

I feel that the earlier answer missed an important part of the read sequence: you have to initialize a SparkContext first.

When you start a SparkContext, it also spins up a web UI on port 4040, accessible at http://localhost:4040. That is a useful place to check the progress of all calculations.

4
votes

Try this with a recent Spark version:

df = spark.read.json('test.json')
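
Note that the JSON in the question is spread over multiple lines, and by default spark.read.json expects one JSON object per line. A minimal sketch, assuming Spark 2.2+ (which introduced the multiLine option) and a file test.json holding the question's pretty-printed JSON:

# For a file where one JSON object spans multiple lines (as in the question),
# Spark 2.2+ needs the multiLine option; otherwise each line is parsed separately.
df = spark.read.option('multiLine', True).json('test.json')
df.printSchema()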
0
votes

According to the info in the comments, each row in the messages RDD holds one line of the JSON file:

 u'{', 
 u' "header": {', 
 u' "platform":"atm",'

Your code is failing in the following line:

parsed = messages.map(lambda (k,v): json.loads(v))

Your code takes a line like '{', tries to unpack it into a key and a value, and then executes json.loads(value).

It is clear that Python/Spark won't be able to split the single character '{' into a key-value pair.

The json.loads() call should be executed on a complete JSON object.

This specific task might be accomplished more easily with pure Python, as sketched below.
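
For example, a minimal sketch of that idea, assuming the JSON sits in a local file called test.json and that an sc / sqlContext exist as in the question's environment (if the data is on HDFS instead, sc.wholeTextFiles gives (path, content) pairs that json.loads can handle the same way):

import json

# Read the whole file as one string, then parse it as a single JSON object
with open('test.json') as f:
    data = json.loads(f.read())

# Pull the (abc, def) pairs out of the "details" array with plain Python
pairs = [(int(d['abc']), int(d['def'])) for d in data['details']]
print(pairs)  # [(3, 4), (5, 6), (7, 8)]

# The list can then be turned into a Spark DataFrame
df = sqlContext.createDataFrame(pairs, ['abc', 'def'])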