Skip to content

Commit

Permalink
Added != operator to replace NOT conditions
Browse files Browse the repository at this point in the history
  • Loading branch information
slincoln-systemtwo committed Nov 4, 2024
1 parent 611a276 commit bd52d13
Show file tree
Hide file tree
Showing 2 changed files with 189 additions and 27 deletions.
155 changes: 132 additions & 23 deletions sigma/backends/secops/secops.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import json
from contextlib import contextmanager
from dataclasses import dataclass
from importlib import resources
from typing import Any, ClassVar, Dict, List, Optional, Tuple, Type, Union

Expand All @@ -17,7 +19,27 @@
from sigma.pipelines.secops.yara_l import yara_l_pipeline
from sigma.processing.pipeline import ProcessingPipeline
from sigma.rule import SigmaRule
from sigma.types import SigmaCompareExpression, SigmaString, SpecialChars
from sigma.types import (
SigmaBool,
SigmaCasedString,
SigmaCIDRExpression,
SigmaCompareExpression,
SigmaExists,
SigmaExpansion,
SigmaFieldReference,
SigmaNull,
SigmaNumber,
SigmaQueryExpression,
SigmaRegularExpression,
SigmaString,
SpecialChars,
)


@dataclass
class ExpressionPair:
positive: Optional[str]
negative: Optional[str]


class SecOpsBackend(TextQueryBackend):
Expand Down Expand Up @@ -60,15 +82,16 @@ class SecOpsBackend(TextQueryBackend):
and_token: ClassVar[str] = "AND"
not_token: ClassVar[str] = "NOT"
eq_token: ClassVar[str] = "="
ne_token: ClassVar[str] = "!="

eq_expression: ClassVar[str] = "{field} {backend.eq_token} {value}" # Expression for field = value

ne_expression: ClassVar[str] = "{field} {backend.ne_token} {value}" # Expression for field != value
str_quote: ClassVar[str] = '"'
escape_char: ClassVar[str] = "\\"
wildcard_multi: ClassVar[str] = "*"
wildcard_single: ClassVar[str] = "?"
add_escaped: ClassVar[str] = "\\"

re_not_expression: ClassVar[str] = "{field} != /{regex}/ nocase"
re_expression: ClassVar[str] = "{field} = /{regex}/ nocase"
re_escape_char: ClassVar[str] = "\\"
re_escape: ClassVar[Tuple[str]] = ('"', "/")
Expand Down Expand Up @@ -100,8 +123,14 @@ class SecOpsBackend(TextQueryBackend):
# String matching operators. if none is appropriate eq_token is used.
# Since we are using regex, we need to add '.*' where appropriate, but this is done in the convert_value_str method.
startswith_expression: ClassVar[Optional[str]] = "{field} = /^{value}/ nocase"
case_sensitive_startswith_expression: ClassVar[Optional[str]] = "{field} = /^{value}/"
not_startswith_expression: ClassVar[Optional[str]] = "{field} != /^{value}/ nocase"
endswith_expression: ClassVar[Optional[str]] = "{field} = /{value}$/ nocase"
case_sensitive_endswith_expression: ClassVar[Optional[str]] = "{field} = /{value}$/"
not_endswith_expression: ClassVar[Optional[str]] = "{field} != /{value}$/ nocase"
contains_expression: ClassVar[Optional[str]] = "{field} = /{value}/ nocase"
case_sensitive_contains_expression: ClassVar[Optional[str]] = "{field} = /{value}/"
not_contains_expression: ClassVar[Optional[str]] = "{field} != /{value}/ nocase"
wildcard_match_expression: ClassVar[Optional[str]] = (
None # Special expression if wildcards can't be matched with the eq_token operator
)
Expand Down Expand Up @@ -164,6 +193,38 @@ def convert_value_str(self, s: SigmaString, state: ConversionState, quote_string
return self.quote_string(converted)
return converted

@contextmanager
def _negated_expressions(self, negation: bool = False):
"""Context manager to temporarily swap expressions with their negated versions."""
if not negation:
yield
return

# Store original expressions
original_expressions = {
"eq_expression": self.eq_expression,
"re_expression": self.re_expression,
"startswith_expression": self.startswith_expression,
"endswith_expression": self.endswith_expression,
"contains_expression": self.contains_expression,
}

# Swap to negated versions
try:
self.eq_expression = self.ne_expression
self.re_expression = self.re_not_expression
self.startswith_expression = self.not_startswith_expression
self.endswith_expression = self.not_endswith_expression
self.contains_expression = self.not_contains_expression
yield
finally:
# Restore original expressions
self.eq_expression = original_expressions["eq_expression"]
self.re_expression = original_expressions["re_expression"]
self.startswith_expression = original_expressions["startswith_expression"]
self.endswith_expression = original_expressions["endswith_expression"]
self.contains_expression = original_expressions["contains_expression"]

def convert_condition(
self, cond: ConditionType, state: ConversionState, parent_cond: Optional[ConditionType] = None
) -> Any:
Expand Down Expand Up @@ -194,12 +255,74 @@ def convert_condition(
elif isinstance(cond, ConditionNOT):
return self.convert_condition_not(cond, state)
elif isinstance(cond, ConditionFieldEqualsValueExpression):
return self.convert_condition_field_eq_val(cond, state)
negation = True if isinstance(parent_cond, ConditionNOT) else False
return self.convert_condition_field_eq_val(cond, state, negation)
elif isinstance(cond, ConditionValueExpression):
return self.convert_condition_val(cond, state)
else: # pragma: no cover
raise TypeError("Unexpected data type in condition parse tree: " + cond.__class__.__name__)

def convert_condition_field_eq_val(
self, cond: ConditionFieldEqualsValueExpression, state: ConversionState, negation: bool = False
) -> Any:
"""Conversion dispatcher of field = value conditions. Dispatches to value-specific methods."""
with self._negated_expressions(negation):
if isinstance(cond.value, SigmaCasedString):
return self.convert_condition_field_eq_val_str_case_sensitive(cond, state)
elif isinstance(cond.value, SigmaString):
return self.convert_condition_field_eq_val_str(cond, state)
elif isinstance(cond.value, SigmaNumber):
return self.convert_condition_field_eq_val_num(cond, state)
elif isinstance(cond.value, SigmaBool):
return self.convert_condition_field_eq_val_bool(cond, state)
elif isinstance(cond.value, SigmaRegularExpression):
return self.convert_condition_field_eq_val_re(cond, state)
elif isinstance(cond.value, SigmaCIDRExpression):
return self.convert_condition_field_eq_val_cidr(cond, state)
elif isinstance(cond.value, SigmaCompareExpression):
return self.convert_condition_field_compare_op_val(cond, state)
elif isinstance(cond.value, SigmaFieldReference):
return self.convert_condition_field_eq_field(cond, state)
elif isinstance(cond.value, SigmaNull):
return self.convert_condition_field_eq_val_null(cond, state)
elif isinstance(cond.value, SigmaQueryExpression):
return self.convert_condition_field_eq_query_expr(cond, state)
elif isinstance(cond.value, SigmaExists):
return self.convert_condition_field_eq_val_exists(cond, state)
elif isinstance(cond.value, SigmaExpansion):
return self.convert_condition_field_eq_expansion(cond, state)
else: # pragma: no cover
raise TypeError("Unexpected value type class in condition parse tree: " + cond.value.__class__.__name__)

def convert_condition_or(
self, cond: ConditionOR, state: ConversionState, negation: bool = False
) -> Union[str, DeferredQueryExpression]:
"""Conversion of OR conditions."""
try:
if (
self.token_separator == self.or_token
): # don't repeat the same thing triple times if separator equals or token
joiner = self.or_token
else:
joiner = self.token_separator + self.or_token + self.token_separator

return joiner.join(
(
converted
for converted in (
(
self.convert_condition(arg, state, parent_cond=cond.parent)
if self.compare_precedence(cond, arg) or negation # Don't group NOT conditions
else self.convert_condition_group(arg, state)
)
for arg in cond.args
)
if converted is not None and not isinstance(converted, DeferredQueryExpression)
)
)
except TypeError: # pragma: no cover
raise NotImplementedError("Operator 'or' not supported by the backend")

def convert_condition_and(
self, cond: ConditionAND, state: ConversionState, negation: bool = False
) -> Union[str, DeferredQueryExpression]:
Expand All @@ -209,25 +332,14 @@ def convert_condition_and(
self.token_separator == self.and_token
): # don't repeat the same thing triple times if separator equals and token
joiner = self.and_token
if negation:
joiner = joiner + self.token_separator + self.not_token
else:
joiner = self.token_separator + self.and_token + self.token_separator
if negation:
joiner = (
self.token_separator
+ self.and_token
+ self.token_separator
+ self.not_token
+ self.token_separator
)

return joiner.join(
(
converted
for converted in (
(
self.convert_condition(arg, state, parent_cond=ConditionNOT)
self.convert_condition(arg, state, parent_cond=cond.parent)
if self.compare_precedence(cond, arg) or negation # Don't group NOT conditions
else self.convert_condition_group(arg, state)
)
Expand All @@ -248,10 +360,7 @@ def convert_condition_not(self, cond: ConditionNOT, state: ConversionState) -> U
if isinstance(expr, DeferredQueryExpression): # negate deferred expression and pass it to parent
return expr.negate()
else: # convert negated expression to string
if isinstance(arg, ConditionOR):
return self.not_token + self.token_separator + self.group_expression.format(expr=expr)
else:
return self.not_token + self.token_separator + expr
return expr
except TypeError: # pragma: no cover
raise NotImplementedError("Operator 'not' not supported by the backend")

Expand Down Expand Up @@ -351,17 +460,17 @@ def convert_condition_as_in_not_expression(
We also have to separate each expression with OR and not use | regex in one expression.
"""
joiner = (
self.token_separator + self.or_token + self.token_separator
self.token_separator + self.and_token + self.token_separator
) # NOT is prepended outside the whole expression, which is surrounded by parens ({expr})
converted = [
self.re_expression.format(
self.re_not_expression.format(
field=self.escape_and_quote_field(cond.args[0].field),
regex=self.convert_value_for_in_expression(arg.value, state),
)
for arg in cond.args
]

return joiner.join(converted)
return self.group_expression.format(expr=joiner.join(converted))

def convert_value_for_in_expression(self, value, state):
"""Convert a value for an IN expression. SecOps does not support the IN operator, so we have to use the eq_token operator with a regex.
Expand Down
61 changes: 57 additions & 4 deletions tests/test_backend_secops.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ def test_secops_negation_basic(secops_backend: SecOpsBackend):
)
)
== [
'target.process.file.full_path = /\\\\process\\.exe$/ nocase AND target.process.command_line = "this" nocase AND (NOT target.process.command_line = "notthis" nocase)'
'target.process.file.full_path = /\\\\process\\.exe$/ nocase AND target.process.command_line = "this" nocase AND (target.process.command_line != "notthis" nocase)'
]
)

Expand Down Expand Up @@ -225,7 +225,7 @@ def test_secops_negation_contains(secops_backend: SecOpsBackend):
)
)
== [
"target.process.file.full_path = /\\\\process\\.exe$/ nocase AND target.process.command_line = /this/ nocase AND (NOT target.process.command_line = /notthis/ nocase)"
"target.process.file.full_path = /\\\\process\\.exe$/ nocase AND target.process.command_line = /this/ nocase AND (target.process.command_line != /notthis/ nocase)"
]
)

Expand Down Expand Up @@ -333,7 +333,7 @@ def test_secops_cmdline_filters(secops_backend: SecOpsBackend):
"""
)
)[0]
== 'target.process.file.full_path = /\\\\netsh\\.exe$/ nocase AND target.process.command_line = / firewall / nocase AND target.process.command_line = / add / nocase AND (NOT (target.process.command_line = /advfirewall firewall add rule name=Dropbox dir=in action=allow "program=.:\\\\Program Files \\(x86\\)\\\\Dropbox\\\\Client\\\\Dropbox\\.exe" enable=yes profile=Any/ nocase OR target.process.command_line = /advfirewall firewall add rule name=Dropbox dir=in action=allow "program=.:\\\\Program Files\\\\Dropbox\\\\Client\\\\Dropbox\\.exe" enable=yes profile=Any/ nocase))'
== 'target.process.file.full_path = /\\\\netsh\\.exe$/ nocase AND target.process.command_line = / firewall / nocase AND target.process.command_line = / add / nocase AND ((target.process.command_line != /advfirewall firewall add rule name=Dropbox dir=in action=allow "program=.:\\\\Program Files \\(x86\\)\\\\Dropbox\\\\Client\\\\Dropbox\\.exe" enable=yes profile=Any/ nocase AND target.process.command_line != /advfirewall firewall add rule name=Dropbox dir=in action=allow "program=.:\\\\Program Files\\\\Dropbox\\\\Client\\\\Dropbox\\.exe" enable=yes profile=Any/ nocase))'
)


Expand Down Expand Up @@ -410,5 +410,58 @@ def test_secops_or_grouping_regex_escaping(secops_backend: SecOpsBackend):
"""
)
)[0]
== "target.process.command_line = /devtunnel/ nocase OR target.process.file.full_path = /\\\\devtunnel\\.exe$/ nocase AND (NOT (principal.process.file.full_path = /\\\\Teams\\.exe/ nocase OR principal.process.file.full_path = /\\\\devenv\\.exe/ nocase OR principal.process.file.full_path = /\\\\git/ nocase OR principal.process.file.full_path = /Code Helper \\(Plugin\\)/ nocase OR principal.process.file.full_path = /GitHub Desktop Helper \\(Renderer\\)/ nocase))"
== "target.process.command_line = /devtunnel/ nocase OR target.process.file.full_path = /\\\\devtunnel\\.exe$/ nocase AND ((principal.process.file.full_path != /\\\\Teams\\.exe/ nocase AND principal.process.file.full_path != /\\\\devenv\\.exe/ nocase AND principal.process.file.full_path != /\\\\git/ nocase AND principal.process.file.full_path != /Code Helper \\(Plugin\\)/ nocase AND principal.process.file.full_path != /GitHub Desktop Helper \\(Renderer\\)/ nocase))"
)


def test_secops_or_filtering(secops_backend: SecOpsBackend):
assert (
secops_backend.convert_rule(
SigmaRule.from_yaml(
r"""
title: Exports Registry Key To a File
id: f0e53e89-8d22-46ea-9db5-9d4796ee2f8a
related:
- id: 82880171-b475-4201-b811-e9c826cd5eaa
type: similar
status: test
description: Detects the export of the target Registry key to a file.
references:
- https://lolbas-project.github.io/lolbas/Binaries/Regedit/
- https://gist.github.com/api0cradle/cdd2d0d0ec9abb686f0e89306e277b8f
author: Oddvar Moe, Sander Wiebing, oscd.community
date: 2020-10-07
modified: 2024-03-13
tags:
- attack.exfiltration
- attack.t1012
logsource:
category: process_creation
product: windows
detection:
selection_img:
- Image|endswith: '\regedit.exe'
- OriginalFileName: 'REGEDIT.EXE'
selection_cli:
CommandLine|contains: ' -E '
filter_1: # filters to avoid intersection with critical keys rule
CommandLine|contains:
- 'hklm'
- 'hkey_local_machine'
filter_2:
CommandLine|endswith:
- '\system'
- '\sam'
- '\security'
condition: all of selection_* and not 1 of filter_*
fields:
- ParentImage
- CommandLine
falsepositives:
- Legitimate export of keys
level: low
"""
)
)[0]
== 'target.process.file.full_path = /\\\\regedit\\.exe$/ nocase OR target.process.file.names = "REGEDIT.EXE" nocase AND target.process.command_line = / -E / nocase AND ((target.process.command_line != /hklm/ nocase AND target.process.command_line != /hkey_local_machine/ nocase) OR (target.process.command_line != /\\\\system/ nocase AND target.process.command_line != /\\\\sam/ nocase AND target.process.command_line != /\\\\security/ nocase))'
)

0 comments on commit bd52d13

Please sign in to comment.