I apologise if this is a duplicate question; I could not find a helpful answer anywhere. I am also a beginner in reactive programming, so I am sorry in advance for any silly questions.

I am using rxandroidble2 in a Kotlin Android app. I can successfully connect to a BLE device if I only want to read from one characteristic. However, I cannot get it to work for connecting to two separate characteristics.

Characteristic A sends packets every two seconds. Characteristic B sends packets multiple times per second. I can connect to both of them if I do it separately, i.e. run the app with the connection made for only CharA or CharB.

The solutions I have tried are all from stackoverflow, in particular I have tried:

My code is this:

respeckLiveSubscription = connectionObservable?.flatMap{
            Observable.combineLatest(
                it.setupNotification(UUID.fromString(Constants.CHARACTERISTIC_A_UUID)),
                it.setupNotification(UUID.fromString(Constants.CHARACTERISTIC_B_UUID)),
                io.reactivex.functions.BiFunction<Observable<ByteArray>, Observable<ByteArray>, Pair<Observable<ByteArray>, Observable<ByteArray>>> {
                    t1, t2 -> Pair.create(t1, t2)
                }
            )
        }?.subscribe({
            processRESpeckPacket(it.first.blockingFirst(), respeckVersion, this)
            Log.i("ble", "processed first packet")
            processRESpeckPacket(it.second.blockingFirst(), 7, this)
            Log.i("ble", "processed second packet")
        },{
            Log.i("ble", "error when connecting = " + it.stackTrace)
        })

        Log.i("ble", "does this mean connection is over")

I have a few issues:

  • The subscribe method receives an object of type Pair<Observable, Observable>. When I try to get the objects in the pair, they are returned as observables and the only way I managed to get the observable values was by calling blockingFirst(). Obviously that is not a good solution as my code doesn't run.
  • I cannot get the BiFunction to return a Pair of ByteArrays instead of Observable.
  • If I comment out the processRESpeckPacket lines and just leave the logs, they only run once, which makes me think that the value change notifications are not working.
  • I also tried most of the solutions above in Java and I am getting the same behaviour. Did something change in rxandroidble2 that is not explained in the stackoverflow answers that I linked to?
  • Is the fact that my characteristics are sent at different frequencies causing all the problems?

Please help me out I have been stuck with this forever! How can I get it to work in Kotlin?

Thanks a lot.

1 Answer

Is the fact that my characteristics are sent at different frequencies causing all the problems?

Yes. The Observable<ByteArray> emitted by setupNotification() is a hot Observable: it emits whether or not anyone is listening, like a radio. The .blocking*() operators block your listening thread, so if the other Observable emits while that thread is blocked, you miss the emission.
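
To illustrate what "hot" means here, the following is a minimal sketch that runs off-device with plain RxJava 2. A PublishSubject stands in for the notification stream (an assumption for illustration, not how the library is implemented), and collectAfterLateSubscribe is a hypothetical helper name.

```kotlin
import io.reactivex.subjects.PublishSubject

// A PublishSubject is hot: values emitted while nobody is subscribed are lost.
// It is used here only as a stand-in for a BLE notification stream.
fun collectAfterLateSubscribe(): List<String> {
    val notifications = PublishSubject.create<String>()
    val received = mutableListOf<String>()

    notifications.onNext("packet-1") // nobody is listening yet, so this is dropped

    val disposable = notifications.subscribe { received.add(it) }
    notifications.onNext("packet-2") // delivered
    notifications.onNext("packet-3") // delivered
    disposable.dispose()

    return received
}

fun main() {
    println(collectAfterLateSubscribe()) // prints [packet-2, packet-3]
}
```

A listener that is busy, or blocked inside a .blocking*() call on another stream, is in the same position as the late subscriber above: whatever was emitted in the meantime is not replayed.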

I also tried most of the solutions above in Java and I am getting the same behaviour. Did something change in rxandroidble2 that is not explained in the stackoverflow answers that I linked to?

Changes between RxJava 1 and 2 were mostly cosmetic, with Observable replaced by Single or Completable where appropriate. Some operators were removed or renamed, but that does not affect your case.

If you only need the latest values emitted by both of your characteristics, you can use code like this:

respeckLiveSubscription = connectionObservable?.flatMap{
            Observable.combineLatest(
                it.setupNotification(UUID.fromString(Constants.CHARACTERISTIC_A_UUID)).flatMap { it }, // notice flatMap
                it.setupNotification(UUID.fromString(Constants.CHARACTERISTIC_B_UUID)).flatMap { it }, // notice flatMap
                io.reactivex.functions.BiFunction<ByteArray, ByteArray, Pair<ByteArray, ByteArray>> {
                    t1, t2 -> Pair.create(t1, t2) // now we have 'ByteArray's here, not 'Observable<ByteArray>'s 
                }
            )
        }?.subscribe({
            processRESpeckPacket(it.first, respeckVersion, this)
            Log.i("ble", "processed first value")
            processRESpeckPacket(it.second, 7, this)
            Log.i("ble", "processed second value")
        },{
            Log.e("ble", "error when connecting", it) // pass the Throwable so the stack trace is actually logged
        })

        Log.i("ble", "does this mean connection is over") // no, at this point the chain has only been subscribed
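
One thing to keep in mind with combineLatest when the two sources emit at different rates: every emission from the fast source is paired again with the most recent value from the slow one, so the same slow packet can reach your handler several times. If each packet should be processed exactly once, merging the two streams is one alternative. The sketch below runs off-device with plain RxJava 2 and is only an illustration under assumptions: PublishSubject stands in for the notification streams, kotlin.Pair replaces android.util.Pair, and Packet, combineLatestDemo and mergeDemo are hypothetical names.

```kotlin
import io.reactivex.Observable
import io.reactivex.functions.BiFunction
import io.reactivex.subjects.PublishSubject

// Hypothetical wrapper recording which characteristic a packet came from.
sealed class Packet {
    data class FromA(val value: String) : Packet()
    data class FromB(val value: String) : Packet()
}

// Emits one slow value, two fast values, then another slow value.
private fun emitSample(charA: PublishSubject<String>, charB: PublishSubject<String>) {
    charA.onNext("a1")
    charB.onNext("b1")
    charB.onNext("b2")
    charA.onNext("a2")
}

fun combineLatestDemo(): List<Pair<String, String>> {
    val charA = PublishSubject.create<String>() // slow source (stand-in)
    val charB = PublishSubject.create<String>() // fast source (stand-in)
    val out = mutableListOf<Pair<String, String>>()
    val d = Observable.combineLatest(
        charA, charB,
        BiFunction<String, String, Pair<String, String>> { a, b -> a to b }
    ).subscribe { out.add(it) }
    emitSample(charA, charB)
    d.dispose()
    return out
}

fun mergeDemo(): List<Packet> {
    val charA = PublishSubject.create<String>()
    val charB = PublishSubject.create<String>()
    val out = mutableListOf<Packet>()
    val d = Observable.merge(
        charA.map<Packet> { Packet.FromA(it) },
        charB.map<Packet> { Packet.FromB(it) }
    ).subscribe { out.add(it) }
    emitSample(charA, charB)
    d.dispose()
    return out
}

fun main() {
    println(combineLatestDemo()) // prints [(a1, b1), (a1, b2), (a2, b2)]
    println(mergeDemo().size)    // prints 4: each packet appears exactly once
}
```

In the combineLatest result, "a1" appears twice and nothing is emitted until both sources have produced a value. With merge, the subscriber would branch on the Packet subtype and call the matching processing function for each packet.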