I am facing the error below while adding the Neo4j sink connector in Confluent Platform.
Caused by: org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:359)
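For context: with schemas.enable=true, JsonConverter expects every message to be wrapped in a schema/payload envelope, which is what the DataException above is complaining about. A minimal sketch of that shape (the field names here are illustrative, not taken from my actual topic):

```shell
# Envelope JsonConverter expects when schemas.enable=true: a top-level "schema"
# describing the types, and the actual record under "payload".
envelope='{
  "schema": {
    "type": "struct",
    "fields": [
      {"field": "projectname", "type": "string", "optional": true},
      {"field": "count", "type": "int32", "optional": true}
    ],
    "optional": false
  },
  "payload": {"projectname": "demo", "count": 1}
}'
# Sanity-check that it is well-formed JSON (re-prints it indented on success):
echo "$envelope" | python3 -m json.tool
```

A plain JSON document without the "schema" and "payload" wrapper is exactly what triggers this error.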
Please see the relevant part of my configuration:
curl -X PUT http://localhost:8083/connectors/Neo4j-Sink-Connect-book/config -H "Content-Type: application/json" -d '{
"topics": "pls.bookneo",
"connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"neo4j.server.uri": "bolt://localhost:7687",
"neo4j.authentication.basic.username": "neo4j",
"neo4j.authentication.basic.password": "*****"
}'
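In hindsight, one thing I notice about this config: the key converter is also JsonConverter, and key.converter.schemas.enable defaults to true, so if the message keys are JSON the key side would still demand the schema/payload envelope even though I disabled it for values. A sketch of the converter lines that would put both sides on plain JSON (assuming the messages on pls.bookneo really are schemaless JSON):

```json
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
```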
I updated the configuration as follows:
curl -X PUT http://localhost:8083/connectors/Neo4j-Sink-Connect-book/config -H "Content-Type: application/json" -d '{
"topics": "pulseTest.bookneo",
"connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
"key.converter.schema.registry.url": "http://localhost:8081",
"value.converter.schema.registry.url": "http://localhost:8081",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schemas.enable":"false",
"value.converter.schemas.enable":"true",
"neo4j.server.uri": "bolt://localhost:7687",
"neo4j.authentication.basic.username": "neo4j",
"neo4j.authentication.basic.password": "pulse",
"neo4j.encryption.enabled": false,
"neo4j.topic.cdc.schema": "pulseTest.bookneo"
}'
Now I am facing this issue:
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic pulseTest.bookneo to Avro:
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:110)
at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:86)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$2(WorkerSinkTask.java:488)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
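As I understand it, AvroConverter can only read messages in the Confluent wire format: one zero "magic" byte, a 4-byte big-endian Schema Registry ID, then the Avro-encoded body. My topic still held plain JSON, whose first byte is '{' (0x7b), hence "Unknown magic byte!". A local sketch of the check the deserializer effectively performs (no broker needed; the bytes below are made up):

```shell
# Confluent wire format: [0x00 magic][4-byte schema ID][Avro-encoded body].
# Fake example with schema ID 42 (octal \052) and a dummy body:
printf '\000\000\000\000\052avro' > /tmp/avro_msg.bin
# A plain JSON record, as produced by a JSON producer:
printf '{"projectname":"demo"}' > /tmp/json_msg.bin

# Print the first byte of a file as an unsigned decimal:
first_byte() { od -An -tu1 -N1 "$1" | tr -d ' '; }

first_byte /tmp/avro_msg.bin   # prints 0   -> accepted as the magic byte
first_byte /tmp/json_msg.bin   # prints 123 -> '{', rejected: "Unknown magic byte!"
```

So the converter choice has to match how the data was actually produced: AvroConverter only works if the producer wrote Avro through the Schema Registry serializer.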
Now my connector is running properly, but the node is created blank in Neo4j. This is the configuration I am running now:
curl -X PUT http://localhost:8083/connectors/Neo4j-Sink-Connect-projectp/config -H "Content-Type: application/json" -d '{
"topics": "projectsp",
"connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
"value.converter.schemas.enable": false,
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"errors.tolerance": "all",
"neo4j.server.uri": "bolt://localhost:7687",
"neo4j.authentication.basic.username": "neo4j",
"neo4j.authentication.basic.password": "pulse",
"neo4j.encryption.enabled": false,
"neo4j.topic.cypher.projectsp": " MERGE (p:projects{projectname:coalesce(event.projectname,0),count:coalesce(event.count,0)}) "
}'
This is my Cypher query: " MERGE (p:projects{projectname:coalesce(event.projectname,0),count:coalesce(event.count,0)}) "
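For reference, with JsonConverter and schemas.enable=false the record value is bound to `event` in the Cypher template, so event.projectname only resolves if the message carries exactly that key (matching is case-sensitive). A node with both properties set to 0 is what the coalesce(...,0) fallbacks produce when the keys are missing, which looks like the blank node I see. A local sanity check of the message shape (assumes nothing about the broker; the sample message is mine, not from the topic):

```shell
# The Cypher template reads event.projectname and event.count, so a message on
# topic "projectsp" must carry exactly those keys:
msg='{"projectname": "demo", "count": 3}'
# Resolve both fields the way the sink would, using only the Python stdlib;
# a missing key would print "None" and fall through to coalesce(...,0):
echo "$msg" | python3 -c 'import json,sys; d=json.load(sys.stdin); print(d.get("projectname"), d.get("count"))'
# prints: demo 3
```

If the producer writes different key names (or the whole value fails to deserialize), the MERGE silently falls back to 0 for both properties instead of erroring.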