0 votes

Folks, I'm new to all of this data streaming business, but I was able to build and submit a Flink job that reads some CSV data from Kafka, aggregates it, and then puts it in Elasticsearch.

I was able to do the first two parts and print my aggregation to STDOUT. But when I added the code to write it to Elasticsearch, it seems nothing is happening there (no data is being added). I looked at the Flink job manager log and it looks fine (no errors); it says:

2020-03-03 16:18:03,877 INFO 
org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge
- Created Elasticsearch RestHighLevelClient connected to [http://elasticsearch-elasticsearch-coordinating-only.default.svc.cluster.local:9200]

Here is my code at this point:

/*
 * This Scala source file was generated by the Gradle 'init' task.
 */
package flinkNamePull

import java.time.LocalDateTime
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer010, FlinkKafkaProducer010}
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{DataTypes, Table}
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.descriptors.{Elasticsearch, Json, Schema}

object Demo {
  /**
   * MapFunction to generate Transfers POJOs from parsed CSV data.
   */
  class TransfersMapper extends RichMapFunction[String, Transfers] {
    private var formatter = null

    @throws[Exception]
    override def open(parameters: Configuration): Unit = {
      super.open(parameters)
      //formatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
    }

    @throws[Exception]
    override def map(csvLine: String): Transfers = {
      //var splitCsv = csvLine.stripLineEnd.split("\n")(1).split(",")
      var splitCsv = csvLine.stripLineEnd.split(",")

      // Pad the parsed line out to 13 columns: empty strings for the missing
      // text fields and "0.0" for the trailing numeric "los" column.
      val arrLength = splitCsv.length
      if (arrLength != 13) {
        for (i <- arrLength + 1 to 13) {
          if (i == 13) {
            splitCsv = splitCsv :+ "0.0"
          } else {
            splitCsv = splitCsv :+ ""
          }
        }
      }
      val trans = new Transfers()
      trans.rowId = splitCsv(0)
      trans.subjectId = splitCsv(1)
      trans.hadmId = splitCsv(2)
      trans.icuStayId = splitCsv(3)
      trans.dbSource = splitCsv(4)
      trans.eventType = splitCsv(5)
      trans.prev_careUnit = splitCsv(6)
      trans.curr_careUnit = splitCsv(7)
      trans.prev_wardId = splitCsv(8)
      trans.curr_wardId = splitCsv(9)
      trans.inTime = splitCsv(10)
      trans.outTime = splitCsv(11)
      trans.los = splitCsv(12).toDouble

      trans
    }
  }

  def main(args: Array[String]) {
    // Create streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Set properties per KafkaConsumer API
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "kafka.kafka:9092")
    properties.setProperty("group.id", "test")

    // Add Kafka source to environment
    val myKConsumer = new FlinkKafkaConsumer010[String]("raw.data3", new SimpleStringSchema(), properties)
    // Read from beginning of topic
    myKConsumer.setStartFromEarliest()

    val streamSource = env
      .addSource(myKConsumer)

    // Transform the CSV (with a header row per Kafka event) into a Transfers object
    val streamTransfers = streamSource.map(new TransfersMapper())

    // create a TableEnvironment
    val tEnv = StreamTableEnvironment.create(env)

    println("***** NEW EXECUTION STARTED AT " + LocalDateTime.now() + " *****")

    // register a Table
    val tblTransfers: Table = tEnv.fromDataStream(streamTransfers)
    tEnv.createTemporaryView("transfers", tblTransfers)

    tEnv.connect(
      new Elasticsearch()
        .version("7")
        .host("elasticsearch-elasticsearch-coordinating-only.default.svc.cluster.local", 9200, "http")   // required: one or more Elasticsearch hosts to connect to
        .index("transfers-sum")
        .documentType("_doc")
        .keyNullLiteral("n/a")
    )
      .withFormat(new Json().jsonSchema("{type: 'object', properties: {curr_careUnit: {type: 'string'}, sum: {type: 'number'}}}"))
      .withSchema(new Schema()
        .field("curr_careUnit", DataTypes.STRING())
        .field("sum", DataTypes.DOUBLE())
      )
      .inUpsertMode()
      .createTemporaryTable("transfersSum")

    val result = tEnv.sqlQuery(
      """
        |SELECT curr_careUnit, sum(los)
        |FROM transfers
        |GROUP BY curr_careUnit
        |""".stripMargin)

    result.insertInto("transfersSum")

    // Elasticsearch elasticsearch-elasticsearch-coordinating-only.default.svc.cluster.local:9200

    env.execute("Flink Streaming Demo Dump to Elasticsearch")
  }
}

I'm not sure how to debug this beast... I'm wondering if somebody can help me figure out why the Flink job is not adding data to Elasticsearch. :( From my Flink cluster, I'm able to query Elasticsearch just fine (manually) and add records to my index:

curl -XPOST "http://elasticsearch-elasticsearch-coordinating-only.default.svc.cluster.local:9200/transfers-sum/_doc" -H 'Content-Type: application/json' -d'{"curr_careUnit":"TEST123","sum":"123"}'

2 Answers

2 votes

A kind soul on the Flink mailing list pointed out that it could be the Elasticsearch connector buffering my records... Well, it was. ;)

I have added the following options to the Elasticsearch connector:

.bulkFlushMaxActions(2)
.bulkFlushInterval(1000L)
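
For reference, here is roughly how those options slot into the Table API descriptor from the question (a sketch, not the exact code from the job; the values mirror the snippet above and should be tuned for real data volumes):

import org.apache.flink.table.descriptors.Elasticsearch

// Same descriptor as in the question, with bulk-flush settings added so that
// small test volumes are written to the index promptly.
val esDescriptor = new Elasticsearch()
  .version("7")
  .host("elasticsearch-elasticsearch-coordinating-only.default.svc.cluster.local", 9200, "http")
  .index("transfers-sum")
  .documentType("_doc")
  .keyNullLiteral("n/a")
  .bulkFlushMaxActions(2)   // flush after every 2 buffered requests
  .bulkFlushInterval(1000L) // flush at least once per second

// Then connect it exactly as before:
// tEnv.connect(esDescriptor).withFormat(...).withSchema(...).inUpsertMode().createTemporaryTable("transfersSum")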
0 votes

Flink Elasticsearch Connector 7 using Scala

Please see the working and detailed answer I have provided here.
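
The linked write-up is not reproduced here, but as a rough illustration, a DataStream-based Elasticsearch 7 sink in Scala for the same kind of aggregation could look like the sketch below (the object name, the stand-in input stream, and the flush setting are illustrative assumptions, not code from the linked answer):

import java.util

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests

object EsSinkSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Stand-in for the aggregated (curr_careUnit, sum(los)) stream from the question
    val aggregated: DataStream[(String, Double)] =
      env.fromElements(("MICU", 1.5), ("SICU", 3.0))

    val httpHosts = new util.ArrayList[HttpHost]()
    httpHosts.add(new HttpHost(
      "elasticsearch-elasticsearch-coordinating-only.default.svc.cluster.local", 9200, "http"))

    val esSinkBuilder = new ElasticsearchSink.Builder[(String, Double)](
      httpHosts,
      new ElasticsearchSinkFunction[(String, Double)] {
        override def process(element: (String, Double),
                             ctx: RuntimeContext,
                             indexer: RequestIndexer): Unit = {
          // Build the JSON document and hand it to the bulk processor
          val json = new util.HashMap[String, Any]()
          json.put("curr_careUnit", element._1)
          json.put("sum", element._2)
          indexer.add(Requests.indexRequest().index("transfers-sum").source(json))
        }
      })

    // Flush after every record so small test volumes show up immediately
    esSinkBuilder.setBulkFlushMaxActions(1)

    aggregated.addSink(esSinkBuilder.build())
    env.execute("DataStream Elasticsearch 7 sink sketch")
  }
}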