0 votes

I'm having an issue: I have an EKS cluster that sends logs to CloudWatch, and Firehose then streams the logs to an S3 bucket.

My goal is to fetch these logs from S3 and forward them to Elasticsearch in bulk. I wrote a Python Lambda function, and it works perfectly when the logs are JSON. My problem is that some logs are plain strings or only "kind of" JSON.
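
For reference, the working part of the Lambda is shaped roughly like this (a simplified sketch, not the exact code; the Elasticsearch endpoint, the eks-logs index name, and the S3-trigger event handling are placeholders):

import json
import boto3
from elasticsearch import Elasticsearch, helpers

s3 = boto3.client('s3')
es = Elasticsearch(['https://my-es-endpoint:443'])  # placeholder endpoint

def handler(event, context):
    for record in event['Records']:
        # Each record points at an object Firehose delivered to the bucket
        bucket = record['s3']['bucket']['name']
        key = record['s3']['object']['key']
        body = s3.get_object(Bucket=bucket, Key=key)['Body'].read().decode('utf-8')
        actions = []
        for line in body.splitlines():
            doc = json.loads(line)  # this is where the non-JSON lines fail
            actions.append({'_index': 'eks-logs', '_source': doc})
        helpers.bulk(es, actions)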

Examples of the problematic logs:

kube-authenticator:

time="2019-09-13T09:30:50Z" level=error msg="Watch channel closed."

kube-apiserver:

E0912 10:19:10.649757 1 watcher.go:208] watch chan error: etcdserver: mvcc: required revision has been compacted

I'm wondering whether I should try to wrap these messages and convert them to JSON, or whether there is a way to change the log format to JSON. I thought about writing a regex, but I don't have enough regex experience.

Why S3 in between? It would be better to push the logs as a string to the Lambda function and then to ELK. – Adiii
Because of a customer requirement. I know your way is the better one. – Amit Baranes
Okay, so you can read the document from S3 and convert it to a string before pushing to ELK, something like data.Body.toString('utf-8'). – Adiii
The logs in S3 are mostly JSON; a small number of string logs are causing this issue. I need to convert them to JSON, or change the output log format to JSON if that's possible. – Amit Baranes
@Adiii Posted my answer. Thanks for your comments. – Amit Baranes

1 Answer

1 vote

As mentioned in the comments, I ended up writing two functions that handle these logs and convert them to JSON.

The first one handles the kube-apiserver, kube-controller-manager, and kube-scheduler log groups:

import re
from datetime import datetime

def insert_dash(string, index):
    # Insert a dash at the given index, e.g. '0912' -> '09-12'
    return string[:index] + '-' + string[index:]

def convert_text_logs_to_json_and_add_logGroup(message, logGroup):
    # klog lines start with a level letter and MMDD, e.g. 'E0912 10:19:10.649757 ...'
    month_and_day = insert_dash(message.split(' ')[0][1:], 2)
    log_time_regex = r"\s+((?:\d{2})?:\d{1,2}:\d{1,2}\.\d{1,})"
    log_time = re.findall(log_time_regex, message)[0]
    current_year = datetime.now().year  # klog omits the year, so assume the current one
    full_log_datetime = "%s-%sT%sZ" % (current_year, month_and_day, log_time)
    log_content = re.split(log_time_regex, message)[2]  # everything after the timestamp
    return '{"timestamp": "%s", "message": "%s", "logGroup": "%s"}' % (
        full_log_datetime, log_content.replace('"', ''), logGroup)
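
For example, running it on the kube-apiserver line from the question gives (assuming the current year is 2019; the log group name here is just a placeholder):

>>> msg = 'E0912 10:19:10.649757 1 watcher.go:208] watch chan error: etcdserver: mvcc: required revision has been compacted'
>>> convert_text_logs_to_json_and_add_logGroup(msg, '/aws/eks/kube-apiserver')
'{"timestamp": "2019-09-12T10:19:10.649757Z", "message": " 1 watcher.go:208] watch chan error: etcdserver: mvcc: required revision has been compacted", "logGroup": "/aws/eks/kube-apiserver"}'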

The second function handles the authenticator log group:

import json

def chunkwise(array, size=2):
    # Group a flat list into consecutive pairs:
    # ['k1', 'v1', 'k2', 'v2'] -> [('k1', 'v1'), ('k2', 'v2')]
    it = iter(array)
    return zip(*[it] * size)  # use itertools.izip instead of zip on Python 2

def wrap_text_to_json_and_add_logGroup(message, logGroup):
    # Match either a quoted value or a bare word, so the line
    # 'time="..." level=error msg="..."' becomes an alternating key/value list
    regex = r"\".*?\"|\w+"
    matches = re.findall(regex, message)
    key_value_pairs = chunkwise(matches)
    json_message = {}
    for key, value in key_value_pairs:
        if key == 'time':
            key = 'timestamp'
        json_message[key] = value.replace('"', '')
    json_message['logGroup'] = logGroup
    return json.dumps(json_message)
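
Running it on the authenticator line from the question (again, the log group name is a placeholder):

>>> msg = 'time="2019-09-13T09:30:50Z" level=error msg="Watch channel closed."'
>>> wrap_text_to_json_and_add_logGroup(msg, '/aws/eks/authenticator')
'{"timestamp": "2019-09-13T09:30:50Z", "level": "error", "msg": "Watch channel closed.", "logGroup": "/aws/eks/authenticator"}'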

I hope these functions are useful for anyone who needs to ship logs from CloudWatch to Elasticsearch.