Kubeflow Pipelines – DSL 이해하기 #1

순차적 처리를 하는 파이프 라인 만들기

파이프 라인을 구성하고 실행하기

순차적으로 작업을 하는 파이프 라인을 만들어 보겠습니다. 동전을 던지는 단계와 그 결과를 출력하는 단계로 구성 되어 있습니다.

flip_coin_op() 는 동전을 던지는 단계입니다. 파이썬 코드로 작성되었습니다. 랜덤 함수를 사용해서 0이면 앞면(heads), 1이면 뒷면(tails) 이라고 출력합니다. 그리고 출력 결과를 /tmp/output 파일에 저장합니다. 결과가 저장된 /tmp/output 파일을 다음 단계에서 사용하기 위해 file_outputs 으로 정의합니다.

def flip_coin_op():
    return dsl.ContainerOp(
        name='Flip coin',
        image='python:alpine3.6',
        command=['sh', '-c'],
        arguments=['python -c "import random; result = \\'heads\\' if random.randint(0,1) == 0 '
                  'else \\'tails\\'; print(result)" | tee /tmp/output'],
        file_outputs={'output': '/tmp/output'}
    )

print_op(msg) 라는 입력받은 메시지를 화면으로 출력하는 단계입니다. 단순히 입력 받은 메시지를 echo 명령어를 사용하여 화면에 출력합니다.

def print_op(msg):
    """Print a message."""
    return dsl.ContainerOp(
        name='Print',
        image='alpine:3.6',
        command=['echo', msg],
    )

DSL을 사용하여 파이프 라인을 구성합니다. flip_coin_op()가 먼저 실행되고, 그 다음 print_op()가 실행됩니다. 그리고 flip_coin_op()의 출력 결과를 print_op()의 입력 파라미터로 사용하고 있습니다.

KFP SDK 사용하여 코드에서 파이프 라인을 컴파일하고, 바로 실행하였습니다.

@dsl.pipeline(
    name='Sequential pipeline',
    description='A pipeline with two sequential steps.'
)
def sequential_pipeline():
    """A pipeline with two sequential steps."""

    flip = flip_coin_op()
    print_op(flip.output)


if __name__ == '__main__':
    kfp.compiler.Compiler().compile(sequential_pipeline, __file__ + '.zip')

    client = kfp.Client()
    my_experiment = client.create_experiment(name='Basic Experiment')
    my_run = client.run_pipeline(my_experiment.id, 'Sequential pipeline', __file__ + '.zip')

다음은 파이프라인을 구성하고 실행하는 전체 코드입니다.

sequential.py

import kfp
from kfp import dsl


def flip_coin_op():
    return dsl.ContainerOp(
        name='Flip coin',
        image='python:alpine3.6',
        command=['sh', '-c'],
        arguments=['python -c "import random; result = \\'heads\\' if random.randint(0,1) == 0 '
                  'else \\'tails\\'; print(result)" | tee /tmp/output'],
        file_outputs={'output': '/tmp/output'}
    )


def print_op(msg):
    """Print a message."""
    return dsl.ContainerOp(
        name='Print',
        image='alpine:3.6',
        command=['echo', msg],
    )


@dsl.pipeline(
    name='Sequential pipeline',
    description='A pipeline with two sequential steps.'
)
def sequential_pipeline():
    """A pipeline with two sequential steps."""

    flip = flip_coin_op()
    print_op(flip.output)


if __name__ == '__main__':
    kfp.compiler.Compiler().compile(sequential_pipeline, __file__ + '.zip')

    client = kfp.Client()
    my_experiment = client.create_experiment(name='Basic Experiment')
    my_run = client.run_pipeline(my_experiment.id, 'Sequential pipeline', __file__ + '.zip')

파이프 라인 실행 결과 확인하기

KFP UI 화면에서 결과를 조회해 볼 수 있습니다. 왼쪽 메뉴에서 Experiments 클릭하면 Experiment 목록을 조회할 수 있습니다. 예제에서 사용한 Experiment 이름이 “Basic Experiment” 이므로, Basic Experiment를 클릭하면 실행 목록을 확인 할 수 있습니다.

“Run name”이 “Sequential pipeline” 인 것을 클릭하면 실행 결과를 확인할 수 있습니다.

Graph 탭에서 flip-coin 단계를 선택하고, Input/Output 탭을 클릭하면 출력 결과를 볼 수 있습니다.


조건에 의해서 분기를 하는 파이프 라인 만들기

파이프 라인을 구성하고 실행하기

조건에 의해서 분기를 파이프 라인을 만들어 보겠습니다. 동전을 던지는 단계와 그 결과에 따러 “승리” 또는 “패배” 를 출력하는 단계로 구성 되어 있습니다.

flip_coin_op() 는 동전을 던지는 단계입니다. 이전 예제 코드와 동일합니다.

def flip_coin_op():
    return dsl.ContainerOp(
        name='Flip coin',
        image='python:alpine3.6',
        command=['sh', '-c'],
        arguments=['python -c "import random; result = \\'heads\\' if random.randint(0,1) == 0 '
                  'else \\'tails\\'; print(result)" | tee /tmp/output'],
        file_outputs={'output': '/tmp/output'}
    )

print_op(msg) 라는 입력받은 메시지를 화면으로 출력하는 단계입니다. 이전 예제 코드와 동일합니다.

def print_op(msg):
    """Print a message."""
    return dsl.ContainerOp(
        name='Print',
        image='alpine:3.6',
        command=['echo', msg],
    )

DSL을 사용하여 파이프 라인을 구성합니다. flip_coin_op()가 먼저 실행되고, 그 결과에 따라서 print_op()를 사용해서 “승리” 또는 “패배”를 출력하게 됩니다. 동전이 앞면인지 뒷면이지 판단하기 위해서 dsl.Condition() 을 사용하였습니다. 앞면이면 ‘YOUT WIN’을 출력하고, 뒷면이면 ‘YOU LOSE’를 출력합니다.

KFP SDK 사용하여 코드에서 파이프 라인을 컴파일하고, 바로 실행하였습니다.

@dsl.pipeline(
    name='Conditional execution pipeline',
    description='Shows how to use dsl.Condition().'
)
def condition_pipeline():
    flip = flip_coin_op()
    with dsl.Condition(flip.output == 'heads'):
        print_op('YOUT WIN')

    with dsl.Condition(flip.output == 'tails'):
        print_op('YOU LOSE')


if __name__ == '__main__':
    kfp.compiler.Compiler().compile(condition_pipeline, __file__ + '.zip')

    client = kfp.Client()
    my_experiment = client.create_experiment(name='Basic Experiment')
    my_run = client.run_pipeline(my_experiment.id, 'Condition pipeline', __file__ + '.zip')

다음은 파이프라인을 구성하고 실행하는 전체 코드입니다.

condition.py

import kfp
from kfp import dsl


def flip_coin_op():
    return dsl.ContainerOp(
        name='Flip coin',
        image='python:alpine3.6',
        command=['sh', '-c'],
        arguments=['python -c "import random; result = \\'heads\\' if random.randint(0,1) == 0 '
                  'else \\'tails\\'; print(result)" | tee /tmp/output'],
        file_outputs={'output': '/tmp/output'}
    )


def print_op(msg):
    """Print a message."""
    return dsl.ContainerOp(
        name='Print',
        image='alpine:3.6',
        command=['echo', msg],
    )


@dsl.pipeline(
    name='Conditional execution pipeline',
    description='Shows how to use dsl.Condition().'
)
def condition_pipeline():
    flip = flip_coin_op()
    with dsl.Condition(flip.output == 'heads'):
        print_op('YOUT WIN')

    with dsl.Condition(flip.output == 'tails'):
        print_op('YOU LOSE')


if __name__ == '__main__':
    kfp.compiler.Compiler().compile(condition_pipeline, __file__ + '.zip')

    client = kfp.Client()
    my_experiment = client.create_experiment(name='Basic Experiment')
    my_run = client.run_pipeline(my_experiment.id, 'Condition pipeline', __file__ + '.zip')

파이프 라인 실행 결과 확인하기

KFP UI 화면에서 결과를 조회해 볼 수 있습니다. 왼쪽 메뉴에서 Experiments 클릭하면 Experiment 목록을 조회할 수 있습니다. 예제에서 사용한 Experiment 이름이 “Basic Experiment” 이므로, Basic Experiment를 클릭하면 실행 목록을 확인 할 수 있습니다.

“Run name”이 “Condition pipeline” 인 것을 클릭하면 실행 결과를 확인할 수 있습니다.


병렬 루프를 사용하는 파이프 라인 만들기

파이프 라인을 구성하고 실행하기

정적 항목 세트에 대한 병렬 루프를 사용하는 파이프 라인을 만들어 보겠습니다. 동전을 여러번 던지고, 그 결과를 json 형식으로 저장 단계와 그 결과들을 병렬 루프를 통해서 출력하는 단계로 구성 되어 있습니다.

flip_coins_op() 는 동전을 여러번 던지는 단계입니다. 동전을 던진 결과 값을 json 형식으로 저정합니다.

def flip_coins_op():
    return dsl.ContainerOp(
        name='Flip coin',
        image='python:alpine3.6',
        command=['sh', '-c'],
        arguments=['python -c "import json; import sys; import random; '
                   'json.dump([(\\'heads\\' if random.randint(0,1) == 1 else \\'tails\\') for i in range(10)], '
                   'open(\\'/tmp/output.json\\', \\'w\\'))"'],
        file_outputs={'output': '/tmp/output.json'}
    )

print_op(msg) 라는 입력받은 메시지를 화면으로 출력하는 단계입니다. 이전 예제 코드와 동일합니다.

def print_op(msg):
    """Print a message."""
    return dsl.ContainerOp(
        name='Print',
        image='alpine:3.6',
        command=['echo', msg],
    )

DSL을 사용하여 파이프 라인을 구성합니다. flip_coins_op()가 먼저 실행되고, 그 결과가 json 형식으로 반환됩니다. json에 포함된 아이템의 개수 만큼 print_op() 를 실행합니다.

KFP SDK 사용하여 코드에서 파이프 라인을 컴파일하고, 바로 실행하였습니다.

@dsl.pipeline(
    name='Loop pipeline',
    description='A pipeline with parallel loop'
)
def loop_pipeline():
    flips = flip_coins_op()
    with dsl.ParallelFor(flips.output) as item:
        print_op(item)

다음은 파이프라인을 구성하고 실행하는 전체 코드입니다.

loop.py

import kfp
from kfp import dsl


def flip_coins_op():
    return dsl.ContainerOp(
        name='Flip coin',
        image='python:alpine3.6',
        command=['sh', '-c'],
        arguments=['python -c "import json; import sys; import random; '
                   'json.dump([(\\'heads\\' if random.randint(0,1) == 1 else \\'tails\\') for i in range(10)], '
                   'open(\\'/tmp/output.json\\', \\'w\\'))"'],
        file_outputs={'output': '/tmp/output.json'}
    )



def print_op(msg):
    """Print a message."""
    return dsl.ContainerOp(
        name='Print',
        image='alpine:3.6',
        command=['echo', msg],
    )


@dsl.pipeline(
    name='Loop pipeline',
    description='A pipeline with parallel loop'
)
def loop_pipeline():
    flips = flip_coins_op()
    with dsl.ParallelFor(flips.output) as item:
        print_op(item)


if __name__ == '__main__':
    kfp.compiler.Compiler().compile(loop_pipeline, __file__ + '.zip')

    client = kfp.Client()
    my_experiment = client.create_experiment(name='Basic Experiment')
    my_run = client.run_pipeline(my_experiment.id, 'Loop pipeline', __file__ + '.zip')

파이프 라인 실행 결과 확인하기

KFP UI 화면에서 결과를 조회해 볼 수 있습니다. 왼쪽 메뉴에서 Experiments 클릭하면 Experiment 목록을 조회할 수 있습니다. 예제에서 사용한 Experiment 이름이 “Basic Experiment” 이므로, Basic Experiment를 클릭하면 실행 목록을 확인 할 수 있습니다.

“Run name”이 “Loop pipeline” 인 것을 클릭하면 실행 결과를 확인할 수 있습니다.


Exit Handler를 사용하는 파이프 라인 만들기

파이프 라인을 구성하고 실행하기

Exit Handler 사용하는 파이프 라인을 만들어 보겠습니다. Exit Handler를 사용하면 작업 그룹이 종료 될 때 지정한 작업을 실행시킬 수 있습니다. 작업 그룹의 성공 여부와는 상관 없이 무조건 지정한 작업이 실행됩니다.

동전을 여러번 던지고, 그 결과를 json 형식으로 저장 단계와 그 결과들을 병렬 루프를 통해서 출력하는 단계로 구성 되어 있습니다.

flip_coin_op() 는 동전을 던지는 단계입니다. 이전 예제 코드와 동일합니다.

def flip_coin_op():
    return dsl.ContainerOp(
        name='Flip coin',
        image='python:alpine3.6',
        command=['sh', '-c'],
        arguments=['python -c "import random; result = \\'heads\\' if random.randint(0,1) == 0 '
                  'else \\'tails\\'; print(result)" | tee /tmp/output'],
        file_outputs={'output': '/tmp/output'}
    )

print_op(msg) 라는 입력받은 메시지를 화면으로 출력하는 단계입니다. 이전 예제 코드와 동일합니다.

def print_op(msg):
    """Print a message."""
    return dsl.ContainerOp(
        name='Print',
        image='alpine:3.6',
        command=['echo', msg],
    )

DSL을 사용하여 파이프 라인을 구성합니다. dsl.ExitHandler()를 사용해서, 작업 그룹이 종료될때 실행할 작업을 지정할 수 있습니다. 작업 그룹은 flip_coin_op() 과 print_op(flip.output) 으로 구성되어 있습니다. 작업 그룹에 속한 작업들이 종료되면 dsl.ExitHandler(exit_op)에 지정한 exit_op 작업이 실행됩니다.

KFP SDK 사용하여 코드에서 파이프 라인을 컴파일하고, 바로 실행하였습니다.

@dsl.pipeline(
    name='Exit handler',
    description=' The exit handler will run after the pipeline finishes (successfully or not)'
)
def sequential_pipeline():
    exit_op = print_op('Exit')

    with dsl.ExitHandler(exit_op):
        flip = flip_coin_op()
        print_op(flip.output)

다음은 파이프라인을 구성하고 실행하는 전체 코드입니다.

exit_handler.py

import kfp
from kfp import dsl


def flip_coin_op():
    return dsl.ContainerOp(
        name='Flip coin',
        image='python:alpine3.6',
        command=['sh', '-c'],
        arguments=['python -c "import random; result = \\'heads\\' if random.randint(0,1) == 0 '
                  'else \\'tails\\'; print(result)" | tee /tmp/output'],
        file_outputs={'output': '/tmp/output'}
    )


def print_op(msg):
    """Print a message."""
    return dsl.ContainerOp(
        name='Print',
        image='alpine:3.6',
        command=['echo', msg],
    )


@dsl.pipeline(
    name='Exit handler',
    description=' The exit handler will run after the pipeline finishes (successfully or not)'
)
def sequential_pipeline():
    exit_op = print_op('Exit')

    with dsl.ExitHandler(exit_op):
        flip = flip_coin_op()
        print_op(flip.output)


if __name__ == '__main__':
    kfp.compiler.Compiler().compile(sequential_pipeline, __file__ + '.zip')

    client = kfp.Client()
    my_experiment = client.create_experiment(name='Basic Experiment')
    my_run = client.run_pipeline(my_experiment.id, 'Exit Handler', __file__ + '.zip')

파이프 라인 실행 결과 확인하기

KFP UI 화면에서 결과를 조회해 볼 수 있습니다. 왼쪽 메뉴에서 Experiments 클릭하면 Experiment 목록을 조회할 수 있습니다. 예제에서 사용한 Experiment 이름이 “Basic Experiment” 이므로, Basic Experiment를 클릭하면 실행 목록을 확인 할 수 있습니다.

“Run name”이 “Exit Handler” 인 것을 클릭하면 실행 결과를 확인할 수 있습니다.


파이프 라인 파라미터 사용하기

파이프 라인을 구성하고 실행하기

파라미터를 입력 받아서, 파이프 라인을 실행할 수 있는 파이프 라인을 만들어 보겠습니다.

파이프 라인을 실행할 때 앞면(heads) 또는 뒷면(tails)을 파라미터로 입력받아서, 동전을 던진 결과와 같으면 승링를 다르면 패배를 출력하게 하였습니다.

flip_coin_op() 는 동전을 던지는 단계입니다. 이전 예제 코드와 동일합니다.

def flip_coin_op():
    return dsl.ContainerOp(
        name='Flip coin',
        image='python:alpine3.6',
        command=['sh', '-c'],
        arguments=['python -c "import random; result = \\'heads\\' if random.randint(0,1) == 0 '
                  'else \\'tails\\'; print(result)" | tee /tmp/output'],
        file_outputs={'output': '/tmp/output'}
    )

print_op(msg) 라는 입력받은 메시지를 화면으로 출력하는 단계입니다. 이전 예제 코드와 동일합니다.

def print_op(msg):
    """Print a message."""
    return dsl.ContainerOp(
        name='Print',
        image='alpine:3.6',
        command=['echo', msg],
    )

DSL을 사용하여 파이프 라인을 구성합니다. 파이프 라인 함수에 predict 파라미터를 입력 받을 수 있도록 추가하였습니다. 파이프 라인 함수가 호출되면 각 파라미터는 PipelineParam 객체가 됩니다.

@dsl.pipeline(
    name='Pipeline parameters',
    description='Pipeline parameters'
)
def condition_pipeline(
        predict : str = 'heads'):
    flip = flip_coin_op()
    with dsl.Condition(flip.output == predict):
        print_op('YOU WIN')

    with dsl.Condition(flip.output != predict):
        print_op('YOU LOSE')

KFP SDK 사용하여 코드에서 파이프 라인을 컴파일하고, 바로 실행하였습니다. 파이프 라인 실행 시 파라미터를 넘기기 위해서, params 파라미터를 사용하였습니다.

if __name__ == '__main__':
    kfp.compiler.Compiler().compile(condition_pipeline, __file__ + '.zip')

    client = kfp.Client()
    my_experiment = client.create_experiment(name='Basic Experiment')
    my_run = client.run_pipeline(my_experiment.id, 'Pipeline parameters', __file__ + '.zip', params={'predict' : 'tails'})

파라미터는 Kubeflow Pipelines UI를 통해서 실행할 때도 입력할 수 있습니다.

다음은 파이프라인을 구성하고 실행하는 전체 코드입니다.

parameters.py

import kfp
from kfp import dsl
from kfp.dsl import PipelineParam


def flip_coin_op():
    return dsl.ContainerOp(
        name='Flip coin',
        image='python:alpine3.6',
        command=['sh', '-c'],
        arguments=['python -c "import random; result = \\'heads\\' if random.randint(0,1) == 0 '
                  'else \\'tails\\'; print(result)" | tee /tmp/output'],
        file_outputs={'output': '/tmp/output'}
    )


def print_op(msg):
    """Print a message."""
    return dsl.ContainerOp(
        name='Print',
        image='alpine:3.6',
        command=['echo', msg],
    )


@dsl.pipeline(
    name='Pipeline parameters',
    description='Pipeline parameters'
)
def condition_pipeline(
        predict : str = 'heads'):
    flip = flip_coin_op()
    with dsl.Condition(flip.output == predict):
        print_op('YOU WIN')

    with dsl.Condition(flip.output != predict):
        print_op('YOU LOSE')


if __name__ == '__main__':
    kfp.compiler.Compiler().compile(condition_pipeline, __file__ + '.zip')

    client = kfp.Client()
    my_experiment = client.create_experiment(name='Basic Experiment')
    my_run = client.run_pipeline(my_experiment.id, 'Pipeline parameters', __file__ + '.zip', params={'predict' : 'tails'})

파이프 라인 실행 결과 확인하기

KFP UI 화면에서 결과를 조회해 볼 수 있습니다. 왼쪽 메뉴에서 Experiments 클릭하면 Experiment 목록을 조회할 수 있습니다. 예제에서 사용한 Experiment 이름이 “Basic Experiment” 이므로, Basic Experiment를 클릭하면 실행 목록을 확인 할 수 있습니다.

“Run name”이 “Pipeline parameters” 인 것을 클릭하면 실행 결과를 확인할 수 있습니다.


Kubeflow Pipelines – SDK를 사용해서 파이프라인 만들기

Kubeflow Pipelines SDK를 사용하여 파이프라인과 컴포넌트를 구성하고, 빌드하는 방법에 대해서 알아보겠습니다. 그리고 SDK를 사용하여 파이프라인을 실행하는 방법과, Kubeflow Pipelines UI를 사용하여 파이프라인을 실행하는 방법에 대해서 알아보겠습니다.


Pipelines SDK 소개

Kubeflow Pipelines SDK는 머신 러닝 워크 플로우를 정의하고, 실행시킬 수 있는 파이썬 패키지 세트를 제공합니다. 파이프 라인은 워크 플로우의 단계를 구성하는 컴포넌트들과, 각 컴포넌트들이 서로 상호 작용하는 방식을 정의해 놓은 것입니다.

Kubeflow Pipelines SDK는 파이프 라인을 컴파일하고 실행하는 등의 여러 상호 작용 기능을 제공하고 있습니다. 그리고 파이프 라인의 구성 요소인 컴포넌트를 만들고 로드 하는 등의 기능도 제공하고 있습니다. 컴포넌트에서 사용할 컨테이너 이미지를 빌드 하는 기능도 제공하고 있습니다. 다만 Kubeflow Pipelines SDK에 포함되어 있는 컨테이너 빌더 기능은 Google Cloud Platform (GCP) 환경에서만 원활하게 사용할 수 있습니다.

SDK 패키지

Kubeflow Pipelines SDK에는 다음과 같은 패키지가 포함되어 있습니다.

  • kfp.compiler : 파이프 라인을 컴파일 할 수 있는 기능을 제공하고 있습니다. 이 패키지의 주요 사용 메소드는 다음과 같습니다
    • kfp.compiler.Compiler.compile : Python DSL 코드를 Kubeflow Pipelines 서비스가 처리 할 수 있는 단일 정적 구성 (YAML 형식)으로 컴파일합니다. Kubeflow Pipelines 서비스는 정적 구성을 실행을 위해 Kubernetes 리소스 세트로 변환합니다. (현재는 컴파일하면 Argo Workflows 형태로 변환합니다.)
  • kfp.component : 파이프 라인 컴포넌트와 상호 작용하기 위한 기능을 제공하고 있습니다. 이 패키지의 주요 사용 메소드는 다음과 같습니다.
    • kfp.components.func_to_container_op : Python 함수를 파이프 라인 컴포넌트로 변환하고 팩토리 함수를 리턴합니다. 그런 다음 팩토리 함수를 호출하여 컨테이너에서 원래 함수를 실행하는 파이프 라인 태스크 (ContainerOp)의 인스턴스를 구성 할 수 있습니다.
    • kfp.components.load_component_from_file : 파일에서 파이프 라인 컴포넌트를 로드하고 팩토리 함수를 리턴합니다. 그런 다음 팩토리 함수를 호출하여 컴포넌트 컨테이너 이미지를 실행하는 파이프 라인 태스크 (ContainerOp)의 인스턴스를 구성 할 수 있습니다.
    • kfp.components.load_component_from_url : URL에서 파이프 라인 컴포넌트를 로드하고 팩토리 함수를 리턴합니다. 그런 다음 팩토리 함수를 호출하여 컴포넌트 컨테이너 이미지를 실행하는 파이프 라인 태스크 (ContainerOp)의 인스턴스를 구성 할 수 있습니다.
  • kfp.containers : 컴포넌트 컨테이너 이미지를 빌드하는 기능을 제공하고 있습니다. 이 패키지의 주요 사용 메소드는 다음과 같습니다
    • build_image_from_working_dir : 파이썬 작업 디렉토리를 사용하여 새 컨테이너 이미지를 빌드하고 푸시합니다. Python 컨테이너 이미지를 기본 이미지로 사용하는 Dockerfile을 생성하고, requirements.txt 파일 있는 경우 패키지를 설치하고 대상 Python 파일을 컨테이너 이미지에 복사합니다. 작업 디렉토리의 루트에 사용자 정의 Dockerfile을 만들어서 대체 할 수 있습니다. (현재는 Google Cloud Platform (GCP) 환경에서만 사용할 수 있습니다.)
  • kfp.dsl : 파이프 라인 및 컴포넌트를 정의하고 상호 작용하는 데 사용할 수있는 DSL (Domain-Specific Language)이 포함되어 있습니다. 이 패키지의 주요 사용 메소드는 다음과 같습니다.
    • kfp.dsl.ContainerOp : 컨테이너 이미지로 구현 된 파이프 라인 작업을 나타냅니다.
    • kfp.dsl.PipelineParam 한 파이프 라인 컴포넌트에서 다른 파이프 라인 컴포넌트로 전달할 수있는 파이프 라인 파라미터를 나타냅니다.
    • kfp.dsl.component : 파이프 라인 컴포넌트를 반환하는 DSL 함수의 데코레이터입니다. (ContainerOp).
    • kfp.dsl.pipeline : 파이프 라인을 반환하는 Python 함수의 데코레이터입니다.
    • kfp.dsl.python_component: 파이프 라인 컴포넌트 메타 데이터를 함수 객체에 추가하는 Python 함수의 데코레이터입니다.
    • kfp.dsl.types:  Kubeflow Pipelines SDK에서 사용하는 타입들이 정의되어 있습니다. 타입에는 String, Integer, Float 및 Bool과 같은 기본 타입과 GCPProjectID 및 GCRPath와 같은 도메인 별 타입이 있습니다. DSL 정적 유형 검사에 대해서는 안내서를 참조하실 수 있습니다.
    • kfp.dsl.ResourceOp : 쿠버네티스 리소스를 직접 조작할 수 작업을 나타냅니다.(creategetapply 등 ).
    • kfp.dsl.VolumeOp : 쿠버네티스 PersistentVolumeClaim 을 생성하는 파이프 라인 작업 을 나타냅니다.
    • kfp.dsl.VolumeSnapshotOp : 새로운 볼륨 스냅 샷을 생성하는 파이프 라인 작업을 나타냅니다.
    • kfp.dsl.PipelineVolume : 파이프 라인의 단계간에 데이터를 전달하기 위해 사용하는 볼륨을 나타냅니다.
  • kfp.Client : Kubeflow Pipelines API 용 Python 클라이언트 라이브러리가 포함되어 있습니다. 이 패키지의 주요 사용 메소드는 다음과 같습니다.
    • kfp.Client.create_experiment : 파이프 라인 experiment 을 만들고, experiment  개체를 반환합니다.
    • kfp.Client.run_pipeline 파이프 라인을 실행(run)하고 실행(run) 개체를 반환합니다.
  • KFP extension modules : Kubeflow Pipelines에서 사용할 수 있는 특정 플랫폼에 대한 기능을 가지고 있습니다. 온 프레미스, Google Cloud Platform (GCP), Amazon Web Services (AWS) 및 Microsoft Azure에 대한 유틸리티 기능을 제공하고 있습니다.

KFP CLI tool

KFP CLI 도구를 사용하면 커맨드 라인에서 직접 Kubeflow Pipelines SDK의 일부분을 사용할 수 있습니다. KFP CLI 도구는 다음과 같은 명령을 제공합니다.

  • kfp diagnose_me : 지정된 파라미터로 환경 진단을 실행합니다
    • --json : 명령 실행 결과를 JSON으로 반환하도록 합니다. 별도로 설정하지 않으면, 결과는 사람이 읽을 수 있는 형식으로 반환됩니다.
    • --namespace TEXT : 대상 쿠버네티스 네임스페이스를 지정합니다. 별도로 설정하지 않으면, 모든 네임스페이스를 대상으로 합니다.
  • kfp pipeline <COMMAND> : 파이프 라인을 관리하는 데 도움이 되는 명령을 제공합니다.
    • get : Kubeflow Pipelines 클러스터의 Kubeflow 파이프 라인에 대한 상세한 정보를 조회합니다.
    • list : Kubeflow Pipelines 클러스터에 업로드 된 파이프 라인 목록을 조회 합니다.
    • upload : Kubeflow Pipelines 클러스터에 파이프 라인을 업로드합니다.
  • kfp run <COMMAND> 파이프 라인 실행을 관리하는 데 도움이 되는 명령을 제공합니다.
    • get : 파이프 라인 실행의 상세한 정보를 조회합니다.
    • list : 최근 실행한 파이프 라인 실행 목록을 조회 합니다.
    • submit – 파이프 라인을 실행 시킵니다.

파이프 라인과 컴포넌트 만들기

SDK를 사용하여 파이프 라인과 컴포넌트를 만드는 방법에 대해서 알아 보도록 하겠습니다.

컴포넌트 만드는 방법

파이프라인은 컴포넌트로 구성되어 있습니다. 그래서 파이프라인을 만들기 위해서는 사용할 컴포넌트를 먼저 만들어야합니다. 이미 만들어 놓은 컴포넌트가 있으면 재사용할 수도 있습니다.

컴포넌트를 만드는 단계는 다음과 같습니다.

가 . 컴포넌트 프로그램 작성 : 컴포넌트에서 사용할 프로그램을 작성해야 합니다. 프로그램은 다른 컴포넌트로부터 데이터를 받기 위해서, 파일이나 명령행 인수를 사용해야 합니다.

나. 컴포넌트 컨테이너화 : 작성한 프로그램을 컨테이너 이미지로 만들어야 합니다.

다. 컴포넌트 스펙 작성 : 컴포넌트의 데이터 모델을 정의하기 위해서 YAML 형식의 파일을 작성해야 합니다. 재사용 가능한 컴포넌트를 만들때는 스펙을 작성하는 것이 좋지만, 생략 가능합니다. 컴포넌트 스펙 파일이 있는 경우에는 스펙 파일을 로드해서 컴포넌트를 생성할 수 있습니다. 자세한 내용은 “재사용 가능한 컴포넌트”를 참고 하시기 바랍니다.

파이프라인 만드는 방법

컴포넌트를 이용해 파이프라인을 만들 수 있습니다. 파이프라인을 만들기 위해서 파이프라인 파이썬 코드를 작성해야 합니다.

파이프라인을 만드는 단계는 다음과 같습니다.

가. Kubeflow Pipelines DSL을 사용하여 파이프라인 함수와 컴포넌트 함수를 작성합니다.

나. 파이프 라인을 컴파일 하여 압축 된 YAML 파일을 생성합니다.

파이프 라인을 컴파일 하기 위한 방법은 두 가지가 있습니다.

  • kfp.compiler.Compiler.compile 메소드를 사용하는 방법 kfp.compiler.Compiler().compile(my_pipeline, 'my_pipeline.zip')
  • 커맨드 라인에서 dsl-compile 커맨드를 사용하는 방법 dsl-compile --py [path/to/python/file] --output my_pipeline.zip

다. 파이프 라인을 업로드하고, 실행합니다.

파이프 라인을 실행하는 방법은 두 가지가 있습니다.

  • Kubeflow Pipelines SDK 를 사용하는 방법
client = kfp.Client()
my_experiment = client.create_experiment(name='demo')
my_run = client.run_pipeline(my_experiment.id, 'my-pipeline', 
  'my_pipeline.zip')
  • Kubeflow Pipelines UI를 사용하는 방법

컴포넌트 만들기

프로그램이 포함된 컨테이너 이미지를 사용하여 컴포넌트를 생성하는 방법에 대해서 알아 보도록 하겠습니다. 생성한 컴포넌트는 파이프라인을 작성하는데 사용됩니다.

가. 프로그램 코드를 작성합니다.

mnist-simple.py

from __future__ import absolute_import, division, print_function, unicode_literals

import tensorflow as tf
import numpy as np


def train():
    print("TensorFlow version: ", tf.__version__)

    mnist = tf.keras.datasets.mnist

    (x_train, y_train), (x_test, y_test) = mnist.load_data()
    x_train, x_test = x_train / 255.0, x_test / 255.0

    # Reserve 10,000 samples for validation
    x_val = x_train[-10000:]
    y_val = y_train[-10000:]
    x_train = x_train[:-10000]
    y_train = y_train[:-10000]

    model = tf.keras.models.Sequential([
      tf.keras.layers.Flatten(input_shape=(28, 28)),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dropout(0.2),
      tf.keras.layers.Dense(10, activation='softmax')
    ])

    model.compile(optimizer='adam',
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])

    print("Training...")
    training_history = model.fit(x_train, y_train, epochs=5, validation_data=(x_val, y_val))

    print("Average test loss: ", np.average(training_history.history['loss']))


if __name__ == '__main__':
    train()

나. 프로그램 코드가 포함된 컨테이너 이미지를 생성하고, 컨테이너 이미지 레지스트리에 업로드 합니다.

Dockerfile을 생성합니다.

Dockerfile

FROM tensorflow/tensorflow:2.1.0-py3

RUN mkdir -p /app
ADD mnist-simple.py /app/

컨테이너 이미지를 빌드하겠습니다.

docker build -t kangwoo/mnist-simple:kfp .

빌드한 컨테이너 이미지를 컨테이너 이미지 레지스트리에 업로드 합니다.

docker push kangwoo/mnist-simple:kfp

SDK를 사용하여 파이프라인을 만들고 실행하기

생성한 컴포넌트를 이용해 파이프라인을 만들어 보겠습니다. 파이프라인을 만들기 위해서 파이프라인 파이썬 코드를 작성해야 합니다.

가. Kubeflow Pipelines DSL을 사용하여 컴포넌트 함수를 작성합니다. image 파라미터에 컴포넌트에서 사용하는 컨테이너 이미지를 정의합니다. 컴포넌트 함수는 kfp.dsl.ContainerOp를 리턴해야합니다. 선택적으로 kfp.dsl.component 라는 데코레이터를 사용하여 DSL 컴파일러에서 정적 타입 검사를 활성화 할 수 있습니다. 데코레이터를 사용하려면 @kfp.dsl.component 어노테이션을 컴포넌트 함수에 추가 하면 됩니다.

@kfp.dsl.component
def train_component_op():
    return kfp.dsl.ContainerOp(
        name='mnist-train',
        image='kangwoo/kfp-mnist:kfp'
    )

나. Kubeflow Pipelines DSL을 사용하여 파이프 라인 함수를 작성합니다. 파이프 라인을 정의하는 함수에 사용할 컴포넌트들을 추가합니다. 파이프 라인 함수에서 파이프 라인을 빌드하기 위해서 kfp.dsl.pipeline 데코레이터를 사용합니다. 데코레이터를 사용하려면 @kfp.dsl.pipeline 어노테이션을 파이프 라인 함수에 추가 하면 됩니다.

@dsl.pipeline(
    name='My pipeline',
    description='My machine learning pipeline'
)
def my_pipeline():
    train_task = train_component_op()

다. 파이프 라인을 컴파일하여 압축 된 YAML 파일을 생성하겠습니다. YAML 파일에는 파이프 라인 실행을 위한 쿠버네티스 리소스들이 정의되어 있습니다. kfp.compiler.Compiler.compile 메소드를 사용하는 컴파일 하겠습니다.

kfp.compiler.Compiler().compile(my_pipeline, 'my_pipeline.zip')

라. 파이프 라인을 업로드하고, 실행합니다. Kubeflow Pipelines SDK 를 사용하여 파이프라인을 업로드하고 실행하겠습니다.

client = kfp.Client()
my_experiment = client.create_experiment(name='Basic Experiment')
my_run = client.run_pipeline(my_experiment.id, 'my_pipeline', 
  'my_pipeline.zip')

다음은 파이프라인 전체 코드 입니다.

import kfp
from kfp import dsl


@kfp.dsl.component
def train_component_op():
    return kfp.dsl.ContainerOp(
        name='mnist-train',
        image='kangwoo/kfp-mnist:kfp'
    )


@dsl.pipeline(
    name='My pipeline',
    description='My machine learning pipeline'
)
def my_pipeline():
    train_task = train_component_op()


if __name__ == '__main__':
    # Compile
    pipeline_package_path = 'my_pipeline.zip'
    kfp.compiler.Compiler().compile(my_pipeline, pipeline_package_path)

    # Run
    client = kfp.Client()
    my_experiment = client.create_experiment(name='Basic Experiment')
    my_run = client.run_pipeline(my_experiment.id, 'my_pipeline', pipeline_package_path)

파이프라인 코드를 실행합니다.

python my_pipeline.py

파이프라인 코드가 실행되면, 파이프라인 패키지가 컴파일 됩니다. 그리고 컴파일된 패키지를 Kubeflow Pipelines 에 전달하여 실행 시킵니다. 실행 결과는 Kubeflow Pipelines UI를 통해서 확인할 수 있습니다.

파이프라인 실행 결과 확인 하기

Kubeflow 파이프라인 UI에서 실행 결과를 확인하려면, 다음 절차대로 진행하시면 됩니다.

  1. Kubeflow 대시보드 화면의 왼쪽 메뉴에서 Pipelines를 클릭합니다.
  2. Kubeflow Pipelines UI 화면의 왼쪽 메뉴에서 Experiments 클릭하여, 현재 파이프 라인 실험 목록을 조회합니다.
  3. 보려는 실험(Experiment)의 ‘실험 이름’을 클릭합니다.
  4. 보려는 실행(Run)의 “실행 이름”을 클릭합니다.
  5. Graph 탭에서 보려는 파이프 라인 컴포넌트를 나타내는 단계를 클릭합니다. 단계 세부 사항이 Artifacts  탭을 표시하며 슬라이드 됩니다.

Kubeflow – Kubeflow Pipelines 이해하기

Kubeflow Pipelines 살펴보기

실 세계에서 머신 러닝 워크 플로우를 만들고 배포하는 것은 무척이나 어렵고 힘든 일입니다.

Kubeflow Pipelines는 머신 러닝 워크 플로우를 만들고 배포하기 위한 컨테이너 기반의 플랫폼으로서, 사용자가 편리하게 사용할 수 있고, 쉽게 확장이 가능합니다.

Kubeflow Pipelines는 머신 러닝 워크 플로우를 생성하기 위해서 파이프 라인을 정의합니다. 파이프 라인에는 사용하는 컴포넌트들과 작업 처리 규칙이 정의되어 있습니다. Kubeflow Pipelines는 파이프라인 뿐만 아니라 컴포넌트도 쉽게 재사용할 수 있도록 만들어져 있습니다. 그래서 만들어진 컴포넌트나 파이프라인 있다면, 레고를 조립하는 것처럼 쉽게 워크 플로우를 구성할 수 있습니다.

Kubeflow Pipelines는 Kubeflow의 핵심 구성 요소로 포함되어 있습니다. 그래서 별도의 설치 없이 사용할 수 있습니다. 물론 Kubeflow 없이 독립적으로 설치해서 사용할 수도 있습니다.

아쉽게도 Kubeflow Pipelines 는 아직 멀티 테넌시를 지원하지 않습니다.

Kubeflow Pipelines 목표

Kubeflow Pipelines 가 추구하는 목표는 다음과 같습니다.

  • 엔드 투 엔드 오케스트레이션 : 머신 러닝 파이프 라인의 오케스트레이션을 지원하고 단순화 시킵니다
  • 손쉬운 실험 : 수많은 아이디어와 기술을 쉽게 시도 할 수 있고, 다양한 시험/실험을 관리 할 수 ​​있도록 합니다.
  • 손쉬운 재사용 : 구성 요소 및 파이프 라인을 재사용하여, 매번 재 구축 할 필요 없이 엔드 투 엔드 솔루션을 신속하게 생성 할 수 있도록 합니다.

Kubeflow Pipelines 개념

Kubeflow Pipelines 에서 사용하는 개념에 대해서 알아보겠습니다.

Pipeline

Pipeline은 머신 러닝 워크 플로우에서 사용하는 컴포넌트들과, 해당 컴토넌트들 간의 작업 처리 규칙을 그래프 형태로 정의한 것입니다. Pipeline에는 파이프 라인을 실행하는 데 필요한 입력 매개 변수와 각 컴포넌트의 입력 및 출력에 대한 정의가 포함되어 있습니다.

Pipeline을 실행하면 시스템이 워크 플로우의 단계에 해당하는 하나 이상의 쿠버네티스 포드를 시작합니다. 포드는 컨테이너를 시작하고 컨테이너는 정의된 프로그램을 실행합니다.

파이프 라인을 개발 한 후 Kubeflow Pipelines UI에서 파이프 라인을 업로드하고 실행 할 수 있습니다.

Component

Pipeline의 컴포넌트는 하나의 단계를 수행하는 코드가 모여있는 컨테이너 이미지입니다. 이 컴포넌트들은 파이프 라인의 실행 단계에서 각자의 담당 역할을 수행하게 됩니다. 예를 들어 데이터 전처리, 데이터 변환, 모델 학습 등이 있습니다. 컴포넌트는 입력 및 출력에 대한 정의를 포함하고 있습니다.

Graph

그래프는 Kubeflow Pipelines UI에서 파이프 라인의 런타임 실행을 나타내는 그림입니다. 그래프는 파이프 라인의 실행된 단계나 실행중인 단계를 나타냅니다. 화살표는 각 단계로 표시되는 파이프 라인 컴포넌트 간의 상/하위 관계를 나타냅니다.

파이프 라인이 실행되면 그래프를 볼 수 있습니다. 그래프 안의 각 노드는 파이프 라인의 단계를 나타냅니다.

각 노드의 오른쪽 상단에는 상태, 실행 중, 성공, 실패 또는 건너뜀 상태를 나타내는 아이콘이 있습니다. 조건절이 있을 경우에는 노드를 건너 뛸 수 있습니다.

Experiment

Experiment는 파이프 라인을 실행 할 수 있는 작업 공간입니다. experiment 사용하여 파이프 라인의 실행(run)을 논리적 그룹으로 묶을 수 있습니다. Experiments에는 임의의 실행(run)뿐만 아니라 반복 실행(recurring run)도 포함될 수 있습니다.

Run and Recurring Run

Run은 파이프 라인을 한번 실행 하는 것을 의미합니다. Run은 사용자가 시도하는 실행에 대한 정보를 저장하고 있기 때문에, 재현이 가능합니다. Kubeflow Pipelines UI의 세부 정보 페이지를 보면, 실행 진행률을 볼 수 있습니다. 여기에서 실행의 각 단계에 대한 런타임 그래프, 출력 결과 및 로그를 확인 할 수 있습니다.

Recurring run 은 파이프 라인의 반복 실행을 의미합니다. 반복 실행을 위한 설정에는 파이프 라인에서 사용하는 파라미터와 실행 트리거를 위한 파라미터가 포함되어 있습니다.

모든 Experiment 내에서 반복 실행을 시작할 수 있습니다. 반복 실행이 설정되어 있으면, 주기적으로 파이프 라인을 실행하게 됩니다. Kubeflow Pipelines UI에서 반복 실행을 활성화/비활성화 할 수 있습니다. 동시에 실행되는 최대 실행 개수를 제한하기 위해서, 최대 동시 실행 개수를 지정할 수도 있습니다. 최대 동시 실행 개수는, 파이프 라인의 실행 시간이 오래 걸리면서, 자주 실행되게 트리거 되는 경우에 도움이 될 수 있습니다.

Run Trigger

실행 트리거는 주어진 실행 조건에 의해서 새로운 실행을 시작하기 위하여 시스템에 알리는 역할을 합니다. 다음과 같은 유형의 실행 트리거를 사용할 수 있습니다.

  • Periodic: 주기적인 간격 기반의 실행을 위한 트리거입니다. (예 : 3 시간마다 또는 45 분마다).
  • Cron : 실행 예약을 위한 cron 표현식을 사용하는 트리거입니다.

Step

Step은 파이프 라인의 컴포넌트 중 하나를 실행한다는 것을 의미합니다. 복잡한 파이프 라인에서 컴포넌트는 반복적으로 여러 번 실행될 수 있습니다. 그리고 if 같은 조건문을 사용하여, 조건부로 실행될 수 있습니다.

Output Artifact

Output Artifact 는 파이프 라인 컴포넌트에서 생성한 출력물입니다. 이 출력물은 Kubeflow Pipelines UI를 사용하여 이해하기 쉽게 시각화로 렌더링 할 수 있습니다.

파이프 라인 컴포넌트에 아티팩트를 포함시켜서 성능 평가 같은 일을 할 수 있으며, 의사 결정을 위한 자료로 사용할 수도 있습니다. 그리고 파이프 라인의 다양한 컴포넌트들이 어떻게 작동하는 이해하는데 많은 도움을 줄 수도 있습니다. 아티팩트는 일반적인 텍스트 뿐만 아니라, 시각화를 위한 데이터까지 다양하게 존재합니다.

Kubeflow Pipelines 구성 요소

Kubeflow Pipelines은 다음과 같은 구성 요소로 이루어져 있습니다.

  • 실험, 작업 및 실행을 관리하고 추적하기 위한 사용자 인터페이스 (UI)
  • 파이프라인을 관리하는 파이프라인 서비스
  • 파이프라인과 컴포넌트를 정의하고 제어하기 위한 SDK.
  • 머신 러닝 워크 플로우 실행을 위한 컨트롤러.

Kubeflow Pipelines UI(User interface)

Kubeflow Pipelines UI 는 현재 실행 중인 파이프 라인 목록, 파이프 라인 실행 기록, 데이터 아티팩트 목록, 개별 파이프 라인 실행에 대한 디버깅 정보, 개별 파이프 라인 실행에 대한 실행 상태를 표시합니다.

https://www.kubeflow.org/docs/images/pipelines-ui.png

Kubeflow Pipelines UI 에서 다음과 같은 작업을 수행 할 수 있습니다.

  • 압축 파일로 만들어진 파이프 라인을 업로드 할 수 있습니다. 업로드 된 파이프 라인은 다른 사람들과 공유 할 수 있습니다.
  • 파이프 라인의 실행을 그룹화하는 “Experiment“을 생성할 수 있습니다.
  • Experiment 내에서 파이프라인을 실행할 수 있습니다.
  • 파이프 라인 실행의 구성, 그래프 및 출력을 확인할 수 있습니다
  • 반복 실행을 작성하여 실행을 예약할 수 있습니다.

Python SDK

Kubeflow Pipelines SDK는 머신 러닝 워크 플로우를 정의하고, 실행시킬 수 있는 파이썬 패키지 세트입니다.

다음은 SDK의 주요 패키지 입니다.

  • kfp.compiler : 파이프 라인을 컴파일 할 수 있는 기능을 제공하고 있습니다.
  • kfp.component : 파이프 라인 컴포넌트와 상호 작용하기 위한 기능을 제공하고 있습니다.
  • kfp.containers : 컴포넌트 컨테이너 이미지를 빌드하는 기능을 제공하고 있습니다.
  • kfp.Client : Kubeflow Pipelines API 용 Python 클라이언트 라이브러리가 포함되어 있습니다.

Pipeline Service

파이프라인 서비스는 파이프 라인을 생성하고 실행하는 등의 관리 역할을 하고 있습니다. 그리고 Experiment, Run 같은 파이프라인 메타데이터를 메타데이터 저장소에 저장하는 역할도 하고 있습니다.

또한 REST API도 제공하고 있습니다. REST API는 셸 스크립트 또는 다른 시스템에 통합하려는 경우 유용하게 사용할 수 있습니다.

Pipelines 데이터 저장소

Kubeflow Pipelines에는 머신 러닝 파이프 라인에 관련된 데이터 관리하기 위해서 다음과 같은 두 개의 저장소를 가지고 있습니다.

  • Metadata : Experiment, Run 등 Kubeflow Pipelines는 파이프 라인 메타 데이터를 MySQL 데이터베이스에 저장합니다.
  • Artifacts : 파이프라인 패키지, 메트릭, 뷰 등 아티팩트를 Minio 서버에 저장합니다.

Kubeflow Pipelines는 쿠버네티스의 퍼시스턴스 볼륨(PV)을 사용하여 MySQL 데이터베이스와 Minio 서버의 데이터를 저장합니다.

Orchestration Controllers

오케스트레이션 컨트롤러는 머신 러닝 워크 플로우 실행 다시 말해서, 파이프 라인을 완료하는데 필요한 컨테이너들을 실행시키는 역할을 하고 있습니다. 컨테이너들은 쿠버네티스의 포드 형태로 실행됩니다. 현재 Kubeflow Pipelines 에서는 Argo Workflow 를 워크플로우 컨트롤러로 사용하고 있습니다.