0
votes

I wrote a Kafka producer that produces Avro data, but it fails with the following error while serializing the data:

Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.net.UnknownHostException: sandbox-hdf.hortonworks.com
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:184)
    at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
    at java.net.Socket.connect(Socket.java:538)
    at sun.net.NetworkClient.doConnect(NetworkClient.java:180)
    at sun.net.www.http.HttpClient.openServer(HttpClient.java:463)
    at sun.net.www.http.HttpClient.openServer(HttpClient.java:558)
    at sun.net.www.http.HttpClient.<init>(HttpClient.java:242)
    at sun.net.www.http.HttpClient.New(HttpClient.java:339)
    at sun.net.www.http.HttpClient.New(HttpClient.java:357)
    at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1220)
    at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1156)
    at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1050)
    at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:984)
    ...

Following is my producer code:

package com.perfaware.kafka01;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.example.Customer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * Minimal example that publishes a single Avro-encoded {@code Customer} record to Kafka.
 *
 * <p>Keys are serialized as strings and values with Confluent's {@link KafkaAvroSerializer},
 * which contacts the schema registry configured under {@code schema.registry.url} while the
 * record is being serialized. If that registry host cannot be resolved, the send fails with a
 * {@code SerializationException} caused by an {@code UnknownHostException}.
 */
public class producerAvro {

    /**
     * Builds a producer, sends one {@code Customer} record, waits for the result and prints the
     * record metadata (or the failure's stack trace).
     *
     * @param args unused
     * @throws InterruptedException if interrupted while waiting for the send to complete
     * @throws ExecutionException if the send fails after the record was handed to the producer
     */
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Properties properties = new Properties();

        // Producer connection / delivery settings.
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("acks", "1");
        properties.setProperty("retries", "10");

        // Serialization: String keys, Avro values.
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
        // NOTE(review): this hostname must resolve on the machine running the producer
        // (e.g. via an /etc/hosts entry for the sandbox). The "/api/v1" path looks like a
        // Hortonworks Schema Registry endpoint, while the serializer above is Confluent's —
        // confirm the two are compatible, or switch to the Hortonworks serializer.
        properties.setProperty("schema.registry.url", "http://sandbox-hdf.hortonworks.com:7788/api/v1");

        String topic = "topic1";

        // try-with-resources closes the producer even when send() throws synchronously
        // (e.g. a SerializationException) or when waiting on the result fails.
        try (Producer<String, Customer> producer = new KafkaProducer<>(properties)) {
            Customer customer = Customer.newBuilder()
                    .setAge(21)
                    .setAutomatedEmail(false)
                    .setFirstName("Manish")
                    .setLastName("B.")
                    .setHeight(176f)
                    .setWeight(62f)
                    .build();

            ProducerRecord<String, Customer> producerRecord = new ProducerRecord<>(topic, customer);

            System.out.println(customer);
            // get() blocks until the send completes, so failures surface here as well as in the callback.
            producer.send(producerRecord, (metadata, exception) -> {
                if (exception == null) {
                    System.out.println(metadata.toString());
                } else {
                    exception.printStackTrace();
                }
            }).get();

            producer.flush();
        }
    }
}

I am also attaching my pom.xml file if that helps:

 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>Kafka_Avro</groupId>
  <artifactId>Kafka_Avro_Practise</artifactId>
  <version>0.0.1-SNAPSHOT</version>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <!-- Shared version for the core Avro artifacts (avro, avro-compiler, avro-ipc). -->
    <avro.version>1.8.2</avro.version>
    <confluent.version>3.1.1</confluent.version>
  </properties>

  <repositories>
    <repository>
      <id>confluent</id>
      <!-- HTTPS: Maven 3.8.1+ blocks plain-HTTP repositories by default. -->
      <url>https://packages.confluent.io/maven/</url>
    </repository>
  </repositories>

  <!--
    NOTE(review): the Kafka-related versions are not aligned (kafka-tools 2.0.0,
    kafka-clients 1.1.1, Confluent ${confluent.version}), and avro-mapred (1.8.1) differs
    from ${avro.version}. Consider settling on one Kafka line and one Avro version.
  -->
  <dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-tools -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-tools</artifactId>
      <version>2.0.0</version>
    </dependency>

    <dependency>
      <groupId>io.confluent</groupId>
      <artifactId>kafka-avro-serializer</artifactId>
      <version>${confluent.version}</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro</artifactId>
      <version>${avro.version}</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>1.1.1</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.avro/avro-compiler -->
    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro-compiler</artifactId>
      <version>${avro.version}</version>
    </dependency>

    <dependency>
      <groupId>com.google.code.gson</groupId>
      <artifactId>gson</artifactId>
      <version>2.3.1</version>
      <scope>provided</scope>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.avro/avro-mapred -->
    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro-mapred</artifactId>
      <version>1.8.1</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.avro/avro-ipc -->
    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro-ipc</artifactId>
      <version>${avro.version}</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/commons-codec/commons-codec -->
    <dependency>
      <groupId>commons-codec</groupId>
      <artifactId>commons-codec</artifactId>
      <version>1.11</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <!-- A build plugin, not a library: it belongs here rather than under <dependencies>. -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.8.0</version>
      </plugin>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>3.0.0</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

I also tried changing the value serializer to:

com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroSerializer

but that didn't solve the issue.

1

1 Answer

0
votes

UnknownHostException: sandbox-hdf.hortonworks.com

If you're using the sandbox, you need to add this hostname to your /etc/hosts file so that it resolves to the sandbox's IP address, e.g. a line like `<sandbox-ip> sandbox-hdf.hortonworks.com`.

You'll definitely want to use the Hortonworks serializer if you're using their registry. It's not clear what error you got when using it, but if it was the same one, it's a networking problem that has nothing to do with Avro.

"value.serializer","com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroSerializer"

Additionally, `bootstrap.servers` would probably need to point to the sandbox's Kafka brokers as well, rather than just localhost.

If you do want to use the Confluent serializer (I'm not sure it will work with the Hortonworks registry), you will need consistent Kafka versions. For example, you've declared Kafka 1.1.1, Kafka 2.0, and Confluent 3.1.1, which is based on Kafka 0.10.x.
The same goes for Avro: all Avro artifacts should use a single version (e.g. 1.8.1). Also, you don't need the IPC or MapReduce Avro libraries for your code to work, and probably not the compiler either.