0
votes

I'm very new to the Vert.x platform. My project has a standard verticle and a worker verticle that communicate through the event bus. The worker verticle makes multiple REST API calls in a loop and accesses a database.

My problem is that the worker verticle completes the task without issue on some runs, but sometimes it throws the error below.

Exception in thread "vert.x-worker-thread-12" io.vertx.core.VertxException: Connection was closed

I'm using Kotlin coroutines in the constructDevice(vertx: Vertx) function, which performs most of the REST API calls and database access.

Could anyone tell me the reason for the above error? Also, is there any way to improve constructDevice(vertx: Vertx) so that it handles multiple REST API calls and MongoDB access effectively?

    // worker verticle to handle multiple REST API calls and MongoDB database access
    
    class DeviceDiscoverVerticle : CoroutineVerticle() {
        override suspend fun start() {
            val consumer = vertx.eventBus().localConsumer<String>("listDevice")
            consumer.handler { message ->
                CoroutineScope(vertx.dispatcher()).launch {
                    constructDevice(vertx)
                }
                message.reply("discovered")
            }
        }
    }
    
    // standard verticle to communicate with worker verticle 
    
    class ListDeviceVerticle : CoroutineVerticle() {
        override suspend fun start() {
            val reply = awaitResult<Message<String>> { h ->
                vertx.eventBus().request("listDevice", "deviceIPs", h)
            }
            println("Reply received: ${reply.body()}")
        }
    }
    
    fun main() {
        val vertx = Vertx.vertx()
        val workOption = DeploymentOptions().setWorker(true)
        vertx.deployVerticle(DeviceDiscoverVerticle(), workOption)
        vertx.deployVerticle(ListDeviceVerticle())
    }


    suspend fun constructDevice(vertx: Vertx) {
        val deviceRepository = listOf(
            "10.22.0.106",
            "10.22.0.120",
            "10.22.0.115",
            "10.22.0.112"
        )
    
        val webClient = WebClient.create(vertx)
        val config = json { obj("db_name" to "mnSet", "connection_string" to "mongodb://localhost:27017") }
        val mongoClient: MongoClient = MongoClient.create(vertx, config)
        val json = Json(JsonConfiguration.Stable.copy(ignoreUnknownKeys = true))
        
        // loop through the IP list and calls REST endpoints
        
        val deviceList = deviceRepository.map { deviceIP ->
            val deviceIPconfig: DeviceIPconfig
            val deviceType: DeviceType
            val requestDeviceIP: HttpRequest<Buffer> = webClient.get(deviceIP, "emsfp/node/v1/self/ipconfig/")
            val requestDeviceType: HttpRequest<Buffer> = webClient.get(deviceIP, "emsfp/node/v1/self/information/")
    
            val responseDeviceIP = awaitResult<HttpResponse<Buffer>> { handler ->
                requestDeviceIP.send(handler)
            }
            deviceIPconfig = if (responseDeviceIP.statusCode() == 200) {
                json.parse(DeviceIPconfig.serializer(), responseDeviceIP.bodyAsString())
            } else {
                println("request to device $deviceIP failed with ${responseDeviceIP.statusCode()}")
                DeviceIPconfig()
            }
            
            val responseDeviceType = awaitResult<HttpResponse<Buffer>> { handler ->
                requestDeviceType.send(handler)
            }
            if (responseDeviceType.statusCode() == 200) {
                deviceType = json.parse(DeviceType.serializer(), responseDeviceType.bodyAsString())
                val device = DeviceModel(deviceIPconfig, deviceType)
                json {
                    obj(
                        "_id" to deviceIPconfig.localMac,
                        "device" to json.stringify(DeviceModel.serializer(), device)
                    )
                }
            } else {
                println("request to device $deviceIP failed with ${responseDeviceType.statusCode()}")
                jsonObjectOf()
            }
    
        }.filterNot { it.isEmpty }
        
        // construct data to upload in mongoDB
        
        val activeDeviceIDs = json {
            obj("_id" to "activeDeviceIDs",
                "activeDeviceIDs" to deviceList.map { it.get<String>("_id") })
        }
        val activeDevices = json {
            obj("_id" to "activeDevices",
                "activeDevices" to json { array(deviceList) }
            )
        }
        
        // save the data in MongoDB
        
        mongoClient.save("devices", activeDeviceIDs) { res ->
            if (res.succeeded()) {
                println("saved successfully")
            } else {
                res.cause().printStackTrace()
            }
        }
        mongoClient.save("devices", activeDevices) { res ->
            if (res.succeeded()) {
                println("saved successfully")
            } else {
                res.cause().printStackTrace()
            }
        }
    }

Updated question: 1

@Damian I have updated my question based on your input. I simplified my question above to make it easier to understand, but when I tried to implement it using promises/futures I got stuck.

My task is to fetch data from different REST endpoints and construct Kotlin classes from it, and I want to do this in parallel.

fun constructDeviceDevice(deviceIP: String, device: String, webClient: WebClient): Future<HttpResponse<Buffer>> {
    val requestDevices: HttpRequest<Buffer> = webClient.get(deviceIP, "emsfp/node/v1/self/diag/devices/$device")
    val deviceDevicePromise: Promise<HttpResponse<Buffer>> = Promise.promise()

    requestDevices.send { asyncResult ->
        if (asyncResult.succeeded())
            deviceDevicePromise.complete(asyncResult.result())
        else
            deviceDevicePromise.fail("Http request failed");
    }
    return deviceDevicePromise.future()
}

fun constructDeviceDevices(deviceIP: String, webClient: WebClient): Future<List<Future<HttpResponse<Buffer>>>> {
    val requestDeviceDevices: HttpRequest<Buffer> = webClient.get(deviceIP, "emsfp/node/v1/self/diag/devices/")
    val deviceDevicesPromise: Promise<List<Future<HttpResponse<Buffer>>>> = Promise.promise()

    requestDeviceDevices.send { asyncResult ->
        if (asyncResult.succeeded()) {
            // this will return Json array and each element of that array needs to be called again in a loop.
            val result = asyncResult.result().bodyAsJsonArray().map { device ->
                constructDeviceDevice(deviceIP, device.toString(), webClient)
            }
            deviceDevicesPromise.complete(result)
        } else
            deviceDevicesPromise.fail("Http request failed")
    }
    return deviceDevicesPromise.future()
}

fun constructDevice(vertx: Vertx, webClient: WebClient, deviceIP: String): List<Future<HttpResponse<Buffer>>> {

    val deviceDevicesFuture: Future<List<Future<HttpResponse<Buffer>>>> = constructDeviceDevices(deviceIP, webClient)
    // I need to call other rest points similar to this and I need map the result to kotlin class.

   // how do get HTTP response out of each future request in deviceDevicesFuture: Future<List<Future<HttpResponse<Buffer>>>>. 

}

class DeviceDiscoverVerticle : AbstractVerticle() {
        override fun start() {
            val deviceRepository = // list of IP strings
    
            val webClient = WebClient.create(vertx)
            vertx.eventBus().localConsumer<String>("listDevice").handler { message ->
                deviceRepository.forEach { deviceIP ->
                    val futureList = constructDevice(vertx, webClient, deviceIP)
                    CompositeFuture.all(futureList).onComplete { allFuturesResult ->
                            if (allFuturesResult.succeeded()) {
                                // how to handle individual future result here to construct data
                            } else {
                                println("failed")
                            }
                        }
                }
            }
        }
}

Updated question: 2

@Damian as you suggested I have updated my code.

fun constructDeviceDevice(deviceIP: String, device: String, webClient: WebClient): Future<HttpResponse<Buffer>> {
    val requestDevices: HttpRequest<Buffer> = webClient.get(deviceIP, "emsfp/node/v1/flows/$device")
    val deviceDevicePromise: Promise<HttpResponse<Buffer>> = Promise.promise()

    requestDevices.send { asyncResult ->
        if (asyncResult.succeeded())
            deviceDevicePromise.complete(asyncResult.result())
        else
            deviceDevicePromise.fail("Http request failed")
    }
    return deviceDevicePromise.future()
}

fun constructDeviceDevices(deviceIP: String, webClient: WebClient): Future<HttpResponse<Buffer>> {
    val requestDeviceDevices: HttpRequest<Buffer> = webClient.get(deviceIP, "emsfp/node/v1/flows/")
    val deviceDevicesPromise: Promise<HttpResponse<Buffer>> = Promise.promise()

    requestDeviceDevices.send { asyncResult ->
        if (asyncResult.succeeded()) {
            deviceDevicesPromise.complete(asyncResult.result())
        }
        else
            deviceDevicesPromise.fail("Http request failed")
    }
    return deviceDevicesPromise.future()
}


fun constructDevice(webClient: WebClient, deviceIP: String): Future<DeviceFlow> {
    val json = Json(JsonConfiguration.Stable.copy(ignoreUnknownKeys = true, isLenient = true))
    val constructDevicePromise: Promise<DeviceFlow> = Promise.promise()
    val httpDevicesFuture: Future<HttpResponse<Buffer>> = constructDeviceDevices(deviceIP, webClient)

    httpDevicesFuture.onComplete { ar ->
        if(ar.succeeded()) {
            val futureList = ar.result().bodyAsJsonArray().map { device ->
                constructDeviceDevice(deviceIP, device.toString(), webClient)
            }
            CompositeFuture.all(futureList).onComplete { asyncResult ->
                if (asyncResult.succeeded()) {
                    asyncResult.result().list<HttpResponse<Buffer>>().forEach { res ->
                        //not all future in futureList are completed here some of them shows Future{unresolved}
                    }
                    constructDevicePromise.complete(DeviceFlow(label = "xyz"))
                }
                else {
                    constructDevicePromise.fail("failed")
                }
            }

        }
    }
    return constructDevicePromise.future()
}


class DeviceDiscoverVerticle : AbstractVerticle() {
    override fun start() {
        val deviceRepository = //list of IPs

        val webClient = WebClient.create(vertx)
        vertx.eventBus().localConsumer<String>("listDevice").handler { message ->
            deviceRepository.forEach { deviceIP ->
                val constructDeviceFuture = constructDevice(webClient, deviceIP)
                constructDeviceFuture.onComplete {ar ->
                    //println(ar.result().toString())
                }
            }
        }
    }
}

My problem is inside

CompositeFuture.all(futureList).onComplete { asyncResult ->
                        if (asyncResult.succeeded()) {
                            asyncResult.result().list<HttpResponse<Buffer>>().forEach {

Here most of the futures are unresolved and execution hangs.

[Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@67d2e79}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@8bad0c6}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@c854509}, Future{unresolved}, Future{unresolved}, Future{cause=Http request failed}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}]

So I changed CompositeFuture.all(futureList).onComplete to CompositeFuture.join(futureList).onComplete, since according to the Vert.x docs join waits for all futures to complete:

The join composition waits until all futures are completed, either with a success or a failure. CompositeFuture.join takes several futures arguments (up to 6) and returns a future that is succeeded when all the futures are succeeded and failed when all the futures are completed and at least one of them is failed

But now a few futures are failing. Here is the output of the future list after changing to CompositeFuture.join:

CompositeFuture.join(futureList).onComplete { asyncResult ->
println(futureList)
                            if (asyncResult.succeeded()) { res ->
// println(res) this one gets hanged and not printing all response
                                asyncResult.result().list<HttpResponse<Buffer>>().forEach {



[Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@5e9d3832}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@379c326a}]
    [Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@51a39962}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@edcd528}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@293c3e5c}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@5f86d3ec}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@12a329f7}, Future{cause=Http request failed}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}]
    [Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@7abedb1e}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@3238d4cb}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@5bc868d3}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@50af1ecc}, Future{unresolved}, Future{cause=Http request failed}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}]
    [Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@5cc549ec}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@282f4033}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{cause=Http request failed}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}]
    [Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@41a890b3}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{cause=Http request failed}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}]
    [Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@147d772a}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{cause=Http request failed}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}]

So are a few futures failing because my device can't handle concurrent requests? Also, why does program execution get stuck inside

asyncResult.result().list<HttpResponse<Buffer>>().forEach { 

If the problem is the device's handling of concurrent requests, what other solution is there? Is it possible to run all of these REST calls outside the Vert.x environment and communicate with it through the event bus?

Also, if I deploy DeviceDiscoverVerticle as a standard verticle instead of a worker verticle, the application gets stuck completely at CompositeFuture.all(futureList).onComplete.

Why would you deploy them with worker=true? That is intended for blocking code. Vert.x shines in async operations (the Mongo client and HTTP calls via Vert.x's WebClient are examples of async operations). Read more about the event loop concept and reactivity. - Didar Burmaganov
If I don't deploy it as a worker, it always throws an event loop thread error. - Vencat

2 Answers

1
votes

I'm not familiar with Kotlin and coroutines, but I have some suggestions regarding Vert.x itself. First of all, as per the documentation:

In most cases, a Web Client should be created once on application startup and then reused. Otherwise you lose a lot of benefits such as connection pooling and may leak resources if instances are not closed properly.

I see that you call WebClient.create(vertx) inside the constructDevice method, so you create a new WebClient every time a 'listDevice' event arrives. Consider creating it once and reusing it.

I had a pretty similar thing to do recently and ended up using Futures. Be aware that each awaitResult call makes the surrounding code wait for that async operation before it continues, so your requests run one after another; if you blocked the thread like that in a standard verticle, you would indeed get a stream of blocked-thread warnings. What you can do instead is create a promise, complete or fail it inside your HTTP handler, and return the promise.future() object outside the handler. Outside the loop you can then handle all the futures; the difference is that the futures are also handled asynchronously, so you won't block the thread.

Also, to make the code a bit cleaner and to make use of Vert.x's asynchronous nature, it would be good to split the HTTP and MongoDB handling into separate verticles, i.e.

  1. HttpVerticle gets listDevice event
  2. HttpVerticle creates 5 futures for 5 different requests
  3. When all futures complete, future.onComplete()/CompositeFuture.all() is triggered and it sends an 'updateDB' event
  4. MongoVerticle receives and handles 'updateDB' event

This probably doesn't address your specific issue, but I hope it takes you at least one step further.

Following the comment, here's an example using futures in Java:

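import io.vertx.core.AbstractVerticle;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.ext.web.client.HttpResponse;
import io.vertx.ext.web.client.WebClient;

public class HttpVerticle extends AbstractVerticle {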

WebClient webClient;

@Override
public void start() throws Exception {

    webClient = WebClient.create(vertx);

    vertx.eventBus().consumer("run_multiple_requests", event -> {
        //When event is received this block is handled by some thread from worker pool, let's call it 'main thread'
        Promise<HttpResponse<Buffer>> request1Promise = Promise.promise();
        Promise<HttpResponse<Buffer>> request2Promise = Promise.promise();
        Promise<HttpResponse<Buffer>> request3Promise = Promise.promise();

        //Since webclient is async, all calls will be asynchronous
        webClient.get("ip1", "/endpoint")
                .send(asyncResult -> {
                    //async block #1 if it's worker verticle, it's probably picked up by another thread
                    //here we specify that our promise finished or failed
                    if (asyncResult.succeeded()) {
                        request1Promise.complete(asyncResult.result());
                    } else {
                        request1Promise.fail("Http request failed");
                    }
                });

        //at this point async block #1 is probably still processing
        webClient.get("ip2", "/endpoint")
                .send(asyncResult -> {
                    //async block #2 if it's worker verticle, it's probably picked up by another thread
                    //here we specify that our promise finished or failed
                    if (asyncResult.succeeded()) {
                        request2Promise.complete(asyncResult.result());
                    } else {
                        request2Promise.fail("Http request failed");
                    }
                });

        //at this point async block #1 and #2 are probably still processing
        webClient.get("ip3", "/endpoint")
                .send(asyncResult -> {
                    //async block #3 if it's worker verticle, it's probably picked up by another thread
                    //here we specify that our promise finished or failed
                    if (asyncResult.succeeded()) {
                        request3Promise.complete(asyncResult.result());
                    } else {
                        request3Promise.fail("Http request failed");
                    }
                });

        //retrieving futures from promises
        Future<HttpResponse<Buffer>> future1 = request1Promise.future();
        Future<HttpResponse<Buffer>> future2 = request2Promise.future();
        Future<HttpResponse<Buffer>> future3 = request3Promise.future();

       
        CompositeFuture.all(future1, future2, future3).onComplete(allFuturesResult -> {
            //async block #4 this will be executed only when all futures complete, but since it's async it does
            // not block our 'main thread'
            if (allFuturesResult.succeeded()) {
                //all requests succeeded
                vertx.eventBus().send("update_mongo", someMessage);
            } else {
                //some of the requests failed, handle it here
            }
        });
        
        //at this point async block #1 #2 #3 are probably still processing and #4 is waiting for callback
        //but we leave our event handler and free 'main thread' without waiting for anything
    });
}
}

Of course this code can (and should) be much shorter; everything is hardcoded without any arrays or loops just for clarity.

If you use logback or log4j (probably others as well), you can put [%t] in the log pattern and it will show the thread name in each log message. Personally, I find that really helpful for understanding the flow of all these async blocks.

One more thing: with this setup, all three requests will be sent at practically the same time, so make sure the HTTP server is capable of handling multiple requests at once.

1
votes

Knowing a bit more about what you are trying to achieve: first of all, in constructDeviceDevices() I would change the return type to just Future<HttpResponse<Buffer>> and, if the request succeeds, simply call deviceDevicesPromise.complete(asyncResult.result()).

Then, in the constructDevice() method, I would call the modified constructDeviceDevices() method and get a future from it; let's call it Future<HttpResponse<Buffer>> httpDevicesFuture. The next step is to call httpDevicesFuture.onComplete(ar -> {<handler code>}). In this handler you have access to ar.result(), which is the response from the ".../devices/" endpoint, so in the same block I would loop through that response and build a List<Future<HttpResponse<Buffer>>>.

Still in the same block, I would write CompositeFuture.all(futuresList).onComplete(ar -> handler). The result of this ar is a CompositeFuture, which has a list() method that returns the results of the futures (when the composite succeeds, they have all finished). From that list you can retrieve the HttpResponse<Buffer> of each future; each one is a ".../devices/$device" response, and you can map them to whatever objects you want.

In the same handler I would then decide where to go next, probably by sending a message on the event bus such as eventBus.send("HTTP_PROCESSING_DONE", serializedDevices) or, in case something goes wrong, eventBus.send("HTTP_FAILURE", someMessage). In your case, though, you want to do all of this for each IP in a list without forcing it to be synchronous. So, still in the same block, you can do any object mapping and call constructDevicePromise.complete(mappedObject/List<MappedObject>). That means you have to create one more promise, whose future you return from the constructDevice() method.

Basically, you are stuck because you are trying to reproduce sequential execution in an async world, specifically at the point where you try to return a value from constructDevice(). Returning a value there would mean waiting for all of that execution to finish by the time that line is processed, and that is not how Vert.x works.

It would look something like this (the syntax is probably off, so treat it as pseudocode):

    fun constructDeviceDevice(deviceIP: String, device: String, webClient: WebClient): Future<HttpResponse<Buffer>> {
    val requestDevices: HttpRequest<Buffer> = webClient.get(deviceIP, "emsfp/node/v1/self/diag/devices/$device")
    val deviceDevicePromise: Promise<HttpResponse<Buffer>> = Promise.promise()

    requestDevices.send { asyncResult ->
        if (asyncResult.succeeded())
            deviceDevicePromise.complete(asyncResult.result())
        else
            deviceDevicePromise.fail("Http request failed");
    }
    return deviceDevicePromise.future()
}

fun constructDeviceDevices(deviceIP: String, webClient: WebClient): Future<HttpResponse<Buffer>> {
    val requestDeviceDevices: HttpRequest<Buffer> = webClient.get(deviceIP, "emsfp/node/v1/self/diag/devices/")
    val deviceDevicesPromise: Promise<HttpResponse<Buffer>> = Promise.promise()

    requestDeviceDevices.send { asyncResult ->
        if (asyncResult.succeeded()) {
            deviceDevicesPromise.complete(asyncResult.result())
        } else
            deviceDevicesPromise.fail("Http request failed")
    }
    return deviceDevicesPromise.future()
}

fun constructDevice(vertx: Vertx, webClient: WebClient, deviceIP: String): Future<SomeDomainObject> {

    //Type of below promise depends on what you are mapping responses to. It may also be a list of mapped objects
    val constructDevicePromise: Promise<SomeDomainObject> = Promise.promise()
    val httpDevicesFuture: Future<HttpResponse<Buffer>> = constructDeviceDevices(deviceIP, webClient)

    httpDevicesFuture.onComplete { ar ->
        if (ar.succeeded()) {
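            // loop through ar.result() and build the list of per-device request futures
            val futureList: List<Future<HttpResponse<Buffer>>> = ar.result().bodyAsJsonArray().map { device ->
                constructDeviceDevice(deviceIP, device.toString(), webClient)
            }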

            CompositeFuture.all(futureList).onComplete { allFuturesResult ->
                if (allFuturesResult.succeeded()) {
                    // here you have access to allFuturesResult.list() method
                    // at this point you know all futures have finished, you can retrieve result from them (you may need to cast them from Object)
                    // when you have List<HttpResponse> you map it to whatever you want
                    val myMappedObject: SomeDomainObject = mappingResult()
                    constructDevicePromise.complete(myMappedObject)
                } else {
                    constructDevicePromise.fail("failed")
                }
            }
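        } else {
            // propagate the failure; otherwise the returned future never completes
            constructDevicePromise.fail(ar.cause())
        }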
    }

    return constructDevicePromise.future()
}

class DeviceDiscoverVerticle : AbstractVerticle() {
    override fun start() {
        val deviceRepository = // list of IP strings

        val webClient = WebClient.create(vertx)
        vertx.eventBus().localConsumer<String>("listDevice").handler { message ->
            deviceRepository.forEach { deviceIP ->
                //here dependent on your logic, you handle each future alone or create a list and handle them together
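                val constructDeviceFuture: Future<SomeDomainObject> = constructDevice(vertx, webClient, deviceIP)
                constructDeviceFuture.onComplete { ar ->
                    if (ar.succeeded()) {
                        val mappedObject = ar.result() // <- this is your mapped object
                        // serializedDomainObject is a placeholder for mappedObject in serialized form
                        vertx.eventBus().send("SOME_NEXT_LOGIC", serializedDomainObject)
                    }
                }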
            }
            
            //if you need to handle all devices at once, once again you need to make CompositeFuture from all responses of constructDevice
        }
    }
}
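
For comparison, the nested handlers above can be flattened by chaining the two stages. The following is only a minimal, self-contained sketch of that flow in Java. It uses the JDK's CompletableFuture purely as a stand-in for Vert.x futures (thenCompose plays the role that Future.compose plays in Vert.x), and the names TwoStageFanOut, fetchAll, listIds and fetchOne are hypothetical; they are not part of the code above or of Vert.x.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Supplier;

final class TwoStageFanOut {

    // Stage 1: fetch the list of ids. Stage 2: fetch every id concurrently.
    // The returned future completes with all details, or fails if either stage fails.
    static <D> CompletableFuture<List<D>> fetchAll(
            Supplier<CompletableFuture<List<String>>> listIds,
            Function<String, CompletableFuture<D>> fetchOne) {
        return listIds.get().thenCompose(ids -> {
            List<CompletableFuture<D>> pending = new ArrayList<>();
            for (String id : ids) {
                pending.add(fetchOne.apply(id)); // started without waiting for the previous one
            }
            return CompletableFuture
                    .allOf(pending.toArray(new CompletableFuture<?>[0]))
                    .thenApply(ignored -> {
                        List<D> details = new ArrayList<>();
                        for (CompletableFuture<D> future : pending) {
                            details.add(future.join()); // does not block: every future is complete here
                        }
                        return details;
                    });
        });
    }

    public static void main(String[] args) {
        // Hypothetical stand-ins for the ".../devices/" and ".../devices/$device" calls.
        CompletableFuture<List<String>> result = fetchAll(
                () -> CompletableFuture.supplyAsync(() -> Arrays.asList("eth0", "eth1")),
                id -> CompletableFuture.supplyAsync(() -> "details of " + id));
        System.out.println(result.join());
    }
}
```

One consequence of chaining instead of nesting is that a failure of the first call propagates to the returned future automatically, so the caller is never left with a future that stays unresolved.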

Update 2 response

About CompositeFuture.all(): you're missing one thing. CompositeFuture.all() waits until all futures succeed OR at least one fails. If even one fails, it doesn't wait for the others (like a logical AND: there's no need to wait for the rest because the result is already known). CompositeFuture.join(), on the other hand, waits for all futures to complete, but if any one of them fails, the resulting future will also be failed (though you should at least get a result for each of them).

That's exactly what you see in your output: with CompositeFuture.all() you get a bunch of completed futures, a single failed one, and the rest unresolved.
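
The difference between the two compositions can be reproduced in a few lines. This is a sketch under stated assumptions, not Vert.x code: it uses the JDK's CompletableFuture as a stand-in, and the helpers all, join and describe in the hypothetical class AllVersusJoin only mimic the behaviour described for CompositeFuture.all() and CompositeFuture.join().

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class AllVersusJoin {

    // Fail-fast, like the described CompositeFuture.all():
    // done when every input succeeded OR as soon as the first input failed.
    static <T> CompletableFuture<Void> all(List<CompletableFuture<T>> inputs) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        for (CompletableFuture<T> input : inputs) {
            input.whenComplete((value, error) -> {
                if (error != null) {
                    result.completeExceptionally(error); // first failure wins
                }
            });
        }
        CompletableFuture.allOf(inputs.toArray(new CompletableFuture<?>[0]))
                .whenComplete((ignored, error) -> {
                    if (error == null) {
                        result.complete(null); // every input succeeded
                    }
                });
        return result;
    }

    // Wait-for-all, like the described CompositeFuture.join():
    // done only when every input is done; failed if any input failed.
    static <T> CompletableFuture<Void> join(List<CompletableFuture<T>> inputs) {
        return CompletableFuture.allOf(inputs.toArray(new CompletableFuture<?>[0]));
    }

    static String describe(CompletableFuture<?> future) {
        if (!future.isDone()) {
            return "unresolved";
        }
        return future.isCompletedExceptionally() ? "failed" : "succeeded";
    }

    public static void main(String[] args) {
        CompletableFuture<String> first = CompletableFuture.completedFuture("response-1");
        CompletableFuture<String> second = new CompletableFuture<>();
        CompletableFuture<String> third = new CompletableFuture<>(); // still in flight
        List<CompletableFuture<String>> requests = Arrays.asList(first, second, third);

        CompletableFuture<Void> all = all(requests);
        CompletableFuture<Void> join = join(requests);

        second.completeExceptionally(new RuntimeException("Http request failed"));
        System.out.println("all:  " + describe(all) + ", third is " + describe(third));
        System.out.println("join: " + describe(join));

        third.complete("response-3");
        System.out.println("join after third completes: " + describe(join));
    }
}
```

After the second request fails, the fail-fast composition is already failed while the third request is still unresolved; the wait-for-all composition stays unresolved until the third request finishes and then reports failure.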

One more thing that's missing in this part:

vertx.eventBus().localConsumer<String>("listDevice").handler { message ->
        deviceRepository.forEach { deviceIP ->
            val constructDeviceFuture = constructDevice(webClient, deviceIP)
            constructDeviceFuture.onComplete {ar ->
                //println(ar.result().toString())
            }
        }
    }

You are not checking ar.succeeded(). If you did, you would see that the final future has actually failed, which is why the final result is not what you expected.

Now, purely speculating about what's happening: you are probably overwhelming (to some extent) this REST API (I assume it's the same API for each Vert.x event) with so many concurrent requests. If you put a log message with millisecond precision inside the handler of a single request, you will probably see that the requests are only a few milliseconds apart. I suppose the API is able to serve a few requests, then the next one fails due to some exception, block, timeout or whatever, and all the others probably get no response at all, or wait until they hit some timeout. If you define the verticle as standard, you will get a warning when anything blocks for more than two seconds. What's more, a single thread handles all of that, so if one request hangs for a long time, the standard verticle will be completely unresponsive for that time. That's probably why you get stuck at CompositeFuture.join().

So now you can do a few things:

  1. You can change concurrent execution to sequential execution. Basically, instead of creating n futures beforehand, you create a future for a single element and then call future.compose(ar -> {}); this handler is called ONLY WHEN the future completes. In that handler you create and return the future for the next element. It's a bit tricky to implement in my opinion, but doable (I've done it using a Java stream reduce to reduce x futures into a single one). Implemented that way, you will have one request at a time, so there should be no problems with the API. NOTE that different IPs will still be handled concurrently, but requests per IP will be sequential, so it might work just fine.

  2. You can create another standard verticle that responds only to a single event, and that event triggers the call to the "/devices/$device" endpoint. In the code you have right now, when you loop through the initial HTTP response, instead of spawning 20 more HTTP requests you just send 20 events to the event bus. When there is only one instance of the verticle handling that particular message, and it's a standard verticle with only one thread, effectively only one message should be handled at a time and the rest should just queue. This would also be very easy to tune, because you can just increase the number of verticle instances and you will have as many concurrent requests as there are verticle instances.

  3. You mentioned handling it completely outside of Vert.x. I don't think that's necessary at all, but if you decide it's best for you, it's pretty straightforward. If you already have a Vertx object from somewhere, there is no problem passing that object to the constructor of some other class. There you can have your own HTTP client, your own methods, basically anything you want, and at some point when you decide you want to use Vert.x you can just call vertx.eventBus().send() and trigger some logic that will be handled by Vert.x. The most important thing to remember is not to create more than one instance of the Vertx object, since each instance has a separate event bus. Actually, as the documentation states:

Verticles ... This model is entirely optional and Vert.x does not force you to create your applications in this way if you don’t want to.

So you can have your regular application written in whatever framework and still at some point just instantiate a Vertx object, perform a single task, and get back to your base framework. But honestly, I think you are very close to solving this :)
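
As an illustration of option 1, here is a minimal sketch of sequential chaining in Java. It is an assumption-laden stand-in rather than Vert.x code: it uses the JDK's CompletableFuture, where thenCompose corresponds to the future.compose() call mentioned in option 1, and it uses a plain loop instead of a stream reduce. The names SequentialRequests and inSequence are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

final class SequentialRequests {

    // Runs one async call per item; each call starts only after the previous one completed.
    // If a call fails, the remaining calls are never started and the result fails.
    static <T, R> CompletableFuture<List<R>> inSequence(
            List<T> items, Function<T, CompletableFuture<R>> call) {
        List<R> collected = new ArrayList<>();
        CompletableFuture<List<R>> chain = CompletableFuture.completedFuture(collected);
        for (T item : items) {
            chain = chain.thenCompose(results ->
                    call.apply(item).thenApply(result -> {
                        results.add(result);
                        return results;
                    }));
        }
        return chain;
    }

    public static void main(String[] args) {
        List<String> flows = Arrays.asList("flow-1", "flow-2", "flow-3");
        // Hypothetical stand-in for an HTTP call that completes on another thread.
        CompletableFuture<List<String>> responses = inSequence(flows,
                flow -> CompletableFuture.supplyAsync(() -> "response for " + flow));
        System.out.println(responses.join());
    }
}
```

Calling inSequence once per device IP keeps the requests to a single device sequential while different devices are still handled concurrently, which matches the note in option 1.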