11
votes

I have a list of files that I'd like to upload to the backend from an Android device. Due to memory constraints, I'd like to make the second API call only after the first finished, the third after the second finished, and so on.

I wrote something like

private Observable<Integer> uploadFiles(List<File> files) {
        return Observable.create(subscriber -> {
            for (int i = 0, size = files.size(); i < size; i++) {
                UploadModel uploadModel = new UploadModel(files.get(0));
                int uploadResult = retrofitApi.uploadSynchronously(uploadModel);
                subscriber.onNext(uploadResult);
            }
            subscriber.onCompleted();
        }).subscribeOn(Schedulers.newThread());
    }

But I feel like this might be going against the spirit of Rx, and the saying is if you're using Observable.create, you're probably doing it wrong... Is this a reasonable approach? Is there a better way to achieve this with Retrofit's RxJava integration?

3
Why not define in your API interface uploadSynchronously to not return Observable? Another way is to use toBlocking on each observable. - marwinXXII
That is what I did - retrofitApi.uploadSynchronously is a blocking retrofit call that returns an int, not an observable. And if your question is why I didn't do a simple loop without Rx, well, it would have been difficult to handle errors and UI progress updates elegantly, plus there are some additional steps before and after this Observable in the chain. - Major Laslo
Actually, no, I it seems for me that basic loop is preferable here over using map. - marwinXXII

3 Answers

5
votes

Naively, I would do that (it does not work, though, see below):

return Observable.from(files).concatMap(file -> retrofitApi.upload(uploadModel));

Now the issue is that there is no way to tell retrofit to use only one thread for those calls.

reduce, however, passes the result of one function call to the next, along with the next emitted value from the original observable. That would work, but the function passed to reduce needs to be synchronous. Not good.

Another approach would be to modify the observable recursively:

void getNextFile(int i) {
    return retrofit.upload(i).
        onNext(result -> getNextFile(i + 1));
}

roughly. But I am not sure how to clean it to make it more readable.

The cleanest I would think would be something like:

Observable.from(files).map(file -> retrofitApi.uploadSynchronously(new UploadModel(file)));
0
votes

The natives of RxJava would emit all items in Observable.from(...) as if in parallel. That's the best way to think of it as parallel emission. However some cases require real consequent execution of the whole chain. I've come to the following solution, probably not the best one but working.

import rx.Observable;
import rx.Subscriber;

import java.util.Iterator;
import java.util.function.Function;

public class Rx {
    public static void ignore(Object arg) {
    }

    public static <E, R> Observable<Void> sequential(Iterator<E> iterator, Function<E, Observable<R>> action) {
        return Observable.create(collectorSubscriber ->
                Observable.<Void>create(producerSubscriber ->
                        producerSubscriber.setProducer(ignoredCount -> {
                            if (!iterator.hasNext()) {
                                producerSubscriber.onCompleted();
                                return;
                            }

                            E model = iterator.next();
                            action.apply(model)
                                    .subscribe(
                                            Rx::ignore,
                                            producerSubscriber::onError,
                                            () -> producerSubscriber.onNext(null));
                        }))
                        .subscribe(new Subscriber<Void>() {
                            @Override
                            public void onStart() {
                                request(1);
                            }

                            @Override
                            public void onCompleted() {
                                collectorSubscriber.onNext(null);
                                collectorSubscriber.onCompleted();
                            }

                            @Override
                            public void onError(Throwable e) {
                                collectorSubscriber.onError(e);
                            }

                            @Override
                            public void onNext(Void aVoid) {
                                request(1);
                            }
                        }));
    }
}

Example usage would be:

    Iterator<? extends Model> iterator = models.iterator();

    Rx.sequential(iterator, model -> someFunctionReturnsObservable(model))
            .subscribe(...);

This method guarantees chained executions of

Observable<Dummy> someFunctionReturnsObservable(Model model)

0
votes

Currently the prefered way of creating observables is with fromAsync:

Observable.fromAsync(new Action1<AsyncEmitter<Object>>()
    {
        @Override
        public void call(final AsyncEmitter<Object> emitter)
        {
            emitter.onNext(object);
            emitter.onCompleted();

            emitter.setCancellation(new AsyncEmitter.Cancellable()
            {
                @Override
                public void cancel() throws Exception
                {
                    // on unSubscribe() callback
                }
            });
        }
    }, AsyncEmitter.BackpressureMode.BUFFER);