I am just keen to know whether it's possible to replace the old Multi threading codes written with Java's Executor service to Akka. I have few doubts regarding this.
Is akka actor runs in their own thread?
How Threads will be assigned for the Actors ?
What are the pros and cons of migration of it is possible?
Currently I use Fixed Thread pool for multi threading, and submit a callable.
Sample Code,
public class KafkaConsumerFactory {
private static Map<String,KafkaConsumer> registry = new HashMap<>();
private static ThreadLocal<KafkaConsumer> consumers = new ThreadLocal<KafkaConsumer>(){
@Override
protected KafkaConsumer initialValue() {
return new KafkaConsumer(createConsumerConfig());
}
};
static {
Runtime.getRuntime().addShutdownHook(new Thread(){
@Override
public void run() {
registry.forEach((tid,con) -> {
try{
con.close();
} finally {
System.out.println("Yes!! Consumer for " + tid + " is closed.");
}
});
}
});
}
private static Properties createConsumerConfig() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "newcon-grp5");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", KafkaKryoSerde.class.getName());
return props;
}
public static <K,V> KafkaConsumer<K,V> createConsumer(){
registry.put(Thread.currentThread().getName(),consumers.get());
return consumers.get();
}
}
/////////////////////////////////////////////////////////
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
public class KafkaNewConsumer {
public static int MAX_THREADS = 10;
private ExecutorService es = null;
private boolean stopRequest = false;
public static void main(String[] args){
KafkaNewConsumer knc = new KafkaNewConsumer();
Runtime.getRuntime().addShutdownHook(new Thread(){
@Override
public void run(){
knc.es.shutdown();
try {
knc.es.awaitTermination(500, TimeUnit.MILLISECONDS);
} catch (InterruptedException ignored) {
}finally {
System.out.println("Finished");
}
}
});
knc.consumeTopic("rtest3",knc::recordConsuemer);
}
public void recordConsuemer(ConsumerRecord<?,?> record){
String result = new StringJoiner(": ")
.add(Thread.currentThread().getName())
.add("ts").add(String.valueOf(record.timestamp()))
.add("offset").add(String.valueOf(record.offset()))
.add("data").add(String.valueOf(record.value()))
.add("value-len").add(String.valueOf(record.serializedValueSize()))
.toString();
System.out.println(result);
}
public void consumeTopic(String topicName, Consumer<ConsumerRecord<?,?>> fun){
KafkaConsumer con= KafkaConsumerFactory.createConsumer();
int paritions = con.partitionsFor(topicName).size();
int noOfThread = (MAX_THREADS < paritions) ? MAX_THREADS :paritions;
es = Executors.newFixedThreadPool(noOfThread);
con.close();
for(int i=0;i<noOfThread;i++){
es.submit(()->{
KafkaConsumer consumer = KafkaConsumerFactory.createConsumer();
try{
while (!stopRequest){
consumer.subscribe(Collections.singletonList(topicName));
ConsumerRecords<?,?> records = consumer.poll(5000);
records.forEach(fun);
consumer.commitSync();
}
}catch(Exception e){
e.printStackTrace();
} finally {
consumer.close();
}
});
}
}
}
I went through some of the internet tutorial, some of them directly concludes
actors was very good and faster than traditional threads.
But no explanation how it can become faster than threads ?
I tried some sample Akka(Akka sample from activator) code, and printed Thread.currentThread.getName inside all actors and found different dispatcher threads named(helloakka-akka.actor.default-dispatcher-X) are created.
But how ? who is creating those threads ? where is the configuration for them ? What is the mapping relations between a thread and an Actor ?
Every time I send a message will Akka create new Thread? Or internally a Thread pool is used?
If I need 100 threads to do parallel execution of parts of the some task, do I need to create 100 Actors and send 1 message to each of them ? Or I need to create 1 actor and put 100 message in it's queue , It will get forked into 100 threads.
Really Confused