I am very new to Kafka and Samza. I tried the hello-samza exampe and it is working. What I am looking for is to create a samza task that reads the message from a kafka topic.The task I added does not throw any error, and is not reading any message from the topic. Yarn UI shows task as accepted.Not sure what I am doing wrong here.
Here is the Class
public class MyTask implements StreamTask {
@Override
public void process(IncomingMessageEnvelope incomingMessageEnvelope, MessageCollector messageCollector, TaskCoordinator taskCoordinator) throws Exception {
System.out.println(" key - " + incomingMessageEnvelope.getKey() + " | message " + incomingMessageEnvelope.getMessage());
}
}
Here is the properties file
# Job
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.name=addresses
# YARN
yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz
# Task
task.class=samza.examples.wikipedia.task.MyTask
task.inputs=addressestopic
# Serializers
serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
# Kafka System
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.samza.msg.serde=json
systems.kafka.consumer.zookeeper.connect=localhost:2181/
systems.kafka.producer.bootstrap.servers=localhost:9092
# Job Coordinator
job.coordinator.system=kafka
# Add configuration to disable checkpointing for this job once it is available in the Coordinator Stream model
# See https://issues.apache.org/jira/browse/SAMZA-465?focusedCommentId=14533346&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14533346 for more details
job.coordinator.replication.factor=1