1
votes

I've used spark structured streaming conume kafka messages and save data to redis. By extending the ForeachWriter[org.apache.spark.sql.Row], I used a redis sink to save data. The code runs well but just a little more than 100 datas be saved to redis per second. Is there any better way to speed up the procedure? While code like below would connect and disconnect to redis server every mico batch, any way to just connect once and keep the connections to miniminze the cost of connection which I supposed is the main cause of time consuming? I tried broadcast jedis but neither jedis nor jedispool isserializable so it didn't work.

My sink code is below:

class StreamDataSink extends ForeachWriter[org.apache.spark.sql.Row]{

  var jedis:Jedis = _

  override def open(partitionId:Long,version:Long):Boolean={
    if(null == jedis){
      jedis = FPCRedisUtils.getPool.getResource
    }
    true
  }

  override def process(record: Row): Unit = {

    if(0 == record(3)){
      jedis.select(Constants.REDIS_DATABASE_INDEX)
      if(jedis.exists("counter")){
        jedis.incr("counter")
      }else{
        jedis.set("counter",1.toString)
      }
    }
  }

  override def close(errorOrNull: Throwable): Unit = {
    if(null != jedis){
      jedis.close()
      jedis.disconnect()
    }
  }

Any suggestions will be appreciated.

1

1 Answers

0
votes

Don't do jedis.disconnect(). This will actually close the socket, forcing a new connection next time around. Use only jedis.close(), it will return the connection to the pool.

When you call INCR on a non-existing key, it is automatically created, default to zero and then incremented, resulting in a new key with value 1.

This simplifies your if-else to simply jedis.incr("counter").

With this you have:

jedis.select(Constants.REDIS_DATABASE_INDEX)
jedis.incr("counter")

Review if you really need the SELECT. This is per connection and all connections default to DB 0. If all workloads sharing the same jedis pool are using DB 0, there is no need to call select.

If you do need both select and incr, then pipeline them:

Pipeline pipelined = jedis.pipelined()
pipelined.select(Constants.REDIS_DATABASE_INDEX)
pipelined.incr("counter")
pipelined.sync()

This will send the two commands in one network message, further improving your performance.