Assume we have a cooperative game where players gain and lose score points. Any player can join the game at any moment. Once joined, a player never leaves (for simplicity). The total score of the team is simply the sum of all players' score points at any given moment. A player should be able to see their current score and the total score of the team.

So, we have a stream of scores of all players bound to timestamps.

Time:   0  1  2 3 4 5  6  7    8  9 10 11 12 13 14 15 16    17  18 19 20
Stream: A5 A4       B2 A3 A4B4 A9         K5    B6 A8 B10K9 A12

Letters denote players, numbers are score points. Values like A4B4 mean that the scores of two players changed simultaneously. After groupBy we have three streams, one for each player. Note that the set of players is not predefined.

Time:   0  1  2 3 4 5  6  7    8  9 10 11 12 13 14 15 16    17  18 19 20

Alex:   5  4           3  4    9                   8        12
Ben:                2     4                     6     10
Katie:                                    5           9

What I expect to get is a stream of sums for every change of score:

Time:   0  1  2 3 4 5  6  7    8  9 10 11 12 13 14 15 16    17  18 19 20
Total:  5  4        6  5  8    13         18    20 19 27    31
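To make the expected row easy to check, here is a minimal non-reactive sketch that computes the same totals from the sample data. It is only a reference for comparison, not the RxJava solution; the names ExpectedTotals, Event, sampleEvents and totals are hypothetical, and it assumes the events arrive ordered by timestamp.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper (not from the original post): computes the expected
// "Total" row without RxJava, as a reference for checking the reactive version.
final class ExpectedTotals {

    // Stand-in for the post's Record tuple; the name Event is an assumption.
    static final class Event {
        final String player;
        final int timestamp;
        final double score;

        Event(String player, int timestamp, double score) {
            this.player = player;
            this.timestamp = timestamp;
            this.score = score;
        }
    }

    // The sample stream from the diagram, ordered by timestamp.
    static List<Event> sampleEvents() {
        List<Event> events = new ArrayList<>();
        events.add(new Event("Alex", 0, 5));
        events.add(new Event("Alex", 1, 4));
        events.add(new Event("Ben", 5, 2));
        events.add(new Event("Alex", 6, 3));
        events.add(new Event("Alex", 7, 4));
        events.add(new Event("Ben", 7, 4));
        events.add(new Event("Alex", 8, 9));
        events.add(new Event("Katie", 12, 5));
        events.add(new Event("Ben", 14, 6));
        events.add(new Event("Alex", 15, 8));
        events.add(new Event("Katie", 16, 9));
        events.add(new Event("Ben", 16, 10));
        events.add(new Event("Alex", 17, 12));
        return events;
    }

    // Maps each timestamp at which a score changed to the team total at that time.
    // Assumes the events are ordered by timestamp.
    static Map<Integer, Double> totals(List<Event> events) {
        Map<String, Double> latest = new HashMap<>();   // latest score per player
        Map<Integer, Double> result = new TreeMap<>();  // timestamp -> team total
        for (Event e : events) {
            latest.put(e.player, e.score);
            double sum = 0;
            for (double s : latest.values()) {
                sum += s;
            }
            // A later event with the same timestamp overwrites the earlier total,
            // so simultaneous changes collapse into one value per timestamp.
            result.put(e.timestamp, sum);
        }
        return result;
    }

    public static void main(String[] args) {
        totals(sampleEvents()).forEach((t, total) -> System.out.println(t + ": " + total));
    }
}
```

Keeping only the last total per timestamp mirrors the diagram, where A4B4 at time 7 yields a single value.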

That sounds trivial, but my solution turned out to be rather complicated.

This is the input stream:

Observable< Record > input = Observable.from( new Record[] {
  new Record( "Alex", 0, 5 ),
  new Record( "Alex", 1, 4 ),
  new Record( "Ben", 5, 2 ),
  new Record( "Alex", 6, 3 ),
  new Record( "Alex", 7, 4 ),
  new Record( "Ben", 7, 4 ),
  new Record( "Alex", 8, 9 ),
  new Record( "Katie", 12, 5 ),
  new Record( "Ben", 14, 6 ),
  new Record( "Alex", 15, 8 ),
  new Record( "Katie", 16, 9 ),
  new Record( "Ben", 16, 10 ),
  new Record( "Alex", 17, 12 )
} )
.delay( e -> Observable.interval( e.timestamp, TimeUnit.SECONDS, scheduler ) )
.doOnCompleted( latch::countDown )
.share();

Record is a simple tuple: Record( String player, int timestamp, double score ).
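For completeness, a minimal sketch of what such a tuple class could look like. The field names follow the usages in the snippets (e.player, e.timestamp, e.score); the toString format is an assumption made only for readable output.

```java
// Minimal sketch of the Record tuple. Field names follow the usages in the post;
// the toString format is an assumption for illustration.
final class Record {
    final String player;
    final int timestamp;
    final double score;

    Record(String player, int timestamp, double score) {
        this.player = player;
        this.timestamp = timestamp;
        this.score = score;
    }

    @Override
    public String toString() {
        return player + "@" + timestamp + " = " + score;
    }
}
```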

Let's divide it into groups:

Observable< Observable< Record > > output = input
  .groupBy( e -> e.player )
  .map( g -> g.cache() )

The standard toList method waits for the Observable to complete. But the stream of groups completes only when the input stream completes (any player can join the game at any time). So a new transformer is required: it emits the accumulated list on every onNext call instead of once at onCompleted.

  .compose( new ToListOnNextTransformer() )

Then we generate a new Observable each time a new group is emitted (each time a new player joins the game). This Observable emits the sum of the latest scores of all players.

  .map( eachPlayerRecordsList -> Observable.combineLatest( eachPlayerRecordsList, ( Object... eachPlayerRecordsArray ) -> {
    double total = Arrays
      .stream( eachPlayerRecordsArray )
      .mapToDouble( e -> ( ( Record )e ).score )
      .sum();
    int timestamp = Arrays
      .stream( eachPlayerRecordsArray )
      .mapToInt( e -> ( ( Record )e ).timestamp )
      .max()
      .getAsInt();
    return new Record( "Total", timestamp, total );
} ) );

Finally, we output the values from the first combined Observable until the second player joins the game, then the values from the second one until the third player joins, and so on. This is exactly what switchOnNext does.

Observable.switchOnNext( output )
  .subscribe( System.out::println, e -> { ( ( Throwable )e ).printStackTrace(); } );

Custom transformer class:

private static class ToListOnNextTransformer implements Observable.Transformer< Observable< Record >, List< Observable< Record > > > {
  private final List< Observable< Record > > list = new ArrayList<>();
  private final PublishSubject< List< Observable< Record > > > ret = PublishSubject.create();

  @Override
  public Observable< List< Observable< Record > > > call( Observable< Observable< Record > > eachPlayerRecords ) {
    eachPlayerRecords.subscribe( playerRecords -> {
      list.add( playerRecords );
      ret.onNext( new ArrayList<>( list ) );
    } );
    return ret;
  }
}
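The emission pattern of this transformer can be illustrated without RxJava: for every incoming item it emits a fresh copy of everything seen so far. The sketch below is plain Java with hypothetical names (SnapshotSketch, snapshots) and only shows the shape of the output. In RxJava, the same accumulation could probably be expressed with scan and a list seed followed by skip(1) to drop the empty seed, which would avoid the manual subscribe and the shared mutable list, but that variant is an untested assumption here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of "emit the list at every onNext":
// each input item produces a snapshot of all items seen so far.
final class SnapshotSketch {

    static <T> List<List<T>> snapshots(List<T> items) {
        List<List<T>> emitted = new ArrayList<>();
        List<T> seen = new ArrayList<>();
        for (T item : items) {
            seen.add(item);
            // Copy the accumulator so later additions do not change earlier snapshots,
            // just like new ArrayList<>( list ) in the transformer.
            emitted.add(new ArrayList<>(seen));
        }
        return emitted;
    }

    public static void main(String[] args) {
        // One snapshot per joining player.
        System.out.println(snapshots(Arrays.asList("Alex", "Ben", "Katie")));
    }
}
```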

The questions are:

  • Can it be simplified? Preferably with a more reactive approach.
  • Did I break any rules of RxJava, such as using a subscriber inside a subscriber?

It's an artificial task. I'm just learning RxJava and trying to understand reactive programming. I hope there exists a better way and I'm just missing something obvious.

The full source code of a test file.

1 Answer

I don't fully understand your example code, but you could do a running-sum via scan:

source.scan(0, (a, b) -> a + b)
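For readers new to scan, here is a plain-Java sketch of what a seeded running sum produces. RxJava's scan with an initial value emits the seed first and then every intermediate result; the helper below (RunningSum.scan, a hypothetical name) mimics that on a list. Note that it accumulates the values as increments.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical plain-Java analogue of a seeded scan: emits the seed,
// then the accumulated sum after each input value.
final class RunningSum {

    static List<Integer> scan(int seed, List<Integer> values) {
        List<Integer> out = new ArrayList<>();
        int acc = seed;
        out.add(acc);            // the seed is emitted first
        for (int v : values) {
            acc += v;            // (a, b) -> a + b
            out.add(acc);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(scan(0, Arrays.asList(5, -1, 2)));
    }
}
```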

So given an event stream of persons and their points, you can create two groups:

ConnectableObservable<Record> co = input.publish(); // replay, cache, etc.

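// Assumes a Record with name, team and an int score field,
// a Pair type with first/second fields, and a static import of Observable.just.
Observable<Pair<String, Observable<Integer>>> userScores = co
.groupBy(r -> r.name)
.flatMap(g -> just(Pair.of(g.getKey(), g.map(r -> r.score).scan(0, (a, b) -> a + b))));

Observable<Pair<String, Observable<Integer>>> teamScores = co
.groupBy(r -> r.team)
.flatMap(g -> just(Pair.of(g.getKey(), g.map(r -> r.score).scan(0, (a, b) -> a + b))));

// Print the running score of a single player.
userScores.filter(p -> p.first.equals("Alex"))
.concatMap(p -> p.second.map(s -> Pair.of(p.first, s)))
.subscribe(System.out::println);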

co.connect();