Node.js

[NodeJS] node-schedule을 이용한 elasticsearch 인덱스 생성

SongMinu 2022. 1. 6. 15:36
728x90

 

지금 작성하는 글은

https://minu0807.tistory.com/117

 

[NodeJS] ElasticSearch API getTemplate(index_template)

인덱스 템플릿(index_template)? 인덱스를 생성할 때 맵핑 정보를 입력해서 생성해야 한다. 맵핑 정보 입력 없이 그냥 생성하고 데이터를 넣으면 필드 맵핑이 이상하게 만들어져서 데이터 조회나, 어

minu0807.tistory.com

이 글과 연관된 글이기도 하다.

 

위 글에 언급한 연도 단위로 인덱스를 사용하면서 발생한 문제를 해결하고자 node-schedule을 이용했다.

 

로직 구조는 간단하다.

1. 매일 00시00분01초에 moment를 이용해 다음날 날짜를 뽑는다. (2021-01-03일에서 2021-01-04일이 되면 2021-01-05 날짜를 뽑음)

   날짜가 바뀐 당일 기준으로 생성하면 그 날짜가 넘어가는 간발의 차에 데이터 생성중에 이슈가 생기는 경우가 좀 있었다.

   그래서 날짜가 바뀐 당일 기준이 아닌 하루를 더해서 미리 생성해놓자는 생각으로 이렇게 했다.

   그리고 하루 단위로 한 이유는 대용량 인덱스 중에 연도가 바뀐다고 바로 해당 연도의 데이터가 생성 안 되는 인덱스들도 좀 있어서 조회 중 문제가 생기는 걸 방지하고자 했다.

2. 그 후 그 날짜의 연도를 가지고 대용량 인덱스의 존재 여부를 판단한다.

3. 있으면 그냥 패스하고, 없으면 해당 인덱스를 생성한다.

추가로 이 로직 처리는 worker_thread를 이용했다. (별도로 분리해서 로직이 작동하도록 하고 싶었음)

 

내부적으로 사용하고 있는 대용량 인덱스 템플릿명들이 tpl_ts_data_인덱스명 형식으로 구성되어 있고,

대용량이 아닌 인덱스들은 tpl_ts_인덱스명 형식으로 구성되어 있다.

 

위 링크의 API를 이용해 tpl_ts_data_* 로 리스트를 뽑고 연도를 합치면 인덱스명+년도가 되기 때문에 존재 여부를 판단할 수 있다.

 

node-schedule과 worker_thread 부분 소스

const schedule = require('node-schedule');
const worker = require('worker_threads');

let isIndexCheck = false;
if (!isIndexCheck) {
  schedule.scheduleJob('1 0 0 * * *', async function () {
    const wk = new worker.Worker('./routes/worker/checkIndexJob.js');
    wk.on('message', (msg) => {
      log.info(msg);
    })
    wk.on('exit', () => {
      isIndexCheck = true;
      log.info('Index exists check End...');
    })
  });
}

(소스는 필요한 부분만 옮겼기 때문에 생략된 부분이 많습니다.)

isIndexCheck는 동시에 실행되는 걸 방지하고자 넣은 건 데, 하루에 한 번 도는 거라 그럴 일은 없겠지만... 혹시나 하는 마음에 내가 마음이 편해서 넣었다.

실행시키는 부분은 이거 말곤 없다.

 

처리하는 소스

const moment = require('moment');
const elastic = require('elasticsearch');
const { parentPort } = require('worker_threads');

let ES_CLIENT = new elastic.Client({ hosts: es_hosts, apiVersion: '6.8', requestTimeout: 120 * 1000 });

async function run () {
  const index_list = await getTemplateList();
  if (index_list.error == false) {
    let netxDate = moment().add(1, 'days').format('YYYY-MM-DD'); 
    await check_index(index_list.result, netxDate.substr(0, 4)); 
  } else {
    log.info('체크할 인덱스 정보가 템플릿에 없습니다.');
  }
}
run();

//템플릿 리스트 가져오기
async function getTemplateList() {
  let rt = {
    error: false,
    result: []
  };
  try {
    const rs = await ES_CLIENT.indices.getTemplate({
      name:'tpl_ts_data_*'
    })                   
    let keys = Object.keys(rs);
    for (let key of keys) {
      let idx_name = makeIndexName(rs[key].index_patterns[0]);
      rt.result.push(idx_name);
    }
  } catch (err) {
    rt.error = true;
    log.error('인덱스 체크 - 템플릿 리스트 가져오기 실패');
    log.error(err);
  }
  return rt;
}

//템플릿에서 뽑은 인덱스 패턴 다듬기
function makeIndexName(data) {
  return data.replace(/^[*]|[-*]/gm, '');
}

//인덱스 여부 확인하기
async function check_index(idx_arr, yyyy) {
  try {
    let rs = null;
    for (let idx of idx_arr) {
      let idx_name = `${idx}-${yyyy}`;
      rs = await ES_CLIENT.indices.exists({
        index: idx_name
      })
      if (!rs) {
        //인덱스 없으면 생성
        await make_index(idx_name);
      }
    }
  } catch (err) {
    log.error('인덱스 존재 여부 체크 실패');
    log.error(err);
  }
}

//인덱스 생성하기
async function make_index(idx) {
  try {
    const rs = await ES_CLIENT.indices.create({
      index: idx
    })
    if (rs.acknowledged === true) {
      parentPort.postMessage(`Created index : ${idx}`);
    }
  } catch (err) {
    log.error('인덱스 생성 실패');
    log.error(err);
  }
}

(소스는 필요한 부분만 옮겼기 때문에 생략된 부분이 많습니다.)

크게 어려운 부분은 없다.

 

elasticsearch API를 이용해 원하는(대용량 인덱스) 템플릿 리스트를 가져오고,

각 템플릿에 입력된 index_patterns를 인덱스 형식으로 다듬어 주고,

전달받은 연도를 붙이고,

인덱스 존재 여부를 판단한다.

이후에 없으면 생성을, 없으면 끝이다.

 

작년에 이걸 좀 미리 만들어 놨어야 했는데.....

 

 

 

반응형