2
votes

I don't quite understand how I can register a schema so that I can use it in JDBC source or sink connectors and then read the data in Spark.

This is the Avro schema that I would like to use to retrieve records from an MS SQL database:

{
 "type": "record",
 "name": "myrecord",
 "fields": [
   { "name": "int1", "type": "int" },
   { "name": "str1", "type": "string" },
   { "name": "str2", "type": "string" }
 ]
} 

I want to use this schema with the following source connector:

{"name": "mssql-source",
 "config": {
 "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
 "key.converter": "io.confluent.connect.avro.AvroConverter",
 "key.converter.schema.registry.url": "http://localhost:8081",
 "value.converter": "io.confluent.connect.avro.AvroConverter",
 "value.converter.schema.registry.url": "http://localhost:8081",
 "incrementing.column.name": "int1",
 "tasks.max": "1",
 "table.whitelist": "Hello",
 "mode": "incrementing",
 "topic.prefix": "mssql-",
 "name": "mssql-source",
 "connection.url":
 "jdbc:sqlserver://XXX.XXX.X;databaseName=XXX;username=XX;password=XX"
 }
}

And this is the Spark consumer that I'm using:

import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;
import kafka.serializer.DefaultDecoder;
import kafka.serializer.StringDecoder;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
 * Spark Streaming consumer that reads Avro records from the {@code mssql-Hello}
 * Kafka topic and prints the {@code int1}, {@code str1} and {@code str2} fields
 * of every record.
 *
 * <p>The topic is written by a Kafka Connect source that uses Confluent's
 * {@code AvroConverter}. That converter does not emit bare Avro binary: every
 * message starts with a 5-byte header (one magic byte {@code 0x0} followed by a
 * 4-byte big-endian schema ID) and only then the Avro-encoded payload. The
 * header must be removed before the payload is handed to a plain Avro decoder.
 */
public class SparkAvroConsumer {

    /** Length of the Confluent wire-format header: 1 magic byte + 4-byte schema ID. */
    private static final int CONFLUENT_HEADER_LENGTH = 5;

    /** First byte of every message serialized with the Confluent wire format. */
    private static final byte CONFLUENT_MAGIC_BYTE = 0x0;

    /** Decoder/encoder between {@link GenericRecord} and plain Avro binary for {@link #USER_SCHEMA}. */
    private static Injection<GenericRecord, byte[]> recordInjection;

    /** Reader schema; must match the fields of the records produced by the connector. */
    private static final String USER_SCHEMA = "{"
            + "\"type\":\"record\","
            + "\"name\":\"myrecord\","
            + "\"fields\":["
            + "  { \"name\":\"int1\", \"type\":\"int\" },"
            + "  { \"name\":\"str1\", \"type\":\"string\" },"
            + "  { \"name\":\"str2\", \"type\":\"string\" }"
            + "]}";

    static {
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(USER_SCHEMA);
        recordInjection = GenericAvroCodecs.toBinary(schema);
    }

    /**
     * Removes the Confluent wire-format header from a message value.
     *
     * @param payload raw Kafka message value as written by {@code AvroConverter}
     * @return the Avro binary payload that follows the 5-byte header
     * @throws IllegalArgumentException if the payload is null, shorter than the
     *         header, or does not start with the Confluent magic byte
     */
    private static byte[] stripConfluentHeader(byte[] payload) {
        if (payload == null
                || payload.length < CONFLUENT_HEADER_LENGTH
                || payload[0] != CONFLUENT_MAGIC_BYTE) {
            throw new IllegalArgumentException(
                    "Message is not in Confluent Avro wire format (magic byte + 4-byte schema id)");
        }
        // Fully qualified so that no additional file-level import is required.
        return java.util.Arrays.copyOfRange(payload, CONFLUENT_HEADER_LENGTH, payload.length);
    }

    public static void main(String[] args) {

        SparkConf conf = new SparkConf()
                .setAppName("kafka-sandbox")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));

        Set<String> topics = Collections.singleton("mssql-Hello");
        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", "localhost:9092");
        kafkaParams.put("schema.registry.url", "http://localhost:8081");
        JavaPairInputDStream<String, byte[]> directKafkaStream = KafkaUtils.createDirectStream(ssc,
                String.class, byte[].class, StringDecoder.class, DefaultDecoder.class, kafkaParams, topics);

        directKafkaStream
                // Drop the magic byte and schema ID first; decoding the raw value would
                // interpret the header bytes as the beginning of the record.
                .map(message -> recordInjection.invert(stripConfluentHeader(message._2)).get())
                .foreachRDD(rdd -> {
                    rdd.foreach(record -> {
                        System.out.println("int1= " + record.get("int1")
                                + ", str1= " + record.get("str1")
                                + ", str2=" + record.get("str2"));
                    });
                });

        ssc.start();
        ssc.awaitTermination();
    }

}
1
You might want to try this: stackoverflow.com/questions/40728056/… – Explorer
@Explorer I did, but I still don't understand how I can use it after registering it. – Mahmoud Elbably

1 Answer

3
votes

Every schema has a schema ID: when you register a schema with the Confluent Schema Registry, it assigns an integer ID to it. That ID is prepended to every message sent by the source system (check this link). You can use the CachedSchemaRegistryClient to fetch the schema from the Schema Registry; you can do something like this (it's Scala code):

// Look up a schema by its ID. `schemaId` is the integer ID the Schema Registry
// returned when the schema was registered; it must already be in scope here.
val url = "http://schema.registry.url:8181"
// Single immutable client instead of a null-initialised var that is reassigned.
// The second argument sizes the client's local schema cache (see the client's API docs).
val schemaRegistry: SchemaRegistryClient = new CachedSchemaRegistryClient(url, 10)
val schema = schemaRegistry.getByID(schemaId) // consult the Schema Registry if you know the `SchemaId` in advance (you get this while registering your Schema)
// CachedSchemaRegistryClient also has a getAllSubjects API that returns the names of all subjects in your registry.
println(schema)

If you want to read the schema ID from the incoming messages instead, do something like this:

/** Resolves the writer schema of a Confluent-framed Avro message.
  *
  * The message is expected to start with one magic byte (0) followed by a
  * 4-byte big-endian schema ID; the ID is looked up in the Schema Registry.
  *
  * @param buffer the incoming binary Avro message, including the 5-byte header
  * @return the schema registered under the embedded ID, rendered as a JSON string
  * @throws IllegalArgumentException if the buffer is null, shorter than the
  *                                  header, or does not start with the magic byte
  */
def getSchema(buffer: Array[Byte]): String = {
    require(buffer != null && buffer.length >= 5,
      "message too short to contain the magic byte and schema id")
    val url = "http://schema.registry.url:8181"
    // NOTE: a client created per call starts with an empty cache; reuse one
    // instance across calls if this is invoked for every message.
    val schemaRegistry = new CachedSchemaRegistryClient(url, 10)
    val bb = ByteBuffer.wrap(buffer)
    val magicByte = bb.get() // consume MAGIC_BYTE
    require(magicByte == 0, s"unexpected magic byte: $magicByte")
    val schemaId = bb.getInt // consume schemaId (ByteBuffer reads big-endian by default)
    // getByID yields an Avro Schema object; convert it so the declared String return type holds.
    schemaRegistry.getByID(schemaId).toString
}

I hope this helps.