InTen

Logstash JDBC 로 DB 연동 + 멀티 파이프 라인 본문

프로그래밍/그외

Logstash JDBC 로 DB 연동 + 멀티 파이프 라인

인텐 2024. 5. 20. 14:13

일단 로그스태쉬란 가볍게 말하면 데이터 수집기라고 볼 수 있다.

이 글에서 설명하는 건 SQL 쿼리 구문을 통해서 디비와 조인해서 데이터를 가져오거나 서버와 직접적인 연결을 통해서 Log데이터 등을 수집해서
Elastic서버에 데이터를 생성해서 넣어주는 데이터 수집 에이전트라고 생각하면 된다.

각 파이프를 통해서 쿼리 구문을 개별로 만들어서 Elastic 인덱스를 생성 가능하다.

 

일단 apt나 yum, dpkg 등을 통해 설치하면 
/usr/share/logstash/ 해당 경로로 설치가 되어 있을 것이다.

일단은 저는 /usr/share/logstash/conf를 파이프라인 설정 파일 경로로 설정할 것이기 때문에 아래와 같이 작성 후 재시작

vi logstash.yml

# 경로만 지정, 파일명은 pipelines.yml으로 생성
path.settings: '/usr/share/logstash/conf/'

 

그 후 파이프 라인 작성 설정 파일은 /usr/share/logstash/config 경로에
pipelines.yml 파일이다.

아래의 방법은 path.config 를 사용하여 파이프 설정 파일을 따로 두고 실행 시키는 방법이다.

conf.string: 명령어를 통해서 바로 명령어를 실행하는 플러그인도 존재한다. 아래의 플러그인 설명도 링크해 두겠습니다.
https://www.elastic.co/guide/en/logstash/current/logstash-settings-file.html

- pipeline.id: om1
  path.config: "/usr/share/logstash/config/om1-mapping.conf"
- pipeline.id: om2
  path.config: "/usr/share/logstash/config/om2-mapping.conf"

 

이제 대표적으로  om1-mapping.conf 파일에 대해서 알아보자

뭐라고 명시가 되어있는지 알아보겠다.

#om1-mapping.conf

input {
  jdbc {
    jdbc_driver_library => "${CUBRID_DRIVER_PATH}" 
    jdbc_driver_class => "${CUBRID_DRIVER_CLASS}" 
    jdbc_connection_string => "${CUBRID_GSND_CONNECTION_URL}" 
    jdbc_user => "${CUBRID_GSND_USER}" 
    jdbc_password => "${CUBRID_GSND_PASSWORD}" 
    tracking_column => "unix_ts_in_secs" 
    tracking_column_type => "numeric" 
    use_column_value => true
    schedule => "*/1 * * * *" 
    last_run_metadata_path => "/usr/share/logstash/data/last_run_jisik_gsnd.txt" 
    record_last_run => true
    statement => "select * ,UNIX_TIMESTAMP(CONCAT(mod_dt,mod_tm)) AS 
    unix_ts_in_secs from om1 where UNIX_TIMESTAMP(CONCAT(mod_dt,mod_tm)) 
    > :sql_last_value AND (CONCAT(mod_dt,mod_tm) < NOW()) ORDER BY unix_ts_in_secs"  
  }
}

filter {
  mutate {
    add_field => { "crt_dt_formatted" => "%{crt_dt}" }
  }

  date {
    match => ["crt_dt_formatted", "YYYY-MM-dd HH:mm"]
    target => "crt_dt_formatted" 
  }
  mutate {
    add_field => { "mod_dt_formatted" => "%{mod_dt}" }
  }
  date {
    match => ["mod_dt_formatted", "yyyy-MM-dd HH:mm"]
    target => "mod_dt_formatted" 
  }
}

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "test-index" 
    user => "elastic" 
    password => "elastic" 
    document_id => "%{id}" 
  }
  stdout{}
}

 

먼저 input{} 플러그인 필드는 데이터를 받는 부분을 정의하는 필드이다.
위에서 해당 플러그인에서 jdbc 플러그인을 사용해서 데이터를 받아오지만 그거 외에도 여러 방식으로 데이터를 수집 할 수 있다.
https://www.elastic.co/guide/en/logstash/current/input-plugins.html
위의 URL에서 플러그인 종류를 확인 가능하다.
대표적으로 사용하는건 beats, jdbc, file, exec, elsaticsearch 정도가 있는것 같다.
input 플러그인에 대해 자세히 설명하는것은 너무 범위가 넓어서 해당 파일의 jdbc 플러그인에 대해서만 설명하겠다.


    jdbc_driver_library => "${CUBRID_DRIVER_PATH}" # jdbc connect library 각 연결할 디비 사이트에 들어가서 JDBC 드라이버 다운 받아서 서버에 드라이버 설치 후 해당 경로를 명시해 주면 됩니다. 
    jdbc_driver_class => "${CUBRID_DRIVER_CLASS}" # 로드 할 JDBC 드라이버 클래스를 명시 환경 변수 안에는 Java::cubrid.jdbc.driver.CUBRIDDriver 라는 문자열이 들어가 있다.
    jdbc_connection_string => "${CUBRID_GSND_CONNECTION_URL}" # 연결할 디비 경로 및 IP 환경 변수 안에는 
 "jdbc:cubrid:{IP}:{PORT}:{DB_NAME}:::?charset=utf-8" 라는 문자열이 들어가 있다.
    jdbc_user => "${CUBRID_GSND_USER}" # 해당 디비 유저
    jdbc_password => "${CUBRID_GSND_PASSWORD}" # 해당 디비 패스워드

 

참고로 환경 변수 명이 CUBRID인 것은 제가 디비를 cubrid로 사용해서 cubrid 변수 명인 것입니다.

${변수명} 은 해당 리눅스의 환경 변수를 불러오는 것이다.
statement 플러그인은 jdbc 설정으로 연결된 디비로 쿼리를 보내서 받은 결과 값을 가져오는 것이다. 안에는 디비로 보낼 쿼리 구문을 명시해주면 됩니다.
tracking_column 은 statement 플러그인을 통해 받은 결과 중에서 해당 하는 필드 중에 가져올 필드를 명시하는 것
tracking_column_type 은 해당 타입 명시 정의 가능한 속성 : numeric, timestamp
use_column_value 은 tracking_column 필드의 값을 :sql_last_value 값으로 사용한다는 것 입니다.
schedule 은 반복 시간
record_last_run 은 마지막 실행 기록 저장 여부
last_run_metadata_path 은 마지막 실행 기록 저장할 경로를 뜻한다.

filter {
  mutate {
    add_field => { "crt_dt_formatted" => "%{crt_dt}" }
  }

  date {
    match => ["crt_dt_formatted", "YYYY-MM-dd HH:mm"]
    target => "crt_dt_formatted" 
  }
  mutate {
    add_field => { "mod_dt_formatted" => "%{mod_dt}" }
  }
  date {
    match => ["mod_dt_formatted", "yyyy-MM-dd HH:mm"]
    target => "mod_dt_formatted" 
  }
}

 

filter 플러그인은 받은 데이터를 중개처리 해주는 플러그인 입니다.
데이터의 1차 가공이라고 생각하면 되겠습니다.
위에서는 mutate 와 date를 같이 사용 중입니다.
mutate -> add_field 를 통해 statement 를 통해 받은 데이터 필드 중에서
crt_dt 필드의 데이터를 crt_dt_formatted 라는 필드를 생성하고 해당 필드에 데이터를 넣어주는 구문 입니다.
date 필드는 해당 필드의 속성을 date 로 정의 하겠다는 것 입니다.
이렇게 하는 이유는 기본적으로 statement 통해 들어온 필드는 별 다른 명시가 없다면 모두 속성이 text 속성으로 들어오기 때문에
속성을 변환해주는 것 입니다.
match는 필드 구성 방식을 정의 target은 목표 필드 명입니다.

 

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "test-index" 
    user => "elastic" 
    password => "elastic" 
    document_id => "%{id}" 
  }
  stdout{}
}

 

output의 경우는 input -> filters 를 거친 event data들을
지정된 목적지로 보내는 플러그인 입니다.
csv, exec, email, db, elasticsearch 등 다양한 경로의 플러그인을 통해서 데이터를 발송 가능하다.
elasticsearch 플러그인을 사용해서 hosts에 서버 IP/port를 명시
user와 password까지 입력 하면 해당 서버와 연결이 가능하다.
index는 해당 conf로 인해 elasticsearch 만들어질 index(table) 명을 나타내면 된다.
document_id 는 statement 를 통해 받은 필드중에서 primary_key 값을 선택해주면 된다.
stdout{} 플러그인은 logstash를 실행 후 해당 파이브의 결과를 콘솔에 찍어주는 명령어이다.
이런 .conf 파일이 한개의 파이브 라인이 되는 것이다.

 

이렇게 작성 한 후에 로그 스태시 재시작을 해주시면 됩니다.

여기까지 DB JDBC 연동 + 멀티 파이프 라인 설정에 대해서 알아보았습니다.

 

Comments