1
votes

I am trying to process JSON column from PostgreSQL database. I am able to connect to database using:

import os
import pyspark
import findspark
from pyspark import SparkContext
from pyspark.sql import SQLContext
findspark.init(os.environ['SPARK_HOME'])

# DB credentials
user = os.environ['EVENTS_DEV_UID']
password = os.environ['EVENTS_DEV_PWD']
host = os.environ['EVENTS_DEV_HOST']
port = os.environ['EVENTS_DEV_PORT']
db = os.environ['EVENTS_DEV_DBNAME']

# Initiate spark session
sc = SparkContext()
spark = SQLContext(sc)

# Set properties
properties = {"user": user, "password": password, "driver": "org.postgresql.Driver"}

# Load data
df = spark.read.jdbc(
    url = 'jdbc:postgresql://' + host + ':' + port + '/' + db,
    table = 'events',
    properties = properties)

The problem starts with casting JSON field. Spark doesn't recognize struct format of params. When I print schema:

df.printSchema()

root
|-- time: timestamp (nullable = true)
|-- name: string (nullable = true)
|-- params: string (nullable = true)

When I try to cast string to struct:

df = df.withColumn('params', df.params.cast('struct'))

I am getting following error:

ParseException: '\nDataType struct is not supported.(line 1, pos 0)\n\n== SQL ==\nstruct\n^^^\n'

I guess problem is the escape characters. Anybody has idea how to proceed?

1

1 Answers

2
votes

"struct" is not a valid casting type. You can define your own UDF using python's json.loads function. Let's start with a sample data frame:

df = sc.parallelize([[1, "a", "{\"a\":1, \"b\":2}"], [2, "b", "{\"a\":3, \"b\":4}"]])\
    .toDF(["col1", "col2", "json_col"])
df.show()

    +----+----+--------------+
    |col1|col2|      json_col|
    +----+----+--------------+
    |   1|   a|{"a":1, "b":2}|
    |   2|   b|{"a":3, "b":4}|
    +----+----+--------------+

Then the output StructType would have schema:

from pyspark.sql.types import IntegerType, StructField, StructType
schema = StructType([StructField("a", IntegerType()), StructField("b", IntegerType())])

You cannot cast StringType to StructType, hence the UDF:

import pyspark.sql.functions as psf
import json
json_load = psf.udf(json.loads, schema)

Now we can process json_col:

df_parsed = df.withColumn("parsed_json", json_load("json_col"))
df_parsed.show()
df_parsed.printSchema()

    +----+----+--------------+-----------+
    |col1|col2|      json_col|parsed_json|
    +----+----+--------------+-----------+
    |   1|   a|{"a":1, "b":2}|      [1,2]|
    |   2|   b|{"a":3, "b":4}|      [3,4]|
    +----+----+--------------+-----------+

    root
     |-- col1: long (nullable = true)
     |-- col2: string (nullable = true)
     |-- json_col: string (nullable = true)
     |-- parsed_json: struct (nullable = true)
     |    |-- a: integer (nullable = true)
     |    |-- b: integer (nullable = true)

You can also try passing the schema directly when loading the data frame.