
I have a Kafka topic that has received around 500k events.

I now need to insert those events into a Hive table. Since the events are time-based, I decided to use the following strategy:

1) Define a directory in HDFS, which I call users. Inside this directory there will be several Parquet files, each one corresponding to a certain date, e.g. 20180412, 20180413, 20180414, etc. (format YYYYMMDD).

2) Create a Hive table partitioned by the date in YYYYMMDD format. The idea is to use each of the files inside the users HDFS directory as a partition of the table, simply by adding the corresponding Parquet file through the commands:

ALTER TABLE users DROP IF EXISTS PARTITION (fecha='20180412');
ALTER TABLE users ADD PARTITION (fecha='20180412') LOCATION '/users/20180412';
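
For reference, the table is declared roughly along these lines: an external Parquet table partitioned by fecha (a simplified sketch with the column list trimmed, shown here through spark.sql; the plain Hive DDL is equivalent):

// Sketch of the partitioned table the ALTER TABLE statements above assume.
// Shown via spark.sql for illustration (requires .enableHiveSupport()); plain Hive DDL works the same.
spark.sql("""
  CREATE EXTERNAL TABLE IF NOT EXISTS users (
    longitudCliente STRING,
    latitudCliente  STRING,
    dni             STRING,
    alias           STRING
    -- ... remaining columns ...
  )
  PARTITIONED BY (fecha STRING)
  STORED AS PARQUET
  LOCATION '/users'
""")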

3) Read the data from the Kafka topic, iterating from the earliest event, get the date value of each event (in the field dateCliente) and, given that date value, insert the event into the corresponding Parquet file.

4) To accomplish point 3, I read each event and saved it into a temporary HDFS file, which I then read with Spark and converted into a DataFrame.

5) Using Spark, I appended the DataFrame values to the corresponding Parquet file.

The code follows this approach:

import java.util
import java.util.Properties
import java.io.{BufferedWriter, OutputStreamWriter}

import scala.collection.JavaConverters._
import scala.util.control.Breaks._

import com.typesafe.config.ConfigFactory
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem, Path}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.types.{StringType, StructType}

val conf = ConfigFactory.parseResources("properties.conf")
val brokersip = conf.getString("enrichment.brokers.value")
val topics_in = conf.getString("enrichment.topics_in.value")
val spark = SparkSession
    .builder()
    .master("yarn")
    .appName("ParaTiUserXY")
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")
import spark.implicits._

val properties = new Properties
properties.put("key.deserializer", classOf[StringDeserializer])
properties.put("value.deserializer", classOf[StringDeserializer])
properties.put("bootstrap.servers", brokersip)
properties.put("auto.offset.reset", "earliest")
properties.put("group.id", "UserXYZ2")

// Schema used to parse the JSON values coming from the Kafka topic
val my_schema = new StructType()
    .add("longitudCliente", StringType)
    .add("latitudCliente", StringType)
    .add("dni", StringType)
    .add("alias", StringType)
    .add("segmentoCliente", StringType)
    .add("timestampCliente", StringType)
    .add("dateCliente", StringType)
    .add("timeCliente", StringType)
    .add("tokenCliente", StringType)
    .add("telefonoCliente", StringType)

val consumer = new KafkaConsumer[String, String](properties)
consumer.subscribe( util.Collections.singletonList("geoevents") )

val fs = {
    val conf = new Configuration()
    FileSystem.get(conf)
}

// Temporary HDFS file used to stage each record before reading it back with Spark
val temp_path: Path = new Path("hdfs:///tmp/tmpstgtopics")
if (fs.exists(temp_path)) {
    fs.delete(temp_path, true)
}

while (true) {
    val records = consumer.poll(100)
    breakable {
        for (record <- records.asScala) {
            // Stage the single record in the temporary HDFS file
            val data = record.value.toString
            val dataos: FSDataOutputStream = fs.create(temp_path)
            val bw: BufferedWriter = new BufferedWriter(new OutputStreamWriter(dataos, "UTF-8"))
            bw.append(data)
            bw.close()

            // Read the staged record back with Spark and extract its date
            val data_schema = spark.read.schema(my_schema).json("hdfs:///tmp/tmpstgtopics")
            val fechaCliente = data_schema.select("dateCliente").first.getString(0)

            // `date` is a value defined earlier in my code (omitted here)
            if (fechaCliente < date) {
                // Append the event to the Parquet directory for its date
                data_schema.select("longitudCliente", "latitudCliente", "dni", "alias",
                    "segmentoCliente", "timestampCliente", "dateCliente", "timeCliente",
                    "tokenCliente", "telefonoCliente").coalesce(1).write.mode(SaveMode.Append)
                    .parquet("/desa/landing/parati/xyusers/" + fechaCliente)
            } else {
                break
            }
        }
    }
}

  consumer.close()

However, this method takes around 1 second to process each record on my cluster. At that rate, it would take around 6 days to process all the events I have.

Is this the optimal way to insert all the events from a Kafka topic into a Hive table?

What other alternatives exist, or what improvements could I make to my code to speed it up?

Kafka Connect has an HDFS connector with Hive integration – OneCricketeer

2 Answers

2 votes

Aside from the fact that you're not using Spark Streaming correctly to poll from Kafka (you wrote a vanilla Scala Kafka consumer with a while loop), and that coalesce(1) will always be a bottleneck because it forces a single executor to collect the records, I'll just say you're really reinventing the wheel here.
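
For comparison, if you want to stay on Spark, Structured Streaming can subscribe to the topic and write date-partitioned Parquet in one job. A minimal sketch, assuming the spark-sql-kafka-0-10 package is on the classpath and reusing the schema, topic and output path from your question (the broker address and checkpoint path are placeholders):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{StringType, StructType}

val spark = SparkSession.builder().master("yarn").appName("ParaTiUserXY").getOrCreate()

// Same schema as in the question (columns trimmed here for brevity)
val my_schema = new StructType()
  .add("longitudCliente", StringType)
  .add("latitudCliente", StringType)
  .add("dateCliente", StringType)
  .add("timeCliente", StringType)

val events = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092") // your brokersip value
  .option("subscribe", "geoevents")
  .option("startingOffsets", "earliest")
  .load()
  .select(from_json(col("value").cast("string"), my_schema).as("event"))
  .select("event.*")

// Writes one directory per date, e.g. .../dateCliente=20180412, which you can
// register as Hive partitions just like you do now
events.writeStream
  .format("parquet")
  .option("path", "/desa/landing/parati/xyusers")
  .option("checkpointLocation", "/tmp/checkpoints/xyusers") // any durable HDFS path
  .partitionBy("dateCliente")
  .outputMode("append")
  .start()
  .awaitTermination()

That single job replaces the per-record temp-file round trip, and each micro-batch writes many records at once instead of one at a time.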

What other alternatives exist

The ones that I know of, all open source:

  • Gobblin (replaces Camus) by LinkedIn
  • Kafka Connect w/ HDFS Sink Connector (built into Confluent Platform, but also builds from source on GitHub)
  • Streamsets
  • Apache NiFi
  • Secor by Pinterest

For any of those, it would be beneficial to have JSON- or Avro-encoded Kafka messages rather than a flat string. That way, you can drop the files as-is onto a table with the matching Hive SerDe, without parsing them while consuming. If you cannot edit the producer code, make a separate Kafka Streams job that takes the raw string data, parses it, and writes it to a new topic in Avro or JSON.
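
A rough sketch of such a Kafka Streams job, assuming string keys and values; parseToJson stands in for whatever parsing your raw format needs, and the output topic name is made up:

import java.util.Properties

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.{KafkaStreams, StreamsBuilder, StreamsConfig}
import org.apache.kafka.streams.kstream.{Consumed, Produced, ValueMapper}

val props = new Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "geoevents-reformat")
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092") // your brokers

// Placeholder: convert one raw event string into a JSON string
def parseToJson(raw: String): String = ???

val builder = new StreamsBuilder()
builder
  .stream("geoevents", Consumed.`with`(Serdes.String(), Serdes.String()))
  .mapValues(new ValueMapper[String, String] {
    override def apply(raw: String): String = parseToJson(raw)
  })
  .to("geoevents_json", Produced.`with`(Serdes.String(), Serdes.String()))

new KafkaStreams(builder.build(), props).start()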

If you choose Avro (which you really should for Hive support), you can use the Confluent Schema Registry. Or if you're running Hortonworks, they offer a similar Registry.

Hive on Avro operates far better than on text or JSON. Avro can easily be transformed into Parquet, and I believe each of the above options offers at least Parquet support, while the others can also do ORC (Kafka Connect doesn't do ORC at this time).
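
For instance, with the spark-avro package on the classpath, an Avro-to-Parquet conversion in Spark is only a couple of lines (the paths here are illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("avro-to-parquet").getOrCreate()

// Requires the spark-avro package (com.databricks:spark-avro for Spark 2.x)
spark.read.format("com.databricks.spark.avro")
  .load("/data/avro/users")
  .write.parquet("/data/parquet/users")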

Each of the above also supports some level of automatic Hive partition generation based on the Kafka record time.

0 votes

You can improve parallelism by increasing the number of partitions of the Kafka topic and having one or more consumer groups with multiple consumers, so that each consumer reads from its own partition.

As cricket_007 mentioned, you can use one of the open-source frameworks, or you can have more consumer groups consuming the same topic to offload the data.
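
A minimal sketch of the multiple-consumers idea, assuming the topic has enough partitions; the properties mirror the question's consumer, and the per-record processing is left out:

import java.util
import java.util.Properties
import java.util.concurrent.Executors

import scala.collection.JavaConverters._

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer

val numConsumers = 8 // at most one active consumer per topic partition

val properties = new Properties()
properties.put("bootstrap.servers", "broker1:9092") // your brokersip value
properties.put("key.deserializer", classOf[StringDeserializer])
properties.put("value.deserializer", classOf[StringDeserializer])
properties.put("auto.offset.reset", "earliest")
properties.put("group.id", "UserXYZ2") // same group => partitions are split across consumers

val pool = Executors.newFixedThreadPool(numConsumers)
(1 to numConsumers).foreach { _ =>
  pool.submit(new Runnable {
    override def run(): Unit = {
      // KafkaConsumer is not thread-safe, so each thread gets its own instance
      val consumer = new KafkaConsumer[String, String](properties)
      consumer.subscribe(util.Collections.singletonList("geoevents"))
      while (true) {
        val records = consumer.poll(100)
        for (record <- records.asScala) {
          // process record.value here (ideally buffering and writing in batches)
        }
      }
    }
  })
}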