I am using a Scala Iterator for a wait loop in a synchronized block:

anObject.synchronized {
    if (Try(anObject.foo()).isFailure) {
        Iterator.continually { 
            anObject.wait()
            Try(anObject.foo()) 
        }.dropWhile(_.isFailure).next()
    }
    anObject.notifyAll()
}

Is it acceptable to use Iterator with concurrency and multithreading? If not, why? And what should I use instead, and how?


Some details, in case they matter: anObject is a mutable queue with multiple producers and consumers, and the block above is the code of one such producer or consumer. anObject.foo is a simplified stand-in for a function that either enqueues data to the queue (for a producer) or dequeues data from it (for a consumer).

1 Answer

Iterator is internally mutable, so you have to take that into account if you use it in a multi-threaded environment. If you can guarantee that you won't end up in a situation where, e.g.,

  • 2 threads check hasNext()
  • one of them calls next() - it happens to be the last element
  • the other calls next() on the now-empty iterator - typically a NoSuchElementException

(or similar) then you should be ok. In your example the Iterator never leaves the scope of the synchronized block, so only one thread ever touches it and any errors shouldn't come from the Iterator.
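If an Iterator did have to be shared between threads, one way to rule out the race described above is to make the hasNext check and the next call a single atomic step. The following is only a minimal sketch: SharedIterator, nextOption and SharedIteratorDemo are hypothetical names made up for this illustration, not part of the Scala library.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical wrapper: the check (hasNext) and the act (next) happen under one lock,
// so no other thread can take the last element in between.
final class SharedIterator[A](underlying: Iterator[A]) {
  def nextOption(): Option[A] = this.synchronized {
    if (underlying.hasNext) Some(underlying.next()) else None
  }
}

object SharedIteratorDemo {
  // Four threads drain one shared iterator; returns the sum of everything they consumed.
  def run(): Int = {
    val shared = new SharedIterator((1 to 1000).iterator)
    val total  = new AtomicInteger(0)
    val workers = List.fill(4) {
      new Thread(() => {
        var item = shared.nextOption()
        while (item.isDefined) {
          total.addAndGet(item.get)
          item = shared.nextOption()
        }
      })
    }
    workers.foreach(_.start())
    workers.foreach(_.join())
    total.get()
  }
}
```

Each element is handed to exactly one thread, so the total is the same as summing the range on a single thread.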

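However, in your code I see an issue with having anObject.wait() and anObject.notifyAll() next to each other - a thread blocked in .wait can never reach its own .notifyAll; it can only be woken by another thread calling .notify or .notifyAll on the same object. If no other thread does that, the block hangs. You can check in a REPL that this hangs: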

@ val anObject = new Object { def foo() = throw new Exception }
anObject: {def foo(): Nothing} = ammonite.$sess.cmd21$$anon$1@126ae0ca

@ anObject.synchronized {
      if (Try(anObject.foo()).isFailure) {
          Iterator.continually {
              anObject.wait()
              Try(anObject.foo())
          }.dropWhile(_.isFailure).next()
      }
      anObject.notifyAll()
  }
// waits indefinitely
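For comparison, the conventional shape of a wait/notifyAll loop re-checks an explicit condition in a while loop, and the notifyAll that wakes a waiter comes from a different thread, after that thread has changed the state. This is only a sketch under assumptions: BoundedBuffer and GuardedWaitDemo are hypothetical names, and the buffer stands in for the queue from the question.

```scala
import scala.collection.mutable

// Hypothetical bounded buffer guarded by its own monitor.
final class BoundedBuffer[A](capacity: Int) {
  private val items = mutable.Queue.empty[A]

  def put(a: A): Unit = this.synchronized {
    // Re-check the condition after every wake-up; wait() releases the monitor while blocked.
    while (items.size >= capacity) this.wait()
    items.enqueue(a)
    this.notifyAll() // wakes consumers waiting for an element
  }

  def take(): A = this.synchronized {
    while (items.isEmpty) this.wait()
    val a = items.dequeue()
    this.notifyAll() // wakes producers waiting for free space
    a
  }
}

object GuardedWaitDemo {
  // One producer thread puts 1..5; the calling thread takes five elements.
  def run(): List[Int] = {
    val buffer   = new BoundedBuffer[Int](capacity = 2)
    val producer = new Thread(() => (1 to 5).foreach(buffer.put))
    producer.start()
    val received = List.fill(5)(buffer.take())
    producer.join()
    received
  }
}
```

Unlike the REPL session, this does not hang, because a second thread changes the state and calls notifyAll while the first one is waiting.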

I would suggest changing the design to NOT rely on wait and notifyAll. However, from your code it is hard to say what you want to achieve, so I cannot tell whether this is more of a Promise/Future case, monix.Observable, monix.Task, or something else.
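To illustrate the Promise/Future option only: if the real need is a one-shot handoff of a value from one thread to another, it could look roughly like the sketch below, which uses just the standard library. PromiseHandoff and the value 42 are placeholders invented for this example, and whether this fits depends on the actual use case.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object PromiseHandoff {
  def run(): Int = {
    val promise = Promise[Int]()

    // Producer side: completes the promise once the value is available.
    Future { promise.success(42) }

    // Consumer side: declares what to do with the value instead of blocking in wait().
    val doubled: Future[Int] = promise.future.map(_ * 2)

    // Blocking here is only for the demo; application code would keep composing Futures.
    Await.result(doubled, 5.seconds)
  }
}
```

There is no explicit lock, wait or notifyAll here; the consumer's continuation runs when the promise is completed.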


If your use case is a queue with producers and consumers, then it sounds like a use case for reactive streams - e.g. FS2 + Monix, but it could also be FS2 + IO or something from Akka Streams:

val queue: Queue[Task, Item] // depending on use case queue might need to be bounded

// in one part of the application
queue.enqueue1(item) // Task[Unit]

// in other part of the application
queue
  .dequeue
  .evalMap { item =>
    // ...
    result: Task[Result]
  }
  .compile
  .drain

This approach would require some change in how you think about designing an application: you would no longer work with threads directly, but rather design a flow of data and declare what is sequential and what can be done in parallel, with threads becoming just an implementation detail.