본문 바로가기

Logstash Filter 전처리 활용 사례

알찬IT 2025. 4. 8.
반응형

이번 시간에는 Logstash Filter 사용법과 실제 실무에서 활용 사례를 중심으로 방대한 보안 로그 데이터를 처리하는 방법을 설명하고자 합니다. 여러분이 보안 로그 데이터의 전처리 및 저장을 효율적으로 설계하기 위한 실질적인 사례를 제공받길 바랍니다.

 

현재 저희 회사 보안팀에서는 Logstash를 활용하여 보안 로그 데이터를 전처리한 뒤 Elastic Stack과 연동해 사용 중입니다. Logstash는 모든 서비스의 보안 이벤트 로그(방화벽, 침입 방지 시스템(IPS), 웹 애플리케이션 방화벽(WAF) 등)를 효율적으로 처리하고 있으며, 최종적으로 Elastic Stack에 데이터를 저장함으로써 분석 및 활용이 가능하도록 구성하고 있습니다.

 

logstash filter 활용 사례 Elasticsearch
Logstash Filter Elasticsearch 활용 사례

 

일평균 처리량은 Elastic 인덱스 기준으로 하루 약 6억 건 ~ 7억건에 이르는 엄청난 데이터를 처리하고 있으며, 이는 문서 엘라스틱에 저장되는 문서(Document) 수 기준입니다. 데이터 저장 전략을 최적화하기 위해 Logstash 필터 플러그인을 활용해 꼭 필요한 데이터만 골라내어 저장하고 있습니다. 이를 통해 실제 하루 평균 저장되는 데이터 용량은 약 60GB에서 75GB 정도로 유지되고 있습니다. 아래는 Logstash 필터를 통한 실무 활용 사례이니 참고하시면 도움이 될 것입니다.

 

Logstash Drop 필터 플러그인 적용 사례

 

- 보안 이벤트 로그 중 `deviceCustomString1` 필드의 키워드가 "SCAN" 또는 "ATTACKIP"로 시작하는 경우, 해당 로그는 전부 폐기(drop) 처리하여 저장하지 않습니다.

 

- `destinationAddress` 필드 값이 특정 IP 주소(예: 20.xx.xx.xx 등)인 경우에는 해당 로그도 폐기됩니다.

- `service` 필드 값이 "PING"인 경우에는 데이터를 저장하지 않고 폐기됩니다.

- `applicationProtocol` 필드 값이 "NTP"나 "ICMP"인 경우에 해당하는 로그 역시 폐기 처리됩니다.

 

 if [deviceCustomString1] =~ /^SCAN+/ or [deviceCustomString1] =~ /^ATTACKIP+/ {
     drop {}
  }
     
  
 if [destinationAddress] =~ "x.xx.xx.10" or [destinationAddress] =~ "xx.xx.xx.33" or [destinationAddress] =~ "x5.x.xx.x" 
  or [destinationPort] =~ "2048" or [applicationProtocol] =~ "ICMP" or [service] =~ "PING" or [name] =~ "Notable Characteristics of the analyzed sample" 
  or [name] =~ "Deny List updated" or [name] =~ "Evidence" or [applicationProtocol] =~ "ntp" or [service] =~ "NTP" or [sourceAddress] =~ "192.168.0.x" or 
  [sourceAddress] =~ "x.x.x.x" or [sourceAddress] =~ "xx.xxx.x.xx" or [name] =~ /^PHP DIESCAN+/ or [name] =~ /^NETGEAR+/ {
    drop {}
 }

 

 

Logstash  Mutate 필터 플러그인 사례

 

- 필드명 변경(rename 옵션):

 

특정 로그에서 필드명을 업무에 맞게 대체하는 것이 가능합니다. 예를 들어, `deviceHostName` 필드에서 키워드가 "ZONE01-FW"로 시작하는 방화벽 로그의 경우에는 필드명을 아래와 같이 변경합니다:

 

- `name` → `csname`

 

- `severity` → `csseverity`

 

- `deviceCustomString6` → `threatAction`

 

 

  if [deviceHostName] =~ /^CLOUD-FW+/ {
    mutate {
        rename => { "name" => "csname" }
        rename => { "deviceCustomString1" => "policyName" }
        rename => { "applicationProtocol" => "csapplicationProtocol" }
        rename => { "severity" => "csseverity" }
   }
  }
  
if [deviceHostName] =~ "WAF001" {
    mutate {
        rename => { "deviceCustomString1" => "title" }
        rename => { "deviceCustomString6" => "threatAction" }
        rename => { "deviceCustomString3" => "domainName" }
        rename => { "severity" => "hostSeverity" }
		rename => { "deviceEventCategory" => "threatType" }
   }
 }

 

이렇게 변경된 필드는 Elastic Stack에 저장될 때 실제로 새로운 필드명으로 반영됩니다. 이 방법은 대규모 데이터엔 특히 유용하며, 규칙적으로 변경된 데이터로 보다 체계적이고 간편한 관리를 제공합니다.

 

- 필드 값 변환(replace 옵션):

 

특정 필드 값의 내용을 정해진 문자열로 대체 변경할 수 있습니다. 예를 들면, `hostSeverity` 값이 2 또는 3인 경우 이를 `Low-Minor`로 변환하여 저장합니다. 이를 통해 로그 데이터를 보다 직관적이고 일관적으로 구성할 수 있습니다.

 

  if [hostSeverity] == "2" or [hostSeverity] == "3" {
      mutate {
      replace => { "[hostSeverity]" => "Low-Minor" }
    }
  }

 if [hostSeverity] == "4" or [hostSeverity] == "5" or [hostSeverity] == "6" or [hostSeverity] == "7" {
      mutate {
      replace => { "[hostSeverity]" => "Medium-Major" }
    }
  }

  if [hostSeverity] == "8" or [hostSeverity] == "9" or [hostSeverity] == "10" {
      mutate {
      replace => { "[hostSeverity]" => "High-Critical" }
    }
  }

 

- 필드 삭제(remove_field 옵션):

 

불필요한 필드를 완전히 삭제하여 저장 공간을 최적화합니다. 예를 들어 `"PaloDGl2"`, `"PaloDGl4"`, `"PaloDstUUID"`, `"PaloPacketsReceived"` 등의 필드는 저장 대상에서 제거됩니다.

 

mutate {
    remove_field => [ "msg", "message", "deviceReceiptTime", "deviceVersion", "deviceProcessName", "cefVersion", "type", "sourceUserName", "deviceEventCategory", "host", "port", "deviceDirection", "generatorID", "bytesIn", "bytesOut", "PanOSDGl2", "PanOSDGl4", "PanOSDstUUID", "PanOSPacketsReceived", "PanOSParentSessionID", "PanOSParentStartTime", "deviceExternalId", "eventId", "externalId" ]
 }

 

Logstash  Prune 필터 플러그인 활용

 

Prune 필터는 이벤트 로그에서 특정 필드를 이름 또는 값에 기반하여 삭제하거나 유지할 수 있는 기능을 제공합니다. 이때 화이트 리스트 또는 블랙 리스트 방식이 사용되며, 정규 표현식을 활용해 세부 조건도 설정 가능합니다.

 

  prune {
    blacklist_names => [ "^*[\\]+", "^*[/]+", "^*[..]+", "^ad.+", "^A.+", "^B.+", "^AK1+", "^SH2+", "^AK+", "^agent+", "^deviceEvent+", "^file+", "^cfg+", "^category+", "^flex+", "^xau+", "^*ZoneURI$", "^*boundInterface$", "^*.TimeZone$" ]
  }

 

보안 장비 로그를 수집하는 과정에서 간혹 이전에 본 적 없는 형태의 필드명이 들어오는 경우가 발생합니다. 이를 해결하기 위해 삽질 아닌 삽질을 했던 경험이 있습니다만, 이러한 문제는 특정 패턴을 가진 필드를 제외하기 위해 Prune 필터를 활용하면 효과적으로 처리할 수 있습니다.

 

실제로 로그 데이터를 전처리하는 과정에서 꼭 Logstash를 사용해야 하는 것은 아닙니다. Elasticsearch의 버전이 점차 향상됨에 따라, 이제는 Logstash를 사용하는 대신 Elastic Ingest Node를 설정하여 유용하게 활용할 수 있습니다.

 

Logstash와 Elastic Ingest Node 구성의 성능 차이에 관해서 궁금하다면, 구글에서 "Logstash vs Ingest Node"라는 키워드로 검색을 진행하면 다양한 비교 자료를 쉽게 찾아볼 수 있을 것입니다. 이를 통해 각각의 장단점과 특정 환경에서 어떤 솔루션이 더 적합한지에 대해 보다 깊이 있는 이해가 가능할 것입니다.

 

이번 포스팅에서는 방대한 보안 로그 이벤트를 처리하는 데 있어서 Logstash의 사용법 및 활용 사례에 대해 정리한 내용을 공유했습니다. 내용이 충분하지 않을 수 있으나, 이 작은 글이 누군가에게는 실질적인 도움을 줄 수 있기를 바랍니다. 앞으로 이러한 주제와 관련된 다양한 의견과 경험이 서로 공유되기를 기대합니다.

 

반응형

댓글