
My project needs a lot of asynchronous programming, so I chose the Akka platform: with the actor model, one can implement an asynchronous system much like writing synchronous code, without worrying about threading issues. Everything worked fine until I ran into the following issue (demo code):

import akka.actor.AbstractActor;
import akka.japi.pf.ReceiveBuilder;
import java.util.concurrent.locks.ReentrantLock;

public class TestActor extends AbstractActor {
    private final ReentrantLock lock  = new ReentrantLock();

    @Override
    public Receive createReceive() {
        return ReceiveBuilder.create()
                .matchEquals("lock", s -> lock.lock())
                .matchEquals("unlock", s -> lock.unlock())
                .build();
    }
}

First a "lock" message is sent, then an "unlock" message. When the actor tries to unlock upon receiving the "unlock" message, an IllegalMonitorStateException is thrown. I found out that this happens because different messages are actually handled by different threads: s -> lock.lock() and s -> lock.unlock() run on different threads, and a ReentrantLock can only be released by the thread that holds it.
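The ownership rule behind this exception can be reproduced with plain java.util.concurrent, without Akka. The following is only a minimal sketch; the class and method names are illustrative assumptions and are not part of the actor code above. It acquires a ReentrantLock on one thread and tries to release it on another:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;

public class LockOwnershipDemo {

    // Acquires the lock on one thread, then tries to release it on another.
    // Returns true if the second thread got an IllegalMonitorStateException.
    static boolean unlockFromOtherThreadFails() {
        ReentrantLock lock = new ReentrantLock();
        ExecutorService threadA = Executors.newSingleThreadExecutor();
        ExecutorService threadB = Executors.newSingleThreadExecutor();
        try {
            Runnable acquire = lock::lock;
            threadA.submit(acquire).get();      // thread A now owns the lock
            return threadB.submit(() -> {
                try {
                    lock.unlock();              // thread B is not the owner
                    return false;
                } catch (IllegalMonitorStateException e) {
                    return true;
                }
            }).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            threadA.shutdownNow();
            threadB.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("unlock from another thread failed: "
                + unlockFromOtherThreadFails());
    }
}
```

This mirrors what happens when two messages of the same actor are dispatched to two different pool threads: the lock is owned by a thread, not by the actor.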

My previous assumption was that all actions of an actor are executed on one thread, so it is totally thread safe and one does not have to worry about threading issues. As I use Akka extensively in my project, I'm now quite concerned and unclear about when one needs to consider threading issues when using Akka. For example, in the following demo code:

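import akka.actor.AbstractActor;
import akka.japi.pf.ReceiveBuilder;
import java.util.HashMap;
import java.util.Map;

public class TestActor1 extends AbstractActor {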
    private int count  = 0;
    private Map<Integer, Integer> map = new HashMap<>();
    
    @Override
    public Receive createReceive() {
        return ReceiveBuilder.create()
                .matchEquals("action1", s -> count++)
                .matchEquals("action2", s -> getSender().tell(count, getSelf()))
                .matchEquals("action3", s -> map.put(3, 2))
                .matchEquals("action4", s -> getSender().tell(map.get(3), getSelf()))
                .build();
    }
}

Is the way count and map are used thread safe? Do I need to make count volatile and use a ConcurrentHashMap for map?

ps ======================

The following demo code shows why I need a lock in the actor. Basically, I'm implementing a pipeline with back-pressure control: once an actor receives too many tasks from the upstream actor, it sends a backPressureHi message to the upstream actor to stall that actor's execution loop until the back pressure returns to normal, at which point it sends backPressureNormal to resume it:

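import akka.actor.AbstractActor;
import akka.japi.pf.ReceiveBuilder;
import java.util.concurrent.locks.ReentrantLock;

public class PipeLineActor extends AbstractActor {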
    private final ReentrantLock stallLock = new ReentrantLock();

    private Thread executionLoop = new Thread(() -> {
        while (true){
            stallLock.lock();
            stallLock.unlock();
            
            // issue tasks to down stream actors
        }
    });

    @Override
    public Receive createReceive() {
        return ReceiveBuilder.create()
                // the downstream actor sends "backPressureHi" when back pressure is high, to stall the executionLoop
                .matchEquals("backPressureHi", s -> stallLock.lock())
                // the downstream actor sends "backPressureNormal" when back pressure is back to normal, to resume the executionLoop
                .matchEquals("backPressureNormal", s -> stallLock.unlock())
                .build();
    }
}
You probably mean to have action3 and action4 instead of action2. - Ivan Stanislavciuc
@IvanStanislavciuc Yes, these are typos, thanks for pointing them out. I also edited the question to add some more information showing why I need locks within an actor. - Yulin
This seems to be an XY problem. Regarding your edit: in my opinion, you should not do it. Akka is designed to be lock-free. If you want back pressure, consider using akka-stream instead. It can integrate with an actor, so your processing can still happen within an actor. But you should create a new question rather than modifying this one, to avoid big changes to this question. - Ivan Stanislavciuc

1 Answer


Akka is designed to be thread safe, and there is never a need for locking or synchronisation within an actor. It should not be done.

Akka achieves thread safety by processing a single message at a time. An actor cannot process multiple messages simultaneously. But messages may, and will, be processed on different threads. (This is the default behaviour, but it can be changed, for example with a pinned dispatcher.)

From the documentation:

No concurrency guards such as synchronized or AtomicInteger are needed since an actor instance processes one message at a time.
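To illustrate why one-message-at-a-time processing makes plain fields safe even though the executing thread changes, here is a minimal sketch in plain Java. It is not Akka's implementation: SerialMailbox and runDemo are hypothetical names invented for this example. Messages are run strictly one after another on a shared thread pool, and the hand-off between consecutive messages goes through a synchronized method and pool.execute, which gives the happens-before ordering that makes an unsynchronized counter correct:

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MailboxSketch {

    // Hypothetical stand-in for an actor mailbox: queued messages run one at a
    // time, but each one may run on a different thread of the shared pool.
    static final class SerialMailbox {
        private final ExecutorService pool;
        private final Queue<Runnable> queue = new ArrayDeque<>();
        private boolean running = false;            // guarded by "this"

        SerialMailbox(ExecutorService pool) { this.pool = pool; }

        synchronized void tell(Runnable message) {
            queue.add(message);
            if (!running) { running = true; scheduleNext(); }
        }

        private synchronized void scheduleNext() {
            Runnable next = queue.poll();
            if (next == null) { running = false; return; }
            // The next message is submitted only after the previous one finished.
            pool.execute(() -> {
                try { next.run(); } finally { scheduleNext(); }
            });
        }
    }

    // Several sender threads each enqueue perSender "increment" messages.
    static int runDemo(int senders, int perSender) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        SerialMailbox mailbox = new SerialMailbox(pool);
        int[] count = {0};                          // plain, non-volatile state
        CountDownLatch done = new CountDownLatch(senders * perSender);
        for (int i = 0; i < senders; i++) {
            new Thread(() -> {
                for (int j = 0; j < perSender; j++) {
                    mailbox.tell(() -> { count[0]++; done.countDown(); });
                }
            }).start();
        }
        try {
            done.await();                           // all messages processed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return count[0];
    }

    public static void main(String[] args) {
        System.out.println("final count: " + runDemo(4, 1000));
    }
}
```

No increment is lost here because two messages never run at the same time, which is the same property the actor's count and map fields rely on.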

To your final questions:

is the way count and map are used thread safe?

Yes, it is thread safe.

Do I need to use volatile for count and use ConcurrentHashMap for map?

No, there is no need to do that. See "Akka and the Java Memory Model" in the documentation:

In layman’s terms this means that changes to internal fields of the actor are visible when the next message is processed by that actor. So fields in your actor need not be volatile or equivalent.
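Because actor fields are safe to mutate between messages, the stall/resume behaviour from the question's P.S. can be kept as ordinary actor state instead of a lock. The sketch below is plain Java under stated assumptions, not Akka code and not the akka-stream approach suggested in the comments: PipelineState, the "tick" message and the hand-rolled mailbox queue are hypothetical. In a real actor the mailbox is provided by Akka and the "tick" would be sent with getSelf().tell(...).

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class BackPressureSketch {

    // Hypothetical model of the upstream actor: a boolean replaces the lock,
    // and a self-sent "tick" message replaces the blocking execution loop.
    static final class PipelineState {
        private final Queue<String> mailbox = new ArrayDeque<>();
        private boolean stalled = false;
        private boolean tickScheduled = false;   // at most one "tick" in flight
        private int issued = 0;

        void tell(String message) { mailbox.add(message); }

        void start() { scheduleTick(); }

        // Processes up to maxMessages queued messages, one at a time.
        void drain(int maxMessages) {
            for (int i = 0; i < maxMessages && !mailbox.isEmpty(); i++) {
                receive(mailbox.poll());
            }
        }

        private void scheduleTick() {
            if (!tickScheduled) { tickScheduled = true; tell("tick"); }
        }

        private void receive(String message) {
            switch (message) {
                case "backPressureHi":
                    stalled = true;              // stop issuing; nothing blocks
                    break;
                case "backPressureNormal":
                    stalled = false;
                    scheduleTick();              // resume the loop
                    break;
                case "tick":
                    tickScheduled = false;
                    if (!stalled) {
                        issued++;                // issue one task downstream
                        scheduleTick();          // keep the loop going
                    }
                    break;
                default:
                    break;
            }
        }

        int issued() { return issued; }
    }

    // Returns {tasks issued once stalled, tasks issued after resuming}.
    static int[] runScenario() {
        PipelineState p = new PipelineState();
        p.start();
        p.drain(3);                              // three tasks issued
        p.tell("backPressureHi");
        p.drain(5);                              // one more tick runs before the stall
        p.drain(5);                              // stalled: nothing left to process
        int afterStall = p.issued();
        p.tell("backPressureNormal");
        p.drain(3);                              // resume message plus two ticks
        return new int[] { afterStall, p.issued() };
    }

    public static void main(String[] args) {
        int[] r = runScenario();
        System.out.println("issued when stalled: " + r[0] + ", after resume: " + r[1]);
    }
}
```

The design point is that the handler for "backPressureHi" returns immediately instead of holding a lock, so no thread is ever blocked and no lock ownership is involved.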