1
votes

I am developing application with akka,kafka,scala and my application is working fine on my local system but whenever I am deploying it on cluster which is kerberos security enabled I am unable to recevie messages at kafka consumer side. For testing purpose I am running producer from kafka command line and sending messages from command line.Could you please let me know if anything wrong in my configuration? Please find below my application.config file:

abc-management {
  kafka {
    capture-topic = [${?ABC}, "abc.pqr"]
    producer {
      brokers  =  ["abc123.systems.pqr:3230,abc12314.systems.pqr:3230,abc11234.systems.pqr:1244"]
    }
    consumer {
      zkConnect  = ["abc12443.systems.abc:234","abc1244.systems.abc:1241","abc121414.systems.abc:2181"]
      groupid = ["abc-consumer"]
      auto-offset-reset = "earliest"
    }
    offset {
      group-batch = 5
      group-time = 3
      parallelism-factor = 2
    }
  }
  akka {
    actor {
      cluster-name = "ABCSystem"
      timeout = [${?TIMEOUT}, 10]
      supervisor-strategy {
        max-number-of-retries = 10
        within-time-range = 30
      }
    }
  }
  shutdown-hook-time = 30
}
akka {
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
    deployment {
      /abcActor {
        router = round-robin-pool
        nr-of-instances = 20
      }
    }
  }
  kafka {
    consumer {
      poll-interval = 30ms
      poll-timeout = 30ms
      stop-timeout = 20s
      close-timeout = 10s
      commit-timeout = 10s
      wakeup-timeout = 10s
      use-dispatcher = "akka.kafka.default-dispatcher"
      kafka-clients {
        enable.auto.commit = true
        security.protocol = "SASL_PLAINTEXT"
         #sasl.kerberos.service.name=kafka
         #ssl.client.auth = "none"
      }
    }
    producer{
      kafka-clients {
         security.protocol = "SASL_PLAINTEXT"
         #sasl.kerberos.service.name=kafka
        # ssl.client.auth = "none"
      }
    }
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2551
    }
  }
  cluster {
    seed-host = "127.0.0.1"
    seed-host = ${?AKKA_SEED_HOST}
    seed-port = "2551"
    seed-port = ${?AKKA_SEED_PORT}
    seed-nodes = [
      "akka.tcp://"${abc-management.akka.actor.cluster-name}"@"${akka.cluster.seed-host}":"${akka.cluster.seed-port}
    ]
    min-nr-of-members = 1
  }
}
# Disable legacy metrics in akka-cluster.
akka.cluster.metrics.enabled=off
# Enable metrics extension in akka-cluster-metrics.
akka.extensions=["akka.cluster.metrics.ClusterMetricsExtension"]
circuit-breaker {
  maxFailures = [${?CB_MAX_FAILURES}, 10]
  callTimeout = [${?CB_MAX_FAILURES}, 10000]
  resetTimeout = [${?CB_MAX_FAILURES}, 30000]
}

Below is my jaas file:

KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
doNotPrompt=true
useTicketCache=true
useKeyTab=true
keyTab="/home/abc/.abc.headless.keytab"
renewTicket=true
serviceName="kafka"
principal="[email protected]"
debug=true
client=true;
};
Client {
  com.sun.security.auth.module.Krb5LoginModule required
  doNotPrompt=true
  useKeyTab=true
  storeKey=true
  useTicketCache=false
  serviceName="zookeeper"
  keyTab="/home/abc/.abc.headless.keytab"
  principal="[email protected]"
  debug=true;
};

Please find below consumer side log:

[xyz@asd12344 lib]$ java -Dconfig.file=application.conf -Djava.security.auth.login.config=kafka_jaas.conf -jar abc-management.jar
[INFO] [06/07/2017 16:29:12.354] [main] [akka.remote.Remoting] Starting remoting
[INFO] [06/07/2017 16:29:12.492] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://[email protected]:2551]
[INFO] [06/07/2017 16:29:12.503] [main] [akka.cluster.Cluster(akka://ABCSystem)] Cluster Node [akka.tcp://[email protected]:2551] - Starting up...
[INFO] [06/07/2017 16:29:12.586] [main] [akka.cluster.Cluster(akka://ABCSystem)] Cluster Node [akka.tcp://[email protected]:2551] - Registered cluster JMX MBean [akka:type=Cluster]
[INFO] [06/07/2017 16:29:12.586] [main] [akka.cluster.Cluster(akka://ABCSystem)] Cluster Node [akka.tcp://[email protected]:2551] - Started up successfully
[INFO] [06/07/2017 16:29:12.619] [ABCSystem-akka.actor.default-dispatcher-15] [akka.cluster.Cluster(akka://ABCSystem)] Cluster Node [akka.tcp://[email protected]:2551] - Metrics collection has started successfully
[INFO] [06/07/2017 16:29:12.629] [ABCSystem-akka.actor.default-dispatcher-3] [akka.cluster.Cluster(akka://ABCSystem)] Cluster Node [akka.tcp://[email protected]:2551] - Node [akka.tcp://[email protected]:2551] is JOINING, roles []
[INFO] [06/07/2017 16:29:12.637] [ABCSystem-akka.actor.default-dispatcher-3] [akka.cluster.Cluster(akka://ABCSystem)] Cluster Node [akka.tcp://[email protected]:2551] - Leader is moving node [akka.tcp://[email protected]:2551] to [Up]
ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.
[INFO] [06/07/2017 16:29:44.498] [ABCSystem-akka.actor.default-dispatcher-24] [akka://ABCSystem/user/abcManagementActor/$h/abcEnrichmentActor] The abcEnrichment Actor is ready to receive the requests
[INFO] [06/07/2017 16:29:44.498] [ABCSystem-akka.actor.default-dispatcher-30] [akka://ABCSystem/user/abcManagementActor/$c/abcEnrichmentActor] The abcEnrichment Actor is ready to receive the requests
[INFO] [06/07/2017 16:29:44.497] [ABCSystem-akka.actor.default-dispatcher-22] [akka://ABCSystem/user/abcManagementActor/$i/abcEnrichmentActor] The abcEnrichment Actor is ready to receive the requests
[INFO] [06/07/2017 16:29:44.501] [ABCSystem-akka.actor.default-dispatcher-22] [akka://ABCSystem/user/abcManagementActor/$d/abcEnrichmentActor] The abcEnrichment Actor is ready to receive the requests
[INFO] [06/07/2017 16:29:44.501] [ABCSystem-akka.actor.default-dispatcher-28] [akka://ABCSystem/user/abcManagementActor/$b/abcEnrichmentActor] The abcEnrichment Actor is ready to receive the requests
[INFO] [06/07/2017 16:29:44.499] [ABCSystem-akka.actor.default-dispatcher-23] [akka://ABCSystem/user/abcManagementActor/$f/abcEnrichmentActor] The abcEnrichment Actor is ready to receive the requests
[INFO] [06/07/2017 16:29:44.502] [ABCSystem-akka.actor.default-dispatcher-26] [akka://ABCSystem/user/abcManagementActor/$a/abcEnrichmentActor] The abcEnrichment Actor is ready to receive the requests
[INFO] [06/07/2017 16:29:44.502] [ABCSystem-akka.actor.default-dispatcher-25] [akka://ABCSystem/user/abcManagementActor/$e/abcEnrichmentActor] The abcEnrichment Actor is ready to receive the requests
[INFO] [06/07/2017 16:29:44.504] [ABCSystem-akka.actor.default-dispatcher-29] [akka://ABCSystem/user/abcManagementActor/$g/abcEnrichmentActor] The abcEnrichment Actor is ready to receive the requests
[INFO] [06/07/2017 16:29:44.505] [ABCSystem-akka.actor.default-dispatcher-18] [akka://ABCSystem/user/abcManagementActor/$s/abcEnrichmentActor] The abcEnrichment Actor is ready to receive the requests
[INFO] [06/07/2017 16:29:44.507] [ABCSystem-akka.actor.default-dispatcher-44] [akka://ABCSystem/user/abcManagementActor/$q/abcEnrichmentActor] The abcEnrichment Actor is ready to receive the requests
[INFO] [06/07/2017 16:29:44.507] [ABCSystem-akka.actor.default-dispatcher-44] [akka://ABCSystem/user/abcManagementActor/$j/abcEnrichmentActor] The abcEnrichment Actor is ready to receive the requests
[INFO] [06/07/2017 16:29:44.507] [ABCSystem-akka.actor.default-dispatcher-45] [akka://ABCSystem/user/abcManagementActor/$m/abcEnrichmentActor] The abcEnrichment Actor is ready to receive the requests
[INFO] [06/07/2017 16:29:44.508] [ABCSystem-akka.actor.default-dispatcher-45] [akka://ABCSystem/user/abcManagementActor/$l/abcEnrichmentActor] The abcEnrichment Actor is ready to receive the requests
[INFO] [06/07/2017 16:29:44.508] [ABCSystem-akka.actor.default-dispatcher-29] [akka://ABCSystem/user/abcManagementActor/$n/abcEnrichmentActor] The abcEnrichment Actor is ready to receive the requests
[INFO] [06/07/2017 16:29:44.509] [ABCSystem-akka.actor.default-dispatcher-47] [akka://ABCSystem/user/abcManagementActor/$k/abcEnrichmentActor] The abcEnrichment Actor is ready to receive the requests
[INFO] [06/07/2017 16:29:44.509] [ABCSystem-akka.actor.default-dispatcher-18] [akka.tcp://[email protected]:2551/user/abcManagementActor/$f] Member up: akka.tcp://[email protected]:2551
[INFO] [06/07/2017 16:29:44.510] [ABCSystem-akka.actor.default-dispatcher-56] [akka://ABCSystem/user/abcManagementActor/$p/abcEnrichmentActor] The abcEnrichment Actor is ready to receive the requests
[INFO] [06/07/2017 16:29:44.510] [ABCSystem-akka.actor.default-dispatcher-2] [akka://ABCSystem/user/abcManagementActor/$r/abcEnrichmentActor] The abcEnrichment Actor is ready to receive the requests
[INFO] [06/07/2017 16:29:44.510] [ABCSystem-akka.actor.default-dispatcher-21] [akka://ABCSystem/user/abcManagementActor/$o/abcEnrichmentActor] The abcEnrichment Actor is ready to receive the requests
[INFO] [06/07/2017 16:29:44.512] [ABCSystem-akka.actor.default-dispatcher-15] [akka://ABCSystem/user/abcManagementActor/$t/abcEnrichmentActor] The abcEnrichment Actor is ready to receive the requests
[INFO] [06/07/2017 16:29:44.512] [ABCSystem-akka.actor.default-dispatcher-15] [akka.tcp://[email protected]:2551/user/abcManagementActor/$a] Member up: akka.tcp://[email protected]:2551
[INFO] [06/07/2017 16:29:44.513] [ABCSystem-akka.actor.default-dispatcher-4] [akka.tcp://[email protected]:2551/user/abcManagementActor/$i] Member up: akka.tcp://[email protected]:2551
[INFO] [06/07/2017 16:29:44.516] [ABCSystem-akka.actor.default-dispatcher-25] [akka.tcp://[email protected]:2551/user/abcManagementActor/$g] Member up: akka.tcp://[email protected]:2551
[INFO] [06/07/2017 16:29:44.517] [ABCSystem-akka.actor.default-dispatcher-4] [akka.tcp://[email protected]:2551/user/abcManagementActor/$e] Member up: akka.tcp://[email protected]:2551
Debug is  true storeKey false useTicketCache true useKeyTab true doNotPrompt true ticketCache is null isInitiator true KeyTab is /home/xyz/.xyz.headless.keytab refreshKrb5Config is false principal is [email protected] tryFirstPass is false useFirstPass is false storePass is false clearPass is false
Acquire TGT from Cache
Principal is [email protected]
Commit Succeeded

[WARN] [06/07/2017 16:30:04.830] [ABCSystem-akka.kafka.default-dispatcher-62] [akka.tcp://[email protected]:2551/system/kafka-consumer-1] Consumer interrupted with WakeupException after timeout. Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 20000 milliseconds
[WARN] [06/07/2017 16:30:24.919] [ABCSystem-akka.kafka.default-dispatcher-64] [akka.tcp://[email protected]:2551/system/kafka-consumer-1] Consumer interrupted with WakeupException after timeout. Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 20000 milliseconds
^C16:30:35.253 [shutdownHook1] INFO  c.pqr.asd.abcManagementMain - Terminating... - 2017-06-07T15:30:35.239Z
[INFO] [06/07/2017 16:30:35.265] [ABCSystem-akka.actor.default-dispatcher-27] [akka://ABCSystem/user/abcManagementActor/$b/abcEnrichmentActor] The abcEnrichment Actor is gonna stop and would not entertain any requests
[INFO] [06/07/2017 16:30:35.265] [ABCSystem-akka.actor.default-dispatcher-16] [akka://ABCSystem/user/abcManagementActor/$a/abcEnrichmentActor] The abcEnrichment Actor is gonna stop and would not entertain any requests
o.a.k.c.s.kerberos.KerberosLogin - [[email protected]]: TGT renewal thread has been interrupted and will exit.
16:30:45.020 [shutdownHook1] INFO  c.pqr.asd.abcManagementMain - Terminated... Bye - 2017-06-07T15:30:45.020Z
[xyz@jbt13993 lib]$
1
OK, so you forced a KILL signal (Ctrl-C) to your consumer less than 1s after it got its Kerberos TGT (generic authentication phase); and you did not enable the full Kerberos debug trace that would have shown what happened with the Kerberos service ticket. What does that prove? - Samson Scharfrichter
And by the way, do you have a question?? Then make it explicit... - Samson Scharfrichter
@SamsonScharfrichter: I am having question that is the reason I am posting it here; I am sending messages from producer side on Kafka Broker and these messages are not getting consumed on Kafka consumer side and reason behind this I am unable to find out as per your suggestion I will try enabling full kerberos debug trace - Mahendra Tonape
That is still not a question, it is an assertion. - Samson Scharfrichter
Here I am unable to get messages sent from scala akka kafka producer at consumer side... so I think there may be some mistake in the way I am sending data or configuring I am unable to find out the mistake so I am asking can someone help me to find out what wrong I am doing..? - Mahendra Tonape

1 Answers

0
votes

As I said we were unable to receive Messages at Consumer side on dev cluster and we are able to receive messages on local machine that is because Our dev cluster Kafka version is 0.9 and my local machine kafka version is 0.10 if you checked carefully there are significant differences between Kafka 0.9 and kafka 0.10 versions.After changing our API's and version it worked properly.