参考链接
ELK官网链接:https://www.elastic.co/
产品页面为:Elasticsearch Logstash Kibana
其中logstash
的grok
语法调试地址:https://grokdebug.herokuapp.com/,建议在编写grok
的时候使用该工具先调试语法
ELK系统架构
整个系统各部分功能以及数据流向
logstash
和logstash-forwarder
负责获取原始日志文件并通过配置文件filter
功能进行解析,将数据进行序列化elasticsearch
负责将logstash
获取的数据存储,供用户搜索,其本身就是搜索引擎kibana
通过elasticsearch
的接口查询搜索数据,然后利用图展示,即最终用户所查看到的数据,其搜索方面的功能主要是结合elasticsearch
在数据过程中,logstash
收集的数据来源比较多,在与elasticsearch
进行数据对接的时候可以使用中间缓冲区,防止elasticsearch
过载出现503
错误,返回给logstash
,因此在中间添加缓存队列,logstash
直接将数据输出到缓存队列,然后elasticsearch
再从缓存队列获取数据,缓存队列可以使用redis
或者kafaka
,详情可以参考:Output plugin
我们一般的ELK
布置是在每一台需要收集日志的服务器上面安装logstash
作为日志文件收集客户端,然后将数据发送到缓存队列服务器,然后elasticsearch
服务器负责从缓存队列中获取数据保存到索引数据中心,一般该服务器的数据量非常大,需要较大硬盘容量存储,最后安装kibana
端,对外开放地址,即可查看和搜索数据.下图为目前的整体架构图:
安装和配置
logstash安装
- 下载地址:链接 选择当前最新版本下载即可
- 解压:
tar -xzvf logstash-1.5.2.tar.gz
到目录/srv/logstash
- 进入到
bin
目录,建立配置文件logstash.conf
- 进入到
bin
目录,建立grok
的过滤文件目录patterns
,我们需要多种过滤参数的时候,建立文件将自定义grok
文件放到patterns
目录下面即可,系统会自动搜索所有的配置,一一对比 - 执行脚本为:
sudo ./run.sh
,因为一些日志文件是不同用户产生的,统一使用root
用户权限读取
1.文件logstash.conf
一般配置参数如下:
1 | input { |
input
:读取的是日志文件列表filter
:过滤参数解析配置output
:数据输出,这里是输出到redis
2.patterns
文件夹防止自定义的filter
配置参数文件,每一个自定义可以建立一个文件,示例内容如下:
1 | NGINX_ACCESS %{IPORHOST:remote_addr} - %{USERNAME:remote_user} \[%{HTTPDATE:time_local}\] \"%{USERNAME:method} %{URIPATH:client_uri}%{URIPARAM:client_param} HTTP/1.1\" %{INT:status:int} %{INT:body_bytes_sent:int} %{QS:http_refer} %{QS:http_user_agent} \"%{IP:client_ip}\" \"%{NUMBER:req_time:float}\" \"%{INT:port:int}\" |
其中NGINX_ACCESS
为名称,后面的是内容,使用的时候直接用该名称即可
logstash-fowarder安装
因为logstash
所占用内存在客户端过大,避免影响客户机器的服务,增加logstash-forwarder
和获取客户端的日志,然后传递到logstash
,由logstash
保存到kafka
,由于logstash-fowarder
是go
语言编写,所以需要先安装go
运行环境,由于当前服务器Ubuntu
版本库的go
语言版本过低,所以需要到官网下载最新版本:
go
下载地址:链接,这里源码安装方式- 安装文档地址:链接
- 解压后安装命令:
cd go/src
、./all.bash
- 然后把
goPath
添加到系统变量PATH
:vim /etc/profile
,添加下面到最后:export GO_PATH=/srv/go/bin
,export PATH=${GO_PATH}:$PATH
,然后刷新变量:source /etc/profile
安装logstash-forwarder
:
- 下载地址:链接
- 安装命令:
git clone git://github.com/elasticsearch/logstash-forwarder.git
、cd logstash-forwarder
,go build -o logstash-forwarder
在启动之前需要将logstash
和logstash-forwarder
配置好,因为之前是通过加密传输的,所以需要证书,首先在logstash
所在服务器生成证书:sudo openssl req -subj '/CN=localhost/' -x509 -batch -nodes -days 3650 -newkey rsa:2048 -keyout ssl/logstash-forward.key -out certs/logstash-forwarder.crt -config cert.conf
其中配置文件cert.conf
内容如下:
1 | [req] |
注意IP.1
的值为服务端的值,之后会生成两个文件logstash-forwarder.crt
和logstash-forward.key
,将这两个文件拷贝到logstash-forwarder
客户端机器,需要修改二者的启动配置参数。
logstash
的数据来源于logstash-forwarder
,所以input
值为:
1 | input { |
服务端开启5043
端口,去接收客户端的信息,客户端logstash-forwarder
配置信息如下:
1 | { |
配置文件为logstash-forwarder.json
,链接到服务端,使用服务端生成的证书,敬爱那个日志文件收集然后传输给logstash
服务端,注意在客户端不支持过滤等操作,支持一个type
,所以其他的过滤信息都转移到服务端的logstash
解析
elasticstash安装
- 下载地址:链接,尽量选择最新版本下载,避免和两外两个软件版本不兼容问题
- 解压:
tar -xzvf elasticsearch-1.6.0.tar.gz
到目录/srv/elk/elasticsearch
- 配置文件目录:
cd /srv/elk/elasticsearch/config
,分别有两个文件elasticsearch.yml
和logging.yml
,前一个文件配置elasticsearch
大部分的参数,后一个文件是日志配置参数,一般默认日志参数即可 - 程序执行目录:
cd /srv/elk/elasticsearch/bin
,执行脚本是:./run.sh
elasticsearch.yml
文件常用配置项目:
path.logs: /mnt/elasticsearch/log
日志文件路径path.data: /mnt/elasticsearch/data
索引文件存储目录
kibana安装
- 下载地址:链接,选择最新版版本,避免与
elasticsearch
版本不兼容的问题 - 解压:
tar -xzvf kibana-4.1.1-linux-x64.tar.gz
到目录/srv/elk/kibana
- 配置文件目录:
cd /srv/elk/kibana/config
,一般不需要修改,默认端口是5601
- 执行目录:
cd /srv/elk/kibana/bin
,执行脚本:./run.sh
run.sh
执行文件内容:
1 | PID=$(ps -ef|grep kibana|grep -v grep|awk '{print $2}') |
kafka安装
- 下载地址:链接,选择对应的
scala
版本 - 解压:
tar -xzvf kafka_2.9.1-0.8.2.1.tgz
到目录/srv/kafka
- 配置文件目录是
config
,需要修改log4j.properties
日志文件,根据需要可以修改日志打印等级,默认是INFO
,consumer.properties
消费配置文件,需要修改zookeeper.connect
地址,默认是本机的2181
端口,zookeeper.properties
的配置需要修改数据目录dataDir
,server.properties
是kafka
的配置文件,需要修改zookeeper.connect
默认是本机的2181
端口,log.dirs
为日志文件目录,比较大,num.partitions
一般设置为2,其他设置一般默认即可。
kafka
初始化执行过程
- 启动
zookeeper
服务:./bin/zookeeper-server-start.sh config/zookeeper.properties
- 启动
kafaka
服务:./bin/kafka-server-start.sh config/server.properties
- 创建
topic
:./bin/kafka-topics.sh --create --zookeeper 10.168.94.240:2181 --replication-factor 1 --partitions 2 --topic logstash
,这里创建了logstash
话题 - 检测
topic
列表:./bin/kafka-topics.sh --list --zookeeper 10.168.94.240:2181
- 查看
topic
信息:./bin/kafka-topics.sh --describe --zookeeper 10.168.94.240:2181 --topic logstash
- 查看是否有消息数据进入队列:
./bin/kafka-console-consumer.sh --zookeeper 10.168.94.240:2181 --topic logstash
对应的命令脚本放在kafka
根目录下:create_topic.sh
run.sh
check_topic.sh
consumer.sh
describe_topic.sh
常用logstash解析配置grok语法
logstash
最重要的部分是日志的解析,通过语法规则将原始日志文件解析之后,各字段作为elasticsearch
的搜索字段。
日志解析的主要配置是filter
里面的grok
配置,下面是常用的配置:
1.nginx
的access
日志
1 | NGINX_ACCESS %{IPORHOST:remote_addr} - %{USERNAME:remote_user} \[%{HTTPDATE:time_local}\] \"%{USERNAME:method} %{URIPATH:client_uri}%{URIPARAM:client_param} HTTP/1.1\" %{INT:status:int} %{INT:body_bytes_sent:int} %{QS:http_refer} %{QS:http_user_agent} \"%{IP:client_ip}\" \"%{NUMBER:req_time:float}\" \"%{INT:port:int}\" |
2.yuntu_user
的common
日志
1 | USER_COMMON %{TIMESTAMP_ISO8601:time} \[%{USERNAME:thread}\] %{LOGLEVEL:level} %{DATA:class_name} - %{GREEDYDATA:content} |
3.a8_sport
的请求日志
1 | A8_SPORT_API %{TIMESTAMP_ISO8601:time} \[%{USERNAME:thread}\] %{USERNAME:level} %{DATA:job_class} - \[REQUEST\]%{IP:client_ip}: %{URIPATH:api_url} |
4.a8_task
的日志
1 | A8_TASK_START %{TIMESTAMP_ISO8601:time} \[%{USERNAME:thread}\] %{USERNAME:level} %{DATA:job_class} - %{USERNAME:job_status} job: %{USERNAME:job_name}... |
1 | A8_TASK_END %{TIMESTAMP_ISO8601:time} \[%{USERNAME:thread}\] %{USERNAME:level} %{DATA:job_class} - %{USERNAME:job_status} job: %{USERNAME:job_name}. time: %{INT:exec_time:int} seconds. |
语法文档参考:官方文档
常用的一些解析字段:Github
语法调试:Debug
后期维护
- 目前使用的是
kafka
作为缓存数据队列,定期检测数据队列是否过大,按照实际情况和磁盘大小设置过期时间会自动删除旧数据 elasticsearch
索引数据大小是没有限制的,定期检测磁盘容量,增加定时任务删除不必要的索引数据