Background
Build a logging system with the ELK Stack plus Kafka.
Logs are unpredictable. When a production incident forces you to look at the logs, log volume can spike suddenly and overwhelm your logging infrastructure. To protect Logstash and Elasticsearch from such data bursts, you need to deploy a buffering mechanism that acts as a message broker.
Apache Kafka is the most common broker solution deployed together with the ELK Stack. Typically, Kafka sits between the shippers and the indexers, acting as the entry point for the collected data:
(Diagram: log files → filebeat1 / filebeat2 / filebeat3 → kafka → logstash → elasticsearch → kibana)
This article shows how to deploy all the components needed to build a resilient data pipeline with the ELK Stack and Kafka:
- Filebeat – collects logs and forwards them to a Kafka topic.
- Kafka – brokers the data flow and queues it.
- Logstash – aggregates the data from the Kafka topic, processes it, and ships it to Elasticsearch.
- Elasticsearch – indexes the data.
- Kibana – for analyzing the data.
Base Environment
CentOS 7.6, with the log of a Spring Boot application as the example:
/data/spring/gateway/nohup.out
ELK Stack 7.6.x
JDK 1.8 (please install it yourself).
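As a minimal sketch (assuming the standard CentOS 7 base repositories are available), OpenJDK 1.8 can be installed and verified like this:
# Install OpenJDK 1.8 from the CentOS repositories
yum install -y java-1.8.0-openjdk
# Confirm the runtime version
java -version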
Elasticsearch
All data and log directories live under /data/elasticsearch.
Installation
wget https://mirrors.huaweicloud.com/elasticsearch/7.6.2/elasticsearch-7.6.2-x86_64.rpm
rpm -ivh elasticsearch-7.6.2-x86_64.rpm
Configuration
/etc/elasticsearch/jvm.options
[root@es1 elasticsearch]# egrep -v "^#|^$" /etc/elasticsearch/jvm.options
-Xms16g
-Xmx16g
8-13:-XX:+UseConcMarkSweepGC
8-13:-XX:CMSInitiatingOccupancyFraction=75
8-13:-XX:+UseCMSInitiatingOccupancyOnly
14-:-XX:+UseG1GC
14-:-XX:G1ReservePercent=25
14-:-XX:InitiatingHeapOccupancyPercent=30
-Djava.io.tmpdir=${ES_TMPDIR}
-XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/data/elasticsearch
-XX:ErrorFile=/data/elasticsearch/log/hs_err_pid%p.log
8:-XX:+PrintGCDetails
8:-XX:+PrintGCDateStamps
8:-XX:+PrintTenuringDistribution
8:-XX:+PrintGCApplicationStoppedTime
8:-Xloggc:/data/elasticsearch/log/gc.log
8:-XX:+UseGCLogFileRotation
8:-XX:NumberOfGCLogFiles=32
8:-XX:GCLogFileSize=64m
9-:-Xlog:gc*,gc+age=trace,safepoint:file=/data/elasticsearch/log/gc.log:utctime,pid,tags:filecount=32,filesize=64m
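The heap is pinned at 16 GB via -Xms/-Xmx. Once Elasticsearch is running (see the startup step below), you can confirm the heap settings took effect through the nodes API; a minimal sketch, assuming the host and port configured in elasticsearch.yml:
# Print the JVM heap limits Elasticsearch actually started with
curl -s "192.168.252.174:9200/_nodes/jvm?pretty" | grep heap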
/etc/elasticsearch/elasticsearch.yml
[root@es1 elasticsearch]# egrep -v "^#|^$" /etc/elasticsearch/elasticsearch.yml
cluster.name: smy
node.name: node-1
path.data: /data/elasticsearch
path.logs: /data/elasticsearch/log
network.host: 192.168.252.174
http.port: 9200
discovery.seed_hosts: ["127.0.0.1", "[::1]"]
cluster.initial_master_nodes: ["node-1"]
http.cors.enabled: true
http.cors.allow-origin: /.*/
Start
systemctl start elasticsearch.service
systemctl status elasticsearch.service
Verify
Open http://ip:9200 in a browser; you should see output similar to the following:
{
  "name" : "node-1",
  "cluster_name" : "smy",
  "cluster_uuid" : "wsjadseSQkqQQDkVz1gRgw",
  "version" : {
    "number" : "7.6.2",
    "build_flavor" : "default",
    "build_type" : "rpm",
    "build_hash" : "ef48eb35cf30adf4db14086e8aabd07ef6fb113f",
    "build_date" : "2020-03-26T06:34:37.794943Z",
    "build_snapshot" : false,
    "lucene_version" : "8.4.0",
    "minimum_wire_compatibility_version" : "6.8.0",
    "minimum_index_compatibility_version" : "6.0.0-beta1"
  },
  "tagline" : "You Know, for Search"
}
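The same check can be run from the shell. A small sketch using the cluster health API, assuming the network.host and http.port configured above:
# Reports cluster status; a single-node cluster typically shows green or yellow
curl -s "192.168.252.174:9200/_cluster/health?pretty"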
Logstash
Next up is the "L" in the ELK Stack: Logstash.
Installation
wget https://mirrors.cloud.tencent.com/elasticstack/7.x/yum/7.6.2/logstash-7.6.2.rpm
rpm -ivh logstash-7.6.2.rpm
Configuration
Next, we will configure a Logstash pipeline that pulls logs from the Kafka topic, processes them, and ships them to Elasticsearch for indexing. Let's create a new configuration file:
vim /etc/logstash/conf.d/gateway.conf
with the following content:
input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["apache"]
  }
}
filter {
  grok {
    match => [ "message",
               "(?<timestamp>%{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{TIME}) %{LOGLEVEL:level} %{NUMBER:pid} --- \[(?<thread>[A-Za-z0-9-]+)\] [A-Za-z0-9.]*\.(?<class>[A-Za-z0-9#_]+)\s*:\s+(?<logmessage>.*)",
               "message",
               "(?<timestamp>%{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{TIME}) %{LOGLEVEL:level} %{NUMBER:pid} --- .+? :\s+(?<logmessage>.*)"
             ]
  }
  # Parse the timestamp that ends up in the timestamp field thanks to the grok section above
  date {
    match => [ "timestamp" , "yyyy-MM-dd HH:mm:ss.SSS" ]
  }
}
output {
  elasticsearch {
    hosts => ["192.168.252.174:9200"]
  }
}
We use the Logstash Kafka input plugin to define the Kafka host and the topic we want Logstash to pull from, apply some filtering to the logs, and ship the data to our Elasticsearch instance. Save the file.
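For a Spring Boot line like the ERROR entries shown later in this article, the first grok pattern extracts timestamp, level, pid, thread, class and logmessage; the second pattern is a looser fallback. Before starting the service, it can be worth validating the pipeline syntax. A minimal sketch, assuming the default RPM install path /usr/share/logstash:
# Check the pipeline configuration for syntax errors without starting Logstash
/usr/share/logstash/bin/logstash --path.settings /etc/logstash -f /etc/logstash/conf.d/gateway.conf --config.test_and_exit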
Kibana
Next up is the "K" in the ELK Stack: Kibana.
Installation
wget https://mirrors.huaweicloud.com/kibana/7.6.2/kibana-7.6.2-x86_64.rpm
rpm -ivh kibana-7.6.2-x86_64.rpm
Configuration
[root@wlj174 ~]# egrep -v "^#|^$" /etc/kibana/kibana.yml
server.port: 5601
server.host: "192.168.252.174"
elasticsearch.hosts: ["http://192.168.252.174:9200"]
kibana.index: ".kibana"
Start
systemctl start kibana
Open http://ip:5601 in a browser to see the Kibana home page.
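As an optional check from the command line (a sketch, assuming Kibana's status API is reachable on the host and port configured above):
# Query Kibana's status API; a healthy instance reports an overall state of green
curl -s http://192.168.252.174:5601/api/status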
Filebeat
Installation
wget https://mirrors.cloud.tencent.com/elasticstack/7.x/yum/7.6.2/filebeat-7.6.2-x86_64.rpm
rpm -ivh filebeat-7.6.2-x86_64.rpm
Configuration
[root@iZ1767b3udZ filebeat]# cat /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /data/spring/gateway/nohup.out
output.kafka:
  codec.format:
    string: '%{[@timestamp]} %{[message]}'
  hosts: ["192.168.252.174:9092"]
  topic: apache
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
In the input section, we tell Filebeat which logs to collect: the gateway log. In the output section, we tell Filebeat to forward the data to our local Kafka server and the relevant topic (Kafka will be installed in the next step). Note the use of the codec.format directive, which ensures that the message and timestamp fields are extracted correctly; otherwise, the lines would be sent to Kafka as JSON. Save the file.
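Once Kafka is up (next step), the configuration and the connection to the broker can be checked with Filebeat's built-in test commands; a minimal sketch:
# Validate the syntax of /etc/filebeat/filebeat.yml
filebeat test config
# Verify that Filebeat can reach the configured Kafka output
filebeat test output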
Kafka
Finally, let's install Kafka. Kafka uses ZooKeeper to maintain configuration information and for synchronization; we will use an existing ZooKeeper instance.
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.6.0/kafka_2.12-2.6.0.tgz
tar zxvf kafka_2.12-2.6.0.tgz
The only change needed in the server.properties configuration file is zookeeper.connect=192.168.252.174:2181; set it to your own ZooKeeper address.
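A minimal sketch of that edit, assuming the default config/server.properties inside the extracted Kafka directory:
cd kafka_2.12-2.6.0
# Point Kafka at the existing ZooKeeper instance (replace the address with your own)
sed -i 's/^zookeeper.connect=.*/zookeeper.connect=192.168.252.174:2181/' config/server.properties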
Start
nohup bin/kafka-server-start.sh config/server.properties &
You will see some output like:
[2020-09-07 21:38:06,559] INFO Registered signal handlers for TERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler)
[2020-09-07 21:38:06,564] INFO starting (kafka.server.KafkaServer)
[2020-09-07 21:38:06,565] INFO Connecting to zookeeper on 192.168.252.174:2181 (kafka.server.KafkaServer)
[2020-09-07 21:38:06,583] INFO [ZooKeeperClient Kafka server] Initializing a new session to 192.168.252.174:2181. (kafka.zookeeper.ZooKeeperClient)
[2020-09-07 21:38:06,590] INFO Client environment:zookeeper.version=3.5.8-f439ca583e70862c3068a1f2a7d4d068eec33315, built on 05/04/2020 15:53 GMT (org.apache.zookeeper.ZooKeeper)
...
...
Next, create a topic for the gateway logs:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic apache
Created topic apache
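To double-check that the topic exists, a small sketch using the same kafka-topics.sh tool:
# List the topics known to this broker; "apache" should appear in the output
bin/kafka-topics.sh --list --zookeeper localhost:2181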
Starting the Data Pipeline
Start Filebeat:
systemctl start filebeat
Start Logstash:
systemctl start logstash
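To confirm both services came up cleanly, a sketch using standard systemd tooling:
systemctl status filebeat
systemctl status logstash
# Follow the Logstash log if the pipeline does not seem to start
journalctl -u logstash -f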
It can take a few minutes before the data shows up in Elasticsearch or Kibana, and I am not sure why. If you are in a hurry, you can check Kafka first:
[root@wlj174 bin]# ./kafka-console-consumer.sh --topic apache --bootstrap-server localhost:9092
2020-09-08T14:46:27.308Z 2020-09-08 14:46:17.310 ERROR 59181 --- [or-http-epoll-4] c.w.s.f.e.s.f.StaticRequestDecorator : 解密失败:Decryption error
2020-09-08T14:46:27.308Z java.lang.NullPointerException
2020-09-08T14:46:52.310Z 2020-09-08 14:46:50.415 ERROR 59181 --- [r-http-epoll-13] c.w.s.f.e.s.f.StaticRequestDecorator : 解密失败:Decryption error
2020-09-08T14:46:52.310Z java.lang.NullPointerException
...
...
To make sure Logstash is aggregating the data and shipping it into Elasticsearch, use:
curl -X GET "ip:9200/_cat/indices?v"
[root@wlj174 bin]# curl -X GET "192.168.252.174:9200/_cat/indices?v"
health status index uuid pri rep docs.count docs.deleted store.size pri.store.size
green open .kibana_task_manager_1 ueE-zBPiR6CixvUDfX_9sQ 1 0 2 2 30kb 30kb
green open ilm-history-1-000001 WoJuE9kmQs2aYUAK8GBIHQ 1 0 18 0 25.4kb 25.4kb
green open .apm-agent-configuration k8trtCzLSI2DrLnkqsuSTQ 1 0 0 0 283b 283b
green open .kibana_1 HvyvaQK_RRmZHzQyrjmIDQ 1 0 12 5 47.1kb 47.1kb
yellow open logstash-2020.09.08-000001 WPl0br1CSO2XztOCFcjjCQ 1 1 3265 0 708.3kb 708.3kb
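The logstash-* index in the last row confirms that documents are arriving. As a final check (a sketch against the standard search API), pull one parsed document and make sure the grok fields are present:
# Fetch a single document from the logstash-* indices and inspect level/class/logmessage
curl -s "192.168.252.174:9200/logstash-*/_search?size=1&pretty"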