重庆分公司,新征程启航
为企业提供网站建设、域名注册、服务器等服务
Logstash是一款轻量级的日志搜集处理框架,可以方便的把分散的、多样化的日志搜集起来,并进行自定义的处理,然后传输到指定的位置,比如某个服务器或者文件。
成都创新互联公司是一家专业提供钟山企业网站建设,专注与成都网站制作、成都网站建设、H5建站、小程序制作等业务。10年已为钟山众多企业、政府机构等服务。创新互联专业网站设计公司优惠进行中。logstash功能很强大。从 logstash 1.5.0 版本开始,logstash 将所有的插件都独立拆分成 gem 包。这样,每个插件都可以独立更新,不用等待 logstash 自身做整体更新的时候才能使用了。为了达到这个目标,logstash 配置了专门的 plugins 管理命令
logstash插件安装(本地安装)
logstash处理事件有三个阶段:input ---> filter ---> output。input产生事件,filter 对事件进行修改,output输出到其它地方。
filter是logstash管道中间处理的设备。可以结合条件语句对符合标准的事件进行处理。这里只介绍filter的插件:
你可以通过 bin/plugin list 查看本机现在有多少插件可用。(其实就在 vendor/bundle/jruby/1.9/gems/ 目录下)
插件本地安装的方法
bin/logstash-plugin install logstash-filter-kv Ignoring ffi-1.9.13 because its extensions are not built. Try: gem pristine ffi --version 1.9.13 Validating logstash-filter-kv Installing logstash-filter-kv Installation successful更新插件:bin/logstash-plugin update logstash-filter-kv
插件介绍
这里只介绍几种常用的
grok: 解析和结构化任何文本(前面单独介绍过就不重复了)
http://irow10.blog.51cto.com/2425361/1828077
geoip: 添加有关IP地址地理位置信息。
geoip这个插件非常重要,而且很常用。他能分析访问的ip分析出访问者的地址信息。举例如下:
filter { geoip { source => "message" } }message你输入个IP地址,结果如下:
{ "message" => "183.60.92.253", "@version" => "1", "@timestamp" => "2016-07-07T10:32:55.610Z", "host" => "raochenlindeMacBook-Air.local", "geoip" => { "ip" => "183.60.92.253", "country_code2" => "CN", "country_code3" => "CHN", "country_name" => "China", "continent_code" => "AS", "region_name" => "30", "city_name" => "Guangzhou", "latitude" => 23.11670000000001, "longitude" => 113.25, "timezone" => "Asia/Chongqing", "real_region_name" => "Guangdong", "location" => [ [0] 113.25, [1] 23.11670000000001 ] } }实际应用中我们可以把grok获取的request_ip传给geoip处理。
filter { if [type] == "apache" { grok { patterns_dir => "/usr/local/logstash-2.3.4/ownpatterns/patterns" match => { "message" => "%{APACHE_LOG}" } remove_field => ["message"] } geoip { source => "request_ip" } }在logstash分析完数据到output阶段输出到其它地方时,数据中就有访问者的地理信息。
date:用来转换你的日志记录中的时间字符串
date 插件是日期插件,这个插件,常用而重要。
该插件必须是用 date 包裹,如下所示:
date {
}
可用的配置选项如下表所示:
add_field
add_tag
locale
match 匹配日志格式
periodic_flush 按时间间隔调用
remove_field
remove_tag
tag_on_failure 如果标签匹配失败,则默认为_grokparsefailure
target 把 match 的时间字段保存到指定字段。若为指定,默认更新到 @timestamp。
timezone
备注:
add_field、remove_field、add_tag、remove_tag 是所有 Logstash 插件都有。
tag 作用是,当你对字段处理期间,还期望进行后续处理,就先作个标记。Logstash 有个内置 tags 数组,包含了期间产生的 tag,无论是 Logstash 自己产生的,还是你添加的,比如,你用 grok 解析日志,但是错了,那么 Logstash 自己就会自己添加一个 _grokparsefailure 的 tag。这样,你在 output 时,可以对解析失败的日志不做任何处理;
field 作用是,对字段的操作。
举例:
filter { if [type] == "apache" { grok { patterns_dir => "/usr/local/logstash-2.3.4/ownpatterns/patterns" match => { "message" => "%{APACHE_LOG}" } remove_field => ["message"] } date { match => [ "timestamp", "dd/MMM/YYYY:HH:mm:ss Z" ] } } }apache日志中的时间戳是:[19/Jul/2016:16:28:52 +0800] 。match的时间格式要和日志中的匹配对应。如果你的时间字段可能有多个格式,则可指定多个可能的日期格式:
match => [ "timestamp", "MMM dd YYY HH:mm:ss", "MMM d YYY HH:mm:ss", "ISO8601" ]
apache日志的grok表达式:
APACHE_LOG %{IPORHOST:addre} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] \"%{WORD:http_method} %{NOTSPACE:request} HTTP/%{NUMBER:httpversion}\" %{NUMBER:status} (?:%{NUMBER:bytes}|-) \"(?:%{URI:http_referer}|-)\" \"%{GREEDYDATA:User_Agent}\"在apache日志中已经有[%{HTTPDATE:timestamp}\],为什么还要经过date插件再处理下呢?
我们将访问时间作为logstash的时间戳,有了这个,我们就可以以时间为区分,查看分析某段时间的请求是怎样的,如果没有匹配到这个时间的话,logstash将以当前时间作为该条记录的时间戳。所以需要再filter里面定义时间戳的格式。如果不用 date 插件,那么 Logstash 将处理时间作为时间戳。时间戳字段是 Logstash 自己添加的内置字段 @timestamp,在ES中关于时间的相关查询,必须使用该字段,你当然也可以修改该字段的值。
备注:@timestamp 比我们晚了 8 个小时,在kibana显示也是如此。如何能让日志收集时间能和@timestamp一致呢?在date插件中添加如下字段:timezone =>"Asia/Chongqing"
useragent:用来处理分析访问者使用的浏览器及操作系统
在apache日志中会发现有这么一段日志:Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2454.101 Safari/537.36
我们在用grok分割数据时也是使用%{GREEDYDATA:User_Agent}。
备注:GREEDYDATA这个grok表达式是匹配任何类型的数据。 GREEDYDATA .*
通过这条数据,即使我们显示出来意义也不大,但我们可以通过useragent挖掘它的信息
filter { if [type] == "apache" { grok { patterns_dir => "/usr/local/logstash-2.3.4/ownpatterns/patterns" match => { "message" => "%{APACHE_LOG}" } remove_field => ["message"] } useragent { source => "User_Agent" target => "ua" } date { match => [ "timestamp", "dd/MMM/YYYY:HH:mm:ss Z" ] } } }显示结果:
从上图我们可以看到访问者浏览器及操作系统的信息。比那一大串信息更加有意义。
mutate:它提供了丰富的基础类型数据处理能力。包括类型转换,字符串处理和字段处理等。
可以设置的转换类型包括:"integer","float" 和 "string"。
可用的配置选项如下表所示:
add_field 添加新的字段
add_tag 添加新的标签
convert 数据类型转换
gsub 字符串替换。用正则表达式和字符串都行
join 用分隔符连接数组. 如果字段不是数组,那什么都不做
lowercase 把字符串转换成小写
merge 合并两个数组或散列字段
periodic_flush 按时间间隔调用
remove_field 移除字段
remove_tag 移除标识
rename 重命名一个或多个字段
replace 用一个新的值替换掉指定字段的值
split 用分隔符或字符分割一个字符串。只能应用在字符串上
strip 去掉字段首尾的空格
update 更新字段的值。如果该字段不存在,则什么都不做
uppercase 把字符串转换成大写
简单优化数据
logstash采集数据加上date,geoip,useragent等插件会使我们获取的信息更加详细,但是也更加臃肿。所有我们要踢掉一些没有意义的数据,简化传输给elasticsearch的数据。
remove_field能很好的完成这个任务。上面也有用到。
remove_field => ["message"]
在grok中我们已经发message分成了很多段小数据,如果在把message传输给elasticsearch就重复了。
当然在传输的小数据中也有很多我们用不到或者毫无意义。我们就可以使用remove_field来清除。
mutate{ remove_field => ["Syslog_Timestamp"] remove_field => ["message"]参考:https://zengjice.gitbooks.io/logstash-best-practice-cn/content/filter/mutate.html
drop: 完全丢弃事件,如debug事件
filter { if [loglevel] == "debug" { drop { } } }参考:https://www.elastic.co/guide/en/logstash/current/plugins-filters-drop.html
另外有需要云服务器可以了解下创新互联cdcxhl.cn,海内外云服务器15元起步,三天无理由+7*72小时售后在线,公司持有idc许可证,提供“云服务器、裸金属服务器、高防服务器、香港服务器、美国服务器、虚拟主机、免备案服务器”等云主机租用服务以及企业上云的综合解决方案,具有“安全稳定、简单易用、服务可用性高、性价比高”等特点与优势,专为企业上云打造定制,能够满足用户丰富、多元化的应用场景需求。