k8s logstash多管道配置

07-21 1077阅读

背景

采用的是标准的ELK+filebeat架构

k8s logstash多管道配置
(图片来源网络,侵删)

ES版本:7.17.15

logstash版本:7.17.15

filebeat版本: 7.17.15

helm版本:7.17.3,官方地址:elastic/helm-charts

说一下为什么会想到使用多管道的原因

我们刚开始部署的是单管道,里面有多种类型的日志需要传输,比如埋点日志、系统日志、日志推送至kafka、日志推送至阿里云sls

后来在系统运行中,开发人员不够细心,在配置埋点日志的时候,出现了部分语法错误,导致整个日志系统受到影响,logstash整个推送都不可用,相当于没有隔离性,所以,在调研的时候,看到了多管道,多实例,所以来进行研究一下。

为什么使用多管道?

优点

  • 隔离性:每个管道可以独立处理不同类型的日志或数据流,有助于数据的分类和管理。
  • 灵活性:可以在同一实例中灵活配置和管理多个管道,以适应不同的需求。
  • 资源共享:多个管道共享同一个Logstash实例的资源,提高资源利用率。

    缺点

    • 复杂性:配置和管理多个管道会增加复杂性,特别是在调试和维护时。
    • 性能瓶颈:当管道数量过多或数据量很大时,可能导致性能瓶颈,影响整体处理效率。

      适用于需要在同一Logstash实例中处理不同类型数据的场景,提高资源利用率,但可能增加配置和管理的复杂性。

      相较于单管道

      单管道配置简单,适用于数据流较少或需求简单的场景,并且他的处理流程单一,性能更容易预测和管理

      同时,他的缺点也很明显,不具备隔离性,正如我背景里面遇到的问题,灵活性,拓展性也比较差。

      适用于简单数据流的处理

      相较于多实例

      优点:

      1. 高可用性:通过部署多个Logstash实例,可以提高系统的高可用性,减少单点故障的风险。
      2. 扩展性强:可以根据需求增加实例,水平扩展系统处理能力。
      3. 独立性:每个实例可以独立处理特定的数据流或任务,减少相互影响。

      缺点:

      1. 资源开销大:需要更多的资源(内存、CPU等)来运行多个实例,增加运维成本。
      2. 管理复杂:需要配置和管理多个实例,增加了运维的复杂性。

      适用于高可用性和高扩展性需求的场景,独立性强,但资源开销大,管理复杂

      helm的方式配置

      整体分为两部分,主管道和子管道

      主管道用来接受数据来源,以及公共的处理,通过filter处理之后,在将信息分发给子管道,子管道来控制输出源

      在logstash需要配置两个地方:

      • 一个是pipeline.yml文件,配置需要加载的管道文件以及id;
      • 一个是管道文件配置内容,确定输入源以及输出源头;

        下面的方式采用的helm的方式,docker方式或者其他方式可参考一下

        示例如下:

        filebeat配置

        filebeat.inputs:
        - type: log
          paths:
          - /tmp/logs/biz/*.log
          fields:
            fb_collect_app: xxx-xxx-test-biz
            fb_collect_type: bizlog
            system_env: dev  # 如果不包含 "prod",设置为test
            send_kafka: "false"
          fields_under_root: true
        - type: log
          paths:
          - /tmp/logs/sys/*.log
          multiline.pattern: '^\s|^"|^Caused by:'
          multiline.match: after
          fields:
            fb_collect_app: xxx-xxx-test-sys
            fb_collect_type: syslog
            system_env: dev  # 如果不包含 "prod",设置为test
          fields_under_root: true
        output.logstash:
          hosts:
          - "xxx"
        

        logstash配置

        values.yaml

        # 配置文件
        logstashConfig:
          logstash.yml: |
            # 如果处理的字符中含有\t\n等字符,是不生效的,我们需要开启logstash的字符转义功能,config.support_escapes: true
            config.support_escapes: true
            http.host: 0.0.0.0
            pipeline.ecs_compatibility: v1
          pipelines.yml: |
            - pipeline.id: base-processing
              path.config: "/usr/share/logstash/pipeline/base-processing.conf"
            - pipeline.id: syslog-processing
              path.config: "/usr/share/logstash/pipeline/syslog-processing.conf"
            - pipeline.id: bizlog-processing
              path.config: "/usr/share/logstash/pipeline/bizlog-processing.conf"
              
        # 管道内容
          base-processing.conf: |
            input {
               beats{
                 port => "5055"
               }
            }
            output {
              if [fb_collect_type] == "bizlog" {
                pipeline {
                  send_to => bizlogs
                }
                if [send_kafka] == "true" {
                  stdout { codec => rubydebug }
                  pipeline {
                    send_to => kafkalogs
                  }
                }
              }else if [fb_collect_type] == "syslog" {
                pipeline {
                  send_to => syslogs
                }
              }
            }
          bizlog-processing.conf: |
            input {
               pipeline {
                 address => bizlogs
               }
            }
            filter {
                ruby {
                  code => "event.cancel if (Time.now.to_f - event.get('@timestamp').to_f) > (60 * 60 * 24 * 3)"
                }
                json {
                  source => "message"
                  skip_on_invalid_json => true
                }
                date {
                  match => ["business_time","yyyy-MM-dd HH:mm:ss.SSS"]
                  target => "@timestamp"
                }
            }  
            output {
                elasticsearch {
                  hosts => ["elasticsearch-master.business:9200"]
                  index => "%{fb_collect_app}-%{+YYYY.MM.dd}"
                  user => elastic
                  password => "xxx"
               }
            }
          syslog-processing.conf: |
            input {
               pipeline {
                 address => syslogs
               }
            }
            filter {
                ruby {
                  code => "event.cancel if (Time.now.to_f - event.get('@timestamp').to_f) > (60 * 60 * 24 * 3)"
                }
                mutate{
                  strip => ["message"]
                  gsub => [ "message", "\r", " " ]
                  gsub => [ "message", "\t", " " ]
                  gsub => [ "message", "\n", " " ]
                  gsub => [ "message", "\u0000", " " ]
                }
                json {
                  source => "message"
                  skip_on_invalid_json => true
                }
                date {
                  match => ["timestamp","yyyy-MM-dd HH:mm:ss.SSS"]
                  target => "@timestamp"
                }
            }  
            output {
                elasticsearch {
                  hosts => ["elasticsearch-master.business:9200"]
                  index => "%{fb_collect_app}-%{+YYYY.MM.dd}"
                  user => elastic
                  password => "xxx"
               }
            }
          kafka-processing.conf: |
              input {
                   pipeline {
                     address => kafkalogs
                   }
                }
              output {
                stdout { codec => rubydebug }
              }
        

        其中,base-processing.conf为主管道,用来确定接收源的,然后在根据条件,将数据输入到某个pipeline中,

        最后pipeline来觉得输出到es那个索引下

        最后重启logstash下即可生效

        踩坑点

        ECS Compatibility mode

        部署之后,logstash一直报错:

        Relying on default value of pipeline.ecs_compatibility, which may change in a future major release of Logstash. To avoid unexpected changes when upgrading Logstash, please explicitly declare your desired ECS Compatibility mode
        

        这个是因为Logstash 正在使用默认的 ECS(Elastic Common Schema)兼容性模式,这可能在未来的版本中发生变化。为了避免升级时出现意外变化,你可以在 Logstash 的配置文件中显式声明所需的 ECS 兼容性模式。

        解决方案:

        在 logstash.yml 文件中添加或修改 pipeline.ecs_compatibility 参数。你可以选择以下几种模式之一:

        • disabled: 不使用 ECS 兼容性模式。
        • v1: 使用 ECS 1.0 兼容性模式。
        • v8: 使用 ECS 8.0 兼容性模式(如果你的 Logstash 版本支持)。

          例如,设置 ECS 兼容性模式为 disabled:

          pipeline.ecs_compatibility: disabled
          

          或者设置为 ECS 1.0 兼容性模式:

          pipeline.ecs_compatibility: v1
          

          我的logstash版本是7.17.15,选择的是ECS 1.0兼容模式

          配置完成后,重启logstash

VPS购买请点击我

文章版权声明:除非注明,否则均为主机测评原创文章,转载或复制请以超链接形式并注明出处。

目录[+]