0%

ELK平台搭建

参考链接

ELK官网链接:https://www.elastic.co/

产品页面为:Elasticsearch Logstash Kibana

其中logstashgrok语法调试地址:https://grokdebug.herokuapp.com/,建议在编写grok的时候使用该工具先调试语法

ELK系统架构

整个系统各部分功能以及数据流向

  • logstashlogstash-forwarder负责获取原始日志文件并通过配置文件filter功能进行解析,将数据进行序列化
  • elasticsearch负责将logstash获取的数据存储,供用户搜索,其本身就是搜索引擎
  • kibana通过elasticsearch的接口查询搜索数据,然后利用图展示,即最终用户所查看到的数据,其搜索方面的功能主要是结合elasticsearch

在数据过程中,logstash收集的数据来源比较多,在与elasticsearch进行数据对接的时候可以使用中间缓冲区,防止elasticsearch过载出现503错误,返回给logstash,因此在中间添加缓存队列,logstash直接将数据输出到缓存队列,然后elasticsearch再从缓存队列获取数据,缓存队列可以使用redis或者kafaka,详情可以参考:Output plugin

我们一般的ELK布置是在每一台需要收集日志的服务器上面安装logstash作为日志文件收集客户端,然后将数据发送到缓存队列服务器,然后elasticsearch服务器负责从缓存队列中获取数据保存到索引数据中心,一般该服务器的数据量非常大,需要较大硬盘容量存储,最后安装kibana端,对外开放地址,即可查看和搜索数据.下图为目前的整体架构图:
ELK Struct

安装和配置

logstash安装

  • 下载地址:链接 选择当前最新版本下载即可
  • 解压:tar -xzvf logstash-1.5.2.tar.gz 到目录 /srv/logstash
  • 进入到bin目录,建立配置文件logstash.conf
  • 进入到bin目录,建立grok的过滤文件目录patterns,我们需要多种过滤参数的时候,建立文件将自定义grok文件放到patterns目录下面即可,系统会自动搜索所有的配置,一一对比
  • 执行脚本为:sudo ./run.sh,因为一些日志文件是不同用户产生的,统一使用root用户权限读取

1.文件logstash.conf一般配置参数如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
input {
file {
path => "/srv/java/a8_sport/logs/a8_sport.log"
type => "a8_sport_req_1"
}

file {
path => "/var/log/nginx/access.log"
type => "a8_sport_nginx_access_1"
}

file {
path => "/var/log/nginx/error.log"
type => "a8_sport_nginx_error_1"
}
}

filter {
grok {
patterns_dir => "./patterns"
match => [
"message", "%{A8_SPORT_API}",
"message", "%{NGINX_ACCESS}",
"message", "%{NGINX_ACCESS_PROXY}"
]
}

geoip {
source => "client_ip"
}
}

output {
redis {
host => "10.168.94.240"
data_type => list
db => 2
key => "logstash"
port => 6379
}
}
  • input:读取的是日志文件列表
  • filter:过滤参数解析配置
  • output:数据输出,这里是输出到redis

2.patterns文件夹防止自定义的filter配置参数文件,每一个自定义可以建立一个文件,示例内容如下:

1
NGINX_ACCESS %{IPORHOST:remote_addr} - %{USERNAME:remote_user} \[%{HTTPDATE:time_local}\] \"%{USERNAME:method} %{URIPATH:client_uri}%{URIPARAM:client_param} HTTP/1.1\" %{INT:status:int} %{INT:body_bytes_sent:int} %{QS:http_refer} %{QS:http_user_agent} \"%{IP:client_ip}\" \"%{NUMBER:req_time:float}\" \"%{INT:port:int}\"

其中NGINX_ACCESS为名称,后面的是内容,使用的时候直接用该名称即可

logstash-fowarder安装

因为logstash所占用内存在客户端过大,避免影响客户机器的服务,增加logstash-forwarder和获取客户端的日志,然后传递到logstash,由logstash保存到kafka,由于logstash-fowardergo语言编写,所以需要先安装go运行环境,由于当前服务器Ubuntu版本库的go语言版本过低,所以需要到官网下载最新版本:

  • go下载地址:链接,这里源码安装方式
  • 安装文档地址:链接
  • 解压后安装命令:cd go/src./all.bash
  • 然后把goPath添加到系统变量PATHvim /etc/profile,添加下面到最后:export GO_PATH=/srv/go/bin,export PATH=${GO_PATH}:$PATH,然后刷新变量:source /etc/profile

安装logstash-forwarder:

  • 下载地址:链接
  • 安装命令:git clone git://github.com/elasticsearch/logstash-forwarder.gitcd logstash-forwarder,go build -o logstash-forwarder

在启动之前需要将logstashlogstash-forwarder配置好,因为之前是通过加密传输的,所以需要证书,首先在logstash所在服务器生成证书:
sudo openssl req -subj '/CN=localhost/' -x509 -batch -nodes -days 3650 -newkey rsa:2048 -keyout ssl/logstash-forward.key -out certs/logstash-forwarder.crt -config cert.conf

其中配置文件cert.conf内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
[req]
distinguished_name = req_distinguished_name
x509_extensions = v3_req
prompt = no

[req_distinguished_name]
C = TG
ST = Togo
L = Lome
O = Private company
CN = *

[v3_req]
subjectKeyIdentifier = hash
authorityKeyIdentifier = keyid,issuer
basicConstraints = CA:TRUE
subjectAltName = @alt_names

[alt_names]
DNS.1 = *
DNS.2 = *.*
DNS.3 = *.*.*
DNS.4 = *.*.*.*
DNS.5 = *.*.*.*.*
DNS.6 = *.*.*.*.*.*
DNS.7 = *.*.*.*.*.*.*
IP.1 = 10.117.110.199

注意IP.1的值为服务端的值,之后会生成两个文件logstash-forwarder.crtlogstash-forward.key,将这两个文件拷贝到logstash-forwarder客户端机器,需要修改二者的启动配置参数。

logstash的数据来源于logstash-forwarder,所以input值为:

1
2
3
4
5
6
7
input {
lumberjack {
port => 5043
ssl_certificate => "/srv/logstash/bin/certs/logstash-forwarder.crt"
ssl_key => "/srv/logstash/bin/certs/logstash-forward.key"
}
}

服务端开启5043端口,去接收客户端的信息,客户端logstash-forwarder配置信息如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
{
"network": {
"servers": [ "10.1.1.1:5043" ],
"ssl key": "./certs/logstash-forward.key",
"ssl ca": "./certs/logstash-forwarder.crt",
"timeout": 15
},
"files": [
{
"paths": [ "/var/log/yuntu_user/yuntu_user_error.log" ],
"fields": { "type": "yuntu_user_error_1" }
},
{
"paths": [ "/var/log/yuntu_user/yuntu_user_common.log" ],
"fields": { "type": "yuntu_user_common_1" }
},
{
"paths": [ "/var/log/nginx/access.log"],
"fields": { "type": "yuntu_user_nginx_access_1" }
},
{
"path": ["/var/log/nginx/error.log"],
"fields": {"type": "yuntu_user_nginx_error_1"}
}
]
}

配置文件为logstash-forwarder.json,链接到服务端,使用服务端生成的证书,敬爱那个日志文件收集然后传输给logstash服务端,注意在客户端不支持过滤等操作,支持一个type,所以其他的过滤信息都转移到服务端的logstash解析

elasticstash安装

  • 下载地址:链接,尽量选择最新版本下载,避免和两外两个软件版本不兼容问题
  • 解压:tar -xzvf elasticsearch-1.6.0.tar.gz到目录/srv/elk/elasticsearch
  • 配置文件目录:cd /srv/elk/elasticsearch/config,分别有两个文件elasticsearch.ymllogging.yml,前一个文件配置elasticsearch大部分的参数,后一个文件是日志配置参数,一般默认日志参数即可
  • 程序执行目录:cd /srv/elk/elasticsearch/bin,执行脚本是:./run.sh

elasticsearch.yml文件常用配置项目:

  • path.logs: /mnt/elasticsearch/log 日志文件路径
  • path.data: /mnt/elasticsearch/data 索引文件存储目录

kibana安装

  • 下载地址:链接,选择最新版版本,避免与elasticsearch版本不兼容的问题
  • 解压:tar -xzvf kibana-4.1.1-linux-x64.tar.gz到目录/srv/elk/kibana
  • 配置文件目录:cd /srv/elk/kibana/config,一般不需要修改,默认端口是5601
  • 执行目录:cd /srv/elk/kibana/bin,执行脚本:./run.sh

run.sh执行文件内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
PID=$(ps -ef|grep kibana|grep -v grep|awk '{print $2}')
if [ $PID ]
then
echo "kill old Kibana: $PID ..."
kill -9 $PID
fi

echo "run Kibana ..."
nohup ./kibana &

echo "check Kibana proces ..."
ps aux|grep kibana

kafka安装

  • 下载地址:链接,选择对应的scala版本
  • 解压:tar -xzvf kafka_2.9.1-0.8.2.1.tgz到目录/srv/kafka
  • 配置文件目录是config,需要修改log4j.properties日志文件,根据需要可以修改日志打印等级,默认是INFOconsumer.properties消费配置文件,需要修改zookeeper.connect地址,默认是本机的2181端口,zookeeper.properties的配置需要修改数据目录dataDirserver.propertieskafka的配置文件,需要修改zookeeper.connect默认是本机的2181端口,log.dirs为日志文件目录,比较大,num.partitions一般设置为2,其他设置一般默认即可。

kafka初始化执行过程

  • 启动zookeeper服务:./bin/zookeeper-server-start.sh config/zookeeper.properties
  • 启动kafaka服务:./bin/kafka-server-start.sh config/server.properties
  • 创建topic:./bin/kafka-topics.sh --create --zookeeper 10.168.94.240:2181 --replication-factor 1 --partitions 2 --topic logstash,这里创建了logstash话题
  • 检测topic列表:./bin/kafka-topics.sh --list --zookeeper 10.168.94.240:2181
  • 查看topic信息:./bin/kafka-topics.sh --describe --zookeeper 10.168.94.240:2181 --topic logstash
  • 查看是否有消息数据进入队列:./bin/kafka-console-consumer.sh --zookeeper 10.168.94.240:2181 --topic logstash

对应的命令脚本放在kafka根目录下:create_topic.sh run.sh check_topic.sh consumer.sh describe_topic.sh

常用logstash解析配置grok语法

logstash最重要的部分是日志的解析,通过语法规则将原始日志文件解析之后,各字段作为elasticsearch的搜索字段。

日志解析的主要配置是filter里面的grok配置,下面是常用的配置:

1.nginxaccess日志

1
NGINX_ACCESS %{IPORHOST:remote_addr} - %{USERNAME:remote_user} \[%{HTTPDATE:time_local}\] \"%{USERNAME:method} %{URIPATH:client_uri}%{URIPARAM:client_param} HTTP/1.1\" %{INT:status:int} %{INT:body_bytes_sent:int} %{QS:http_refer} %{QS:http_user_agent} \"%{IP:client_ip}\" \"%{NUMBER:req_time:float}\" \"%{INT:port:int}\"

2.yuntu_usercommon日志

1
USER_COMMON %{TIMESTAMP_ISO8601:time} \[%{USERNAME:thread}\] %{LOGLEVEL:level} %{DATA:class_name} - %{GREEDYDATA:content}

3.a8_sport的请求日志

1
A8_SPORT_API %{TIMESTAMP_ISO8601:time} \[%{USERNAME:thread}\] %{USERNAME:level} %{DATA:job_class} - \[REQUEST\]%{IP:client_ip}: %{URIPATH:api_url}

4.a8_task的日志

1
A8_TASK_START %{TIMESTAMP_ISO8601:time} \[%{USERNAME:thread}\] %{USERNAME:level}  %{DATA:job_class} - %{USERNAME:job_status} job: %{USERNAME:job_name}...
1
A8_TASK_END %{TIMESTAMP_ISO8601:time} \[%{USERNAME:thread}\] %{USERNAME:level}  %{DATA:job_class} - %{USERNAME:job_status} job: %{USERNAME:job_name}. time: %{INT:exec_time:int} seconds.

语法文档参考:官方文档
常用的一些解析字段:Github
语法调试:Debug

后期维护

  • 目前使用的是kafka作为缓存数据队列,定期检测数据队列是否过大,按照实际情况和磁盘大小设置过期时间会自动删除旧数据
  • elasticsearch索引数据大小是没有限制的,定期检测磁盘容量,增加定时任务删除不必要的索引数据