I am running Flink from the IDE. Storing data in the queryable state works, but when I query it, an exception is thrown.
Exception
Failure(akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://flink@localhost:6123/), Path(/user/jobmanager)])
My code:
// Set the JobManager IPC address and port entries on the configuration,
// in the same order as before (address first, then port).
Seq(
  ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY -> "localhost",
  ConfigConstants.JOB_MANAGER_IPC_PORT_KEY -> "6123"
).foreach { case (configKey, configValue) => config.setString(configKey, configValue) }
/**
 * Decides how a failed state query is handled.
 *
 * An `AssertionError` is passed through as an already-failed future. Any
 * other failure schedules another call to `getKvStateWithRetries` after
 * `retryDelay`.
 *
 * @param failure the error the previous query attempt failed with
 * @return a failed future for assertion errors, otherwise the future of the
 *         delayed retry
 */
@throws[Throwable]
def recover(failure: Throwable): Future[Array[Byte]] = failure match {
  case _: AssertionError =>
    Futures.failed(failure)
  case _ =>
    // At startup some failures are expected
    // due to races. Make sure that they don't
    // fail this test.
    Patterns.after(retryDelay, TEST_ACTOR_SYSTEM.scheduler, TEST_ACTOR_SYSTEM.dispatcher, new Callable[Future[Array[Byte]]]() {
      // NOTE(review): `key` is not declared in the visible code, and
      // getKvStateWithRetries names its second parameter `keyHash` - confirm
      // that the enclosing scope really provides `key`.
      @throws[Exception]
      def call: Future[Array[Byte]] = getKvStateWithRetries(queryName, key, serializedKey)
    })
}
}
/**
 * Requests the serialized state for the given key from `client` and attaches
 * the handler produced by `recover` to the resulting future.
 *
 * @param queryName     name passed to the client lookup and to `recover`
 * @param keyHash       hash passed to the client lookup and to `recover`
 * @param serializedKey serialized key passed to the client lookup and to `recover`
 * @return the future returned by `recoverWith` on the client's lookup future
 */
@SuppressWarnings(Array("unchecked"))
private def getKvStateWithRetries(
    queryName: String,
    keyHash: Int,
    serializedKey: Array[Byte]): Future[Array[Byte]] =
  client
    .getKvState(jobID, queryName, keyHash, serializedKey)
    .recoverWith(recover(queryName, keyHash, serializedKey))
/**
 * Builds the success callback for a state query: it prints a marker line,
 * then deserializes the returned bytes with `valueSerializer` and prints the
 * resulting value.
 */
def onSuccess = new OnSuccess[Array[Byte]]() {
  @throws[Throwable]
  override def onSuccess(result: Array[Byte]): Unit = {
    println("found record ")
    // The marker above is printed before deserialization is attempted.
    println(KvStateRequestSerializer.deserializeValue(result, valueSerializer))
  }
}
override def invoke(query: QueryMetaData): Unit = {
println("getting inside querystore"+query.record)
val serializedResult = flinkQuery.getResult(query.record, queryName)
serializedResult.onSuccess(onSuccess)
I am not spawning a new mini-cluster or calling cluster.submit as in https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java, because I want to do this in the same cluster and the same environment as the main app, which is running with env.execute. Is that step necessary?
According to the documentation, Flink runs at localhost:6123 by default. Is there a problem with the connection? Do I need to submit the job to a separate cluster?