728x90
반응형




Open LLM Model을 활용하여 Fine Tuning한 모델을 가지고 실제 비즈니스에 적용한 서비스를 만들기 위해
LLM을 테마로 하여 LLM 모델에 대한 개요와 원리 등을 공부하여 블로그로 연재하겠습니다.


먼저 본 글에서는 현재까지의 LLM 발전기에 대해 다뤄보겠습니다.


딥러닝 기반 언어모델의 등장

LLM(Large Language Model)이란? 인간이 활용하는 수많은 자연어를 학습한 딥러닝 기반의 언어모델입니다.



LLM은 딥러닝 기술을 통해 더욱 효율적이고 개선된 성능으로 자연어 처리 및 생성을 수행하기 때문에 위 사진과 같은 벤다이어그램으로 표현할 수 있습니다.


딥러닝의 자연어 처리 분야 적용

RNN(Recurrent Neural Network)이란? 순환신경망, 자연어 문장과 같은 시퀀스한 입력데이터를 받는 모델입니다. 입력으로 주어지는 시퀀스한 데이터의 순서가 달라질 경우, 모델이 의미를 다르게 해석하여 출력도 달라질 수 있습니다.



RNN의 개념부터 다른 이유는 RNN이 자연어 문장과 같은 시퀀스 데이터를 처리하기 위해 처음 등장한 모델이기 때문입니다. (1986년 등장)


위 그림에서 볼 수 있는 것처럼 RNN모델은 토큰(단어) 단위 현재 시점의 입력 정보와 과거 시점의 출력 정보 두 개의 정보를 입력하여 연속적으로 학습시킬 수 있습니다. 하지만 입력 정보의 길이가 길어짐에 따라 학습도 길어지게 되면 오래된 정보일수록 의미가 희미해지는 현상이 발견됩니다. (Vanishing Gradient) 그래서 1997년 아래와 같은 아키텍처의 모델이 등장하게 됩니다.


  • LSTM (Long Short Term Memory)




입력 정보가 길어질수록 과거 정보의 의미가 희미해지는 현상을 보완하기 위해 LSTM 아키텍처가 등장하게 됩니다.
위 그림에서 볼 수 있는 것처럼 LSTM 아키텍처는 각 노드마다 과거 시점의 입력 정보를 추가로 받고,
게이트 단에서 세 개의 입력정보 (과거 시점 입력정보/과거 시점 출력정보/현재 시점 입력정보)를 입력받아서 데이터 활용여부와 방법을 결정하게 됩니다.
자세한 내용은 추후 작성될 RNN글에서 다뤄보겠습니다.


그럼에도 불구하고 과거 시점의 의미가 희미해지는 현상이 완전히 해소되지 않아서 숙제로 남게됩니다.


딥러닝의 대두

2012년 이미지 인식 대회인 ImageNet에서 딥러닝 기술을 활용한 AlexNet 모델이 우승하였습니다.
당시에 AlexNet 모델은 기존 방식을 활용한 이미지 인식 모델의 오류율을 크게 개선하였는데요.
뿐만 아니라 해당 모델을 특정 문제를 해결하는데 뿐만 아니라 다른 문제를 해결하는 데에도 범용적으로 활용할 수 있어서
해당 사건을 계기로 딥러닝 기술에 대한 사람들의 관심이 다시 높아졌습니다.


AlexNet 모델은 대표적은 CNN(Convolutional Neural Network) 모델입니다. 다음에 기회가 된다면 상세히 살펴보겠습니다.


Word2Vec 기술 등장

2013년, 구글의 연구원들은 Word2Vec 이라는 기술을 개발하게 됩니다.
Word2Vec 기술은 단어를 실수벡터 형태로 변환하는 기술입니다. 그리고 실수벡터 형태로 변환하는 과정을 Embedding이라고 합니다.




  • 등장 배경

컴퓨터가 이해할 수 있도록 자연어를 숫자로 변환하는 방식인 One-Hot 인코딩은 표현하고자 하는 단어의 인덱스를 1로, 나머지 단어의 인덱스는 0으로 세팅하는 방식입니다. 하지만 해당 방식은 단어 간의 유사성을 확인할 수 없다는 단점이 있었습니다.
이러한 단점을 보완하기 위해 Word2Vec 기술이 등장하게 됩니다.


  • 유사성을 벡터화하는 방법

단어간의 유사성 정보를 어떻게 벡터에 담을 수 있을까요?
Word2Vec을 관통하는 핵심 개념은 "비슷한 문맥에서 등장하는 단어들은 비슷한 의미를 가진다." 입니다.
강아지라는 단어는 귀엽다, 예쁘다 등의 단어가 주로 함께 등장하는 것처럼요.


위 개념을 기반으로 하는 분포 가설이라는 것을 정의하고,
정의한 분포 가설을 활용하여 학습한 모델에 자연어를 입력하면,
해당 자연어는 유사성 정보가 담긴 벡터로 변환됩니다.


자세한 내용은 👉Embedding글에서 참고 바랍니다.


Transformer 모델의 등장

2017년, 자연어 처리 분야의 딥러닝 연구가 지속되고 있는 중에 구글의 연구진은 "Attention is All you need"라는 논문을 통해 Transforemer 모델 아키텍처를 공개하게 됩니다. 해당 모델은 학습 텍스트가 길어질수록 과거의 정보가 희미해지는 RNN의 고질적인 문제를 획기적으로 해결하였습니다.
RNN의 순차 처리방식을 버리고 맥락을 모두 참조하는 Attention 연산을 이용한 것입니다.


그리고 현재까지 해당 트랜스포머 모델을 기반으로 구글, 메타, OpenAI 등 주요 테크기업을 중심으로 여러 LLM 모델이 개발되거나 개발중이고,
OpenAI의 ChatGPT 서비스가 일반 사용자에게 공개됨에 따라 AI에 대한 대중의 관심이 크게 증가하여
다양한 분야에 LLM을 접목하는 시도가 많이 나타나고 있습니다.


트랜스포머 모델의 자세한 내용은 다음 글에서 다뤄보겠습니다.


Reference

반응형
728x90
반응형

본 글에서는 👉이전글에서 전처리된 데이터를 기반으로 선형회귀, 로지스틱 회귀, 의사결정나무 모델에 학습시키고 성능을 평가해볼 것입니다.

데이터 준비

  • 샘플데이터 : boston.csv
  1. 데이터 불러오기
# 라이브러리 불러오기
import pandas as pd

df = pd.read_csv("./data/boston.csv")
  1. 데이터 분리
    : 모델의 Overfitting(과적합)을 방지하기 위해 학습용, 평가용, 검증용(Option) 데이터를 나누는 과정입니다.
    Overfitting(과적합) : 학습용(train) 데이터에 과도하게 적합해져서 새로운 데이터에 대한 예측력이 떨어지는 현상
# Feature는 x, Target은 y 로 저장합니다.
x = df.drop(columns=['MEDV'])
y = df['MEDV']

# sklearn 패키키 증 train_test_split 함수 불러오기
from sklearn.model_selection import train_test_split

# Feature와 Target을 train, test 데이터 셋으로 나누기
x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.3, random_state=42)



지도학습 회귀 - 선형회귀

  • 샘플데이터 : boston.csv
  1. 데이터 불러오기
# 라이브러리 불러오기
import pandas as pd

df = pd.read_csv("./data/boston.csv")
  1. 데이터 분리
# Feature는 x, Target은 y 로 저장합니다.
x = df.drop(columns=['MEDV'])
y = df['MEDV']

# sklearn 패키키 증 train_test_split 함수 불러오기
from sklearn.model_selection import train_test_split

# Feature와 Target을 train, test 데이터 셋으로 나누기
x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.3, random_state=42)
  1. 단순선형회귀 (Linear Regression)
    : 1개의 feature를 기반으로 수치를 예측하는 모델, 👉Linear Regression글 참고
# 라이브러리 호출
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error

# 모델 선언
model = LinearRegression()

# 학습하기
model.fit(x_train[['RM']], y_train)

# 예측하기
y_pred = model.predict(x_test)

# 평가하기
print("MSE : ", mean_squared_error(y_test, y_pred))
# MSE(Mean Squared Error) : 평균제곱오차, 예측값과 실제값과의 차이를 제곱한 것의 평균
# 모델의 성능과 MSE는 반비례

# 선형회귀 분석 결과 시각화
# 회귀계수 확인
w = model.coef_
b = model.intercept_
print("기울기 : ", w)    # 기울기
print("절편 : ", b)     # 절편

# 선형회귀식
x = x_test[['RM']]
y = w * x + b

# 시각화
import matplotlib.pyplot as plt
plt.scatter(x_test[['RM']], y_test)
plt.plot(x, y, color='r')
plt.xlabel('RM')
plt.ylabel('MEDV')

plt.show()

# 예측값 실제값 비교
# 실제값과 예측값을 비교하기 위해 데이터프레임으로 만들기
df_lr = pd.DataFrame()
df_lr['y_test'] = y_test
df_lr['y_pred'] = y_pred
df_lr.reset_index(drop=True, inplace=True)

# 시각화
plt.plot(df_lr['y_test'], label='Actual')
plt.plot(df_lr['y_pred'], label='Predicted')
plt.legend()
plt.show()
  • MSE (Mean Squared Error)
  1. 다중선형회귀 (Linear Regression)
    : 다수의 feature를 기반으로 수치를 예측하는 모델, 👉Linear Regression글 참고
# 라이브러리 호출
from sklearn.linear_model import LinearRegression

# 모델 선언
model2 = LinearRegression()

# 학습하기
model2.fit(x_train, y_train)

# 예측하기
y_pred = model2.predict(x_test)

# 평가하기
print("MSE : ", mean_squared_error(y_test, y_pred))

# 선형회귀 분석 결과 시각화
# 회귀계수 확인
w = model.coef_
b = model.intercept_
print("기울기 : ", w)    # 기울기
print("절편 : ", b)     # 절편

# 선형회귀식
formula = "MEDV = {:.2f}".format(b)
for i, feature_name in enumerate(x_test.columns):
    formula += " + {:.2f} * {}".format(w[i], feature_name)
print("선형회귀식: " + formula)

# 시각화
import matplotlib.pyplot as plt
plt.plot(y_test.values[:50], label='Actual')
plt.plot(y_pred[:50], label='Predicted')
plt.legend()
plt.show()



지도학습 분류

  • 샘플데이터 : wine.csv
  1. 데이터 불러오기
# 라이브러리 불러오기
import pandas as pd

df = pd.read_csv("./data/wine.csv")
  1. 데이터 분리
# Feature는 x, Target은 y 로 저장합니다.
x = df.drop(columns=['class'])
y = df['class']

# sklearn 패키키 증 train_test_split 함수 불러오기
from sklearn.model_selection import train_test_split

# Feature와 Target을 train, test 데이터 셋으로 나누기
x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2, random_state=42)
  1. 로지스틱 회귀 (Logistic Regression)
    : 다수의 feature를 기반으로 이진분류하는 모델, 👉Logistic Regression글 참고
# 라이브러리 호출
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score

# 모델 선언
model = LogisticRegression()

# 이진분류를 위한 테스트데이터 세팅
x_train_b = x_train.loc[y_train[y_train !='class_2'].index]
y_train_b = y_train[y_train !='class_2']
x_test_b = x_test.loc[y_test[y_test !='class_2'].index]
y_test_b = y_test[y_test !='class_2']

# 학습하기
model.fit(x_train_b, y_train_b)

# 예측하기
y_pred = model.predict(x_test_b)

# 평가하기
print("accuracy : ", accuracy_score(y_test_b, y_pred))
# accuracy_score : 정확도를 계산하기 위한 sklearn패키지 함수 (예측결과가 동일한 데이터 건수 / 전체 예측 데이터 건수)

# 예측값 실제값 비교
# 실제값과 예측값을 비교하기 위해 데이터프레임으로 만들기
df_result = pd.DataFrame()
df_result['y_test_b'] = y_test_b
df_result['y_pred'] = y_pred
df_result.reset_index(drop=True, inplace=True)
  1. 소프트맥스 회귀 (Softmax Regression)
    : 다수의 feature를 기반으로 다중분류하는 모델, 👉Softmax Regression글 참고
# 라이브러리 호출
from sklearn.linear_model import LogisticRegression

# 모델 선언
model_softmax = LogisticRegression(multi_class='multinomial')

# 학습하기
model_softmax.fit(x_train, y_train)

# 예측하기
y_pred = model_softmax.predict(x_test)

# 평가하기
print("accuracy : ", accuracy_score(y_test, y_pred))

# 예측값 실제값 비교
# 실제값과 예측값을 비교하기 위해 데이터프레임으로 만들기
df_result = pd.DataFrame()
df_result['y_test'] = y_test
df_result['y_pred'] = y_pred
df_result.reset_index(drop=True, inplace=True)



지도학습 - 의사결정나무

: 의사결정나무 모델은 아래 그림처럼 여러 target으로 분기하는 과정에서 여러 기준에 의거하여 데이터를 분류하거나 결과값을 예측하는 모델
학습을 통해 기준 수치를 조정하여 모델을 일반화하게 되고, 회귀/분류 모델로서 활용 가능합니다.
아래 그림은 outlook, humidity, windy 기준으로 경기가 열릴지 안열리지를 판단하는 모델을 도식화한 그림입니다.

  • 샘플데이터 : wine.csv, boston.csv
  1. 데이터 불러오기
# 라이브러리 불러오기
import pandas as pd

df = pd.read_csv("./data/wine.csv"
  1. 데이터 분리
# Feature는 x, Target은 y 로 저장합니다.
x = df.drop(columns=['class'])
y = df['class']

# sklearn 패키키 증 train_test_split 함수 불러오기
from sklearn.model_selection import train_test_split

# Feature와 Target을 train, test 데이터 셋으로 나누기
x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2, random_state=42)
  1. 의사결정나무 - 분류
# 라이브러리 호출
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import accuracy_score

# 모델 선언
model = DecisionTreeClassifier()

# 학습하기
model.fit(x_train, y_train)

# 예측하기
y_pred = model.predict(x_test)

# 평가하기
print("accuracy : ", accuracy_score(y_test, y_pred))

# 의사결정나무(Decision Tree) 구조 보기
# 시각화 모듈 불러오기
from sklearn.tree import export_graphviz

# 이미지 파일 만들기
export_graphviz(model,                        # 모델이름
                out_file='tree_wine.dot',
                feature_names=x.columns,      # Feature 이름
                class_names=y.unique(),       # Target Class 이름
                rounded=True, 
                precision = 3, 
                filled = True)
!dot -Tpng tree_wine.dot -o tree_wine.png -Gdpi=300

# 이미지 파일 로딩
from IPython.display import Image
Image(filename='tree_wine.png', width=600)        # 사이즈 조정

# 예측값 실제값 비교
# 실제값과 예측값을 비교하기 위해 데이터프레임으로 만들기
df_result = pd.DataFrame()
df_result['y_test'] = y_test
df_result['y_pred'] = y_pred
df_result.reset_index(drop=True, inplace=True)
  1. 의사결정나무 - 회귀
# 데이터 불러오기
df = pd.read_csv("./data/boston.csv")

# 데이터 분리
# Feature는 x, Target은 y 로 저장합니다.
x = df.drop(columns=['MEDV'])
y = df['MEDV']

# sklearn 패키키 증 train_test_split 함수 불러오기
from sklearn.model_selection import train_test_split

# Feature와 Target을 train, test 데이터 셋으로 나누기
x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2, random_state=42)



# 라이브러리 호출
from sklearn.tree import DecisionTreeRegressor
from sklearn.metrics import mean_squared_error

# 모델 선언
model = DecisionTreeRegressor()

# 학습하기
model.fit(x_train, y_train)

# 예측하기
y_pred = model.predict(x_test)

# 평가하기
print("MSE : ", mean_squared_error(y_test, y_pred))

# 의사결정나무(Decision Tree) 구조 보기
# 시각화 모듈 불러오기
from sklearn.tree import export_graphviz

# 이미지 파일 만들기
export_graphviz(model,                        # 모델이름
                max_depth=5,
                out_file='tree_boston.dot',
                feature_names=x.columns,      # Feature 이름
                rounded=True, 
                precision = 3, 
                filled = True)
!dot -Tpng tree_boston.dot -o tree_boston.png -Gdpi=300

# 이미지 파일 로딩
from IPython.display import Image
Image(filename='tree_boston.png', width=600)        # 사이즈 조정

# 예측값 실제값 비교
# 실제값과 예측값을 비교하기 위해 데이터프레임으로 만들기
df_result = pd.DataFrame()
df_result['y_test'] = y_test
df_result['y_pred'] = y_pred
df_result.reset_index(drop=True, inplace=True)

# 시각화
plt.plot(y_test.values[:50], label='Actual')
plt.plot(y_pred[:50], label='Predicted')
plt.legend()
plt.show()



예측과 성능 평가

  • 샘플데이터 : boston.csv, wine.csv
  1. 데이터 불러오기
# 라이브러리 불러오기
import pandas as pd
import matplotlib.pyplot as plt

df = pd.read_csv("./data/boston.csv")
  1. 데이터 분리
# Feature는 x, Target은 y 로 저장합니다.
x = df.drop(columns=['MEDV'])
y = df['MEDV']

# sklearn 패키키 증 train_test_split 함수 불러오기
from sklearn.model_selection import train_test_split

# Feature와 Target을 train, test 데이터 셋으로 나누기
x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2, random_state=42)
  1. 회귀모델 성능 평가
# 라이브러리 호출
from sklearn.linear_model import LinearRegression
from sklearn.tree import DecisionTreeRegressor
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

# 모델 선언
model_lr = LinearRegression()
model_dt = DecisionTreeRegressor()

# 모델 학습
model_lr.fit(x_train, y_train)
model_dt.fit(x_train, y_train)

# 예측하기
y_pred_lr = model_lr.predict(x_test)
y_pred_dt = model_dt.predict(x_test)

# 평가하기
print("LinearRegression 모델")
print("MSE : ", mean_squared_error(y_test, y_pred_lr))
print("RMSE :", mean_squared_error(y_test, y_pred_lr)**(1/2))
print("MAE : ", mean_absolute_error(y_test, y_pred_lr))
print("R2 Score : ", r2_score(y_test, y_pred_lr))
print("")
print("DecisionTreeRegressor 모델")
print("MSE : ", mean_squared_error(y_test, y_pred_dt))
print("RMSE :", mean_squared_error(y_test, y_pred_dt)**(1/2))
print("MAE : ", mean_absolute_error(y_test, y_pred_dt))
print("R2 Score : ", r2_score(y_test, y_pred_dt))

# R2_Score (결정계수) = SSR/SST = 1 - SSE/SST, 해당 수치가 클 수록 회귀 모델의 성능이 높음
# SST (Total Sum of Squares) : 실제값에서 실제값의 평균을 뺀 차이의 제곱을 모두 합한 수치
# SSR (Sum of Squares due to Regression) : 예측값에서 실제값의 평균을 뺀 차이의 제곱을 모두 합한 수치
# SSE (Sum of Squares Residual of Error) : 실제값에서 예측값을 뺀 차이의 제곱을 모두 합한 수치
  • SST/SSR/SSE
  1. 분류모델 성능 평가
# 데이터 불러오기
df = pd.read_csv("./data/wine.csv")

# 데이터 분리
# Feature는 x, Target은 y 로 저장합니다.
x = df.drop(columns=['class'])
y = df['class']

# sklearn 패키키 증 train_test_split 함수 불러오기
from sklearn.model_selection import train_test_split

# Feature와 Target을 train, test 데이터 셋으로 나누기
x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.3, random_state=42)

# 라이브러리 호출
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import confusion_matrix, precision_score, recall_score, accuracy_score, f1_score, classification_report

# 모델 선언
model_lr = LogisticRegression()
model_dt = DecisionTreeClassifier()

# 모델 학습
model_lr.fit(x_train, y_train)
model_dt.fit(x_train, y_train)

# 예측하기
y_pred_lr = model_lr.predict(x_test)
y_pred_dt = model_dt.predict(x_test)

# 평가하기
print("LogisticRegression 모델")
print("Precision : ", precision_score(y_test, y_pred_lr, average='macro'))
print("Recall :", recall_score(y_test, y_pred_lr, average='macro'))
print("Accuracy : ", accuracy_score(y_test, y_pred_lr))
print("F1 Score : ", f1_score(y_test, y_pred_lr, average='macro'))
print("")
print("DecisionTreeClassifier 모델")
print("Precision : ", precision_score(y_test, y_pred_dt, average='macro'))
print("Recall :", recall_score(y_test, y_pred_dt, average='macro'))
print("Accuracy : ", accuracy_score(y_test, y_pred_dt))
print("F1 Score : ", f1_score(y_test, y_pred_dt, average='macro'))

# precision_score : 정밀도, 모델이 1이라고 예측한 것중에 실제로 1인 것의 정도
# accuracy_score : 정확도, 모델이 예측한 값과 실제 값이 일치하는 정도
# recall_score : 재현율, 실제로 1인 것중에 모델이 1이라고 예측한 비율
# f1_score : 정밀도와 회수율을 고르게 활용하기 위한 조화평균, 1에 가까울수록 성능이 좋음
  • Precision/Recall



Cross Validation (교차 검증)

: 데이터를 train set과 test set으로 나누는 방식에서 train set을 train set + validation set으로 추가로 나누어 검증하는 방식
데이터를 train set과 test set으로 나누고, test set을 활용하여 모델의 성능을 검증 및 개선 과정을 반복하면 모델이 test set 과적합(overfitting)될 우려가 있음. 이를 방지하기 위해 교차 검증으로 모델의 성능 검증 및 개선을 진행함.

교차검증

  • 샘플데이터 : boston.csv, wine.csv
  1. 데이터 불러오기
# 라이브러리 불러오기
import pandas as pd
import matplotlib.pyplot as plt

df = pd.read_csv("./data/boston.csv")

# Feature는 x, Target은 y 로 저장합니다.
x = df.drop(columns=['MEDV'])
y = df['MEDV']
  1. 회귀모델 교차검증
# 라이브러리 호출
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.model_selection import cross_val_score

# 모델 선언하기
model_lr = LinearRegression()
model_dt = DecisionTreeRegressor()

# 학습, 예측, 평가하기
lr_cv_score = cross_val_score(model_lr, x, y, cv=10, scoring='neg_mean_squared_error')    
dt_cv_score = cross_val_score(model_dt, x, y, cv=10, scoring='neg_mean_squared_error')    # cv는 위 사진에서 Fold의 갯수를 가리킴

# 결과 확인
# 확인
print("LinearRegression 모델")
print(lr_cv_score)
print("MSE : ", -lr_cv_score.mean())
print("DecisionTreeRegressor 모델")
print(dt_cv_score)
print("MSE : ", -dt_cv_score.mean())
  1. 분류모델 교차검증
# 데이터 불러오기
df = pd.read_csv("./data/wine.csv")

# Feature는 x, Target은 y 로 저장합니다.
x = df.drop(columns=['class'])
y = df['class']

# 라이브러리 호출
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.model_selection import cross_val_score

# 모델 선언하기
model_lr = LogisticRegression()
model_dt = DecisionTreeClassifier()

# 학습, 예측, 평가하기
lr_cv_score = cross_val_score(model_lr, x, y, cv=5, scoring='f1_macro')
dt_cv_score = cross_val_score(model_dt, x, y, cv=5, scoring='f1_macro')

# 결과 확인
# 확인
print("LogisticRegression 모델")
print(lr_cv_score)
print("F1 Score : ", lr_cv_score.mean())
print("DecisionTreeClassifier 모델")
print(dt_cv_score)
print("F1 Score : ", dt_cv_score.mean())
반응형

'개발 > AI' 카테고리의 다른 글

[LLM] LLM 발전기  (0) 2025.01.05
[AI] pandas를 활용한 데이터 핸들링 및 전처리  (2) 2024.04.10
KT AI 해커톤 회고 (2)  (1) 2023.11.02
KT AI 해커톤 회고 (1)  (0) 2023.10.27
[AI] Softmax Regression  (0) 2023.10.01
728x90
반응형

👉이전글에서 소개된 것처럼 AI 모델은 많은 데이터를 기반으로 생성됩니다.
그렇기때문에 양질의 데이터는 높은 퀄리티의 AI 모델을 생성하기 위해 반드시 필요한 요소죠.

본 글에서는 일반적인 데이터를 AI 모델에 활용할 양질의 데이터로 가공하기 위한 방법들을 다뤄보겠습니다.

사용 환경은 아래와 같습니다.

  • IDE : Jupyter Notebook
  • Language : python
  • library : pandas

데이터 탐색

  • 샘플데이터 : ratings.csv
  1. 데이터 확인
# 라이브러리 불러오기
import pandas as pd

# 샘플데이터 가져오기
df = pd.read_csv("./data/ratings.csv")

# 앞에서부터 데이터확인
df.head() # 앞 10개 (default)
df.head(5) # 앞 5개

# 뒤에서부터 데이터확인
df.tail() # 뒤 10개 (default)
df.tail(5) # 뒤 5개
  1. 데이터프레임 정보 확인
    : 데이터 컬럼정보 확인
df.info()

## 결과
# <class 'pandas.core.frame.DataFrame'>
# RangeIndex: 100836 entries, 0 to 100835
# Data columns (total 4 columns):
#  #   Column     Non-Null Count   Dtype  
# ---  ------     --------------   -----  
#  0   userId     100836 non-null  int64  
#  1   movieId    100836 non-null  int64  
#  2   rating     100836 non-null  float64
#  3   timestamp  100836 non-null  object 
# dtypes: float64(1), int64(2), object(1)
# memory usage: 3.1+ MB
  1. 통계적 특성 확인
    : 데이터의 row수와 평균, 최소/최대값 등을 확인
df.describe()

#     userId    movieId    rating
# count    100836.000000    100836.000000    100836.000000
# mean    326.127564    19435.295718    3.501557
# std    182.618491    35530.987199    1.042529
# min    1.000000    1.000000    0.500000
# 25%    177.000000    1199.000000    3.000000
# 50%    325.000000    2991.000000    3.500000
# 75%    477.000000    8122.000000    4.000000
# max    610.000000    193609.000000    5.000000



데이터 결합과 정렬

  • 샘플데이터 : movies.csv, links.csv, ratings.csv, users.csv
  1. 데이터 불러오기
# 라이브러리 불러오기
import pandas as pd

df_movies = pd.read_csv("./data/movies.csv")
df_links = pd.read_csv("./data/links.csv")
df_ratings = pd.read_csv("./data/ratings.csv")
df_users = pd.read_csv("./data/users.csv")
  1. 데이터 붙이기 : concat()
# 단순 위/아래로 row를 붙임
df_concat_1 = pd.concat([df_movies, df_ratings])

# 옆으로 row를 붙임
df_concat_2 = pd.concat([df_movies, df_ratings], axis=1)

# 합치는 두 데이터프레임에 모두 존재하는 row의 인덱스만 가져옴
df_concat_3 = pd.concat([df_movies, df_ratings], join='inner')
  1. 데이터 병합하기 : merge()
# on='key값'이 없을 경우, default 설정대로 같은 이름을 가진 열이 자동으로 key값으로 지정됨
pd.merge(df_movies, df_ratings)

# movieId 기준 병합
pd.merge(df_movies, df_ratings, on='movieId')

# join 방법 설정
pd.merge(df_movies, df_ratings, how='outer' on='movieId') # outer join
pd.merge(df_movies, df_ratings, how='left' on='movieId') # left outer join
pd.merge(df_movies, df_ratings, how='right' on='movieId') # right outer join
  1. 정렬하기 : sort_values()
# movieId기준 오름차순 정렬
df.sort_values(by='movieId')

# movieId기준 내림차순 정렬
df.sort_values(by='movieId', ascending=False)

# movieId기준 오름차순 정렬 후 바로 적용
df.sort_values(by='movieId', inplace=True)

# 정렬로 인해 뒤섞인 인덱스를 재배열
df.reset_index()
df.reset_index(drop=True, inplace=True) # 기존 인덱스 삭제 후, 재배열된 인덱스 적용



데이터 필터링과 편집

  • 샘플데이터 : preprocessing_03.csv
  1. 데이터 불러오기
# 라이브러리 불러오기
import pandas as pd

df = pd.read_csv("./data/preprocessing_03.csv")
  1. 데이터 필터링
# 성별의 구성을 확인합니다. 
df['gender'].value_counts()

# 성별이 남자인지를 확인해 봅니다.
df['gender']=='M'

# 전체 데이터프레임에서 성별이 남자인 사람으로 필터링한 정보를 가져옵니다.
df[df['gender']=='M']

# 전체 데이터프레임에서 'userId'가 5인 고객이 본 영화 목록을 확인합니다.
df[df['userId']==5]
len(df[df['userId']==5])

# 전체 데이터프레임에서 'userId'가 5인 고객이 평점 2점 이하를 준 영화 목록을 확인합니다.
df[(df['userId']==5) & (df['rating']<=2) ]

# 필터링 결과 중 'title'만 출력
df[(df['userId']==5) & (df['rating']<=2)]['title']
  1. 컬럼명 변경
# 기존 컬럼명 확인하기
df.columns

# 새로운 컬럼명 List를 작성하여 대체하기
df.columns = ['movieId', 'imdbId', 'tmdbId', '영화제목', 'genres', 'userId', 'rating',
       'timestamp', 'gender', 'age', 'occupation', 'zipcode']

# 기존 컬럼명 선택하여 변경 ( df.rename(columns={'기존 컬럼명' : '새 컬럼명'}) )
df = df.rename(columns={'zipcode':'우편번호'})
  1. 컬럼 생성
# 컬럼 생성 후 기본값 세팅 ( df['새로운 컬럼명'] = 기본값 )
df['new'] = 0

# df['영화제목']의 정보를 문자열 처리한 후 괄호'(',')'를 제외한 4자리의 숫자만 가져온 후에 새로운 컬럼 'year'에 세팅
df['year'] = df['영화제목'].str[-5:-1]

# 데이터 분리 후 새로운 컬럼 생성
df['genres'].str.split('|')    # 배열 형태
df['genres'].str.split('|', expand=True).head() # 'expand=True' 옵션을 통해 새로운 데이터프레임으로 생성
  1. 컬럼 삭제
# new 컬럼 삭제
df.drop(columns=['new'])
df.drop(columns=['new'], inplace=True) # 변경내용 적용

# 필요한 컬럼만 선택하여 다시 저장
new_columns = ['movieId', 'imdbId', 'tmdbId', '영화제목', 'year', 'genres', 'userId', 'rating', 'timestamp', 'gender', 'age', 'occupation', '우편번호']
df = df[new_columns]



데이터 결측치 처리

  • 샘플데이터 : preprocessing_04.csv
  1. 데이터 불러오기
# 라이브러리 불러오기
import pandas as pd

df = pd.read_csv("./data/preprocessing_04.csv")
  1. 결측치 확인
# 결측치 행 확인 ( 결측치가 있을 경우 True를 반환 )
df.isnull()

# 각 열별 결측치 갯수 확인
df.isnull().sum()
  1. 결측치 제거
# 결측치가 있는 행 모두 삭제
df_temp = df.dropna()

# 결측치가 있는 열 모두 삭제
df_temp2 = df.dropna(axis=1)

# 행 전체가 결측치인 행만 모두 삭제
df_temp3 = df.dropna(how ='all')

# 행의 결측치가 n초과인 행만 모두 삭제
df_temp4 = df.dropna(thresh=n)

# 특정 열 중 결측치가 있는 경우의 행만 모두 삭제
df_temp5 = df.dropna(subset=['MovieId'])
df.dropna(subset=['MovieId'], inplace=True) # 데이터프레임에 바로 적용
  1. 결측치 채우기
# 결측치를 단일 값(0)으로 대체
df.fillna(0)

# 특정 열의 결측치만 0으로 대체하기
df_na_sample2['UserId'].fillna(0, inplace=True) # 데이터프레임에 바로 적용

# 특정 열의 결측치만 평균값으로 대체하기
df['Age'].fillna(df_na_sample2['Age'].mean(), inplace=True)

# 특정 열의 결측치만 최빈값으로 대체하기
df['Gender'].fillna(df_na_sample2['Gender'].mode()[0], inplace=True)

# 결측치 이전값으로 채우기
df.fillna(method='ffill')

# 결측치 이후값으로 채우기
df.fillna(method='bfill')



데이터 타입 변환/중복데이터 제거

  • 샘플데이터 : preprocessing_05.csv
  1. 데이터 불러오기
# 라이브러리 불러오기
import pandas as pd

df = pd.read_csv("./data/preprocessing_05.csv")
  1. 데이터 타입 확인
# 모든 컬럼의 데이터타입을 확인
df.dtypes

# 컬럼별 데이터타입 확인
df['MovieId'].dtype
  1. 데이터 타입 변경
# 모든 Columns의 데이터 타입을 'object' 형으로 변경
df_temp = df.astype('object')

# 특정 컬럼의 데이터 타입을 변경
df = df.astype({'Year':'int'})
  1. 중복데이터 처리
# 중복 데이터 확인 ( 첫 번째 값은 False, 두 번째 값부터는 True 반환 )
df.duplicated()

# 데이터프레임 df의 'Title'과 'UserId' 컬럼이 중복이 되는 데이터를 확인
df[df.duplicated(['Title','UserId'])]

# 예제 : 영화 제목이 같은 영화를 두번 본 고객이 존재한다고?!
df[(df['Title']=='Iron Man') & (df['UserId']==550)]
df[df.duplicated(['Title','UserId'])]

# 중복데이터 제거
df.drop_duplicates(['Title','UserId', 'Year'], ignore_index=True)    # 기존 인덱스를 무시하고 새로 인덱스를 부여



데이터 이상치 처리

  • 샘플데이터 : preprocessing_06.csv
  1. 데이터 불러오기
# 라이브러리 불러오기
import pandas as pd

df = pd.read_csv("./data/preprocessing_06.csv")
  1. 이상치 탐지
# 컬럼별 통계적 특성 확인
df.describe()

# 시각화를 통한 이상치 탐지
# 라이브러리 불러오기 
import matplotlib.pyplot as plt

# 차트 영역 설정하기 ( box plot 차트 )
plt.figure()
plt.boxplot(df['Age'], labels=['Age'])
plt.show()

  1. 이상치 확인
# IQR의 1.5배 이상 크거나 작은 데이터를 이상치로 판별
# IQR 계산
# 중앙값 계산
Q2 = df['Age'].median()

# 하위 25%, 상위 25% 범위 계산
Q1 = df['Age'].quantile(0.25)
Q3 = df['Age'].quantile(0.75)
IQR = Q3 - Q1

# 이상치 판별
df['IsOutlier_Age'] = (df['Age'] < Q1 - 1.5 * IQR) | (df['Age'] > Q3 + 1.5 * IQR)
  1. 이상치 처리
# 위에서 생성한 이상치여부 컬럼(IsOutlier_Age) 활용하여 이상치 삭제
df = df[df['IsOutlier_Age'] == False]

# 이상치 대체
df.loc[df['IsOutlier_Age'], 'Age'] = df['Age'].median() # 중앙값 대체



데이터 인코딩

  • 샘플데이터 : preprocessing_07.csv
  1. 데이터 불러오기
# 라이브러리 불러오기
import pandas as pd

df = pd.read_csv("./data/preprocessing_07.csv")
  1. 레이블 인코딩
    : 범주형 데이터를 숫자로 변환하는 방법으로, 각 범주에 고유한 숫자를 할당하는 방식입니다.
    예를 들어, "남성", "여성"과 같은 범주형 데이터를 각각 0과 1로 변환하는 방법이 있습니다.
# 라이브러리 불러오기
from sklearn.preprocessing import LabelEncoder

# Label Encoder 호출하기
le = LabelEncoder()

# 'Occupation' 컬럼의 데이터를 Label 인코딩하기.
le_occupation = le.fit_transform(df['Occupation'])
df_le['Occupation_Le'] = le_occupation
df_le['Occupation_Le']
# array([ 0, 20,  1, ..., 11, 11,  7])

# label Encoding 매핑 클래스 확인
le.classes_
# array(['K-12 student', 'academic/educator', 'artist', 'clerical/admin',
#        'college/grad student', 'customer service', 'doctor/health care',
#        'executive/managerial', 'farmer', 'homemaker', 'lawyer', 'other',
#        'programmer', 'retired', 'sales/marketing', 'scientist',
#        'self-employed', 'technician/engineer', 'tradesman/craftsman',
#        'unemployed', 'writer'], dtype=object)

# Pandas의 map() 함수를 사용하여 레이블 인코딩
# 'Occupation' 컬럼의 값들을 lable mapping
label_mapping = { "other" : 0 , 
                  "academic/educator" : 1, 
                  "artist" : 2, 
                  "clerical/admin" : 3, 
                  "college/grad student" : 4, 
                  "customer service" : 5, 
                  "doctor/health care" : 6, 
                  "executive/managerial" : 7, 
                  "farmer" : 8, 
                  "homemaker" : 9, 
                  "K-12 student" : 10,
                  "lawyer" : 11, 
                  "programmer" : 12, 
                  "retired" : 13, 
                  "sales/marketing" : 14, 
                  "scientist" : 15, 
                  "self-employed" : 16, 
                  "technician/engineer" : 17, 
                  "tradesman/craftsman" : 18, 
                  "unemployed" : 19, 
                  "writer" : 20 }

# pandas replace() 메소드를 사용해서 값을 대체
df_le['Occupation_Map'] = df['Occupation'].map(label_mapping)
  1. 더미 변수 생성
    : 각 카테고리를 이진 형태로 표현하는 방법
    각각의 값을 고유한 이진 벡터로 변환하여 해당하는 값의 위치에 1을 표시하고, 나머지 위치에는 0을 표시합니다. (원-핫 인코딩)
# | 구분자 기준으로 데이터를 분할하여 생성된 list type 데이터를 다시 저장
df['Genres'] = df['Genres'].str.split('|')

# list type으로 저장된 데이터를 모두 나누어 새로운 row로 생성
df = df.explode('Genres')

# get_dummies() 함수를 사용 더미변수 생성
pd.get_dummies(df, columns=['Genres'])
pd.get_dummies(df['Genres'])
pd.get_dummies(df['Genres'], prefix='Genre_')    # 더미변수 컬럼의 prefix 설정
pd.get_dummies(df['Genres'], drop_first=True)    # 더미변수 첫번째 컬럼 삭제
df = pd.get_dummies(df, columns=['Genres'], prefix='Genre', drop_first=True)    # 최종

# 더미변수 컬럼 외의 컬럼들을 키값으로 그룹화하여 중복데이터 제거
# 'groupby()'는 특정 컬럼의 값을 기준으로 데이터프레임을 그룹화하여 그룹별로 연산을 수행할 수 있습니다.
# 'agg()'는 그룹화된 데이터프레임에 대해 다양한 집계 함수들을 적용하여 그룹 별로 연산 결과를 집계한 결과를 반환합니다.
df = df.groupby(['MovieId', 'ImdbId', 'TmdbId', 'Title', 'Year', 'UserId', 'Rating', 'Gender', 'Age', 'Occupation']).agg('sum').reset_index()



데이터 스케일링

  • 샘플데이터 : preprocessing_08.csv
  1. 데이터 불러오기
# 라이브러리 불러오기
import pandas as pd

df = pd.read_csv("./data/preprocessing_08.csv")
  1. 데이터 정규화
    : 데이터를 일정 범위로 변환하는 스케일링 방법으로, 데이터를 0과 1 사이의 값으로 변환하는 것을 의미
    정규화는 다양한 스케일을 가진 변수들을 동일한 범위로 맞춰줌으로써, 변수 간의 크기 차이를 제거하여 모델이 각 변수를 공평하게 처리할 수 있도록 돕습니다.
# 라이브러리 불러오기
from sklearn.preprocessing import MinMaxScaler

# MinMaxScaler 호출하기
min_max_scaler = MinMaxScaler()

# 'Age', 'Rating' 컬럼의 데이터를 Min Max Scaling 하기
df_normalized = min_max_scaler.fit_transform(df[['Age', 'Rating']])
  1. 데이터 표준화
    : 데이터를 평균이 0이고 표준편차가 1인 분포로 변환하는 스케일링 방법으로, 데이터를 표준정규분포에 따르는 값으로 변환하는 것을 의미
    표준화는 데이터의 분포를 중심으로 조절하여 이상치에 덜 민감하게 만들어줌으로써, 모델의 안정성을 높이고 예측 결과를 개선하는 데 도움을 줍니다.
# 라이브러리 불러오기
from sklearn.preprocessing import StandardScaler

# StandardScaler 호출하기
standard_scaler = StandardScaler()

# 'Age', 'Rating' 컬럼의 데이터를 StandardScaling 하기
df_standardized = standard_scaler.fit_transform(df[['Age', 'Rating']])



이어서

다음은 전처리된 데이터를 이용하여 AI 모델링을 진행하는 과정을 살펴보겠습니다.

반응형

'개발 > AI' 카테고리의 다른 글

[LLM] LLM 발전기  (0) 2025.01.05
[AI] pandas/sklearn을 활용한 머신러닝 모델링  (0) 2024.04.12
KT AI 해커톤 회고 (2)  (1) 2023.11.02
KT AI 해커톤 회고 (1)  (0) 2023.10.27
[AI] Softmax Regression  (0) 2023.10.01
728x90
반응형

👉이전글에서 예선에 대한 내용을 확인할 수 있다.
9월13일 본선OT 이후로 KT AI해커톤 본선이 시작됐다.

본선

본선 과제는 KT DX플랫폼(RPADU, APPDU)과 GPT, Bard와 같은 외부망의 생성형AI 모델에 proxy로 접근할 수 있게 해주는 gen.AI플랫폼을 이용하여
업무에 활용할 수 있는 서비스를 개발하는 것이였다.
즉, RPADU를 활용하여 데이터를 수집하고, APPDU를 활용하여 backend서버를 개발하고 View를 제공해야하는데, 데이터 분석이나 학습에 활용할 AI모델은 gen.AI플랫폼을 활용하라는 것이다.

그리고 본선이 진행되는 10/19(목) 이전까지 사전 개발기간이 주어진다.
본선이 진행되는 10/19~20 1박2일 동안은 실무평가, 임원평가 총 두 번의 평가가 진행되며, 해당 평가에서 좋은 성적을 거두어야 수상을 할 수 있게 된다.

사전학습

나는 과제발굴에 앞서 LLM이 뭔지 파악할 필요가 있다고 생각했다.
LLM은 Large Language Model의 약자로 사용자 질의에 따른 응답을 사람처럼 해주는 모델이다. 많이들 써봤을 ChatGPT 그 자체이다.
ChatGPT와 같은 LLM모델을 우리의 비즈니스에 맞게 응답해줄 수 있도록 할 수 있다.
예를들어, OSS가 뭐야?라고 ChatGPT에 질문을 했을 때, Open Source Software라는 대답을 준다.
여기서 ChatGPT를 우리 비즈니스(OSS시스템)에 맞게 학습을 시키면, Operating Support System이라는 답변을 줄 수 있도록 할 수 있다.

ChatGPT와 같은 LLM모델을 학습시키는 방법은 대표적으로 fine-tuning(파인튜닝)기법이 있는데,
LLM모델에 추가 데이터를 학습시켜 우리 비즈니스에 특화된 모델을 만드는 것이다.
하지만 이 방법은 들어가는 비용대비 얻을 수 있는 효과는 미미하다. 그래서 대규모의 플랫폼을 만드는 것이 아니라면 추천하지 않는 방법이다.

대안으로 Embedding(임베딩)기술을 응용한 Retrieval-Augmented Generation(RAG)방식이 있다.
RAG방식은 먼저 임베딩 과정을 거쳐 벡터로 만들어진 우리 비즈니스 데이터를 별도 저장소에 저장한다.
그리고 사용자가 질의를 하면 해당 질의와 유사한 비즈니스 데이터를 별도 저장소에서 가져오고,
가져온 비즈니스 데이터를 사용자 질의와 합쳐서 프롬프트 형태로 구성한 후에 LLM모델에 질의를 날린다. 이 방식이 RAG방식이다.

이 때, 활용되는 Embedding(임베딩)은 자연어를 모델이 이해할 수 있는 벡터형태로 변환하는 기술인데,
RAG방식에서 사용자 질의와 유사한 데이터를 찾을 때에도 벡터 간의 유사도를 계산하기 때문에 임베딩 기술이 유용하게 쓰인다.

Embedding(임베딩)에 관한 자세한 내용은 👉여기에서 확인할 수 있다.

과제선정

과제선정부터 어려웠다.
과제 선정을 위해 팀내 Confluence플랫폼을 활용하여 아이디어를 수렴했다.
팀장님께서 본선부터는 팀내 다른 사원들도 같이 간접참여할 수 있는 형태면 좋을 것 같다고 말씀하셔서,
해커톤 팀원들 뿐만 아니라 다른 사원들의 아이디어도 같이 수렴했다.

여러 아이디어가 많이 나왔지만...
우리가 잘할 수 있고, 정책이나 데이터 활용상 제약이 없는 OSS Assistant 아이디어로 선정했다.

OSS Assistant는 현장작업자의 VOC를 줄이고 빠른 정보검색 및 신규인력의 빠른 적응을 위한 OSS도메인 특화 챗봇이다.
(역시 LLM모델엔 챗봇이 가장 만만하다..)

대표적인 두 가지 기능으로 여러데이터를 기반으로 응답을 주는 RAG기반 챗봇기능과
사용자 질의에 따라 OSS의 오더라는 것을 작업자가 직접 처리할 수 있게 하는 기능을 구현하기로 했다.
그리고 나는 RAG기반 챗봇기능 구현을 담당하였다.

구현과정

초기설계

내가 담당한 RAG기반 챗봇기능을 구현하기 위해 설계를 진행했다.
초기설계는 챗봇의 서버로 활용될 APPDU서버가 다음 작업을 모두 수행하는 것으로 설계했다.

  1. KMS(사내 Jira Confluence플랫폼) 데이터 로딩
    : KMS라는 사내 Jira Confluence플랫폼에는 업무에 필요한 SOP나 가이드 자료 등을 업로드하여 공유할 수 있는 플랫폼이다. 해당 플랫폼의 데이터를 챗봇에 활용하기 위해 KMS서버에 데이터를 요청해서 받아야한다.
  2. RPADU(사내 RPA tool)의 파싱데이터 수신
    : AI해커톤 본선의 평가요소 중 RPADU라는 사내 RPA tool 활용 항목이 있다. 이를 충족시키기 위해 우리 시스템의 고객들에게 매월 공유하고 있는 ppt형태의 사용자매뉴얼 데이터를 RPADU를 활용하여 파싱할 수 있도록 RPADU개발을 진행했다.
    해당 툴은 코딩이 아닌 GUI로 개발할 수 있도록 만들어진 프로그램이고, C#기반의 프로그램이다.
  3. 데이터전처리 및 임베딩 수행
    : 챗봇에 활용될 데이터의 품질을 높이기 위해서는 적절한 전처리 작업이 필요하다. 그리고 전처리가 완료된 데이터들을 임베딩하여 LLM모델이 이해할 수 있는 벡터형태로 변환해야 한다.
  4. Vector DB 임베딩 벡터 저장
    : Vector Database는 임베딩된 벡터를 저장하고 조회하는데 특화된 데이터베이스이다. 벡터에 메타데이터 형태로 관련 정보를 매핑하여 저장할 수 있고, 사용자 질의와 유사한 정보를 보다 쉽게 조회할 수 있다. 임베딩된 벡터를 해당 Vector DB에 저장하는 작업이 필요하다.

환경세팅

개발을 위한 개발환경을 세팅하고 APPDU서버환경을 세팅하는 작업을 진행했다. APPDU서버환경은 서버간 방화벽 작업을 진행해야 했다.
그 중, KMS서버-APPDU서버간 방화벽 작업은 각 서버 담당자 문의 결과, 지원하지 않는 연동으로 방화벽해제가 어렵다는 회신을 받았다.
KMS데이터를 활용하지 않는 방향까지 고려를 했지만, 해당 애로사항을 해커톤 팀내 공유하고 회의를 진행한 결과 KMS데이터를 활용할 새로운 아이디어가 고안될 수 있었다.

2차 설계

초기설계에서 KMS데이터는 내 로컬PC에서 로드하여 APPDU서버로 전달하는 방향으로 2차 설계를 확정지었다.
KMS담당자로서 KMS데이터 학습을 요청받거나 필요할 때, 해당 데이터들을 검수하고 학습에 적절한 데이터라고 판단되면
사전에 개발한 KMS데이터 로딩 프로그램을 실행시켜 APPDU서버에 KMS데이터를 전달할 것이다.

개발

KMS Loader

학습이 진행될 KMS데이터는 oss2chatbot라벨이 붙은 Confluence 페이지이다. 해당 라벨은 설정하기 나름이다.
그리고 KMS데이터는 LangchainConfluenceLoader라이브러리를 활용하여 가져온다.

from langchain.document_loaders import ConfluenceLoader
from flask import current_app, jsonify
import requests

class KmsService:
    def __init__(self, config):
        self.config = config

    def load_kms_sop(self):
        # confluence loader 객체 생성
        loader = ConfluenceLoader(
            url=current_app.config['KMS_URL'],
            username=current_app.config['KMS_USERNAME'],
            api_key=current_app.config['KMS_PASSWORD'],
        )
        # kms데이터 로드 (라벨 설정)
        kms_data = loader.load(label="oss2chatbot")
        kms_documents = []

        # 제목 + 내용
        for data in kms_data:
            temp = data.metadata['title'] + " : " + data.page_content
            kms_documents.append(temp)

         request_data = {'data': kms_documents}
         response = requests.post(APPDU_SERVER_URL, json=request_data)

         return response

위 코드는 특정 라벨이 붙은 Confluence 페이지를 가져와서 각 페이지의 데이터마다 제목과 내용을 병합해주고 APPDU서버로 데이터를 전송하는 코드이다.

Manual Parsing RPA

Manual PPT 데이터를 파싱하는 RPA tool을 활용하면 위 사진에서 보이는 것처럼 코드가 아닌 GUI형태로 프로그램을 개발할 수 있다.
프로그램 소스 전체가 사진에 표현되어 있진 않지만, flow를 간략히 설명하면,

  1. 파싱할 PPT파일이 있는 디렉토리에서 파일 list를 읽는다.
  2. 각 파일 list를 for loop 모듈을 활용하여 읽는다.
  3. 각 파일에서 파싱된 데이터들을 활용하여 json 형태로 문자열을 만들어준다. (APPDU서버로 HTTP POST요청을 통해 body에 json형태로 실어보내주기 위함)
  4. 만들어진 json형태의 문자열에서 Encoding 오류나는 문자를 찾아 수정해준다. (if 모듈 하드코딩)
  5. json형태로 만들어진 문자열을 body에 실어 APPDU서버로 HTTP POST 요청을 보낸다.

위 flow를 모두 거치면, 파싱 대상 PPT파일의 데이터가 APPDU서버에 전송된다.

Embedding

import os, re
import openai
from langchain.text_splitter import RecursiveCharacterTextSplitter
from flask import current_app, jsonify

def preprocessing_text(document, sep_token = " \n "):
    preprocessing_document = []
    for doc in document:
        doc.page_content = doc.page_content.lower() # 소문자 통일
        doc.page_content = doc.page_content.replace("\n", " ")    # 개행 -> 공백문자
        doc.page_content = doc.page_content.replace("\t", " ")    # White space 통일
        doc.page_content = re.sub('[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z]a-z]{2,}', 'email_address', doc.page_content) # email 주소 치환
        doc.page_content = re.sub('[A-Za-z0-9가-힣\s]', ' ', doc.page_content) # 특수문자 제거
        doc.page_content = re.sub(r"\s+", " ", doc.page_content)
        preprocessing_document.append(doc)

    return preprocessing_document

def split_chunk(document, size = 1000, overlap=200):
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
    documents = text_splitter.split_documents(document)
    return documents

def get_embedding(text, engine="text-embedding-ada-002"):
    openai.api_type = current_app.config('AZURE_OPENAI_API_TYPE']
    openai.api_base = current_app.config('AZURE_OPENAI_API_BASE']
    openai.api_version = current_app.config('AZURE_OPENAI_API_VERSION']
    openai.api_key = current_app.config('AZURE_OPENAI_API_KEY']
    text = text.replace("\n", " ")

    return openai.Embedding.create(input=[text], engine=engine)["data"][0]["embedding"]

def documents_embedding(input_documents, file_name):
    current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    ids = []
    doc_metas = []
    documents = []
    embedding_docs = []

    for idx, document in enumerate(input_documents):
        embedding_doc = get_embedding(document.page_content)
        meta = {
            "source": file_name,
            "updated_time": current_time
        }
        ids.append(idx)
        doc_metas.append(meta)
        documents.append(document.page_content)
        embedding_docs.append(embedding_doc)

    embeded_document = {
        'ids': ids,
        'doc_metas': doc_metas,
        'documents': documents,
        'embedding_docs': embedding_docs
    }
    return embeded_document

전처리 코드는 데이터에 따라 상이하여 공통부분만 작성하였다.
Langchain.ConfluenceLoader로 데이터를 가져오든, RPA로 데이터를 가져오든 위 공통함수들을 모두 거치게 된다.

  1. 받은 데이터는 먼저 preprocessing_text함수에서 전처리가 진행된다. 챗봇의 정확도를 위한 작업이고, 간단하게 소문자 통일, 특수문자 및 White space 제거 작업이 진행된다.
  2. 전처리된 데이터들은 1000자씩 앞뒤로 200자가 중첩되게 하나의 chunk로서 분리된다. 이또한 챗봇의 정확도를 향상시키기 위함이다.
  3. 각각의 chunk로 분리된 데이터들은 openai.Embedding api를 활용하여 임베딩이 진행된다. 임베딩 작업을 통해 벡터로 변환된 데이터들은 메타 데이터로 원본 데이터와 id값, file name 등을 매핑하여 ChromaDB라는 Vector DB에 저장된다.

하지만 Vector DB로 ChromaDB를 활용하는데 어려움이 있었다.
사용중인 Python 3.8 환경에서 버전문제가 발생하였다. Sqlite3 버전이 낮아서 문제였고, 버전을 올려야 했다.
버전 올리는 것 또한 local의 window환경과 실서버의 linux환경에서의 방법이 상이했다.
window 환경에서는 Sqlite3의 dll파일만 교체해주면 되었지만, linux환경에서는 pysqlite3-binary를 설치해야 했다.

그래서 사외망 환경에서 whl파일을 사내망으로 가져와서 프로젝트 root경로에 놓고, Dockerfile에 pip install 명령어를 추가하여 가져온 whl파일을 설치하도록 했다. 그랬더니 비로소 서버환경에서의 ChromaDB가 설치되었다.
그 후, pysqlite3-binary 라이브러리를 사내 라이브러리 저장소인 nexus라는 저장소에 저장하도록 요청하여 별도 Dockerfile에 명령어를 입력하지 않아도 되도록 했다.

프롬프트 엔지니어링

from flask import current_app, jsonify
from .file_manager import get_dict_for_collection, get_embedding
from views.order.order_analysis import *
import openai, copy

# 프롬프트 템플릿 생성
def make_template(documents, filename):
    result_documents = ""
    for doc in documents:
        result_documents = result_documents + " " + doc
    template = """You are the best assistant that describes many knowledges in Korean.
    당신은 문의에 대한 방법을 제공하는 것이 목표입니다.
    인사를 할 경우에 '안녕하세요. 무엇을 도와드릴까요?라고 답변해주세요.
    그리고 답변 마지막에 '자세한 내용은 {filename}을 참고해주세요.'라는 메세지를 남겨주세요.
    아래 내용을 참고하여 답변해주세요. <내용>{documents}<내용끝>
    1. 위 내용 전부를 읽기 쉽도록 단계별로 재작성 해주세요.
    2. 불필요한 개행문자는 제거해주세요.
    3. 답변은 1000자 이내로 작성해주세요.""".format(documents=result_documents, filename=filename)

    return template

# Function Call용 템플릿 생성
def make_order_template():
    template = """You are the best assistant that describes many knowledges in Korean."""
    return template

# 챗봇 main 함수
def basic_rag_chat_completion(messages, rag):
    # function call을 위한 메세지 백업
    func_messages = copy.deepcopy(messages['messages'])

    # openai config
    openai.api_type = current_app.config['AZURE_OPENAI_API_TYPE']
    openai.api_base = current_app.config('AZURE_OPENAI_API_BASE']
    openai.api_version = current_app.config('AZURE_OPENAI_API_VERSION']
    openai.api_key = current_app.config('AZURE_OPENAI_API_KEY']

    # 사용자 질의 임베딩 및 Chroma DB에서 유사 documents 조회
    user_input = messages['messages'][-1]['content']
    user_embedding = get_embedding(user_input)
    chroma_response = rag.query(
        query_embeddings=user_embedding,
        n_results=3
    )

    # system message 세팅
    template_message = make_template(chroma_response['documents'][0], chroma_response['metadatas'][0][0]['source'])
    system_message = {"role": "system", "content": template_message}
    messages['messages'].insert(0, system_message)

    # 질의
    response = openai.ChatCompletion.create(
        engine="gpt-4",
        messages=messages['messages'],
        functions=get_functions(),
        temperature=0.0,        # 모델의 정확성, 0과 1사이로 조절하여 창의성을 제어
        max_tokens=1000,        # 모델이 생성할 수 있는 최대 토큰 수 (일반적으로 1개 토큰은 약 4글자)
        top_p=0.95,                # 무작위성과 독창성 조절
        frequency_penalty=0,    # 모델이 예측을 반복하는 경향을 제어. 이미 생성된 단어의 확률을 낮춤
        presence_penalty=0,        # 새로운 예측을 만들도록 유도. 이미 예측된 텍스트에 단어가 나타난 경우, 해당 단어의 확률을 낮춤
        stop=None                # 모델의 응답 정지 옵션
    )

    res_message = response["choices"][0]["message"]

    # function call 응답일 경우,
    if res_message.get("function_call"):
        res_message.update({'content': None})
        available_functions = {
            "order_analysis": order_analysis,
            "remove_ponr": remove_ponr,
            "get_order_task_list": get_order_task_list,
            "close_task": close_task
        }

        # function 실행
        function_name = res_message["function_call"]["name"]
        function_to_call = available_functions[function_name]
        function_args = json.loads(res_message["function_call"]["arguments"])
        function_response = function_to_call(
            function_args
        )

        # system message 세팅 (order template 활용)
        order_template_message = make_order_template()
        order_system_message = {"role": "system", "content": order_template_message}

        # 사전 백업해뒀던 function call용 메세지에 system 메세지 및 응답 세팅
        func_messages.append(order_system_message)
        func_messages.append(res_message)

        # function 실행 결과 추가
        func_messages.append(
            {
                "role": "function",
                "name": function_name,
                "content": function_response
            }
        )

        # 함수 실행결과를 바탕으로 챗봇 응답 생성
        res = openai.ChatCompletion.create(
            engine="gpt-4",
            messages = func_messages,
            temperature=0.0,
            max_tokens=1000,
            top_p=0.95,
            frequency_penalty=0,
            presence_penalty=0,
            stop=None
        )

        assistant_turn = res.choices[0].message
        return jsonify(assistant_turn)

    if (res_message['content'] == "안녕하세요. 무엇을 도와드릴까요?"):
        return jsonify({
            "content": "안녕하세요. 무엇을 도와드릴까요?",
            "role": "assistant"
        })

    # 질의에 맞는 document가 없을 경우 (할루시네이션 방지)
    if (chroma_response['distances'][0][0] >= 0.17):
        return jsonify({
            "content": "찾으시는 정보가 없어 답변드리기 어렵습니다.",
            "role": "assistant"
        })

    return jsonify(res_message)

위는 사용자 질의에 따른 응답을 주는 코드이다. Function Call의 핵심 부분은 내가 구현한 부분이 아니므로 뺐다.
기본적인 flow는 아래와 같다.

  1. 사용자 질의가 들어오면 basic_rag_chat_completion main 함수가 실행된다.
  2. 사용자 질의를 임베딩하고 임베딩 데이터를 기반으로 ChromaDB에서 유사한 document를 가져온다.
  3. 가져온 document를 system message에 프롬프트 형태로 세팅해준다. (system message가 뭔지는 아래 참고)
  4. openai api에 만들어진 message를 던져 질의한다.
  5. 해당 질의가 function call호출을 위한 질의라면, if문으로 들어가 맞는 function을 실행시킨다.
    1. system message로 function call용 template를 세팅해준다.
    2. function 결과를 message에 세팅한다.
    3. 만들어진 message로 한번더 질의하여 받은 응답을 반환한다.
  6. function call 질의가 아니라면, if문을 skip 한다.
  7. 질의가 인사말이라면 인사말로 응답한다.
  8. 질의에 맞는 document가 없다면, 답변할 수 없는 내용이라고 답변한다.

chroma db 조회 결과 중, distances 정보가 있는데 해당 정보는
사용자 질의와 반환된 document의 연관성이 얼마나 밀접한지를 나타내는 정보이다.
distance가 1에 가까울수록 연관성이 없는 document이고,
시행착오 끝에 0.17 수치를 기준으로 실제 적절한 문서를 가져오는지 판단되는 것 같아 위와 같이 코드를 작성하였다.

위 작업으로 할루시네이션을 어느정도 방지했지만,
인사말이 들어올 경우 ChromaDB에 관련 Document가 없어 distance가 높게 나오는 바람에 인사에 대한 응답으로 찾을 수 없는 정보가 나온다는 답변이 나오게 되었다.
이를 방지하기 위해 프롬프트에 인사말이 나올 경우, 특정 메세지의 인사말로 응답해달라는 문구를 추가하고
특정 메세지의 인사말이 응답으로 나왔을 경우, 인사로 답변할 수 있도록 하단에 코드를 작성했다.

그리고 rag chat 부분과 function call 부분을 합치는 과정에서 질의에 활용되는 message에 동일한 템플릿에 세팅되다보니
function call의 동작이 제대로 되지 않아서 rag용과 function call용 template을 구분했다.

뿐만 아니라 function call 파라미터로 난수가 입력되는 등 여러 시행착오가 있었지만,
function call 부분은 내 관할이 아니므로 pass,,,

또한, 내가 구현한 부분은 아니지만, 이전 답변에 대한 history를 프론트에서 받게끔 해서
이전 답변을 고려한 답변이 될 수 있도록 하였고,
문서 형식에 상관없이 UI를 통해 문서를 업로드하면 해당 문서의 내용을 파싱하여 임베딩, Vector DB저장까지 될 수 있도록 구현되었다.

프롬프트는 아래 조건이 녹아들어갈 수 있도록 구성했다.

  1. 질문을 시작하기 전에 대답하는 ChatGPT에 역할부여
  2. 질문하는 사용자의 구체적인 목표를 제시
  3. 사용자가 원하는 구체적인 대답방법을 제시
  4. 얻고 싶은 결과 형식을 명확하게 제시
  5. 제한된 대답의 길이를 제시
    그리고 추가적으로 위에 언급한 것처럼 인사말에 대한 응답이 인사말이 될 수 있도록하고
    대답에 참고한 문서의 정보(file name)를 전달할 수 있도록 두 가지 문구를 추가했다.

그리고 질의를 할 때, message는 role과 content라는 두 가지 속성을 가지는데,
content에는 실제 내용이 들어가고, role에는 system과 user, assistant, function 등과 같이 role에 관한 값이 들어간다.
해당 role에 따라 content 처리 방법이 달라진다. 아래는 role에 따라 주로 어떤 내용이 들어가는지 매핑한 내용이다.

  • user : 사용자 질의
  • system : 프롬프트 템플릿 + document
  • assistant : 이전 답변 내용 (history)
  • function : function call에 의한 함수 실행 결과

시연 및 발표

시연과 발표준비도 만만치 않았다... (문서작업이 가장 어려운 개발자들...)

시연영상 촬영을 위한 시나리오를 짜기전에 코드를 합쳤는데, 위에서 언급했던 여러가지 문제가 발생해서 해결하느라 시간 다 썼다.
그리고 부랴부랴 시나리오 정해서 영상촬영까지 끝낸 후 마감시각 정각에 파일들을 제출할 수 있었다.
해커톤 팀의 과장님이 발표를 담당하셨는데, PPT 제작에도 많은 힘을 쓰신 것 같았다.

그리고 한가지 과제가 또 있었다...
시연과 코드리뷰가 Jupyter 상에서 진행된다고 하여 APP형태의 코드들을 모두 Jupyter 환경으로 옮기는 데도 힘이 들었다.
환경이 다르다 보니... 라이브러리 설치가 안되는 부분도 많았고 해커톤 팀원 셋이서 정말 애썼다.
그리고 코드리뷰 시간이 다 되어가서 겨우 Jupyter 환경으로 모두 옮길 수 있었다. (과장님 발표하는 동안에도 계속 환경 옮기고 있었음;;)

과장님 발표는 성공적이었다. 평가하시는 분들과 같이 보시는 분들의 반응이 좋았다.
과장님께 직접 말씀드리기는 어렵지만... 영업사원 같았다. 우리가 구현한 시스템이 어디에 쓰이고 어떤 기대효과가 있고, 어떤 기술이 쓰였는지 어필을 잘 하셨다. 발표시간이 다 되어 뒷 부분 발표를 못하게 되었는데도, 심사자분께서 뒷 내용이 궁금하다고 하시어 뒷 내용도 마저 발표할 수 있었다.
그리고 우리가 구현한 시스템에 대한 자신감도 엿보였다. 발표를 할 때에는 내 제품에 대한 확신과 자신감이 있어야 청중의 공감을 살 수 있다는 것을 몸소 깨닫게 되었다.

그리고 코드리뷰 시간의 리뷰어 분들의 반응도 좋았다.
RPA쪽을 구현한 나는 따로 RPA코드를 설명드리고 있었고, 다행히 RPA활용 점수를 받을 수 있었다.
그리고 바로 옆에서 우리의 APP코드 리뷰를 진행하셨는데, 리뷰어 분께서 제공한 샘플보다 코드를 더 잘 짠 것 같다는 평가도 받을 수 있었다.
그리고 프롬프트 엔지니어링에 신경쓴 티도 난다고 하시는 등 호평이 이어졌다.
본선에 대한 부푼 기대감을 안고 리뷰장을 나왔다. 그리고 22시, 모든 발표가 마무리 되었다.

이제 앞서 받은 실무평가를 통해 20팀중 단 7팀만이 임원평가를 받을 수 있는 자격이 주어지고 대상, 최우수상, 우수상, 장려상 입상을 할 수 있다.
해당 결과는 다음날 발표되지만, 우리는 입상에 대한 기대감이 부푼 상태였다. 이제 임원평가를 위한 발표를 준비해야 했다.
물론 실무평가 때 활용한 발표를 그대로 활용할 수 있지만, 타겟이 임원이므로 기술적인 내용보다는 개발하게 된 동기나 기대효과가 발표의 주를 이룰 수 있도록 수정하는 것이 좋다고 판단되었다. 그리고 이전에 촬영한 시연영상은 임원평가 때 보여지기 때문에 영상의 퀄리티를 높이기 위해 시나리오를 재작성해서 재촬영했다. 그리고 우리는 새벽 2시경에 숙소에 들어갈 수 있었다.

다음 날 10시, 발표가 진행되는 대강당에 집결했다.
예상했던대로 임원평가 대상자로 우리팀이 선발되었다. 장려상까지는 확보한 것이었다.
과장님의 발표가 진행되고, 어제보다 침착해진 어조로 우리가 구현한 시스템의 기대효과를 어필하셨다. 임원평가 때의 반응도 좋아보였다.
발표장에는 우리회사 사장님이 IT부문장 자격으로 참관하고 계셨다. 실물로 처음 뵙는 분인데, 악수까지 청해주셔서 영광이었다. (사장님 fan이된 ssul푼다)
우리팀의 발표를 보신 이후에도 comment를 남겨주셨는데, 발표에 나온 내용은 본부장과도 카톡으로 얘기를 나누고 있던 내용이었다... J과장과 사원들이 자기계발에 관심이 많은 것을 알고있다... 등 호평도 많이 주셨다. 이러다가 대상까지 받는 것 아닌지 김칫국을 원샷했다.

결과

결과는 우수상이었다.
사실 우수상도 감사하지만, 최소 최우수상이라고 생각했던 우리팀은 조금 아쉬워했다. 기대를 많이해서 그런 것 같다.
그래도 우수상이라도 받을 수 있어서 좋았다.

해커톤에 참여하여 어떤 서비스를 만드는 일 자체도 재밌었는데, 상을 수상하여 더욱 보람된 9~10월이 될 수 있었다. 끝!

반응형

'개발 > AI' 카테고리의 다른 글

[AI] pandas/sklearn을 활용한 머신러닝 모델링  (0) 2024.04.12
[AI] pandas를 활용한 데이터 핸들링 및 전처리  (2) 2024.04.10
KT AI 해커톤 회고 (1)  (0) 2023.10.27
[AI] Softmax Regression  (0) 2023.10.01
[NLP] Embedding  (0) 2023.09.23

+ Recent posts