Logstash Filter 전처리 활용 사례
이번 시간에는 Logstash Filter 사용법과 실제 실무에서 활용 사례를 중심으로 방대한 보안 로그 데이터를 처리하는 방법을 설명하고자 합니다. 여러분이 보안 로그 데이터의 전처리 및 저장을 효율적으로 설계하기 위한 실질적인 사례를 제공받길 바랍니다.
현재 저희 회사 보안팀에서는 Logstash를 활용하여 보안 로그 데이터를 전처리한 뒤 Elastic Stack과 연동해 사용 중입니다. Logstash는 모든 서비스의 보안 이벤트 로그(방화벽, 침입 방지 시스템(IPS), 웹 애플리케이션 방화벽(WAF) 등)를 효율적으로 처리하고 있으며, 최종적으로 Elastic Stack에 데이터를 저장함으로써 분석 및 활용이 가능하도록 구성하고 있습니다.
일평균 처리량은 Elastic 인덱스 기준으로 하루 약 6억 건 ~ 7억건에 이르는 엄청난 데이터를 처리하고 있으며, 이는 문서 엘라스틱에 저장되는 문서(Document) 수 기준입니다. 데이터 저장 전략을 최적화하기 위해 Logstash 필터 플러그인을 활용해 꼭 필요한 데이터만 골라내어 저장하고 있습니다. 이를 통해 실제 하루 평균 저장되는 데이터 용량은 약 60GB에서 75GB 정도로 유지되고 있습니다. 아래는 Logstash 필터를 통한 실무 활용 사례이니 참고하시면 도움이 될 것입니다.
Logstash Drop 필터 플러그인 적용 사례
- 보안 이벤트 로그 중 `deviceCustomString1` 필드의 키워드가 "SCAN" 또는 "ATTACKIP"로 시작하는 경우, 해당 로그는 전부 폐기(drop) 처리하여 저장하지 않습니다.
- `destinationAddress` 필드 값이 특정 IP 주소(예: 20.xx.xx.xx 등)인 경우에는 해당 로그도 폐기됩니다.
- `service` 필드 값이 "PING"인 경우에는 데이터를 저장하지 않고 폐기됩니다.
- `applicationProtocol` 필드 값이 "NTP"나 "ICMP"인 경우에 해당하는 로그 역시 폐기 처리됩니다.
if [deviceCustomString1] =~ /^SCAN+/ or [deviceCustomString1] =~ /^ATTACKIP+/ {
drop {}
}
if [destinationAddress] =~ "x.xx.xx.10" or [destinationAddress] =~ "xx.xx.xx.33" or [destinationAddress] =~ "x5.x.xx.x"
or [destinationPort] =~ "2048" or [applicationProtocol] =~ "ICMP" or [service] =~ "PING" or [name] =~ "Notable Characteristics of the analyzed sample"
or [name] =~ "Deny List updated" or [name] =~ "Evidence" or [applicationProtocol] =~ "ntp" or [service] =~ "NTP" or [sourceAddress] =~ "192.168.0.x" or
[sourceAddress] =~ "x.x.x.x" or [sourceAddress] =~ "xx.xxx.x.xx" or [name] =~ /^PHP DIESCAN+/ or [name] =~ /^NETGEAR+/ {
drop {}
}
Logstash Mutate 필터 플러그인 사례
- 필드명 변경(rename 옵션):
특정 로그에서 필드명을 업무에 맞게 대체하는 것이 가능합니다. 예를 들어, `deviceHostName` 필드에서 키워드가 "ZONE01-FW"로 시작하는 방화벽 로그의 경우에는 필드명을 아래와 같이 변경합니다:
- `name` → `csname`
- `severity` → `csseverity`
- `deviceCustomString6` → `threatAction`
if [deviceHostName] =~ /^CLOUD-FW+/ {
mutate {
rename => { "name" => "csname" }
rename => { "deviceCustomString1" => "policyName" }
rename => { "applicationProtocol" => "csapplicationProtocol" }
rename => { "severity" => "csseverity" }
}
}
if [deviceHostName] =~ "WAF001" {
mutate {
rename => { "deviceCustomString1" => "title" }
rename => { "deviceCustomString6" => "threatAction" }
rename => { "deviceCustomString3" => "domainName" }
rename => { "severity" => "hostSeverity" }
rename => { "deviceEventCategory" => "threatType" }
}
}
이렇게 변경된 필드는 Elastic Stack에 저장될 때 실제로 새로운 필드명으로 반영됩니다. 이 방법은 대규모 데이터엔 특히 유용하며, 규칙적으로 변경된 데이터로 보다 체계적이고 간편한 관리를 제공합니다.
- 필드 값 변환(replace 옵션):
특정 필드 값의 내용을 정해진 문자열로 대체 변경할 수 있습니다. 예를 들면, `hostSeverity` 값이 2 또는 3인 경우 이를 `Low-Minor`로 변환하여 저장합니다. 이를 통해 로그 데이터를 보다 직관적이고 일관적으로 구성할 수 있습니다.
if [hostSeverity] == "2" or [hostSeverity] == "3" {
mutate {
replace => { "[hostSeverity]" => "Low-Minor" }
}
}
if [hostSeverity] == "4" or [hostSeverity] == "5" or [hostSeverity] == "6" or [hostSeverity] == "7" {
mutate {
replace => { "[hostSeverity]" => "Medium-Major" }
}
}
if [hostSeverity] == "8" or [hostSeverity] == "9" or [hostSeverity] == "10" {
mutate {
replace => { "[hostSeverity]" => "High-Critical" }
}
}
- 필드 삭제(remove_field 옵션):
불필요한 필드를 완전히 삭제하여 저장 공간을 최적화합니다. 예를 들어 `"PaloDGl2"`, `"PaloDGl4"`, `"PaloDstUUID"`, `"PaloPacketsReceived"` 등의 필드는 저장 대상에서 제거됩니다.
mutate {
remove_field => [ "msg", "message", "deviceReceiptTime", "deviceVersion", "deviceProcessName", "cefVersion", "type", "sourceUserName", "deviceEventCategory", "host", "port", "deviceDirection", "generatorID", "bytesIn", "bytesOut", "PanOSDGl2", "PanOSDGl4", "PanOSDstUUID", "PanOSPacketsReceived", "PanOSParentSessionID", "PanOSParentStartTime", "deviceExternalId", "eventId", "externalId" ]
}
Logstash Prune 필터 플러그인 활용
Prune 필터는 이벤트 로그에서 특정 필드를 이름 또는 값에 기반하여 삭제하거나 유지할 수 있는 기능을 제공합니다. 이때 화이트 리스트 또는 블랙 리스트 방식이 사용되며, 정규 표현식을 활용해 세부 조건도 설정 가능합니다.
prune {
blacklist_names => [ "^*[\\]+", "^*[/]+", "^*[..]+", "^ad.+", "^A.+", "^B.+", "^AK1+", "^SH2+", "^AK+", "^agent+", "^deviceEvent+", "^file+", "^cfg+", "^category+", "^flex+", "^xau+", "^*ZoneURI$", "^*boundInterface$", "^*.TimeZone$" ]
}
보안 장비 로그를 수집하는 과정에서 간혹 이전에 본 적 없는 형태의 필드명이 들어오는 경우가 발생합니다. 이를 해결하기 위해 삽질 아닌 삽질을 했던 경험이 있습니다만, 이러한 문제는 특정 패턴을 가진 필드를 제외하기 위해 Prune 필터를 활용하면 효과적으로 처리할 수 있습니다.
실제로 로그 데이터를 전처리하는 과정에서 꼭 Logstash를 사용해야 하는 것은 아닙니다. Elasticsearch의 버전이 점차 향상됨에 따라, 이제는 Logstash를 사용하는 대신 Elastic Ingest Node를 설정하여 유용하게 활용할 수 있습니다.
Logstash와 Elastic Ingest Node 구성의 성능 차이에 관해서 궁금하다면, 구글에서 "Logstash vs Ingest Node"라는 키워드로 검색을 진행하면 다양한 비교 자료를 쉽게 찾아볼 수 있을 것입니다. 이를 통해 각각의 장단점과 특정 환경에서 어떤 솔루션이 더 적합한지에 대해 보다 깊이 있는 이해가 가능할 것입니다.
이번 포스팅에서는 방대한 보안 로그 이벤트를 처리하는 데 있어서 Logstash의 사용법 및 활용 사례에 대해 정리한 내용을 공유했습니다. 내용이 충분하지 않을 수 있으나, 이 작은 글이 누군가에게는 실질적인 도움을 줄 수 있기를 바랍니다. 앞으로 이러한 주제와 관련된 다양한 의견과 경험이 서로 공유되기를 기대합니다.
댓글