I have an EKS cluster that sends its logs to CloudWatch, and a Firehose stream then delivers those logs to an S3 bucket.
My goal is to read these logs from S3 and forward them to Elasticsearch in bulk. I wrote a Python Lambda function that works perfectly when the logs are JSON. My problem is that some logs are plain strings or only "kind of" JSON.
Example:
kube-authenticator:
time="2019-09-13T09:30:50Z" level=error msg="Watch channel closed."
kube-apiserver:
E0912 10:19:10.649757 1 watcher.go:208] watch chan error: etcdserver: mvcc: required revision has been compacted
I'm wondering whether I should wrap these messages and convert them to JSON myself, or whether there is a way to change the log format to JSON. I thought about writing a regex, but I don't know regex well enough.
As mentioned in the comments, I ended up writing two functions that parse the logs and convert them to JSON.
The first one handles the kube-apiserver, kube-controller-manager and kube-scheduler log groups:
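import json
import re
from datetime import datetime

def convert_text_logs_to_json_and_add_logGroup(message, logGroup):
    # Lines start with a severity letter followed by MMDD, e.g. "E0912"
    month_and_day = message.split(' ')[0][1:]
    # insert_dash is a helper not shown here; it turns "0912" into "09-12"
    month_and_day = insert_dash(month_and_day, 2)
    log_time_regex = r"\s+((?:\d{2})?:\d{1,2}:\d{1,2}\.\d{1,})"
    log_time = re.findall(log_time_regex, message)[0]
    # The log line carries no year, so the current year is assumed
    currentYear = datetime.now().year
    full_log_datetime = "%s-%sT%sZ" % (currentYear, month_and_day, log_time)
    # Everything after the first timestamp is the log content
    log_content = re.split(log_time_regex, message, maxsplit=1)[2]
    # Drop double quotes; json.dumps escapes the rest (e.g. backslashes)
    return json.dumps({"timestamp": full_log_datetime,
                       "message": log_content.replace('"', ''),
                       "logGroup": logGroup})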
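The function above calls insert_dash, which is not included in this answer. A minimal sketch of what such a helper could look like, assuming it only needs to insert a "-" at a given position (the body here is a guess, not the original helper):

```python
def insert_dash(string, index):
    """Insert a '-' into `string` before position `index` (assumed behavior)."""
    return string[:index] + "-" + string[index:]

# "E0912" -> drop the severity letter -> "0912" -> "09-12"
prefix = "E0912 10:19:10.649757".split(" ")[0]
month_and_day = insert_dash(prefix[1:], 2)
print(month_and_day)  # 09-12
```

With the kube-apiserver line from the question, this yields the month and day as "09-12", which is then combined with the current year and the time taken from the log line.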
The second function handles the authenticator log group:
import json
import re

def chunkwise(array, size=2):
    # Group consecutive items into tuples of `size`: [a, b, c, d] -> (a, b), (c, d)
    # zip works on both Python 2 and 3 (izip exists only in Python 2's itertools)
    it = iter(array)
    return zip(*[it] * size)

def wrap_text_to_json_and_add_logGroup(message, logGroup):
    # Match either a double-quoted string or a bare word
    regex = r"\".*?\"|\w+"
    matches = re.findall(regex, message)
    # Matches alternate: key, value, key, value, ...
    key_value_pairs = chunkwise(matches)
    json_message = {}
    for key, value in key_value_pairs:
        if key == 'time':
            key = 'timestamp'
        json_message[key] = value.replace('"', '')
    json_message['logGroup'] = logGroup
    return json.dumps(json_message)
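Since some logs are already valid JSON and others are plain text, one way to tie the two functions together is a small dispatcher. This is only a sketch: to_json_line, the parsers mapping, and matching on a log group name prefix are all assumptions for illustration, not part of the original Lambda.

```python
import json

def to_json_line(message, log_group, parsers):
    """Return a JSON string for `message`, using a text parser only if needed.

    `parsers` (hypothetical) maps a log group name prefix to a
    function(message, log_group) that returns a JSON string.
    """
    try:
        parsed = json.loads(message)
    except ValueError:
        parsed = None
    if isinstance(parsed, dict):
        # Already a JSON object: just tag it with its log group
        parsed["logGroup"] = log_group
        return json.dumps(parsed)
    for prefix, parser in parsers.items():
        if log_group.startswith(prefix):
            return parser(message, log_group)
    # Unknown source: wrap the raw text so the result is still valid JSON
    return json.dumps({"message": message, "logGroup": log_group})
```

The two functions from this answer could then be registered under the prefixes they handle, for example "kube-apiserver" for the first one and "authenticator" for the second.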
I hope these functions are useful for anyone who needs to insert logs from CloudWatch into Elasticsearch.
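For the bulk insert itself, the converted JSON strings have to be combined into the newline-delimited body that the Elasticsearch _bulk endpoint expects. A minimal sketch, assuming a hypothetical helper build_bulk_body and an index name chosen by the caller; depending on the Elasticsearch version, the action line may also need a `_type`:

```python
import json

def build_bulk_body(json_lines, index_name):
    """Build an NDJSON body for the Elasticsearch _bulk endpoint."""
    parts = []
    for line in json_lines:
        # Action line telling Elasticsearch to index the document that follows
        parts.append(json.dumps({"index": {"_index": index_name}}))
        # Re-serialize so each document is guaranteed to sit on a single line
        parts.append(json.dumps(json.loads(line)))
    if not parts:
        return ""
    # The bulk API requires the body to end with a newline
    return "\n".join(parts) + "\n"
```

The resulting string can be sent as the request body of a POST to the _bulk endpoint with the Content-Type header set to application/x-ndjson.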