
In Spark Structured Streaming, I want to create a StructType from a String.

In the example below, the Spark read() method accepts only a StructType for the schema. How can I create a StructType from a String? I want to convert the employeeSchema String to a StructType.

import org.apache.spark.SparkContext;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;

public class EmployeeSchemaLoader {

    public static void main(String[] args) throws AnalysisException {
        String master = "local[*]";

        SparkSession sparkSession = SparkSession
                .builder().appName(EmployeeSchemaLoader.class.getName())
                .master(master).getOrCreate();

        // The schema I have, as a String (this is what I want to turn into a StructType)
        String employeeSchema = "StructType(\n" +
                "StructField(firstName,StringType,true),\n" +
                "StructField(lastName,StringType,true),\n" +
                "StructField(addresses,\n" +
                "ArrayType(\n" +
                "StructType(\n" +
                "StructField(city,StringType,true), \n" +
                "StructField(state,StringType,true)\n" +
                "),\n" +
                "true),\n" +
                "true) \n" +
                ")";

        SparkContext context = sparkSession.sparkContext();
        context.setLogLevel("ERROR");
        SQLContext sqlCtx = sparkSession.sqlContext();
        Dataset<Row> employeeDataset = sparkSession.read()
                //.schema(employeeSchema)  // Accepts only Struct Type
                .json("simple_employees.json");

        employeeDataset.printSchema();
        employeeDataset.createOrReplaceTempView("employeeView");

        sparkSession.catalog().listTables().show();

        sqlCtx.sql("select * from employeeView").show();
    }
}

2 Answers


I'm not sure why you would want to do this. Instead of making employeeSchema a String, why not make it a StructType? Like this:

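import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

StructType employeeSchema = new StructType()
        .add("firstName", DataTypes.StringType, true)
        .add("lastName", DataTypes.StringType, true)
        .add("addresses", DataTypes.createArrayType(
                new StructType()
                        .add("city", DataTypes.StringType, true)
                        .add("state", DataTypes.StringType, true),
                true), true);

// then: sparkSession.read().schema(employeeSchema).json("simple_employees.json")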
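If the schema really does have to arrive as a String in the question's StructType(...) format, as far as I know Spark has no public parser for that text (it is just the toString() output of a StructType). Spark does, however, accept DDL strings such as "firstName STRING, lastName STRING", so one workaround is to rewrite the text into DDL first. The class below is a minimal sketch of that idea, not a Spark API: the name SchemaStringToDdl is hypothetical, it assumes only StructType, ArrayType, MapType and the simple types in its PRIMITIVES table appear, that field names contain no spaces, commas or parentheses, and it drops every nullable/containsNull flag (all fields in the question are nullable, and DDL columns are nullable by default).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not part of Spark): rewrites "StructType(StructField(...), ...)" text into DDL.
final class SchemaStringToDdl {

    // DDL names for the simple types this sketch understands (assumption: no others appear).
    private static final Map<String, String> PRIMITIVES = Map.of(
            "StringType", "STRING", "IntegerType", "INT", "LongType", "BIGINT",
            "DoubleType", "DOUBLE", "FloatType", "FLOAT", "BooleanType", "BOOLEAN",
            "DateType", "DATE", "TimestampType", "TIMESTAMP");

    private final List<String> tokens = new ArrayList<>();
    private int pos = 0;

    private SchemaStringToDdl(String text) {
        // Split into words and the single-character tokens ( ) , while dropping whitespace.
        StringBuilder word = new StringBuilder();
        for (char c : text.toCharArray()) {
            if (c == '(' || c == ')' || c == ',' || Character.isWhitespace(c)) {
                if (word.length() > 0) {
                    tokens.add(word.toString());
                    word.setLength(0);
                }
                if (!Character.isWhitespace(c)) {
                    tokens.add(String.valueOf(c));
                }
            } else {
                word.append(c);
            }
        }
        if (word.length() > 0) {
            tokens.add(word.toString());
        }
    }

    // Converts the top-level StructType(...) text into "name TYPE, name TYPE, ..." DDL.
    public static String toDdl(String structTypeText) {
        SchemaStringToDdl parser = new SchemaStringToDdl(structTypeText);
        parser.expect("StructType");
        String ddl = String.join(", ", parser.fields(" "));
        if (parser.pos != parser.tokens.size()) {
            throw new IllegalArgumentException("Unexpected trailing input at token " + parser.pos);
        }
        return ddl;
    }

    // Parses "(StructField(name,type,nullable), ...)"; top level uses "name TYPE", nested structs "name: TYPE".
    private List<String> fields(String separator) {
        List<String> out = new ArrayList<>();
        expect("(");
        while (!peek().equals(")")) {
            if (!out.isEmpty()) {
                expect(",");
            }
            expect("StructField");
            expect("(");
            String name = next();
            expect(",");
            String type = type();
            expect(",");
            next(); // nullable flag: dropped, DDL columns are nullable by default
            expect(")");
            out.add(name + separator + type);
        }
        expect(")");
        return out;
    }

    // Parses one data type and returns its DDL spelling.
    private String type() {
        String name = next();
        switch (name) {
            case "StructType":
                return "STRUCT<" + String.join(", ", fields(": ")) + ">";
            case "ArrayType": {
                expect("(");
                String element = type();
                expect(",");
                next(); // containsNull flag: dropped
                expect(")");
                return "ARRAY<" + element + ">";
            }
            case "MapType": {
                expect("(");
                String key = type();
                expect(",");
                String value = type();
                expect(",");
                next(); // valueContainsNull flag: dropped
                expect(")");
                return "MAP<" + key + ", " + value + ">";
            }
            default: {
                String ddl = PRIMITIVES.get(name);
                if (ddl == null) {
                    throw new IllegalArgumentException("Unsupported type: " + name);
                }
                return ddl;
            }
        }
    }

    private String peek() {
        return pos < tokens.size() ? tokens.get(pos) : "<end of input>";
    }

    private String next() {
        String token = peek();
        pos++;
        return token;
    }

    private void expect(String expected) {
        String actual = next();
        if (!actual.equals(expected)) {
            throw new IllegalArgumentException(
                    "Expected '" + expected + "' but found '" + actual + "' at token " + (pos - 1));
        }
    }
}
```

For the question's employeeSchema string this returns firstName STRING, lastName STRING, addresses ARRAY<STRUCT<city: STRING, state: STRING>>, which can then be handed to StructType.fromDDL(ddl) or directly to sparkSession.read().schema(ddl). If you control how the schema is stored, saving schema.json() and reading it back with (StructType) DataType.fromJson(json) avoids hand parsing altogether.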
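In PySpark, a schema can be round-tripped through its JSON representation with StructType.fromJson:

from pyspark.sql.types import StructType

# inputdf is an existing DataFrame; its .schema is already a StructType
schema = inputdf.schema
print(type(schema))

# just to display all methods available on the schema
print(dir(schema))

# jsonValue() returns a dict; fromJson() rebuilds a StructType from that dict.
# For a JSON *string* (e.g. one saved with schema.json()), use
# StructType.fromJson(json.loads(json_string)) after "import json".
new_schema = StructType.fromJson(schema.jsonValue())

print(type(new_schema))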