3
votes

I'm trying to make a local Orion+Cygnus persist Orion's data on a local HDFS through WebHDFS.

On Cygnus' instructions on gitub, very little is mentioned about WebHDFS, as the configuration is more about HttpFS. On the .md OrionHDFSsink it's said that hdfs_port=50070 is for WebHDFS, as indeed my HDFS is. So I would expect by setting the port this way cygnus would automatically use WebHDFS, but on my case it doesn't seem to be working this way.

So, here's my agent_1.conf:

cygnusagent.sources = http-source
cygnusagent.sinks = hdfs-sink
cygnusagent.channels = hdfs-channel

# source configuration
cygnusagent.sources.http-source.channels = hdfs-channel
cygnusagent.sources.http-source.type = org.apache.flume.source.http.HTTPSource
cygnusagent.sources.http-source.port = 5050
cygnusagent.sources.http-source.handler = com.telefonica.iot.cygnus.handlers.OrionRestHandler
cygnusagent.sources.http-source.handler.notification_target = /notify
cygnusagent.sources.http-source.handler.default_service = def_serv
cygnusagent.sources.http-source.handler.default_service_path = def_servpath
cygnusagent.sources.http-source.handler.events_ttl = 4
cygnusagent.sources.http-source.interceptors = ts gi
cygnusagent.sources.http-source.interceptors.ts.type = timestamp
cygnusagent.sources.http-source.interceptors.gi.type = com.telefonica.iot.cygnus.interceptors.GroupingInterceptor$Builder
cygnusagent.sources.http-source.interceptors.gi.grouping_rules_conf_file = /usr/cygnus/conf/grouping_rules.conf
    
# OrionHDFSSink configuration
cygnusagent.sinks.hdfs-sink.channel = hdfs-channel
cygnusagent.sinks.hdfs-sink.type = com.telefonica.iot.cygnus.sinks.OrionHDFSSink
cygnusagent.sinks.hdfs-sink.hdfs_host = localHDFS.ip
cygnusagent.sinks.hdfs-sink.hdfs_port = 50070
cygnusagent.sinks.hdfs-sink.hdfs_username = HDFSrootUser
cygnusagent.sinks.hdfs-sink.attr_persistence = column

# hdfs-channel configuration
cygnusagent.channels.hdfs-channel.type = memory
cygnusagent.channels.hdfs-channel.capacity = 1000
cygnusagent.channels.hdfs-channel.transactionCapacity = 100

When I update an Entity on Orion, to whom Cygnus is subbed, Cygnus logs the following:

02 Sep 2015 20:09:12,353 INFO  [2055470757@qtp-1523539038-0] (com.telefonica.iot.cygnus.handlers.OrionRestHandler.getEvents:150)  - Starting transaction (1441217314-956-0000000000)
02 Sep 2015 20:09:12,362 INFO  [2055470757@qtp-1523539038-0] (com.telefonica.iot.cygnus.handlers.OrionRestHandler.getEvents:236)  - Received data ({  "subscriptionId" : "55e735c9b89e8535f8ca5ef2",  "originator" : "localhost",  "contextResponses" : [    {      "contextElement" : {        "type" : "Reading",        "isPattern" : "false",        "id" : "Reading1.1",        "attributes" : [          {            "name" : "Cost",            "type" : "double",            "value" : "32"          },          {            "name" : "Reading_ID",            "type" : "integer",            "value" : "14"          },          {            "name" : "Threshold",            "type" : "double",            "value" : "30"          },          {            "name" : "email",            "type" : "string",            "value" : "arthurmvieira@hotmail.com"          }        ]      },      "statusCode" : {        "code" : "200",        "reasonPhrase" : "OK"      }    }  ]})
02 Sep 2015 20:09:12,366 INFO  [2055470757@qtp-1523539038-0] (com.telefonica.iot.cygnus.handlers.OrionRestHandler.getEvents:258)  - Event put in the channel (id=2020008711, ttl=4)
02 Sep 2015 20:09:12,432 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionSink.process:128)  - Event got from the channel (id=2020008711, headers={fiware-servicepath=def_servpath, destination=reading1.1_reading, content-type=application/json, fiware-service=def_serv, ttl=4, transactionId=1441217314-956-0000000000, timestamp=1441217352368}, bodyLength=812)
02 Sep 2015 20:09:12,549 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionHDFSSink.persist:356)  - [hdfs-sink] Persisting data at OrionHDFSSink. HDFS file (def_serv/def_servpath/reading1.1_reading/reading1.1_reading.txt), Data ({"recvTime":"2015-09-02T18:09:12.368Z","Cost":"32", "Cost_md":[],"Reading_ID":"14", "Reading_ID_md":[],"Threshold":"30", "Threshold_md":[],"email":"arthurmvieira@hotmail.com", "email_md":[]})
02 Sep 2015 20:09:12,557 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionSink.process:143)  - Persistence error (The /user/root/def_serv/def_servpath/reading1.1_reading directory could not be created in HDFS. HttpFS response: 503 Service unavailable)
02 Sep 2015 20:09:12,558 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionSink.process:173)  - An event was put again in the channel (id=2020008711, ttl=3)
02 Sep 2015 20:09:12,558 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionSink.process:193)  - Finishing transaction (1441217314-956-0000000000)
02 Sep 2015 20:09:13,560 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionSink.process:128)  - Event got from the channel (id=2020008711, headers={fiware-servicepath=def_servpath, destination=reading1.1_reading, content-type=application/json, fiware-service=def_serv, ttl=3, transactionId=1441217314-956-0000000000, timestamp=1441217352368}, bodyLength=812)
02 Sep 2015 20:09:13,574 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionHDFSSink.persist:356)  - [hdfs-sink] Persisting data at OrionHDFSSink. HDFS file (def_serv/def_servpath/reading1.1_reading/reading1.1_reading.txt), Data ({"recvTime":"2015-09-02T18:09:12.368Z","Cost":"32", "Cost_md":[],"Reading_ID":"14", "Reading_ID_md":[],"Threshold":"30", "Threshold_md":[],"email":"arthurmvieira@hotmail.com", "email_md":[]})
02 Sep 2015 20:09:13,574 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionSink.process:143)  - Persistence error (The /user/root/def_serv/def_servpath/reading1.1_reading directory could not be created in HDFS. HttpFS response: 503 Service unavailable)
02 Sep 2015 20:09:13,575 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionSink.process:173)  - An event was put again in the channel (id=2020008711, ttl=2)
02 Sep 2015 20:09:13,575 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionSink.process:193)  - Finishing transaction (1441217314-956-0000000000)
02 Sep 2015 20:09:15,576 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionSink.process:128)  - Event got from the channel (id=2020008711, headers={fiware-servicepath=def_servpath, destination=reading1.1_reading, content-type=application/json, fiware-service=def_serv, ttl=2, transactionId=1441217314-956-0000000000, timestamp=1441217352368}, bodyLength=812)
02 Sep 2015 20:09:15,590 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionHDFSSink.persist:356)  - [hdfs-sink] Persisting data at OrionHDFSSink. HDFS file (def_serv/def_servpath/reading1.1_reading/reading1.1_reading.txt), Data ({"recvTime":"2015-09-02T18:09:12.368Z","Cost":"32", "Cost_md":[],"Reading_ID":"14", "Reading_ID_md":[],"Threshold":"30", "Threshold_md":[],"email":"arthurmvieira@hotmail.com", "email_md":[]})
02 Sep 2015 20:09:15,599 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionSink.process:143)  - Persistence error (The /user/root/def_serv/def_servpath/reading1.1_reading directory could not be created in HDFS. HttpFS response: 503 Service unavailable)
02 Sep 2015 20:09:15,600 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionSink.process:173)  - An event was put again in the channel (id=2020008711, ttl=1)
02 Sep 2015 20:09:15,600 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionSink.process:193)  - Finishing transaction (1441217314-956-0000000000)
02 Sep 2015 20:09:18,601 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionSink.process:128)  - Event got from the channel (id=2020008711, headers={fiware-servicepath=def_servpath, destination=reading1.1_reading, content-type=application/json, fiware-service=def_serv, ttl=1, transactionId=1441217314-956-0000000000, timestamp=1441217352368}, bodyLength=812)
02 Sep 2015 20:09:18,615 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionHDFSSink.persist:356)  - [hdfs-sink] Persisting data at OrionHDFSSink. HDFS file (def_serv/def_servpath/reading1.1_reading/reading1.1_reading.txt), Data ({"recvTime":"2015-09-02T18:09:12.368Z","Cost":"32", "Cost_md":[],"Reading_ID":"14", "Reading_ID_md":[],"Threshold":"30", "Threshold_md":[],"email":"arthurmvieira@hotmail.com", "email_md":[]})
02 Sep 2015 20:09:18,618 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionSink.process:143)  - Persistence error (The /user/root/def_serv/def_servpath/reading1.1_reading directory could not be created in HDFS. HttpFS response: 503 Service unavailable)
02 Sep 2015 20:09:18,621 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionSink.process:173)  - An event was put again in the channel (id=2020008711, ttl=0)
02 Sep 2015 20:09:18,621 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionSink.process:193)  - Finishing transaction (1441217314-956-0000000000)
02 Sep 2015 20:09:22,622 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionSink.process:128)  - Event got from the channel (id=2020008711, headers={fiware-servicepath=def_servpath, destination=reading1.1_reading, content-type=application/json, fiware-service=def_serv, ttl=0, transactionId=1441217314-956-0000000000, timestamp=1441217352368}, bodyLength=812)
02 Sep 2015 20:09:22,635 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionHDFSSink.persist:356)  - [hdfs-sink] Persisting data at OrionHDFSSink. HDFS file (def_serv/def_servpath/reading1.1_reading/reading1.1_reading.txt), Data ({"recvTime":"2015-09-02T18:09:12.368Z","Cost":"32", "Cost_md":[],"Reading_ID":"14", "Reading_ID_md":[],"Threshold":"30", "Threshold_md":[],"email":"arthurmvieira@hotmail.com", "email_md":[]})
02 Sep 2015 20:09:22,635 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionSink.process:143)  - Persistence error (The /user/root/def_serv/def_servpath/reading1.1_reading directory could not be created in HDFS. HttpFS response: 503 Service unavailable)
02 Sep 2015 20:09:22,635 WARN  [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionSink.process:163)  - The event TTL has expired, it is no more re-injected in the channel (id=2020008711, ttl=0)
02 Sep 2015 20:09:22,635 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionSink.process:193)  - Finishing transaction (1441217314-956-0000000000)

So you can see it's trying to use HttpFS, as it logs the response:

HttpFS response: 503 Service unavailable

...on each writing try.

How should I configure the agent to use WebHDFS?

Thank you

2

2 Answers

1
votes

I don't know what was happening, but the configuration mentioned is correct and is working now.

After several tries at rebooting the instance, rewriting the config files and other log errors than the one mentioned, it worked. At some point Cygnus was trying to write to localhost:50075, instead of {localHDFS.ip}:50070, but that was gone after rebooting cygnus.

All instances are at their latest version (important).

1
votes

Cygnus configuration for WebHDFS is just about setting the port to 50070, nothing else is required.

Regarding the connections you mention to 50075, they are correct as well, since that's the behaviour of WebHDFS: when you want to upload data to HDFS, first the client (in this case, Cygnus) accesses the Namenode through TCP/50070 port, then the namenode responds with a redirection location pointing to the datanode where the data will be effectively uploaded; such a redirection uses the TCP/50075 port, and thus that datanode:50075 must be accessible by the client (Cygnus). That's why we are using HttpFS in the global instance of Cosmos at FIWARE Lab: HttpFS works as a gateway hiding the details of the datanodes, and a single entry point and port (14000) is required.