I'm very new to vert.x platform. I have a standard and a worker verticle in my project which communicates through the eventBus. The worker verticle performs multiple REST API calls in loop and database access.
My problem is the worker verticle complete the task without issue at some run but sometimes it throws below error.
Exception in thread "vert.x-worker-thread-12" io.vertx.core.VertxException: Connection was closed
I'm using kotlin coroutine to handle constructDevice(vertx: Vertx)
function which performs most of the REST API calls and database access.
Could anyone please tell me what's the reason for the above problem also is there any way to improve constructDevice(vertx: Vertx)
function to handle multiple REST API calls and MongoDB access effectively.
// worker verticle to handle multiple REST API calls and MongoDB database access
class DeviceDiscoverVerticle : CoroutineVerticle() {
override suspend fun start() {
val consumer = vertx.eventBus().localConsumer<String>("listDevice")
consumer.handler { message ->
CoroutineScope(vertx.dispatcher()).launch {
constructDevice(vertx)
}
message.reply("discovered")
}
}
}
// standard verticle to communicate with worker verticle
class ListDeviceVerticle : CoroutineVerticle() {
override suspend fun start() {
val reply = awaitResult<Message<String>> { h ->
vertx.eventBus().request("listDevice", "deviceIPs", h)
}
println("Reply received: ${reply.body()}")
}
}
fun main() {
val vertx = Vertx.vertx()
val workOption = DeploymentOptions().setWorker(true)
vertx.deployVerticle(DeviceDiscoverVerticle(), workOption)
vertx.deployVerticle(ListDeviceVerticle())
}
suspend fun constructDevice(vertx: Vertx) {
val deviceRepository = listOf(
"10.22.0.106",
"10.22.0.120",
"10.22.0.115",
"10.22.0.112"
)
val webClient = WebClient.create(vertx)
val config = json { obj("db_name" to "mnSet", "connection_string" to "mongodb://localhost:27017") }
val mongoClient: MongoClient = MongoClient.create(vertx, config)
val json = Json(JsonConfiguration.Stable.copy(ignoreUnknownKeys = true))
// loop through the IP list and calls REST endpoints
val deviceList = deviceRepository.map { deviceIP ->
val deviceIPconfig: DeviceIPconfig
val deviceType: DeviceType
val requestDeviceIP: HttpRequest<Buffer> = webClient.get(deviceIP, "emsfp/node/v1/self/ipconfig/")
val requestDeviceType: HttpRequest<Buffer> = webClient.get(deviceIP, "emsfp/node/v1/self/information/")
val responseDeviceIP = awaitResult<HttpResponse<Buffer>> { handler ->
requestDeviceIP.send(handler)
}
deviceIPconfig = if (responseDeviceIP.statusCode() == 200) {
json.parse(DeviceIPconfig.serializer(), responseDeviceIP.bodyAsString())
} else {
println("request to device $deviceIP failed with ${responseDeviceIP.statusCode()}")
DeviceIPconfig()
}
val responseDeviceType = awaitResult<HttpResponse<Buffer>> { handler ->
requestDeviceType.send(handler)
}
if (responseDeviceType.statusCode() == 200) {
deviceType = json.parse(DeviceType.serializer(), responseDeviceType.bodyAsString())
val device = DeviceModel(deviceIPconfig, deviceType)
json {
obj(
"_id" to deviceIPconfig.localMac,
"device" to json.stringify(DeviceModel.serializer(), device)
)
}
} else {
println("request to device $deviceIP failed with ${responseDeviceType.statusCode()}")
jsonObjectOf()
}
}.filterNot { it.isEmpty }
// construct data to upload in mongoDB
val activeDeviceIDs = json {
obj("_id" to "activeDeviceIDs",
"activeDeviceIDs" to deviceList.map { it.get<String>("_id") })
}
val activeDevices = json {
obj("_id" to "activeDevices",
"activeDevices" to json { array(deviceList) }
)
}
// save the data in MongoDB
mongoClient.save("devices", activeDeviceIDs) { res ->
if (res.succeeded()) {
println("saved successfully")
} else {
res.cause().printStackTrace()
}
}
mongoClient.save("devices", activeDevices) { res ->
if (res.succeeded()) {
println("saved successfully")
} else {
res.cause().printStackTrace()
}
}
}
Updated question: 1
@Damian I have updated my question based on your input. I have simplified my question above for easy to understand, but when I tried to implement the things using promise/future I'm getting stuck at some point.
My task is to get data from different REST endpoints and contract kotlin class from it and I wants to it in parallel.
fun constructDeviceDevice(deviceIP: String, device: String, webClient: WebClient): Future<HttpResponse<Buffer>> {
val requestDevices: HttpRequest<Buffer> = webClient.get(deviceIP, "emsfp/node/v1/self/diag/devices/$device")
val deviceDevicePromise: Promise<HttpResponse<Buffer>> = Promise.promise()
requestDevices.send { asyncResult ->
if (asyncResult.succeeded())
deviceDevicePromise.complete(asyncResult.result())
else
deviceDevicePromise.fail("Http request failed");
}
return deviceDevicePromise.future()
}
fun constructDeviceDevices(deviceIP: String, webClient: WebClient): Future<List<Future<HttpResponse<Buffer>>>> {
val requestDeviceDevices: HttpRequest<Buffer> = webClient.get(deviceIP, "emsfp/node/v1/self/diag/devices/")
val deviceDevicesPromise: Promise<List<Future<HttpResponse<Buffer>>>> = Promise.promise()
requestDeviceDevices.send { asyncResult ->
if (asyncResult.succeeded()) {
// this will return Json array and each element of that array needs to be called again in a loop.
val result = asyncResult.result().bodyAsJsonArray().map { device ->
constructDeviceDevice(deviceIP, device.toString(), webClient)
}
deviceDevicesPromise.complete(result)
} else
deviceDevicesPromise.fail("Http request failed")
}
return deviceDevicesPromise.future()
}
fun constructDevice(vertx: Vertx, webClient: WebClient, deviceIP: String): List<Future<HttpResponse<Buffer>>> {
val deviceDevicesFuture: Future<List<Future<HttpResponse<Buffer>>>> = constructDeviceDevices(deviceIP, webClient)
// I need to call other rest points similar to this and I need map the result to kotlin class.
// how do get HTTP response out of each future request in deviceDevicesFuture: Future<List<Future<HttpResponse<Buffer>>>>.
}
class DeviceDiscoverVerticle : AbstractVerticle() {
override fun start() {
val deviceRepository = // list of IP strings
val webClient = WebClient.create(vertx)
vertx.eventBus().localConsumer<String>("listDevice").handler { message ->
deviceRepository.forEach { deviceIP ->
val futureList = constructDevice(vertx, webClient, deviceIP)
CompositeFuture.all(futureList).onComplete { allFuturesResult ->
if (allFuturesResult.succeeded()) {
// how to handle individual future result here to construct data
} else {
println("failed")
}
}
}
}
}
Updated question: 2
@Damian as you suggested I have updated my code.
fun constructDeviceDevice(deviceIP: String, device: String, webClient: WebClient): Future<HttpResponse<Buffer>> {
val requestDevices: HttpRequest<Buffer> = webClient.get(deviceIP, "emsfp/node/v1/flows/$device")
val deviceDevicePromise: Promise<HttpResponse<Buffer>> = Promise.promise()
requestDevices.send { asyncResult ->
if (asyncResult.succeeded())
deviceDevicePromise.complete(asyncResult.result())
else
deviceDevicePromise.fail("Http request failed")
}
return deviceDevicePromise.future()
}
fun constructDeviceDevices(deviceIP: String, webClient: WebClient): Future<HttpResponse<Buffer>> {
val requestDeviceDevices: HttpRequest<Buffer> = webClient.get(deviceIP, "emsfp/node/v1/flows/")
val deviceDevicesPromise: Promise<HttpResponse<Buffer>> = Promise.promise()
requestDeviceDevices.send { asyncResult ->
if (asyncResult.succeeded()) {
deviceDevicesPromise.complete(asyncResult.result())
}
else
deviceDevicesPromise.fail("Http request failed")
}
return deviceDevicesPromise.future()
}
fun constructDevice(webClient: WebClient, deviceIP: String): Future<DeviceFlow> {
val json = Json(JsonConfiguration.Stable.copy(ignoreUnknownKeys = true, isLenient = true))
val constructDevicePromise: Promise<DeviceFlow> = Promise.promise()
val httpDevicesFuture: Future<HttpResponse<Buffer>> = constructDeviceDevices(deviceIP, webClient)
httpDevicesFuture.onComplete { ar ->
if(ar.succeeded()) {
val futureList = ar.result().bodyAsJsonArray().map { device ->
constructDeviceDevice(deviceIP, device.toString(), webClient)
}
CompositeFuture.all(futureList).onComplete { asyncResult ->
if (asyncResult.succeeded()) {
asyncResult.result().list<HttpResponse<Buffer>>().forEach { res ->
//not all future in futureList are completed here some of them shows Future{unresolved}
}
constructDevicePromise.complete(DeviceFlow(label = "xyz"))
}
else {
constructDevicePromise.fail("failed")
}
}
}
}
return constructDevicePromise.future()
}
class DeviceDiscoverVerticle : AbstractVerticle() {
override fun start() {
val deviceRepository = //list of IPs
val webClient = WebClient.create(vertx)
vertx.eventBus().localConsumer<String>("listDevice").handler { message ->
deviceRepository.forEach { deviceIP ->
val constructDeviceFuture = constructDevice(webClient, deviceIP)
constructDeviceFuture.onComplete {ar ->
//println(ar.result().toString())
}
}
}
}
}
My problem is inside
CompositeFuture.all(futureList).onComplete { asyncResult ->
if (asyncResult.succeeded()) {
asyncResult.result().list<HttpResponse<Buffer>>().forEach {
here most of the futures are unresolved and the execution gets hanged here.
[Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@67d2e79}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@8bad0c6}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@c854509}, Future{unresolved}, Future{unresolved}, Future{cause=Http request failed}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}]
so I have changed CompositeFuture.all(futureList).onComplete
to CompositeFuture.join(futureList).onComplete
as per vert.x docs join will wait for all future to complete
The join composition waits until all futures are completed, either with a success or a failure. CompositeFuture.join takes several futures arguments (up to 6) and returns a future that is succeeded when all the futures are succeeded and failed when all the futures are completed and at least one of them is failed
but now few futures are getting failed. Here is the output of future list after changing to CompositeFuture.join
CompositeFuture.join(futureList).onComplete { asyncResult ->
println(futureList)
if (asyncResult.succeeded()) { res ->
// println(res) this one gets hanged and not printing all response
asyncResult.result().list<HttpResponse<Buffer>>().forEach {
[Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@5e9d3832}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@379c326a}]
[Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@51a39962}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@edcd528}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@293c3e5c}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@5f86d3ec}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@12a329f7}, Future{cause=Http request failed}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}]
[Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@7abedb1e}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@3238d4cb}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@5bc868d3}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@50af1ecc}, Future{unresolved}, Future{cause=Http request failed}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}]
[Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@5cc549ec}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@282f4033}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{cause=Http request failed}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}]
[Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@41a890b3}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{cause=Http request failed}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}]
[Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@147d772a}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{cause=Http request failed}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}]
So few futures are getting failed because of my device can't handle the concurrent request? also why the program execution stuck inside
asyncResult.result().list<HttpResponse<Buffer>>().forEach {
Incase if It's problem with device concurrency request handling then what would be the other solution for this problem. Is it possible to run this whole rest calls out of vertx environment and communicate with it through event bus?
Also if I deployed DeviceDiscoverVerticle
as standard verticle instead of worker verticle the application gets stuck completely at CompositeFuture.all(futureList).onComplete
.