1. 시작하기 전에 개념 잡기 - Pipeline & Component
kubeflow를 하기 전에 pipeline과 component에 대한 개념이 제대로 잡혀있어야 합니다. 두 용어에 대한 관계와 정의는 다음과 같습니다.
- Component : 재사용 가능한 형태로 분리된 하나의 작업 단위, 쿠버네티스 관점에서는 Pod와 매핑이 가능합니다.
- Pipeline : 여러 Component 들의 연관성, 순서에 따라 연결지은 그래프(DAG), 쿠버네티스 관점에서는 Workflow와 매핑이 가능합니다.
- Pipeline
- kfp sdk 를 사용하여 pipeline을 구현한 뒤, kfp의 dsl compiler 즉, "dsl-compile" 혹은 "kfp.compiler.Compiler().compile()" 명령을 사용해 컴파일하면 k8s 가 이해할 수 있는 형태의 yaml 파일이 생성됩니다. 이 부분은 예시로 보여드리겠습니다.
- yaml 파일을 조금 자세히 살펴보면, kind 가 Workflow 로 되어있는 것을 확인할 수 있으며, 작성한 component code들이 중간중간에 copy 되어있는 것을 확인하실 수 있습니다. Workflow라는 리소스는 간단히 말하면 여러 개의 container 들을 정해진 순서에 따라 실행시키고, input/output을 전달하는 것을 정의한 DAG입니다.
- Component
- kfp sdk를 사용하여 component를 구현하면, 그 component 를 사용하는 pipeline을 컴파일했을 때 생성되는 workflow yaml 파일의 spec.templates 에 해당 컴포넌트를 감싼 (containerized) 부분이 추가됩니다.
- 하나의 component는 k8s 상에서는 하나의 독립적인 pod으로 생성되어 component 내부에 작성된 코드를 component decorator에 작성한 base_image 환경에서 실행하게 됩니다. base_image 지정을 통해 항상 동일한 환경에서 정해진 코드가 실행되는 것을 보장할 수 있습니다.
- 따라서 하나의 pipeline 내에서 연속된 component라고 하더라도 memory를 공유하는 일은 일어나지 않으며, 일반적으로 서로 다른 component 간의 data 공유는 input/output 변수 혹은 파일 경로로 넘겨주는 방식을 사용합니다.
2. 환경 세팅
python 3.8.9 위에서 시행하였습니다. pypi 패키지로는 kfp를 설치하여야 합니다.
pip install kfp --upgrade --use-feature=2020-resolver
아래 그림의 명령어들을 입력하여 정상 설치되어있는지 확인합니다. kfp 관련 버전들을 확인해주시길 바랍니다.
저번처럼 minikube with kubeflow 환경에서 시행하며 새 터미널을 켜서 port - forward를 실시합니다.
3. Example 1 - 컴파일해보고, python을 사용하여 pipeline 업로드하고 run 해보기
간단한 python code를 바탕으로 component와 pipeline으로 만들어본 뒤, kubeflow에 배포하여 사용해보겠습니다. "add.py"라고 만들겠습니다. 파이썬 코드 내용은 쉽습니다. 하지만 아래 코드 주석을 반드시 참고해주시길 바랍니다.
# Python 함수를 Component 로 바꿔주는 함수
# decorator 로도 사용할 수 있으며, 여러 옵션을 argument 로 설정할 수 있음
# add_op = create_component_from_func(
# func=add,
# base_image='python:3.7', # Optional : component 는 k8s pod 로 생성되며, 해당 pod 의 image 를 설정
# output_component_file='add.component.yaml', # Optional : component 도 yaml 로 compile 하여 재사용하기 쉽게 관리 가능
# packages_to_install=['pandas==0.24'], # Optional : base image 에는 없지만, python code 의 의존성 패키지가 있으면 component 생성 시 추가 가능
# )
from kfp.components import create_component_from_func
"""
kfp.components.create_component_from_func :
Python 함수를 Component 로 바꿔주는 함수
decorator 로도 사용할 수 있으며, 여러 옵션을 argument 로 설정할 수 있음
add_op = create_component_from_func(
func=add,
base_image='python:3.7', # Optional : component 는 k8s pod 로 생성되며, 해당 pod 의 image 를 설정
output_component_file='add.component.yaml', # Optional : component 도 yaml 로 compile 하여 재사용하기 쉽게 관리 가능
packages_to_install=['pandas==0.24'], # Optional : base image 에는 없지만, python code 의 의존성 패키지가 있으면 component 생성 시 추가 가능
)
"""
def add(value_1: int, value_2: int) -> int:
"""
더하기
"""
ret = value_1 + value_2
return ret
def subtract(value_1: int, value_2: int) -> int:
"""
빼기
"""
ret = value_1 - value_2
return ret
def multiply(value_1: int, value_2: int) -> int:
"""
곱하기
"""
ret = value_1 * value_2
return ret
# Python 함수를 선언한 후, kfp.components.create_component_from_func 를 사용하여
# ContainerOp 타입(component)으로 convert
add_op = create_component_from_func(add)
subtract_op = create_component_from_func(subtract)
multiply_op = create_component_from_func(multiply)
from kfp.dsl import pipeline
@pipeline(name="add example")
def my_pipeline(value_1: int, value_2: int):
task_1 = add_op(value_1, value_2)
task_2 = subtract_op(value_1, value_2)
# component 간의 data 를 넘기고 싶다면,
# output -> input 으로 연결하면 DAG 상에서 연결됨
# compile 된 pipeline.yaml 의 dag 파트의 dependency 부분 확인
# uploaded pipeline 의 그래프 확인
task_3 = multiply_op(task_1.output, task_2.output)
** 주의점 : python code의 주석에 한글이 들어가면 encoding 문제로 pipeline upload 가 되지 않을 수 있으니, 컴파일 전에는 한글 주석을 모두 제거해주시길 바랍니다.
주석의 내용을 요약하면 python을 활용하여 pipeline을 만드는 순서는 아래와 같습니다.
- Python 함수를 구현합니다.
- 해당 함수 밖에서 선언된 코드를 사용해서는 안 됩니다. 반드시 import 문까지도 함수 안에 작성되어야 합니다.
- 단, 해당 python 함수를 component로 만들 때, base_image로 사용하는 Docker 이미지에 들어있는 코드는 함수 내부에 선언하지 않아도 사용할 수 있습니다.
- 복잡한 로직을 모두 Python 함수 단위로 모듈화를 하기는 어렵기 때문에, 이런 경우에는 Component의 base image로 사용할 docker image를 만들어두고 base_image로 지정하는 방식을 주로 사용합니다.
- 자세한 사용 방법은 다음 링크를 참고합니다.
- pipeline으로 엮을 수 있도록 input, output을 잘 지정하여 구현합니다.
- component 간에 input, output 을 넘기는 방식은 추후 다른 예제로 함께 살펴보겠습니다.
- "kfp.components.create_component_from_func" 함수를 사용하여, Python 함수를 kubeflow Component (ContainerOp)로 변환합니다.
- decorator로 사용할 수도 있고, base_image, extar_packages 등 여러 argument를 지정할 수도 있습니다.
- "kfp.dsl.pipeline" 데코레이터 함수를 사용하여, 각 component 들간의 input-output을 엮습니다.
- kfp.dsl의 여러 메서드들을 사용하여, 컴포넌트 실행 시의 Condition 등을 지정할 수도 있습니다.
- "kfp.compiler.Compiler().compile" 혹은 "dsl-compile"을 사용하여 pipeline python code를 k8s의 Workflow yaml 파일로 컴파일합니다.
- 컴파일된 yaml 파일을 UI를 통해 업로드하고 run 합니다. 혹은 kfp.Client를 사용하거나 kfp CLI, HTTP API를 사용해서 run 할 수도 있습니다.
이제 pipeline을 compile을 하면, 동일한 경로에 "add_pipeline.yaml"이 생성됩니다. 아래와 같이 compile을 합니다.
dsl-compile --py add.py --output add_pipeline.yaml
yaml 파일은 아래와 같이 생겼습니다. python에 작성된 것을 바탕으로 yaml 이 생성되고 이 yaml이 dag를 구성하게 될 예정입니다. I/O를 위주로 보시면 됩니다.
이제 kubeflow에 들어가서 pipeline을 업로드합니다.
python code로 component와 pipeline을 짜고, 이를 컴파일하여 yaml을 생성합니다. 이를 업로드하면 아래와 같은 DAG가 생성되는 것을 확인이 가능합니다.
우측 상단에 + create run을 누릅니다. 그 후 pipeline을 선택하겠다는 것을 누릅니다. 방금 업로드한 pipeline을 선택합니다.
그 후에 experiment와 run parameter를 입력합니다.
그 후, 새로 생긴 run of add_pipeline(run name)을 누릅니다.
처음에 이렇게 컨테이너가 생성중이라고 펜딩 중임을 알 수 있습니다.
시간이 지나면 multiply component 가 생긴 것을 알 수 있습니다.
4. Example 2 - kfp.compiler를 활용한 Compile
위의 "add.py" 와의 차이를 중점적으로 보시길 바랍니다. add_2.py로 생성하겠습니다. 동일한 코드이지만 main 문이 큰 차이점이라고 할 수 있습니다.
import kfp.compiler
from kfp.components import create_component_from_func
def add(value_1: int, value_2: int) -> int:
"""
더하기
"""
ret = value_1 + value_2
return ret
def subtract(value_1: int, value_2: int) -> int:
"""
빼기
"""
ret = value_1 - value_2
return ret
def multiply(value_1: int, value_2: int) -> int:
"""
곱하기
"""
ret = value_1 * value_2
return ret
add_op = create_component_from_func(add)
subtract_op = create_component_from_func(subtract)
multiply_op = create_component_from_func(multiply)
from kfp.dsl import pipeline
@pipeline(name="add example")
def my_pipeline(value_1: int, value_2: int):
task_1 = add_op(value_1, value_2)
task_2 = subtract_op(value_1, value_2)
task_3 = multiply_op(task_1.output, task_2.output)
if __name__ == "__main__":
kfp.compiler.Compiler().compile(my_pipeline, "./add_pipeline_2.yaml")
compile 할 때, "dsl-compile" 대신 "kfp.compiler"를 사용하는 코드가 추가된 버전입니다. python add_2.py를 실행하면 yaml 파일이 생긴 것을 알 수 있습니다.
다음 글에서는 또 다른 버전의 pipeline에 대한 글로 찾아뵙겠습니다.
* 본 포스팅은 패스트캠퍼스 환급 챌린지 참여를 위해 작성되었습니다.
'AI > MLOps' 카테고리의 다른 글
[패스트캠퍼스 챌린지 28일차] Kubeflow Katib (0) | 2022.02.20 |
---|---|
[패스트캠퍼스 챌린지 27일차] Kubeflow Pipeline (2) (0) | 2022.02.19 |
[패스트캠퍼스 챌린지 25일차] Kubeflow 기능 살펴보기 (0) | 2022.02.17 |
[패스트캠퍼스 챌린지 24일차] Kubeflow (0) | 2022.02.16 |
[패스트캠퍼스 챌린지 23일차] Prometheus & Grafana Practice (0) | 2022.02.15 |