I have created a build of https://github.com/mongodb/mongo-kafka, but how do I run it so it connects with my running Kafka instance?
However silly this question may sound, there seems to be no documentation available on making this work with a locally running MongoDB replica set.
All blogs point to using MongoDB Atlas instead.
If you have a good resource, please guide me towards it.
UPDATE 1 --
Used the Maven artifact - https://search.maven.org/artifact/org.mongodb.kafka/mongo-kafka-connect
Placed the JAR in the Kafka plugin directory and restarted Kafka.
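For reference, a minimal sketch of the Kafka Connect worker settings that point at the plugin directory (the path /usr/local/kafka/plugins and the JSON converters are assumptions; adjust to wherever you placed the JAR):

# config/connect-standalone.properties (relevant lines only)
bootstrap.servers=localhost:9092
# assumed location of the mongo-kafka JAR
plugin.path=/usr/local/kafka/plugins
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter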
UPDATE 2 -- How to enable MongoDB as a source for Kafka?
Tried using https://github.com/mongodb/mongo-kafka/blob/master/config/MongoSourceConnector.properties
as a configuration file for Kafka:
bin/kafka-server-start.sh config/server.properties --override config/MongoSourceConnector.properties
UPDATE 3 - The above method hasn't worked, so I went back to the blog, which does not mention what port 8083 is (it turns out to be the default port of the Kafka Connect REST API).
Installed Confluent and confluent-hub, but I am still unsure how to make the mongo-connector work with Kafka.
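Once a Kafka Connect worker is running, its REST API on port 8083 can be queried to see which plugins and connectors are loaded; a quick sanity check, assuming the default host and port:

curl -s http://localhost:8083/connector-plugins
curl -s http://localhost:8083/connectors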
UPDATE 4 -
Zookeeper, Kafka server, and Kafka Connect running
[Screenshots: Mongo Kafka library files; Kafka Connect Avro Connector library files]
Using the below commands, my source got working -
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
bin/connect-standalone.sh config/connect-standalone.properties config/MongoSourceConnector.properties
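For completeness, a minimal sketch of what a MongoSourceConnector.properties can look like for a local replica set (the replica set name rs0 and the database/collection names are assumptions; adjust to your setup):

name=mongo-source-users
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
# Change streams require a replica set (or sharded cluster)
connection.uri=mongodb://localhost:27017/?replicaSet=rs0
database=mydb
collection=users
# The source topic name is derived from topic.prefix, database and collection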
And using the below configuration for Logstash, I was able to push data into Elasticsearch -
input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["users","organisations","skills"]
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
  }
  stdout { codec => rubydebug }
}
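To verify that documents actually landed, a quick check against Elasticsearch (assuming the default port):

curl -s 'http://localhost:9200/_cat/indices?v'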
So now one MongoSourceConnector.properties holds a single collection name to read from, which means I need to run Kafka Connect with a different properties file for each collection (see the sketch after this paragraph).
Also, my Logstash is pushing new data into Elasticsearch instead of updating old data, and it is not creating indexes named after the collection. The idea is that this should sync with my MongoDB database perfectly.
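It turns out connect-standalone.sh accepts several connector property files in a single invocation, so one worker can run one connector per collection (the three file names here are assumptions matching my collections):

bin/connect-standalone.sh config/connect-standalone.properties \
  config/MongoSourceConnector-users.properties \
  config/MongoSourceConnector-organisations.properties \
  config/MongoSourceConnector-skills.properties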
FINAL UPDATE - Everything is now working smoothly:
- Created multiple properties files for Kafka Connect, one per collection
- The latest Logstash configuration actually creates an index per topic name, and updates existing documents accordingly
input {
  kafka {
    bootstrap_servers => "localhost:9092"
    decorate_events => true
    topics => ["users","organisations","skills"]
  }
}
filter {
  json {
    source => "message"
    target => "json_payload"
  }
  json {
    source => "[json_payload][payload]"
    target => "payload"
  }
  mutate {
    add_field => { "[es_index]" => "%{[@metadata][kafka][topic]}" }
    rename => { "[payload][fullDocument][_id][$oid]" => "mongo_id" }
    rename => { "[payload][fullDocument]" => "document" }
    remove_field => ["message","json_payload","payload"]
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "%{es_index}"
    action => "update"
    doc_as_upsert => true
    document_id => "%{mongo_id}"
  }
  stdout {
    codec => rubydebug {
      metadata => true
    }
  }
}
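Note that decorate_events => true in the kafka input is what populates the %{[@metadata][kafka][topic]} field used for the index name, and doc_as_upsert => true combined with document_id (the Mongo _id) makes updates overwrite existing documents instead of duplicating them.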