We had a use case that led me to write this, and I am sure many of you have faced a similar situation: migrate multiple collections from MongoDB into Snowflake through a single Talend job, while retaining the top-level nodes of each collection as individual fields in the Snowflake table.

Now, as we know, Talend does not support a dynamic schema for MongoDB sources, because MongoDB collections do not enforce a schema. This means we would have to create separate jobs/sub-jobs for every existing or new collection we want to ingest, and redesign those jobs for future changes in the documents while ensuring they keep working, so we had to look for an alternative solution.

Here is the approach:

Step One : Get all the top-level keys and their types from the MongoDB collection. We used an aggregation with $objectToArray to convert all top-level key/value pairs into document arrays, followed by $unwind and $group with $addToSet to get the distinct keys and value types across the entire collection.
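
As a rough sketch (not the exact component configuration we used), the pipeline can be expressed with the MongoDB Java driver along these lines:

 // Sketch only: build the aggregation pipeline with org.bson.Document for brevity.
 java.util.List<org.bson.Document> pipeline = java.util.Arrays.asList(
     // turn every top-level key/value pair of each document into an array of {k, v}
     org.bson.Document.parse("{ $project: { kv: { $objectToArray: '$$ROOT' } } }"),
     // one output document per key/value pair
     org.bson.Document.parse("{ $unwind: '$kv' }"),
     // collect the distinct 'key~bsonType' strings across the whole collection
     org.bson.Document.parse("{ $group: { _id: '1', keys: { $addToSet: "
             + "{ $concat: ['$kv.k', '~', { $type: '$kv.v' }] } } } }"));
 // passing this pipeline to collection.aggregate(...) yields one summary document

Running this pipeline against a collection returns a single summary document like the one below.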

 {
     "_id" : "1",
     "keys" : [
         "field1~string",
         "field2~object",
         "field3~date",
         "_id~objectId"
     ]
 }

Step Two : Create a one-to-one map between MongoDB datatypes and Snowflake datatypes. We created a hash map called "dataTypes" to store this information. Alternatively, this information can be stored in a table or in a file.

 java.util.Map<String,String> dataTypes = new java.util.HashMap<String,String>();
 dataTypes.put("string","VARCHAR");
 dataTypes.put("int","NUMBER");
 dataTypes.put("objectId","VARCHAR");
 dataTypes.put("object","VARIANT");
 dataTypes.put("date","TIMESTAMP_LTZ");
 dataTypes.put("array","VARCHAR");
 dataTypes.put("bool","BOOLEAN");

Step Three : Compare the keys against Snowflake. First we query the Snowflake INFORMATION_SCHEMA to check whether the table exists. If it does not exist, we create the table; if it does, we check for changes in the document fields and add or modify those columns in the Snowflake table. The DDL script is generated by iterating over the keys from Step One and applying the datatype mapping from Step Two.
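
A rough sketch of the DDL generation, assuming a variable keys (the "key~type" strings from Step One, as a List<String>) and a target tableName, plus the dataTypes map from Step Two:

 // Sketch: build a CREATE TABLE statement from the key~type pairs and the datatype map.
 StringBuilder ddl = new StringBuilder("CREATE TABLE " + tableName + " (");
 for (int i = 0; i < keys.size(); i++) {
     String[] parts = keys.get(i).split("~");               // e.g. "field1~string"
     String snowflakeType = dataTypes.containsKey(parts[1])
             ? dataTypes.get(parts[1]) : "VARCHAR";         // fall back to VARCHAR
     ddl.append(i > 0 ? ", " : "").append(parts[0]).append(" ").append(snowflakeType);
 }
 ddl.append(")");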

Step Four : Unload data from MongoDB to the local filesystem using the mongoexport command:

mongoexport --db <databaseName> --collection <collectionName> --type=csv --fields=<fieldList> --out <filename>

The <fieldList> is prepared from the keys in Step One.
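
A small sketch of that preparation (keys is again the list of "key~type" strings; the names are illustrative):

 // Derive the --fields argument for mongoexport from the key~type strings.
 java.util.List<String> fieldNames = new java.util.ArrayList<String>();
 for (String key : keys) {
     fieldNames.add(key.split("~")[0]);           // keep the field name, drop the type suffix
 }
 String fieldList = String.join(",", fieldNames); // e.g. "field1,field2,field3,_id"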

Step Five : Stage the .csv file from the local filesystem to a Snowflake staging location with the PUT command, using SnowSQL.

snowsql -d <database> -s <schema> -o exit_on_error=true -o log_level=DEBUG  -q  'put <fileName> @<internalStage> OVERWRITE=TRUE';
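
In the Talend job this is typically a tSystem call; purely as an illustration, the same command could be launched from Java like this (database, schema, fileName and stageName are placeholders):

 // Sketch: run the snowsql PUT from Java; the enclosing method must handle the
 // IOException/InterruptedException thrown by ProcessBuilder.
 ProcessBuilder pb = new ProcessBuilder(
     "snowsql", "-d", database, "-s", schema,
     "-o", "exit_on_error=true", "-o", "log_level=DEBUG",
     "-q", "put file://" + fileName + " @" + stageName + " OVERWRITE=TRUE");
 pb.inheritIO();                       // forward snowsql output to the job's console
 int exitCode = pb.start().waitFor();  // non-zero means the PUT failed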

Step Six : Load the data from the staging location into the Snowflake table:

COPY INTO <tableName> FROM @<internalStage> 
[file_format=<fileFormat>] [pattern=<regex_pattern>]

Specifying the file_format and pattern is optional here; we used a regular expression because we stage multiple files for each collection in a single Snowflake stage.
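
For illustration only (in Talend this is usually a component such as tDBRow/tSnowflakeRow), the COPY can be issued through the Snowflake JDBC driver; jdbcUrl, user, password, tableName, stageName and collectionName are placeholders:

 // Sketch: execute the COPY INTO statement over JDBC, filtering staged files by pattern.
 try (java.sql.Connection con = java.sql.DriverManager.getConnection(jdbcUrl, user, password);
      java.sql.Statement stmt = con.createStatement()) {
     stmt.execute("COPY INTO " + tableName + " FROM @" + stageName
             + " FILE_FORMAT = (TYPE = CSV SKIP_HEADER = 1)"
             + " PATTERN = '.*" + collectionName + ".*[.]csv'");
 }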

Step Seven : Maintain a list of collections. The list can be placed in a file on the local filesystem or in a database table. In the Talend job, iterate over the list of collections and process each collection through the above steps by parameterizing the collection names, table names, file names, stage names, etc. in the job.
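
Purely as an illustration of the parameterization (the real implementation is a Talend iteration, and the collection names below are made up):

 // The list could equally come from a file or a database table.
 java.util.List<String> collections = java.util.Arrays.asList("orders", "customers", "events");
 for (String collection : collections) {
     String tableName = collection.toUpperCase();          // Snowflake target table (Step Three)
     String fileName  = "/tmp/" + collection + ".csv";     // mongoexport output (Step Four)
     String stageName = collection + "_stage";             // internal stage (Steps Five/Six)
     // ... run Steps One through Six with these parameters ...
 }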

1 Answer

One solution is to load the records of your MongoDB collection into a Snowflake field of VARIANT type. Then create a Snowflake view to extract the specific keys using Snowflake's dot notation.

Export your data as JSON:

mongoexport --type=json --out <filename>

Load that export into a table with a structure like the following.

create table collection_name_exports (
  data variant,  -- This column will contain your export
  inserted_at datetime default current_timestamp()
);

Extract the keys into columns of a view as you need.

create view collection_name_view as
select
  data:key1 as field1,  -- colon notation applies to the VARIANT column "data"
  data:key2 as field2
from collection_name_exports;