지금 작성하는 글은
https://minu0807.tistory.com/117
이 글과 연관된 글이기도 하다.
위 글에 언급한 연도 단위로 인덱스를 사용하면서 발생한 문제를 해결하고자 node-schedule을 이용했다.
로직 구조는 간단하다.
1. 매일 00시00분01초에 moment를 이용해 다음날 날짜를 뽑는다. (2021-01-03일에서 2021-01-04일이 되면 2021-01-05 날짜를 뽑음)
날짜가 바뀐 당일 기준으로 생성하면 그 날짜가 넘어가는 간발의 차에 데이터 생성중에 이슈가 생기는 경우가 좀 있었다.
그래서 날짜가 바뀐 당일 기준이 아닌 하루를 더해서 미리 생성해놓자는 생각으로 이렇게 했다.
그리고 하루 단위로 한 이유는 대용량 인덱스 중에 연도가 바뀐다고 바로 해당 연도의 데이터가 생성 안 되는 인덱스들도 좀 있어서 조회 중 문제가 생기는 걸 방지하고자 했다.
2. 그 후 그 날짜의 연도를 가지고 대용량 인덱스의 존재 여부를 판단한다.
3. 있으면 그냥 패스하고, 없으면 해당 인덱스를 생성한다.
추가로 이 로직 처리는 worker_thread를 이용했다. (별도로 분리해서 로직이 작동하도록 하고 싶었음)
내부적으로 사용하고 있는 대용량 인덱스 템플릿명들이 tpl_ts_data_인덱스명 형식으로 구성되어 있고,
대용량이 아닌 인덱스들은 tpl_ts_인덱스명 형식으로 구성되어 있다.
위 링크의 API를 이용해 tpl_ts_data_* 로 리스트를 뽑고 연도를 합치면 인덱스명+년도가 되기 때문에 존재 여부를 판단할 수 있다.
node-schedule과 worker_thread 부분 소스
const schedule = require('node-schedule');
const worker = require('worker_threads');
let isIndexCheck = false;
if (!isIndexCheck) {
schedule.scheduleJob('1 0 0 * * *', async function () {
const wk = new worker.Worker('./routes/worker/checkIndexJob.js');
wk.on('message', (msg) => {
log.info(msg);
})
wk.on('exit', () => {
isIndexCheck = true;
log.info('Index exists check End...');
})
});
}
(소스는 필요한 부분만 옮겼기 때문에 생략된 부분이 많습니다.)
isIndexCheck는 동시에 실행되는 걸 방지하고자 넣은 건 데, 하루에 한 번 도는 거라 그럴 일은 없겠지만... 혹시나 하는 마음에 내가 마음이 편해서 넣었다.
실행시키는 부분은 이거 말곤 없다.
처리하는 소스
const moment = require('moment');
const elastic = require('elasticsearch');
const { parentPort } = require('worker_threads');
let ES_CLIENT = new elastic.Client({ hosts: es_hosts, apiVersion: '6.8', requestTimeout: 120 * 1000 });
async function run () {
const index_list = await getTemplateList();
if (index_list.error == false) {
let netxDate = moment().add(1, 'days').format('YYYY-MM-DD');
await check_index(index_list.result, netxDate.substr(0, 4));
} else {
log.info('체크할 인덱스 정보가 템플릿에 없습니다.');
}
}
run();
//템플릿 리스트 가져오기
async function getTemplateList() {
let rt = {
error: false,
result: []
};
try {
const rs = await ES_CLIENT.indices.getTemplate({
name:'tpl_ts_data_*'
})
let keys = Object.keys(rs);
for (let key of keys) {
let idx_name = makeIndexName(rs[key].index_patterns[0]);
rt.result.push(idx_name);
}
} catch (err) {
rt.error = true;
log.error('인덱스 체크 - 템플릿 리스트 가져오기 실패');
log.error(err);
}
return rt;
}
//템플릿에서 뽑은 인덱스 패턴 다듬기
function makeIndexName(data) {
return data.replace(/^[*]|[-*]/gm, '');
}
//인덱스 여부 확인하기
async function check_index(idx_arr, yyyy) {
try {
let rs = null;
for (let idx of idx_arr) {
let idx_name = `${idx}-${yyyy}`;
rs = await ES_CLIENT.indices.exists({
index: idx_name
})
if (!rs) {
//인덱스 없으면 생성
await make_index(idx_name);
}
}
} catch (err) {
log.error('인덱스 존재 여부 체크 실패');
log.error(err);
}
}
//인덱스 생성하기
async function make_index(idx) {
try {
const rs = await ES_CLIENT.indices.create({
index: idx
})
if (rs.acknowledged === true) {
parentPort.postMessage(`Created index : ${idx}`);
}
} catch (err) {
log.error('인덱스 생성 실패');
log.error(err);
}
}
(소스는 필요한 부분만 옮겼기 때문에 생략된 부분이 많습니다.)
크게 어려운 부분은 없다.
elasticsearch API를 이용해 원하는(대용량 인덱스) 템플릿 리스트를 가져오고,
각 템플릿에 입력된 index_patterns를 인덱스 형식으로 다듬어 주고,
전달받은 연도를 붙이고,
인덱스 존재 여부를 판단한다.
이후에 없으면 생성을, 없으면 끝이다.
작년에 이걸 좀 미리 만들어 놨어야 했는데.....
'Node.js' 카테고리의 다른 글
[NodeJS] Error: Unable to load PFX certificate 에러 (0) | 2022.02.23 |
---|---|
[NodeJS] ElasticSearchAPI putMapping 중 에러 (3) | 2022.01.22 |
[NodeJS] ElasticSearch API getTemplate(index_template) (0) | 2022.01.03 |
[NodeJS] bulk 데이터 파일 읽어서 ElasticSearch에 등록하기 (2) | 2021.11.25 |
[NodeJS] ElasticSearch search 결과로 bulk 파일 만들기 (0) | 2021.11.24 |