호돌찌의 AI 연구소
article thumbnail

 

이전 글에서는 GCP와 Colab에서 세팅을 하고 FeatureView를 생성해 배포하였습니다. 이제 모델 훈련과 배포를 해보도록 하겠습니다. 


 

3. 모델 훈련과 배포 

 

훈련 데이터를 추출하는 코드는 다음과 같습니다.

from datetime import datetime, timedelta
from feast import FeatureStore

store = FeatureStore(repo_path=".")

now = datetime.now()
two_days_ago = datetime.now() - timedelta(days=2)

training_data = store.get_historical_features(
    entity_df=f"""
    select 
        src_account as user_id,
        timestamp,
        is_fraud
    from
        feast-oss.fraud_tutorial.transactions
    where
        timestamp between timestamp('{two_days_ago.isoformat()}') 
        and timestamp('{now.isoformat()}')""",
    features=[
        "user_transaction_count_7d:transaction_count_7d",
        "user_account_features:credit_score",
        "user_account_features:account_age_days",
        "user_account_features:user_has_2fa_installed",
        "user_has_fraudulent_transactions:user_has_fraudulent_transactions_7d"
    ],
    full_feature_names=True
).to_df()

training_data.head()

 

모델 훈련하는 코드는 아래와 같습니다.

from sklearn.linear_model import LinearRegression

training_data.dropna(inplace=True)

X = training_data[[
    "user_transaction_count_7d__transaction_count_7d", 
    "user_account_features__credit_score",
    "user_account_features__account_age_days",
    "user_account_features__user_has_2fa_installed",
    "user_has_fraudulent_transactions__user_has_fraudulent_transactions_7d"
]]
y = training_data["is_fraud"]

model = LinearRegression()
model.fit(X, y)

 

한번 predict도 해보겠습니다.

samples = X.iloc[:2]
model.predict(samples)

 

모델을 저장하겠습니다. joblib을 이용합니다.

samples = X.iloc[:2]
model.predict(samples)

 

그 후 Bucket으로 모델을 업로드 합니다. gsutil cp를 이용합니다.

!gsutil cp ./model.joblib gs://$BUCKET_NAME/model_dir/model.joblib

 

 

이제 Bucket에서 확인해봅니다.

 

Google AI-Platform에 모델 생성을 합니다. 웹에서 생성하는 방법과 노트북에서 생성하는 방법이 있는데, 노트북에서 생성하는 코드는 다음과 같습니다. Debug모드를 수행하였고, 시간이 다소 걸립니다.

!gcloud ai-platform models create $AI_PLATFORM_MODEL_NAME --region global

!gcloud ai-platform versions create version_1 \
--model $AI_PLATFORM_MODEL_NAME \
--runtime-version 2.2  \
--python-version 3.7 \
--framework scikit-learn \
--origin gs://$BUCKET_NAME/model_dir \
--region global \
--verbosity "debug"

이제 모델의 Endpoint 테스트를 해봅니다. 아래 코드를 실행합니다. 

googleapiclient.discovery를 이용해서 AIPlatform에 있는 모델을 활용해볼 수 있습니다. 

import googleapiclient.discovery
from IPython.display import clear_output


instances = [
  [6.7, 3.1, 4.7, 1.5, 0],
  [4.6, 3.1, 1.5, 0.2, 0],
]

service = googleapiclient.discovery.build('ml', 'v1')
name = f'projects/{PROJECT_ID}/models/{AI_PLATFORM_MODEL_NAME}/'

response = service.projects().predict(
    name=name,
    body={'instances': instances}
).execute()

if 'error' in response:
    raise RuntimeError(response['error'])
else:
  clear_output()
  print("Predictions: ", response['predictions'])

 

 

4. Online Store로 Data Load(Materialize)

 

마지막 materialize 실행 시점으로부터 데이터 로드합니다. 시간이 다소 걸립니다.

!feast materialize-incremental $(date -u +"%Y-%m-%dT%H:%M:%S")

 

5. Model Inference

 

앞서 생성했던 Feature들을 읽어서 predict하는 함수입니다. 이전과 같이 googleapiclient.discovery를 이용합니다. 

import googleapiclient.discovery

def predict(entity_rows):
    feature_vector = store.get_online_features(
        features=[
        "user_transaction_count_7d:transaction_count_7d",
        "user_account_features:credit_score",
        "user_account_features:account_age_days",
        "user_account_features:user_has_2fa_installed",
        "user_has_fraudulent_transactions:user_has_fraudulent_transactions_7d"
    ],
        entity_rows=entity_rows
    ).to_dict()
    
    del feature_vector["user_id"]

    instances = [
        [feature_values[i] for feature_values in feature_vector.values()]
        for i in range(len(entity_rows))
    ]
    
    service = googleapiclient.discovery.build('ml', 'v1')
    name = f'projects/{PROJECT_ID}/models/{AI_PLATFORM_MODEL_NAME}'
    
    response = service.projects().predict(
        name=name,
        body={'instances': instances}
    ).execute()
    
    clear_output()
    return response

predict()

 

 

여기까지가 Feast를 활용한 예제였습니다.

 

 

 

 

 

 

 

 

6. 작업 실행 예약하기

이제 GCP를 활용하여 이 전 작업을 예약 작업을 할 수 있습니다. Cloud Build API를 이용하여 사용설정을 합니다. 

https://console.cloud.google.com/apis/library/cloudbuild.googleapis.com?project=mlops-project-334512 

 

Google Cloud Platform

하나의 계정으로 모든 Google 서비스를 Google Cloud Platform을 사용하려면 로그인하세요.

accounts.google.com

 

그 후 Colab에서 cloud_function이라는 폴더를 생성하고 configs.py를 생성하겠습니다.

!mkdir cloud_function

configs = f"""
project_id = '{PROJECT_ID}'
bigquery_dataset_name = '{BIGQUERY_DATASET_NAME}'
bucket_name = '{BUCKET_NAME}'
"""

with open('cloud_function/configs.py', "w") as configs_file:
    configs_file.write(configs)

 

main.py도 생성합니다. Query문을 유의깊게 보시면 될 것 같습니다. 

%%writefile cloud_function/main.py
from datetime import datetime, timedelta
from google.cloud import bigquery
from feast import FeatureStore, RepoConfig
from configs import project_id, bigquery_dataset_name, bucket_name


def generate_user_count_features(aggregation_end_date):
    table_id  = f"{project_id}.{bigquery_dataset_name}.user_count_transactions_7d"

    client = bigquery.Client()
    job_config = bigquery.QueryJobConfig(destination=table_id, write_disposition='WRITE_APPEND')

    aggregation_start_date = datetime.now() - timedelta(days=7)

    sql = f"""
    SELECT
        src_account AS user_id,
        COUNT(*) AS transaction_count_7d,
        timestamp'{aggregation_end_date.isoformat()}' AS feature_timestamp
    FROM
        feast-oss.fraud_tutorial.transactions
    WHERE
        timestamp BETWEEN TIMESTAMP('{aggregation_start_date.isoformat()}')
        AND TIMESTAMP('{aggregation_end_date.isoformat()}')
    GROUP BY
        user_id
    """

    query_job = client.query(sql, job_config=job_config)
    query_job.result()
    print("Generated features as of: ", aggregation_end_date.isoformat())


def materialize_features(end_date):
    store = FeatureStore(
        config=RepoConfig(
            project="fraud_tutorial",
            registry=f"gs://{bucket_name}/registry.db",
            provider="gcp"
        )
    )
    store.materialize_incremental(end_date=datetime.now())

def main(data, context):
    generate_user_count_features(aggregation_end_date=datetime.now())
    materialize_features(end_date=datetime.now())

 

위 코드를 수행하면 생성이 잘 된것을 확인할 수 있습니다. 

 

 

그 후 requirements.txt 도 생성합니다. code 상에서 작업합니다. 

%%writefile cloud_function/requirements.txt
google-cloud-bigquery
feast[gcp]

 

cloud function을 실행합니다. 

!cd cloud_function && gcloud functions deploy feast-update-features \
--entry-point main \
--runtime python37 \
--trigger-resource feast-schedule \
--trigger-event google.pubsub.topic.publish \
--timeout 540s

 

y로 응답하고 나면 operation이 성공적으로 되었다고 알려줍니다. 시간이 꽤 오래걸립니다. 

 

이제 결과를 확인해 보겠습니다. gcp에서 cloud functions를 검색하여 접근합니다.

https://console.cloud.google.com/functions/

 

Google Cloud Platform

하나의 계정으로 모든 Google 서비스를 Google Cloud Platform을 사용하려면 로그인하세요.

accounts.google.com

 

이제 작업을 예약합니다. web에서 할 수 있지만 colab에서 한번 설정하겠습니다. 스케쥴은 crontab과 같은 형식입니다. aws lambda function과 똑같습니다. 

!gcloud scheduler jobs create pubsub feast-daily-job \
--schedule "0 22 * * *" \
--topic feast-schedule \
--message-body "This job schedules feature materialization once a day."

 

클라우드 스케쥴러가서 확인합니다. 

https://console.cloud.google.com/cloudscheduler

 

Google Cloud Platform

하나의 계정으로 모든 Google 서비스를 Google Cloud Platform을 사용하려면 로그인하세요.

accounts.google.com

 

이제 이렇게 냅두면 매일 22시마다 작업이 수행이 되는 문제가 생깁니다. 따라서 모든 자원 해제를 합니다. 이렇게 하여 과금 조절을 할 수 있습니다. 

!bq rm -t -f feast_fraud_tutorial.user_count_transactions_7d
!bq rm -r -f -d mlops-project
!gcloud functions delete feast-update-features
!gcloud scheduler jobs delete feast-daily-job --project $PROJECT_ID

 

 

여기까지가 Feast Feature Store에서 Provider가 GCP인 경우에 대한 예제를 다루었습니다. 

이전 글에서 다루었던 실습 코드를 첨부하겠습니다.

Feature_Store_practice.ipynb
0.08MB

 

 

2022.03.11 - [AI & Programming/MLOps] - [패스트캠퍼스 챌린지 47일차] GCP - Feast Feature Store (1)

2022.03.12 - [AI & Programming/MLOps] - [패스트캠퍼스 챌린지 48일차] GCP - Feast Feature Store (2)

 

 

 

 


 

 

https://bit.ly/37BpXiC

 

패스트캠퍼스 [직장인 실무교육]

프로그래밍, 영상편집, UX/UI, 마케팅, 데이터 분석, 엑셀강의, The RED, 국비지원, 기업교육, 서비스 제공.

fastcampus.co.kr

 

* 본 포스팅은 패스트캠퍼스 환급 챌린지 참여를 위해 작성되었습니다.

 

 

 

 

 


profile

호돌찌의 AI 연구소

@hotorch's AI Labs

포스팅이 도움이 되셨다면 "좋아요❤️" 또는 "구독👍🏻" 해주세요!