0
votes

I have a very simple actor that just prints the number :-

public class PrintLineActor extends AbstractLoggingActor {

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .match(Integer.class, i -> {
          System.out.println("Processing: " + i);
          sender().tell(i, self());
        }).build();
  }
}

Now, I have a stream to print the even numbers until I encounter an odd element:-

  @Test
  public void streamsTest() throws Exception {

    ActorSystem system = ActorSystem.create("testSystem");
    ActorRef printActor = system.actorOf(Props.create(PrintLineActor.class));

    Integer[] intArray = new Integer[]{2,4,6,8,9,10,12};
    CompletionStage<List<Integer>> result = Source.from(Arrays.asList(intArray))
        .ask(1, printActor, Integer.class, Timeout.apply(10, TimeUnit.SECONDS))
        .takeWhile(i -> i != 9)
        .runWith(Sink.seq(), ActorMaterializer.create(system));

    List<Integer> result1 = result.toCompletableFuture().get();
    System.out.println("Result :- ");
    result1.forEach(System.out::println);
  }

I do NOT expect any element after 9 being processed aka being sent to actor. However, I see the number "10" also being processed by actor (but not 12) as seen in below output

Processing: 2
Processing: 4
Processing: 6
Processing: 8
Processing: 9
Processing: 10 //WHY IS THIS BEING PROCESSED BY ACTOR??

Result :- 
2
4
6
8

Why is 10 being processed by actor? How to stop this?

EDIT:

I have tried debugging by recording timestamp of events, just to see if 10 is being processed before 9 actually completes, but no, 10 is taken after 9 is processed fully. here are the logs :-

Before Ask: 2 in 1596035906509
Processing inside Actor: 2 at 1596035906509
Inside TakeWhile 2 at  in 1596035906509

Before Ask: 4 in 1596035906609
Processing inside Actor: 4 at 1596035906610
Inside TakeWhile 4 at  in 1596035906610

Before Ask: 6 in 1596035906712
Processing inside Actor: 6 at 1596035906712
Inside TakeWhile 6 at  in 1596035906712

Before Ask: 8 in 1596035906814
Processing inside Actor: 8 at 1596035906814
Inside TakeWhile 8 at  in 1596035906815

Before Ask: 9 in 1596035906915
Processing inside Actor: 9 at 1596035906915
Inside TakeWhile 9 at  in 1596035906916

Before Ask: 10 in 1596035907017 //so 10 is taken much after the 9 is processed fully
Processing inside Actor: 10 at 1596035907017

Result :- 
2
4
6
8

Also, if i replace the .ask with a direct .map(print..), then 10 does not get printed. So why this happens when actor.ask is involved is very strange to me.

2

2 Answers

1
votes

Because you ask printActor asynchronously rather than print synchronously. In your exact run:

  • message 9 arrives at PrintActor, prints "Processing: 9"
  • message 10 arrives at PrintActor, prints "Processing: 10"
  • Your Akka stream receives the response message for message 9 from PrintActor, complete the Akka stream, therefore in the result there is neither 9 nor 10.

To solve the exact problem, remove the async ask and print synchronously instead. But not sure if PrintActor is just an analogy, let me know.

0
votes

The akka streams are buffering the values thru the different streams. Please note that 10 was precessed, but it is not part of the result. If you wish, you can configure the buffer size:

.ask(1, printActor, Integer.class, Timeout.apply(10, TimeUnit.SECONDS)).buffer(1, OverflowStrategy.backpressure)