
I'm trying to create an Observable that returns a list built from a Firebase query. The problem is that if I call onNext to emit an item and then call onComplete, the Observable stops emitting after the first item, while if I never call onComplete, nothing is emitted at all. Is there a correct way to do what I'm trying to achieve? I'm still very new to RxJava, so please excuse my ignorance. Thank you in advance for any help :)

public Observable<Message> getMessageObservable(String uid) {
    currentUser = auth.getCurrentUser();
    DatabaseReference db_messages = db_root.child("Messages").child(currentUser.getUid())
            .child(uid);
    Query messageQuery = db_messages.orderByKey().limitToLast(10);
    return Observable.create(emitter -> {
        messageQuery.addChildEventListener(new ChildEventListener() {
            @Override
            public void onChildAdded(@NonNull DataSnapshot dataSnapshot, @Nullable String s) {
                String messageText = dataSnapshot.child("message").getValue().toString();
                String messageId = dataSnapshot.child("MessageId").getValue().toString();
                Boolean seen = dataSnapshot.child("seen").getValue(Boolean.class);
                Long timestamp = dataSnapshot.child("timestamp").getValue(long.class);
                String fromUser = dataSnapshot.child("from").getValue().toString();
                String toUser = dataSnapshot.child("to").getValue().toString();
                Message message = new Message(messageText, toUser, messageId, seen, timestamp, null, fromUser);
                emitter.onNext(message);
                emitter.onComplete();
            }

            @Override
            public void onChildChanged(@NonNull DataSnapshot dataSnapshot, @Nullable String s) {

            }

            @Override
            public void onChildRemoved(@NonNull DataSnapshot dataSnapshot) {

            }

            @Override
            public void onChildMoved(@NonNull DataSnapshot dataSnapshot, @Nullable String s) {

            }

            @Override
            public void onCancelled(@NonNull DatabaseError databaseError) {

            }
        });
    });
}

@Override
public void getMessages(String userId) {
    currentUser = auth.getCurrentUser();
    Observable.just(userId)
            .flatMap(this::getMessageObservable)
            .toList()
            .subscribe(messages -> {
                chatResults.getMessagesResult(messages);
            });
}
I don't think you can return a list with that code. You call onComplete immediately after receiving onChildAdded, so of course it stops emitting. At the same time, if you never call onComplete, toList() never delivers the list to your subscription. – ror
I know, that's what I stated in the question lol. Is there a way to achieve the desired result? – ShadowSuave
Yes. toList() cannot produce a list for you if you emit "forever": you need onComplete to be called, but not after each item like you do now. – ror
In other words, either you trigger onComplete on some condition that your logic defines, or you handle each item in onNext in your subscription (and remove toList()). – ror
How would I go about triggering onComplete with a condition? I've literally tried everything I can think of. I've tried different libraries, read the documentation, and searched Google far and wide, and I still can't figure out this seemingly simple task :/ – ShadowSuave

1 Answer


As always, there are many ways to solve this problem. Please check whether this one works for you:

  1. Change getMessageObservable into a plain method that gets the reference, builds the query, and attaches the ChildEventListener (no Observable is created).
  2. Create a subject, PublishSubject<Message> myMessages = PublishSubject.create(), and subscribe to it as you usually would with an Observable. In your subscription, make sure to handle onNext (an Action1 in RxJava 1, a Consumer in RxJava 2).
  3. In your ChildEventListener implementation, call myMessages.onNext(message) whenever a new message arrives.

With this setup, each message arrives in your subscription's onNext handler. You can keep a mutable list, append (or prepend) each incoming message to it, and notify interested parties of the updated list every time it changes.
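
To make the accumulate-and-notify idea concrete, here is a minimal sketch that runs without RxJava or Firebase. It is not the real API: MessageSubject is a hand-rolled stand-in for PublishSubject<Message>, and Message, MessageList, and MessageFeedSketch are hypothetical names chosen for illustration. In the real code, the ChildEventListener's onChildAdded would be the caller of onNext.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

public class MessageFeedSketch {

    /** Hypothetical, trimmed-down stand-in for the question's Message class. */
    static final class Message {
        final String id;
        final String text;

        Message(String id, String text) {
            this.id = id;
            this.text = text;
        }
    }

    /** Hand-rolled stand-in for PublishSubject<Message> (assumption, not RxJava). */
    static final class MessageSubject {
        private final List<Consumer<Message>> subscribers = new ArrayList<>();

        void subscribe(Consumer<Message> onNext) {
            subscribers.add(onNext);
        }

        // In the real code, onChildAdded would call this for every new child.
        void onNext(Message message) {
            // Iterate over a copy so a subscriber may subscribe others safely.
            for (Consumer<Message> subscriber : new ArrayList<>(subscribers)) {
                subscriber.accept(message);
            }
        }
    }

    /** Keeps the growing list and reports a read-only copy on every arrival. */
    static final class MessageList {
        private final List<Message> messages = new ArrayList<>();

        MessageList(MessageSubject source, Consumer<List<Message>> onListUpdated) {
            source.subscribe(message -> {
                messages.add(message); // append; use add(0, message) to prepend
                onListUpdated.accept(
                        Collections.unmodifiableList(new ArrayList<>(messages)));
            });
        }
    }

    public static void main(String[] args) {
        MessageSubject myMessages = new MessageSubject();
        new MessageList(myMessages,
                list -> System.out.println("list size is now " + list.size()));

        // Simulates two onChildAdded callbacks from Firebase.
        myMessages.onNext(new Message("m1", "hello"));
        myMessages.onNext(new Message("m2", "world"));
    }
}
```

With RxJava on the classpath, the stand-in would presumably be replaced by PublishSubject.create(), with subscribe and onNext used the same way. Because the stream never completes, the list is delivered incrementally rather than once through toList().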