
I would like to know whether it's possible to replace old multithreading code written with Java's ExecutorService with Akka. I have a few doubts about this.

Does each Akka actor run in its own thread?

How are threads assigned to actors?

If migration is possible, what are its pros and cons?

Currently I use a fixed thread pool for multithreading and submit tasks to it.

Sample code:

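import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

public class KafkaConsumerFactory {

    // Written by every pool thread that asks for a consumer, so it must be a
    // thread-safe map (a plain HashMap can be corrupted by concurrent puts).
    private static Map<String,KafkaConsumer> registry = new ConcurrentHashMap<>();

    // KafkaConsumer is not thread-safe, so each thread gets its own instance.
    private static ThreadLocal<KafkaConsumer> consumers = new ThreadLocal<KafkaConsumer>(){
        @Override
        protected KafkaConsumer initialValue() {
            return new KafkaConsumer(createConsumerConfig());
        }
    };

    static {
        Runtime.getRuntime().addShutdownHook(new Thread(){
            @Override
            public void run() {
                // Shutdown hooks run concurrently and in no guaranteed order, so this can
                // race with worker threads that are still using their consumers.
                registry.forEach((tid,con) -> {
                    try{
                        con.close();
                    } finally {
                        System.out.println("Yes!! Consumer for " + tid + " is closed.");
                    }
                });
            }
        });
    }

    private static Properties createConsumerConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "newcon-grp5");
        props.put("key.deserializer", StringDeserializer.class.getName());
        // KafkaKryoSerde is a custom deserializer class (not shown)
        props.put("value.deserializer", KafkaKryoSerde.class.getName());
        return props;
    }

    // Registers and returns the consumer bound to the calling thread.
    public static <K,V> KafkaConsumer<K,V> createConsumer(){
        KafkaConsumer consumer = consumers.get();
        registry.put(Thread.currentThread().getName(), consumer);
        return consumer;
    }
}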

/////////////////////////////////////////////////////////

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class KafkaNewConsumer {
    public static int MAX_THREADS = 10;
    // volatile: both fields are read by threads other than the one that writes them
    private volatile ExecutorService es = null;
    private volatile boolean stopRequest = false;

    public static void main(String[] args){
        KafkaNewConsumer knc = new KafkaNewConsumer();
        Runtime.getRuntime().addShutdownHook(new Thread(){
            @Override
            public void run(){
                knc.stopRequest = true;          // let the poll loops exit
                if (knc.es == null) {
                    return;                      // consumeTopic() never started the pool
                }
                knc.es.shutdown();
                try {
                    // wait longer than the 5 s poll timeout so workers can leave their loop
                    knc.es.awaitTermination(6, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    System.out.println("Finished");
                }
            }
        });

        knc.consumeTopic("rtest3", knc::recordConsumer);
    }

    public void recordConsumer(ConsumerRecord<?,?> record){
        String result = new StringJoiner(": ")
                .add(Thread.currentThread().getName())
                .add("ts").add(String.valueOf(record.timestamp()))
                .add("offset").add(String.valueOf(record.offset()))
                .add("data").add(String.valueOf(record.value()))
                .add("value-len").add(String.valueOf(record.serializedValueSize()))
                .toString();
        System.out.println(result);
    }

    public void consumeTopic(String topicName, Consumer<ConsumerRecord<?,?>> fun){
        // short-lived consumer on the calling thread, used only to count the partitions
        KafkaConsumer con = KafkaConsumerFactory.createConsumer();
        int partitions = con.partitionsFor(topicName).size();
        con.close();
        // in a consumer group, threads beyond the partition count would sit idle
        int noOfThread = Math.min(MAX_THREADS, partitions);
        es = Executors.newFixedThreadPool(noOfThread);
        for (int i = 0; i < noOfThread; i++) {
            es.submit(() -> {
                // each pool thread gets its own consumer from the ThreadLocal
                KafkaConsumer consumer = KafkaConsumerFactory.createConsumer();
                try {
                    // subscribe once, not on every loop iteration
                    consumer.subscribe(Collections.singletonList(topicName));
                    while (!stopRequest) {
                        ConsumerRecords<?,?> records = consumer.poll(5000);
                        records.forEach(fun);
                        consumer.commitSync();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    consumer.close();
                }
            });
        }
    }
}

I went through some internet tutorials, and some of them directly conclude that

actors are very good and faster than traditional threads.

But they give no explanation of how actors can be faster than threads.

I tried some sample Akka code (the Akka sample from Activator), printed Thread.currentThread().getName() inside all the actors, and found that several dispatcher threads named helloakka-akka.actor.default-dispatcher-X are created.

But how? Who creates those threads? Where is their configuration? What is the mapping between a thread and an actor?

Does Akka create a new thread every time I send a message, or does it use a thread pool internally?

If I need 100 threads to execute parts of some task in parallel, do I need to create 100 actors and send one message to each of them? Or do I create one actor and put 100 messages in its queue, and it will fork them onto 100 threads?

I'm really confused.
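
The two options in the last question can be made concrete with a small model built only on the JDK. The ToyActor class below is hypothetical and is not Akka's API; it only mimics the general idea: an actor is a mailbox plus a behavior, it owns no thread, and when it has mail a single drain task is submitted to a shared thread pool (in Akka that pool is a dispatcher, which is where thread names such as default-dispatcher-X come from). The sketch sends 100 messages to one actor, then one message to each of 100 actors, and records how many messages were being handled at the same moment and how many pool threads were used.

```java
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class ActorThreadsDemo {

    // Hypothetical toy actor, NOT Akka's API: a mailbox plus a behavior. It owns no thread;
    // when it has mail, one drain task is submitted to the shared pool (the "dispatcher").
    static final class ToyActor<M> {
        private final ConcurrentLinkedQueue<M> mailbox = new ConcurrentLinkedQueue<>();
        private final AtomicBoolean scheduled = new AtomicBoolean(false);
        private final ExecutorService dispatcher;
        private final Consumer<M> behavior;

        ToyActor(ExecutorService dispatcher, Consumer<M> behavior) {
            this.dispatcher = dispatcher;
            this.behavior = behavior;
        }

        void tell(M msg) {            // asynchronous: enqueue and return immediately
            mailbox.add(msg);
            schedule();
        }

        private void schedule() {     // at most one drain task per actor at any time
            if (scheduled.compareAndSet(false, true)) {
                dispatcher.execute(this::drain);
            }
        }

        private void drain() {        // runs on whichever pool thread picks it up
            M msg;
            while ((msg = mailbox.poll()) != null) {
                behavior.accept(msg); // one message at a time, never concurrently
            }
            scheduled.set(false);
            if (!mailbox.isEmpty()) {
                schedule();           // a message raced in after the last poll()
            }
        }
    }

    static final class Stats {
        final int maxConcurrent;      // most messages being handled at the same instant
        final int threadsUsed;        // distinct pool threads that handled any message
        Stats(int maxConcurrent, int threadsUsed) {
            this.maxConcurrent = maxConcurrent;
            this.threadsUsed = threadsUsed;
        }
    }

    // Sends messagesPerActor messages to each of `actors` toy actors sharing one pool.
    static Stats run(int actors, int messagesPerActor, int poolSize) {
        ExecutorService dispatcher = Executors.newFixedThreadPool(poolSize);
        CountDownLatch done = new CountDownLatch(actors * messagesPerActor);
        AtomicInteger active = new AtomicInteger();
        AtomicInteger max = new AtomicInteger();
        Set<String> threads = ConcurrentHashMap.newKeySet();

        Consumer<Integer> work = msg -> {
            max.accumulateAndGet(active.incrementAndGet(), Math::max);
            threads.add(Thread.currentThread().getName());
            try {
                Thread.sleep(2);      // simulate a little work per message
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            active.decrementAndGet();
            done.countDown();
        };

        for (int a = 0; a < actors; a++) {
            ToyActor<Integer> actor = new ToyActor<>(dispatcher, work);
            for (int m = 0; m < messagesPerActor; m++) {
                actor.tell(m);
            }
        }
        try {
            done.await(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        dispatcher.shutdown();
        return new Stats(max.get(), threads.size());
    }

    public static void main(String[] args) {
        Stats one = run(1, 100, 4);
        Stats many = run(100, 1, 4);
        System.out.println("1 actor, 100 messages: max in parallel = " + one.maxConcurrent);
        System.out.println("100 actors, 1 message each: max in parallel = " + many.maxConcurrent
                + ", pool threads used = " + many.threadsUsed);
    }
}
```

Under this model, the single actor handles its 100 messages one after another, so at most one of them is in progress at any moment, while the 100 actors share the 4 pool threads and never use more threads than the pool has; no thread is created per message. Akka's real dispatchers do more (for example, the dispatcher's throughput setting limits how many messages an actor processes before its thread is handed back), so treat this only as an illustration of the mapping.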

May I know the reason for the downvote? – RBanerjee
There's not enough information. What are "the codes"? Which threading model are you replacing, and why? How do you synchronize? Where do you push results? What is blocking and what isn't? Where is the source of truth before and after? Answering at least a couple of those might turn this into a real question; right now I don't see what can even be answered. If you ask "can I", the only answer is "yes, you technically can". – M. Prokhorov
A simple System.out.println would be enough for me; I am just confused about how the threads that actors run on are mapped and managed. I have modified my question. Any pointers would be helpful. – RBanerjee
@M.Prokhorov Sample code has also been added. – RBanerjee

1 Answer


Migrating an executor-based system to an actor system is not a small task, but it can be done. It requires you to rethink the way you design the system and to consider the impact of actors. For example, in a threaded architecture you create a handler for a business process, wrap it in a Runnable, and let it go off and do its work on a thread. That is wholly inappropriate for the actor paradigm. You have to re-architect your system around message passing, using messages to invoke tasks. You also have to change the way you think about business processes, from an imperative approach to a message-based one. Consider, for example, the simple task of purchasing a product. I will assume you know how to do it with an executor. In an actor system you do this:

(Purchase Product) -> UserActor -> (Bill Credit Card) -> CCProcessing Actor -> (Purchase Approved and Billed Item) -> inventory manager -> ... and so on

At each phase, the item in parentheses is an asynchronous message sent to the actor in question, which performs its business logic and then forwards a message to the next actor in the process.
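
The flow above can be sketched in plain Java. This is a minimal sketch, assuming a hypothetical JDK-only ToyActor (a mailbox drained by one task at a time on a shared pool) rather than Akka itself; the message classes are named after the items in parentheses, and the credit card step is simulated. Each actor does its piece of work and then calls tell on the next actor, and tell only enqueues the message and returns.

```java
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

public class PurchasePipeline {

    // Hypothetical toy actor, NOT Akka's API: a mailbox drained by one task at a time on a shared pool.
    static final class ToyActor<M> {
        private final ConcurrentLinkedQueue<M> mailbox = new ConcurrentLinkedQueue<>();
        private final AtomicBoolean scheduled = new AtomicBoolean(false);
        private final ExecutorService dispatcher;
        private final Consumer<M> behavior;

        ToyActor(ExecutorService dispatcher, Consumer<M> behavior) {
            this.dispatcher = dispatcher;
            this.behavior = behavior;
        }

        void tell(M msg) {            // asynchronous: enqueue and return immediately
            mailbox.add(msg);
            schedule();
        }

        private void schedule() {     // at most one drain task per actor at any time
            if (scheduled.compareAndSet(false, true)) {
                dispatcher.execute(this::drain);
            }
        }

        private void drain() {
            M msg;
            while ((msg = mailbox.poll()) != null) {
                behavior.accept(msg);
            }
            scheduled.set(false);
            if (!mailbox.isEmpty()) {
                schedule();           // a message raced in after the last poll()
            }
        }
    }

    // Messages: the items in parentheses in the flow above (names are illustrative).
    static final class PurchaseProduct { final String orderId; PurchaseProduct(String id) { orderId = id; } }
    static final class BillCreditCard { final String orderId; BillCreditCard(String id) { orderId = id; } }
    static final class PurchaseApproved { final String orderId; PurchaseApproved(String id) { orderId = id; } }

    // Pushes each order through UserActor -> CCProcessing -> inventory manager and
    // returns the order ids that reached the inventory manager.
    static List<String> run(List<String> orderIds) {
        ExecutorService dispatcher = Executors.newFixedThreadPool(4);
        CountDownLatch done = new CountDownLatch(orderIds.size());
        List<String> fulfilled = new CopyOnWriteArrayList<>();

        ToyActor<PurchaseApproved> inventoryManager = new ToyActor<>(dispatcher, msg -> {
            fulfilled.add(msg.orderId);           // e.g. reserve stock and schedule shipping
            done.countDown();
        });
        ToyActor<BillCreditCard> ccProcessing = new ToyActor<>(dispatcher, msg ->
                // charging the card is simulated; on success, forward to the next stage
                inventoryManager.tell(new PurchaseApproved(msg.orderId)));
        ToyActor<PurchaseProduct> userActor = new ToyActor<>(dispatcher, msg ->
                ccProcessing.tell(new BillCreditCard(msg.orderId)));

        orderIds.forEach(id -> userActor.tell(new PurchaseProduct(id)));   // fire and forget
        try {
            done.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        dispatcher.shutdown();
        return fulfilled;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("order-1", "order-2", "order-3")));
    }
}
```

No stage calls the next one and waits for a result: the user actor is finished with an order as soon as it has sent BillCreditCard, and its pool thread is free for other work.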

This is only one way of building an actor-based system, and there are many other techniques, but the core fundamental is that you can't think imperatively: you have to think of the process as a collection of steps that each run independently. Messages then flow through the system, but you can't be sure of their order, or even that a message will arrive at all, so you have to design semantics to handle that. In the system above, I might have another actor that checks every two minutes for orphaned orders that have not been presented to billing. Of course, that means my messages need to be idempotent, so that sending one a second time is harmless and does not bill the user twice.
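
A minimal sketch of the idempotency point, assuming each billing message carries an order id that identifies it (the BillCreditCard fields and the BillingHandler class are hypothetical). The handler plays the role of the billing actor's receive logic; because an actor processes one message at a time, a plain HashSet is enough to remember which orders were already billed, and a message re-sent by the orphaned-order checker becomes a no-op.

```java
import java.util.HashSet;
import java.util.Set;

public class IdempotentBilling {

    // Hypothetical message: the order id is what makes a re-sent message recognizable.
    static final class BillCreditCard {
        final String orderId;
        final long amountCents;
        BillCreditCard(String orderId, long amountCents) {
            this.orderId = orderId;
            this.amountCents = amountCents;
        }
    }

    // Stands in for a billing actor's receive logic. An actor handles one message at a
    // time, so plain (non-thread-safe) collections are enough for its private state.
    static final class BillingHandler {
        private final Set<String> billedOrders = new HashSet<>();
        private long totalChargedCents = 0;

        void receive(BillCreditCard msg) {
            if (!billedOrders.add(msg.orderId)) {
                return;                              // already billed: the duplicate is a no-op
            }
            totalChargedCents += msg.amountCents;    // placeholder for the real card charge
        }

        long totalChargedCents() {
            return totalChargedCents;
        }
    }

    public static void main(String[] args) {
        BillingHandler billing = new BillingHandler();
        BillCreditCard msg = new BillCreditCard("order-42", 1999);
        billing.receive(msg);
        // The orphaned-order checker cannot know whether the first message was handled,
        // so it simply sends it again; the user must not be billed twice.
        billing.receive(msg);
        System.out.println(billing.totalChargedCents());   // prints 1999
    }
}
```

In a real system the set of billed order ids would have to survive restarts (for example, by being stored in a database); otherwise a crash between charging the card and recording the order could still allow a double charge.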

I know I didn't deal with your specific example; I just wanted to give you some context: actors are not just another way to build an executor (well, I suppose you could abuse them that way, but it's not advisable) but a completely different design paradigm. It is certainly a worthwhile paradigm to learn, and if you make the leap you will never want to go back to executors.