3
votes

I developed a Storm topology to receive JSONArray data from Kafka brokers on Hortonworks.

I don't know why my KafkaSpout doesn't consume messages from the Kafka brokers in HDP. The Storm topology is submitted successfully, but when I visualize the topology, 0% of the data has been consumed!

(screenshot: topology visualisation)

This is my Scheme class:

import java.io.UnsupportedEncodingException;
import java.util.List;

import org.apache.log4j.Logger;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;

import backtype.storm.spout.Scheme;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class ClientInfosSheme implements Scheme {

    private static final long serialVersionUID = -2990121166902741545L;
    private static final Logger LOG = Logger.getLogger(ClientInfosSheme.class);

    public String codeBanque;
    public String codeAgence;
    public String codeGuichet;
    public String devise;
    public String numCompte;
    public String codeClient;
    public String codeOperation;
    public String sensOperation;
    public String montantOperation;
    public String dateValeur;
    public String dateComptable;
    public String utilisateur;

    public static final String CODEBANQUE = "codeBanque";
    public static final String CODEAGENCE = "codeAgence";
    public static final String CODEGUICHET = "codeGuichet";
    public static final String DEVISE = "devise";
    public static final String NUMCOMPTE = "numCompte";
    public static final String CODECLIENT = "codeClient";
    public static final String CODEOPERATION = "codeOperation";
    public static final String SENSOPERATION = "sensOperation";
    public static final String MONTANTOPERATION = "montantOperation";
    public static final String DATEVALEUR = "dateValeur";
    public static final String DATECOMPTABLE = "dateComptable";
    public static final String UTILISATEUR = "utilisateur";

    public List<Object> deserialize(byte[] bytes) {
        try {
            String clientInfos = new String(bytes, "UTF-8");
            JSONArray json = new JSONArray(clientInfos);
            // Extract the fields. Note: each iteration overwrites the instance
            // fields, so only the last object of the array ends up in the tuple.
            for (int i = 0; i < json.length(); i++) {
                JSONObject objectClientInfos = json.getJSONObject(i);
                try {
                    this.codeBanque = objectClientInfos.getString("codeBanque");
                    this.codeAgence = objectClientInfos.getString("codeAgence");
                    this.codeGuichet = objectClientInfos.getString("codeGuichet");
                    this.devise = objectClientInfos.getString("devise");
                    this.numCompte = objectClientInfos.getString("numCompte");
                    this.codeClient = objectClientInfos.getString("codeClient");
                    this.codeOperation = objectClientInfos.getString("codeOperation");
                    this.sensOperation = objectClientInfos.getString("sensOperation");
                    this.montantOperation = objectClientInfos.getString("montantOperation");
                    this.dateValeur = objectClientInfos.getString("dateValeur");
                    this.dateComptable = objectClientInfos.getString("dateComptable");
                    this.utilisateur = objectClientInfos.getString("utilisateur");
                } catch (Exception e) {
                    LOG.error("Failed to extract fields from JSON object", e);
                }
            }
        } catch (JSONException e1) {
            LOG.error("Message is not a valid JSON array", e1);
        } catch (UnsupportedEncodingException e1) {
            LOG.error("UTF-8 encoding is not supported", e1);
        }
        return new Values(codeBanque, codeAgence, codeGuichet, devise, numCompte,
                codeClient, codeOperation, sensOperation, montantOperation,
                dateValeur, dateComptable, utilisateur);
    }

    public Fields getOutputFields() {
        return new Fields(CODEBANQUE, CODEAGENCE, CODEGUICHET, DEVISE, NUMCOMPTE,
                CODECLIENT, CODEOPERATION, SENSOPERATION, MONTANTOPERATION,
                DATEVALEUR, DATECOMPTABLE, UTILISATEUR);
    }
}
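
As a side note, the Scheme can be sanity-checked outside Storm before submitting the topology. A minimal sketch, with a hypothetical one-element payload of my own making (not your real data):

import java.util.List;

public class ClientInfosShemeTest {
    public static void main(String[] args) throws Exception {
        // Hypothetical sample message with all twelve expected keys.
        String sample = "[{\"codeBanque\":\"xx\",\"codeAgence\":\"yy\","
                + "\"codeGuichet\":\"zz\",\"devise\":\"tt\",\"numCompte\":\"nc\","
                + "\"codeClient\":\"cc\",\"codeOperation\":\"co\",\"sensOperation\":\"so\","
                + "\"montantOperation\":\"mo\",\"dateValeur\":\"dv\","
                + "\"dateComptable\":\"dc\",\"utilisateur\":\"u\"}]";
        List<Object> tuple = new ClientInfosSheme().deserialize(sample.getBytes("UTF-8"));
        System.out.println(tuple); // expect the twelve field values in order
    }
}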

And the properties file:

#Broker host
kafka.zookeeper.host.port=sandbox.hortonworks.com

#Kafka topic to consume.
kafka.topic=INFOCLIENT

#Location in ZK for the Kafka spout to store state.
kafka.zkRoot=/client_infos_sprout

#Kafka Spout Executors.
spout.thread.count=1
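
For reference, this is roughly how such properties are usually wired into the old storm-kafka KafkaSpout. The class below is my reconstruction for illustration only, not your actual builder (the spout id, component names, and topology name are made up). Note that ZkHosts expects a host:port string, which matches what you later tried in the comments:

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.kafka.BrokerHosts;
import backtype.storm.kafka.KafkaSpout;
import backtype.storm.kafka.SpoutConfig;
import backtype.storm.kafka.ZkHosts;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;

public class ClientInfosTopology {
    public static void main(String[] args) throws Exception {
        // Values taken from the properties file above. ZkHosts expects
        // host:port, so the ZooKeeper port (2181 by default) must be included.
        BrokerHosts hosts = new ZkHosts("sandbox.hortonworks.com:2181");
        SpoutConfig spoutConfig = new SpoutConfig(
                hosts,
                "INFOCLIENT",            // kafka.topic
                "/client_infos_sprout",  // kafka.zkRoot (offset storage in ZK)
                "client-infos-spout");   // consumer id (hypothetical)
        spoutConfig.scheme = new SchemeAsMultiScheme(new ClientInfosSheme());

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig), 1); // spout.thread.count

        StormSubmitter.submitTopology("client-infos", new Config(), builder.createTopology());
    }
}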

When I use another consumer, the data stored in the Kafka brokers looks like this:

[{"codeBanque":"xx","codeAgence":"yy","codeGuichet":"zz","devise":"tt"..},
{"codeBanque":"xx1","codeAgence":"yy1","codeGuichet":"zz1","devise":"tt1"..},
{"codeBanque":"xx2","codeAgence":"yy2","codeGuichet":"zz2","devise":"tt2"..}]

So my problem is: why doesn't it consume messages from the Kafka brokers?

Please, I need help.

Did you double-check the topic name, IP/hostname, etc.? Did you check the Storm and Kafka logs for error messages? - Matthias J. Sax
Hi @MatthiasJ.Sax, I double-checked, and I found that when I changed the broker host to kafka.zookeeper.host.port=192.168.1.78:2181 I got this problem: java.lang.RuntimeException: java.lang.IllegalArgumentException: a || b || c || calculCleRib(a,b,c) does not exist at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) - Massi Issar
Hi, when I check the Storm UI I see that the messages are emitted and transferred but not acked! I got this message: "The number of Tuples that were explicitly failed or timed out before acking was completed. A value of 0 is expected if no acking is done." - Massi Issar

1 Answer

1
vote

As you've discovered in the logs, your Spout doesn't "consume" messages because the topology has an error and does not ack the tuples; hence, the Spout will replay them. This is working as designed.

Once your topology is stable, you will observe the offset being incremented. Until then, the Spout will send messages into the topology, but you won't be able to observe results.
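
For completeness, the usual pattern is to ack or fail every input tuple in your bolts and to catch exceptions rather than letting them kill the worker. A minimal sketch (the bolt name and processing step are illustrative, not your actual code):

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

public class ClientInfosBolt extends BaseRichBolt {
    private OutputCollector collector;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple input) {
        try {
            // ... process the tuple, e.g. compute the RIB key here ...
            collector.ack(input);   // the Kafka offset only advances for acked tuples
        } catch (Exception e) {
            collector.fail(input);  // failed tuples will be replayed by the Spout
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // no output stream in this sketch
    }
}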

Without seeing the calculCleRib method and how it is integrated into your topology, we can't help you debug that aspect.