1
votes

I have written a flow which creates a transaction that outputs a new state (TransactionBuilder.signInitialTransaction), and then passes it to FinalityFlow to notarize/record/broadcast it. My client-application is starting this flow over RPC with CordaRPCOps.startFlowDynamic and waits for the returned CordaFutures getOrThrow(). This is rather slow, since FinalityFlow only returns once it has delivered the transaction to all other parties/nodes (in fact, if a remote-node is down it seems to never return).

I figured I can speed things up by letting my application only wait for FinalityFlow to have completed notarizeAndRecord(), as I should then have the tx/states in my nodes vault and I can safely assume that other nodes will eventually have this tx delivered and accept it. I implemented this using ProgressTracker, waiting only until FinalityFlow sets currentStep to BROADCASTING.

However, what I'm observing is that if I query the vault (using CordaRPCOps.vaultQueryByCriteria) for the new state very shortly after notarizeAndRecord has returned, I sometimes do not yet get it returned. Is this a bug or rather some deliberate asynchronous behavior where the database is not immediately written to ?

To work around this I then tried to synchronize with the vault inside my flow, in order to update the progressTracker only after the tx/state was actually written to the vault:

val stx = serviceHub.signInitialTransaction(tx)
serviceHub.vaultService.rawUpdates.subscribe {
    logger.info("receiving update $it")
    if(it.produced.any { it.ref.txhash == stx.id }) {
        progressTracker.currentStep = RECORDED
    }
}
subFlow(FinalityFlow(stx))

I can see the update in the node-logs, yet a subsequent vault-query by the RPC-Client (which also shows in the node-logs, after the update) for that very state still does not return anything if executed immediately afterwards...

I am running Corda v2.0.

1

1 Answers

1
votes

I do not know whether vault writes are synchronous.

However, you can side-step this issue by creating an observable on the vault so that you are notified when the new state is recorded. Here's an example where we update a state using its linear ID, then wait for vault updates matching that linear ID:

proxy.startFlowDynamic(UpdateState::class.java, stateLinearId)
val queryCriteria = QueryCriteria.LinearStateQueryCriteria(linearId = listOf(stateLinearId))
val (snapsnot, updates) = proxy.vaultTrackBy<MyLinearState>(queryCriteria)

updates.toBlocking().subscribe { update ->
    val newVaultState = update.produced.single()
    // Perform action here.
}