1. Install and configure nginx (for the nginx installation itself, see the guide linked here)
2. Configure the nginx log format
log_format main_json '{"project":"${level1domain}","domain":"${level1domain}_${level2domain}","real_ip":"$real_ip","http_x_forwarded_for":"$http_x_forwarded_for","time_local":"$time_iso8601","request":"$request_short","request_body":"$request_body_short","status":$status,"body_bytes_sent":"$body_bytes_sent","http_referer":"$http_referer_short","upstream_response_time":"$upstream_response_time","request_time":"$request_time","http_user_agent":"$http_user_agent"}';

Or use this second format instead — with it you don't even need a template file in the logstash output section, which is simpler:

log_format main '{"@timestamp":"$time_iso8601",'
                '"host":"$server_addr",'
                '"clientip":"$remote_addr",'
                '"size":"$body_bytes_sent",'
                '"responsetime":"$request_time",'
                '"upstreamtime":"$upstream_response_time",'
                '"upstreamhost":"$upstream_addr",'
                '"httphost":"$host",'
                '"referer":"$http_referer",'
                '"xff":"$http_x_forwarded_for",'
                '"agent":"$http_user_agent",'
                '"request":"$request",'
                '"uri":"$uri",'
                '"status":"$status"}';

Then just drop the log directives straight into the server block:

access_log /usr/local/nginx/logs/access.log main;
error_log /var/log/nginx/error.log;
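For reference, each request logged with main_json comes out as one JSON object per line; the values below are purely hypothetical:

{"project":"test","domain":"test_www","real_ip":"203.0.113.7","http_x_forwarded_for":"203.0.113.7","time_local":"2017-11-20T10:15:30+08:00","request":"GET /index.html HTTP/1.1","request_body":"","status":200,"body_bytes_sent":"612","http_referer":"-","upstream_response_time":"-","request_time":"0.001","http_user_agent":"Mozilla/5.0"}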

Site (virtual host) configuration file for your domain (note: my config files live under /usr/local/nginx/conf/vhost/)
server {
    listen 80;
    server_name www.test.com;

    location / {
        root /www/www.test.com/;
        index index.html index.htm;
    }

    if ( $request ~ "^(.{0,750})" ) {
        set $request_short $1;
    }
    if ( $request_body ~ "^(.{0,750})" ) {
        set $request_body_short $1;
    }

    set $http_referer_short "-";
    if ( $http_referer ~ "^(.{1,100})" ) {
        set $http_referer_short $1;
    }

    set $real_ip $remote_addr;
    if ( $http_x_forwarded_for ~ "^(\d+\.\d+\.\d+\.\d+)" ) {
        set $real_ip $1;
    }

    set $level1domain unparse;
    set $level2domain unparse;
    if ( $server_name ~ "^(.+)\.([0-9a-zA-Z]+)\.(com|cn)$" ) {
        set $level1domain $2;
        set $level2domain $1;
    }
    if ( $server_name ~ "^([0-9a-zA-Z]+)\.(com|cn)$" ) {
        set $level1domain $1;
        set $level2domain none;
    }

    error_page 500 502 503 504 /50x.html;
    location = /50x.html {
        root html;
    }

    access_log /usr/local/nginx/logs/access.log main_json;
}
Configuration walkthrough
# keep only the first 750 bytes of the request
if ( $request ~ "^(.{0,750})" ) {
    set $request_short $1;
}

# keep only the first 750 bytes of the request body
if ( $request_body ~ "^(.{0,750})" ) {
    set $request_body_short $1;
}

# keep only the first 100 bytes of the referer
set $http_referer_short "-";
if ( $http_referer ~ "^(.{1,100})" ) {
    set $http_referer_short $1;
}

# take the first IP in $http_x_forwarded_for as the client's real IP
set $real_ip $remote_addr;
if ( $http_x_forwarded_for ~ "^(\d+\.\d+\.\d+\.\d+)" ) {
    set $real_ip $1;
}

# $server_name has the form: Nth-level....third-level.second-level.first-level.com (or .cn),
# or simply first-level.com/.cn;
# $level1domain captures the first-level (registered) domain part,
# $level2domain captures everything in front of it
set $level1domain unparse;
set $level2domain unparse;
if ( $server_name ~ "^(.+)\.([0-9a-zA-Z]+)\.(com|cn)$" ) {
    set $level1domain $2;
    set $level2domain $1;
}
if ( $server_name ~ "^([0-9a-zA-Z]+)\.(com|cn)$" ) {
    set $level1domain $1;
    set $level2domain none;
}

For example, with server_name www.test.com the first regex matches, setting $level1domain to test and $level2domain to www, so the logged domain becomes test_www.
Restart nginx, then visit your domain in a browser to check that everything still works.
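A quick way to sanity-check from the shell — a sketch assuming the source-install paths used throughout this post:

/usr/local/nginx/sbin/nginx -t           # validate the configuration first
/usr/local/nginx/sbin/nginx -s reload    # then reload it
curl -H 'X-Forwarded-For: 203.0.113.7' http://www.test.com/
tail -n 1 /usr/local/nginx/logs/access.log   # should print one JSON object for the request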
3. Configure logstash
vim /opt/logstash-5.6.4/config/simple.conf
# input pulls events from a data source; since I am reading a log file, the file plugin is used
input {
    file {
        # a custom name for this source; once type is set, the filter and output
        # sections can treat different types differently, which is useful when
        # several log files are consumed
        type => "nginxlog"
        # path to the log file
        path => "/usr/local/nginx/logs/access.log"
        # where to start reading: "beginning" reads from the head of the file on
        # first startup and then follows new lines; "end" starts from the tail
        # (like tail -f)
        start_position => "beginning"
        # sincedb_path points at the sincedb file, which records how far each log
        # file has been read, so that after a restart logstash resumes from the
        # recorded position for the same file; delete the sincedb file to re-read
        # a file from scratch. "/dev/null" means positions are never persisted.
        sincedb_path => "/dev/null"
        # logstash codec plugin: lines that do not start with a digit are folded
        # into the previous event
        codec => multiline {
            pattern => "^\d"
            negate => true
            what => "previous"
        }
    }
}

filter {
    # before parsing as JSON, use mutate to rewrite \x sequences, preventing:
    # ParserError: Unrecognized character escape 'x' (code 120)
    mutate {
        gsub => ["message", "\\x", "\\\x"]
    }
    json {
        source => "message"
        # drop useless fields to save space
        remove_field => "message"
        remove_field => "severity"
        remove_field => "pid"
        remove_field => "logsource"
        remove_field => "timestamp"
        remove_field => "facility_label"
        remove_field => "type"
        remove_field => "facility"
        remove_field => "@version"
        remove_field => "priority"
        remove_field => "severity_label"
    }
    date {
        # replace the timestamp logstash generates with the nginx request time
        match => ["time_local", "ISO8601"]
        target => "@timestamp"
    }
    grok {
        # extract the day from the time
        match => { "time_local" => "(?<day>.{10})" }
    }
    grok {
        # split request into two fields: method and url
        match => { "request" => "%{WORD:method} (?<url>.* )" }
    }
    grok {
        # keep only the part of http_referer before the question mark; whatever
        # follows it has no value and wastes space
        match => { "http_referer" => "(?<referer>-|%{URIPROTO}://(?:%{USER}(?::[^@]*)?@)?(?:%{URIHOST})?)" }
    }
    mutate {
        # once the new fields are extracted, drop the originals
        remove_field => "request"
        remove_field => "http_referer"
        rename => { "http_user_agent" => "agent" }
        rename => { "upstream_response_time" => "response_time" }
        rename => { "host" => "log_source" }
        rename => { "http_x_forwarded_for" => "x_forwarded_for" }
        # the following 2 fields are split on commas and stored as arrays
        split => { "x_forwarded_for" => ", " }
        split => { "response_time" => ", " }
    }
    # values that don't fit the elasticsearch index mapping would fail to index,
    # so rewrite them here (see the note on the alter plugin after this block)
    alter {
        condrewrite => [
            "x_forwarded_for", "-", "0.0.0.0",
            "x_forwarded_for", "unknown", "0.0.0.0",
            "response_time", "-", "0",
            "real_ip", "", "0.0.0.0"
        ]
    }
}

# (write the log events into the nginx_to_logs index, with the mapping given by template)
output {
    # index into elasticsearch, using the mapping the template file specifies
    elasticsearch {
        hosts => ["192.168.6.123:9200"]
        # the action es should perform: index, delete, create, update
        action => "index"
        # supply the document id; useful for overwriting entries in elasticsearch
        # that share the same id
        document_id => "igshooter"
        # the index events are written to; es creates it on first write, one index
        # per day thanks to the date pattern
        index => "nginx_to_logs-%{+YYYY.MM.dd}"
        user => elastic
        password => changeme
        # apply a mapping template to es (set to false to let es use its own)
        manage_template => true
        template_overwrite => true
        # the template's name inside es (any name will do)
        template_name => "mynginx"
        # a valid filepath to your own template file; leave unset to use an
        # existing one. The "template" index pattern inside this file must match
        # the index name set above, or cover it with a * wildcard
        template => "/opt/logstash-5.6.4/template/mynginxtemplate.json"
        # emit json via the codec
        codec => json
    }
    # also keep a local copy, as a complement to ELK
    file {
        flush_interval => 600
        path => '/home/nginxlog/%{day}/%{domain}.log'
        codec => line {
            format => '<%{time_local}> <%{real_ip}> <%{method}> <%{url}> <%{status}> <%{request_time}> <%{response_time}> <%{body_bytes_sent}> <%{request_body}> <%{referer}> <%{x_forwarded_for}> <%{log_source}> <%{agent}>'
        }
    }
}

# One thing deserves close attention here: make sure the document_id value is
# unique per event (a fixed string like the one above makes every event overwrite
# the same document); that solves the ES duplicate-data problem you are otherwise
# about to face. Remember it!
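One pitfall: as far as I can tell, the alter filter is a community plugin rather than part of the default logstash 5.x bundle, so if startup fails with an unknown-filter error, installing it should look roughly like this:

cd /opt/logstash-5.6.4
bin/logstash-plugin install logstash-filter-alter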
logstash codec plugins (Codec)
A codec plugin lets logstash handle different kinds of data on input or output, and makes it easier to coexist with other products that use their own data formats, such as fluent, netflow, collectd and other common formats. Logstash is therefore not just an input -> filter -> output data flow, but an input -> decode -> filter -> encode -> output data flow.
Common codec formats include plain, json and json_lines; they are introduced in turn below:
1. The plain codec (output as-is)
plain is the simplest codec: whatever you feed in comes back out unchanged, without any of the extras defined in your configuration file such as timestamp or type:
input {
    stdin { }
}

output {
    stdout {
        codec => plain
    }
}
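If you want strictly the raw message echoed back, the plain codec also takes a format option; a hedged variant would be:

output {
    stdout {
        codec => plain { format => "%{message}" }
    }
}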
2. The json and json_lines codecs
Sometimes the logs logstash collects are already in JSON format. In that case we can add codec => json to the input section to parse them, so that fields are generated from the actual content, which makes analysis and storage much easier. To make logstash emit JSON, add codec => json to the output section. Below is an event configuration file that uses the json codec:
input {
    stdin { }
}

output {
    stdout {
        codec => json
    }
}
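Running it and piping in a line shows the whole event serialized as JSON; the config filename, hostname and timestamp below are hypothetical:

echo 'hello' | ../bin/logstash -f ../config/json_test.conf
# {"@timestamp":"2017-11-20T02:15:30.000Z","@version":"1","host":"myhost","message":"hello"}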
Below is my own configuration (the comment-free version):
input {
    file {
        type => "nginxlog"
        path => "/usr/local/nginx/logs/access.log"
        start_position => "beginning"
        sincedb_path => "/dev/null"
        codec => multiline {
            pattern => "^\d"
            negate => true
            what => "previous"
        }
    }
}

filter {
    mutate {
        gsub => ["message", "\\x", "\\\x"]
    }
    json {
        source => "message"
        remove_field => "message"
        remove_field => "severity"
        remove_field => "pid"
        remove_field => "logsource"
        remove_field => "timestamp"
        remove_field => "facility_label"
        remove_field => "type"
        remove_field => "facility"
        remove_field => "@version"
        remove_field => "priority"
        remove_field => "severity_label"
    }
    date {
        match => ["time_local", "ISO8601"]
        target => "@timestamp"
    }
    grok {
        match => { "time_local" => "(?<day>.{10})" }
    }
    grok {
        match => { "request" => "%{WORD:method} (?<url>.* )" }
    }
    grok {
        match => { "http_referer" => "(?<referer>-|%{URIPROTO}://(?:%{USER}(?::[^@]*)?@)?(?:%{URIHOST})?)" }
    }
    mutate {
        remove_field => "request"
        remove_field => "http_referer"
        rename => { "http_user_agent" => "agent" }
        rename => { "upstream_response_time" => "response_time" }
        rename => { "host" => "log_source" }
        rename => { "http_x_forwarded_for" => "x_forwarded_for" }
        split => { "x_forwarded_for" => ", " }
        split => { "response_time" => ", " }
    }
    #alter {
    #    condrewrite => [
    #        "x_forwarded_for", "-", "0.0.0.0",
    #        "x_forwarded_for", "unknown", "0.0.0.0",
    #        "response_time", "-", "0",
    #        "real_ip", "", "0.0.0.0"
    #    ]
    #}
}

output {
    elasticsearch {
        hosts => ["192.168.6.123:9200"]
        action => "index"
        index => "nginx_to_logs-%{+YYYY.MM.dd}"
        user => elastic
        password => changeme
        manage_template => true
        template_overwrite => true
        template_name => "mynginx"
        template => "/opt/logstash-5.6.4/template/mynginxtemplate.json"
        codec => json
    }
    file {
        flush_interval => 600
        path => '/home/nginxlog/%{day}/%{domain}.log'
        codec => line {
            format => '<%{time_local}> <%{real_ip}> <%{method}> <%{url}> <%{status}> <%{request_time}> <%{response_time}> <%{body_bytes_sent}> <%{request_body}> <%{referer}> <%{x_forwarded_for}> <%{log_source}> <%{agent}>'
        }
    }
}
One more file must not be forgotten: the elasticsearch output uses the mapping specified by template as its index mapping, and we have to create that template ourselves.
mynginxtemplate.json
{ "template": "nginx_*", "settings": { "index.number_of_shards": 8, "number_of_replicas": 0, "analysis": { "analyzer": { #自定义stop关键字,不收集http等字段的索引 "stop_url": { "type": "stop", "stopwords": ["http","https","www","com","cn","net"] } } } }, "mappings" : { "doc" : { "properties" : { # index:true 分词、生产搜索引擎 # analyzer:指定索引分析器 "referer": { "type": "text", "norms": false, "index": true, "analyzer": "stop_url" }, "agent": { "type": "text", "norms": false, "index": true }, # IP字段类型 "real_ip": { "type": "ip" }, "x_forwarded_for": { "type": "ip" }, # keyword,作为完整字段索引,不可分词索引 "status": { "type": "keyword" }, "method": { "type": "keyword" }, "url": { "type": "text", "norms": false, "index": true, "analyzer": "stop_url" }, "status": { "type": "keyword" }, "response_time": { "type": "half_float" }, "request_time": { "type": "half_float" }, "domain": { "type": "keyword" }, "project": { "type": "keyword" }, "request_body": { "type": "text", "norms": false, "index": true }, "body_bytes_sent": { "type": "long" }, "log_source": { "type": "ip" }, "@timestamp" : { "type" : "date", "format" : "dateOptionalTime", "doc_values" : true }, "time_local": { "enabled": false }, "day": { "enabled": false } } } } } |
With all of that in place, restart logstash.
About starting logstash with a configuration file, a few words are in order:
nohup ../bin/logstash -f ../config/simple.conf &
# Each run seems to support only a single configuration file: you can't pass
# several with -f, and if you try, only the last one takes effect. So don't get
# fancy — give each kind of log its own index from one file. I haven't dug into
# multiple indexes and found no material on it, so this will have to do for now.
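One file can still feed several indexes, though: a minimal sketch, assuming a hypothetical second log source and index name, that routes events on the type field:

input {
    file { type => "nginxlog" path => "/usr/local/nginx/logs/access.log" }
    file { type => "applog"   path => "/var/log/app/app.log" }    # hypothetical second source
}

output {
    if [type] == "nginxlog" {
        elasticsearch { hosts => ["192.168.6.123:9200"] index => "nginx_to_logs-%{+YYYY.MM.dd}" }
    } else {
        elasticsearch { hosts => ["192.168.6.123:9200"] index => "app_to_logs-%{+YYYY.MM.dd}" }
    }
}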