参考链接
ELK官网链接:https://www.elastic.co/
产品页面为:Elasticsearch Logstash Kibana
其中logstash的grok语法调试地址:https://grokdebug.herokuapp.com/,建议在编写grok的时候使用该工具先调试语法
ELK系统架构
整个系统各部分功能以及数据流向
logstash和logstash-forwarder负责获取原始日志文件并通过配置文件filter功能进行解析,将数据进行序列化elasticsearch负责将logstash获取的数据存储,供用户搜索,其本身就是搜索引擎kibana通过elasticsearch的接口查询搜索数据,然后利用图展示,即最终用户所查看到的数据,其搜索方面的功能主要是结合elasticsearch
在数据过程中,logstash收集的数据来源比较多,在与elasticsearch进行数据对接的时候可以使用中间缓冲区,防止elasticsearch过载出现503错误,返回给logstash,因此在中间添加缓存队列,logstash直接将数据输出到缓存队列,然后elasticsearch再从缓存队列获取数据,缓存队列可以使用redis或者kafaka,详情可以参考:Output plugin
我们一般的ELK布置是在每一台需要收集日志的服务器上面安装logstash作为日志文件收集客户端,然后将数据发送到缓存队列服务器,然后elasticsearch服务器负责从缓存队列中获取数据保存到索引数据中心,一般该服务器的数据量非常大,需要较大硬盘容量存储,最后安装kibana端,对外开放地址,即可查看和搜索数据.下图为目前的整体架构图:
安装和配置
logstash安装
- 下载地址:链接 选择当前最新版本下载即可
- 解压:
tar -xzvf logstash-1.5.2.tar.gz到目录/srv/logstash - 进入到
bin目录,建立配置文件logstash.conf - 进入到
bin目录,建立grok的过滤文件目录patterns,我们需要多种过滤参数的时候,建立文件将自定义grok文件放到patterns目录下面即可,系统会自动搜索所有的配置,一一对比 - 执行脚本为:
sudo ./run.sh,因为一些日志文件是不同用户产生的,统一使用root用户权限读取
1.文件logstash.conf一般配置参数如下:
1 | input { |
input:读取的是日志文件列表filter:过滤参数解析配置output:数据输出,这里是输出到redis
2.patterns文件夹防止自定义的filter配置参数文件,每一个自定义可以建立一个文件,示例内容如下:
1 | NGINX_ACCESS %{IPORHOST:remote_addr} - %{USERNAME:remote_user} \[%{HTTPDATE:time_local}\] \"%{USERNAME:method} %{URIPATH:client_uri}%{URIPARAM:client_param} HTTP/1.1\" %{INT:status:int} %{INT:body_bytes_sent:int} %{QS:http_refer} %{QS:http_user_agent} \"%{IP:client_ip}\" \"%{NUMBER:req_time:float}\" \"%{INT:port:int}\" |
其中NGINX_ACCESS为名称,后面的是内容,使用的时候直接用该名称即可
logstash-fowarder安装
因为logstash所占用内存在客户端过大,避免影响客户机器的服务,增加logstash-forwarder和获取客户端的日志,然后传递到logstash,由logstash保存到kafka,由于logstash-fowarder是go语言编写,所以需要先安装go运行环境,由于当前服务器Ubuntu版本库的go语言版本过低,所以需要到官网下载最新版本:
go下载地址:链接,这里源码安装方式- 安装文档地址:链接
- 解压后安装命令:
cd go/src、./all.bash - 然后把
goPath添加到系统变量PATH:vim /etc/profile,添加下面到最后:export GO_PATH=/srv/go/bin,export PATH=${GO_PATH}:$PATH,然后刷新变量:source /etc/profile
安装logstash-forwarder:
- 下载地址:链接
- 安装命令:
git clone git://github.com/elasticsearch/logstash-forwarder.git、cd logstash-forwarder,go build -o logstash-forwarder
在启动之前需要将logstash和logstash-forwarder配置好,因为之前是通过加密传输的,所以需要证书,首先在logstash所在服务器生成证书:sudo openssl req -subj '/CN=localhost/' -x509 -batch -nodes -days 3650 -newkey rsa:2048 -keyout ssl/logstash-forward.key -out certs/logstash-forwarder.crt -config cert.conf
其中配置文件cert.conf内容如下:
1 | [req] |
注意IP.1的值为服务端的值,之后会生成两个文件logstash-forwarder.crt和logstash-forward.key,将这两个文件拷贝到logstash-forwarder客户端机器,需要修改二者的启动配置参数。
logstash的数据来源于logstash-forwarder,所以input值为:
1 | input { |
服务端开启5043端口,去接收客户端的信息,客户端logstash-forwarder配置信息如下:
1 | { |
配置文件为logstash-forwarder.json,链接到服务端,使用服务端生成的证书,敬爱那个日志文件收集然后传输给logstash服务端,注意在客户端不支持过滤等操作,支持一个type,所以其他的过滤信息都转移到服务端的logstash解析
elasticstash安装
- 下载地址:链接,尽量选择最新版本下载,避免和两外两个软件版本不兼容问题
- 解压:
tar -xzvf elasticsearch-1.6.0.tar.gz到目录/srv/elk/elasticsearch - 配置文件目录:
cd /srv/elk/elasticsearch/config,分别有两个文件elasticsearch.yml和logging.yml,前一个文件配置elasticsearch大部分的参数,后一个文件是日志配置参数,一般默认日志参数即可 - 程序执行目录:
cd /srv/elk/elasticsearch/bin,执行脚本是:./run.sh
elasticsearch.yml文件常用配置项目:
path.logs: /mnt/elasticsearch/log日志文件路径path.data: /mnt/elasticsearch/data索引文件存储目录
kibana安装
- 下载地址:链接,选择最新版版本,避免与
elasticsearch版本不兼容的问题 - 解压:
tar -xzvf kibana-4.1.1-linux-x64.tar.gz到目录/srv/elk/kibana - 配置文件目录:
cd /srv/elk/kibana/config,一般不需要修改,默认端口是5601 - 执行目录:
cd /srv/elk/kibana/bin,执行脚本:./run.sh
run.sh执行文件内容:
1 | PID=$(ps -ef|grep kibana|grep -v grep|awk '{print $2}') |
kafka安装
- 下载地址:链接,选择对应的
scala版本 - 解压:
tar -xzvf kafka_2.9.1-0.8.2.1.tgz到目录/srv/kafka - 配置文件目录是
config,需要修改log4j.properties日志文件,根据需要可以修改日志打印等级,默认是INFO,consumer.properties消费配置文件,需要修改zookeeper.connect地址,默认是本机的2181端口,zookeeper.properties的配置需要修改数据目录dataDir,server.properties是kafka的配置文件,需要修改zookeeper.connect默认是本机的2181端口,log.dirs为日志文件目录,比较大,num.partitions一般设置为2,其他设置一般默认即可。
kafka初始化执行过程
- 启动
zookeeper服务:./bin/zookeeper-server-start.sh config/zookeeper.properties - 启动
kafaka服务:./bin/kafka-server-start.sh config/server.properties - 创建
topic:./bin/kafka-topics.sh --create --zookeeper 10.168.94.240:2181 --replication-factor 1 --partitions 2 --topic logstash,这里创建了logstash话题 - 检测
topic列表:./bin/kafka-topics.sh --list --zookeeper 10.168.94.240:2181 - 查看
topic信息:./bin/kafka-topics.sh --describe --zookeeper 10.168.94.240:2181 --topic logstash - 查看是否有消息数据进入队列:
./bin/kafka-console-consumer.sh --zookeeper 10.168.94.240:2181 --topic logstash
对应的命令脚本放在kafka根目录下:create_topic.sh run.sh check_topic.sh consumer.sh describe_topic.sh
常用logstash解析配置grok语法
logstash最重要的部分是日志的解析,通过语法规则将原始日志文件解析之后,各字段作为elasticsearch的搜索字段。
日志解析的主要配置是filter里面的grok配置,下面是常用的配置:
1.nginx的access日志
1 | NGINX_ACCESS %{IPORHOST:remote_addr} - %{USERNAME:remote_user} \[%{HTTPDATE:time_local}\] \"%{USERNAME:method} %{URIPATH:client_uri}%{URIPARAM:client_param} HTTP/1.1\" %{INT:status:int} %{INT:body_bytes_sent:int} %{QS:http_refer} %{QS:http_user_agent} \"%{IP:client_ip}\" \"%{NUMBER:req_time:float}\" \"%{INT:port:int}\" |
2.yuntu_user的common日志
1 | USER_COMMON %{TIMESTAMP_ISO8601:time} \[%{USERNAME:thread}\] %{LOGLEVEL:level} %{DATA:class_name} - %{GREEDYDATA:content} |
3.a8_sport的请求日志
1 | A8_SPORT_API %{TIMESTAMP_ISO8601:time} \[%{USERNAME:thread}\] %{USERNAME:level} %{DATA:job_class} - \[REQUEST\]%{IP:client_ip}: %{URIPATH:api_url} |
4.a8_task的日志
1 | A8_TASK_START %{TIMESTAMP_ISO8601:time} \[%{USERNAME:thread}\] %{USERNAME:level} %{DATA:job_class} - %{USERNAME:job_status} job: %{USERNAME:job_name}... |
1 | A8_TASK_END %{TIMESTAMP_ISO8601:time} \[%{USERNAME:thread}\] %{USERNAME:level} %{DATA:job_class} - %{USERNAME:job_status} job: %{USERNAME:job_name}. time: %{INT:exec_time:int} seconds. |
语法文档参考:官方文档
常用的一些解析字段:Github
语法调试:Debug
后期维护
- 目前使用的是
kafka作为缓存数据队列,定期检测数据队列是否过大,按照实际情况和磁盘大小设置过期时间会自动删除旧数据 elasticsearch索引数据大小是没有限制的,定期检测磁盘容量,增加定时任务删除不必要的索引数据