Monday, November 30, 2015

Parsing ASA logs using ELK and my central syslog archive

Previously, I set up a centralized syslog archive and directed all of my network devices to report to it. This turned out to be great timing: there was a network issue, and having the logs in one place made the job that much easier. The next step, however, is to put the information into some sort of database for parsing and easy searching.

I did some looking around, specifically at Splunk, but found that it had a fairly high barrier to entry. Instead, I found an open-source solution called ELK (Elasticsearch, Logstash, Kibana). I had heard several people mention this as an entry-level "Big Data" solution, so I gave it a go.

For this service, I decided to build an Ubuntu 15.04 server because I am more familiar with the packaging/update process, and because the online articles used it for their platforms.

Basic ELK setup:
For the basic ELK installation and configuration, I followed the directions posted at https://www.digitalocean.com/community/tutorials/how-to-install-elasticsearch-logstash-and-kibana-elk-stack-on-ubuntu-14-04, with a few exceptions.

  • A newer version of the ELK stack had just been released, so I headed to https://www.elastic.co/downloads and downloaded Elasticsearch 2.1, Logstash 2.1, and Kibana 4.3
  • The newer versions appear to be restricted to the local system by default, so I never edited the elasticsearch.yml or kibana.yml to restrict only to localhost
  • I didn't password-protect my URL
  • I didn't set up the Logstash Forwarder package (yet)

Bringing in the logs:
At this point, we have a fully functional ELK stack, but no data is being delivered to it. Since my syslog server is a RHEL 7 host, I followed the Logstash Forwarder setup directions at https://www.digitalocean.com/community/tutorials/how-to-install-elasticsearch-logstash-and-kibana-elk-stack-on-centos-7. Here, I configured the Logstash Forwarder to read all logs under /var/log/syslog/*.log and forward them to my ELK server.
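
For reference, a forwarder configuration along those lines might look like the JSON printed by the sketch below. It is written as a small Python script only so the structure can be generated and checked; it is not a copy of the file in use here. The server address 192.0.2.10 is a placeholder, port 5043 matches the lumberjack input listed at the end of this post, and the certificate path and the "syslog" type are assumptions carried over from the tutorial's layout.

```python
import json

# Placeholder values; adjust to your own environment.
ELK_SERVER = "192.0.2.10:5043"  # hypothetical address; port matches the lumberjack input
CA_CERT = "/etc/pki/tls/certs/logstash-forwarder.crt"  # assumed path, as in the tutorial


def build_forwarder_config(server, ca_cert, paths, log_type):
    """Return a logstash-forwarder style configuration as a plain dict."""
    return {
        "network": {
            "servers": [server],
            "timeout": 15,
            "ssl ca": ca_cert,
        },
        "files": [
            {
                "paths": paths,
                # The "type" field is what the Logstash filters test against.
                "fields": {"type": log_type},
            }
        ],
    }


config = build_forwarder_config(
    ELK_SERVER, CA_CERT, ["/var/log/syslog/*.log"], "syslog"
)
print(json.dumps(config, indent=2))
```

The "syslog" type matters because the filter in 10-syslog.conf only processes events whose type is "syslog".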

Now we should be getting tons of data into our ELK server, and it should be appearing in Kibana. If the data isn't appearing:

  • Confirm the services are running (on both systems)
  • Confirm the send/receive ports are correct
  • Confirm the firewall isn't blocking the traffic
  • Use tcpdump on both systems to ensure the traffic is being sent and received
  • Look in elasticsearch.log and elasticsearch.err for any errors
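
The port and firewall checks above can be partly automated. The following is a minimal sketch, not part of the original setup: it only tests whether a TCP connection can be opened, so it says nothing about TLS or whether Logstash accepts the data. The address 192.0.2.10 is a placeholder for the ELK server; 5043 is the lumberjack port and 9200 the Elasticsearch port used in the configuration files at the end of this post.

```python
import socket


def port_open(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable networks.
        return False


if __name__ == "__main__":
    # Placeholder targets; replace with your own ELK server address.
    for host, port in [("192.0.2.10", 5043), ("127.0.0.1", 9200)]:
        state = "open" if port_open(host, port, timeout=1.0) else "unreachable"
        print(f"{host}:{port} is {state}")
```

Run it from the syslog server for the lumberjack port and on the ELK server itself for Elasticsearch, which listens only locally in this setup.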


Making sense of the ASA logs:
Now that we have data appearing in Kibana, we can start parsing it. This can first be done by simply typing queries in the Kibana screen. However, we can also categorize data and have Logstash parse out additional categories. I am going to start with the logs from my Cisco ASA.
There are lots of examples online of how to parse the ASA messages, but each one is either missing something or is slightly off because of version differences. Ultimately I found https://jackhanington.com/blog/2014/04/21/using-logstash-elasticsearch-and-kibana-for-cisco-asa-syslog-message-analysis/ and https://jackhanington.com/blog/2015/06/16/send-cisco-asa-syslogs-to-elasticsearch-using-logstash/ to be the most complete.
One thing to note: these pages assign the ASA traffic its own type. This appears to be important; otherwise some of the field data types conflict with those from other logs, and your error logs fill up.
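
To make the idea of parsing concrete, here is roughly what a grok pattern does with a single message, written as a plain Python regular expression. This is a sketch under assumptions: the sample line is made up (documentation IP addresses and a hypothetical access-group name), and the regex is a simplified stand-in for a "Deny" message pattern, not the CISCOFW106023 pattern that Logstash uses. The field names are chosen to resemble the ones the ASA patterns produce.

```python
import re

# Simplified stand-in for a grok pattern matching ASA "Deny ... by access-group" lines.
ASA_DENY = re.compile(
    r"%ASA-(?P<severity>\d)-(?P<message_id>\d{6}): "
    r"(?P<action>Deny) (?P<protocol>\S+) "
    r"src (?P<src_interface>[^:]+):(?P<src_ip>[^/\s]+)/(?P<src_port>\d+) "
    r"dst (?P<dst_interface>[^:]+):(?P<dst_ip>[^/\s]+)/(?P<dst_port>\d+) "
    r'by access-group "?(?P<policy_id>[^"\s]+)"?'
)


def parse_asa_deny(line):
    """Return a dict of named fields, or None if the line does not match."""
    match = ASA_DENY.search(line)
    if match is None:
        return None
    fields = match.groupdict()
    # Convert ports to integers so they can be compared numerically.
    fields["src_port"] = int(fields["src_port"])
    fields["dst_port"] = int(fields["dst_port"])
    return fields


# Made-up sample line for illustration only.
sample = (
    "%ASA-4-106023: Deny tcp src outside:192.0.2.10/51234 "
    'dst inside:198.51.100.20/443 by access-group "outside_in" [0x0, 0x0]'
)
print(parse_asa_deny(sample))
```

Once a message has been split into fields like these, each one becomes searchable on its own instead of being buried in the raw message text.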

Once the ASA logs are being parsed, we can begin to query based on source or destination addresses, service types, success/failure, and so on. With a little work in Kibana, we can create a dashboard that shows the top utilizers and the source locations of traffic, and makes spikes easy to identify.
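
The same kind of "top utilizers" question can also be put to Elasticsearch directly. The sketch below only builds and prints a search body; it does not contact a server. It assumes the "ASA" tag and the src_ip field produced by the configuration at the end of this post. Depending on how the field is mapped, a not-analyzed variant such as src_ip.raw may be the better choice for the aggregation.

```python
import json


def top_sources_query(filter_query="tags:ASA", field="src_ip", size=10):
    """Build a search body that counts matching events per source address."""
    return {
        "size": 0,  # return only the aggregation, not the individual hits
        # Same syntax as a query typed into the Kibana search bar.
        "query": {"query_string": {"query": filter_query}},
        "aggs": {
            "top_sources": {"terms": {"field": field, "size": size}}
        },
    }


body = top_sources_query()
print(json.dumps(body, indent=2))
```

Assuming Logstash writes to its default logstash-* indices, the printed body could be sent as a POST to http://localhost:9200/logstash-*/_search on the ELK server.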




For posterity, below are the configuration files for my environment:
/etc/elasticsearch/elasticsearch.yml  -- no change from default
/etc/elasticsearch/logging.yml  -- no change from default
/etc/logstash/conf.d/01-lumberjack-input.conf
input {
  lumberjack {
    port => 5043
    type => "logs"
    ssl_certificate => "/etc/pki/tls/certs/logstash-forwarder.crt"
    ssl_key => "/etc/pki/tls/private/logstash-forwarder.key"
  }
}


/etc/logstash/conf.d/10-syslog.conf
filter {
  if [type] == "syslog" {
    mutate {
      add_tag => [ "syslog" ]
    }
    grok {
      match => { "message" => "%{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
      add_field => [ "received_at", "%{@timestamp}" ]
      add_field => [ "received_from", "%{host}" ]
    }
    syslog_pri { }
    date {
      match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
  }
}


/etc/logstash/conf.d/20-cisco-asa.conf
filter {
  if "syslog" in [tags] and "pre-processed" not in [tags] {
    if "%ASA-" in [message] {
      mutate {
        add_tag => [ "pre-processed", "Firewall", "ASA" ]
        replace => { "type" =>  "cisco-fw" }
      }
      grok {
        patterns_dir => "/opt/logstash/patterns/custom"
        match => ["message", "%{GREEDYDATA:cisco_message}"]
      }

      grok {
        match => [
          "cisco_message", "%{CISCOFW106001}",
          "cisco_message", "%{CISCOFW106006_106007_106010}",
          "cisco_message", "%{CISCOFW106014}",
          "cisco_message", "%{CISCOFW106015}",
          "cisco_message", "%{CISCOFW106021}",
          "cisco_message", "%{CISCOFW106023}",
          "cisco_message", "%{CISCOFW106100}",
          "cisco_message", "%{CISCOFW110002}",
          "cisco_message", "%{CISCOFW302010}",
          "cisco_message", "%{CISCOFW302013_302014_302015_302016}",
          "cisco_message", "%{CISCOFW302020_302021}",
          "cisco_message", "%{CISCOFW305011}",
          "cisco_message", "%{CISCOFW313001_313004_313008}",
          "cisco_message", "%{CISCOFW313005}",
          "cisco_message", "%{CISCOFW402117}",
          "cisco_message", "%{CISCOFW402119}",
          "cisco_message", "%{CISCOFW419001}",
          "cisco_message", "%{CISCOFW419002}",
          "cisco_message", "%{CISCOFW500004}",
          "cisco_message", "%{CISCOFW602303_602304}",
          "cisco_message", "%{CISCOFW710001_710002_710003_710005_710006}",
          "cisco_message", "%{CISCOFW713172}",
          "cisco_message", "%{CISCOFW733100}",
          "cisco_message", "%{CISCOFW113039}"
        ]
      }

      syslog_pri { }

      geoip {
        add_tag => [ "GeoIP" ]
        database => "/opt/logstash/vendor/geoip/GeoLiteCity.dat"
        source => "src_ip"
      }

      if [geoip][city_name]      == "" { mutate { remove_field => "[geoip][city_name]" } }
      if [geoip][continent_code] == "" { mutate { remove_field => "[geoip][continent_code]" } }
      if [geoip][country_code2]  == "" { mutate { remove_field => "[geoip][country_code2]" } }
      if [geoip][country_code3]  == "" { mutate { remove_field => "[geoip][country_code3]" } }
      if [geoip][country_name]   == "" { mutate { remove_field => "[geoip][country_name]" } }
      if [geoip][latitude]       == "" { mutate { remove_field => "[geoip][latitude]" } }
      if [geoip][longitude]      == "" { mutate { remove_field => "[geoip][longitude]" } }
      if [geoip][postal_code]    == "" { mutate { remove_field => "[geoip][postal_code]" } }
      if [geoip][region_name]    == "" { mutate { remove_field => "[geoip][region_name]" } }
      if [geoip][time_zone]      == "" { mutate { remove_field => "[geoip][time_zone]" } }

      mutate {
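        # syslog_hostname is the field captured by the grok in 10-syslog.conf;
        # no filter in these files creates a "sysloghost" field.
        replace => [ "host", "%{syslog_hostname}" ]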
      }
    }
  }
}


/etc/logstash/conf.d/90-lumberjack-output.conf
output {
  elasticsearch { hosts => ["localhost:9200"] }
#  stdout { codec => rubydebug }
}

1 comment:

Andrew said...

Did you add a custom grok pattern to handle the CISCOFW113039 events? Would you mind sharing?