In my previous post, I explained the fundamental purpose and use cases of pipelines in Graylog – now let's move towards some more advanced topics.
Once you have normalized your data in an early-stage pipeline, you can craft enrichment pipelines that rely on predictable field names for standard data types such as IP addresses, domain names, file hashes, and so on.
Enrichment of Observables
Let's explore a basic example of enrichment to wrap our heads around what we are trying to achieve here. Let's say you have firewall logs flowing into Graylog and you would like to automatically enrich the events to include more information about the IP addresses contained in the logs. There are several data points we might hope to add such as geolocation, whois information, whether or not the IPs are contained in any blocklists, etc.
Now, there is a very important concept to understand before we rush off and start using fancy threat intel plugins like the one offered by the Graylog team.
When enriching a large volume of messages coming into your Graylog instance, consider having two separate enrichment pipelines: one for "cheap" locally hosted resources, and one for "expensive" externally hosted resources. The primary reason is speed, but free API-based services such as VirusTotal or OTX also enforce API request limits. Once you exceed them, you'll be heavily rate-limited, or even blocked, which may cause a pile-up in your pipelines.
A great example of inexpensive enrichment is geolocation, which we cover extensively in this post, so let's tackle another example.
Let's say we wanted to tag each IP address as either "internal" or "not internal" according to whether or not it falls within the private address ranges defined in RFC 1918.
Why would we care to do this? Simple! Imagine the value of a threat hunt query looking for connections that should never originate from outside the local network, such as Remote Desktop; now that query is as simple as dst_port:3389 AND NOT src_ip_is_internal:true (Graylog's query operators must be uppercase).
This can be accomplished with a rule similar to the one below, but notice we'll need another similar rule for src_ip as well.
    rule "enrichment_rfc1918_dst_ip"
    when
        has_field("dst_ip")
        AND NOT (
            // This will toss out IPv6 addresses
            contains(to_string($message.dst_ip), ":", true)
        )
        AND (
            // rfc1918
            cidr_match("10.0.0.0/8", to_ip($message.dst_ip))
            OR cidr_match("172.16.0.0/12", to_ip($message.dst_ip))
            OR cidr_match("192.168.0.0/16", to_ip($message.dst_ip))
            // loopback (not part of RFC 1918, but still local)
            OR cidr_match("127.0.0.0/8", to_ip($message.dst_ip))
        )
    then
        set_field("dst_ip_is_internal", true);
    end
The above rule becomes useful not only in future queries you might craft, but also in subsequent pipelines. For instance, if I wanted to check IP addresses against threat intelligence blocklists, it would not make sense to waste CPU cycles checking internal IP addresses against those lists. This is why we like running inexpensive enrichment rules in earlier stages, so that we can optimize what we are sending to the more expensive enrichment steps later on.
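If you want to sanity-check the rule's logic against sample values before deploying it, the same classification can be approximated outside Graylog. Here is a minimal Python sketch using the standard ipaddress module. The function name is_internal is made up for illustration, and this is only an offline stand-in: Graylog evaluates the real rule with its own cidr_match implementation.

```python
import ipaddress

# Same ranges as the pipeline rule: the three RFC 1918 blocks plus loopback.
INTERNAL_NETWORKS = [
    ipaddress.ip_network("10.0.0.0/8"),
    ipaddress.ip_network("172.16.0.0/12"),
    ipaddress.ip_network("192.168.0.0/16"),
    ipaddress.ip_network("127.0.0.0/8"),
]


def is_internal(value):
    """Return True when value is an IPv4 address inside one of the ranges above."""
    text = str(value)
    if ":" in text:  # mirrors the rule's IPv6 exclusion
        return False
    try:
        address = ipaddress.IPv4Address(text)
    except ipaddress.AddressValueError:
        return False  # not an IPv4 address at all, e.g. "-" or an empty string
    return any(address in network for network in INTERNAL_NETWORKS)
```

Feeding it a handful of addresses from your own firewall logs is a quick way to confirm which events you should expect to see tagged.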
Let's try a slightly more complex but still inexpensive enrichment by leveraging a locally hosted lookup table.
In this example, we have locally hosted a CSV formatted copy of this Cisco Top 1000 domains list hosted by the MISP Project. Once this lookup table is locally hosted somewhere our Graylog server can access it, we can easily perform very fast and inexpensive lookups against this table to enrich logs containing domain names.
    rule "enrichment_cisco_top1000_dst_domain"
    when
        has_field("dst_domain")
        AND NOT (
            // exclude internal IPs
            has_field("dst_ip_is_internal")
            AND to_bool($message.dst_ip_is_internal)
        )
    then
        let cisco_top1000_check = lookup_value("cisco-top1000", to_string($message.dst_domain));
        set_field("dst_domain_cisco_top1k", cisco_top1000_check);
    end
Notice that even though this is an inexpensive lookup, we still add optimization steps to make sure we're not performing lookups against dst_domain fields where the dst_ip_is_internal value is set to true, because that means it's an internally hosted resource and could never be on Cisco's Top 1000 domain list. Really study the logic of our when block of conditions. This will help you understand how you can quickly write rule conditions leveraging fields that only contain true or false values by simply evaluating the boolean state of the field, as the to_bool() call in the rule above does.
Now that we've enriched our events with a new field that states whether the dst_domain is in Cisco's Top 1000 list, we can easily leverage that in subsequent threat intel rules by not performing more expensive enrichments on domains like google.com and so on.
There are tons more of these types of lists--all you have to do is look for them!
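Whichever list you pick, it has to end up in a file Graylog can read as a lookup table. Below is a rough Python sketch of that conversion step, assuming a CSV-based lookup table with one key column and one value column. The column names domain and listed and the helper name write_lookup_csv are placeholders of mine, so match them to whatever you configure in your own data adapter.

```python
import csv
import io


def write_lookup_csv(domains, handle, key_column="domain", value_column="listed"):
    """Write one quoted CSV row per unique domain, preceded by a header row."""
    writer = csv.writer(handle, quoting=csv.QUOTE_ALL, lineterminator="\n")
    writer.writerow([key_column, value_column])
    seen = set()
    for raw in domains:
        # Normalize so "Google.com" and "google.com." map to the same key.
        domain = raw.strip().lower().rstrip(".")
        if not domain or domain.startswith("#") or domain in seen:
            continue  # skip blanks, comment lines, and duplicates
        seen.add(domain)
        writer.writerow([domain, "true"])
    return len(seen)


# Usage: write to an in-memory buffer here; a real script would open a file.
buffer = io.StringIO()
count = write_lookup_csv(["Google.com", "google.com.", "", "example.org"], buffer)
print(count, "domains written")
```

Normalizing the keys matters because the lookup is an exact string match, so the domain values in your logs should be lowercased the same way during normalization.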
If you have a locally hosted MISP instance, you're in for a real treat as it pertains to inexpensive lookups. MISP can act as a locally hosted aggregator of many externally hosted threat intel feeds, allowing you to perform rapid, local lookups against enormous lists of IOCs. You can find MISP's locally hosted lists at this URL:
https://misp.yourdomain.com/events/automation/1 under "Text export". Write a quick Python script to pull those text lists and host them locally as Graylog lookup tables, and you're in business.
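As a starting point for that script, here is a hedged Python sketch using only the standard library. The export path (/attributes/text/download/ followed by the attribute type) and the Authorization header are assumptions on my part, so copy the exact URL and authentication details from the automation page of your own MISP instance. The function names are hypothetical as well.

```python
import urllib.request


def build_misp_text_request(base_url, api_key, attribute_type):
    """Build (but do not send) a request for one MISP text export."""
    # Assumed path layout; confirm it on your instance's automation page.
    url = f"{base_url.rstrip('/')}/attributes/text/download/{attribute_type}"
    return urllib.request.Request(url, headers={"Authorization": api_key})


def parse_text_export(body):
    """Turn a text export into a sorted list of unique, non-empty values."""
    return sorted({line.strip() for line in body.splitlines() if line.strip()})


def fetch_misp_values(base_url, api_key, attribute_type, timeout=30):
    """Download one export and return its values, giving up after `timeout` seconds."""
    request = build_misp_text_request(base_url, api_key, attribute_type)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return parse_text_export(response.read().decode("utf-8"))
```

Something like fetch_misp_values("https://misp.yourdomain.com", "YOUR_KEY", "md5") would then return the values to write into a lookup table file. The explicit timeout is deliberate: a scheduled job that hangs on a slow server is no better than a pipeline that does.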
Another very easy Lookup Table approach is checking for known Tor nodes. The Tor Project provides a list of current exit node IP addresses. This list can almost be ingested right from where it sits! That said, I still recommend writing a script that will periodically download it and store it locally for use in a Lookup Table. Keep in mind, lists like these can change hourly, so update them often.
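Here is a rough Python sketch of the "store it locally" half of such a script, assuming the downloaded list is plain text with one IP address per line. The download itself is left out, and the function names and the ip and tor_exit column names are placeholders of mine. The two details worth copying are the validation (so an error page never ends up in your table) and the atomic swap (so Graylog never reads a half-written file).

```python
import ipaddress
import os
import tempfile


def parse_exit_nodes(text):
    """Return the sorted, de-duplicated valid IP addresses found one per line."""
    nodes = set()
    for line in text.splitlines():
        candidate = line.strip()
        try:
            nodes.add(str(ipaddress.ip_address(candidate)))
        except ValueError:
            continue  # skip blanks, comments, or stray HTML from an error page
    return sorted(nodes)


def replace_lookup_file(nodes, destination):
    """Write the CSV to a temp file, then swap it into place in one step."""
    directory = os.path.dirname(os.path.abspath(destination))
    fd, temp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    with os.fdopen(fd, "w", newline="") as handle:
        handle.write('"ip","tor_exit"\n')
        for node in nodes:
            handle.write(f'"{node}","true"\n')
    # mkstemp creates the file owner-only; loosen it so the Graylog user can read it.
    os.chmod(temp_path, 0o644)
    os.replace(temp_path, destination)  # atomic when both paths share a filesystem
```

Run it from cron or a systemd timer at whatever interval matches how often the list changes.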
I'm sure you're starting to get the hang of this, but here's a rule for this lookup.
    rule "enrichment_tor_dst_ip"
    when
        has_field("dst_ip")
        AND NOT (
            // exclude internal IPs
            has_field("dst_ip_is_internal")
            AND to_bool($message.dst_ip_is_internal)
        )
    then
        let tor_check = tor_lookup(to_string($message.dst_ip));
        set_field("dst_ip_tor_node", tor_check.threat_indicated);
    end
Now that we've performed some fast and cheap enrichment in earlier stages, we can craft some later stages that only further process enrichments on noteworthy or uncommon observables, such as domains not contained in Top 1000 lists, or IP addresses found to be on one or more blocklists.
These enrichments are considered "expensive" because they often rely on API calls to free services such as AlienVault OTX, Greynoise, etc.
I will cover the most immediately available option for API-based lookups since there's already a plugin for it. Since you can already read extensively about how to deploy this plugin and its pipeline rules, I will simply cover the modifications that I would recommend you make in order to prevent API rate-limiting and a pipeline pile-up!
Here is a sanitized version of the rule we use for enriching dst_ip fields with only the OTX portion of the threat intel plugin.
    rule "enrichment_otx_dst_ip"
    // https://github.com/graylog-labs/graylog-plugin-threatintel
    when
        has_field("dst_ip")
        // skip internal IP addresses
        AND NOT to_bool($message.dst_ip_is_internal)
        // skip if dst_domain is cisco top 1k
        AND NOT has_field("dst_domain_cisco_top1k")
        // skip a long list of other wasteful lookups
        AND NOT (
            contains(to_string($message.dst_ip), ":")
            OR is_null($message.dst_ip)
            OR (to_string($message.dst_ip) == "255.255.255.255")
            OR (to_string($message.dst_ip) == "-")
            // common OTX false positives
            OR (to_string($message.dst_ip) == "188.8.131.52")
            OR (to_string($message.dst_ip) == "184.108.40.206")
            //
            // our list of exceptions is much longer, but you get the idea
            //
        )
    then
        let dst_ip_intel = threat_intel_lookup_ip(to_string($message.dst_ip), "dst_ip");
        set_fields(dst_ip_intel);
        let intel = otx_lookup_ip(to_string($message.dst_ip));
        set_field("dst_ip_otx_threat_detected", intel.otx_threat_indicated);
        set_field("dst_ip_otx_threat_ids", intel.otx_threat_ids);
        set_field("dst_ip_otx_threat_names", intel.otx_threat_names);
    end
I can hear you asking, "Why all the hassle? The Graylog documentation example is much simpler!" Because free OSINT repositories like OTX will happily start ignoring your API requests on a daily basis if you blast them uncontrollably. Before we started optimizing our enrichment pipelines, OTX would stop responding to us by about 9am each day. Furthermore, OTX once experienced an API outage which caused our pipelines to get severely backed up because the API calls were hanging for a very long time before timing out. The fewer events you expose to an API-based method, the better.
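One way to judge whether your filters are tight enough is to replay a sample of exported events through the same conditions and see what share would still reach the API. The Python sketch below mirrors the when clause of the OTX rule above. It assumes events are plain dictionaries with dst_ip_is_internal stored as a real boolean, and the function names and the known_false_positives parameter are hypothetical.

```python
def should_query_otx(event, known_false_positives=frozenset()):
    """Mirror the rule's conditions: True only if the event is worth an API call."""
    if event.get("dst_ip") is None:
        return False  # field missing or null
    if event.get("dst_ip_is_internal") is True:
        return False  # internal address
    if "dst_domain_cisco_top1k" in event:
        return False  # domain already matched the Top 1000 list
    dst_ip = str(event["dst_ip"])
    if ":" in dst_ip or dst_ip in ("255.255.255.255", "-"):
        return False  # IPv6, broadcast, or placeholder value
    return dst_ip not in known_false_positives


def api_call_share(events, known_false_positives=frozenset()):
    """Fraction of events that would trigger an OTX lookup (0.0 for no events)."""
    events = list(events)
    if not events:
        return 0.0
    hits = sum(should_query_otx(e, known_false_positives) for e in events)
    return hits / len(events)
```

Multiply that share by your daily event volume and compare it with the request limit of the service you are calling. If the number is still too high, add more exclusions before you turn the rule on.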
Let's improve upon the previous example with another OTX-based lookup for MD5 hashes that has a customizable threshold for what we would consider a threat.
Caution: this is not something we should be doing for every single MD5 hash we have. There should be some logic in place in an earlier pipeline that tags events worth looking up, such as processes running from suspicious locations like
Let's first perform a lookup of MD5 hashes for relevant OTX Pulses, then later we'll assign a threat score based on the number of pulses that were returned.
In Stage 10, we perform the initial lookup:
    rule "enrichment_otx_md5"
    // will expand to additional fields in
    // stage 20 threat rule: enrichment_otx_md5_detection
    when
        has_field("md5")
    then
        let evilmd5 = lookup_value("otx-md5", to_string($message.md5), 0);
        set_field("otx_pulses", evilmd5);
    end
And then later, in Stage 20, we perform additional checks against events that were enriched by the previous rule:
    rule "enrichment_otx_md5_detection"
    // an MD5 returned greater than zero OTX Pulses
    // depends on otx_md5* rules in stage 10
    when
        has_field("md5")
        AND (
            // when otx_pulses is greater than 0
            has_field("otx_pulses")
            AND to_long($message.otx_pulses) > 0
        )
    then
        set_field("threat_detected", true);
        set_field("md5_otx_threat_detected", true);
        set_field("threat_desc", "OTX matches this MD5 against one or more threat pulses.");
    end
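The threshold in that rule is the "greater than 0" comparison, and that is the knob to turn if a single pulse proves too noisy for your environment. To make the decision easier to reason about, here is the same logic as a small Python sketch with the threshold pulled out into a parameter. The function name otx_md5_verdict and the min_pulses parameter are mine, not part of Graylog, and the default of 1 reproduces the rule as written.

```python
def otx_md5_verdict(event, min_pulses=1):
    """Return the fields the Stage 20 rule would add, or {} below the threshold."""
    if "md5" not in event or "otx_pulses" not in event:
        return {}
    try:
        pulses = int(event["otx_pulses"])
    except (TypeError, ValueError):
        pulses = 0  # treat unparseable values as zero pulses
    if pulses < min_pulses:
        return {}
    return {
        "threat_detected": True,
        "md5_otx_threat_detected": True,
        "threat_desc": "OTX matches this MD5 against one or more threat pulses.",
    }
```

Raising min_pulses to, say, 3 corresponds to changing the rule's comparison to "> 2". Whether that is the right trade-off depends on how many false positives you see at the lower setting.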
In the next post of this series, we'll further expand on this concept of detecting, escalating, and de-escalating threats based on similar logic.