0
votes

I'm working with a BLE GATT service that has two characteristics that work in tandem. One is a write-only characteristic where you can submit a string value as a query and the other is a notify-only characteristic where you receive the response to the query.

The notification service is a little slow and it is important not to move on to the next query before the notification is read -- otherwise the response is lost.

To this end, I've been using RxAndroidBle Observables, with separate channels for the write and notify characteristics. A third Observable provides the queries. However, the writes have been going far too quickly.

ConnectableObservable notifyObservable =
    createNotifyObservable(NOTIFY_UUID).publish();

queryObserverable
    .doOnSubscribe(notifyObservable::connect)
    .doOnNext(query -> Log.d(TAG, "Processing query: " + query))
    .flatMap(query -> createWriteObservable(WRITE_UUID, query)))
    .doOnNext(request -> Log.d(TAG, "Write initiated."))
    .flatMap(request -> notifyObservable)
    .doOnNext(response -> Log.d(TAG, "Query response: " + response));

So on running the application, here's what I see in the logs (included are timestamp, process id and thread id):

06-22 14:30:01.991 14085-15360 Processing query: Query1
06-22 14:30:02.011 14085-15360 Processing query: Query2
06-22 14:30:02.011 14085-15360 Processing query: Query3
06-22 14:30:07.261 14085-15443 Write initiated.
06-22 14:30:07.301 14085-15445 Write initiated
06-22 14:30:07.321 14085-15447 Query response: Response3
06-22 14:30:07.321 14085-15449 Write initiated.
06-22 14:30:07.321 14085-15447 Query response: Response3
06-22 14:30:07.351 14085-15453 Query response: Response3

Is there a way with RxJava to ensure that the next write only occurs after the response is received?

EDIT: On specifying the suggested maxConnection argument to both flatMap calls, the calls occur in the proper order, but only for the first query from the Observable. Here's the log for this case:

06-22 14:39:09.841 22245-23079 Processing query: Query1
06-22 14:39:15.131 22245-23166 Write initiated.
06-22 14:39:15.201 22245-23169 Query response: Response1
2
Can you include the thread id for each log line?Dave Moten
Ok, I've added more details from the logs.Tim Clemons

2 Answers

1
votes

Try to pass maxConcurrent = 1 argument to your flatMap calls:

...
.flatMap(query -> createWriteObservable(WRITE_UUID, query), 1)
...
.flatMap(request -> createNotifyObservable(NOTIFY_UUID), 1)
...
1
votes

If there is exactly one response for each query then you could pack your queries in a function:

Observable<String> query(String query) {
  return notifyObservable.flatMap(notifyBytesObservable -> // to be sure that when we will start writing we're already listening
    Observable.combineLatest( // with combineLatest both Observables will be subscribed at the same time
      notifyBytesObservable.first(), // to unsubscribe from notification after the first one
      createWriteObservable(uuid, query), 
      { notificationBytes, writtenBytes -> notificationBytes }
    )
  )
    .map(notificationBytes -> String(notificationBytes))
}

And now you should be able to make calls like this:

queryObserverable
  .flatMap(queryString -> query(queryString), 1)

I hope this helps you.

Best Regards