EKS logs to CloudWatch stream as string

9/13/2019

I'm having this issue: I have an EKS cluster which sends logs to CloudWatch, and then Firehose streams the logs to an S3 bucket.

My goal is to get these logs from S3 and forward them to Elasticsearch in bulk. I wrote a Python Lambda function, and it works perfectly when the logs are JSON. My problem is that some logs are plain strings or only "kind of" JSON.

Example:

kube-authenticator:

time="2019-09-13T09:30:50Z" level=error msg="Watch channel closed."

kube-apiserver:

E0912 10:19:10.649757 1 watcher.go:208] watch chan error: etcdserver: mvcc: required revision has been compacted

I'm wondering whether I should try to wrap these messages and convert them to JSON, or whether there is some way to change the log format itself to JSON. I thought about writing a regex, but I don't have enough knowledge of regex.

-- Amit Baranes
amazon-eks
amazon-web-services
elasticsearch
kubernetes
python

1 Answer

9/17/2019

As mentioned in the comments, I ended up writing two functions that handle the logs and convert them to JSON.

The first one handles the kube-apiserver, kube-controller-manager and kube-scheduler log groups:

import re, json
from datetime import datetime

def insert_dash(string, index):
    # helper: insert a dash at the given index, e.g. "0912" -> "09-12"
    return string[:index] + '-' + string[index:]

def convert_text_logs_to_json_and_add_logGroup(message, logGroup):
    # klog lines start with e.g. "E0912 10:19:10.649757 ..." --
    # a severity letter, then MMDD, then the time
    month_and_day = message.split(' ')[0][1:]
    month_and_day = insert_dash(month_and_day, 2)
    log_time_regex = r"\s+((?:\d{2})?:\d{1,2}:\d{1,2}\.\d+)"
    log_time = re.findall(log_time_regex, message)[0]
    current_year = datetime.now().year  # klog omits the year
    full_log_datetime = "%s-%sT%sZ" % (current_year, month_and_day, log_time)
    log_content = re.split(log_time_regex, message)[2]
    # json.dumps handles quoting and escaping, so the message text
    # no longer needs its double quotes stripped
    return json.dumps({"timestamp": full_log_datetime,
                       "message": log_content.strip(),
                       "logGroup": logGroup})
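For comparison, the same klog prefix can also be parsed in one pass with a single anchored regex instead of a split plus a findall. This is only a sketch; the function name and log-group string are illustrative:

```python
import re, json
from datetime import datetime

# klog prefix: severity letter, MMDD, HH:MM:SS.micros, thread id, file:line]
KLOG_RE = re.compile(
    r"^([IWEF])(\d{2})(\d{2})\s+(\d{2}:\d{2}:\d{2}\.\d+)\s+\d+\s+\S+\]\s*(.*)$"
)

def parse_klog_line(message, logGroup):
    m = KLOG_RE.match(message)
    if not m:
        return None  # line is not in klog format
    severity, month, day, log_time, content = m.groups()
    year = datetime.now().year  # klog omits the year
    return json.dumps({
        "timestamp": "%s-%s-%sT%sZ" % (year, month, day, log_time),
        "message": content,
        "logGroup": logGroup,
    })
```

One advantage of matching the whole prefix at once is that non-matching lines are detected up front instead of raising an IndexError deeper in the function.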

The second function handles the authenticator log group:

def chunkwise(array, size=2):
    # group a flat list into consecutive pairs: [a, b, c, d] -> (a, b), (c, d)
    it = iter(array)
    return zip(*[it] * size)  # use itertools.izip on Python 2

def wrap_text_to_json_and_add_logGroup(message, logGroup):
    # logfmt-style lines (time="..." level=error msg="..."): the regex
    # yields alternating keys and (possibly quoted) values
    regex = r"\".*?\"|\w+"
    matches = re.findall(regex, message)
    key_value_pairs = chunkwise(matches)
    json_message = {}
    for key, value in key_value_pairs:
        if key == 'time':
            key = 'timestamp'
        json_message[key] = value.replace('"', '')
    json_message['logGroup'] = logGroup
    log_to_insert = json.dumps(json_message)
    return log_to_insert
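A variant sketch that skips the pairing step entirely: capture each key together with its value in a single regex, so a stray token that is neither a key nor a value cannot shift keys and values out of step. The function name here is illustrative:

```python
import re, json

# logfmt-style: key=value, where the value may be double-quoted
LOGFMT_RE = re.compile(r'(\w+)=("[^"]*"|\S+)')

def parse_logfmt_line(message, logGroup):
    fields = {}
    for key, value in LOGFMT_RE.findall(message):
        if key == "time":
            key = "timestamp"
        fields[key] = value.strip('"')
    fields["logGroup"] = logGroup
    return json.dumps(fields)
```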

I hope these functions are useful for anyone who needs to ship logs from CloudWatch to Elasticsearch.

-- Amit Baranes
Source: StackOverflow