k8s logstash多管道配置
背景
采用的是标准的ELK+filebeat架构
ES版本:7.17.15
logstash版本:7.17.15
filebeat版本: 7.17.15
helm版本:7.17.3,官方地址:elastic/helm-charts
说一下为什么会想到使用多管道的原因
我们刚开始部署的是单管道,里面有多种类型的日志需要传输,比如埋点日志、系统日志、日志推送至kafka、日志推送至阿里云sls
后来在系统运行中,开发人员不够细心,在配置埋点日志的时候,出现了部分语法错误,导致整个日志系统受到影响,logstash整个推送都不可用,相当于没有隔离性,所以,在调研的时候,看到了多管道,多实例,所以来进行研究一下。
为什么使用多管道?
优点
- 隔离性:每个管道可以独立处理不同类型的日志或数据流,有助于数据的分类和管理。
- 灵活性:可以在同一实例中灵活配置和管理多个管道,以适应不同的需求。
- 资源共享:多个管道共享同一个Logstash实例的资源,提高资源利用率。
缺点
- 复杂性:配置和管理多个管道会增加复杂性,特别是在调试和维护时。
- 性能瓶颈:当管道数量过多或数据量很大时,可能导致性能瓶颈,影响整体处理效率。
适用于需要在同一Logstash实例中处理不同类型数据的场景,提高资源利用率,但可能增加配置和管理的复杂性。
相较于单管道
单管道配置简单,适用于数据流较少或需求简单的场景,并且他的处理流程单一,性能更容易预测和管理
同时,他的缺点也很明显,不具备隔离性,正如我背景里面遇到的问题,灵活性,拓展性也比较差。
适用于简单数据流的处理
相较于多实例
优点:
- 高可用性:通过部署多个Logstash实例,可以提高系统的高可用性,减少单点故障的风险。
- 扩展性强:可以根据需求增加实例,水平扩展系统处理能力。
- 独立性:每个实例可以独立处理特定的数据流或任务,减少相互影响。
缺点:
- 资源开销大:需要更多的资源(内存、CPU等)来运行多个实例,增加运维成本。
- 管理复杂:需要配置和管理多个实例,增加了运维的复杂性。
适用于高可用性和高扩展性需求的场景,独立性强,但资源开销大,管理复杂
helm的方式配置
整体分为两部分,主管道和子管道
主管道用来接受数据来源,以及公共的处理,通过filter处理之后,在将信息分发给子管道,子管道来控制输出源
在logstash需要配置两个地方:
- 一个是pipeline.yml文件,配置需要加载的管道文件以及id;
- 一个是管道文件配置内容,确定输入源以及输出源头;
下面的方式采用的helm的方式,docker方式或者其他方式可参考一下
示例如下:
filebeat配置
filebeat.inputs: - type: log paths: - /tmp/logs/biz/*.log fields: fb_collect_app: xxx-xxx-test-biz fb_collect_type: bizlog system_env: dev # 如果不包含 "prod",设置为test send_kafka: "false" fields_under_root: true - type: log paths: - /tmp/logs/sys/*.log multiline.pattern: '^\s|^"|^Caused by:' multiline.match: after fields: fb_collect_app: xxx-xxx-test-sys fb_collect_type: syslog system_env: dev # 如果不包含 "prod",设置为test fields_under_root: true output.logstash: hosts: - "xxx"
logstash配置
values.yaml
# 配置文件 logstashConfig: logstash.yml: | # 如果处理的字符中含有\t\n等字符,是不生效的,我们需要开启logstash的字符转义功能,config.support_escapes: true config.support_escapes: true http.host: 0.0.0.0 pipeline.ecs_compatibility: v1 pipelines.yml: | - pipeline.id: base-processing path.config: "/usr/share/logstash/pipeline/base-processing.conf" - pipeline.id: syslog-processing path.config: "/usr/share/logstash/pipeline/syslog-processing.conf" - pipeline.id: bizlog-processing path.config: "/usr/share/logstash/pipeline/bizlog-processing.conf" # 管道内容 base-processing.conf: | input { beats{ port => "5055" } } output { if [fb_collect_type] == "bizlog" { pipeline { send_to => bizlogs } if [send_kafka] == "true" { stdout { codec => rubydebug } pipeline { send_to => kafkalogs } } }else if [fb_collect_type] == "syslog" { pipeline { send_to => syslogs } } } bizlog-processing.conf: | input { pipeline { address => bizlogs } } filter { ruby { code => "event.cancel if (Time.now.to_f - event.get('@timestamp').to_f) > (60 * 60 * 24 * 3)" } json { source => "message" skip_on_invalid_json => true } date { match => ["business_time","yyyy-MM-dd HH:mm:ss.SSS"] target => "@timestamp" } } output { elasticsearch { hosts => ["elasticsearch-master.business:9200"] index => "%{fb_collect_app}-%{+YYYY.MM.dd}" user => elastic password => "xxx" } } syslog-processing.conf: | input { pipeline { address => syslogs } } filter { ruby { code => "event.cancel if (Time.now.to_f - event.get('@timestamp').to_f) > (60 * 60 * 24 * 3)" } mutate{ strip => ["message"] gsub => [ "message", "\r", " " ] gsub => [ "message", "\t", " " ] gsub => [ "message", "\n", " " ] gsub => [ "message", "\u0000", " " ] } json { source => "message" skip_on_invalid_json => true } date { match => ["timestamp","yyyy-MM-dd HH:mm:ss.SSS"] target => "@timestamp" } } output { elasticsearch { hosts => ["elasticsearch-master.business:9200"] index => "%{fb_collect_app}-%{+YYYY.MM.dd}" user => elastic password => "xxx" } } kafka-processing.conf: | input { pipeline { address => kafkalogs } } output { stdout { codec => rubydebug } }
其中,base-processing.conf为主管道,用来确定接收源的,然后在根据条件,将数据输入到某个pipeline中,
最后pipeline来觉得输出到es那个索引下
最后重启logstash下即可生效
踩坑点
ECS Compatibility mode
部署之后,logstash一直报错:
Relying on default value of pipeline.ecs_compatibility, which may change in a future major release of Logstash. To avoid unexpected changes when upgrading Logstash, please explicitly declare your desired ECS Compatibility mode
这个是因为Logstash 正在使用默认的 ECS(Elastic Common Schema)兼容性模式,这可能在未来的版本中发生变化。为了避免升级时出现意外变化,你可以在 Logstash 的配置文件中显式声明所需的 ECS 兼容性模式。
解决方案:
在 logstash.yml 文件中添加或修改 pipeline.ecs_compatibility 参数。你可以选择以下几种模式之一:
- disabled: 不使用 ECS 兼容性模式。
- v1: 使用 ECS 1.0 兼容性模式。
- v8: 使用 ECS 8.0 兼容性模式(如果你的 Logstash 版本支持)。
例如,设置 ECS 兼容性模式为 disabled:
pipeline.ecs_compatibility: disabled
或者设置为 ECS 1.0 兼容性模式:
pipeline.ecs_compatibility: v1
我的logstash版本是7.17.15,选择的是ECS 1.0兼容模式
配置完成后,重启logstash