
Using Docker, I've set up three containers: one for Elasticsearch, one for fluentd, and one for Kibana. Elasticsearch is on port 9200, fluentd on 24224, and Kibana on 5600. My fluentd config file is:

# INJECTED VIA DOCKER COMPOSE
<source>
  @type forward
  port 24224
  format json
</source>

#<filter **>
#  @type stdout
#</filter>

#<filter **>
#  @type parser
#  format json
#  key_name log
#  hash_value_field log
#  reserve_data true
#</filter>

<match **>
  @type copy
  <store>
    @type elasticsearch
    hosts 172.18.0.1:9200
    logstash_format true
    logstash_prefix chris.risley
    logstash_dateformat %Y%m%d
    include_tag_key true
    flush_interval 1s
  </store>
  <store>
    @type stdout
  </store>
</match>
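
For reference, my container setup is roughly equivalent to the docker-compose sketch below. This is illustrative rather than my exact file: the image tags and the volume mount are assumptions, the fluentd image is assumed to already have fluent-plugin-elasticsearch installed, and only the ports match what I described above.

version: '3'
services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:6.1.1  # tag is an assumption
    ports:
      - "9200:9200"
  fluentd:
    build: ./fluentd  # assumed custom image with fluent-plugin-elasticsearch installed
    ports:
      - "24224:24224"
    volumes:
      - ./fluent.conf:/fluentd/etc/fluent.conf  # the config above, injected via compose
  kibana:
    image: docker.elastic.co/kibana/kibana:6.1.1  # tag is an assumption
    ports:
      - "5600:5601"  # host port 5600 as described; 5601 is Kibana's container default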

Using the fluent-logger library (https://github.com/fluent/fluent-logger-python), I've added a fluentd handler to Python's native logging module and have been able to use the logger to send information to fluentd successfully. Once the data reaches fluentd, it should be posted to Elasticsearch while also showing up in the terminal. In the terminal everything seems to be working, and I see the following:

fluentd_1         | 2018-01-17 14:22:15.000000000 +0000 test-logger: "{\"json\":\"message\", \"log\":\"work dammit\"}"

However, when I check Kibana, no data shows up in Elasticsearch. As you can see in the fluentd configuration above, I've tried using a filter, since I thought it could be some sort of formatting issue, and I usually receive error messages like the following when messing around with the config:

2018-01-16 16:50:13.641919751 +0000 fluent.warn: {"retry_time":2,"next_retry_seconds":"2018-01-16 16:50:13 +0000","chunk":"562e785fe25f67fcd8919dfd02992af2","error":"#<Fluent::ElasticsearchOutput::ConnectionFailure: Can not reach Elasticsearch cluster ({:host=>\"docker.for.mac.localhost\", :port=>9200, :scheme=>\"http\"})!>","message":"failed to flush the buffer. retry_time=2 next_retry_seconds=2018-01-16 16:50:13 +0000 chunk=\"562e785fe25f67fcd8919dfd02992af2\" error_class=Fluent::ElasticsearchOutput::ConnectionFailure error=\"Can not reach Elasticsearch cluster ({:host=>\\"docker.for.mac.localhost\\", :port=>9200, :scheme=>\\"http\\"})!\""}'

2018-01-17 14:59:03 +0000 [error]: #0 Could not push log to Elasticsearch: {"took"=>0, "errors"=>true, "items"=>[{"index"=>{"_index"=>"chris.risley-20180117", "_type"=>"fluentd", "_id"=>"AWEEoW6ECtI_aQclhe0C", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse [log]", "caused_by"=>{"type"=>"illegal_state_exception", "reason"=>"Can't get text on a START_OBJECT at 1:439"}}}}]}

Any idea what's happening? I really wish there were more documentation around issues like this.

And in case this is the issue, here's how I'm using the fluentd logging library in Python:

from fluent import handler
from src.directlog import DirectLog
import logging.config
import yaml
import datetime
import msgpack
from io import BytesIO


class ElasticLogger(object):

    def __init__(self, tag, host, port, base_level, config=False, path='src/logging.yaml'):
        """
        Constructs and initializes a logging facility that has a fluentd handler
        :param tag: The logger's tag or name
        :param host: The host url/ip of fluentd
        :param port: The port of fluentd
        :param base_level: The base level logging priority
        :param config: Boolean to determine whether or not you configure with a dictionary
        :param path: Path to dictionary
        """
        self.tag = tag
        self.host = host
        self.port = port
        self.base_level = base_level
        self.logger = self.config(config, path)
        # TODO: Abstract Out Authentication
        self.back_up_logger = DirectLog(self.host, self.tag + str(datetime.datetime.now().strftime("%Y%M%d")),
                                        '#####', '#####', 10000, self.base_level, False)

    # Public
    def build(self) -> logging:
        """
        Builds the logger and returns it for use
        :return: The logger
        """
        return self.logger

    # Private
    def config(self, dict_config=False, path='src/logging.yaml') -> logging:
        """
        Configures the fluentd logger with the fields initialized in the constructor
        :param dict_config: Boolean to determine whether or not to configure it with a dictionary
        :param path: The path to the dictionary
        :return: The logger
        """
        logger = logging.getLogger(self.tag)
        if dict_config:
            with open(path) as fd:
                conf = yaml.load(fd)
            logging.config.dictConfig(conf['logging'])
        else:
            logging.basicConfig(level=self.base_level)
        h = handler.FluentHandler(self.tag, host=self.host, port=self.port, buffer_overflow_handler=self.overflow_handler)
        logger.addHandler(h)
        return logger

    # Private
    def overflow_handler(self, pendings):
        """
        Used to save data that overflowed the buffer
        :param pendings: The pending data?
        :return: Nothing
        """
        unpacker = msgpack.Unpacker(BytesIO(pendings))
        for unpacked in unpacker:
            print(unpacked)
            # Uncomment to have data that overflowed the buffer be posted directly to elasticsearch
            # self.back_up_logger.log(unpacked[2])


if __name__ == '__main__':
    local_host = '####'
    el = ElasticLogger('test-logger', local_host, 24224, logging.NOTSET, False).build()
    el.error('{"json":"message", "log":"work dammit"}')

1 Answer


Change hosts 172.18.0.1:9200 to the two separate settings host 172.18.0.1 and port 9200.
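
With that change, the elasticsearch store block from your config would look like this (all other options unchanged):

<store>
  @type elasticsearch
  host 172.18.0.1
  port 9200
  logstash_format true
  logstash_prefix chris.risley
  logstash_dateformat %Y%m%d
  include_tag_key true
  flush_interval 1s
</store>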