Rules¶
Rules contain data analysis and alerting logic
Rules are written in native Python, not a proprietary language
A Rule can utilize any Python function or library
A Rule can be run against multiple log sources if desired
Rules can be isolated into defined environments/clusters
Rule alerts can be sent to one or more outputs, like S3, PagerDuty or Slack
Rules can be unit and integration tested
Getting Started¶
All rules are located in the rules/ directory.
Within this directory are two folders: community/ and default/.
community/ rules are publicly shared from StreamAlert contributors.
default/ can be used as a generic container for rules files.
You may create any folder structure desired, as all rules folders are imported recursively. Here are some examples:
rules/intrusion-detection/malware.py
rules/compliance/pci.py
rules/default/infrastructure.py
Note
If you create additional folders within the rules directory, be sure to include a blank __init__.py file.
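One way to spot a forgotten package marker is to scan the rules tree before deploying. The sketch below is only an illustration: the helper name `missing_init_dirs` is hypothetical and not part of StreamAlert.

```python
from pathlib import Path


def missing_init_dirs(rules_root):
    """Return subfolders (relative paths) under rules_root that lack __init__.py."""
    root = Path(rules_root)
    return sorted(
        str(folder.relative_to(root))
        for folder in root.rglob('*')
        if folder.is_dir()
        and folder.name != '__pycache__'           # ignore bytecode caches
        and not (folder / '__init__.py').exists()  # no package marker present
    )
```

Calling `missing_init_dirs('rules')` from the repository root would then list any folder that still needs a blank `__init__.py`.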
Overview¶
A StreamAlert rule is a Python method that takes a parsed log record (dictionary) and returns True or False.
A return value of True means an alert will be generated.
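Because a rule is ordinary Python, its logic can be exercised by calling the function on a dictionary. The sketch below omits the @rule decorator so that it runs without StreamAlert installed; the function name and the record fields are hypothetical.

```python
def failed_ssh_login(record):
    """Alert when a (hypothetical) auth log records a failed SSH login."""
    return record.get('program') == 'sshd' and record.get('status') == 'failed'


# Parsed log records are plain dictionaries
print(failed_ssh_login({'program': 'sshd', 'status': 'failed'}))    # True: alert
print(failed_ssh_login({'program': 'sshd', 'status': 'accepted'}))  # False: no alert
```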
Example: The Basics¶
The simplest possible rule looks like this:
from streamalert.shared.rule import rule

@rule(logs=['cloudwatch:events'])
def all_cloudwatch_events(record):
    """Minimal StreamAlert rule: alert on all CloudWatch events"""
    return True
This rule will be evaluated against all inbound logs that match the cloudwatch:events schema defined in a schema file in the conf/schemas directory, i.e. conf/schemas/cloudwatch.json.
In this case, all CloudWatch events will generate an alert, which will be sent to the alerts Athena table.
Example: Logic & Outputs¶
Let’s modify the rule to page the security team if anyone ever uses AWS root credentials:
from streamalert.shared.rule import rule

@rule(logs=['cloudwatch:events'], outputs=['pagerduty:csirt', 'slack:security'])
def cloudtrail_root_account_usage(record):
    """Page security team for any usage of AWS root account"""
    return (record['detail']['userIdentity']['type'] == 'Root'
            and record['detail']['userIdentity'].get('invokedBy') is None
            and record['detail']['eventType'] != 'AwsServiceEvent')
Now, any AWS root account usage is reported to PagerDuty, Slack, and the aforementioned Athena table. In order for this to work, your datasources and outputs must be configured so that:
CloudTrail logs are being sent to StreamAlert via CloudWatch events
The pagerduty:csirt and slack:security outputs have the proper credentials
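To see which records would trigger this logic, the same boolean expression can be run against sample dictionaries. This is a sketch only: the decorator is omitted so the code runs standalone, and the three records are hypothetical, heavily trimmed stand-ins for real CloudWatch events.

```python
def cloudtrail_root_account_usage(record):
    """Same logic as the rule above, without the @rule decorator."""
    identity = record['detail']['userIdentity']
    return (identity['type'] == 'Root'
            and identity.get('invokedBy') is None
            and record['detail']['eventType'] != 'AwsServiceEvent')


# Hypothetical sample records (real events carry many more fields)
root_console_login = {
    'detail': {
        'eventType': 'AwsConsoleSignIn',
        'userIdentity': {'type': 'Root'},
    }
}
service_event = {
    'detail': {
        'eventType': 'AwsServiceEvent',
        'userIdentity': {'type': 'Root', 'invokedBy': 'kms.amazonaws.com'},
    }
}
iam_user_call = {
    'detail': {
        'eventType': 'AwsApiCall',
        'userIdentity': {'type': 'IAMUser'},
    }
}

for sample in (root_console_login, service_event, iam_user_call):
    print(cloudtrail_root_account_usage(sample))
```

Only the first record satisfies all three conditions. Note that a record without a detail key would raise a KeyError here, because the fields are indexed directly.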
Example: More Rule Options¶
The previous example suffers from the following problems:
It only works for CloudTrail data sent via CloudWatch
A single legitimate root login may generate hundreds of alerts in the span of a few minutes
There is no distinction between different AWS account IDs
We can generalize the rule to alleviate these issues:
from rules.helpers.base import get_first_key  # Find first key recursively in record
from streamalert.shared.rule import rule

# This could alternatively be defined in matchers/matchers.py to be shareable
_PROD_ACCOUNTS = {'111111111111', '222222222222'}

def prod_account(record):
    """Match logs for one of the production AWS accounts"""
    return (
        record.get('account') in _PROD_ACCOUNTS or
        get_first_key(record, 'userIdentity', {}).get('accountId') in _PROD_ACCOUNTS
    )

@rule(
    logs=['cloudtrail:events', 'cloudwatch:events'],  # Rule applies to these 2 schemas
    matchers=[prod_account],  # Must be satisfied before rule is evaluated
    merge_by_keys=['useragent'],  # Merge alerts with the same 'useragent' key-value pair
    merge_window_mins=5,  # Merge alerts every 5 minutes
    outputs=['pagerduty:csirt', 'slack:security']  # Send alerts to these 2 outputs
)
def cloudtrail_root_account_usage(record):
    """Page security team for any usage of AWS root account"""
    return (
        get_first_key(record, 'userIdentity', {}).get('type') == 'Root' and
        not get_first_key(record, 'invokedBy') and
        get_first_key(record, 'eventType') != 'AwsServiceEvent'
    )
To simplify rule logic, you can extract common routines into custom helper methods.
These helpers are defined in rules/helpers/base.py and can be called from within a matcher or rule (as shown here).
Since rules are written in Python, you can make them as sophisticated as you want!
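As an illustration of what such a helper can look like, here is a minimal recursive key lookup. It is a hypothetical stand-in named `find_first_key`; the actual helper shipped in rules/helpers/base.py may be implemented differently.

```python
_MISSING = object()  # Sentinel: distinguishes "not found" from a stored None


def find_first_key(data, search_key, default=None):
    """Return the first value stored under search_key, searching depth-first.

    Hypothetical sketch, not the helper from rules/helpers/base.py.
    """
    if isinstance(data, dict):
        if search_key in data:
            return data[search_key]
        children = list(data.values())
    elif isinstance(data, list):
        children = data
    else:
        return default

    for child in children:
        found = find_first_key(child, search_key, _MISSING)
        if found is not _MISSING:
            return found
    return default


record = {'detail': {'userIdentity': {'type': 'Root', 'accountId': '111111111111'}}}
print(find_first_key(record, 'userIdentity', {}).get('type'))  # Root
print(find_first_key(record, 'invokedBy'))                     # None
```

Passing an empty dictionary as the default keeps a chained `.get()` call safe when the key is absent.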
Rule Options¶
The following table provides an overview of each rule option, with more details below:
@rule kwarg | Type | Description
context | dict | Dynamically configurable context passed to the alert processor
datatypes | list of str | List of normalized type names the rule applies to
logs | list of str | List of log schemas the rule applies to
matchers | list of functions | Matcher pre-conditions which must be met before rule logic runs
merge_by_keys | list of str | List of key names that must match in value before merging alerts
merge_window_mins | int | Merge related alerts at this interval rather than sending immediately
outputs | list of str | List of alert outputs
dynamic_outputs | list of functions | List of functions which return valid outputs
req_subkeys | dict of str to list of str | Subkeys which must be present in the record
- context
context can pass extra instructions to the alert processor for more precise routing:
# Context provided to the pagerduty-incident output with
# instructions to assign the incident to a user.
@rule(logs=['osquery:differential'],
      outputs=['pagerduty:csirt'],
      context={'pagerduty-incident': {'assigned_user': 'valid_user'}})
def my_rule(record, context):
    context['pagerduty-incident']['assigned_user'] = record['username']
    return True
- datatypes
conf/normalized_types.json defines data normalization, whereby you can write rules against a common type instead of a specific field or schema:
"""These rules apply to several different log types, defined in conf/normalized_types.json"""
from streamalert.shared.rule import rule
from streamalert.shared.normalize import Normalizer

@rule(datatypes=['sourceAddress'], outputs=['aws-sns:my-topic'])
def ip_watchlist_hit(record):
    """Source IP address matches watchlist."""
    return '127.0.0.1' in Normalizer.get_values_for_normalized_type(record, 'sourceAddress')

@rule(datatypes=['command'], outputs=['aws-sns:my-topic'])
def command_etc_shadow(record):
    """Command line arguments include /etc/shadow"""
    return any(
        '/etc/shadow' in cmd.lower()
        for cmd in Normalizer.get_values_for_normalized_type(record, 'command')
    )
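The idea behind normalized types can be sketched in a few lines: several source-specific field names map to one common type, and a lookup gathers their values wherever they occur in a record. Everything here is an assumption for illustration. The mapping `NORMALIZED_FIELDS`, its field names, and the function `values_for_type` are hypothetical, and the real layout of conf/normalized_types.json may differ.

```python
# Hypothetical mapping: normalized type -> field names used by different log sources
NORMALIZED_FIELDS = {
    'sourceAddress': {'sourceIPAddress', 'src_ip', 'remote_addr'},
}


def values_for_type(record, normalized_type):
    """Collect every string value stored under a field mapped to normalized_type."""
    fields = NORMALIZED_FIELDS.get(normalized_type, set())
    found = set()

    def walk(node):
        # Visit nested dictionaries and lists, recording matching fields
        if isinstance(node, dict):
            for key, value in node.items():
                if key in fields and isinstance(value, str):
                    found.add(value)
                walk(value)
        elif isinstance(node, list):
            for item in node:
                walk(item)

    walk(record)
    return found


# Two differently shaped (hypothetical) records resolve to the same normalized type
cloudtrail_like = {'detail': {'sourceIPAddress': '127.0.0.1'}}
web_like = {'remote_addr': '10.0.0.8', 'path': '/login'}
print('127.0.0.1' in values_for_type(cloudtrail_like, 'sourceAddress'))
print(values_for_type(web_like, 'sourceAddress'))
```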
- logs
logs defines the log schema(s) supported by the rule.
Log datasources are defined within the data_sources field of a cluster file such as conf/clusters/<cluster>.json, and their schemas are defined in one or more files in the conf/schemas directory.
Note
Either logs or datatypes must be specified for each rule.
- matchers
matchers defines conditions that must be satisfied in order for the rule to be evaluated. Default matchers are defined in matchers/matchers.py but can also be defined in the rules file (see example above).
A matcher function should accept a single argument, just like a rule: the record being evaluated.
Rules can use matchers to reduce code redundancy: define the logic once and reuse it across multiple rules.
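The gating behavior can be pictured as follows. This is a sketch under stated assumptions, not StreamAlert's engine code: the function `evaluate`, the matcher `is_prod`, and the record fields are all hypothetical.

```python
def evaluate(rule_func, matchers, record):
    """Run rule_func only if every matcher accepts the record.

    Hypothetical illustration of matcher gating.
    """
    if not all(matcher(record) for matcher in matchers):
        return False  # A matcher rejected the record; rule logic never runs
    return bool(rule_func(record))


def is_prod(record):
    """Matcher: record comes from a (hypothetical) production account."""
    return record.get('account') == '111111111111'


def root_login(record):
    """Rule logic that can be paired with any set of matchers."""
    return record.get('user') == 'root'


print(evaluate(root_login, [is_prod], {'account': '111111111111', 'user': 'root'}))  # True
print(evaluate(root_login, [is_prod], {'account': '999999999999', 'user': 'root'}))  # False
```

Keeping the account check in a matcher means the same `is_prod` function can be listed on many rules instead of being repeated inside each one.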
- merge_by_keys / merge_window_mins
Note
Specify neither or both of these fields, not one of them in isolation
For a better alert triage experience, you can merge alerts whose records share one or more fields in common:
@rule(logs=['your-schema'],
      merge_by_keys=['alpha', 'beta', 'gamma'],
      merge_window_mins=5)
def merged_rule(record):
    return True
The alert merger Lambda function will buffer all of these alerts until 5 minutes have elapsed, at which point
{ "alpha": "A", "nested": { "beta": "B" }, "gamma": [1, 2, 3], "timestamp": 123 }
would be automatically merged with
{ "alpha": "A", "nested": { "beta": "B", "extra": "field" }, "gamma": [1, 2, 3], "timestamp": 456 }
A single consolidated alert will be sent showing the common keys and the record differences. All of the specified merge keys must have the same value in order for two records to be merged, but those keys can be nested anywhere in the record structure.
Note
The original (unmerged) alert will always be sent to Athena.
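The matching condition can be sketched as a comparison of merge-key values found anywhere in each record. This is an illustration only, assuming a depth-first lookup; `find_key` and `merge_signature` are hypothetical names, and the alert merger may compare records differently.

```python
import json

_MISSING = object()  # Sentinel for "key not present anywhere in the record"


def find_key(node, key):
    """Depth-first search for key anywhere in a nested record."""
    if isinstance(node, dict):
        if key in node:
            return node[key]
        children = list(node.values())
    elif isinstance(node, list):
        children = node
    else:
        return _MISSING
    for child in children:
        found = find_key(child, key)
        if found is not _MISSING:
            return found
    return _MISSING


def merge_signature(record, merge_keys):
    """Return a hashable signature of the merge-key values, or None if any is absent."""
    values = [find_key(record, key) for key in merge_keys]
    if any(value is _MISSING for value in values):
        return None
    # Serializing makes unhashable values (lists, dicts) comparable
    return tuple(json.dumps(value, sort_keys=True) for value in values)


first = {"alpha": "A", "nested": {"beta": "B"}, "gamma": [1, 2, 3], "timestamp": 123}
second = {"alpha": "A", "nested": {"beta": "B", "extra": "field"}, "gamma": [1, 2, 3], "timestamp": 456}
keys = ['alpha', 'beta', 'gamma']

# Equal signatures mean the two records are candidates for merging
print(merge_signature(first, keys) == merge_signature(second, keys))  # True
```

The differing timestamp and the extra nested field do not affect the signature, since only the listed merge keys are compared.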
- dynamic_outputs
The dynamic_outputs keyword argument defines additional outputs to an Alert which are dynamically generated. See dynamic_outputs for more info.
- outputs
The outputs keyword argument defines the alert destination if the return value of a rule is True. Alerts are always sent to an Athena alerts table which is easy to query. Any number of additional outputs can be specified.
- req_subkeys
req_subkeys defines sub-keys that must exist in the incoming record (with a non-zero value) in order for it to be evaluated.
This feature should be used if you have logs with a loosely defined schema, to avoid raising a KeyError in rules.
# The 'columns' key must contain sub-keys of 'address' and 'hostnames'
@rule(logs=['osquery:differential'],
      outputs=['aws-lambda:my-function'],
      req_subkeys={'columns': ['address', 'hostnames']})
def osquery_host_check(rec):
    # If a log lacked the 'address' sub-key, this rule would
    # raise a KeyError. Using req_subkeys avoids this.
    return rec['columns']['address'] == '127.0.0.1'
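The pre-check that req_subkeys describes can be approximated as below. This is a sketch, assuming "non-zero value" means any truthy value; the function name `has_required_subkeys` is hypothetical and StreamAlert's own check may differ in detail.

```python
def has_required_subkeys(record, req_subkeys):
    """Return True if every required sub-key exists with a truthy value.

    Hypothetical sketch of the req_subkeys pre-check.
    """
    for key, subkeys in req_subkeys.items():
        parent = record.get(key)
        if not isinstance(parent, dict):
            return False  # Parent key missing or not a dictionary
        if not all(parent.get(subkey) for subkey in subkeys):
            return False  # A sub-key is missing or has an empty/zero value
    return True


required = {'columns': ['address', 'hostnames']}
print(has_required_subkeys({'columns': {'address': '127.0.0.1', 'hostnames': 'localhost'}}, required))  # True
print(has_required_subkeys({'columns': {'address': '127.0.0.1'}}, required))  # False
```

Records that fail the check are skipped, so the rule body can index `rec['columns']['address']` directly.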
Disabling Rules¶
In the event that a rule must be temporarily disabled, the @disable decorator can be used.
This allows you to keep the rule definition and tests in place instead of having to remove them entirely:
from streamalert.shared.rule import disable, rule

@disable  # TODO: this rule is too noisy!
@rule(logs=['example'], outputs=['slack'])
def example_rule(record):
    return True