Node.js

[NodeJS] ElasticSearch bulk

SongMinu 2021. 11. 23. 08:38
728x90

elasticsearch 7.x 기준

 try {
    let bulk = [];
    for(let item of files_info) {
      const fileName = item.originalname;
      const fileSize = item.size;
      const file_mk_dt = moment().format('YYYY-MM-DD HH:mm:ss');
      const fileContent = item.buffer.toString('base64');
      const data = {
        file_name : fileName,
        file_size : fileSize,
        file_content : fileContent,
        file_mk_dt : file_mk_dt
      }
      bulk.push({"index": {"_index": index_name, "_type": "_doc"}});
      bulk.push(data);
    }
    let rs;
    if (bulk.length > 0) {
      rs = await es_client.bulk({
        body: bulk,
        refresh: 'wait_for'
      })
      Log.info(rs);
    }
    rt.error = false;
    rt.result = rs;
  } catch (err) {
    Log.error(err);
    rt.error = true;
    rt.msg = 'err';
    rt.result = err.message;
  }

for문을 돌려서 배열에 인덱스 정보, 데이터를 푸시해준 뒤 bulk 해주면 됨. (map을 쓰는 것도 나쁘지 않을 것 같음)

 

데이터가 너무 많을 경우에는 스크롤 검색과 while문을 사용해 데이터를 모두 뽑으면서 bulk 데이터를 만들 수 도 있음.

let rs = await ES_CLIENT.search({
  index: idx_threat + data._datetime.substring(0, 4),
  type: 'doc',
  scroll: '3m',
  body: query
})

while(rs.hits.hits.length > 0) {
  rs.hits.hits.map( (doc) => {
    bulk.push({update:{_index: idx_threat + doc._source._datetime.substring(0,4), _type: 'doc', _id: doc._id}});
    bulk.push({doc: {TW_STATE: '3'}});
  })
  rs = await ES_CLIENT.scroll({ scrollId: rs._scroll_id, scroll: '3m' });
}

특정 인덱스에서 쿼리에 해당하는 데이터의 개수가 많아 스크롤 검색을 사용했고,

while문으로 데이터를 끝까지 뽑으면서 데이터를 수정하는 bulk 데이터를 만드는 로직.

반응형