1
votes

Hello I am using the cloudera flume agent to get tweets and using Kafka as channel to hold the flume events. The structure of event written in kafka channel event(headers Json Tweets) is as follows where ´::::´ are the headers followed by seperators like �M and the json of the tweet as follows: it is a flume event object of structure:

event(headers, jsonTweet) 

The object is as follows:

::::�M{"filter_level":"low","retweeted":false,"in_reply_to_screen_name":null,"possibly_sensitive":false,"truncated":false,"lang":"en","in_reply_to_status_id_str":null,"id":715916168905633792,"in_reply_to_user_id_str":null,"timestamp_ms":"1459522690403","in_reply_to_status_id":null,"created_at":"Fri Apr 01 14:58:10 +0000 2016","favorite_count":0,"place":null,"coordinates":null,"text":"RT @frmikeschmitz: What I wish I had been able to say about #BvS \n\nThe Damage Done - A requiem for an American","contributors":null,"retweeted_status":{"filter_level":"low","retweeted":false,"in_reply_to_screen_name":null,"possibly_sensitive":false,"truncated":false,"lang":"en","in_reply_to_status_id_str":null,"id":715549110728847360,"in_reply_to_user_id_str":null,"in_reply_to_status_id":null,"created_at":"Thu Mar 31 14:39:36 +0000 2016","favorite_count":22,"place":null,"coordinates":null,"text":"What I wish I had been able to say about #BvS \n\nThe Damage Done - A requiem for an American icon.  via @bmoviesd","contributors":null,"geo":null,"entities":{"symbols":[],"urls":[{"expanded_url":"/2016/03/30/superman-and-the-damage-done","indices":[98,121],"display_url":"birthmoviesdeath.com/2016/03/30/sup\u2026","url":""}],"hashtags":[{"text":"BvS","indices":[41,45]}],"user_mentions":[{"id":202668848,"name":"Birth.Movies.Death.","indices":[126,135],"screen_name":"bmoviesd","id_str":"202668848"}]},"is_quote_status":false,"source":"<a href=\"\" rel=\"nofollow\">Twitter for iPhone<\/a>","favorited":false,"in_reply_to_user_id":null,"retweet_count":5,"id_str":"715549110728847360","user":{"location":null,"default_profile":true,"profile_background_tile":false,"statuses_count":3001,"lang":"en","profile_link_color":"0084B4","profile_banner_url":"","id":565216911,"following":null,"protected":false,"favourites_count":1858,"profile_text_color":"333333","verified":false,"description":null,"contributors_enabled":false,"profile_sidebar_border_color":"C0DEED","name":"fathermikeschmitz","profile_background_color":"C0DEED","created_at":"Sat Apr 28 03:49:18 +0000 2012","default_profile_image":false,"followers_count":17931,"profile_image_url_https":"","geo_enabled":false,"profile_background_image_url":"":"":null,"url":"","utc_offset":-14400,"time_zone":"Eastern Time (US & Canada)","notifications":null,"profile_use_background_image":true,"friends_count":81,"profile_sidebar_fill_color":"DDEEF6","screen_name":"frmikeschmitz","id_str":"565216911","profile_image_url":"","listed_count":138,"is_translator":false}},"geo":null,"entities":{"symbols":[],"urls":[{"expanded_url":"display_url":"birthmoviesdeath.com/2016/03/30/sup\u2026","url":""}],"hashtags":[{"text":"BvS","indices":[60,64]}],"user_mentions":[{"id":565216911,"name":"fathermikeschmitz","indices":[3,17],"screen_name":"frmikeschmitz","id_str":"565216911"},{"id":202668848,"name":"Birth.Movies.Death.","indices":[139,140],"screen_name":"bmoviesd","id_str":"202668848"}]},"is_quote_status":false,"source":"<a href=\"" rel=\"nofollow\">Twitter for iPhone<\/a>","favorited":false,"in_reply_to_user_id":null,"retweet_count":0,"id_str":"715916168905633792","user":{"location":null,"default_profile":true,"profile_background_tile":false,"statuses_count":25,"lang":"en","profile_link_color":"0084B4","id":2987773015,"following":null,"protected":false,"favourites_count":90,"profile_text_color":"333333","verified":false,"description":null,"contributors_enabled":false,"profile_sidebar_border_color":"C0DEED","name":"Pam Anderson","profile_background_color":"C0DEED","created_at":"Sun Jan 18 02:14:41 +0000 2015","default_profile_image":false,"followers_count":22,"profile_image_ur":"","geo_enabled":false,"profile_background_image_url":"","profile_background_image_url_httpd":"":null,"url":null,"utc_offset":null,"time_zone":null,"notifications":null,"profile_use_background_image":true,"friends_count":59,"profile_sidebar_fill_color":"DDEEF6","screen_name":"pamawah25","id_str":"2987773015","profile_image_url":"","listed_count":0,"is_translator":false}}

The flume agent i am working is as follows:

agent1.sources.twitter-data.type = com.cloudera.flume.source.TwitterSource
agent1.sources.twitter-data.consumerKey = ""
agent1.sources.twitter-data.consumerSecret = ""
agent1.sources.twitter-data.accessToken = ""
agent1.sources.twitter-data.accessTokenSecret = ""
agent1.sources.twitter-data.keywords = superman, batman, iron man, tutorials point,java, bigdata, mapreduce, mahout, hbase, nosql

agent1.channels.kafka-channel.type = org.apache.flume.channel.kafka.KafkaChannel
agent1.channels.kafka-channel.capacity = 10000
agent1.channels.kafka-channel.transactionCapacity = 1000
agent1.channels.kafka-channel.brokerList = kafka:9092
agent1.channels.kafka-channel.topic = twitter
agent1.channels.kafka-channel.zookeeperConnect = kafka:2181
#agent1.channels.kafka-channel.parseAsFlumeEvent = false 

agent1.sinks.hdfs-sink.type = hdfs
agent1.sinks.hdfs-sink.hdfs.path = hdfs:///user/Hadoop/twitter_data
agent1.sinks.hdfs-sink.fileType = DataStream
agent1.sinks.hdfs-sink.writeFormat = Text
agent1.sinks.hdfs-sink.batchSize = 1000
agent1.sinks.hdfs-sink.rollSize = 0
agent1.sinks.hdfs-sink.rollCount = 10000

agent1.sources.twitter-data.channels = kafka-channel

I would like to read this data in spark streaming / spark Sql to process and store it.

So i used the code as follows, but it dosent help because of the flume event:

val ssc = new StreamingContext(sc, Seconds(2))
val sqlContext =  new org.apache.spark.sql.hive.HiveContext(sc)//new org.apache.spark.sql.SQLContext(sc)

// Create direct kafka stream with brokers and topics
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicsSet)

// Get the data (tweets) from kafka
val tweets = messages.map(_._2)




tweets.foreachRDD { rdd =>
  val jsonRDD = sqlContext.read.json(rdd)
  val tweetTable = jsonRDD.toDF()
  tweetTable.printSchema()
  tweetTable.show(5)
  tweetTable.write.mode("append").saveAsTable("twitterStream")

}
1
i got something like "map(e => "Event:header:" + e.event.get(0).toString + "body: " + new String(e.event.getBody.array)).print" from another post, trying to figure out how to use it with kafka channel? - Mouzzam Hussain

1 Answers

0
votes

If you're consuming them from a Kafka stream, you'll need to manually parse the value via the separator:

val tweets = messages.map { case (_, tweet) => { 
    val splitTweet = tweet.split("?M") 
    (splitTweet(0), splitTweet(1))
    }
 }

This will yield the concatenated header as the first value of the tuple, and and second value will contain the JSON representing the tweet.