Background

Build a logging system with the ELK Stack plus Kafka.

Logs are unpredictable. When a production incident hits and you need to look at the logs, log volume can suddenly spike and overwhelm your logging infrastructure. To protect Logstash and Elasticsearch from such data bursts, a buffering mechanism acting as a message broker needs to be deployed.

Apache Kafka is the most common broker solution deployed together with the ELK Stack. Kafka typically sits between the shippers and the indexers, serving as the entry point for the collected data:

log → filebeat1 / filebeat2 / filebeat3 → kafka → logstash → elasticsearch → kibana

This article shows how to deploy all the components needed to build a resilient data pipeline with the ELK Stack and Kafka:

  • Filebeat – collects logs and forwards them to a Kafka topic.
  • Kafka – brokers the data flow and queues it.
  • Logstash – aggregates the data from the Kafka topic, processes it and ships it to Elasticsearch.
  • Elasticsearch – indexes the data.
  • Kibana – for analyzing the data.

Environment

The setup runs on CentOS 7.6 and uses the log of a Spring Boot application as the example:

/data/spring/gateway/nohup.out

ELK Stack 7.6.x

JDK 1.8 is required; install it yourself beforehand.

Elasticsearch

All Elasticsearch directories are kept under /data/elasticsearch.

Installation

wget https://mirrors.huaweicloud.com/elasticsearch/7.6.2/elasticsearch-7.6.2-x86_64.rpm
rpm -ivh  elasticsearch-7.6.2-x86_64.rpm
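
Since path.data and path.logs will point to /data/elasticsearch (configured below), that directory has to exist and be writable by the elasticsearch user before the service starts. A minimal sketch, assuming the elasticsearch user and group created by the RPM install:

# create the data/log directories referenced by jvm.options and elasticsearch.yml below
mkdir -p /data/elasticsearch/log
# assumes the elasticsearch user/group added by the RPM
chown -R elasticsearch:elasticsearch /data/elasticsearch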

Configuration

/etc/elasticsearch/jvm.options

[root@es1 elasticsearch]# egrep -v "^#|^$" /etc/elasticsearch/jvm.options 
-Xms16g
-Xmx16g
8-13:-XX:+UseConcMarkSweepGC
8-13:-XX:CMSInitiatingOccupancyFraction=75
8-13:-XX:+UseCMSInitiatingOccupancyOnly
14-:-XX:+UseG1GC
14-:-XX:G1ReservePercent=25
14-:-XX:InitiatingHeapOccupancyPercent=30
-Djava.io.tmpdir=${ES_TMPDIR}
-XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/data/elasticsearch
-XX:ErrorFile=/data/elasticsearch/log/hs_err_pid%p.log
8:-XX:+PrintGCDetails
8:-XX:+PrintGCDateStamps
8:-XX:+PrintTenuringDistribution
8:-XX:+PrintGCApplicationStoppedTime
8:-Xloggc:/data/elasticsearch/log/gc.log
8:-XX:+UseGCLogFileRotation
8:-XX:NumberOfGCLogFiles=32
8:-XX:GCLogFileSize=64m
9-:-Xlog:gc*,gc+age=trace,safepoint:file=/data/elasticsearch/log/gc.log:utctime,pid,tags:filecount=32,filesize=64m

/etc/elasticsearch/elasticsearch.yml

[root@es1 elasticsearch]# egrep -v "^#|^$" /etc/elasticsearch/elasticsearch.yml
cluster.name: smy
node.name: node-1
path.data: /data/elasticsearch
path.logs: /data/elasticsearch/log
network.host: 192.168.252.174
http.port: 9200
discovery.seed_hosts: ["127.0.0.1", "[::1]"]
cluster.initial_master_nodes: ["node-1"]
http.cors.enabled: true
http.cors.allow-origin: /.*/ 

Start

systemctl start elasticsearch.service 
systemctl status elasticsearch.service

Verify

Open http://ip:9200 in a browser; you should see output similar to the following:

{
  "name" : "node-1",
  "cluster_name" : "smy",
  "cluster_uuid" : "wsjadseSQkqQQDkVz1gRgw",
  "version" : {
    "number" : "7.6.2",
    "build_flavor" : "default",
    "build_type" : "rpm",
    "build_hash" : "ef48eb35cf30adf4db14086e8aabd07ef6fb113f",
    "build_date" : "2020-03-26T06:34:37.794943Z",
    "build_snapshot" : false,
    "lucene_version" : "8.4.0",
    "minimum_wire_compatibility_version" : "6.8.0",
    "minimum_index_compatibility_version" : "6.0.0-beta1"
  },
  "tagline" : "You Know, for Search"
}
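
The same check works from the shell, and if you want to confirm that the heap settings from jvm.options took effect, the _cat/nodes API can report the configured maximum heap (host address as set in elasticsearch.yml above):

# same banner as the browser check
curl -s http://192.168.252.174:9200
# report each node's maximum heap (should show roughly 16gb here)
curl -s "http://192.168.252.174:9200/_cat/nodes?v&h=name,heap.max"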

Logstash

Next up is the "L" in the ELK Stack.

Installation

wget https://mirrors.cloud.tencent.com/elasticstack/7.x/yum/7.6.2/logstash-7.6.2.rpm
rpm -ivh logstash-7.6.2.rpm

Configuration

Next, we'll configure a Logstash pipeline that pulls the logs from the Kafka topic, processes them, and ships them to Elasticsearch for indexing. Let's create a new configuration file:

vim /etc/logstash/conf.d/gateway.conf

With the following content:

input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => "apache"
    }
}
filter {
  grok {
    match => [ "message", 
               "(?<timestamp>%{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{TIME})  %{LOGLEVEL:level} %{NUMBER:pid} --- \[(?<thread>[A-Za-z0-9-]+)\] [A-Za-z0-9.]*\.(?<class>[A-Za-z0-9#_]+)\s*:\s+(?<logmessage>.*)",
               "message",
               "(?<timestamp>%{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{TIME})  %{LOGLEVEL:level} %{NUMBER:pid} --- .+? :\s+(?<logmessage>.*)"
             ]
  }

  #Parsing out timestamps which are in timestamp field thanks to previous grok section
  date {
    match => [ "timestamp" , "yyyy-MM-dd HH:mm:ss.SSS" ]
  }
}
output {
  elasticsearch {
    hosts => ["192.168.252.174:9200"]
  }
}

We use the Logstash Kafka input plugin to define the Kafka host and the topic we want Logstash to consume from. We apply some filtering to the logs and ship the data to the local Elasticsearch instance. Save the file.
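
Before starting Logstash, it can be worth validating the pipeline syntax; a minimal sketch using the RPM's install path, with an example gateway log line (taken from the consumer output later in this article) that the first grok pattern is meant to match shown as a comment:

# validate the pipeline configuration without starting Logstash
/usr/share/logstash/bin/logstash --config.test_and_exit -f /etc/logstash/conf.d/gateway.conf

# example line the grok patterns target:
# 2020-09-08 14:46:17.310 ERROR 59181 --- [or-http-epoll-4] c.w.s.f.e.s.f.StaticRequestDecorator     : 解密失败:Decryption error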

Kibana

Next up is the "K" in the ELK Stack.

Installation

wget https://mirrors.huaweicloud.com/kibana/7.6.2/kibana-7.6.2-x86_64.rpm
rpm -ivh kibana-7.6.2-x86_64.rpm

Configuration

[root@wlj174 ~]# egrep -v "^#|^$" /etc/kibana/kibana.yml 
server.port: 5601
server.host: "192.168.252.174"
elasticsearch.hosts: ["http://192.168.252.174:9200"]
kibana.index: ".kibana"

Start

systemctl start kibana

Open http://ip:5601 in a browser to reach the Kibana home page.
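
If you'd rather check from the shell, Kibana exposes a status endpoint (host and port as configured above):

curl -s http://192.168.252.174:5601/api/status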

Filebeat

Installation

wget https://mirrors.cloud.tencent.com/elasticstack/7.x/yum/7.6.2/filebeat-7.6.2-x86_64.rpm
rpm -ivh filebeat-7.6.2-x86_64.rpm 

Configuration

[root@iZ1767b3udZ filebeat]# cat /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /data/spring/gateway/nohup.out
output.kafka:
  codec.format:
    string: '%{[@timestamp]} %{[message]}'
  hosts: ["192.168.252.174:9092"]
  topic: apache
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000

The input section tells Filebeat which logs to collect: the gateway's log. The output section tells Filebeat to forward the data to our local Kafka server and the relevant topic (Kafka will be installed in the next step). Note the use of the codec.format directive; it ensures the message and timestamp fields are extracted correctly. Otherwise, the lines would be sent to Kafka as JSON. Save the file.
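
Filebeat ships with built-in checks that can be run before starting it; a quick sketch (the output test needs Kafka, installed in the next step, to be reachable):

# verify the configuration file parses
filebeat test config -c /etc/filebeat/filebeat.yml
# verify Filebeat can reach the Kafka output (run this after Kafka is up)
filebeat test output -c /etc/filebeat/filebeat.yml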

Kafka

Finally, let's install Kafka. Kafka uses ZooKeeper to maintain configuration information and for synchronization; here we use an existing ZooKeeper instance.

wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.6.0/kafka_2.12-2.6.0.tgz
tar zxvf kafka_2.12-2.6.0.tgz

The only change needed is in the server.properties configuration file: set zookeeper.connect=192.168.252.174:2181 to your own ZooKeeper address.
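
For reference, the relevant line in config/server.properties would look like this (the address is this article's example host; use your own):

# point Kafka at the existing ZooKeeper instance
zookeeper.connect=192.168.252.174:2181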

Start

nohup bin/kafka-server-start.sh config/server.properties &

You will see some output:

[2020-09-07 21:38:06,559] INFO Registered signal handlers for TERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler)
[2020-09-07 21:38:06,564] INFO starting (kafka.server.KafkaServer)
[2020-09-07 21:38:06,565] INFO Connecting to zookeeper on 192.168.252.174:2181 (kafka.server.KafkaServer)
[2020-09-07 21:38:06,583] INFO [ZooKeeperClient Kafka server] Initializing a new session to 192.168.252.174:2181. (kafka.zookeeper.ZooKeeperClient)
[2020-09-07 21:38:06,590] INFO Client environment:zookeeper.version=3.5.8-f439ca583e70862c3068a1f2a7d4d068eec33315, built on 05/04/2020 15:53 GMT (org.apache.zookeeper.ZooKeeper)
...
...

Next, create a topic for the gateway logs:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic apache
Created topic apache.
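
You can confirm the topic exists before wiring up the rest of the pipeline; either form works in Kafka 2.6:

bin/kafka-topics.sh --list --zookeeper localhost:2181
bin/kafka-topics.sh --list --bootstrap-server localhost:9092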

Starting the data pipeline

Start Filebeat

systemctl start filebeat

Start Logstash

systemctl start logstash

It takes a few minutes before the data shows up in Elasticsearch or Kibana (I'm not sure why...). If you can't wait, you can check Kafka first:

[root@wlj174 bin]# ./kafka-console-consumer.sh  --topic apache --bootstrap-server localhost:9092
2020-09-08T14:46:27.308Z 2020-09-08 14:46:17.310 ERROR 59181 --- [or-http-epoll-4] c.w.s.f.e.s.f.StaticRequestDecorator     : 解密失败:Decryption error
2020-09-08T14:46:27.308Z java.lang.NullPointerException
2020-09-08T14:46:52.310Z 2020-09-08 14:46:50.415 ERROR 59181 --- [r-http-epoll-13] c.w.s.f.e.s.f.StaticRequestDecorator     : 解密失败:Decryption error
2020-09-08T14:46:52.310Z java.lang.NullPointerException
...
...

To make sure Logstash is aggregating the data and shipping it into Elasticsearch, use:

curl -X GET "ip:9200/_cat/indices?v"
[root@wlj174 bin]# curl -X GET "192.168.252.174:9200/_cat/indices?v"
health status index                      uuid                   pri rep docs.count docs.deleted store.size pri.store.size
green  open   .kibana_task_manager_1     ueE-zBPiR6CixvUDfX_9sQ   1   0          2            2       30kb           30kb
green  open   ilm-history-1-000001       WoJuE9kmQs2aYUAK8GBIHQ   1   0         18            0     25.4kb         25.4kb
green  open   .apm-agent-configuration   k8trtCzLSI2DrLnkqsuSTQ   1   0          0            0       283b           283b
green  open   .kibana_1                  HvyvaQK_RRmZHzQyrjmIDQ   1   0         12            5     47.1kb         47.1kb
yellow open   logstash-2020.09.08-000001 WPl0br1CSO2XztOCFcjjCQ   1   1       3265            0    708.3kb        708.3kb
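
To look at the parsed documents themselves, you can also query the new index directly (index name as listed above):

# fetch one parsed document from the Logstash-created index
curl -s "192.168.252.174:9200/logstash-*/_search?size=1&pretty"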

Source: https://www.toutiao.com/i6908227250047730190/