通过实时监控日志里的敏感信息来实现对应用的监控报警

背景

项目有一些非常重要的后台应用是跑在 AWS 的 lambda 上的,老板和产品非常关注这些应用的执行情况,一旦出错,都会是很严重的故障。

方案变迁

前面做过一些基础设施级别的监控报警,如:监控 AWS 的 RDS 并通过企业微信来报警,那个完全是利用基础设施自动打到 CloudWatch 的基础的 metrics 来做的。

于是,我这里下意识的就想利用现有的 CloudWatch 里现成的 Metrics 来做这个事情,于是我就弄了三个(种)监控:

  • 利用官方文档 里提到的 lambda 的 Errors 这个基础 Metric 做了一个
  • 利用官方文档 里提到的 API Gateway 的基础 Metrics: 4XXError 和 5XXError 做了一个(其实是两个,4XX、5XX 各一个)
  • 然后又用给 lambda function 的日志新建 metric filter 的方法,新建了一个 metric,用来统计日志里敏感信息信息的次数,最后在 CloudWatch 里用这个新建的 metric 来做的报警

用这三种方案有一个好处,就是可以沿用之前已有的监控 AWS 的 RDS 并通过企业微信来报警里的 CloudWatch->SNS->Lambda function 这种现成的框架。

但等做完了,测试过报警信息了,才发现这三种方案都有一个共同的缺点:这三种都是基于 metrics 来做的报警,但是 metrics 其实只关注 metrics 的数量,报警依赖的是数量跟阈值的比较,报出来的上下文信息也只能是老状态是啥、新状态是啥;为什么触发报警(metric 次数超过阈值什么的),完完全全不能带出原始日志里的信息。所以,往往关心出错具体信息的技术收到报警以后也会一头雾水,完全不知道哪里出错。

所以,改方案了,在重要的 lambda function 的日志里,新建一个 Lambda subscribtion filter,设置当在日志里发现敏感信息后,触发一个 Lambda function: A。在这个名叫 A 的 Lambda function 里,实现报警的功能。这个方案神似当年我们在实体机时代通过 tail -f xxxx.log | grep "xxxx" | /path/to/a.py 来检测重要程序的日志里的关键词“xxxx”来报警的套路。

具体步骤

编写 Lambda function

名叫 “A” 的报警 Lambda function 的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import json
import os
import gzip
import base64
import http.client
import urllib.parse

WEBHOOK_TOKEN = os.environ['WX_TOKEN'] # 你的企业微信 Webhook Token
WEBHOOK_URL = "qyapi.weixin.qq.com"

def send_to_wechat_work(account_id, region_name, log_group_name, log_stream_name, message):
# URL编码日志组和日志流名称
log_group_encoded = urllib.parse.quote(log_group_name, safe='')
log_stream_encoded = urllib.parse.quote(log_stream_name, safe='')
# 构造CloudWatch日志链接
log_url = f"https://console.aws.amazon.com/cloudwatch/home?region={region_name}#logsV2:log-groups/log-group/{log_group_encoded}/log-events/{log_stream_encoded}"
# 创建企业微信消息格式
wechat_msg = {
"msgtype": "markdown",
"markdown": {
"content": f"#### :rotating_light: Lambda Error Alert :rotating_light:\n"
f"**Account ID:**{account_id}\n\n"
f"**Log Group:**{log_group_name}\n\n"
f"**Log Stream:**{log_stream_name}\n\n"
f"**Message:**\n\n"
f"```\n{message}\n```\n\n" # Markdown code block
f"[Click here to view the log]({log_url})"
}
}
# 发送POST请求至企业微信
conn = http.client.HTTPSConnection(WEBHOOK_URL)
headers = {'Content-Type': 'application/json'}
body = json.dumps(wechat_msg)
conn.request("POST", f"/cgi-bin/webhook/send?key={WEBHOOK_TOKEN}", body, headers)
response = conn.getresponse()
data = response.read()
conn.close()
return data

def lambda_handler(event, context):
try:
# 解压缩日志数据
log_data = base64.b64decode(event['awslogs']['data'])
uncompressed_data = gzip.decompress(log_data)
log_events_data = json.loads(uncompressed_data)

except Exception as e:
print(f"Error processing log data: {str(e)}")
return {'statusCode': 500, 'body': json.dumps('Error processing log data')}

# 获取日志组和流信息,并提取账户 ID 和区域(region)名
log_group_name = log_events_data.get('logGroup', 'Unknown log group')
log_stream_name = log_events_data.get('logStream', 'Unknown log stream')
account_id = context.invoked_function_arn.split(":")[4]
region_name = context.invoked_function_arn.split(":")[3]

# 过滤并发送每个有效的日志消息
messages_sent = 0
for log_event in log_events_data.get('logEvents', []):
message = log_event.get('message').strip()
if message: # 检查消息不是空的或者只包含空白字符
print("message:", message)
send_to_wechat_work(account_id, region_name, log_group_name, log_stream_name, message)
messages_sent += 1

if messages_sent == 0:
print("No valid log events to send.")

return {'statusCode': 200, 'body': json.dumps('Messages sent to WeChat Work')}

建立 Lambda subscribtion filter 并指向上一步创建的 Lambda function

在 Lambda 的页面里,找到需要监控报警的那个“重要”的后台应用:”B”,点击进入 “B” 的页面,点击 “Configration” 这个 tab,然后在左侧栏里点击 “Monitoring and operations tools”,这时,在中间栏的 “Logging configuration” 部分就能看到 “CloudWatch log group” 了,大概是像这样:“/aws/lambda/B”,同时这也还是一个链接,直接点击就会到 B 在 CloudWatch 里的 Log group 了。

在这个具体的 Log group 的页面里,点击 “Subscription filters” 这个 tab,点击右侧叫 “Create” 的下拉按钮,在弹出来的菜单里点击 “Create Lambda subscription filter”,然后

  • 在 “Lambda function” 里选择之前的那个叫 “A” 的 Lambda function
  • “Subscription filter pattern” 里填上匹配日志的信息,比如我用的是“%xxxx%”
  • “Subscription filter name” 随便写

然后,点击右下按钮 “Start streaming” 就可以了。

总结

都很简单是吧,的确很简单。但是没用过 AWS 或对 AWS 不熟悉的人来说,估计是想不到 AWS 还有这些“奇奇怪怪”的功能的。顺便说一个冷知识,在 AWS 的 Serverless Application Repository 这个服务下的 Available applications 里,有个叫 “WeChat-Notifier” 的应用,直接可以用来做报警!直接支持微信和企业微信。所以呀,其实就报警到(企业)微信这事儿,原本都不用写代码的,直接调用这个就好了!下次有时间再写写关于 “WeChat-Notifier” 的内容。