<source>
@type tail
path /var/log/auth.log,/var/log/secure
pos_file /var/lib/fluent/log_pos_tail_*.auth.log
format json
time_key time
time_format %Y-%m-%dT%Y:%M:%S.%L%:z
tag syslog.*
read_from_head false
<parse>
@type json
time_key time
time_format %b %d %H:%M:%S
<time_parser>
@type parser
format "[%d/%b/%Y:%H:%M:%S %z] "
</time_parser>
</parse>
</source>
<filter syslog.*>
@type record_transformer
<record>
type ${record["message"].split(" ")[0]}
message ${record["message"]}
</record>
</filter>
<match syslog.failed_login>
@type null
</match>
<match syslog.**>
@type copy
<store>
@type file
format json
path /var/log/fluent/*.log
</store>
</match>
<source>
@type tail
path /var/log/secure,/var/log/messages
pos_file /var/lib/fluent/log_pos_tail_*.log
format none # 不使用预定义的格式,我们将手动解析
tag syslog.*
read_from_head false
</source>
<filter syslog.*>
@type record_transformer
enable_ruby true
<record>
# 解析日志格式
# 示例日志格式:
# Oct 14 12:00:01 hostname sshd[1234]: Failed password for invalid_user from 192.168.0.1 port 12345 ssh2
# 或者对于 messages:
# Oct 14 12:00:01 hostname kernel: [ 119.440000] audit: type=1400 audit(1634220001.737:1): apparmor="DENIED" ...
time ${record['message'].scan(/\[(\d{2}:\d{2}:\d{2})/)[0]}
date ${record['message'].scan(/(\w{3}\s+\d{1,2}\s+\d{4})/)[0]}
host ${record['message'].scan(/\s+(\S+)\s+/)[0]}
program ${record['message'].scan(/\s+(\S+)\s+\S+\[\d+\]:\s+/)[0]}
message ${record['message'].gsub(/^\S+\s+\S+\s+\S+\s+\S+\s+\S+\[\d+\]:/, '')}
log_type ${record['message'].include?('sshd') ? 'ssh' : 'system'}
</record>
</filter>
<match syslog.*>
@type elasticsearch
hosts ["localhost:9200"] # 如果Elasticsearch在远程主机上,请替换为主机地址和端口
index_name syslog-%Y.%m.%d # 按天分割索引
include_tag_key true
tag_key "@tag"
flush_interval 5s
# 如果Elasticsearch需要身份验证,请添加用户名和密码
# user_name "your_username"
# password "your_password"
# 如果Elasticsearch使用SSL,请启用以下行
# ssl_verify_mode none # 不验证SSL证书,仅在测试环境中使用
# ssl_version TLSv1_2
</match>