Logstashの最近のアップデートで面白そうなものをピックアップしてみた

まえがき

2018年Elastic Stackアドベントカレンダーの20日目の記事で代理で書きます!
今回はLogstash6.1.0から6.5.4までで面白そうなアップデートを簡単に紹介していきます。

qiita.com

Logstash Release notes

www.elastic.co

DNS Filterの更新について

DNS FilterはIPを逆引きしてフィールドに追加することができます。
例えば、Webサーバ等のアクセスログのx-forwarded-forを逆引きでホスト名を引いた情報をElasticsearchに投入することで、アクセス元を簡易的にわかるようにしたりします。
これはセキュリティを意識するときとかに、アクセス元がAWSだったり、海外の全く関係なさそうなところだったらBAN判定の一つの要素にできるときがあります。

アップデート内容について

github.com

Logstash 6.3.1からDNSの問い合わせ結果でキャッシュを有効化したときに、クエリがネームサーバに同期的に問い合わせていたため、応答待ちで他の処理待ちがあった状況を解決しているみたいです。
DNS Filterをキャッシュ機能を有効化にして6.3.0以下のバージョンを使っている場合は試してみるとよいかもしれません。

File input pluginの更新について

こちらのプラグインはおなじみの、ファイルを読み込むためのプラグインですね。

アップデート内容について

github.com

Logstash 6.4.0からgzipファイルをそのままで読み込めるようになりました🎉
この機能のおかげで、例えばログサーバ上においてあるような過去のgzipファイルを解凍せずにそのままデータ読込できるようになります。

Logstashの実行モードについて

アップデート内容について

www.elastic.co

JRubyによる実行以外にJavaで実行できるようになり、
Logstash 6.3.0からは本番運用まであと一歩になりました。
Elasticユーザ会では本番でも使えるっぽいと発言しちゃいましたが、よく見たらあと一歩みたいでした、すいません。。。

We’re happy to announce that the new Java execution engine in Logstash has reached the production candidate stage.

設定方法は「--experimental-java-execution」オプションを起動時に付けるだけ。

GitHubには6.0.0と6.3.0で1.4倍ほど早くなったとか🤔

Pipeline-to-Pipeline Communication (Beta)について

パイプライン同士でデータを受け渡しができるようになりました。
まだベータバージョンですが、以下のような記述で設定できるようです。
ただ、pipeline.ymlが肥大化するのでそれはそれで困るかもという声もあるので、
ここをいい感じに分割できるようになることを期待したいですね🤔

# config/pipelines.yml
- pipeline.id: upstream
  config.string: input { stdin {} } output { pipeline { send_to => [myVirtualAddress] } }
- pipeline.id: downstream
  config.string: input { pipeline { address => myVirtualAddress } }

www.elastic.co

Architectural patternについて

www.elastic.co

アップデートとは違うのですが、
いくつか公式でおすすめの設定パターンがあるようです。

The distributor pattern

単一のinputからデータタイプに応じて処理パターンがあるときにifとかで長くなるところを短縮できるようになるから良いよ!というものみたいです。

# config/pipelines.yml
- pipeline.id: beats-server
  config.string: |
    input { beats { port => 5044 } }
    output {
        if [type] == apache {
          pipeline { send_to => weblogs }
        } else if [type] == system {
          pipeline { send_to => syslog }
        } else {
          pipeline { send_to => fallback }
        }
    }
- pipeline.id: weblog-processing
  config.string: |
    input { pipeline { address => weblogs } }
    filter {
       # Weblog filter statements here...
    }
    output {
      elasticsearch { hosts => [es_cluster_a_host] }
    }
- pipeline.id: syslog-processing
  config.string: |
    input { pipeline { address => syslog } }
    filter {
       # Syslog filter statements here...
    }
    output {
      elasticsearch { hosts => [es_cluster_b_host] }
    }
- pipeline.id: fallback-processing
    config.string: |
    input { pipeline { address => fallback } }
    output { elasticsearch { hosts => [es_cluster_b_host] } }

The output isolator pattern

出力先を複数に分けることでどちらかに障害が起きてもデータロストすることを防ぐためのもので、
ディスクにキューイングするpersisted typeにすることでより強固にするためのもののようです。

# config/pipelines.yml
- pipeline.id: intake
  queue.type: persisted
  config.string: |
    input { beats { port => 5044 } }
    output { pipeline { send_to => [es, http] } }
- pipeline.id: buffered-es
  queue.type: persisted
  config.string: |
    input { pipeline { address => es } }
    output { elasticsearch { } }
- pipeline.id: buffered-http
  queue.type: persisted
  config.string: |
    input { pipeline { address => http } }
    output { http { } }

The forked path pattern

今まではcloneやif/elseを使ってLogstashでデータ加工をおこなっていた箇所をストリームをつなげて設定を簡素化できるようになるパターンのようです。
以下の例では、Elasticsearchには未加工でデータを送信し、S3にアップロードするときだけセンシティブなフィールドを削除してからアップロードするようです。

# config/pipelines.yml
- pipeline.id: intake
  queue.type: persisted
  config.string: |
    input { beats { port => 5044 } }
    output { pipeline { send_to => ["internal-es", "partner-s3"] } }
- pipeline.id: buffered-es
  queue.type: persisted
  config.string: |
    input { pipeline { address => "internal-es" } }
    # Index the full event
    output { elasticsearch { } }
- pipeline.id: partner
  queue.type: persisted
  config.string: |
    input { pipeline { address => "partner-s3" } }
    filter {
      # Remove the sensitive data
      mutate { remove_field => 'sensitive-data' }
    }
    output { s3 { } } # Output to partner's bucket

The collector pattern

The distributor patternとは反対に複数のデータソースがあるときに、
output前に共通の処理をはさみたいときに以下のように設定すると設定を簡易化することができるようにしているようです。

# config/pipelines.yml
- pipeline.id: beats
  config.string: |
    input { beats { port => 5044 } }
    output { pipeline { send_to => [commonOut] } }
- pipeline.id: kafka
  config.string: |
    input { kafka { ... } }
    output { pipeline { send_to => [commonOut] } }
- pipeline.id: partner
  # This common pipeline enforces the same logic whether data comes from Kafka or Beats
  config.string: |
    input { pipeline { address => commonOut } }
    filter {
      # Always remove sensitive data from all input sources
      mutate { remove_field => 'sensitive-data' }
    }
    output { elasticsearch { } }