I am trying to implement a custom Akka Sink, but I could not find a way to handle a Future inside it.

class EventSink(...) extends GraphStage[SinkShape[EventEnvelope2]] {

  val in: Inlet[EventEnvelope2] = Inlet("EventSink")
  override val shape: SinkShape[EventEnvelope2] = SinkShape(in)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
    new GraphStageLogic(shape) {

      // This requests one element at the Sink startup.
      override def preStart(): Unit = pull(in)

      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          val future = handle(grab(in))
          Await.ready(future, Duration.Inf)
          /*
          future.onComplete {
            case Success(_) =>
              logger.info("pulling next events")
              pull(in)
            case Failure(failure) =>
              logger.error(failure.getMessage, failure)
              throw failure
          }*/
          pull(in)
        }
      })
    }
  }

  private def handle(envelope: EventEnvelope2): Future[Unit] = {
    val EventEnvelope2(query.Sequence(offset), _/*persistenceId*/, _/*sequenceNr*/, event) = envelope
    ...
    db.run(statements.transactionally)
  }
}

For now I have to block on the future, which does not look good. The non-blocking version I commented out only works for the first event. Could anyone please help?


Update: Thanks @ViktorKlang. It seems to be working now.

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = 
{
    new GraphStageLogic(shape) {
      val callback = getAsyncCallback[Try[Unit]] {
        case Success(_) =>
          //completeStage()
          pull(in)
        case Failure(error) =>
          failStage(error)
      }

      // This requests one element at the Sink startup.
      override def preStart(): Unit = {
        pull(in)
      }

      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          val future = handle(grab(in))
          future.onComplete { result =>
            callback.invoke(result)
          }
        }
      })
    }
  }

I am trying to implement a relational DB event sink connecting to ReadJournal.eventsByTag. This is a continuous stream that will never end unless there is an error, which is what I want. Is my approach correct?

Two more questions:

  1. Will the GraphStage never end unless I manually invoke completeStage or failStage?

  2. Is it correct (and normal) to declare the callback outside the preStart method? And is it correct to invoke pull(in) in preStart in this case?

Thanks, Cheng

Thanks @ViktorKlang, I already read that before. Did not find anything helpful. - Cheng
Why was the segment I linked to not useful? Did you try it? - Viktor Klang
Thanks a lot @ViktorKlang. It was my fault. It is useful. I updated the question with some new questions. Could you please assist with them as well? I appreciate your help. - Cheng
What do your tests show? - Viktor Klang

1 Answer

Avoid Custom Stages

In general, you should try to exhaust the operators already provided by the library's Source, Flow, and Sink before writing a custom stage. Custom stages are rarely necessary and make your code harder to maintain.

Writing Your "Custom" Stage Using Standard Methods

Based on the details of your question's example code I don't see any reason why you would use a custom Sink to begin with.

Given your handle method, you could slightly modify it to do the logging that you specified in the question:

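// transform needs an implicit ExecutionContext in scope.
val loggedHandle : (EventEnvelope2) => Future[Unit] =
  handle(_) transform {
    case Success(_)       => {
      logger.info("pulling next events")
      Success(())  // () is the Unit value; a bare `Unit` is the companion object
    }
    case Failure(failure) => {
      logger.error(failure.getMessage, failure)
      Failure(failure)
    }
  }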
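
To see that transform can add logging without changing a Future's outcome, here is a minimal standalone sketch. It assumes Scala 2.12 or later and uses only the standard library. The names fakeHandle, loggedFakeHandle, and outcome are hypothetical stand-ins (fakeHandle plays the role of the question's handle), and println replaces the logger.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object TransformLoggingSketch {
  // Hypothetical stand-in for the question's `handle`: fails on negative input.
  def fakeHandle(n: Int): Future[Unit] =
    if (n >= 0) Future.successful(())
    else Future.failed(new IllegalArgumentException(s"negative: $n"))

  // Logs both outcomes and passes the original Try through unchanged.
  val loggedFakeHandle: Int => Future[Unit] = n =>
    fakeHandle(n).transform {
      case ok @ Success(_) =>
        println(s"handled $n")
        ok
      case failed @ Failure(e) =>
        println(s"failed $n: ${e.getMessage}")
        failed
    }

  // Blocks only so this sketch can inspect the result; stream code should not block.
  def outcome(n: Int): Try[Unit] =
    Await.ready(loggedFakeHandle(n), 5.seconds).value.get
}
```

Calling TransformLoggingSketch.outcome(1) yields a Success and outcome(-1) yields a Failure carrying the original exception, so the failure still reaches whoever consumes the Future.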

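Then use Sink.foreachAsync (added in later Akka 2.5.x releases) to handle the envelopes. Sink.foreachParallel is not a good fit here: it expects a function returning Unit, so the Future returned by handle would be discarded and the stream would neither wait for the insert nor see its failure:

val createEventEnvelope2Sink : (Int) => Sink[EventEnvelope2, Future[Done]] = 
  (parallelism) =>
    Sink.foreachAsync[EventEnvelope2](parallelism)(loggedHandle)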

If you want each EventEnvelope2 to be sent to the db in order, use a parallelism of 1:

val inOrderDBInsertSink : Sink[EventEnvelope2, Future[Done]] =
  createEventEnvelope2Sink(1)

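Also, if the database throws an exception, you can still get hold of it when the Future materialized by the sink completes. Use runWith (or toMat with Keep.right) so that the sink's Future[Done] is returned; to(...).run() would return the source's materialized value instead:

val someEnvelopeSource : Source[EventEnvelope2, _] = ???

// Requires an implicit Materializer and ExecutionContext in scope.
someEnvelopeSource
  .runWith(createEventEnvelope2Sink(1))
  .andThen {
    case Failure(throwable) => { /*deal with db exception*/ }
    case Success(_)         => { /*all inserts succeeded*/  }
  }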
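
As a rough end-to-end sketch of the "standard operators only" idea, the following assumes Akka Streams 2.5.x on the classpath and swaps the db call for a hypothetical fakeHandle over Int elements. It uses mapAsync followed by Sink.ignore, which is another common way to run a Future-returning function per element. It is not the sink from the answer, and the names MapAsyncSinkSketch and asyncSink are made up for illustration.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object MapAsyncSinkSketch {
  // Hypothetical stand-in for the db-backed `handle`: element 3 fails.
  def fakeHandle(n: Int): Future[Unit] =
    if (n == 3) Future.failed(new RuntimeException("insert failed for 3"))
    else Future.successful(())

  // mapAsync keeps at most `parallelism` Futures in flight, so with
  // parallelism = 1 each Future completes before the next element is handled.
  // Keep.right exposes Sink.ignore's Future[Done] as the materialized value.
  def asyncSink(parallelism: Int): Sink[Int, Future[Done]] =
    Flow[Int]
      .mapAsync(parallelism)(fakeHandle)
      .toMat(Sink.ignore)(Keep.right)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    val done: Future[Done] = Source(1 to 5).runWith(asyncSink(1))
    done.onComplete { result =>
      // A failed Future from fakeHandle fails the stream, so it shows up here.
      println(s"stream finished: $result")
      system.terminate()
    }
    Await.ready(system.whenTerminated, 10.seconds)
  }
}
```

With the default supervision strategy, the first failed Future stops the stream, which matches the "never end unless there is an error" behaviour asked for in the question.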