호돌찌의 AI 연구소
article thumbnail

이전 글에서는 Collection을 생성하는 부분에 대해 짧게 다루었습니다.

2023.10.10 - [AI/Vector Database] - [Vector DB] 3. Milvus 튜토리얼 (1) - 설치, 변수 정의, Collection 생성하기

이번 글에서는 샘플 텍스트 데이터를 임베딩하고 insert를 수행하고 적재가 제대로 되어있는지 확인하는 과정까지 다루어 보겠습니다.

 

샘플 데이터 준비


Milvus에 삽입할 데이터로는 과거 뉴스 데이터들을 활용할 예정입니다. 여기 문서 분류 application에 사용되는 샘플 데이터를 아래 명령어로 다운로드합니다.

wget --no-check-certificate 'https://docs.google.com/uc?export=download&id=1Lg2jL89n3lqkKCulAnk4WwmI8G1hNfIA' -O BalancedNewsCorpusShuffled.zip
unzip BalancedNewsCorpusShuffled.zip

 

그다음 필요한 라이브러리들을 부르고 조금의 전처리를 수행합니다. 

import torch
from torch.utils.data import Dataset, DataLoader
from sentence_transformers import SentenceTransformer
import pandas as pd
import numpy as np
from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection, utility
from tqdm.auto import tqdm
import re


def clean(text:str):
    # https://github.com/YongWookHa/kor-text-preprocess/blob/master/src/clean.py
    not_used = re.compile('[^ .?!/@$%~|0-9|ㄱ-ㅣ가-힣]+')
    dup_space = re.compile('[ \t]+')  # white space duplicate
    dup_stop = re.compile('[\.]+')  # full stop duplicate

    cleaned = not_used.sub('', text.strip()) 
    cleaned = dup_space.sub(' ', cleaned)
    cleaned = dup_stop.sub('.', cleaned) 

    return cleaned

df = pd.read_csv('./BalancedNewsCorpus_train.csv')
df['News'] = df['News'].apply(lambda text: text.replace('<p>', '\n').replace('</p>', '\n'))
df['News'] = df['News'].apply(clean)
df

 

아래와 같이 9,000개의 뉴스 텍스트 데이터를 임베딩하겠습니다.

 

 

Collection 생성


이전 글에서 선언하는 Parameter에 대해서는 자세히 설명을 하였습니다. 실제로 collection을 생성할 때 Pandas DataFrame과의 dtype이 일치해야 추후에 insert가 됩니다. (참고로 Pandas DataFrame으로 구성하고 insert를 하는 것은 필수가 아닙니다.)

DATASET = 'BalancedNewsCorpus'  
MODEL = 'snunlp/KR-SBERT-V40K-klueNLI-augSTS'  
COLLECTION_NAME = 'BalancedNewsCorpus_db'  # Collection name
DIMENSION = 768  # Embeddings size
MILVUS_HOST = 'IP HOST 입력' # 'localhost'
MILVUS_PORT = 'IP PORT 입력' # '8888'
INDEX_TYPE = "IVF_FLAT"


connections.connect(host=MILVUS_HOST, port=MILVUS_PORT)

if utility.has_collection(COLLECTION_NAME):
    utility.drop_collection(COLLECTION_NAME)
    
    
## 가지고 있는 샘플데이터의 dtype과 맞게, collection의 필드 정보들을 선언
EMBEDDING_FIELD_NAME ='News_embedding'
field2args = {
    'file_id': {'dtype': DataType.INT64, 'is_primary':True, 'auto_id':True},
    'filename': {'dtype': DataType.VARCHAR, 'max_length': 20},
    'date': {'dtype': DataType.INT64, 'max_length': 10},
    'NewsPaper': {'dtype': DataType.VARCHAR, 'max_length': 100},
    'Topic': {'dtype': DataType.VARCHAR, 'max_length': 100},
    'News': {'dtype': DataType.VARCHAR, 'max_length': 30000},
    EMBEDDING_FIELD_NAME: {'dtype': DataType.FLOAT_VECTOR, 'dim': DIMENSION},
}

# Milvus 연결을 설정합니다. 작업하기 전에 반드시 Milvus 서버에 연결해야 합니다.
connections.connect(host=MILVUS_HOST, port=MILVUS_PORT)

if utility.has_collection(COLLECTION_NAME):
    utility.drop_collection(COLLECTION_NAME)

# 스키마 정의
fields = [
    FieldSchema(name = field_name, **args)
    for field_name, args in field2args.items()
]

schema = CollectionSchema(fields=fields)
collection = Collection(name=COLLECTION_NAME, schema=schema)

# collection에 이미 인덱스 유무 파악
if collection.has_index():
    try:
        collection.drop_index()
    except:
        collection.release()
        collection.drop_index()

# index 정의하기
index_params = {
    'metric_type': 'L2',         # "L2", "IP", "COSINE" 
    'index_type': INDEX_TYPE,    # "IVF_FLAT","IVF_SQ8", "IVF_PQ", "HNSW", "ANNOY"
    'params': {"nlist": 1},
}

collection.create_index(field_name=EMBEDDING_FIELD_NAME, index_params=index_params)
collection.load()
collection

 

Collection을 조회하면 다음과 같이 확인 가능합니다.

 

실제로 여기까지 run을 하신다면, Collection 이 생성된 것을 attu 위에서도 확인이 가능합니다. (attu는 아래에서 자세히 설명합니다.)

 

임베딩 모델 적용


임베딩 모델은 snunlp/KR-SBERT-V40K-klueNLI-augSTS 로 선정하였습니다. 9,000개의 News 텍스트들을 Sentence Transformer을 활용하여 임베딩 수행합니다.

embedder = SentenceTransformer(MODEL)
texts = df['News'].tolist()
embeddings = embedder.encode(texts, show_progress_bar=True, normalize_embeddings=True)

 

Insert 수행하기


insert를 수행하는 방식은 다양합니다. 가장 간단하게 insert를 하는 방식은 collection이 생성이 되어있다는 가정하에 nested list를 만들어 공식 문서대로 insert()를 진행하면 됩니다. 이 글에서는 Custom으로 Dataset을 구성하여 내가 원하는 형태의 column명으로 바꾸면서, 기존 column과 임베딩 필드를 한꺼번에 insert를 하는 방식으로 접근해 보겠습니다. 

# 만약에 DataFrame과 VectorDB에 적재할 Column이 다른 경우에 이와 같은 방식을 고려해볼 수 있음
column2field = {
    'filename':'filename',
    'date':'date',
    'NewsPaper':'NewsPaper',
    'Topic':'Topic',
    'News':'News',
}

field2column = {v:k for k,v in column2field.items()}


class CustomDataset(Dataset):
    
    def __init__(self, df, embeddings, field2column, embedding_field):
        self.df = df
        self.embeddings = embeddings
        self.field2column = field2column
        self.embedding_field = embedding_field
        
    def __len__(self):
        return len(self.df)
    
    def __getitem__(self, idx):
        items = {field: self.df.iloc[idx][column] for field, column in self.field2column.items()}
        items[self.embedding_field] = self.embeddings[idx].tolist()
        return items
    
dataset = CustomDataset(df, embeddings, field2column, EMBEDDING_FIELD_NAME)

 

여기서 dataset을 insert를 수행하고 flush를 수행합니다. flush()는 삽입한 데이터를 묶는 작업을 뜻합니다. index 검색을 하기 위해 이 작업을 거쳐야 하고, 추후에 upsert(DB에서 update와 비슷한 개념)을 하는 데 있어서도 flush()는 해주셔야 합니다. 

# insert 수행하기
for data in tqdm(iter(dataset), total=len(dataset)):
    collection.insert(data)
    
collection.flush()
collection.num_entities # 9000, 삽입된 엔티티의 수를 나타낸다.

 

Attu에서 Vector DB 확인하기


아래 그림은 Milvus를 관리할 수 있는 GUI 인 'Attu' 메인 화면입니다. (Attu 설치 또한 docker 위에서 install을 수행합니다. 설치방법은 여기를 참고해 주세요. 버전이 낮을 때에는 버그 같은 것들이 많았었는데, 최근에 개선이 된 것 같습니다. 제가 사용하는 attu 버전은 v2.2.7 입니다.)

Collection을 생성하면 하나의 창이 생기고 insert를 수행하면 entity 값들이 채워지는 구조입니다. 샘플 데이터 9,000개가 들어가 있는 것을 확인할 수 있습니다. 

 

실제로 위에서 선언한 Collection에서 스키마들을 확인할 수 있습니다. 

 

실제로 insert 된 모습입니다. 메타 정보인 file_id부터 News까지 적재가 되어있고, 그 옆에 768차원 임베딩 값들이 붙어있습니다. 

 

다음 글에서는 적재되어 있는 DB에 대해 쿼리 임베딩을 통해 검색하는 내용을 다루겠습니다. 

 


아래는 블로그 주인장의 토스 익명 후원 링크입니다. 글이 도움 되거나 흡족스러웠다면 후원해 주시면 감사하겠습니다.

https://toss.me/hotorch

 

hotorch님에게 보내주세요

토스아이디로 안전하게 익명 송금하세요.

toss.me

 

 

 

profile

호돌찌의 AI 연구소

@hotorch's AI Labs

포스팅이 도움이 되셨다면 "좋아요❤️" 또는 "구독👍🏻" 해주세요!