I'm trying to load data from Elasticsearch into MongoDB, and I want to retain the same _id value that is present in Elasticsearch when writing to MongoDB. I'm able to do that, but the _id field is of type String in Elasticsearch, and I would like to convert it to the MongoDB ObjectId datatype before writing it.

The data from Elasticsearch is loaded into a DataFrame, and I'm using Spark with Scala. Any help to achieve this?

I have tried modifying the DataFrame this way, but it throws an error:

    df("_id") = new ObjectId(df("_id"))

It doesn't work this way.

    val df = spark.read
                  .format("org.elasticsearch.spark.sql")
                  .option("query", esQuery)
                  .option("pushdown", true)
                  .option("scroll.size", Config.ES_SCROLL_SIZE)
                  .load(Config.ES_RESOURCE)
                  .withColumn("_id", $"_metadata".getItem("_id"))
                  .drop("_metadata")

    df("_id") = new ObjectId(df("_id"))

I want to load the DataFrame into MongoDB with the _id field as the MongoDB ObjectId datatype rather than String.

    Present:  _id : "123456ABCD" 
    Expected: _id : ObjectId(123456ABCD)

1 Answer

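`new ObjectId(...)` takes a String, not a Column, so it cannot be applied to a DataFrame column directly, and Spark has no column type for ObjectId. Assuming the MongoDB Spark connector 2.x, which writes a struct with a single string field named `oid` as an ObjectId, try this:

    import org.apache.spark.sql.functions.struct
    // Assumption: connector 2.x converts struct<oid: string> to ObjectId on write.
    .withColumn("_id", struct($"_metadata".getItem("_id").as("oid")))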
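
One caveat: an ObjectId is 12 bytes, so its string form must be exactly 24 hexadecimal characters, and a value like "123456ABCD" from the question would not convert. Below is a minimal sketch of a pre-check in plain Scala. `ObjectIdHex` and `isObjectIdHex` are hypothetical names introduced here for illustration, not part of Spark or the MongoDB driver.

```scala
// Hypothetical helper (not part of any library): checks whether a string
// has the shape of an ObjectId, i.e. exactly 24 hexadecimal characters.
object ObjectIdHex {
  // Anchored so the same pattern also works with find-style regex matching.
  val Pattern: String = "^[0-9a-fA-F]{24}$"

  // Returns false for null, for wrong lengths, and for non-hex characters.
  def isObjectIdHex(s: String): Boolean =
    s != null && s.matches(Pattern)
}
```

On a DataFrame, the same pattern could be passed to `Column.rlike`, for example `df.filter($"_id".rlike(ObjectIdHex.Pattern))`, to keep only rows whose ids can be converted. If the MongoDB Java driver is on the classpath, `ObjectId.isValid` performs a similar check.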