I am building an Android application that has specific requirements regarding Bluetooth Low Energy.

I need to write to a write-only characteristic and receive responses on a separate notification characteristic, and I need to do this from many activities. Is there an Rx way to send a request on the first characteristic, wait for the answer on the second one, and then proceed to the next request?

Also, to share my instance of RxAndroidBle, I thought about creating some sort of BleManager singleton that exposes the Observables, so I can easily subscribe to them in my Presenter. I want to avoid copying the connection logic into each activity and (ideally) keep a persistent connection. This way I would only need to expose the connectionObservable and subscribe to it in order to send write requests and receive notifications, but I am sure there is a better way to do it.

This is what I have for now:

@Singleton
public class BleManager {

  private PublishSubject<Void> disconnectTriggerSubject = PublishSubject.create();
  private Observable<RxBleConnection> connectionObservable;
  private boolean isConnected;

  private final UUID CTRL_FROM_BRIDGE_UUID = UUID.fromString("someUUID");
  private final UUID BLE_WRITE_CHARACTERISTIC_UUID = UUID.fromString("someOtherUUID");

  private final RxBleClient bleClient;
  private String mMacAddress;
  private final Context context;
  private RxBleDevice bleDevice;

  @Inject
  public BleManager(Context context, RxBleClient client) {
    Timber.d("Constructing BleManager and injecting members");
    this.context = context;
    this.bleClient = client;
  }

  public void setMacAddress(String mMacAddress) {
    this.mMacAddress = mMacAddress;

    // Set the associated device on MacAddress change
    bleDevice = bleClient.getBleDevice(this.mMacAddress);
  }

  public String getMacAddress() {
    return mMacAddress;
  }

  public RxBleDevice getBleDevice() {
    Preconditions.checkNotNull(mMacAddress);
    return bleClient.getBleDevice(mMacAddress);
  }

  public Observable<RxBleScanResult> getScanSubscription() {
    Preconditions.checkNotNull(context);
    Preconditions.checkNotNull(bleClient);

    return bleClient.scanBleDevices().distinct();
  }

  public Observable<RxBleConnection> getConnectionSubscription() {
    Preconditions.checkNotNull(context);
    Preconditions.checkNotNull(bleDevice);

    if (connectionObservable == null) {
      connectionObservable = bleDevice.establishConnection(context, false)
                                      .takeUntil(disconnectTriggerSubject)
                                      .observeOn(AndroidSchedulers.mainThread())
                                      .doOnUnsubscribe(this::clearSubscription)
                                      .compose(new ConnectionSharingAdapter());
    }

    return connectionObservable;
  }

  public Observable<byte[]> setupListeners() {
    // Use the getter so the shared connection is created on demand (the field may be null)
    return getConnectionSubscription().flatMap(rxBleConnection -> rxBleConnection.setupNotification(CTRL_FROM_BRIDGE_UUID))
                               .doOnNext(notificationObservable -> Timber.d("Notification Setup"))
                               .flatMap(notificationObservable -> notificationObservable)
                               .observeOn(AndroidSchedulers.mainThread());
  }

  private void triggerDisconnect() {
    disconnectTriggerSubject.onNext(null);
  }


  public Observable<byte[]> writeBytes(byte[] bytes) {
    // Use the getter so the shared connection is created on demand (the field may be null)
    return getConnectionSubscription().flatMap(rxBleConnection -> rxBleConnection.writeCharacteristic(
      BLE_WRITE_CHARACTERISTIC_UUID,
      bytes)).observeOn(AndroidSchedulers.mainThread());
  }

  private boolean isConnected() {
    return bleDevice.getConnectionState() == RxBleConnection.RxBleConnectionState.CONNECTED;
  }

  /**
   * Will update the UI with the current state of the Ble Connection
   */
  private void registerConnectionStateChange() {
    bleDevice.observeConnectionStateChanges().observeOn(AndroidSchedulers.mainThread()).subscribe(connectionState -> {
      isConnected = connectionState.equals(RxBleConnection.RxBleConnectionState.CONNECTED);
    });
  }

  private void clearSubscription() {
    connectionObservable = null;
  }

}

1 Answer

I have thought a bit about your use case. Sharing a single connection introduces state into your application, and that state has to be handled somewhere, so the solution cannot be purely reactive (or at least I do not see how it could be).

I have focused on establishing a connection and performing a serialized write-notify transmission to a BLE device.

private PublishSubject<Pair<byte[], Integer>> inputSubject = PublishSubject.create();

private PublishSubject<Pair<byte[], Integer>> outputSubject = PublishSubject.create();

private Subscription connectionSubscription;

private volatile int uniqueId = 0; // used to identify the transmission we're interested in in case more than one will be started at the same time

public void connect() {
    Observable<RxBleConnection> connectionObservable = // your establishing of the connection, whether through a scan or RxBleDevice.establishConnection()
    final UUID notificationUuid = // your notification characteristic UUID
    final UUID writeUuid = // your write-only characteristic UUID

    connectionSubscription = connectionObservable
            .flatMap(
                    rxBleConnection -> rxBleConnection.setupNotification(notificationUuid), // subscribing for notifications
                    (rxBleConnection, notificationObservable) -> // connection is established and notification prepared
                            inputSubject // waiting for the data-packet to transmit
                                    .onBackpressureBuffer()
                                    .flatMap(bytesAndFilter -> {
                                                return Observable.combineLatest( // subscribe at the same time to
                                                        notificationObservable.take(1), // getting the next notification bytes
                                                        rxBleConnection.writeCharacteristic(writeUuid, bytesAndFilter.first), // transmitting the data bytes to the BLE device
                                                        (responseBytes, writtenBytes) -> responseBytes // interested only in the response bytes
                                                )
                                                        .doOnNext(responseBytes -> outputSubject.onNext(new Pair<>(responseBytes, bytesAndFilter.second))); // pass the bytes to the receiver with the identifier
                                            },
                                            1 // serializing communication as only one Observable will be processed at the same time
                                    )
            )
            .flatMap(observable -> observable)
            .subscribe(
                    response -> { /* ignored here - used only as side effect with outputSubject */ },
                    throwable -> outputSubject.onError(throwable)
            );
}

public void disconnect() {
    if (connectionSubscription != null && !connectionSubscription.isUnsubscribed()) {
        connectionSubscription.unsubscribe();
        connectionSubscription = null;
    }
}

public Observable<byte[]> writeData(byte[] data) {
    return Observable.defer(() -> {
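                final int uniqueId;
                synchronized (this) { // volatile alone does not make ++ atomic across concurrent callers
                    uniqueId = this.uniqueId++; // creating new uniqueId for identifying the response
                }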
                inputSubject.onNext(new Pair<>(data, uniqueId)); // passing the data with the id to be processed by the connection flow in connect()
                return outputSubject
                        .filter(responseIdPair -> responseIdPair.second == uniqueId)
                        .first()
                        .map(responseIdPair -> responseIdPair.first);
            }
    );
}

I think this approach is good because the whole flow is described in one place, which makes it easier to understand. The stateful part of the communication (writing a request and waiting for the response) is serialized, and the connection can persist until disconnect() is called.
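
To look at the serialization idea in isolation, here is a minimal plain-Java model. It is only a sketch under assumptions: it uses neither RxJava nor RxAndroidBle, `SerializedTransport`, `SerializedTransportDemo` and the fake device function are hypothetical names, and a single-threaded executor stands in for `flatMap(..., 1)`. Because every call gets its own future in this model, no `uniqueId` is needed to match a response to its request.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

/** Library-free model of a serialized request/response channel (hypothetical names). */
final class SerializedTransport implements AutoCloseable {
    // One worker thread means only one exchange is processed at a time.
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    // Stand-in for "write the request, then take the next notification".
    private final Function<byte[], byte[]> device;

    SerializedTransport(Function<byte[], byte[]> device) {
        this.device = device;
    }

    /** Queues a request; the returned future completes with its response. */
    CompletableFuture<byte[]> writeData(byte[] request) {
        return CompletableFuture.supplyAsync(() -> device.apply(request), worker);
    }

    @Override
    public void close() {
        worker.shutdown(); // already queued exchanges still finish
    }
}

final class SerializedTransportDemo {
    static byte[] bytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    static String text(byte[] b) {
        return new String(b, StandardCharsets.UTF_8);
    }

    /** Sends "one", waits for its response, then sends "two". */
    static String demo() {
        // Fake device: answers every request with "ack:" + request.
        Function<byte[], byte[]> fakeDevice = request -> bytes("ack:" + text(request));
        try (SerializedTransport transport = new SerializedTransport(fakeDevice)) {
            return transport.writeData(bytes("one"))
                    .thenCompose(first -> transport.writeData(bytes("two"))
                            .thenApply(second -> text(first) + "," + text(second)))
                    .join();
        }
    }
}
```

Chaining with `thenCompose` plays the role of "send a request, wait for the answer, then proceed to another request"; with the `writeData()` Observable from my answer, the analogous chaining operator would be `flatMap`.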

The downside is that the transmission relies on the side effects of a different flow. Also, if writeData() is called before the connection is established and the notification is set up, the returned Observable will never complete, though it should not be a problem to handle this scenario with a state check.
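
A state check like the one mentioned above could look roughly like this. Again, this is a plain-Java sketch rather than RxAndroidBle code: `GuardedWriter`, `markReady()` and `markDisconnected()` are hypothetical names, and the `delegate` function stands in for the real write path. The assumption is that the connection flow calls `markReady()` once the notification is set up and `markDisconnected()` on disconnect or error.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

/** Hypothetical guard that fails fast instead of waiting forever when not connected. */
final class GuardedWriter {
    private final AtomicBoolean ready = new AtomicBoolean(false);
    // Stand-in for the real write path (for example the writeData() flow).
    private final Function<byte[], CompletableFuture<byte[]>> delegate;

    GuardedWriter(Function<byte[], CompletableFuture<byte[]>> delegate) {
        this.delegate = delegate;
    }

    /** To be called once the connection and the notification are set up. */
    void markReady() {
        ready.set(true);
    }

    /** To be called on disconnect or when the connection flow fails. */
    void markDisconnected() {
        ready.set(false);
    }

    CompletableFuture<byte[]> writeData(byte[] data) {
        if (!ready.get()) {
            // Fail immediately so the caller is not left with a future that never completes.
            CompletableFuture<byte[]> failed = new CompletableFuture<>();
            failed.completeExceptionally(
                    new IllegalStateException("Not connected; call connect() first"));
            return failed;
        }
        return delegate.apply(data);
    }
}
```

In the Rx version, the same check could sit at the top of the `defer` block in writeData() and return `Observable.error(...)` when the flag is not set.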

Best Regards