이전 글에서는 GCP와 Colab에서 세팅을 하고 FeatureView를 생성해 배포하였습니다. 이제 모델 훈련과 배포를 해보도록 하겠습니다.
3. 모델 훈련과 배포
훈련 데이터를 추출하는 코드는 다음과 같습니다.
from datetime import datetime, timedelta
from feast import FeatureStore
store = FeatureStore(repo_path=".")
now = datetime.now()
two_days_ago = datetime.now() - timedelta(days=2)
training_data = store.get_historical_features(
entity_df=f"""
select
src_account as user_id,
timestamp,
is_fraud
from
feast-oss.fraud_tutorial.transactions
where
timestamp between timestamp('{two_days_ago.isoformat()}')
and timestamp('{now.isoformat()}')""",
features=[
"user_transaction_count_7d:transaction_count_7d",
"user_account_features:credit_score",
"user_account_features:account_age_days",
"user_account_features:user_has_2fa_installed",
"user_has_fraudulent_transactions:user_has_fraudulent_transactions_7d"
],
full_feature_names=True
).to_df()
training_data.head()
모델 훈련하는 코드는 아래와 같습니다.
from sklearn.linear_model import LinearRegression
training_data.dropna(inplace=True)
X = training_data[[
"user_transaction_count_7d__transaction_count_7d",
"user_account_features__credit_score",
"user_account_features__account_age_days",
"user_account_features__user_has_2fa_installed",
"user_has_fraudulent_transactions__user_has_fraudulent_transactions_7d"
]]
y = training_data["is_fraud"]
model = LinearRegression()
model.fit(X, y)
한번 predict도 해보겠습니다.
samples = X.iloc[:2]
model.predict(samples)
모델을 저장하겠습니다. joblib을 이용합니다.
samples = X.iloc[:2]
model.predict(samples)
그 후 Bucket으로 모델을 업로드 합니다. gsutil cp를 이용합니다.
!gsutil cp ./model.joblib gs://$BUCKET_NAME/model_dir/model.joblib
이제 Bucket에서 확인해봅니다.
Google AI-Platform에 모델 생성을 합니다. 웹에서 생성하는 방법과 노트북에서 생성하는 방법이 있는데, 노트북에서 생성하는 코드는 다음과 같습니다. Debug모드를 수행하였고, 시간이 다소 걸립니다.
!gcloud ai-platform models create $AI_PLATFORM_MODEL_NAME --region global
!gcloud ai-platform versions create version_1 \
--model $AI_PLATFORM_MODEL_NAME \
--runtime-version 2.2 \
--python-version 3.7 \
--framework scikit-learn \
--origin gs://$BUCKET_NAME/model_dir \
--region global \
--verbosity "debug"
이제 모델의 Endpoint 테스트를 해봅니다. 아래 코드를 실행합니다.
googleapiclient.discovery를 이용해서 AIPlatform에 있는 모델을 활용해볼 수 있습니다.
import googleapiclient.discovery
from IPython.display import clear_output
instances = [
[6.7, 3.1, 4.7, 1.5, 0],
[4.6, 3.1, 1.5, 0.2, 0],
]
service = googleapiclient.discovery.build('ml', 'v1')
name = f'projects/{PROJECT_ID}/models/{AI_PLATFORM_MODEL_NAME}/'
response = service.projects().predict(
name=name,
body={'instances': instances}
).execute()
if 'error' in response:
raise RuntimeError(response['error'])
else:
clear_output()
print("Predictions: ", response['predictions'])
4. Online Store로 Data Load(Materialize)
마지막 materialize 실행 시점으로부터 데이터 로드합니다. 시간이 다소 걸립니다.
!feast materialize-incremental $(date -u +"%Y-%m-%dT%H:%M:%S")
5. Model Inference
앞서 생성했던 Feature들을 읽어서 predict하는 함수입니다. 이전과 같이 googleapiclient.discovery를 이용합니다.
import googleapiclient.discovery
def predict(entity_rows):
feature_vector = store.get_online_features(
features=[
"user_transaction_count_7d:transaction_count_7d",
"user_account_features:credit_score",
"user_account_features:account_age_days",
"user_account_features:user_has_2fa_installed",
"user_has_fraudulent_transactions:user_has_fraudulent_transactions_7d"
],
entity_rows=entity_rows
).to_dict()
del feature_vector["user_id"]
instances = [
[feature_values[i] for feature_values in feature_vector.values()]
for i in range(len(entity_rows))
]
service = googleapiclient.discovery.build('ml', 'v1')
name = f'projects/{PROJECT_ID}/models/{AI_PLATFORM_MODEL_NAME}'
response = service.projects().predict(
name=name,
body={'instances': instances}
).execute()
clear_output()
return response
predict()
여기까지가 Feast를 활용한 예제였습니다.
6. 작업 실행 예약하기
이제 GCP를 활용하여 이 전 작업을 예약 작업을 할 수 있습니다. Cloud Build API를 이용하여 사용설정을 합니다.
https://console.cloud.google.com/apis/library/cloudbuild.googleapis.com?project=mlops-project-334512
그 후 Colab에서 cloud_function이라는 폴더를 생성하고 configs.py를 생성하겠습니다.
!mkdir cloud_function
configs = f"""
project_id = '{PROJECT_ID}'
bigquery_dataset_name = '{BIGQUERY_DATASET_NAME}'
bucket_name = '{BUCKET_NAME}'
"""
with open('cloud_function/configs.py', "w") as configs_file:
configs_file.write(configs)
main.py도 생성합니다. Query문을 유의깊게 보시면 될 것 같습니다.
%%writefile cloud_function/main.py
from datetime import datetime, timedelta
from google.cloud import bigquery
from feast import FeatureStore, RepoConfig
from configs import project_id, bigquery_dataset_name, bucket_name
def generate_user_count_features(aggregation_end_date):
table_id = f"{project_id}.{bigquery_dataset_name}.user_count_transactions_7d"
client = bigquery.Client()
job_config = bigquery.QueryJobConfig(destination=table_id, write_disposition='WRITE_APPEND')
aggregation_start_date = datetime.now() - timedelta(days=7)
sql = f"""
SELECT
src_account AS user_id,
COUNT(*) AS transaction_count_7d,
timestamp'{aggregation_end_date.isoformat()}' AS feature_timestamp
FROM
feast-oss.fraud_tutorial.transactions
WHERE
timestamp BETWEEN TIMESTAMP('{aggregation_start_date.isoformat()}')
AND TIMESTAMP('{aggregation_end_date.isoformat()}')
GROUP BY
user_id
"""
query_job = client.query(sql, job_config=job_config)
query_job.result()
print("Generated features as of: ", aggregation_end_date.isoformat())
def materialize_features(end_date):
store = FeatureStore(
config=RepoConfig(
project="fraud_tutorial",
registry=f"gs://{bucket_name}/registry.db",
provider="gcp"
)
)
store.materialize_incremental(end_date=datetime.now())
def main(data, context):
generate_user_count_features(aggregation_end_date=datetime.now())
materialize_features(end_date=datetime.now())
위 코드를 수행하면 생성이 잘 된것을 확인할 수 있습니다.
그 후 requirements.txt 도 생성합니다. code 상에서 작업합니다.
%%writefile cloud_function/requirements.txt
google-cloud-bigquery
feast[gcp]
cloud function을 실행합니다.
!cd cloud_function && gcloud functions deploy feast-update-features \
--entry-point main \
--runtime python37 \
--trigger-resource feast-schedule \
--trigger-event google.pubsub.topic.publish \
--timeout 540s
y로 응답하고 나면 operation이 성공적으로 되었다고 알려줍니다. 시간이 꽤 오래걸립니다.
이제 결과를 확인해 보겠습니다. gcp에서 cloud functions를 검색하여 접근합니다.
https://console.cloud.google.com/functions/
이제 작업을 예약합니다. web에서 할 수 있지만 colab에서 한번 설정하겠습니다. 스케쥴은 crontab과 같은 형식입니다. aws lambda function과 똑같습니다.
!gcloud scheduler jobs create pubsub feast-daily-job \
--schedule "0 22 * * *" \
--topic feast-schedule \
--message-body "This job schedules feature materialization once a day."
클라우드 스케쥴러가서 확인합니다.
https://console.cloud.google.com/cloudscheduler
이제 이렇게 냅두면 매일 22시마다 작업이 수행이 되는 문제가 생깁니다. 따라서 모든 자원 해제를 합니다. 이렇게 하여 과금 조절을 할 수 있습니다.
!bq rm -t -f feast_fraud_tutorial.user_count_transactions_7d
!bq rm -r -f -d mlops-project
!gcloud functions delete feast-update-features
!gcloud scheduler jobs delete feast-daily-job --project $PROJECT_ID
여기까지가 Feast Feature Store에서 Provider가 GCP인 경우에 대한 예제를 다루었습니다.
이전 글에서 다루었던 실습 코드를 첨부하겠습니다.
2022.03.11 - [AI & Programming/MLOps] - [패스트캠퍼스 챌린지 47일차] GCP - Feast Feature Store (1)
2022.03.12 - [AI & Programming/MLOps] - [패스트캠퍼스 챌린지 48일차] GCP - Feast Feature Store (2)
* 본 포스팅은 패스트캠퍼스 환급 챌린지 참여를 위해 작성되었습니다.
'AI > MLOps' 카테고리의 다른 글
[패스트캠퍼스 챌린지 50일차] Future Works in MLOps (0) | 2022.03.14 |
---|---|
[패스트캠퍼스 챌린지 49일차] GCP - Feast FastAPI App 배포 (0) | 2022.03.13 |
[패스트캠퍼스 챌린지 47일차] GCP - Feast Feature Store (1) (0) | 2022.03.11 |
[패스트캠퍼스 챌린지 46일차] Amazon SageMaker Autopilot 실습 (0) | 2022.03.10 |
[패스트캠퍼스 챌린지 45일차] Amazon SageMaker 계정 생성 & Amazon SageMaker Autopilot (0) | 2022.03.09 |