
I have a Mongo change stream (a pymongo application) that continuously captures the changes in collections. These change documents, as received by the program, are sent to Azure Event Hubs. A Spark notebook has to read the documents as they arrive in Event Hubs and do schema matching (match the fields in the document with the Spark table columns) against the Spark table for that collection. If there are fewer fields in the document than in the table, the missing columns have to be added with null.

I am reading the events from Event Hubs like below:

spark.readStream.format("eventhubs").options(**config).load()

As stated in the documentation, the original message is in the 'body' column of the dataframe, which I am casting to string (see the sketch below). Now I have the Mongo document as a JSON string in a streaming dataframe. I am facing the issues below.
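The read and the cast, roughly (a sketch; config and the json_str column name are placeholders of mine):

from pyspark.sql.functions import col

# 'config' holds the Event Hubs connection settings (placeholder)
raw_df = spark.readStream.format("eventhubs").options(**config).load()

# 'body' arrives as binary; casting gives the Mongo change document as a JSON string
json_df = raw_df.select(col("body").cast("string").alias("json_str"))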

I need to extract the individual fields of the Mongo document, to compare which fields are present in the Spark table but missing from the document. I saw a function called get_json_object(col, path), but it essentially returns a string again, so I cannot select all the columns individually.
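For illustration (continuing the sketch above):

from pyspark.sql.functions import get_json_object

# Each extraction comes back as a StringType column, not a struct
doc_df = json_df.select(
    get_json_object("json_str", "$.collection").alias("collection"),
    get_json_object("json_str", "$.fullDocument").alias("fullDocument")  # still a string
)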

While from_json can convert the JSON string to a struct type, I cannot specify the schema, because we have close to 70 collections (and a corresponding number of Spark tables), each sending Mongo docs with anywhere from 10 to 450 fields.
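For example, this works, but only with a schema hard-coded per collection (a sketch with a two-field schema):

from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType

# Feasible for one collection, not for ~70 with 10-450 fields each
doc_schema = StructType([
    StructField("_id", StringType()),
    StructField("app", StringType())
])
parsed = doc_df.select(from_json("fullDocument", doc_schema).alias("doc"))
parsed.select("doc.*")  # '*' expansion works once the column is a struct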

If I could convert the JSON string in the streaming dataframe to a JSON object whose schema is inferred by the dataframe (something like what read.json can do), I could use the SQL '*' notation to extract the individual columns, do a few manipulations and then save the final dataframe to the Spark table. Is it possible to do that? What is the mistake I am making?
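In batch mode this is exactly the behaviour I want (the path below is hypothetical):

# Schema is inferred from the data itself, so '*' expansion just works
batch_df = spark.read.json("/mnt/samples/mongo_docs")  # hypothetical path
batch_df.select("fullDocument.*")

# But the streaming reader insists on a schema up front, so the same
# trick is not available on a streaming dataframe.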

Note: A streaming DF doesn't support the collect() method to individually extract the JSON string from the underlying RDD and do the necessary column comparisons. Using Spark 2.4 and Python in Azure Databricks environment 4.3.
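One workaround I am considering is foreachBatch (available from Spark 2.4), which hands each micro-batch over as a static dataframe (a sketch; handle_batch is my own function):

def handle_batch(batch_df, batch_id):
    # batch_df is static here, so schema inference is possible per micro-batch
    docs = spark.read.json(batch_df.rdd.map(lambda row: row.json_str))
    # ... compare docs.schema with the target table's columns, add the
    # missing ones as nulls, then append to the Spark table

json_df.writeStream.foreachBatch(handle_batch).start()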

Below is the sample data I get in my notebook after reading the events from Event Hubs and casting the body to string.

{
  "documentKey": "5ab2cbd747f8b2e33e1f5527",
  "collection": "configurations",
  "operationType": "replace",
  "fullDocument": {
    "_id": "5ab2cbd747f8b2e33e1f5527",
    "app": "7NOW",
    "type": "global",
    "version": "1.0",
    "country": "US",
    "created_date": "2018-02-14T18:34:13.376Z",
    "created_by": "Vikram SSS",
    "last_modified_date": "2018-07-01T04:00:00.000Z",
    "last_modified_by": "Vikram Ganta",
    "last_modified_comments": "Added new property in show_banners feature",
    "is_active": true,
    "configurations": [
      {
        "feature": "tip",
        "properties": [
          {
            "id": "tip_mode",
            "name": "Delivery Tip Mode",
            "description": "Tip mode switches the display of tip options between percentage and amount in the customer app",
            "options": [
              "amount",
              "percentage"
            ],
            "default_value": "tip_percentage",
            "current_value": "tip_percentage",
            "mode": "multiple or single"
          },
          {
            "id": "tip_amount",
            "name": "Tip Amounts",
            "description": "List of possible tip amount values",
            "default_value": 0,
            "options": [
              {
                "display": "No Tip",
                "value": 0
              }
            ]
          }
        ]
      }
    ]
  }
}

I would like to separate out the fullDocument in the sample above. When I use get_json_object, I get fullDocument in another streaming dataframe as a JSON string, not as an object. As you can see, there are some array types in fullDocument which I could explode (the documentation says explode is supported on streaming DFs, though I haven't tried it), but there are also some objects (struct types) whose individual fields I would like to extract. I cannot use the SQL '*' notation because what get_json_object returns is a string, not the object itself.
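If fullDocument were a proper struct column, the kind of thing I want would look like this (names follow my earlier sketch, assuming 'doc' was parsed with a schema that includes the arrays):

from pyspark.sql.functions import explode

flat = parsed.select("doc.*")  # pull out the struct fields
cfgs = parsed.select(explode("doc.configurations").alias("cfg"))
props = cfgs.select(explode("cfg.properties").alias("prop"))
props.select("prop.id", "prop.name")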

See stackoverflow.com/questions/49369218/… I still have issues using this example, but it appears it worked for the poster. – zpp

Why "I cannot individually select all the columns."? The path in get_json_object should be exactly what you need, shouldn't it? – Jacek Laskowski

I got a clue that once I get the full document using get_json_object, there is a new method schema_of_json (in Spark 2.4) to get the schema of the document and use that schema to get the object with from_json. But schema_of_json always returns the schema as a string, as it doesn't get inside the string to infer the schema. Please share how to use that. When I use schema_of_json(df.column) I am getting an error. – Bharath
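(For reference: in Spark 2.4, schema_of_json only accepts a foldable literal, not a regular column, which is why schema_of_json(df.column) errors out. A minimal sketch:)

from pyspark.sql.functions import schema_of_json, lit

# Works: the argument is a literal JSON string
spark.range(1).select(schema_of_json(lit('{"a": 1}'))).show(truncate=False)

# Fails with an AnalysisException: a regular column is not foldable
# df.select(schema_of_json(df.json_str))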

1 Answer


It is becoming clear that JSON with schemas this varied is better handled with the schema specified explicitly. So my takeaway is: in a streaming environment where the schema of the incoming stream differs this much, it is always better to specify the schema. I am therefore proceeding with get_json_object and from_json, reading the schema from a file.
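A minimal sketch of that approach, assuming the per-collection schema was saved with schema.jsonValue() (the file path is my own convention):

import json
from pyspark.sql.functions import get_json_object, from_json
from pyspark.sql.types import StructType

# Load the schema persisted for this collection
with open("/dbfs/schemas/configurations.json") as f:
    doc_schema = StructType.fromJson(json.load(f))

# Pull out fullDocument as a string, then parse it with the explicit schema
doc = json_df.select(get_json_object("json_str", "$.fullDocument").alias("doc_str"))
parsed = doc.select(from_json("doc_str", doc_schema).alias("doc")).select("doc.*")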