Kubeflow Pipelines – DSL 이해하기 #1

순차적 처리를 하는 파이프 라인 만들기

파이프 라인을 구성하고 실행하기

순차적으로 작업을 하는 파이프 라인을 만들어 보겠습니다. 동전을 던지는 단계와 그 결과를 출력하는 단계로 구성 되어 있습니다.

flip_coin_op() 는 동전을 던지는 단계입니다. 파이썬 코드로 작성되었습니다. 랜덤 함수를 사용해서 0이면 앞면(heads), 1이면 뒷면(tails) 이라고 출력합니다. 그리고 출력 결과를 /tmp/output 파일에 저장합니다. 결과가 저장된 /tmp/output 파일을 다음 단계에서 사용하기 위해 file_outputs 으로 정의합니다.

def flip_coin_op():
    return dsl.ContainerOp(
        name='Flip coin',
        image='python:alpine3.6',
        command=['sh', '-c'],
        arguments=['python -c "import random; result = \\'heads\\' if random.randint(0,1) == 0 '
                  'else \\'tails\\'; print(result)" | tee /tmp/output'],
        file_outputs={'output': '/tmp/output'}
    )

print_op(msg) 라는 입력받은 메시지를 화면으로 출력하는 단계입니다. 단순히 입력 받은 메시지를 echo 명령어를 사용하여 화면에 출력합니다.

def print_op(msg):
    """Print a message."""
    return dsl.ContainerOp(
        name='Print',
        image='alpine:3.6',
        command=['echo', msg],
    )

DSL을 사용하여 파이프 라인을 구성합니다. flip_coin_op()가 먼저 실행되고, 그 다음 print_op()가 실행됩니다. 그리고 flip_coin_op()의 출력 결과를 print_op()의 입력 파라미터로 사용하고 있습니다.

KFP SDK 사용하여 코드에서 파이프 라인을 컴파일하고, 바로 실행하였습니다.

@dsl.pipeline(
    name='Sequential pipeline',
    description='A pipeline with two sequential steps.'
)
def sequential_pipeline():
    """A pipeline with two sequential steps."""

    flip = flip_coin_op()
    print_op(flip.output)


if __name__ == '__main__':
    kfp.compiler.Compiler().compile(sequential_pipeline, __file__ + '.zip')

    client = kfp.Client()
    my_experiment = client.create_experiment(name='Basic Experiment')
    my_run = client.run_pipeline(my_experiment.id, 'Sequential pipeline', __file__ + '.zip')

다음은 파이프라인을 구성하고 실행하는 전체 코드입니다.

sequential.py

import kfp
from kfp import dsl


def flip_coin_op():
    return dsl.ContainerOp(
        name='Flip coin',
        image='python:alpine3.6',
        command=['sh', '-c'],
        arguments=['python -c "import random; result = \\'heads\\' if random.randint(0,1) == 0 '
                  'else \\'tails\\'; print(result)" | tee /tmp/output'],
        file_outputs={'output': '/tmp/output'}
    )


def print_op(msg):
    """Print a message."""
    return dsl.ContainerOp(
        name='Print',
        image='alpine:3.6',
        command=['echo', msg],
    )


@dsl.pipeline(
    name='Sequential pipeline',
    description='A pipeline with two sequential steps.'
)
def sequential_pipeline():
    """A pipeline with two sequential steps."""

    flip = flip_coin_op()
    print_op(flip.output)


if __name__ == '__main__':
    kfp.compiler.Compiler().compile(sequential_pipeline, __file__ + '.zip')

    client = kfp.Client()
    my_experiment = client.create_experiment(name='Basic Experiment')
    my_run = client.run_pipeline(my_experiment.id, 'Sequential pipeline', __file__ + '.zip')

파이프 라인 실행 결과 확인하기

KFP UI 화면에서 결과를 조회해 볼 수 있습니다. 왼쪽 메뉴에서 Experiments 클릭하면 Experiment 목록을 조회할 수 있습니다. 예제에서 사용한 Experiment 이름이 “Basic Experiment” 이므로, Basic Experiment를 클릭하면 실행 목록을 확인 할 수 있습니다.

“Run name”이 “Sequential pipeline” 인 것을 클릭하면 실행 결과를 확인할 수 있습니다.

Graph 탭에서 flip-coin 단계를 선택하고, Input/Output 탭을 클릭하면 출력 결과를 볼 수 있습니다.


조건에 의해서 분기를 하는 파이프 라인 만들기

파이프 라인을 구성하고 실행하기

조건에 의해서 분기를 파이프 라인을 만들어 보겠습니다. 동전을 던지는 단계와 그 결과에 따러 “승리” 또는 “패배” 를 출력하는 단계로 구성 되어 있습니다.

flip_coin_op() 는 동전을 던지는 단계입니다. 이전 예제 코드와 동일합니다.

def flip_coin_op():
    return dsl.ContainerOp(
        name='Flip coin',
        image='python:alpine3.6',
        command=['sh', '-c'],
        arguments=['python -c "import random; result = \\'heads\\' if random.randint(0,1) == 0 '
                  'else \\'tails\\'; print(result)" | tee /tmp/output'],
        file_outputs={'output': '/tmp/output'}
    )

print_op(msg) 라는 입력받은 메시지를 화면으로 출력하는 단계입니다. 이전 예제 코드와 동일합니다.

def print_op(msg):
    """Print a message."""
    return dsl.ContainerOp(
        name='Print',
        image='alpine:3.6',
        command=['echo', msg],
    )

DSL을 사용하여 파이프 라인을 구성합니다. flip_coin_op()가 먼저 실행되고, 그 결과에 따라서 print_op()를 사용해서 “승리” 또는 “패배”를 출력하게 됩니다. 동전이 앞면인지 뒷면이지 판단하기 위해서 dsl.Condition() 을 사용하였습니다. 앞면이면 ‘YOUT WIN’을 출력하고, 뒷면이면 ‘YOU LOSE’를 출력합니다.

KFP SDK 사용하여 코드에서 파이프 라인을 컴파일하고, 바로 실행하였습니다.

@dsl.pipeline(
    name='Conditional execution pipeline',
    description='Shows how to use dsl.Condition().'
)
def condition_pipeline():
    flip = flip_coin_op()
    with dsl.Condition(flip.output == 'heads'):
        print_op('YOUT WIN')

    with dsl.Condition(flip.output == 'tails'):
        print_op('YOU LOSE')


if __name__ == '__main__':
    kfp.compiler.Compiler().compile(condition_pipeline, __file__ + '.zip')

    client = kfp.Client()
    my_experiment = client.create_experiment(name='Basic Experiment')
    my_run = client.run_pipeline(my_experiment.id, 'Condition pipeline', __file__ + '.zip')

다음은 파이프라인을 구성하고 실행하는 전체 코드입니다.

condition.py

import kfp
from kfp import dsl


def flip_coin_op():
    return dsl.ContainerOp(
        name='Flip coin',
        image='python:alpine3.6',
        command=['sh', '-c'],
        arguments=['python -c "import random; result = \\'heads\\' if random.randint(0,1) == 0 '
                  'else \\'tails\\'; print(result)" | tee /tmp/output'],
        file_outputs={'output': '/tmp/output'}
    )


def print_op(msg):
    """Print a message."""
    return dsl.ContainerOp(
        name='Print',
        image='alpine:3.6',
        command=['echo', msg],
    )


@dsl.pipeline(
    name='Conditional execution pipeline',
    description='Shows how to use dsl.Condition().'
)
def condition_pipeline():
    flip = flip_coin_op()
    with dsl.Condition(flip.output == 'heads'):
        print_op('YOUT WIN')

    with dsl.Condition(flip.output == 'tails'):
        print_op('YOU LOSE')


if __name__ == '__main__':
    kfp.compiler.Compiler().compile(condition_pipeline, __file__ + '.zip')

    client = kfp.Client()
    my_experiment = client.create_experiment(name='Basic Experiment')
    my_run = client.run_pipeline(my_experiment.id, 'Condition pipeline', __file__ + '.zip')

파이프 라인 실행 결과 확인하기

KFP UI 화면에서 결과를 조회해 볼 수 있습니다. 왼쪽 메뉴에서 Experiments 클릭하면 Experiment 목록을 조회할 수 있습니다. 예제에서 사용한 Experiment 이름이 “Basic Experiment” 이므로, Basic Experiment를 클릭하면 실행 목록을 확인 할 수 있습니다.

“Run name”이 “Condition pipeline” 인 것을 클릭하면 실행 결과를 확인할 수 있습니다.


병렬 루프를 사용하는 파이프 라인 만들기

파이프 라인을 구성하고 실행하기

정적 항목 세트에 대한 병렬 루프를 사용하는 파이프 라인을 만들어 보겠습니다. 동전을 여러번 던지고, 그 결과를 json 형식으로 저장 단계와 그 결과들을 병렬 루프를 통해서 출력하는 단계로 구성 되어 있습니다.

flip_coins_op() 는 동전을 여러번 던지는 단계입니다. 동전을 던진 결과 값을 json 형식으로 저정합니다.

def flip_coins_op():
    return dsl.ContainerOp(
        name='Flip coin',
        image='python:alpine3.6',
        command=['sh', '-c'],
        arguments=['python -c "import json; import sys; import random; '
                   'json.dump([(\\'heads\\' if random.randint(0,1) == 1 else \\'tails\\') for i in range(10)], '
                   'open(\\'/tmp/output.json\\', \\'w\\'))"'],
        file_outputs={'output': '/tmp/output.json'}
    )

print_op(msg) 라는 입력받은 메시지를 화면으로 출력하는 단계입니다. 이전 예제 코드와 동일합니다.

def print_op(msg):
    """Print a message."""
    return dsl.ContainerOp(
        name='Print',
        image='alpine:3.6',
        command=['echo', msg],
    )

DSL을 사용하여 파이프 라인을 구성합니다. flip_coins_op()가 먼저 실행되고, 그 결과가 json 형식으로 반환됩니다. json에 포함된 아이템의 개수 만큼 print_op() 를 실행합니다.

KFP SDK 사용하여 코드에서 파이프 라인을 컴파일하고, 바로 실행하였습니다.

@dsl.pipeline(
    name='Loop pipeline',
    description='A pipeline with parallel loop'
)
def loop_pipeline():
    flips = flip_coins_op()
    with dsl.ParallelFor(flips.output) as item:
        print_op(item)

다음은 파이프라인을 구성하고 실행하는 전체 코드입니다.

loop.py

import kfp
from kfp import dsl


def flip_coins_op():
    return dsl.ContainerOp(
        name='Flip coin',
        image='python:alpine3.6',
        command=['sh', '-c'],
        arguments=['python -c "import json; import sys; import random; '
                   'json.dump([(\\'heads\\' if random.randint(0,1) == 1 else \\'tails\\') for i in range(10)], '
                   'open(\\'/tmp/output.json\\', \\'w\\'))"'],
        file_outputs={'output': '/tmp/output.json'}
    )



def print_op(msg):
    """Print a message."""
    return dsl.ContainerOp(
        name='Print',
        image='alpine:3.6',
        command=['echo', msg],
    )


@dsl.pipeline(
    name='Loop pipeline',
    description='A pipeline with parallel loop'
)
def loop_pipeline():
    flips = flip_coins_op()
    with dsl.ParallelFor(flips.output) as item:
        print_op(item)


if __name__ == '__main__':
    kfp.compiler.Compiler().compile(loop_pipeline, __file__ + '.zip')

    client = kfp.Client()
    my_experiment = client.create_experiment(name='Basic Experiment')
    my_run = client.run_pipeline(my_experiment.id, 'Loop pipeline', __file__ + '.zip')

파이프 라인 실행 결과 확인하기

KFP UI 화면에서 결과를 조회해 볼 수 있습니다. 왼쪽 메뉴에서 Experiments 클릭하면 Experiment 목록을 조회할 수 있습니다. 예제에서 사용한 Experiment 이름이 “Basic Experiment” 이므로, Basic Experiment를 클릭하면 실행 목록을 확인 할 수 있습니다.

“Run name”이 “Loop pipeline” 인 것을 클릭하면 실행 결과를 확인할 수 있습니다.


Exit Handler를 사용하는 파이프 라인 만들기

파이프 라인을 구성하고 실행하기

Exit Handler 사용하는 파이프 라인을 만들어 보겠습니다. Exit Handler를 사용하면 작업 그룹이 종료 될 때 지정한 작업을 실행시킬 수 있습니다. 작업 그룹의 성공 여부와는 상관 없이 무조건 지정한 작업이 실행됩니다.

동전을 여러번 던지고, 그 결과를 json 형식으로 저장 단계와 그 결과들을 병렬 루프를 통해서 출력하는 단계로 구성 되어 있습니다.

flip_coin_op() 는 동전을 던지는 단계입니다. 이전 예제 코드와 동일합니다.

def flip_coin_op():
    return dsl.ContainerOp(
        name='Flip coin',
        image='python:alpine3.6',
        command=['sh', '-c'],
        arguments=['python -c "import random; result = \\'heads\\' if random.randint(0,1) == 0 '
                  'else \\'tails\\'; print(result)" | tee /tmp/output'],
        file_outputs={'output': '/tmp/output'}
    )

print_op(msg) 라는 입력받은 메시지를 화면으로 출력하는 단계입니다. 이전 예제 코드와 동일합니다.

def print_op(msg):
    """Print a message."""
    return dsl.ContainerOp(
        name='Print',
        image='alpine:3.6',
        command=['echo', msg],
    )

DSL을 사용하여 파이프 라인을 구성합니다. dsl.ExitHandler()를 사용해서, 작업 그룹이 종료될때 실행할 작업을 지정할 수 있습니다. 작업 그룹은 flip_coin_op() 과 print_op(flip.output) 으로 구성되어 있습니다. 작업 그룹에 속한 작업들이 종료되면 dsl.ExitHandler(exit_op)에 지정한 exit_op 작업이 실행됩니다.

KFP SDK 사용하여 코드에서 파이프 라인을 컴파일하고, 바로 실행하였습니다.

@dsl.pipeline(
    name='Exit handler',
    description=' The exit handler will run after the pipeline finishes (successfully or not)'
)
def sequential_pipeline():
    exit_op = print_op('Exit')

    with dsl.ExitHandler(exit_op):
        flip = flip_coin_op()
        print_op(flip.output)

다음은 파이프라인을 구성하고 실행하는 전체 코드입니다.

exit_handler.py

import kfp
from kfp import dsl


def flip_coin_op():
    return dsl.ContainerOp(
        name='Flip coin',
        image='python:alpine3.6',
        command=['sh', '-c'],
        arguments=['python -c "import random; result = \\'heads\\' if random.randint(0,1) == 0 '
                  'else \\'tails\\'; print(result)" | tee /tmp/output'],
        file_outputs={'output': '/tmp/output'}
    )


def print_op(msg):
    """Print a message."""
    return dsl.ContainerOp(
        name='Print',
        image='alpine:3.6',
        command=['echo', msg],
    )


@dsl.pipeline(
    name='Exit handler',
    description=' The exit handler will run after the pipeline finishes (successfully or not)'
)
def sequential_pipeline():
    exit_op = print_op('Exit')

    with dsl.ExitHandler(exit_op):
        flip = flip_coin_op()
        print_op(flip.output)


if __name__ == '__main__':
    kfp.compiler.Compiler().compile(sequential_pipeline, __file__ + '.zip')

    client = kfp.Client()
    my_experiment = client.create_experiment(name='Basic Experiment')
    my_run = client.run_pipeline(my_experiment.id, 'Exit Handler', __file__ + '.zip')

파이프 라인 실행 결과 확인하기

KFP UI 화면에서 결과를 조회해 볼 수 있습니다. 왼쪽 메뉴에서 Experiments 클릭하면 Experiment 목록을 조회할 수 있습니다. 예제에서 사용한 Experiment 이름이 “Basic Experiment” 이므로, Basic Experiment를 클릭하면 실행 목록을 확인 할 수 있습니다.

“Run name”이 “Exit Handler” 인 것을 클릭하면 실행 결과를 확인할 수 있습니다.


파이프 라인 파라미터 사용하기

파이프 라인을 구성하고 실행하기

파라미터를 입력 받아서, 파이프 라인을 실행할 수 있는 파이프 라인을 만들어 보겠습니다.

파이프 라인을 실행할 때 앞면(heads) 또는 뒷면(tails)을 파라미터로 입력받아서, 동전을 던진 결과와 같으면 승링를 다르면 패배를 출력하게 하였습니다.

flip_coin_op() 는 동전을 던지는 단계입니다. 이전 예제 코드와 동일합니다.

def flip_coin_op():
    return dsl.ContainerOp(
        name='Flip coin',
        image='python:alpine3.6',
        command=['sh', '-c'],
        arguments=['python -c "import random; result = \\'heads\\' if random.randint(0,1) == 0 '
                  'else \\'tails\\'; print(result)" | tee /tmp/output'],
        file_outputs={'output': '/tmp/output'}
    )

print_op(msg) 라는 입력받은 메시지를 화면으로 출력하는 단계입니다. 이전 예제 코드와 동일합니다.

def print_op(msg):
    """Print a message."""
    return dsl.ContainerOp(
        name='Print',
        image='alpine:3.6',
        command=['echo', msg],
    )

DSL을 사용하여 파이프 라인을 구성합니다. 파이프 라인 함수에 predict 파라미터를 입력 받을 수 있도록 추가하였습니다. 파이프 라인 함수가 호출되면 각 파라미터는 PipelineParam 객체가 됩니다.

@dsl.pipeline(
    name='Pipeline parameters',
    description='Pipeline parameters'
)
def condition_pipeline(
        predict : str = 'heads'):
    flip = flip_coin_op()
    with dsl.Condition(flip.output == predict):
        print_op('YOU WIN')

    with dsl.Condition(flip.output != predict):
        print_op('YOU LOSE')

KFP SDK 사용하여 코드에서 파이프 라인을 컴파일하고, 바로 실행하였습니다. 파이프 라인 실행 시 파라미터를 넘기기 위해서, params 파라미터를 사용하였습니다.

if __name__ == '__main__':
    kfp.compiler.Compiler().compile(condition_pipeline, __file__ + '.zip')

    client = kfp.Client()
    my_experiment = client.create_experiment(name='Basic Experiment')
    my_run = client.run_pipeline(my_experiment.id, 'Pipeline parameters', __file__ + '.zip', params={'predict' : 'tails'})

파라미터는 Kubeflow Pipelines UI를 통해서 실행할 때도 입력할 수 있습니다.

다음은 파이프라인을 구성하고 실행하는 전체 코드입니다.

parameters.py

import kfp
from kfp import dsl
from kfp.dsl import PipelineParam


def flip_coin_op():
    return dsl.ContainerOp(
        name='Flip coin',
        image='python:alpine3.6',
        command=['sh', '-c'],
        arguments=['python -c "import random; result = \\'heads\\' if random.randint(0,1) == 0 '
                  'else \\'tails\\'; print(result)" | tee /tmp/output'],
        file_outputs={'output': '/tmp/output'}
    )


def print_op(msg):
    """Print a message."""
    return dsl.ContainerOp(
        name='Print',
        image='alpine:3.6',
        command=['echo', msg],
    )


@dsl.pipeline(
    name='Pipeline parameters',
    description='Pipeline parameters'
)
def condition_pipeline(
        predict : str = 'heads'):
    flip = flip_coin_op()
    with dsl.Condition(flip.output == predict):
        print_op('YOU WIN')

    with dsl.Condition(flip.output != predict):
        print_op('YOU LOSE')


if __name__ == '__main__':
    kfp.compiler.Compiler().compile(condition_pipeline, __file__ + '.zip')

    client = kfp.Client()
    my_experiment = client.create_experiment(name='Basic Experiment')
    my_run = client.run_pipeline(my_experiment.id, 'Pipeline parameters', __file__ + '.zip', params={'predict' : 'tails'})

파이프 라인 실행 결과 확인하기

KFP UI 화면에서 결과를 조회해 볼 수 있습니다. 왼쪽 메뉴에서 Experiments 클릭하면 Experiment 목록을 조회할 수 있습니다. 예제에서 사용한 Experiment 이름이 “Basic Experiment” 이므로, Basic Experiment를 클릭하면 실행 목록을 확인 할 수 있습니다.

“Run name”이 “Pipeline parameters” 인 것을 클릭하면 실행 결과를 확인할 수 있습니다.


댓글 남기기

이메일은 공개되지 않습니다. 필수 입력창은 * 로 표시되어 있습니다