まえがき
2018年Elastic Stackアドベントカレンダーの20日目の記事で代理で書きます!
今回はLogstash6.1.0から6.5.4までで面白そうなアップデートを簡単に紹介していきます。
Logstash Release notes
DNS Filterの更新について
DNS FilterはIPを逆引きしてフィールドに追加することができます。
例えば、Webサーバ等のアクセスログのx-forwarded-forを逆引きでホスト名を引いた情報をElasticsearchに投入することで、アクセス元を簡易的にわかるようにしたりします。
これはセキュリティを意識するときとかに、アクセス元がAWSだったり、海外の全く関係なさそうなところだったらBAN判定の一つの要素にできるときがあります。
アップデート内容について
Logstash 6.3.1からDNSの問い合わせ結果でキャッシュを有効化したときに、クエリがネームサーバに同期的に問い合わせていたため、応答待ちで他の処理待ちがあった状況を解決しているみたいです。
DNS Filterをキャッシュ機能を有効化にして6.3.0以下のバージョンを使っている場合は試してみるとよいかもしれません。
File input pluginの更新について
こちらのプラグインはおなじみの、ファイルを読み込むためのプラグインですね。
アップデート内容について
Logstash 6.4.0からgzipファイルをそのままで読み込めるようになりました🎉
この機能のおかげで、例えばログサーバ上においてあるような過去のgzipファイルを解凍せずにそのままデータ読込できるようになります。
Logstashの実行モードについて
アップデート内容について
JRubyによる実行以外にJavaで実行できるようになり、
Logstash 6.3.0からは本番運用まであと一歩になりました。
Elasticユーザ会では本番でも使えるっぽいと発言しちゃいましたが、よく見たらあと一歩みたいでした、すいません。。。
We’re happy to announce that the new Java execution engine in Logstash has reached the production candidate stage.
設定方法は「--experimental-java-execution」オプションを起動時に付けるだけ。
GitHubには6.0.0と6.3.0で1.4倍ほど早くなったとか🤔
Pipeline-to-Pipeline Communication (Beta)について
パイプライン同士でデータを受け渡しができるようになりました。
まだベータバージョンですが、以下のような記述で設定できるようです。
ただ、pipeline.ymlが肥大化するのでそれはそれで困るかもという声もあるので、
ここをいい感じに分割できるようになることを期待したいですね🤔
# config/pipelines.yml - pipeline.id: upstream config.string: input { stdin {} } output { pipeline { send_to => [myVirtualAddress] } } - pipeline.id: downstream config.string: input { pipeline { address => myVirtualAddress } }
Architectural patternについて
アップデートとは違うのですが、
いくつか公式でおすすめの設定パターンがあるようです。
The distributor pattern
単一のinputからデータタイプに応じて処理パターンがあるときにifとかで長くなるところを短縮できるようになるから良いよ!というものみたいです。
# config/pipelines.yml - pipeline.id: beats-server config.string: | input { beats { port => 5044 } } output { if [type] == apache { pipeline { send_to => weblogs } } else if [type] == system { pipeline { send_to => syslog } } else { pipeline { send_to => fallback } } } - pipeline.id: weblog-processing config.string: | input { pipeline { address => weblogs } } filter { # Weblog filter statements here... } output { elasticsearch { hosts => [es_cluster_a_host] } } - pipeline.id: syslog-processing config.string: | input { pipeline { address => syslog } } filter { # Syslog filter statements here... } output { elasticsearch { hosts => [es_cluster_b_host] } } - pipeline.id: fallback-processing config.string: | input { pipeline { address => fallback } } output { elasticsearch { hosts => [es_cluster_b_host] } }
The output isolator pattern
出力先を複数に分けることでどちらかに障害が起きてもデータロストすることを防ぐためのもので、
ディスクにキューイングするpersisted typeにすることでより強固にするためのもののようです。
# config/pipelines.yml - pipeline.id: intake queue.type: persisted config.string: | input { beats { port => 5044 } } output { pipeline { send_to => [es, http] } } - pipeline.id: buffered-es queue.type: persisted config.string: | input { pipeline { address => es } } output { elasticsearch { } } - pipeline.id: buffered-http queue.type: persisted config.string: | input { pipeline { address => http } } output { http { } }
The forked path pattern
今まではcloneやif/elseを使ってLogstashでデータ加工をおこなっていた箇所をストリームをつなげて設定を簡素化できるようになるパターンのようです。
以下の例では、Elasticsearchには未加工でデータを送信し、S3にアップロードするときだけセンシティブなフィールドを削除してからアップロードするようです。
# config/pipelines.yml - pipeline.id: intake queue.type: persisted config.string: | input { beats { port => 5044 } } output { pipeline { send_to => ["internal-es", "partner-s3"] } } - pipeline.id: buffered-es queue.type: persisted config.string: | input { pipeline { address => "internal-es" } } # Index the full event output { elasticsearch { } } - pipeline.id: partner queue.type: persisted config.string: | input { pipeline { address => "partner-s3" } } filter { # Remove the sensitive data mutate { remove_field => 'sensitive-data' } } output { s3 { } } # Output to partner's bucket
The collector pattern
The distributor patternとは反対に複数のデータソースがあるときに、
output前に共通の処理をはさみたいときに以下のように設定すると設定を簡易化することができるようにしているようです。
# config/pipelines.yml - pipeline.id: beats config.string: | input { beats { port => 5044 } } output { pipeline { send_to => [commonOut] } } - pipeline.id: kafka config.string: | input { kafka { ... } } output { pipeline { send_to => [commonOut] } } - pipeline.id: partner # This common pipeline enforces the same logic whether data comes from Kafka or Beats config.string: | input { pipeline { address => commonOut } } filter { # Always remove sensitive data from all input sources mutate { remove_field => 'sensitive-data' } } output { elasticsearch { } }