
Data from SampleCSV2.csv is as follows:

AAA|25|IT|50.5
BBB|28|Comp|100.5

I am having trouble creating a DataFrame with specific data types in PySpark.

from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType, StringType

def structSchema(cols):
    field = []
    for c in cols:
        fieldType = c[c.find(":")+1:]
        fieldName = c[:c.find(":")]
        print(fieldType,"P",fieldName)
        if fieldType == "int":
            field.append(StructField(fieldName, IntegerType(), True))
        elif fieldType == "double":
            field.append(StructField(fieldName, DoubleType(), True))
        else:
            field.append(StructField(fieldName, StringType(), True))
    return StructType(field)

conf = SparkConf().setAppName('OutputGenerator')
sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")
sqlContext = SQLContext(sc)
file =r'C:\Users\Desktop\SampleCSV2.csv'

delim = '|'

#This part works well
cols = ['Name:string','Age:string','Dept:string','Salary:string']
rdd = sc.textFile(file).map(lambda x: x.split(delim))
cols = structSchema(cols)
df1 = sqlContext.createDataFrame(rdd, schema=cols)
print(df1.toPandas()) #works fine


#Getting error while executing below part
cols = ['Name:string','Age:int','Dept:string','Salary:double']
rdd = sc.textFile(file).map(lambda x: x.split(delim))
cols = structSchema(cols)
df2 = sqlContext.createDataFrame(rdd, schema=cols)
print(df2.toPandas())  #Getting Error

print("Done...")

The error is raised at the line print(df2.toPandas()). Please help me define a schema and load the CSV into a DataFrame with specific data types. The schema definition works when all columns are StringType(), but fails when I declare types such as IntegerType() or DoubleType().

Any help is much appreciated.


1 Answer


TL;DR: Use the CSV reader:

spark.read.schema(cols).options(delimiter="|", header="false").csv(file)

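Your code won't work because the schema doesn't match the data: split returns a list of strings, so Age and Salary are still str values while the schema declares them as int and double. You have to cast the values to the declared types: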

sc.textFile(file).map(lambda x: x.split(delim)).map(lambda x: (
    (x[0], int(x[1]), x[2], float(x[3]))
))
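
If you want to avoid hard-coding the positions, the cast can be driven by the same "name:type" strings the question already uses. The following is only a sketch in plain Python: the names CASTERS, make_row_caster and col_specs are made up for illustration, and it assumes the only non-string types are "int" and "double", as in the question's structSchema.

```python
# Hypothetical helper (not part of PySpark): builds a per-row converter
# from "name:type" specs like the ones passed to structSchema.
CASTERS = {"int": int, "double": float}  # anything else stays a string


def make_row_caster(col_specs):
    """Return a function that casts a list of strings to the declared types."""
    # Text after the first ":" is the type name; unknown types fall back to str.
    casts = [CASTERS.get(spec.partition(":")[2], str) for spec in col_specs]

    def cast_row(fields):
        return tuple(cast(value) for cast, value in zip(casts, fields))

    return cast_row


col_specs = ['Name:string', 'Age:int', 'Dept:string', 'Salary:double']
cast_row = make_row_caster(col_specs)

print(cast_row("AAA|25|IT|50.5".split("|")))
# ('AAA', 25, 'IT', 50.5)
```

With the question's variables, cast_row could then be applied as a second map step, for example sc.textFile(file).map(lambda x: x.split(delim)).map(cast_row), before the RDD is passed to createDataFrame together with the schema. Note that int() and float() raise ValueError on malformed fields, so dirty input would need extra handling.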