1
votes

Firstly, let me begin this by saying that I'm relatively new to RxJava so please bear with me as I grasp the basics. :)

I basically understand the whole subscribe[On|With]() beginner type of stuff (more or less). I'm trying however to do something a tad bit more complicated, and JavaRx seems perfect for this kind of thing.

I have a curated movie database that I'm querying movie details from. Now, the API here is a bit weird. You have to make two calls to get everything about a movie. The first API call yields the id, title, categories and rating and the second one other important stuff like the description, trailer etc. As you might have guessed, you need the movieId from part1 to get the extra info. Nested calls work here, but they are by no means considered best practice.

My code looks something like this:

pageSingle.subscribeOn(Schedulers.io())
        .subscribeWith(new DisposableSingleObserver<mPage>() {
            @Override public void onSuccess(mPage value) {
                moviesInPage = value.movies;
                if(moviesInPage==null){
                    Log.w(TAG, "mergeMovieParts: no movies returned");
                }else {
                    for (int i = 0; i < moviesInPage.size(); i++) {
                        final fMovie firstMovie = moviesInPage.get(i);
                        apiService.getSecondPart(firstMovie.id)
                                .subscribeWith(new DisposableSingleObserver<sMovie>() {
                            @Override 
                            public void onSuccess(sMovie secondMovie) {
                                mergeMovieParts(firstMovie, secondMovie);
                                }
                            @Override 
                            public void onError(Throwable e) {
                                e.printStackTrace();
                                }
                        });
                    }
                }

            }
            @Override public void onError(Throwable e) {
                handleError(e);
            }
        });

with pageSingle also being a Single. You can probably see why this isn't guaranteed to work all the time. Single.zip() won't work here, as I need information (the id more specifically) from the first call to start the second one. My question boils down to:

  1. how do I go about replacing those nested Retrofit calls with JavaRx ones? Please note that mPage returns a List<fMovie> object.

  2. at the moment, I'm doing these calls one after the other. Is it possible to fetch the movie list and based on this to make multiple calls simultaneously to fetch the second part? I'm guessing multi-threading is the answer here, but how do I go about it in RxJava and what are the implications e.g. is it worth it, in your opinion or are the tradeoffs (if any) to big?

  3. unrelated to this question, but do you have any suggestions of books, videos etc. I could read/watch that would help me catch up with RxJava the most. It excites me a lot, but at the same time I feel it's a bit too overwhelming to learn everything by "reading the documentation".

If I wasn't clear enough, please don't hesitate to ask for more context/ clarification. Thanks in advance!

EDIT: the error I get with flatMap():

Error:(109, 17) error: no suitable method found for flatMap(<anonymous Function<Movie1,ObservableSource<?>>>)
method Observable.<R#1>flatMap(Function<? super Object,? extends ObservableSource<? extends R#1>>) is not applicable
(cannot infer type-variable(s) R#1
(argument mismatch; <anonymous Function<Movie1,ObservableSource<?>>> cannot be converted to Function<? super Object,? extends ObservableSource<? extends R#1>>))
method Observable.<R#2>flatMap(Function<? super Object,? extends ObservableSource<? extends R#2>>,boolean) is not applicable
(cannot infer type-variable(s) R#2
(actual and formal argument lists differ in length))
method Observable.<R#3>flatMap(Function<? super Object,? extends ObservableSource<? extends R#3>>,boolean,int) is not applicable
(cannot infer type-variable(s) R#3
(actual and formal argument lists differ in length))
method Observable.<R#4>flatMap(Function<? super Object,? extends ObservableSource<? extends R#4>>,boolean,int,int) is not applicable
[...]

My implementation:

movieService.getMoviesFromPath(path)
                .subscribeOn(Schedulers.io())
                .flattenAsObservable(new Function<MoviePage, Iterable<?>>() {
                    @Override
                    public Iterable<?> apply(MoviePage moviePage) throws Exception {
                        return moviePage.movieList;  // movieList is a List<Movie1>
                    }
                })
                .flatMap(new Function<Movie1, ObservableSource<?>>() {
                    @Override
                    public ObservableSource<?> apply(Movie1 movie1) throws Exception {
                        return null;
                    }
                });
1
.flatMap to the rescue! See [this post])stackoverflow.com/questions/39214073/…) for an example (that is order and line-item based, but the data dependency is there too). - Tassos Bassoukos
Thanks for the comment, @TassosBassoukos. Please check my edit. - JohnTheWalker

1 Answers

2
votes

It seems you would need something like the following:

movieService.getPage()
    .flattenAsObservable(page -> page.movies)
    .flatMap(movie -> apiService.getSecondPart(movie.movieId))

Or, taking away the lambdas,

movieService.getPage()
            .flattenAsObservable(new Function<Page, Iterable<Movie>>() {
                @Override
                public Iterable<Movie> apply(Page page) throws Exception {
                    return page.movies;
                }
            })
            .flatMap(new Function<Movie, ObservableSource<DetailedMovie>>() {
                @Override
                public ObservableSource<DetailedMovie> apply(Movie movie) throws Exception {
                    return apiService.getSecondPart(movie.movieId);
                }
            })

That being said, writing something like new DisposableSingleObserver<mPage> in the original post is a bad sign. Before continuing with Rx, it may be worth your time learning about Generics in Java since this is the cause of your second error.

You probably need to have a good working knowledge of generics (and standard Java naming practices) to get the most out of RxJava. Good luck!

Note the two examples assume that subscribeOn(Schedulers.io()) is implicit and configured at the level of Retrofit. You can see how to do this in these answers

Update: if you want to merge the two Movie objects together I suppose you could do:

    movieService.getPage()
            .flattenAsObservable(new Function<Page, Iterable<? extends Movie>>() {
                @Override
                public Iterable<? extends Movie> apply(Page page) throws Exception {
                    return page.movies;
                }
            })
            .flatMap(new Function<Movie, ObservableSource<DetailedMovie>>() {
                @Override
                public ObservableSource<DetailedMovie> apply(Movie movie) throws Exception {
                    return apiService.getSecondPart(movie.movieId).toObservable();
                }
            }, new BiFunction<Movie, DetailedMovie, MergedMovie>() {
                @Override
                public MergedMovie apply(Movie movie, DetailedMovie detailedMovie) throws Exception {
                    return new MergedMovie(movie, detailedMovie);
                }
            });