OpenAMログをElasticSearchに送信する方法

  •  
 
ホス2019年5月29日 - 15:05 に投稿

関連ページ

目的

  • OpenAMのログ(access、authentication、config、activity)を、見やすく可視化するために、ElasticSearchを活用します。

手順

1.ElasticSearchインストール

2.Kibanaインストール

3.logstashインストール

4.Syslog設定(受信側・logstash)

概要

  • 公式で提供されているプラグインを利用します。
  • Syslog input plugin

    • Plugin version: v3.4.1
    • https://www.elastic.co/guide/en/logstash/current/plugins-inputs-syslog.html
  • ただし、以下の注意点があります。

    • OpenAMから送られてきたRFC5424フォーマットのSyslogデータは、logstash側でそのまま解釈することができず、logstashのfilterで工夫して、RFC5424フォーマットとして解釈できるようにする必要があります。
    • 以下の、LogstashのISSUEページで提供されている策が活用できます。
      https://github.com/logstash-plugins/logstash-input-syslog/issues/15#issuecomment-395859464
製品 OpenAM ver13.0 Syslog input plugin
対応フォーマット RFC5424 RFC3164

手順

  1. パターンを保存するディレクトリを作成する(ディレクトリ名は適当)
mkdir /etc/logstash/patterns
  • Syslogのパターンファイルを作成する vim /etc/logstash/patterns/syslog_format.conf

    # This is a flexable grok pattern file for syslog. By default, it attempts to be
    # relaxed and accomodate implimentation variations.
    
    # valid priority range from 0 to 191, but 00 or 001 technically not legitimate
    # according to RFC 3164.
    SYSLOGPRINUMSTRICT (?:0|(?:(?:[1-9][0-9])|(?:1[0-8][0-9])|(?:19[0-1])))
    # the example below is less precise but hopefully faster. Rather use range
    # checking logic in conf.
    SYSLOGPRINUMRELAXED [0-9]{1,3}
    SYSLOGPRISTRICT <%{SYSLOGPRINUMSTRICT:priority:int}>
    SYSLOGPRIRELAXED <%{SYSLOGPRINUMRELAXED:priority:int}>
    SYSLOGPRI %{SYSLOGPRIRELAXED}
    
    # RFC3164
    SYSLOG3164TIMESTAMPSTRICT (?:(?:Jan)|(?:Feb)|(?:Mar)|(?:Apr)|(?:May)|(?:Jun)|(?:Jul)|(?:Aug)|(?:Sep)|(?:Oct)|(?:Nov)|(?:Dec)) (?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9]) (?:2[0123]|[01][0-9]):(?:[0-5][0-9]):(?:(?:[0-5]?[0-9]|60)(?:[:.,][0-9]+)?)
    # Try be even more flexable then RFC3164 and also allow ISO8601 timestamps.
    SYSLOG3164TIMESTAMPRELAXED (?:%{MONTH} +%{MONTHDAY} %{TIME})|%{TIMESTAMP_ISO8601}
    SYSLOG3164TIMESTAMP %{SYSLOG3164TIMESTAMPRELAXED:timestamp_source}
    # Hostname or IP allowed in RFC 3164, but not supposed to be FQDN. Can be
    # flexable and allow it.
    HOSTNAMEONLY (?!-)[a-zA-Z0-9-]{1,63}(?<!-)
    SYSLOG3164HOSTNAMESTRICT (?:%{HOSTNAMEONLY}|%{IP})
    SYSLOG3164HOSTNAMERELAXED %{IPORHOST}
    SYSLOG3164HOSTNAME %{SYSLOG3164HOSTNAMERELAXED:host_source}
    # For the RFC3164 header, avoid matching RFC 5424 with a negative lookhead for a
    # 5424 version number. Also assume that given a timestamp, a hostname aught
    # to follow
    SYSLOG3164HDR ^(?:%{SYSLOGPRI}(?!%{SYSLOG5424VER} ))?(?:%{SYSLOG3164TIMESTAMP} (:?%{SYSLOG3164HOSTNAME} )?)?
    # The pattern below is bit stricter than the RFC definiton for tags. Technically
    # the tag is supposed to be only alphanumeric and terminate on first
    # non-alphanum character. However, many programs don't obey that. Generally
    # a colon or left sqaure bracket terminates the tag. In addition, exclude '<'
    # character as not appropriate for a program name, given it can cause confusion
    # with a syslog priority header
    SYSLOG3164TAG [^:\[<]{1,32}
    SYSLOG3164PID \[%{POSINT:pid}\]
    SYSLOG3164CONTENT %{GREEDYDATA:message_content}
    SYSLOG3164MSG (%{SYSLOG3164TAG:program}(?:%{SYSLOG3164PID})?: ?)?%{SYSLOG3164CONTENT}
    SYSLOG3164 %{SYSLOG3164HDR}%{SYSLOG3164MSG:message_syslog}
    
    # RFC5424
    SYSLOG5424VER [0-9]{1,2}
    # Timestamp is ISO8601 - the version in grok-patterns wasn't as strict as it was defined in the RFC
    SYSLOG5424TIMESTAMPSTRICT [0-9]{4}-(?:0[1-9]|1[0-2])-(?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9])T(?:[01][0-9]|2[0123]):(?:[0-5][0-9]):(?:[0-5][0-9])(?:[.][0-9]{1,6})?(?:Z|[+-](?:[01][0-9]|2[0123]):[0-5][0-9])
    SYSLOG5424TIMESTAMPRELAXED %{TIMESTAMP_ISO8601}
    SYSLOG5424TIMESTAMP %{SYSLOG5424TIMESTAMPRELAXED}
    # Hostname can be FQDN, DNS label/hostname only or IP
    SYSLOGRFC5424HOSTNAME %{IPORHOST}
    SYSLOG5424PRINTASCII [!-~]+
    SYSLOG5424APPNAME [!-~]{1,48}
    SYSLOG5424PROCID [!-~]{1,128}
    SYSLOG5424MSGID [!-~]{1,32}
    # Practically, only one version for now, and trying to parse future versions
    # would be unwise. So 1 'hardcoded'.
    SYSLOG5424HDR ^%{SYSLOGPRI}1 (?:%{SYSLOG5424TIMESTAMP:timestamp_source}|-) (?:%{SYSLOGRFC5424HOSTNAME:host_source}|-) (?:%{SYSLOG5424APPNAME:program}|-) (?:%{SYSLOG5424PROCID:pid}|-) (?:%{SYSLOG5424MSGID:msgid}|-)
    # Replace the 1 above with %{SYSLOG5424VER:syslog_version} to cater for
    # additional versions.
    SYSLOG5424STRUCTDATA \[%{DATA}(?<!\\)\]+
    SYSLOG5424MSG %{GREEDYDATA:message_content}
    SYSLOG5424 %{SYSLOG5424HDR} (?<message_syslog>(?:%{SYSLOG5424STRUCTDATA:structured_data}|-)( ?%{SYSLOG5424MSG})?)
    
    # Try match and capture RFC 5424 first, given RFC 3164 allows messages without any syslog header.
    # Otherwise, RFC 3164 could accidentally capture an RFC 5424 priority and header as the tag or host of a raw message
    SYSLOG %{SYSLOG5424}|%{SYSLOG3164}
    
  • Syslogで受信して整形してElasticSearchに送る設定ファイルを作成する
    vim /etc/logstash/conf.d/syslogToElasticSearch.conf

    input {
    tcp {
      host => "0.0.0.0"
      port => "50005"
      type => "syslog"
    }
    udp {
      host => "0.0.0.0"
      port => "50005"
      type => "syslog"
    }
    }
    
    # determine and parse type of syslog message
    filter {
    if [type] == "syslog" {
    
      # look for and, if found, decode syslog priority
      if [message] =~ "^<[0-9]{1,3}>" {
        grok {
          match => [ "message", "^<%{NONNEGINT:priority:int}>" ]
        }
        if [priority] <= 191 {
          # check for RFC 3164 vs RFC 5424
          if [message] =~ "^<[0-9]{1,3}>[0-9]{1,2} " {
            mutate {
              add_tag => ["syslog_rfc5424"]
            }
          }
          else {
      mutate {
              add_tag =>  ["syslog_rfc3164"]
            }
          }
        }
        else {
          mutate {
            add_tag => ["syslog_priority_invalid"]
          }
        }
      }
      else {
        # only RFC 3164 allows a message to specify no priority
        mutate {
    add_tag => [ "syslog_rfc3164", "syslog_priority_missing" ]
        }
      }
      # RFC 3164 suggests adding priority if it's missing.
      # Even if missing, syslog_pri filter adds the default priority.
      syslog_pri {
        syslog_pri_field_name => "priority"
      }
      mutate {
        # follow convention used by logstash syslog input plugin
        rename => { "syslog_severity_code" => "severity" }
        rename => { "syslog_facility_code" => "facility" }
        rename => { "syslog_facility" => "facility_label" }
        rename => { "syslog_severity" => "severity_label" }
      }
    
      # parse both RFC 3164 and 5424
      grok {
        #patterns_dir => "../patterns"
        patterns_dir => "/etc/logstash/patterns"
        match => [ "message", "%{SYSLOG}" ]
        tag_on_failure => [ "_grokparsefailure_syslog" ]
      }
    
      # Check if a timestamp source was found and work out elapsed time recieving log
      # Note, mutate filter will convert a date object to a string not in ISO8601 format, so rather use ruby filter
      # TEST commentout
      #ruby {
      #    code => "event['timestamp_logstash'] = event['@timestamp']"
      #}
      if [timestamp_source] {
        date {
          locale => en
          # assume timezone for cases where it isn't provided
          timezone => "Africa/Johannesburg"
          match => [ "timestamp_source", "MMM d H:m:s", "MMM d H:m:s", "ISO8601" ]
        }
        # add a field for delta (in seconds) between logsource and logstash
        # TEST commentout
        #ruby {
        #  code => "event['time_elapsed_logstash'] = event['timestamp_logstash'] - event['@timestamp']"
        #}
      }
      else {
        mutate {
    add_tag => ["syslog_timestamp_source_missing"]
        }
      }
    
      # Check if a host source was found
      if ! [host_source] {
        mutate {
    add_tag => ["syslog_host_source_missing"]
        }
      }
    
      # discard redundant info
      mutate {
        remove_field => [ "priority" ] #redundant and less useful once severity and facility are decoded
        replace => { "message" => "%{message_content}" }
        remove_field => [ "message_syslog", "message_content" ] #already in content message
      }
    }
    
    # OpenAM CUSTOMIZE
    # ↓OpenAMのログを解析する↓
    # 先頭と最後の[]を削除
    # https://www.elastic.co/guide/en/logstash/current/plugins-filters-mutate.html
    mutate {
      gsub => [
        "structured_data", "^\[", "",
        "structured_data", "\]$", ""
      ]
    }
    dissect{
      mapping => {
        "structured_data" => "%{OpenAM_LogName} %{OpenAM_Data}"
      }
    }
    # https://www.elastic.co/guide/en/logstash/current/plugins-filters-kv.html
    kv{
      source => "OpenAM_Data"
      field_split => " "
      #prefix => "OpenAM_"
      target => "OpenAM"
      tag_on_failure => "OpenAM_Data_kv_error"
    }
    mutate{
      remove_field => ["OpenAM_Data"]
    }
    }
    
    output {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "syslog_openam"
    }
    }
    
  • logstashでSyslog入力からの入力を受け付けます。

    /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/syslogToElasticSearch.conf
    

Syslog設定(送信側・OpenAM13.0)

  1. OpenAMのTop Level Realmのログインページにアクセスして、amAdmin (管理者ユーザ)でログインしてください。
    http://~~~:8080/openam/XUI/#login/
  • Configuration > グローバル > Audit Logging の順に画面にアクセスします。

  • 「Audit Event Handlers」の「New...」ボタンをクリックします。

  • 「Select the event handler to create」の「Syslog」を選択します。

    • Syslog設定
  • 「Next」 ボタンをクリックします。

  • 以下の通り設定項目を入力します。

    • Event handler name:
      • 任意の設定名 (例: Global Syslog Handler )
    • Server hostname:
      • logstashをインストールしたサーバのIPアドレスまたはホスト名を入力します。
      • logstashとOpenAMが同じマシンにインストールされている場合は、127.0.0.1を指定します。
      • ホスト名を指定する場合は、OpenAMのサーバが名前解決できるように「/etc/hosts」やDNSなどに記述しておきます。
    • Server port:
      • logstashがSyslog受信を受け付けているポートを指定します。
      • 本手順に従った場合は、50005 になります。
    • Transport Protocol:
      • logstashがSyslog受信を受け付けているプロトコルを指定します。
      • 本手順に従った場合は、どちらでも可能です。
    • Connection timeout:
      • 任意の秒数を指定します。 (例: 120)
    • Facility:
      • Syslogの「ファシリティ」というものを設定します。
      • 任意のもので構いません。 (例: LOCAL0 )
    • Syslog設定
  • 「Add...」ボタンをクリックします。

  • OpenAMの適当な画面にアクセスしたり、ログイン・ログアウトを行い、適当にログを出力させます。

Kibanaでの設定とログ確認

  1. ブラウザからKibanaにアクセスします。
    • http://[KibanaをインストールしたサーバのIPアドレス]:5601
  • 「Manage and Administer the Elastic Stack」の「Index Patterns」をクリックする。

  • 「Create Index Pattern」ボタンをクリックします。

  • 「Step 1 of 2: Define index pattern」 の「Index pattern」に、Index名を入力します。

    • 本手順に従った場合は、 syslog_openam と入力します。
    • 本手順で/etc/logstash/conf.d/helloworld.conf の中の、output 階層内の index => "syslog_openam"と記載された部分がIndexです。
  • 「> Next Step」ボタンをクリックします。

  • ログを時系列に並べるときに使用するfield nameを指定します。

    • 本手順に従った場合は、 @timestamp を選択します。
  • 「Create Index Pattern」ボタンをクリックします。

  • Indexの設定が完了したら、左のメニューの「Discover」をクリックします。

  • ログが表示されていることを確認します。

    • インデックスが複数ある場合は、表示するインデックスを先の手順で作成したインデックスに切り替えます。
    • Syslog設定
    • 各ログの左にある > をクリックすると、ログをTableもしくはJSON形式で見やすく表示できます。
      Syslog設定
  • OpenAMのログが出力されていれば成功です。

  • ログが出力されていないときに確認すること

    • KibanaのDiscover画面で表示されるログが、「Last 15 minutes」、つまり直近15分間になっていますが、その期間にOpenAMを操作しましたか?
    • OpenAMのAudit Loggingの設定で、logstashのサーバをIPアドレス以外で指定した場合は、正しく名前解決できていますか?
    • logstashでSyslogからの入力を受け付けるコマンドを実行後、最後に出力される以下のようなメッセージのあとに、何らかのエラーメッセージが出力されていませんか?
      [INFO ] 2019-05-29 14:22:19.291 [Api Webserver] agent - Successfully started Logstash API endpoint {:port=>9600}

コメントを追加

プレーンテキスト

  • HTMLタグは利用できません。
  • 行と段落は自動的に折り返されます。
  • ウェブページのアドレスとメールアドレスは自動的にリンクに変換されます。
CAPTCHA
この質問はあなたが人間の訪問者であるかどうかをテストし、自動化されたスパム送信を防ぐためのものです。
画像
OpenAM Audit Logging Config
OpenAM Audit Logging Config - Syslog
Discover画面
Discover画面 - ログ展開