Iterator is mutable internally, so you have to take that into account if you use it in a multi-threaded environment. You should be fine as long as you can guarantee that you never end up in a situation like this:
- two threads check hasNext() and both see true
- one of them calls next(), which happens to return the last element
- the other calls next() on the now-exhausted iterator
- a NoSuchElementException (or similar) is thrown
In your example the Iterator doesn't even leave the scope in which it was created, so the errors shouldn't come from the Iterator.
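If an Iterator did have to be shared between threads, one way to rule out the interleaving described above is to make the check and the advance a single atomic step. The following is only a minimal sketch using the standard library; SyncIterator and SyncIteratorDemo are hypothetical names made up for illustration, not part of any library.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical wrapper: hasNext and next run under one lock,
// so no other thread can consume the last element in between.
final class SyncIterator[A](underlying: Iterator[A]) {
  def nextOption(): Option[A] = this.synchronized {
    if (underlying.hasNext) Some(underlying.next()) else None
  }
}

object SyncIteratorDemo {
  // Two threads drain the same iterator; returns how many elements were seen in total.
  def drainWithTwoThreads(n: Int): Int = {
    val shared = new SyncIterator(Iterator.range(0, n))
    val seen = new AtomicInteger(0)
    val threads = List.fill(2)(new Thread(() => {
      var item = shared.nextOption()
      while (item.isDefined) {
        seen.incrementAndGet()
        item = shared.nextOption()
      }
    }))
    threads.foreach(_.start())
    threads.foreach(_.join())
    seen.get()
  }
}
```

Each element is handed to exactly one thread, and an exhausted iterator is reported as None instead of an exception.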
However, in your code I see an issue with having aObject.wait() and aObject.notifyAll() next to each other: if a thread calls .wait(), it blocks right there and never reaches the .notifyAll() that would unblock it, unless some other thread notifies on the same object. You can check in a REPL that this hangs:
@ import scala.util.Try
@ val anObject = new Object { def foo() = throw new Exception }
anObject: {def foo(): Nothing} = ammonite.$sess.cmd21$$anon$1@126ae0ca
@ anObject.synchronized {
    if (Try(anObject.foo()).isFailure) {
      Iterator.continually {
        anObject.wait() // blocks here: no other thread ever calls notifyAll()
        Try(anObject.foo())
      }.dropWhile(_.isFailure).next()
    }
    anObject.notifyAll() // never reached
  }
// waits indefinitely
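For contrast, here is a minimal sketch of how wait and notifyAll are normally paired: one thread waits on a condition, and a different thread changes the condition and notifies. WaitNotifyDemo, lock and ready are hypothetical names chosen for this illustration, and the code assumes nothing about your actual application.

```scala
object WaitNotifyDemo {
  // Returns true if the waiting thread was woken up and finished.
  def run(): Boolean = {
    val lock = new Object
    var ready = false // only read or written while holding lock

    val waiter = new Thread(() => lock.synchronized {
      // Loop on the condition: guards against spurious wake-ups and
      // against the notifier running before this thread starts waiting.
      while (!ready) lock.wait()
    })
    waiter.start()

    // The notification comes from another thread (here: the caller's thread).
    lock.synchronized {
      ready = true
      lock.notifyAll()
    }

    waiter.join(5000) // wait up to 5 seconds for the waiter to finish
    !waiter.isAlive
  }
}
```

The key difference from the hanging snippet is that the thread calling notifyAll is not the one blocked in wait.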
I would suggest changing the design so that it does NOT rely on wait and notifyAll. However, from your code it is hard to say what you want to achieve, so I cannot tell whether this is more of a Promise-Future case, a monix.Observable, a monix.Task, or something else.
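As an illustration of the Promise-Future option, here is a minimal sketch using only the Scala standard library. It assumes the goal is "one thread produces a value, another part of the program reacts to it"; PromiseDemo and the values used are hypothetical.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object PromiseDemo {
  def run(): Int = {
    val promise = Promise[Int]()

    // Consumer side: declare what should happen once the value is available.
    val doubled: Future[Int] = promise.future.map(_ * 2)

    // Producer side: complete the promise from another thread.
    val producer = new Thread(() => { promise.success(21); () })
    producer.start()

    // Blocking here only to make the demo observable; real code would keep composing Futures.
    Await.result(doubled, 5.seconds)
  }
}
```

The Promise plays the role of notifyAll and the Future the role of wait, but without explicit monitors or blocked threads.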
If your use case is a queue with producers and consumers, then it sounds like a use case for reactive streams, e.g. FS2 + Monix, but it could also be FS2 + IO or something from Akka Streams:
val queue: Queue[Task, Item] // depending on the use case, the queue might need to be bounded
// in one part of the application
queue.enqueue1(item) // Task[Unit]
// in another part of the application
queue
  .dequeue
  .evalMap { item =>
    // ...
    result: Task[Result]
  }
  .compile
  .drain
This approach requires a change in how you think about designing an application: you no longer work with threads directly, but instead design a flow of data and declare what is sequential and what can be done in parallel, so threads become just an implementation detail.
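The idea of declaring what is sequential and what is parallel can be sketched with plain standard-library Futures as a stand-in (this is not FS2 or Monix code). FlowDemo, parse and square are hypothetical names invented for the example.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FlowDemo {
  def parse(raw: String): Future[Int] = Future(raw.trim.toInt)
  def square(n: Int): Future[Int] = Future(n * n)

  // Sequential: square needs the result of parse, expressed with flatMap.
  def one(raw: String): Future[Int] = parse(raw).flatMap(square)

  // Parallel: items are independent, so they may run concurrently;
  // traverse keeps the results in input order.
  def all(raws: List[String]): Future[List[Int]] = Future.traverse(raws)(one)

  def run(): List[Int] = Await.result(all(List("1", " 2", "3 ")), 5.seconds)
}
```

Nothing in this code mentions a thread: the ExecutionContext decides where each step runs.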