0
votes

I'm updating an app to use RxAndroidBLE, and struggling with how to translate my existing callback pattern into an Rx pattern. In particular, I need to respond to characteristic notifications in different ways depending on the received data, and send a specific write command back to the device (which will then cause the characteristic to be updated, in a loop).

The rationale behind this is that the BLE device I'm integrating with has a special custom characteristic, to which we can send different commands and then listen for data back.

I've read up lots about chaining commands using RxBLE, but none seem to address my particular query, which is how to send a command back to the device on observing a change notification (since the connection itself seems to be out of scope by the time we get to the observable block). What is the "Rx Way" of doing this?

For clarity, this is the entire flow of my BLE service:

  1. scan for devices with a filter on our custom characteristic
  2. connect to a found device
  3. read a couple of standard characteristics (strings), and store these in our data model
  4. if and only if one of the characteristics matches one of an array of strings, proceed to 5. otherwise, dispose of the connection.
  5. subscribe to our custom "control" characteristic ("CC") for change notifications
  6. send command 1 to CC. this should trigger answer 1 to be set in CC, so our handler is called
  7. perform some calculations on answer 1 and save to our model. send command 2 (which includes these modified values, so we can't determine this at compile time) to CC. this should trigger answer 2 in CC.
  8. on receiving answer 2, send command 3, which should trigger answer 3.
  9. on reciving answer 3, parse into an int value. if answer 3 == 0, dispose of the connection - we are done.
  10. answer 3 > 0, so send command 4. this will trigger answer 4.
  11. perform some calculations on answer 4 and store the results in our model
  12. then send command 5, which will actually trigger answer 3 (both commands 5 and 3 trigger answer 3). since we are already subscribed to answer 3, this takes us back to step 9. above - we keep looping until answer 3 is 0 (ie. we have saved all the data).

Edit: I was loathe to share code, as I'm well aware there is no possible way the following will work - but I'm hoping it describes what I'm trying to do even if the syntax won't even compile:

                  connectedDevice.connectionDisposable = connectedDevice.getRxDevice().establishConnection(false)
                                                    .observeOn(AndroidSchedulers.mainThread())
                                                    .flatMapSingle(rxBleConnection -> rxBleConnection.readCharacteristic(BATTERY_CHARACTERISTIC_UUID))
                                                    .doOnNext(bytes -> {
                                                        //store the battery info in our model here
                                                    })
                                                    .flatMapSingle(rxBleConnection -> rxBleConnection.readCharacteristic(SERIAL_NUMBER_CHARACTERISTIC_UUID))
                                                    .doOnNext(bytes -> {
                                                                //store the serial number info in our model here
                                                                //TODO: how do we only proceed to the subscription if serialNumber is correct?
                                                            }
                                                    )
                                                    .flatMap(rxBleConnection -> rxBleConnection.setupNotification(CUSTOM_CHARACTERISTIC_UUID))
                                                    .doOnNext(notificationObservable -> {
                                                        // Notification has been set up
                                                        rxBleConnection.writeCharacteristic(CUSTOM_CHARACTERISTIC_UUID, COMMAND_1); //we can't do this because rxBleConnection is out of scope!
                                                    })
                                                    .flatMap(notificationObservable -> notificationObservable) // <-- Notification has been set up, now observe value changes.
                                                    .subscribe(
                                                            bytes -> {
                                                                // Given characteristic has been changes, here is the value.

                                                                switch(commandFromBytes(bytes)){
                                                                    case answer1:
                                                                        int newCommand = doSomethingWith(bytes);
                                                                        rxBleConnection.writeCharacteristic(CUSTOM_CHARACTERISTIC_UUID, COMMAND_2 + newCommand);
                                                                        break;
                                                                    case answer2:
                                                                        rxBleConnection.writeCharacteristic(CUSTOM_CHARACTERISTIC_UUID, COMMAND_3);
                                                                        break;
                                                                    case answer3:
                                                                        if(bytes <= 0){
                                                                            connectedDevice.connectionDisposable.dispose();
                                                                        }
                                                                        else{
                                                                            rxBleConnection.writeCharacteristic(CUSTOM_CHARACTERISTIC_UUID, COMMAND_4);
                                                                        }
                                                                        break;
                                                                    case answer4:

                                                                            doSomethingLongWindedWith(bytes);
                                                                            //then
                                                                            rxBleConnection.writeCharacteristic(CUSTOM_CHARACTERISTIC_UUID, COMMAND_5);
                                                                            //command 5 will cause answer3 to be notified, so we loop back above                                                                             
                                                                        break;
                                                                }

                                                            },
                                                            throwable -> {
                                                                // Handle an error here.
                                                            }
                                                    );

Edit 2: after playing bracket tango for a bit, I think I'm close to a solution here:

 connectedDevice.connectionDisposable = connectedDevice.getRxDevice().establishConnection(false)
                                                    .observeOn(AndroidSchedulers.mainThread())
                                                    .flatMapSingle(rxBleConnection -> rxBleConnection.readCharacteristic(BATTERY_CHARACTERISTIC_UUID)
                                                            .doOnNext(bytes -> {
                                                                connectedDevice.setBatLevel(bytes);
                                                            })
                                                            .flatMapSingle(rxBleConnection2 -> rxBleConnection.readCharacteristic(SERIAL_NUMBER_CHARACTERISTIC_UUID))
                                                            .doOnNext(bytes -> {
                                                                        connectedDevice.setSerialNum(bytes);
                                                                        //we also notify a singleton listener here
                                                                    }
                                                            )
                                                            .flatMap(rxBleConnection3 -> {
                                                                        if (serialNumberIsCorrect(connectedDevice.getSerialNum())) {
                                                                            rxBleConnection.setupNotification(CUSTOM_CHARACTERISTIC_UUID).subscribe(
                                                                                    bytes -> {
                                                                                        // Given characteristic has been changes, here is the value.

                                                                                        switch (commandFromBytes(bytes)) {
                                                                                            case answer1:
                                                                                                int newCommand = doSomethingWith(bytes);
                                                                                                rxBleConnection.writeCharacteristic(CUSTOM_CHARACTERISTIC_UUID, COMMAND_2 + newCommand);
                                                                                                break;
                                                                                            case answer2:
                                                                                                rxBleConnection.writeCharacteristic(CUSTOM_CHARACTERISTIC_UUID, COMMAND_3);
                                                                                                break;
                                                                                            case answer3:
                                                                                                if (bytes <= 0) {
                                                                                                    //we also notify a singleton listener here
                                                                                                    connectedDevice.connectionDisposable.dispose();
                                                                                                } else {
                                                                                                    rxBleConnection.writeCharacteristic(CUSTOM_CHARACTERISTIC_UUID, COMMAND_4);
                                                                                                }
                                                                                                break;
                                                                                            case answer4:

                                                                                                doSomethingLongWindedWith(bytes);
                                                                                                //then
                                                                                                rxBleConnection.writeCharacteristic(CUSTOM_CHARACTERISTIC_UUID, COMMAND_5);
                                                                                                //command 5 will cause answer3 to be notified, so we loop back above
                                                                                                break;
                                                                                        }
                                                                                    },
                                                                                    throwable -> {
                                                                                        // Handle an error here.
                                                                                    }
                                                                            );
                                                                        } else {
                                                                            connectedDevice.connectionDisposable.dispose();
                                                                        }
                                                                    }
                                                                            .doOnNext(notificationObservable -> {
                                                                                // Notification has been set up
                                                                                if (serialNumberIsCorrect(connectedDevice.getSerialNum())) {
                                                                                    rxBleConnection.writeCharacteristic(CUSTOM_CHARACTERISTIC_UUID, COMMAND_1);
                                                                                }
                                                                            })
                                                            ));
1
Have you tried anything? Feel free to show your codeDariusz Seweryn
@DariuszSeweryn I've tried lots of things, nothing which I've been able to actually compile :) have updated my question with one attemptPeter
rxBleConnection.writeCharacteristic(CUSTOM_CHARACTERISTIC_UUID, COMMAND_1); you actually need to subscribe to this operation.Christopher
@Christopher granted, but how do I then write commands back to the connection from within that subscription block?Peter
The RxBleConnection is accessible in it's scope by default. So in the first .flatMap() after the RxBleDevice.establishConnection() you may construct the part of the flow that is depending on the connection. In step 7 you write perform some calculations on answer 1 and save to our model. send command 2 (which includes these modified values, so we can't determine this at compile time)—does it mean that COMMAND_2 is not a static final constant? Which commands are not static?Dariusz Seweryn

1 Answers

3
votes

The best approach, according to this Jake Wharton's talk would be to construct an Observable that would emit just values that are needed for updating your model.

(example in Kotlin)

We could have these outputs of the Observable:

sealed class ConnectionEvent {
    object CloseConnection : ConnectionEvent() // dummy event to notify when the connection can be closed
    data class SerialNumber(val byteArray: ByteArray) : ConnectionEvent()
    data class BatteryLevel(val byteArray: ByteArray) : ConnectionEvent()
    data class Answer4(val byteArray: ByteArray) : ConnectionEvent()
}

And the whole flow could look like this:

bleDevice.establishConnection(false)
        .flatMap { connection ->
            val batteryLevelSingle = connection.readCharacteristic(batteryLevelCharUuid).map { ConnectionEvent.BatteryLevel(it) as ConnectionEvent }
            val serialNumberSingle = connection.readCharacteristic(serialNumberCharUuid).map { ConnectionEvent.SerialNumber(it) }.cache() // cache as the output will be used by the continuation observable as well and we do not want to re-read the serial number
            val continuationObservable: Observable<ConnectionEvent> = serialNumberSingle // continuation observable will work if the serial number matches
                    .flatMapObservable {
                        when {
                            it != matchingSerialNumber -> Observable.just(ConnectionEvent.CloseConnection as ConnectionEvent) // close connection if serial does not match
                            else -> createContinuationObservable(connection) // create flow for getting more data via additional writes and notifications
                        }
                    }
            Observable.concat( // the actual flow of the whole connection
                    batteryLevelSingle.toObservable(), // we are starting with getting the battery level and emitting it
                    serialNumberSingle.toObservable(), // we are getting the serial number and emitting it
                    continuationObservable // if the serial number matches we continue with notifications and getting more data. otherwise CloseConnection
            )
        }
        .takeWhile { it != ConnectionEvent.CloseConnection } // if the connection is to be closed -> unsubscribe
        .subscribe(
                { connectionEvent ->
                    when(connectionEvent) {
                        is ConnectionEvent.SerialNumber -> { /* Update model */ }
                        is ConnectionEvent.BatteryLevel -> { /* Update model */ }
                        is ConnectionEvent.Answer4 -> { /* Update model */ }
                    }
                },
                { /* handle errors */ }
        )

where the write/notification dance is:

private fun createContinuationObservable(connection: RxBleConnection): Observable<ConnectionEvent> {
    return connection.setupNotification(customCharUuid)
            .flatMap { ccNotifications ->
                ccNotifications.flatMap {
                    when (answerFromBytes(it)) {
                        answer1 -> connection.writeCharacteristic(customCharUuid, command2FromAnswer1Bytes(it)).ignoreEmissions()
                        answer2 -> connection.writeCharacteristic(customCharUuid, command3).ignoreEmissions()
                        answer3 -> when (it.isEmpty()) {
                            true -> Observable.just(ConnectionEvent.CloseConnection)
                            else -> connection.writeCharacteristic(customCharUuid, command4).ignoreEmissions()
                        }
                        answer4 -> connection.writeCharacteristic(customCharUuid, command5).ignoreEmissions()
                                .startWith(Observable.just(ConnectionEvent.Answer4(it)))
                        else -> Observable.error(Exception("Unexpected answer! => ${answerFromBytes(it)}"))
                    }
                }
                        .startWith(connection.writeCharacteristic(customCharUuid, command1).ignoreEmissions()) // initiate with the command1
            }
}

I have used an extension function for more clarity:

fun Single<ByteArray>.ignoreEmissions() = this.toCompletable().toObservable<ConnectionEvent>()

Edit:

I have changed the code a bit to get rid of CloseConnection event and leverage the completions of the observables. So now the outputs look like this:

sealed class ConnectionEvent {
    data class SerialNumber(val byteArray: ByteArray) : ConnectionEvent()
    data class BatteryLevel(val byteArray: ByteArray) : ConnectionEvent()
    data class Answer4(val byteArray: ByteArray) : ConnectionEvent()
}

The main flow:

bleDevice.establishConnection(false)
        .map { connection ->
            val batteryLevelSingle = connection.readCharacteristic(batteryLevelCharUuid).map { ConnectionEvent.BatteryLevel(it) as ConnectionEvent }
            val serialNumberSingle = connection.readCharacteristic(serialNumberCharUuid).map { ConnectionEvent.SerialNumber(it) }.cache() // cache as the output will be used by the continuation observable as well and we do not want to re-read the serial number
            val continuationObservable: Observable<ConnectionEvent> = serialNumberSingle // continuation observable will work if the serial number matches
                    .flatMapObservable {
                        if (it == matchingSerialNumber) createContinuationObservable(connection) // create flow for getting more data via additional writes and notifications
                        else Observable.empty() // do not continue if serial number does not match
                    }
            Observable.concat( // the actual flow of the whole connection
                    batteryLevelSingle.toObservable(), // we are starting with getting the battery level and emitting it
                    serialNumberSingle.toObservable(), // we are getting the serial number and emitting it
                    continuationObservable // if the serial number matches we continue with notifications and getting more data. otherwise CloseConnection
            )
        }
        .publish {
            // create a Completable from the above Observable.concat()
            val dataDownloadCompletable = it.take(1) // take the first emission (there will be only one)
                    .flatMapCompletable { it.ignoreElements() } // and wait until the first emission completes
            it.takeUntil(dataDownloadCompletable.toObservable<Any>()) // when dataDownloadCompletable completes —> unsubscribe from the upstream, mainly .establishConnection() to close it
        }
        .flatMap { it } // unwrap the above flow
        .subscribe(
                { connectionEvent ->
                    when (connectionEvent) {
                        is ConnectionEvent.SerialNumber -> { /* Update model */ }
                        is ConnectionEvent.BatteryLevel -> { /* Update model */ }
                        is ConnectionEvent.Answer4 -> { /* Update model */ }
                    }
                },
                { /* handle errors */ }
        )

Write/notification part:

private fun createContinuationObservable(connection: RxBleConnection): Observable<ConnectionEvent> {
    return connection.setupNotification(customCharUuid)
            .flatMap { ccNotifications ->
                ccNotifications.map { Pair(answerFromBytes(it), it) } // map every response to a pair of <answer, bytes>
                        .startWith(connection.writeCharacteristic(customCharUuid, command1).ignoreEmissions()) // and start with writing command1 to initiate the data exchange
            }
            .takeWhile { (answer, bytes) -> !(answer == answer3 && bytes.isEmpty()) } // end the createContinuationObservable on the first answer3 with an empty bytes
            .flatMap<ConnectionEvent> { (answer, bytes) ->
                when (answer) {
                    answer1 -> connection.writeCharacteristic(customCharUuid, command2FromAnswer1Bytes(bytes)).ignoreEmissions()
                    answer2 -> connection.writeCharacteristic(customCharUuid, command3).ignoreEmissions()
                    answer3 -> connection.writeCharacteristic(customCharUuid, command4).ignoreEmissions()
                    answer4 -> Observable.just(ConnectionEvent.Answer4(bytes)) // when answer4 is received emit actionable item to update the model
                            .concatWith(connection.writeCharacteristic(customCharUuid, command5).ignoreEmissions()) // and send the next command5
                    else -> Observable.error(Exception("Unexpected answer! => $answer"))
                }
            }
}

And the extension:

fun <T> Single<ByteArray>.ignoreEmissions() = this.toCompletable().toObservable<T>()