I am fairly new to functional programming and to reactive programming with RxJava. I want to read the id and name of each device from a database and store them in a Map, and I am trying to do it in RxJava style. I call a function that doesn't need to return anything:

    .doOnNext(t -> updateDeviceNameMap())

The function looks like this:

    private void updateDeviceNameMap() {
        LOGGER.debug("Reading device name and id from database");
        Observable<SQLConnection> jdbcConnection = createJdbcConnection();
        Scheduler defaultScheduler = RxHelper.scheduler(vertx);
        Observable<JsonArray> res = jdbcConnection // need to return JsonArray
                .flatMap(connection -> just(connection)
                        .flatMap(j -> runQuery(connection, "SELECT name,id FROM device")
                                .observeOn(defaultScheduler)
                                .doOnNext(m -> LOGGER.info("size: " + m.size()))
                                .flatMap(job -> {
                                    LOGGER.info(">>" + job.getJsonArray(0));
                                    // or if I can extract JsonArray items here,
                                    // I can update my Map here too.
                                    return just(job.getJsonArray(0));
                                })
                                .doOnError(e -> {
                                    LOGGER.error("failed to connect to db", e);
                                    connection.close();
                                })
                                .doOnCompleted(connection::close)
                                .onErrorReturn(e -> null)));

        //System.out.println("" + res.map(d -> LOGGER.info(d.toString())));
        // get the JsonArray and update the deviceNameMap
    }

The connection to the DB is made successfully and the query also runs correctly. I can convert any object to an Observable with Observable.from(ObjectName), but I can't do the opposite. An appropriate mapping needs to be done after .flatMap(job -> just(job.getJsonArray(0)), but I have no clue how. After running the Verticle, I cannot even see anything logged from the line .flatMap(job -> { LOGGER.info(">>" + job.getJsonArray(0));. Am I missing something?

1 Answer

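You must subscribe to your Observable<JsonArray>; otherwise nothing happens. Building the chain only describes the work: none of the operators, including the logging inside flatMap, run until something subscribes to res.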
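To make the laziness concrete, here is a minimal sketch that uses only JDK types. `LazySource`, `queryDevices`, `buildChain`, and the sample rows are hypothetical stand-ins invented for illustration; they are not RxJava or Vert.x APIs. The sketch only assumes that the Observable in the question behaves like a cold source, so that the query and the logging run when a subscriber is attached.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

public class LazySubscribeSketch {

    /** Hypothetical stand-in for a cold Observable: does nothing until subscribe() is called. */
    interface LazySource<T> {
        void subscribe(Consumer<T> onNext);

        /** Wraps this source with a transformation, still without running anything. */
        default <R> LazySource<R> map(Function<T, R> fn) {
            return onNext -> this.subscribe(item -> onNext.accept(fn.apply(item)));
        }
    }

    static final List<String> log = new ArrayList<>();
    static final Map<Integer, String> deviceNameMap = new HashMap<>();

    /** Pretend query result (made-up data): each row is {name, id}. */
    static LazySource<Object[]> queryDevices() {
        return onNext -> {
            log.add("query executed");
            onNext.accept(new Object[] {"sensor-a", 1});
            onNext.accept(new Object[] {"sensor-b", 2});
        };
    }

    /** Builds the chain; this only describes the work and runs nothing. */
    static LazySource<Object[]> buildChain() {
        return queryDevices().map(row -> {
            log.add("row seen: " + row[0]);
            return row;
        });
    }

    public static void main(String[] args) {
        LazySource<Object[]> res = buildChain();
        System.out.println("before subscribe: " + log); // prints "before subscribe: []"

        // Subscribing triggers the query; the map is filled in the callback.
        res.subscribe(row -> deviceNameMap.put((Integer) row[1], (String) row[0]));
        System.out.println("after subscribe: " + log);
        System.out.println(deviceNameMap);
    }
}
```

In the question's code, the corresponding step would be a call along the lines of `res.subscribe(arr -> /* update deviceNameMap from arr */, err -> LOGGER.error("query failed", err))` at the end of updateDeviceNameMap(). How the entries are read out of the JsonArray depends on the shape of the query result, which the question does not show.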