Hello I am using the cloudera flume agent to get tweets and using Kafka as channel to hold the flume events. The structure of event written in kafka channel event(headers Json Tweets) is as follows where ´::::´ are the headers followed by seperators like �M and the json of the tweet as follows: it is a flume event object of structure:
event(headers, jsonTweet)
The object is as follows:
::::�M{"filter_level":"low","retweeted":false,"in_reply_to_screen_name":null,"possibly_sensitive":false,"truncated":false,"lang":"en","in_reply_to_status_id_str":null,"id":715916168905633792,"in_reply_to_user_id_str":null,"timestamp_ms":"1459522690403","in_reply_to_status_id":null,"created_at":"Fri Apr 01 14:58:10 +0000 2016","favorite_count":0,"place":null,"coordinates":null,"text":"RT @frmikeschmitz: What I wish I had been able to say about #BvS \n\nThe Damage Done - A requiem for an American","contributors":null,"retweeted_status":{"filter_level":"low","retweeted":false,"in_reply_to_screen_name":null,"possibly_sensitive":false,"truncated":false,"lang":"en","in_reply_to_status_id_str":null,"id":715549110728847360,"in_reply_to_user_id_str":null,"in_reply_to_status_id":null,"created_at":"Thu Mar 31 14:39:36 +0000 2016","favorite_count":22,"place":null,"coordinates":null,"text":"What I wish I had been able to say about #BvS \n\nThe Damage Done - A requiem for an American icon. via @bmoviesd","contributors":null,"geo":null,"entities":{"symbols":[],"urls":[{"expanded_url":"/2016/03/30/superman-and-the-damage-done","indices":[98,121],"display_url":"birthmoviesdeath.com/2016/03/30/sup\u2026","url":""}],"hashtags":[{"text":"BvS","indices":[41,45]}],"user_mentions":[{"id":202668848,"name":"Birth.Movies.Death.","indices":[126,135],"screen_name":"bmoviesd","id_str":"202668848"}]},"is_quote_status":false,"source":"<a href=\"\" rel=\"nofollow\">Twitter for iPhone<\/a>","favorited":false,"in_reply_to_user_id":null,"retweet_count":5,"id_str":"715549110728847360","user":{"location":null,"default_profile":true,"profile_background_tile":false,"statuses_count":3001,"lang":"en","profile_link_color":"0084B4","profile_banner_url":"","id":565216911,"following":null,"protected":false,"favourites_count":1858,"profile_text_color":"333333","verified":false,"description":null,"contributors_enabled":false,"profile_sidebar_border_color":"C0DEED","name":"fathermikeschmitz","profile_background_color":"C0DEED","created_at":"Sat Apr 28 03:49:18 +0000 2012","default_profile_image":false,"followers_count":17931,"profile_image_url_https":"","geo_enabled":false,"profile_background_image_url":"":"":null,"url":"","utc_offset":-14400,"time_zone":"Eastern Time (US & Canada)","notifications":null,"profile_use_background_image":true,"friends_count":81,"profile_sidebar_fill_color":"DDEEF6","screen_name":"frmikeschmitz","id_str":"565216911","profile_image_url":"","listed_count":138,"is_translator":false}},"geo":null,"entities":{"symbols":[],"urls":[{"expanded_url":"display_url":"birthmoviesdeath.com/2016/03/30/sup\u2026","url":""}],"hashtags":[{"text":"BvS","indices":[60,64]}],"user_mentions":[{"id":565216911,"name":"fathermikeschmitz","indices":[3,17],"screen_name":"frmikeschmitz","id_str":"565216911"},{"id":202668848,"name":"Birth.Movies.Death.","indices":[139,140],"screen_name":"bmoviesd","id_str":"202668848"}]},"is_quote_status":false,"source":"<a href=\"" rel=\"nofollow\">Twitter for iPhone<\/a>","favorited":false,"in_reply_to_user_id":null,"retweet_count":0,"id_str":"715916168905633792","user":{"location":null,"default_profile":true,"profile_background_tile":false,"statuses_count":25,"lang":"en","profile_link_color":"0084B4","id":2987773015,"following":null,"protected":false,"favourites_count":90,"profile_text_color":"333333","verified":false,"description":null,"contributors_enabled":false,"profile_sidebar_border_color":"C0DEED","name":"Pam Anderson","profile_background_color":"C0DEED","created_at":"Sun Jan 18 02:14:41 +0000 2015","default_profile_image":false,"followers_count":22,"profile_image_ur":"","geo_enabled":false,"profile_background_image_url":"","profile_background_image_url_httpd":"":null,"url":null,"utc_offset":null,"time_zone":null,"notifications":null,"profile_use_background_image":true,"friends_count":59,"profile_sidebar_fill_color":"DDEEF6","screen_name":"pamawah25","id_str":"2987773015","profile_image_url":"","listed_count":0,"is_translator":false}}
The flume agent i am working is as follows:
agent1.sources.twitter-data.type = com.cloudera.flume.source.TwitterSource
agent1.sources.twitter-data.consumerKey = ""
agent1.sources.twitter-data.consumerSecret = ""
agent1.sources.twitter-data.accessToken = ""
agent1.sources.twitter-data.accessTokenSecret = ""
agent1.sources.twitter-data.keywords = superman, batman, iron man, tutorials point,java, bigdata, mapreduce, mahout, hbase, nosql
agent1.channels.kafka-channel.type = org.apache.flume.channel.kafka.KafkaChannel
agent1.channels.kafka-channel.capacity = 10000
agent1.channels.kafka-channel.transactionCapacity = 1000
agent1.channels.kafka-channel.brokerList = kafka:9092
agent1.channels.kafka-channel.topic = twitter
agent1.channels.kafka-channel.zookeeperConnect = kafka:2181
#agent1.channels.kafka-channel.parseAsFlumeEvent = false
agent1.sinks.hdfs-sink.type = hdfs
agent1.sinks.hdfs-sink.hdfs.path = hdfs:///user/Hadoop/twitter_data
agent1.sinks.hdfs-sink.fileType = DataStream
agent1.sinks.hdfs-sink.writeFormat = Text
agent1.sinks.hdfs-sink.batchSize = 1000
agent1.sinks.hdfs-sink.rollSize = 0
agent1.sinks.hdfs-sink.rollCount = 10000
agent1.sources.twitter-data.channels = kafka-channel
I would like to read this data in spark streaming / spark Sql to process and store it.
So i used the code as follows, but it dosent help because of the flume event:
val ssc = new StreamingContext(sc, Seconds(2))
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)//new org.apache.spark.sql.SQLContext(sc)
// Create direct kafka stream with brokers and topics
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topicsSet)
// Get the data (tweets) from kafka
val tweets = messages.map(_._2)
tweets.foreachRDD { rdd =>
val jsonRDD = sqlContext.read.json(rdd)
val tweetTable = jsonRDD.toDF()
tweetTable.printSchema()
tweetTable.show(5)
tweetTable.write.mode("append").saveAsTable("twitterStream")
}