
I have created a build of https://github.com/mongodb/mongo-kafka

But how do I run this build so that it connects to my running Kafka instance?

However stupid this question may sound, there seems to be no documentation available on making this work with a locally running MongoDB replica set.

All blogs point to using MongoDB Atlas instead.

If you have a good resource, please guide me towards it.

UPDATE 1 --

Used the connector artifact from Maven - https://search.maven.org/artifact/org.mongodb.kafka/mongo-kafka-connect

Placed it in the Kafka plugins directory and restarted Kafka.
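
(In hindsight, this JAR gets loaded by Kafka Connect rather than by the broker itself; below is a minimal sketch of the relevant worker setting, assuming the JAR was copied to /usr/local/share/kafka/plugins:)

# config/connect-standalone.properties
plugin.path=/usr/local/share/kafka/plugins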

UPDATE 2 -- How do I enable MongoDB as a source for Kafka?

https://github.com/mongodb/mongo-kafka/blob/master/config/MongoSourceConnector.properties

I used the file above as the configuration for Kafka:

bin/kafka-server-start.sh config/server.properties --override config/MongoSourceConnector.properties

UPDATE 3 - The above method hasn't worked, so I went back to the blog, which does not mention what port 8083 is.

Installed Confluent and confluent-hub, but I am still unsure how to get the mongo-kafka connector working with Kafka.
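
(One way to verify that the connector is actually visible to Kafka Connect, assuming Connect is running with its REST API on the default port 8083:)

curl http://localhost:8083/connector-plugins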

UPDATE 4 -

ZooKeeper, Kafka server and Kafka Connect running, with the following library files in place:

  • Mongo Kafka library files
  • Kafka Connect Avro Converter library files

Using the commands below, my source got working -

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
bin/connect-standalone.sh config/connect-standalone.properties config/MongoSourceConnector.properties
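
(A quick sanity check that the source connector is producing messages, assuming one of the topics is named users as in the Logstash configuration below:)

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic users --from-beginning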

And using the Logstash configuration below, I was able to push data into Elasticsearch -

input {
  kafka {
        bootstrap_servers => "localhost:9092"
        topics => ["users","organisations","skills"]
  }
}
output {
  elasticsearch {
        hosts => ["localhost:9200"]
  }
  stdout { codec => rubydebug }
}

So far one MongoSourceConnector.properties holds a single collection name to read from, so I need to run Kafka Connect with a different properties file for each collection (see the note below).
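
(For what it's worth, connect-standalone.sh accepts several connector property files in a single invocation, so one worker can host all the collections; the file names below are assumed:)

bin/connect-standalone.sh config/connect-standalone.properties \
    config/users.properties config/organisations.properties config/skills.properties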

My Logstash is pushing new documents into Elasticsearch instead of updating the old ones, and it is not creating indices named after the collections. The idea is that this should stay perfectly in sync with my MongoDB database.

FINAL UPDATE - Everything is now working smoothly:

  • Created multiple properties files for Kafka Connect
  • The latest Logstash configuration actually creates an index per topic name, and updates the indices accordingly

input {
    kafka {
        bootstrap_servers => "localhost:9092"
        decorate_events => true
        topics => ["users","organisations","skills"]
    }
}
filter {
    json {
        source => "message"
        target => "json_payload"
    }

    json {
        source => "[json_payload][payload]"
        target => "payload"
    }

    mutate {
        add_field => { "[es_index]" => "%{[@metadata][kafka][topic]}" }
        rename => { "[payload][fullDocument][_id][$oid]" => "mongo_id"}
        rename => { "[payload][fullDocument]" => "document"}
        remove_field => ["message","json_payload","payload"]
    }
}
output {
    elasticsearch {
        hosts => ["localhost:9200"]
        index => "%{es_index}"
        action => "update"
        doc_as_upsert => true
        document_id => "%{mongo_id}"
    }
    stdout {
        codec => rubydebug {
            metadata => true
        }
    }
}
Rather than edit the question, you can post an answer with your solution - OneCricketeer
Ok I will do that. - zion

2 Answers

2 votes

Steps to successfully get MongoDB syncing with Elasticsearch -

  • First, deploy the MongoDB replica set -
#Make sure no mongo daemon instance is running
#To check all the ports which are listening or open
sudo lsof -i -P -n | grep LISTEN 

#Kill the process ID of the mongo instance
sudo kill 775

#Deploy the replica set
mongod --replSet "rs0" --bind_ip localhost --dbpath=/data/db
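
#If the replica set has never been initiated, run this once so that change streams
#(and therefore the source connector) work. Assumes the legacy mongo shell is installed.
mongo --eval 'rs.initiate()'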
  • Create the config properties for Kafka Connect
# dummycollection.properties <- Filename
name=dummycollection-source
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
tasks.max=1

# Connection and source configuration
connection.uri=mongodb://localhost:27017
database=dummydatabase
collection=dummycollection
copy.existing=true
topic.prefix=
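# Assumption: with topic.prefix left empty, change events land in a topic named
# <database>.<collection>, i.e. dummydatabase.dummycollection (used in the Logstash input below)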
poll.max.batch.size=1000
poll.await.time.ms=5000

# Change stream options
publish.full.document.only=true
pipeline=[]
batch.size=0
collation=
  • Make sure the JAR files from the URLs below are available in your Kafka plugins directory -

MongoDB Kafka Connector (Maven Central Repository Search - https://search.maven.org/artifact/org.mongodb.kafka/mongo-kafka-connect)

Kafka Connect Avro Converter

  • Deploy Kafka
#ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties

#Kafka server
bin/kafka-server-start.sh config/server.properties

#Kafka Connect
bin/connect-standalone.sh config/connect-standalone.properties config/dummycollection.properties
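
#Optional: verify the connector is running via the Connect REST API
#(assumes the default REST port 8083 and the connector name from the properties file above)
curl http://localhost:8083/connectors/dummycollection-source/status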
  • Configure Logstash -
# /etc/logstash/conf.d/apache.conf  <- File
input {
  kafka {
        bootstrap_servers => "localhost:9092"
        decorate_events => true
        topics => ["dummydatabase.dummycollection"]
  }
}
filter {
    json {
        source => "message"
        target => "json_payload"
    }

    json {
        source => "[json_payload][payload]"
        target => "payload"
    }

    mutate {
        add_field => { "[es_index]" => "%{[@metadata][kafka][topic]}" }
        rename => { "[payload][fullDocument][_id][$oid]" => "mongo_id"}
        rename => { "[payload][fullDocument]" => "document"}
        remove_field => ["message","json_payload","payload"]
    }
}
output {
    elasticsearch {
        hosts => ["localhost:9200"]
        index => "%{es_index}"
        action => "update"
        doc_as_upsert => true
        document_id => "%{mongo_id}"
    }
    stdout {
        codec => rubydebug {
            metadata => true
        }
    }
}
  • Start Elasticsearch, Kibana and Logstash
sudo systemctl start elasticsearch
sudo systemctl start kibana
sudo systemctl start logstash
  • Test

Open MongoDB Compass, and

  • Create a collection, mention that collection in the Logstash topics, and create a properties file for it for Kafka Connect
  • Add data to it
  • Update the data

Review indexes in Elasticsearch
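
For example, the indices can be listed from the command line (assuming Elasticsearch is on its default port 9200):

curl 'http://localhost:9200/_cat/indices?v'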

1 vote

Port 8083 is Kafka Connect, which you start with one of the connect-*.sh scripts.

It is standalone from the broker, and its properties do not get set by kafka-server-start.
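
A quick way to confirm what is listening on 8083, assuming default settings, is that the Connect REST API answers on its root path with version information:

curl http://localhost:8083/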