I have the following algorithm in Scala:
1. Make an initial call to the DB to initialize a cursor.
2. Get 1000 entities from the DB (returns a Future).
3. For every entity, make one additional request to the database and get the modified entity (returns a Future).
4. Transform the original entity.
5. Use the transformed entity inside the callback of the Future from step 3.
6. Wait for all Futures.
In Scala it looks something like this:
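```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

val client = ...
val size = 1000
val init = client.firstSearch(size) // request over network
val initResult = Await.result(init, 30.seconds)
var cursorId: String = initResult.getCursorId
while (!cursorId.isEmpty) {
  // flatMap (not map) so the batch yields Future[String] instead of Future[Future[String]]
  val nextCursor: Future[String] = client.grabWithSize(cursorId).flatMap { response =>
    val futures: Seq[Future[Boolean]] = response.getAllResults.map { result =>
      val grabbedOne: Future[Entity] = client.grabOneEntity(result.id) // request over network
      val resultMap: Map[String, Any] = buildMap(result)
      val transformed: Map[String, Any] = transform(resultMap) // no Future here
      grabbedOne.map { grabbed =>
        buildMap(grabbed) == transformed
      }
    }
    Future.sequence(futures).map(_ => response.getNewCursorId)
  }
  // Wait for the whole batch (step 6), then continue with the next cursor
  cursorId = Await.result(nextCursor, 30.seconds)
}

def buildMap(...): Map[String, Any] // synchronous call
```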
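The paging pattern above can be tried in isolation with a stubbed client. The sketch below is not the author's code: `FakeClient`, `Page`, `grabPage`, `grabOne` and the `id * 10` "transform" are hypothetical stand-ins for the real client, and it swaps the `while` loop plus `Await` for recursion with `flatMap`, so no thread blocks between pages. Within a page, all per-entity requests are started by a strict `map` and then joined with `Future.sequence`.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object PagingSketch {
  // Hypothetical stand-ins for the client in the question.
  final case class Page(ids: Seq[Int], nextCursor: String)

  final class FakeClient(total: Int, pageSize: Int) {
    // The cursor is the start offset; "" means "no more pages".
    def grabPage(cursor: String): Future[Page] = Future {
      val start = cursor.toInt
      val end = math.min(start + pageSize, total)
      val next = if (end < total) end.toString else ""
      Page(start until end, next)
    }
    // Simulated per-entity request (step 3).
    def grabOne(id: Int): Future[Int] = Future(id)
  }

  // Starts every per-entity request of one page, then waits for all of them.
  def processPage(client: FakeClient, page: Page): Future[Int] = {
    val checks: Seq[Future[Boolean]] = page.ids.map { id =>
      val transformed = id * 10 // synchronous transform (step 4)
      client.grabOne(id).map(grabbed => grabbed * 10 == transformed)
    }
    Future.sequence(checks).map(_.count(identity))
  }

  // Walks all pages without blocking between them; returns the number of matches.
  def processAll(client: FakeClient, cursor: String, acc: Int = 0): Future[Int] =
    if (cursor.isEmpty) Future.successful(acc)
    else client.grabPage(cursor).flatMap { page =>
      processPage(client, page).flatMap(n => processAll(client, page.nextCursor, acc + n))
    }

  def main(args: Array[String]): Unit = {
    val matches = Await.result(processAll(new FakeClient(2500, 1000), "0"), 10.seconds)
    println(matches) // 2500
  }
}
```

With 2500 fake entities and pages of 1000, the sketch walks three pages (1000, 1000 and 500 entities) and every comparison matches.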
I noticed that if I double `size`, each iteration of the `while` loop becomes about 1.5 times slower. However, the CPU load does not increase: it stays near zero, yet the time grows by about 1.5 times. Why? I have set up:
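```scala
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext

implicit val ec: ExecutionContext =
  ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1024))
```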
I suspect that not all Futures are executed in parallel. But why? And how can I fix it?
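One way to test that suspicion is to count how many Futures are actually running at the same moment. The probe below is a hypothetical helper (`ConcurrencyProbe`, `maxInFlight` are illustrative names), and `Thread.sleep` is only a stand-in for a blocking network call; it builds the same kind of fixed-pool `ExecutionContext` as above and records the peak number of tasks in flight.

```scala
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ConcurrencyProbe {
  // Runs `tasks` blocking calls of `sleepMs` each on a fixed pool of `poolSize`
  // threads and returns the highest number observed running at the same time.
  def maxInFlight(tasks: Int, poolSize: Int, sleepMs: Long): Int = {
    val pool = Executors.newFixedThreadPool(poolSize)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    val inFlight = new AtomicInteger(0)
    val peak = new AtomicInteger(0)
    try {
      val futures = (1 to tasks).map { _ =>
        Future {
          val now = inFlight.incrementAndGet()
          peak.updateAndGet((p: Int) => math.max(p, now)) // remember the highest count
          Thread.sleep(sleepMs) // stand-in for a blocking network call
          inFlight.decrementAndGet()
        }
      }
      Await.result(Future.sequence(futures), 30.seconds)
      peak.get()
    } finally pool.shutdown()
  }

  def main(args: Array[String]): Unit = {
    println(maxInFlight(tasks = 8, poolSize = 1, sleepMs = 50)) // 1
    println(maxInFlight(tasks = 8, poolSize = 1024, sleepMs = 200))
  }
}
```

With a single thread the peak is always 1. If the peak with the 1024-thread pool is close to the number of tasks, the thread pool itself is not what serializes the work.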
What about `response.getAllResults.map`? It does not traverse the results; it just "attaches" the `map` function, and the actual (!) transformation happens inside `Future.sequence`, because that iterates over the `Seq[Future]`, which invokes the `map` function over `response.getAllResults`. So each `Future` starts one by one, not all at the same time. Could that be the source of the problem? – Cherry