0
votes

I have an Akka Stream from a web socket, like in akka stream consume web socket, and would like to build a reusable graph stage (inlet: the stream; FlowShape: add an additional field to the JSON specifying the origin, i.e.

{
...,
"origin":"blockchain.info"
}

and an outlet to Kafka.

I face the following 3 problems:

  • unable to wrap my head around creating a custom Inlet from the web socket flow
  • unable to integrate Kafka directly into the stream (see the code below)
  • not sure whether the transformer that adds the additional field would need to deserialize the JSON in order to add the origin

The sample Project (flow only) looks like:

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
import system.dispatcher // must come after the ActorSystem is defined

val incoming: Sink[Message, Future[Done]] =
    Flow[Message].mapAsync(4) {
      case message: TextMessage.Strict =>
        println(message.text)
        Future.successful(Done)
      case message: TextMessage.Streamed =>
        message.textStream.runForeach(println)
      case message: BinaryMessage =>
        message.dataStream.runWith(Sink.ignore)
    }.toMat(Sink.last)(Keep.right)

val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
    .withBootstrapServers("localhost:9092")

val outgoing = Source.single(TextMessage("{\"op\":\"unconfirmed_sub\"}")).concatMat(Source.maybe)(Keep.right)

val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("wss://ws.blockchain.info/inv"))

val ((completionPromise, upgradeResponse), closed) =
    outgoing
      .viaMat(webSocketFlow)(Keep.both)
      .toMat(incoming)(Keep.both)
      // TODO not working integrating kafka here
      // .map(_.toString)
      //    .map { elem =>
      //      println(s"PlainSinkProducer produce: ${elem}")
      //      new ProducerRecord[Array[Byte], String]("topic1", elem)
      //    }
      //    .runWith(Producer.plainSink(producerSettings))
      .run()

val connected = upgradeResponse.flatMap { upgrade =>
    if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
      Future.successful(Done)
    } else {
      system.terminate()
      throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
    }
  }

// kafka that works / writes dummy data
val done1 = Source(1 to 100)
    .map(_.toString)
    .map { elem =>
      println(s"PlainSinkProducer produce: ${elem}")
      new ProducerRecord[Array[Byte], String]("topic1", elem)
    }
    .runWith(Producer.plainSink(producerSettings))

1 Answer

2
votes

One issue is around the incoming stage, which is modelled as a Sink where it should be modelled as a Flow, so that it can subsequently feed messages into Kafka.

Because incoming text messages can be Streamed, you can re-assemble the streamed frames into one String per message as follows (for very large messages you could instead use the flatMapMerge combinator to avoid holding an entire message in memory):

  val incoming: Flow[Message, String, NotUsed] = Flow[Message].mapAsync(4) {
    case TextMessage.Strict(text) =>
      Future.successful(Some(text))
    case TextMessage.Streamed(src) =>
      src.runFold("")(_ + _).map { msg => Some(msg) }
    case msg: BinaryMessage =>
      // drain the binary stream so the connection is not stalled, then drop it
      msg.dataStream.runWith(Sink.ignore)
      Future.successful(None)
  }.collect {
    case Some(msg) => msg
  }

At this point you have something that produces strings and can be connected to Kafka:

  val addOrigin: Flow[String, String, NotUsed] = ???
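
As for whether addOrigin needs to deserialize the JSON: not strictly, if you accept a string-level splice. A minimal sketch (the helper name is hypothetical; it assumes each stream element is one complete JSON object and that no "origin" key is already present):

```scala
// Hypothetical helper: splice an "origin" field into an already-serialized
// JSON object without a parse/re-serialize round-trip. Assumes the input is
// a single JSON object (i.e. it ends with '}').
def addOriginField(json: String, origin: String): String = {
  val trimmed = json.trim
  require(trimmed.endsWith("}"), s"not a JSON object: $json")
  val body = trimmed.dropRight(1).trim        // drop the closing brace
  val sep  = if (body.endsWith("{")) "" else "," // empty object needs no comma
  s"""$body$sep"origin":"$origin"}"""
}
```

In the stream this becomes `Flow[String].map(addOriginField(_, "blockchain.info"))`. If you need to guard against edge cases such as duplicate keys, a proper JSON library (e.g. spray-json) is the safer choice.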

  val ((completionPromise, upgradeResponse), closed) =
    outgoing
      .viaMat(webSocketFlow)(Keep.both)
      .via(incoming)
      .via(addOrigin)
      .map { elem =>
        println(s"PlainSinkProducer produce: ${elem}")
        new ProducerRecord[Array[Byte], String]("topic1", elem)
      }
      .toMat(Producer.plainSink(producerSettings))(Keep.both)
      .run()