Spark: 1.4.0

I have a flat file from Amazon S3 which I loaded into HDFS (on the master node of my EC2 Spark cluster). The flat file is a Hive output. Note: I couldn't change the context as it is already defined. The following code is run in the pyspark shell:

Each 'row' corresponds to 1 row of data:

row = sc.textFile("/data/file")
row.first()

u'E8B98\x01John\x01Smith\x01Male\x01Gold\x0125'

Then I split each row using flatMap(), since for some reason map() doesn't seem to split it (using '\x01' as the delimiter):

elements = row.flatMap(lambda x: x.split('\x01'))
elements.take(8)

[u'E8B98', u'John', u'Smith', u'Male', u'Gold', u'25', u'E8B99', u'Alice']

Since I know the data has 6 columns per row, how do I get the data into a DataFrame? I intend to sort by attribute, sum, etc.

I tried the following but it didn't work:

id = row.flatMap(lambda x: x.split('\x01')[0])
id.first()

E

1 Answer

There are many ways to transform an RDD into a DataFrame in Python.

Consider the following RDD:

rdd = sc.parallelize(["E8B98\x01John\x01Smith\x01Male\x01Gold\x0125", "E8B2\x01Joe\x01Smith\x01Female\x01Gold\x0125"])
rdd.first()

Output:

'E8B98\x01John\x01Smith\x01Male\x01Gold\x0125'

Let's now create an RDD of lists, one per row, by splitting each line on the delimiter with map():

rdd2 = rdd.map(lambda x : x.split("\x01"))
rdd2.first()

Output:

['E8B98', 'John', 'Smith', 'Male', 'Gold', '25']
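The difference between map() and flatMap() explains both results in the question. The sketch below is a plain-Python emulation of the two operations, not Spark code; `local_map`, `local_flat_map`, and `split_row` are hypothetical helper names introduced only for illustration. It shows that map() keeps one list per row, while flatMap() concatenates the fields of all rows, and that flatMap() over a single string iterates over its characters, which is why the question's attempt returned 'E'.

```python
from itertools import chain

lines = [
    u"E8B98\x01John\x01Smith\x01Male\x01Gold\x0125",
    u"E8B2\x01Joe\x01Smith\x01Female\x01Gold\x0125",
]


def local_map(func, items):
    # Emulates RDD.map: exactly one output element per input element.
    return [func(item) for item in items]


def local_flat_map(func, items):
    # Emulates RDD.flatMap: func returns an iterable for each element,
    # and all of those iterables are concatenated into one sequence.
    return list(chain.from_iterable(func(item) for item in items))


def split_row(line):
    # Split one line on the \x01 delimiter.
    return line.split("\x01")


# One list of 6 fields per input line: row boundaries are preserved.
rows = local_map(split_row, lines)

# All 12 fields in a single flat sequence: row boundaries are lost.
flat = local_flat_map(split_row, lines)

# The first field is a string, so flattening iterates over its characters.
first_chars = local_flat_map(lambda x: split_row(x)[0], lines)

# With map, the first field of each row is returned whole.
ids = local_map(lambda x: split_row(x)[0], lines)
```

Under these assumptions, `ids` holds the complete identifiers, so extracting a single column from the RDD would use map() rather than flatMap().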

We can now create a DataFrame in one of the following ways.

Create it directly from the RDD of rows:

sqlContext.createDataFrame(rdd2).collect()

Output:

[Row(_1=u'E8B98', _2=u'John', _3=u'Smith', _4=u'Male', _5=u'Gold', _6=u'25'), Row(_1=u'E8B2', _2=u'Joe', _3=u'Smith', _4=u'Female', _5=u'Gold', _6=u'25')]

or create it from the same RDD, specifying the column names:

df = sqlContext.createDataFrame(rdd2, ['id', 'name', 'surname', 'gender', 'description', 'age'])
df.collect()

Output:

[Row(id=u'E8B98', name=u'John', surname=u'Smith', gender=u'Male', description=u'Gold', age=u'25'), Row(id=u'E8B2', name=u'Joe', surname=u'Smith', gender=u'Female', description=u'Gold', age=u'25')]

or create it with an explicitly defined schema:

from pyspark.sql.types import StructType, StructField, StringType
schema = StructType([
    StructField("id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("surname", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("description", StringType(), True),
    StructField("age", StringType(), True)])
df2 = sqlContext.createDataFrame(rdd2, schema)
df2.collect()

Output:

[Row(id=u'E8B98', name=u'John', surname=u'Smith', gender=u'Male', description=u'Gold', age=u'25'),Row(id=u'E8B2', name=u'Joe', surname=u'Smith', gender=u'Female', description=u'Gold', age=u'25')]

or define a Row class with the column names and map each row onto it:

from pyspark.sql import Row
Person = Row('id', 'name', 'surname', 'gender', 'description', 'age')
person = rdd2.map(lambda r: Person(*r))
df3 = sqlContext.createDataFrame(person)
df3.collect()

Output:

[Row(id=u'E8B98', name=u'John', surname=u'Smith', gender=u'Male', description=u'Gold', age=u'25'), Row(id=u'E8B2', name=u'Joe', surname=u'Smith', gender=u'Female', description=u'Gold', age=u'25')]
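All of the approaches above leave age as a string, which sorts lexicographically and cannot be summed directly. Since the question mentions sorting and summing, one option is to cast the field while parsing each line. The following is a minimal plain-Python sketch under that assumption; `parse_line` and the `Person` namedtuple are hypothetical names, and the field order is taken from the column names used in this answer.

```python
from collections import namedtuple

# Hypothetical record type; field names follow the columns used above.
Person = namedtuple(
    "Person", ["id", "name", "surname", "gender", "description", "age"]
)


def parse_line(line, sep="\x01"):
    """Split one delimited line into a Person, casting age to int."""
    fields = line.split(sep)
    if len(fields) != 6:
        raise ValueError("expected 6 fields, got %d" % len(fields))
    id_, name, surname, gender, description, age = fields
    return Person(id_, name, surname, gender, description, int(age))


lines = [
    u"E8B98\x01John\x01Smith\x01Male\x01Gold\x0125",
    u"E8B2\x01Joe\x01Smith\x01Female\x01Gold\x0125",
]

people = [parse_line(line) for line in lines]

# With age as an int, numeric aggregation and sorting behave as expected.
total_age = sum(p.age for p in people)
by_name = sorted(people, key=lambda p: p.name)
```

In Spark, a function like this could presumably be applied with rdd.map(parse_line) before calling createDataFrame; with the explicit schema approach, the age field would then be declared with IntegerType() instead of StringType().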

I hope this helps!

NB: This requires Spark version >= 1.3.0.