My project needs a lot of asynchronous programming, so I chose the Akka platform: with the actor model, one can implement an asynchronous system much like writing synchronous code, without worrying about threading issues. Everything worked fine until I ran into the following issue (demo code):
import akka.actor.AbstractActor;
import akka.japi.pf.ReceiveBuilder;
import java.util.concurrent.locks.ReentrantLock;

public class TestActor extends AbstractActor {
    private final ReentrantLock lock = new ReentrantLock();

    @Override
    public Receive createReceive() {
        return ReceiveBuilder.create()
                .matchEquals("lock", s -> lock.lock())
                .matchEquals("unlock", s -> lock.unlock())
                .build();
    }
}
First a "lock" message is sent, then an "unlock" message. When the actor tries to unlock upon receiving the "unlock" message, an IllegalMonitorStateException is thrown. I found out that this is because different messages are actually handled by different threads: s -> lock.lock() and s -> lock.unlock() run on different threads, and a ReentrantLock can only be released by the thread that holds it, so IllegalMonitorStateException is thrown.
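To check that this comes from the lock's thread ownership rather than from Akka itself, here is a minimal plain-Java sketch with no Akka involved. The class name LockOwnershipDemo and its method are made up for illustration, and the two single-thread executors are only an assumed stand-in for the dispatcher threads that happened to process the two messages.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, not part of the actor code above.
public class LockOwnershipDemo {

    // Locks on one thread, then tries to unlock on another.
    // Returns true if the unlock attempt failed with IllegalMonitorStateException.
    public static boolean unlockFromOtherThreadFails() {
        ReentrantLock lock = new ReentrantLock();
        Runnable acquire = lock::lock;
        Runnable release = lock::unlock;

        // Each executor owns exactly one thread, so the two tasks
        // are guaranteed to run on different threads.
        ExecutorService first = Executors.newSingleThreadExecutor();
        ExecutorService second = Executors.newSingleThreadExecutor();
        try {
            // Plays the role of the "lock" message handler.
            first.submit(acquire).get();
            try {
                // Plays the role of the "unlock" message handler.
                second.submit(release).get();
                return false; // the unlock unexpectedly succeeded
            } catch (ExecutionException e) {
                // The exception thrown inside the task is wrapped by the Future.
                return e.getCause() instanceof IllegalMonitorStateException;
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("demo was interrupted or failed to lock", e);
        } finally {
            first.shutdownNow();
            second.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(unlockFromOtherThreadFails()); // prints "true"
    }
}
```

This only reproduces the exception outside of an actor; it says nothing about how Akka assigns threads to messages.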
My previous assumption was that all actions of an actor are executed in one thread, so an actor is completely thread safe and one does not have to worry about threading issues. As I use Akka extensively in my project, I'm now quite concerned and unclear about when one needs to consider threading issues when using Akka. For example, in the following demo code:
import akka.actor.AbstractActor;
import akka.japi.pf.ReceiveBuilder;
import java.util.HashMap;
import java.util.Map;

public class TestActor1 extends AbstractActor {
    // Plain, unsynchronized mutable state owned by the actor
    private int count = 0;
    private Map<Integer, Integer> map = new HashMap<>();

    @Override
    public Receive createReceive() {
        return ReceiveBuilder.create()
                .matchEquals("action1", s -> count++)
                .matchEquals("action2", s -> getSender().tell(count, getSelf()))
                .matchEquals("action3", s -> map.put(3, 2))
                .matchEquals("action4", s -> getSender().tell(map.get(3), getSelf()))
                .build();
    }
}
Is the way count and map are used thread safe? Do I need to use volatile for count and ConcurrentHashMap for map?
ps ======================
The following demo code shows why I need a lock in the actor. Basically, I'm implementing a pipeline with back-pressure control: once an actor receives too many tasks from the upstream actor, it sends a backPressureHi message to the upstream actor to stall the upstream actor's execution loop until the back pressure returns to normal, and then sends a backPressureNormal message to resume it:
import akka.actor.AbstractActor;
import akka.japi.pf.ReceiveBuilder;
import java.util.concurrent.locks.ReentrantLock;

public class PipeLineActor extends AbstractActor {
    private final ReentrantLock stallLock = new ReentrantLock();

    private Thread executionLoop = new Thread(() -> {
        while (true) {
            // Blocks here while the actor holds stallLock
            stallLock.lock();
            stallLock.unlock();
            // issue tasks to downstream actors
        }
    });

    @Override
    public Receive createReceive() {
        return ReceiveBuilder.create()
                // downstream actor sends "backPressureHi" when back pressure is high, to stall the executionLoop
                .matchEquals("backPressureHi", s -> stallLock.lock())
                // downstream actor sends "backPressureNormal" when back pressure is back to normal, to resume the executionLoop
                .matchEquals("backPressureNormal", s -> stallLock.unlock())
                .build();
    }
}