We had a use case that led me to write this, and I am sure many of you have faced a similar situation: migrate multiple collections from MongoDB into a Snowflake database through a single Talend job, and retain the top-level nodes of each collection as individual fields in the Snowflake table.
As we know, Talend does not support a dynamic schema for MongoDB sources, because MongoDB collections do not enforce a schema. This means we would have to create separate jobs/sub-jobs for each existing or new collection we want to ingest, and redesign those jobs for every future alteration in the documents while ensuring they keep working all the time. So we had to look into an alternative solution.
Here is the approach:
Step One: Get all the top-level keys and their types from the MongoDB collection. We have used an aggregation with $objectToArray to convert all top-level key and value pairs into document arrays, followed by $unwind and $group with $addToSet to get the distinct keys and value types across the entire collection. A sample result document and a sketch of the pipeline are shown below.
{
"_id" : "1",
"keys" : [
"field1~string",
"field2~object",
"filed3~date",
"_id~objectId"
]
}
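A minimal sketch of an aggregation pipeline that produces a document of this shape, written here with the MongoDB Java driver; the connection string, database and collection names are placeholders:

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import org.bson.Document;
import java.util.Arrays;

public class CollectKeys {
    public static void main(String[] args) {
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
            MongoCollection<Document> coll =
                client.getDatabase("myDatabase").getCollection("myCollection");
            // $objectToArray turns each document into an array of {k, v} pairs,
            // $unwind flattens the pairs, and $group with $addToSet collects the
            // distinct "key~bsonType" strings across the whole collection.
            Document result = coll.aggregate(Arrays.asList(
                new Document("$project",
                    new Document("kv", new Document("$objectToArray", "$$ROOT"))),
                new Document("$unwind", "$kv"),
                new Document("$group", new Document("_id", "1")
                    .append("keys", new Document("$addToSet",
                        new Document("$concat", Arrays.asList(
                            "$kv.k", "~", new Document("$type", "$kv.v"))))))
            )).first();
            System.out.println(result == null ? "{}" : result.toJson());
        }
    }
}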
Step Two: Create a one-to-one map between MongoDB data types and Snowflake data types. We have created a hash map called "dataTypes" to store this information. Alternatively, this information can be stored in a table, a file, etc.
// Map of MongoDB BSON type names to Snowflake data types
java.util.Map<String,String> dataTypes = new java.util.HashMap<String,String>();
dataTypes.put("string","VARCHAR");
dataTypes.put("int","NUMBER");
dataTypes.put("objectId","VARCHAR");
dataTypes.put("object","VARIANT");
dataTypes.put("date","TIMESTAMP_LTZ");
dataTypes.put("array","VARCHAR");
dataTypes.put("bool","BOOLEAN");
Step Three: Compare the keys against Snowflake. First we query the Snowflake INFORMATION_SCHEMA to check whether the table exists. If it does not exist, we create the table; if it does, we check for changes in the document fields and add or modify the corresponding columns in the Snowflake table. The DDL script is generated by iterating over the keys from Step One and applying the data type mapping from Step Two, as sketched below.
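As a rough sketch (the table name and the key list below are only illustrative), the DDL can be assembled by splitting each "name~type" key and looking up the Snowflake type in the dataTypes map:

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BuildDdl {
    public static void main(String[] args) {
        Map<String, String> dataTypes = new HashMap<>();
        dataTypes.put("string", "VARCHAR");
        dataTypes.put("int", "NUMBER");
        dataTypes.put("objectId", "VARCHAR");
        dataTypes.put("object", "VARIANT");
        dataTypes.put("date", "TIMESTAMP_LTZ");
        dataTypes.put("array", "VARCHAR");
        dataTypes.put("bool", "BOOLEAN");

        // Keys collected in Step One, in "name~type" format.
        List<String> keys = Arrays.asList(
            "field1~string", "field2~object", "field3~date", "_id~objectId");

        StringBuilder ddl = new StringBuilder("CREATE TABLE IF NOT EXISTS MY_TABLE (");
        for (int i = 0; i < keys.size(); i++) {
            String[] parts = keys.get(i).split("~");
            if (i > 0) ddl.append(", ");
            ddl.append(parts[0]).append(" ")
               .append(dataTypes.getOrDefault(parts[1], "VARCHAR"));
        }
        ddl.append(")");
        // -> CREATE TABLE IF NOT EXISTS MY_TABLE (field1 VARCHAR, field2 VARIANT, field3 TIMESTAMP_LTZ, _id VARCHAR)
        System.out.println(ddl);
    }
}

An ALTER TABLE ... ADD COLUMN statement for an existing table can be built the same way for any keys that are missing from INFORMATION_SCHEMA.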
Step Four: Unload the data from MongoDB to the local filesystem using the mongoexport command:
mongoexport --db <databaseName> --collection <collectionName> --type=csv --fields=<fieldList> --out <filename>
The field list passed to --fields is prepared from the keys obtained in Step One, as shown in the sketch below.
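For example, the comma-separated field list can be derived from the same keys by dropping the "~type" suffix (a minimal sketch; the key list is illustrative):

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class BuildFieldList {
    public static void main(String[] args) {
        List<String> keys = Arrays.asList(
            "field1~string", "field2~object", "field3~date", "_id~objectId");

        // Strip the "~type" suffix and join with commas for mongoexport --fields.
        String fieldList = keys.stream()
            .map(k -> k.split("~")[0])
            .collect(Collectors.joining(","));

        System.out.println(fieldList); // -> field1,field2,field3,_id
    }
}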
Step Five: Stage the .csv file from the local filesystem to the Snowflake staging location using the PUT command via SnowSQL.
snowsql -d <database> -s <schema> -o exit_on_error=true -o log_level=DEBUG -q 'put <fileName> @<internalStage> OVERWRITE=TRUE';
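If the PUT is issued from within the Talend job rather than interactively, one way to do it is to shell out to snowsql; the sketch below assumes snowsql is on the PATH and uses placeholder database, schema, file and stage names:

import java.io.IOException;

public class StageFile {
    public static void main(String[] args) throws IOException, InterruptedException {
        // Placeholder values; in the real job these come from context variables.
        String fileName = "/tmp/myCollection.csv";
        String stage = "my_internal_stage";

        ProcessBuilder pb = new ProcessBuilder(
            "snowsql", "-d", "MY_DB", "-s", "MY_SCHEMA",
            "-o", "exit_on_error=true",
            "-q", "put file://" + fileName + " @" + stage + " OVERWRITE=TRUE");
        pb.inheritIO();                       // surface snowsql output in the job log
        int exitCode = pb.start().waitFor();  // non-zero exit code means the PUT failed
        System.out.println("snowsql exit code: " + exitCode);
    }
}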
Step Six: Load the data from the staging location into the Snowflake table:
COPY INTO <tableName> FROM @<internalStage>
[file_format=<fileFormat>] [pattern=<regex_pattern>]
Specifying the file_format and pattern is optional here; we have used a regular expression pattern because we are staging multiple files for each collection in one Snowflake stage.
Step Seven: Maintain a list of collections. The list can be placed in a file on the local filesystem or in a database table. In the Talend job, iterate over the list of collections and process each collection through the above steps by parameterizing the collection names, table names, file names, stage names, etc. in the job. A minimal sketch of such a driving loop follows.
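In plain Java terms (in Talend this would typically be a file or database input feeding an iterate link, with context variables carrying the parameters), the driving loop is essentially the following sketch; the file path and the per-collection processing method are hypothetical:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;

public class ProcessCollections {
    public static void main(String[] args) throws IOException {
        // One collection name per line; the path is a placeholder.
        List<String> collections = Files.readAllLines(Paths.get("/tmp/collections.txt"));

        for (String collection : collections) {
            // Derive the table, file and stage names from the collection name,
            // then run Steps One to Six for this collection.
            String tableName = collection.toUpperCase();
            String fileName = "/tmp/" + collection + ".csv";
            String stageName = collection + "_stage";
            System.out.println("Processing " + collection + " -> " + tableName
                + " via " + fileName + " and @" + stageName);
            // processCollection(collection, tableName, fileName, stageName); // hypothetical helper
        }
    }
}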