호돌찌의 AI 연구소
article thumbnail

 

1. 시작하기 전에 개념 잡기 - Pipeline & Component

kubeflow를 하기 전에 pipeline과 component에 대한 개념이 제대로 잡혀있어야 합니다. 두 용어에 대한 관계와 정의는 다음과 같습니다. 

  • Component : 재사용 가능한 형태로 분리된 하나의 작업 단위, 쿠버네티스 관점에서는 Pod와 매핑이 가능합니다.
  • Pipeline : 여러 Component 들의 연관성, 순서에 따라 연결지은 그래프(DAG), 쿠버네티스 관점에서는 Workflow와  매핑이 가능합니다. 

 

- Pipeline

  • kfp sdk 를 사용하여 pipeline을 구현한 뒤, kfp의 dsl compiler 즉, "dsl-compile" 혹은 "kfp.compiler.Compiler().compile()" 명령을 사용해 컴파일하면 k8s 가 이해할 수 있는 형태의 yaml 파일이 생성됩니다. 이 부분은 예시로 보여드리겠습니다.
    • yaml 파일을 조금 자세히 살펴보면, kind 가 Workflow 로 되어있는 것을 확인할 수 있으며, 작성한 component code들이 중간중간에 copy 되어있는 것을 확인하실 수 있습니다. Workflow라는 리소스는 간단히 말하면 여러 개의 container 들을 정해진 순서에 따라 실행시키고, input/output을 전달하는 것을 정의한 DAG입니다.

 

- Component

  • kfp sdk를 사용하여 component를 구현하면, 그 component 를 사용하는 pipeline을 컴파일했을 때 생성되는 workflow yaml 파일의 spec.templates 에 해당 컴포넌트를 감싼 (containerized) 부분이 추가됩니다.
    • 하나의 component는 k8s 상에서는 하나의 독립적인 pod으로 생성되어 component 내부에 작성된 코드를 component decorator에 작성한 base_image 환경에서 실행하게 됩니다. base_image 지정을 통해 항상 동일한 환경에서 정해진 코드가 실행되는 것을 보장할 수 있습니다.
    • 따라서 하나의 pipeline 내에서 연속된 component라고 하더라도 memory를 공유하는 일은 일어나지 않으며, 일반적으로 서로 다른 component 간의 data 공유는 input/output 변수 혹은 파일 경로로 넘겨주는 방식을 사용합니다.

 

 

 

2. 환경 세팅

python 3.8.9 위에서 시행하였습니다. pypi 패키지로는 kfp를 설치하여야 합니다. 

pip install kfp --upgrade --use-feature=2020-resolver

 

아래 그림의 명령어들을 입력하여 정상 설치되어있는지 확인합니다. kfp 관련 버전들을 확인해주시길 바랍니다. 

kubctl get po -n kubeflow는 status가 running이 될 때 까지 기다린다. 

 

저번처럼 minikube with kubeflow 환경에서 시행하며 새 터미널을 켜서 port - forward를 실시합니다. 

 

 

3. Example 1 - 컴파일해보고, python을 사용하여 pipeline 업로드하고 run 해보기

간단한 python code를 바탕으로 component와 pipeline으로 만들어본 뒤, kubeflow에 배포하여 사용해보겠습니다. "add.py"라고 만들겠습니다. 파이썬 코드 내용은 쉽습니다. 하지만 아래 코드 주석을 반드시 참고해주시길 바랍니다.

# Python 함수를 Component 로 바꿔주는 함수
# decorator 로도 사용할 수 있으며, 여러 옵션을 argument 로 설정할 수 있음
# add_op = create_component_from_func(
#                 func=add,
#                 base_image='python:3.7', # Optional : component 는 k8s pod 로 생성되며, 해당 pod 의 image 를 설정
#                 output_component_file='add.component.yaml', # Optional : component 도 yaml 로 compile 하여 재사용하기 쉽게 관리 가능
#                 packages_to_install=['pandas==0.24'], # Optional : base image 에는 없지만, python code 의 의존성 패키지가 있으면 component 생성 시 추가 가능
#             )
from kfp.components import create_component_from_func

"""
kfp.components.create_component_from_func :
    Python 함수를 Component 로 바꿔주는 함수
    decorator 로도 사용할 수 있으며, 여러 옵션을 argument 로 설정할 수 있음
    
    add_op = create_component_from_func(
                func=add,
                base_image='python:3.7', # Optional : component 는 k8s pod 로 생성되며, 해당 pod 의 image 를 설정
                output_component_file='add.component.yaml', # Optional : component 도 yaml 로 compile 하여 재사용하기 쉽게 관리 가능
                packages_to_install=['pandas==0.24'], # Optional : base image 에는 없지만, python code 의 의존성 패키지가 있으면 component 생성 시 추가 가능
            )
"""


def add(value_1: int, value_2: int) -> int:
    """
    더하기
    """
    ret = value_1 + value_2
    return ret


def subtract(value_1: int, value_2: int) -> int:
    """
    빼기
    """
    ret = value_1 - value_2
    return ret


def multiply(value_1: int, value_2: int) -> int:
    """
    곱하기
    """
    ret = value_1 * value_2
    return ret


# Python 함수를 선언한 후, kfp.components.create_component_from_func 를 사용하여
# ContainerOp 타입(component)으로 convert
add_op = create_component_from_func(add)
subtract_op = create_component_from_func(subtract)
multiply_op = create_component_from_func(multiply)

from kfp.dsl import pipeline


@pipeline(name="add example")
def my_pipeline(value_1: int, value_2: int):
    task_1 = add_op(value_1, value_2)
    task_2 = subtract_op(value_1, value_2)

    # component 간의 data 를 넘기고 싶다면,
    # output -> input 으로 연결하면 DAG 상에서 연결됨

    # compile 된 pipeline.yaml 의 dag 파트의 dependency 부분 확인
    # uploaded pipeline 의 그래프 확인
    task_3 = multiply_op(task_1.output, task_2.output)

** 주의점 : python code의 주석에 한글이 들어가면 encoding 문제로 pipeline upload 가 되지 않을 수 있으니, 컴파일 전에는 한글 주석을 모두 제거해주시길 바랍니다. 

 

주석의 내용을 요약하면 python을 활용하여 pipeline을 만드는 순서는 아래와 같습니다.

  1. Python 함수를 구현합니다.
    1. 해당 함수 밖에서 선언된 코드를 사용해서는 안 됩니다. 반드시 import 문까지도 함수 안에 작성되어야 합니다.
    2. 단, 해당 python 함수를 component로 만들 때, base_image로 사용하는 Docker 이미지에 들어있는 코드는 함수 내부에 선언하지 않아도 사용할 수 있습니다.
      1. 복잡한 로직을 모두 Python 함수 단위로 모듈화를 하기는 어렵기 때문에, 이런 경우에는 Component의 base image로 사용할 docker image를 만들어두고 base_image로 지정하는 방식을 주로 사용합니다.
      2. 자세한 사용 방법은 다음 링크를 참고합니다.
    3. pipeline으로 엮을 수 있도록 input, output을 잘 지정하여 구현합니다.
      1. component 간에 input, output 을 넘기는 방식은 추후 다른 예제로 함께 살펴보겠습니다.
  2. "kfp.components.create_component_from_func" 함수를 사용하여, Python 함수를 kubeflow Component (ContainerOp)로 변환합니다.
    1. decorator로 사용할 수도 있고, base_image, extar_packages 등 여러 argument를 지정할 수도 있습니다.
  3. "kfp.dsl.pipeline" 데코레이터 함수를 사용하여, 각 component 들간의 input-output을 엮습니다.
    1. kfp.dsl의 여러 메서드들을 사용하여, 컴포넌트 실행 시의 Condition 등을 지정할 수도 있습니다. 
  4. "kfp.compiler.Compiler().compile" 혹은 "dsl-compile"을 사용하여 pipeline python code를 k8s의 Workflow yaml 파일로 컴파일합니다.
  5. 컴파일된 yaml 파일을 UI를 통해 업로드하고 run 합니다. 혹은 kfp.Client를 사용하거나 kfp CLI, HTTP API를 사용해서 run 할 수도 있습니다.

 

이제 pipeline을 compile을 하면, 동일한 경로에 "add_pipeline.yaml"이 생성됩니다. 아래와 같이 compile을 합니다. 

dsl-compile --py add.py --output add_pipeline.yaml

 

yaml 파일은 아래와 같이 생겼습니다. python에 작성된 것을 바탕으로 yaml 이 생성되고 이 yaml이 dag를 구성하게 될 예정입니다. I/O를 위주로 보시면 됩니다.

 

 

이제 kubeflow에 들어가서 pipeline을 업로드합니다.

 

python code로 component와 pipeline을 짜고, 이를 컴파일하여 yaml을 생성합니다. 이를 업로드하면 아래와 같은 DAG가 생성되는 것을 확인이 가능합니다.

 

우측 상단에 + create run을 누릅니다. 그 후 pipeline을 선택하겠다는 것을 누릅니다. 방금 업로드한 pipeline을 선택합니다.

 

그 후에 experiment와 run parameter를 입력합니다.

 

그 후, 새로 생긴 run of add_pipeline(run name)을 누릅니다.

 

처음에 이렇게 컨테이너가 생성중이라고 펜딩 중임을 알 수 있습니다. 

 

시간이 지나면 multiply component 가 생긴 것을 알 수 있습니다.

 

 

 

 

 

 

 

4. Example 2 - kfp.compiler를 활용한 Compile

위의 "add.py" 와의 차이를 중점적으로 보시길 바랍니다. add_2.py로 생성하겠습니다. 동일한 코드이지만 main 문이 큰 차이점이라고 할 수 있습니다. 

import kfp.compiler
from kfp.components import create_component_from_func


def add(value_1: int, value_2: int) -> int:
    """
    더하기
    """
    ret = value_1 + value_2
    return ret


def subtract(value_1: int, value_2: int) -> int:
    """
    빼기
    """
    ret = value_1 - value_2
    return ret


def multiply(value_1: int, value_2: int) -> int:
    """
    곱하기
    """
    ret = value_1 * value_2
    return ret


add_op = create_component_from_func(add)
subtract_op = create_component_from_func(subtract)
multiply_op = create_component_from_func(multiply)

from kfp.dsl import pipeline


@pipeline(name="add example")
def my_pipeline(value_1: int, value_2: int):
    task_1 = add_op(value_1, value_2)
    task_2 = subtract_op(value_1, value_2)

    task_3 = multiply_op(task_1.output, task_2.output)


if __name__ == "__main__":
    kfp.compiler.Compiler().compile(my_pipeline, "./add_pipeline_2.yaml")

 

compile 할 때, "dsl-compile" 대신 "kfp.compiler"를 사용하는 코드가 추가된 버전입니다. python add_2.py를 실행하면 yaml 파일이 생긴 것을 알 수 있습니다.

 

 

다음 글에서는 또 다른 버전의 pipeline에 대한 글로 찾아뵙겠습니다.

 

 


https://bit.ly/37BpXiC

 

패스트캠퍼스 [직장인 실무교육]

프로그래밍, 영상편집, UX/UI, 마케팅, 데이터 분석, 엑셀강의, The RED, 국비지원, 기업교육, 서비스 제공.

fastcampus.co.kr

 

* 본 포스팅은 패스트캠퍼스 환급 챌린지 참여를 위해 작성되었습니다.

 

 

 


profile

호돌찌의 AI 연구소

@hotorch's AI Labs

포스팅이 도움이 되셨다면 "좋아요❤️" 또는 "구독👍🏻" 해주세요!