1
votes

I have two Flux instances:

  • A flux of strings containing the states of a task that is being executed. For example:
    Flux.just("pending", "running", "running", "successful");
    This flux is finite and will eventually end when the task is successfully executed.
  • Another flux of strings that contains the logs of the task. Of course, the logs are also finite, but for reasons that are not relevant to this question (and are out of my control), the flux is infinite. It will produce a finite number of strings, but never a completed signal:
Flux.<Integer> create(emitter -> { // I'm using integers for illustration purposes.
  for (int i = 50; i < 60; i++) {
    emitter.next(i);
  }
  // Note that there's no emitter.complete() here.
})

I want to block until the task finishes, while processing the logs of the task in the background. After the task's done, I want to cancel the logs flux, so that it doesn't keep occupying resources. If it was finite, I could've easily done this:

public static void main(String[] args) {
    Flux<String> state = Flux.just("pending", "running", "running", "success")
                             .delayElements(Duration.ofSeconds(5)) // Simulate the time it takes for a task to finish.
                             .doOnNext(System.out::println); // Here the status would be sent to another microservice.

    Flux<List<Integer>> logs = Flux.<Integer> create(emitter -> { // The logs are usually strings, but I'm using integers for
                                                                  // illustration purposes.
        for (int i = 50; i < 60; i++) {
            emitter.next(i);
        }
        // Note that there's no emitter.complete() here.
    })
                                   .delayElements(Duration.ofSeconds(4)) // Simulate the time between each log message.
                                   .bufferTimeout(30, Duration.ofSeconds(15)) // The logs are again sent to another microservice, but we
                                                                              // don't want to make a separate request for each log
                                                                              // line, so we batch them.
                                   .doOnNext(System.out::println);
    Flux.mergeDelayError(1, state.then(), logs.then())
        .blockLast();
}

Unfortunately, it is not... So, my first idea for solving this issue was to subscribe to the logs flux and then terminate the subscription once the states flux is completed:

public static void main(String[] args) {
    Flux<String> state = Flux.just("pending", "running", "running", "success")
                             .delayElements(Duration.ofSeconds(5)) // Simulate the time it takes for a task to finish.
                             .doOnNext(System.out::println); // Here the status would be sent to another microservice.

    Flux<List<Integer>> logs = Flux.<Integer> create(emitter -> { // The logs are usually strings, but I'm using integers for
                                                                  // illustration purposes.
        for (int i = 50; i < 60; i++) {
            emitter.next(i);
        }
        // Note that there's no emitter.complete() here.
    })
                                   .delayElements(Duration.ofSeconds(4)) // Simulate the between each log message.
                                   .bufferTimeout(30, Duration.ofSeconds(15)) // The logs are again sent to another microservice, but we
                                                                              // don't want to make a separate request for each log
                                                                              // line, so we batch them.
                                   .doOnNext(System.out::println);
    Disposable logsSubscription = logs.subscribe();
    state.doFinally(signalType -> logsSubscription.dispose()).blockLast();
}

This works, but it doesn't play nice with the logs buffering (.bufferTimeout(30, Duration.ofSeconds(15))), and it can sometimes lead to missing task logs. For example, two logs get inside the buffer, but before the buffer limit is reached (or the timeout exceeded) the logs flux is cancelled. As a result, these two logs will never get processed.
My second idea was to somehow use takeUntil(Predicate<? super T> predicate) to decide when to stop taking elements from the logs flux. However, that's problematic, because the predicate is only called whenever there are new elements in the flux. This means that the predicate will never be called if the task completes after the last log has been emitted. I could, however, use takeUntil if I merge the infinite flux of logs with another infinite stream of dummy objects that make sure that takeUntil is periodically triggered:

private static boolean completed;

public static void main(String[] args) {
    Flux<String> state = Flux.just("pending", "running", "running", "success")
                             .delayElements(Duration.ofSeconds(2)) // Simulate the time it takes for a task to finish.
                             .doOnNext(System.out::println) // Here the status would be sent to another microservice.
                             .doFinally(signalType -> {
                                 completed = true;
                             });

    Flux<Integer> logs = Flux.<Integer> create(emitter -> { // The logs are usually strings, but I'm using integers for
                                                            // illustration purposes.
        for (int i = 50; i < 60; i++) {
            emitter.next(i);
        }
        // Note that there's no emitter.complete() here.
    })
                             .delayElements(Duration.ofSeconds(4)); // Simulate the time between each log message.

    Flux<Integer> healthCheck = Flux.<Integer> generate(sink -> {
        sink.next(1);
    })
                                    .delayElements(Duration.ofSeconds(3));
    Flux<List<Integer>> logsWithHealthCheck = Flux.merge(logs, healthCheck)
                                                  .takeUntil(i -> completed)
                                                  .filter(i -> i != 1)
                                                  .bufferTimeout(30, Duration.ofSeconds(15))
                                                  .doOnNext(System.out::println)
                                                  .doFinally(System.out::println);
    Flux<Object> mergeDelayError = Flux.mergeDelayError(1, state, logsWithHealthCheck);
    mergeDelayError.then()
                   .block();
}

This works fine, but it seems kind of... hacky. Is there a better way to accomplish what I want?

1

1 Answers

0
votes

I was able to solve my problem by using takeUntilOther and turning the state flux into a hot source:

    public static void main(String[] args) {
        Flux<String> state = Flux.just("pending", "running", "running", "success")
                                 .delayElements(Duration.ofSeconds(10)) // Simulate the time it takes for a task to finish.
                                 .doOnNext(System.out::println) // Here the status would be sent to another microservice.
                                 .replay() // Important
                                 .autoConnect();

        Flux<List<Integer>> logs = Flux.<Integer> create(emitter -> {
            for (int i = 50; i < 60; i++) {
                emitter.next(i);
            }
            // Note that there's no emitter.complete() here.
        })
                                       .delayElements(Duration.ofSeconds(5)) // Simulate the time between each log message.
                                       .takeUntilOther(state.then())
                                       .bufferTimeout(30, Duration.ofSeconds(15))
                                       .doOnNext(System.out::println)
                                       .doFinally(System.out::println);
        Flux<Object> mergeDelayError = Flux.mergeDelayError(1, state, logs);
        mergeDelayError.then()
                       .block();
    }

The hot source part is important, because otherwise the .takeUntilOther(state.then()) and the mergeDelayError.then().block() lines would create two subscriptions on the state flux, which would duplicate the work being done by it.