# Logstash pipeline.
#   input : Beats on 5044 (type "beats") and raw TCP on 5000 (type "syslog").
#   filter: parses nginx access/error lines and "bitdata-web_rr" lines, selected
#           by the [fields][appid] / [fields][scope] values on the event.
#   output: writes every event to a per-type/app/scope daily file and to a
#           daily Elasticsearch index.

input {
  beats {
    port => 5044
    type => "beats"
  }

  tcp {
    port => 5000
    type => "syslog"
  }
}

filter {
  if [fields][appid] == "nginx" {
    if [fields][scope] == "access" {
      grok {
        # NOTE(review): the two inline named captures had lost their names
        # ("(?\S+)" is not a valid group and prevents the pattern from
        # compiling). "referrer" and "x_forwarded_for" assume the usual nginx
        # log_format order ($http_referer ... $http_x_forwarded_for) --
        # confirm against the nginx log_format actually in use.
        match => { "message" => ["%{IPORHOST:clientip} \[%{HTTPDATE:time}\] \"%{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}\" %{NUMBER:http_status_code} %{NUMBER:bytes} \"(?<referrer>\S+)\" \"%{DATA:ua}\" \"(?<x_forwarded_for>\S+)\""] }
        remove_field => "message"
      }

      # Keep the time the event was read before the date filter replaces
      # @timestamp with the time parsed from the log line.
      mutate {
        add_field => { "read_timestamp" => "%{@timestamp}" }
      }

      # The filters below read the fields the grok above actually produces
      # (time / ua / clientip); the [nginx][access][...] sources used before
      # were never populated, so these filters had nothing to work on.
      date {
        match => [ "time", "dd/MMM/YYYY:H:m:s Z" ]
        remove_field => "time"
      }

      useragent {
        source => "ua"
        target => "[nginx][access][user_agent]"
        remove_field => "ua"
      }

      geoip {
        source => "clientip"
        target => "[nginx][access][geoip]"
      }
    } else if [fields][scope] == "error" {
      grok {
        match => { "message" => ["%{DATA:[nginx][error][time]} \[%{DATA:[nginx][error][level]}\] %{NUMBER:[nginx][error][pid]}#%{NUMBER:[nginx][error][tid]}: (\*%{NUMBER:[nginx][error][connection_id]} )?%{GREEDYDATA:[nginx][error][message]}"] }
        remove_field => "message"
      }

      # Copy (rather than rename) @timestamp, matching the access branch: if
      # the date parse below fails the event still carries @timestamp, which
      # the %{+YYYY.MM.dd} references in the output section depend on.
      mutate {
        add_field => { "read_timestamp" => "%{@timestamp}" }
      }

      date {
        match => [ "[nginx][error][time]", "YYYY/MM/dd H:m:s" ]
        remove_field => "[nginx][error][time]"
      }
    }
  }

  if [fields][appid] == "bitdata-web_rr" {
    # Two alternative layouts: the text after a literal "[] " is captured as
    # "rr"; otherwise the text between "ret": and the closing "}" at end of
    # line is captured as "response".
    grok {
      match => { "message" => ["\[\] %{DATA:rr}$", "\"ret\":%{DATA:response}}$"] }
    }

    # Decode the "rr" capture as JSON, then drop the raw string and the
    # [ret][data] field from the event.
    json {
      source => "rr"
    }

    mutate {
      remove_field => ["rr", "[ret][data]"]
    }
  }
}

output {
  # NOTE(review): events without [fields][appid] / [fields][scope] (e.g. from
  # the tcp input) presumably keep the unresolved %{...} text in the file
  # name -- confirm this is acceptable.
  file {
    path => "/var/log/%{type}-%{[fields][appid]}-%{[fields][scope]}-%{+YYYY.MM.dd}.log"
  }

  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "logstash-%{+YYYY.MM.dd}"
  }
}