My Android application has a FeedDetailFragment that displays feed details. A feed has basic information and metadata, which are retrieved through two separate calls to the server. The server interface is implemented with Retrofit. I have implemented something that, to my novice Rx knowledge, looks logical. However, as you may have guessed, it doesn't work.

External classes:

  • FeedInfo - parcelable class that contains basic feed info
  • FeedMetadata - parcelable class that contains metadata about the feed
  • Feed - auxiliary class that combines feed info and metadata, providing some helper functions
  • UvClient - server interface implemented with Retrofit

Relevant FeedDetailFragment code:

public class FeedDetailFragment extends Fragment implements OnMapReadyCallback {
    public static final String ARG_FEED_ID       = "feed_id";
    public static final String ARG_FEED_INFO     = "feed_info";
    public static final String ARG_FEED_METADATA = "feed_metadata";
    public static final int INVALID_FEED_ID      = -1;

    ...

    private class PlaceFeedSubscriber extends Subscriber<Pair<GoogleMap, Feed>> {
        @Override
        public void onNext(Pair<GoogleMap, Feed> pair) {
            // %d needs a number, so log the feed's id rather than the FeedInfo object itself
            Log.i(TAG, String.format("Placing feed %d on [%f, %f] onto map %s",
                    pair.second.getInfo().getId(),
                    pair.second.getMetadata().getSensorLatitude(),
                    pair.second.getMetadata().getSensorLongitude(),
                    pair.first.getMapType()));

            pair.first.addMarker(new MarkerOptions()
                    .position(new LatLng(
                            pair.second.getMetadata().getSensorPoint().getCoordinates()[1],
                            pair.second.getMetadata().getSensorPoint().getCoordinates()[0]))
                    .title("Marker"));
            mMapAPI.moveCamera(CameraUpdateFactory.newLatLngZoom(
                    new LatLng(
                            pair.second.getMetadata().getSensorPoint().getCoordinates()[1],
                            pair.second.getMetadata().getSensorPoint().getCoordinates()[0])
                    , 15));
        }

        @Override
        public void onCompleted() {
            Log.i(TAG, "Completed drawing of feed");
        }

        @Override
        public void onError(Throwable e) {
            Log.e(TAG, "Drawing of feed failed with: " + e);
        }
    }

    public FeedDetailFragment() {
        mMapObservable = Observable.empty().subscribeOn(Schedulers.io());
        mFeedIdObservable = Observable.empty().subscribeOn(Schedulers.io());
        mFeedInfoObservable = Observable.empty();
        mFeedMetadataObservable = Observable.empty();

        // Start fetching new feed information
        mFeedIdObservable.doOnEach(new Action1<Integer>() {
            @Override
            public void call(Integer feedId) {
                Log.d(TAG, "Got a new feed id - " + feedId);
                mFeedInfoObservable.mergeWith(mUvClient.getFeed(feedId));
            }
        });

        // Start fetching new feed metadata
        mFeedInfoObservable.doOnEach(new Action1<FeedInfo>() {
            @Override
            public void call(FeedInfo feedInfo) {
                Log.d(TAG, "Got a new feed info - " + feedInfo.getTitle());
                mFeedMetadataObservable.mergeWith(mUvClient.getFeedMetadata(feedInfo.getId()));
            }
        });

        // Produce a new feed
        mFeedObservable = Observable.combineLatest(mFeedInfoObservable, mFeedMetadataObservable, new Func2<FeedInfo, FeedMetadata, Feed>() {
            @Override
            public Feed call(FeedInfo feedInfo, FeedMetadata feedMetadata) {
                return new Feed(feedInfo, feedMetadata);
            }
        });

        // Render the feed onto map
        Observable.combineLatest(mFeedObservable, mMapObservable, new Func2<Feed, GoogleMap, Pair<GoogleMap, Feed>>() {
            @Override
            public Pair<GoogleMap, Feed> call(Feed feed, GoogleMap map) {
                return new Pair(map, feed);
            }
        }).observeOn(AndroidSchedulers.mainThread())
                .subscribe(new PlaceFeedSubscriber());
    }

    @Override
    public void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);

        Bundle arguments = getArguments();

        if (arguments.containsKey(ARG_FEED_ID)) {
            setFeed(arguments.getInt(ARG_FEED_ID));
        }
        else if (arguments.containsKey(ARG_FEED_INFO)) {
            if (arguments.containsKey(ARG_FEED_METADATA)) {
                setFeed((FeedInfo)Parcels.unwrap(arguments.getParcelable(ARG_FEED_INFO)),
                        (FeedMetadata)Parcels.unwrap(arguments.getParcelable(ARG_FEED_METADATA)));
            }
            else {
                setFeed((FeedInfo)Parcels.unwrap(arguments.getParcelable(ARG_FEED_INFO)));
            }
        }
    }

    ...

    @Override
    public void onMapReady(GoogleMap googleMap) {
        mMapAPI = googleMap;

        mMapObservable.mergeWith(Observable.just(googleMap));
    }

    /**
     * Sets the feed ID to be shown in the fragment. This triggers the chain of fetching feed info
     * and feed metadata, finally displaying it on the map.
     * @param feedId ID of the feed to display in the fragment.
     */
    public void setFeed(int feedId) {
        Log.d(TAG, String.format("Setting new feed ID - %d", feedId));
        mFeedIdObservable.mergeWith(Observable.just(feedId));
    }

    /**
     * Sets feed info. This triggers fetching of feed metadata, finally displaying it on the map.
     * @param feedInfo Information of the feed to display on the map.
     */
    public void setFeed(FeedInfo feedInfo) {
        Log.d(TAG, String.format("Setting new feed info - %s", feedInfo.getTitle()));
        mFeedInfoObservable.mergeWith(Observable.just(feedInfo));
    }

    /**
     * Displays feed info on the map.
     * @param feedInfo Information of the feed to display on the map.
     * @param feedMetadata Metadata of the feed to display on the map.
     */
    public void setFeed(FeedInfo feedInfo, FeedMetadata feedMetadata) {
        Log.d(TAG, String.format("Setting new feed info and metadata - %s", feedInfo.getTitle()));
        mFeedObservable.mergeWith(Observable.just(new Feed(feedInfo, feedMetadata)));
    }
}

The log output I see is as follows:

Setting new feed info - SampleFeed
Completed drawing of feed

My overall idea was that the observables would emit new data when I merge it in. Some observables are created empty so that they do not emit anything but I can still work with them.

The potential flow could be as follows:

  • Activity gets a callback from FeedListFragment notifying it of the id of the feed that was clicked
  • Activity either gets the FeedInfo and passes it to FeedDetailFragment.setFeed, or invokes FeedDetailFragment.setFeed with the feed's id (let's assume the latter for completeness)
  • FeedDetailFragment merges a new observable with the received feed id
  • The merge triggers emission of a new event on mFeedIdObservable
  • The .doOnEach of mFeedIdObservable kicks off the Retrofit interface to fetch FeedInfo
  • The .doOnEach of mFeedInfoObservable kicks off the Retrofit interface to fetch FeedMetadata
  • The .combineLatest of mFeedObservable fires when both mFeedInfoObservable and mFeedMetadataObservable return new data
  • Finally, once both the GoogleMap and the Feed are available, the subscriber is called to draw the feed on the map

This is how the idea came together in my head. Obviously, it is wrong. Where did I go wrong and how can I fix it? I'd love some pointers and maybe some more general guidance on the right methodology. Thanks for any advice!

UPDATE 1

So I've been trying to figure this out. Read more documents... a lot to learn. I've replaced Observable.empty() with Observable.never(). From the documentation, I read that empty doesn't emit anything and completes, which is not what I want. On the other hand, never doesn't emit anything and does not complete either. As such, I can use it for the purpose I'm seeking. Still not getting what I want but, I hope, one step closer.

UPDATE 2

Getting the hang of it a bit more. Looking into the source of .never() and .empty(), I see that the former never calls .onNext() and the latter immediately calls .onCompleted(). There is nothing in between that I can choose. Started looking around for alternatives. Basically, my code doesn't execute because, in my previous tries, the observable either completed immediately or never proceeded to call next. However, there is nothing to call .onNext() in the beginning. As such, I need a placeholder.

Reading more docs, I came across Subjects. In particular, PublishSubject doesn't emit anything until a subscriber subscribes. This seemed like a viable solution. However, the subscriber must subscribe directly to the subject. This didn't seem to work with .mergeWith() upon the subject.

Will not give up :)

UPDATE 3

Thanks to @dwursteisen, I continued with the PublishSubject approach. This is the relevant code that changed:

...

private PublishSubject<GoogleMap> mMapObservable = null;
private PublishSubject<Feed> mFeedObservable = null;
private PublishSubject<Integer> mFeedIdObservable = null;
private PublishSubject<FeedInfo> mFeedInfoObservable = null;
private PublishSubject<FeedMetadata> mFeedMetadataObservable = null;

...

public FeedDetailFragment() {
    mMapObservable = PublishSubject.create();
    mFeedObservable = PublishSubject.create();
    mFeedIdObservable = PublishSubject.create();
    mFeedInfoObservable = PublishSubject.create();
    mFeedMetadataObservable = PublishSubject.create();

    mMapObservable.subscribe(new Action1<GoogleMap>() {
        @Override
        public void call(GoogleMap googleMap) {
            mMapAPI = googleMap;
        }
    });

    mFeedMetadataObservable.subscribe(new Action1<FeedMetadata>() {
        @Override
        public void call(FeedMetadata feedMetadata) {
            // no code
        }
    });

    mFeedObservable.subscribe(new Action1<Feed>() {
        @Override
        public void call(Feed feed) {
            // no code
        }
    });

    // Start fetching new feed information
    mFeedIdObservable.subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer feedId) {
            mUvClient.getFeed(feedId).subscribe(new Action1<FeedInfo>() {
                @Override
                public void call(FeedInfo feedInfo) {
                    mFeedInfoObservable.onNext(feedInfo);
                }
            });
        }
    });

    // Start fetching new feed metadata
    mFeedInfoObservable
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Action1<FeedInfo>() {
                @Override
                public void call(FeedInfo feedInfo) {
                    mFeedTitle.setText(feedInfo.getTitle());

                    mUvClient.getFeedMetadata(feedInfo.getId()).subscribe(new Action1<FeedMetadata>() {
                        @Override
                        public void call(FeedMetadata feedMetadata) {
                            mFeedMetadataObservable.onNext(feedMetadata);
                        }
                    });
                }
            });

    // Produce a new feed
    Observable.combineLatest(mFeedInfoObservable, mFeedMetadataObservable, new Func2<FeedInfo, FeedMetadata, Feed>() {
        @Override
        public Feed call(FeedInfo feedInfo, FeedMetadata feedMetadata) {
            Feed feed = new Feed(feedInfo, feedMetadata);
            return feed;
        }
    }).subscribeOn(Schedulers.io()).subscribe(new Action1<Feed>() {
        @Override
        public void call(Feed feed) {
            mFeedObservable.onNext(feed);
        }
    });

    // Render the feed onto map
    Observable.combineLatest(mFeedObservable, mMapObservable, new Func2<Feed, GoogleMap, Pair<GoogleMap, Feed>>() {
        @Override
        public Pair<GoogleMap, Feed> call(Feed feed, GoogleMap map) {
            return new Pair(map, feed);
        }
    }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
            .subscribe(new PlaceFeedSubscriber());
}

...

@Override
public void onMapReady(GoogleMap googleMap) {
    mMapObservable.onNext(googleMap);
}

/**
 * Sets the feed ID to be shown in the fragment. This triggers the chain of fetching feed info
 * and feed metadata, finally displaying it on the map.
 * @param feedId ID of the feed to display in the fragment.
 */
public void setFeed(int feedId) {
    mFeedIdObservable.onNext(feedId);
}

/**
 * Sets feed info. This triggers fetching of feed metadata, finally displaying it on the map.
 * @param feedInfo Information of the feed to display on the map.
 */
public void setFeed(FeedInfo feedInfo) {
    mFeedInfoObservable.onNext(feedInfo);
}

/**
 * Displays feed info on the map.
 * @param feedInfo Information of the feed to display on the map.
 * @param feedMetadata Metadata of the feed to display on the map.
 */
public void setFeed(FeedInfo feedInfo, FeedMetadata feedMetadata) {
    mFeedObservable.onNext(new Feed(feedInfo, feedMetadata));
}

Obviously, now that I have the basics working, I'll go through it and add proper handling of errors, caching, and other conditions. However, I do have one question: is there any way to simplify the following code so that it uses the Retrofit observable directly instead of subscribing to it inside the subscribe... maybe an Rx operator that would inject its result into mFeedInfoObservable?

mFeedIdObservable.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer feedId) {
        mUvClient.getFeed(feedId).subscribe(new Action1<FeedInfo>() {
            @Override
            public void call(FeedInfo feedInfo) {
                mFeedInfoObservable.onNext(feedInfo);
            }
        });
    }
});

Also, I would love to hear any comments on the general approach. I'm still wrapping my head around Rx and my implementation is not the best, I am sure.

1 Answer

mFeedInfoObservable = Observable.empty();

This builds an empty Observable that never emits a value. When you subscribe to it, you are notified only of its completion.

mFeedInfoObservable.mergeWith(Observable.just(feedInfo));

Observables are immutable, which means that calling a method on one does not change its state. mergeWith produces a new Observable that is the result of merging the Observable with another one.

So in your case, you build a new Observable that is never used.
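To make the immutability point concrete without pulling in RxJava, here is a minimal sketch using a hypothetical stand-in class. ImmutableSource and MergeWithDemo are names made up for this illustration, not RxJava types, and the model is synchronous and leaves out completion and error signals. It only shows that discarding the return value of mergeWith leaves the original source untouched:

```java
import java.util.function.Consumer;

/** Hypothetical, simplified stand-in for an immutable observable (NOT the RxJava class). */
final class ImmutableSource<T> {
    // What to do when somebody subscribes; fixed at construction time.
    private final Consumer<Consumer<T>> onSubscribe;

    private ImmutableSource(Consumer<Consumer<T>> onSubscribe) {
        this.onSubscribe = onSubscribe;
    }

    /** A source that emits nothing. */
    static <T> ImmutableSource<T> empty() {
        return new ImmutableSource<>(subscriber -> { });
    }

    /** A source that emits a single value to each subscriber. */
    static <T> ImmutableSource<T> just(T value) {
        return new ImmutableSource<>(subscriber -> subscriber.accept(value));
    }

    /** Returns a NEW source; neither this source nor the other one is modified. */
    ImmutableSource<T> mergeWith(ImmutableSource<T> other) {
        return new ImmutableSource<>(subscriber -> {
            this.subscribe(subscriber);
            other.subscribe(subscriber);
        });
    }

    void subscribe(Consumer<T> subscriber) {
        onSubscribe.accept(subscriber);
    }
}

final class MergeWithDemo {
    public static void main(String[] args) {
        ImmutableSource<String> feedInfo = ImmutableSource.empty();

        // Same mistake as in the question: the merged source is created and thrown away.
        feedInfo.mergeWith(ImmutableSource.just("SampleFeed"));
        feedInfo.subscribe(title -> System.out.println("original: " + title)); // prints nothing

        // Keeping the returned source is what makes the merged value visible.
        ImmutableSource<String> merged = feedInfo.mergeWith(ImmutableSource.just("SampleFeed"));
        merged.subscribe(title -> System.out.println("merged: " + title)); // prints "merged: SampleFeed"
    }
}
```

The same reasoning applies to the real operators: the subscriber has to be attached to whatever the operator returns, not to the object the operator was called on.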

Judging by your code, it seems you need a Subject (such as the PublishSubject you mention) to emit values from the different user calls.

private final Subject<Integer, Integer> subject = PublishSubject.create();


public void setFeed(int feedId) {
    subject.onNext(feedId);
}

public FeedDetailFragment() {
    subject.flatMap(feedId -> mUvClient.getFeed(feedId))
           .subscribe(/**...**/);
}

Please note that doOnNext should be used only for side effects (i.e., code that changes something outside of your Observable, such as logging). In your case you probably need other operators, such as flatMap or zip, to compose the results into what you want to achieve.
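As a rough illustration of that composition, the sketch below models the data flow with plain JDK classes so it can run without any dependencies. MiniObservable, MiniSubject, FeedPipelineDemo and the String-returning getFeed and getFeedMetadata are all hypothetical stand-ins made up for this example. They are not the RxJava or Retrofit types, everything is synchronous, and errors and completion are ignored:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical, minimal model of a lazy observable (NOT the RxJava class). */
final class MiniObservable<T> {
    private final Consumer<Consumer<T>> onSubscribe;

    private MiniObservable(Consumer<Consumer<T>> onSubscribe) {
        this.onSubscribe = onSubscribe;
    }

    static <T> MiniObservable<T> create(Consumer<Consumer<T>> onSubscribe) {
        return new MiniObservable<>(onSubscribe);
    }

    static <T> MiniObservable<T> just(T value) {
        return create(subscriber -> subscriber.accept(value));
    }

    void subscribe(Consumer<T> subscriber) {
        onSubscribe.accept(subscriber);
    }

    /** Transforms each value with fn. */
    <R> MiniObservable<R> map(Function<T, R> fn) {
        return create(down -> subscribe(value -> down.accept(fn.apply(value))));
    }

    /** For each value, subscribes to the source returned by fn and forwards its values downstream. */
    <R> MiniObservable<R> flatMap(Function<T, MiniObservable<R>> fn) {
        return create(down -> subscribe(value -> fn.apply(value).subscribe(down)));
    }
}

/** Hypothetical model of a subject: values pushed with onNext reach the current subscribers. */
final class MiniSubject<T> {
    private final List<Consumer<T>> subscribers = new ArrayList<>();

    MiniObservable<T> asObservable() {
        return MiniObservable.create(subscribers::add);
    }

    void onNext(T value) {
        for (Consumer<T> subscriber : new ArrayList<>(subscribers)) {
            subscriber.accept(value);
        }
    }
}

final class FeedPipelineDemo {
    // Stand-ins for mUvClient.getFeed(...) and mUvClient.getFeedMetadata(...); Strings replace the real types.
    static MiniObservable<String> getFeed(int feedId) {
        return MiniObservable.just("info-" + feedId);
    }

    static MiniObservable<String> getFeedMetadata(String feedInfo) {
        return MiniObservable.just("metadata-of-" + feedInfo);
    }

    /** Wires id -> info -> (info + metadata) once; no nested subscribe calls are needed. */
    static MiniObservable<String> feeds(MiniSubject<Integer> feedIds) {
        return feedIds.asObservable()
                .flatMap(FeedPipelineDemo::getFeed)
                .flatMap(info -> getFeedMetadata(info).map(metadata -> info + " + " + metadata));
    }

    public static void main(String[] args) {
        MiniSubject<Integer> feedIds = new MiniSubject<>();
        feeds(feedIds).subscribe(feed -> System.out.println("Feed: " + feed));
        feedIds.onNext(7); // prints "Feed: info-7 + metadata-of-info-7"
    }
}
```

The shape is the point here: each step returns a new source, the second step keeps the feed info in scope so it can be paired with its metadata, and there is a single subscribe at the end of the chain. With the real library the chain would start from the PublishSubject as in the snippet above, and the Retrofit calls would run asynchronously.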