2
votes

I want to produce a message to a Kafka topic. The message should have this format:

   {"targetFileInfo":{"path":"2018-05-07-10/row01-small-01.txt.ready"}}

I know that this is JSON, so how can I convert it to a String?

I use a Maven project, so which dependencies are needed to use

 String stringData = JSON.stringify({"targetFileInfo":{"path":"2018-05-07-10/row01-small-01.txt.ready"}});

So I think it would be better not to convert the JSON to a String, and instead send that message to the Kafka topic directly.

My code looks like this. It can send a String, but I don't know how to modify it to send the message above. Maybe you can help me.

    Producer<String, String> producer = null;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    producer = new KafkaProducer<>(props);
    String msg = "welcome";
    producer.send(new ProducerRecord<String, String>("event", msg));

    producer.close();
2
stringify is not offered as a method I can select. - guguli
It looks like your JSON is already a string, so there is no need to stringify it again. - Pratapi Hemant Patel
So can you give me an example of how I can produce that message in Kafka? producer.send(new ProducerRecord<String, JsonNode>("event-orsted-v1", jsonNode)); - guguli
Is it com.fasterxml.jackson.databind.JsonNode, or from another package? - Pratapi Hemant Patel

2 Answers

3
votes

That solved my problem:

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    // BlobStorageChecker is a helper class from my project, not part of Kafka.
    blobStorageChecker = new BlobStorageChecker();
    String folder = blobStorageChecker.getCurrentDateUTC();
    // The JSON payload is an ordinary Java String with the double quotes escaped.
    String msg = "{\"targetFileInfo\":{\"path\":\"test/"+folder+"row01-small.txt\"},\"sourceFileInfo\":{\"lastModifiedTime\":1525437960000,\"file\":\"/row01-small-01.txt\",\"filename\":\"/data/row01/row01-small.txt\",\"size\":19728,\"remoteUri\":\"ftp://waws-prod-am2-191.ftp.net/data/orsted-real/inbound/row01\",\"contentEncoding\":\"\",\"contentType\":\"\"}}";
    ProducerRecord<String, String> record = new ProducerRecord<String, String>("event-orsted-v1", null, msg);

    // Create the producer once; try-with-resources closes it even if send() fails.
    try (Producer<String, String> producer = new KafkaProducer<String, String>(props)) {
        Future<RecordMetadata> future = producer.send(record);
        // Block until the broker has acknowledged the record.
        RecordMetadata metadata = future.get();
    } catch (Exception e) {
        e.printStackTrace();
    }
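
Hand-escaping a long JSON literal is easy to get wrong, so here is a minimal sketch of building the short message from the question with a small helper. The class name PayloadBuilder and both method names are made up for illustration. It assumes the path contains no control characters; for anything more complex, a JSON library such as Jackson is the safer choice.

```java
public class PayloadBuilder {

    // Escapes the two characters that would break a JSON string literal.
    // Assumption: the value contains no control characters (newlines, tabs, ...).
    static String escapeJson(String value) {
        return value.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Builds the {"targetFileInfo":{"path":...}} message from the question.
    static String buildMessage(String path) {
        return "{\"targetFileInfo\":{\"path\":\"" + escapeJson(path) + "\"}}";
    }

    public static void main(String[] args) {
        String msg = buildMessage("2018-05-07-10/row01-small-01.txt.ready");
        System.out.println(msg);
    }
}
```

The resulting String can be passed to ProducerRecord<String, String> exactly like msg above, with the StringSerializer configuration left unchanged.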
2
votes

As per the comments, you need to send a JsonNode as the message value in Kafka. Write a custom Serializer / Deserializer for it.

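import java.io.IOException;
import java.util.Map;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonNodeSerDes implements Serializer<JsonNode>, Deserializer<JsonNode> {

    // A configured ObjectMapper is thread-safe, so one shared instance is enough.
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public byte[] serialize(String topic, JsonNode data) {
        if (data == null) {
            return null;
        }
        try {
            return mapper.writeValueAsBytes(data);
        } catch (JsonProcessingException e) {
            // Fail loudly instead of silently sending an empty payload.
            throw new SerializationException("Error serializing JSON message", e);
        }
    }

    @Override
    public JsonNode deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        try {
            return mapper.readTree(data);
        } catch (IOException e) {
            // Surface malformed input instead of returning null.
            throw new SerializationException("Error deserializing JSON message", e);
        }
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // No configuration needed.
    }

    @Override
    public void close() {
        // Nothing to release.
    }
}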

I wrote the serializer and deserializer in the same class. You can split them into two classes (one implementing Serializer, the other implementing Deserializer).

When creating the KafkaProducer, set the "value.serializer" config to this class; for a KafkaConsumer, set the "value.deserializer" config instead.
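
A rough sketch of what those two configurations could look like. The package name com.example, the class name JsonSerDesConfig and the group id example-group are placeholders I made up; use the fully qualified name of wherever you put JsonNodeSerDes.

```java
import java.util.Properties;

public class JsonSerDesConfig {

    // Hypothetical fully qualified name of the custom class shown above.
    static final String SERDES_CLASS = "com.example.JsonNodeSerDes";

    // Producer settings: the custom class goes under "value.serializer".
    static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", SERDES_CLASS);
        return props;
    }

    // Consumer settings: the same class goes under "value.deserializer".
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "example-group"); // placeholder group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", SERDES_CLASS);
        return props;
    }
}
```

With these properties you would create the producer as new KafkaProducer<String, JsonNode>(JsonSerDesConfig.producerProps()) and send a ProducerRecord<String, JsonNode>, as in the comment on the question.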

External Dependencies used:

<dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-databind</artifactId>
  <version>2.8.8</version>
</dependency>