1
votes

I am trying to convert a binary file to ascii values and store it in a dataframe. Conversion to ascii is working fine. But when I am trying to convert to Spark Dataframe, I am getting only null values for all fields. Not sure about the missing part.

The returned df supposed to be pandas DF, but its showing as list.

The Binary file contains 2 records of fixed size of 16 bytes. The input values looks like this:

01 01 02 0D FF E3 33 52 14 75 26 58 87 7F FF FF 01 01 02 0D FF E3 33 52 14 75 26 58 87 7F FF FF

Kindly help to resolve. Below is the code and output.

%spark2.pyspark

from pyspark import SparkContext, SparkConf
from pyspark.sql.types import *
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors

import binascii
import pandas as pd
import numpy as np
import datetime
from string import printable

recordsize = 16
chunkcount = 100
chunksize = recordsize * chunkcount

sparkSchema = StructType([
    StructField ("Field1", IntegerType(), True),
    StructField ("Field2", StringType(), True),
    StructField ("Field3", StringType(), True),
    StructField ("Field4", StringType(), True)

])

dt = np.dtype([
        ('Field1', 'b'),
        ('Field2', np.void, 4),
        ('Field3', np.void, 3),
        ('Field4', np.void, 8)

    ])

StartTime = datetime.datetime.now()
print ("Start Time: " + str(StartTime))

inputfile = "/user/maria_dev/binfiles1"


def decodeRecord(data):

    x = np.frombuffer (data[1], dtype=dt)

    newx = x.byteswap().newbyteorder()

    df = pd.DataFrame(newx)

    st = set(printable)

    df[['Field2', 'Field3', 'Field4']] = df[['Field2', 'Field3', 'Field4']].applymap(
                                                            lambda x: binascii.hexlify(x).decode('utf-8').rstrip('f'))

    return df


conf = SparkConf().setAppName("BinaryReader").setMaster("local")

sqlContext = SQLContext (sc)

rdd = sc.binaryFiles(inputfile).map(decodeRecord).collect()
print (type(rdd))
print (rdd)

df = sqlContext.createDataFrame(rdd, sparkSchema)
print ("Number of records in DataFrame: " + str(df.count()))

df.show()

Output is below:

    Start Time: 2018-12-12 20:11:55.141848
<type 'list'>
[   Field1  Field2  Field3       Field4
0       1  01020d  e33352  14752658877
1       1  01020d  e33352  14752658877]
Number of records in DataFrame: 1
+------+------+------+------+
|Field1|Field2|Field3|Field4|
+------+------+------+------+
|  null|  null|  null|  null|
+------+------+------+------+
1

1 Answers

0
votes

Your decodeRecord() function returns a pandas dataframe, so the resulting PipelinedRDD contains a single row containing the complete pandas dataframe. So your have to take that first row and convert it to spark dataframe.

This is piece of modified code:

rdd = sc.binaryFiles(inputfile).map(decodeRecord)
panda_df = rdd.first()
print (type(rdd))
print (type(panda_df))

df = sqlContext.createDataFrame(panda_df)
print ("Number of records in DataFrame: " + str(df.count()))

df.show()

Output:

Start Time: 2018-12-15 17:43:21.241421
<class 'pyspark.rdd.PipelinedRDD'>
<class 'pandas.core.frame.DataFrame'>
Number of records in DataFrame: 4
+------+--------+------+----------------+
|Field1|  Field2|Field3|          Field4|
+------+--------+------+----------------+
|    48|31303130|323044|4646453333333532|
|    49|34373532|363538|3837374646464646|
|    48|31303130|323044|4646453333333532|
|    49|34373532|363538|3837374646464646|
+------+--------+------+----------------+

There are other possible improvements to your code, like using rdd.flatMap(), or using the decodeRecord() function directly to get the pandas DF and converting to spark DF without calling rdd.map(). Just some suggestions.