Kubeflow Pipelines – DSL 이해하기 #2

Retry를 사용하는 파이프 라인 만들기

파이프 라인을 구성하고 실행하기

Retry를 사용하는 파이프 라인을 만들어 보겠습니다. Retry를 사용하면 작업이 실패로 끝났을 때, 재시도 할 수 있도록 할 수 있습니다. 재시도 횟수는 사용자가 설정할 수 있습니다.

입력받은 시스템 종료 코드들 중에서 하나를 랜덤으로 선택해서 반환합니다.

def flip_coin_op():
    return dsl.ContainerOp(
        name='Flip coin',
        image='python:alpine3.6',
        command=['sh', '-c'],
        arguments=['python -c "import random; result = \\'heads\\' if random.randint(0,1) == 0 '
                  'else \\'tails\\'; print(result)" | tee /tmp/output'],
        file_outputs={'output': '/tmp/output'}
    )

DSL을 사용하여 파이프 라인을 구성합니다. set_retry() 사용해서, 해당 작업이 실패했을 경우 재시작 하도록 설정하였습니다.

KFP SDK 사용하여 코드에서 파이프 라인을 컴파일하고, 바로 실행하였습니다.

@dsl.pipeline(
    name='Retry random failures',
    description='A pipeline with retry.'
)
def retry_pipeline():
    random_exit_op('0,1,2,3').set_retry(10)

다음은 파이프라인을 구성하고 실행하는 전체 코드입니다.

retry.py

import kfp
from kfp import dsl


def random_exit_op(exit_codes):
    return dsl.ContainerOp(
        name='random_failure',
        image='python:alpine3.6',
        command=['python', '-c'],
        arguments=['import random; import sys; exit_code = int(random.choice(sys.argv[1].split(","))); print(exit_code); sys.exit(exit_code)', exit_codes]
    )


@dsl.pipeline(
    name='Retry random failures',
    description='A pipeline with retry.'
)
def retry_pipeline():
    random_exit_op('0,1,2,3').set_retry(10)


if __name__ == '__main__':
    kfp.compiler.Compiler().compile(retry_pipeline, __file__ + '.zip')

    client = kfp.Client()
    my_experiment = client.create_experiment(name='Basic Experiment')
    my_run = client.run_pipeline(my_experiment.id, 'Retry pipeline', __file__ + '.zip')

파이프 라인 실행 결과 확인하기

KFP UI 화면에서 결과를 조회해 볼 수 있습니다. 왼쪽 메뉴에서 Experiments 클릭하면 Experiment 목록을 조회할 수 있습니다. 예제에서 사용한 Experiment 이름이 “Basic Experiment” 이므로, Basic Experiment를 클릭하면 실행 목록을 확인 할 수 있습니다.

“Run name”이 “Retry pipeline” 인 것을 클릭하면 실행 결과를 확인할 수 있습니다.


쿠버네티스 리소스를 사용하는 파이프 라인 만들기

쿠버네티스 리소스를 사용하는 파이프 라인을 만들어 보겠습니다. 사용자는 쿠버네티스의 모든 리소스를 사용할 수 있습니다.

kfp.dsl.ResourceOp

이 클래스는 쿠버네티스 리소스를 사용하는 단계를 나타냅니다. Argo의 리소스 템플릿을 구현하고 있습니다.

classkfp.dsl.ResourceOp(k8s_resource=None, action: str = 'create', merge_strategy: str = None, success_condition: str = None, failure_condition: str = None, attribute_outputs: Dict[str, str] = None, **kwargs)

사용자는 이 기능을 이용하여 쿠버네티스 리소스의 일부 작업(get, create, apply, delete, replace, place)을 수행할 수 있습니다. 그리고 해당 작업을 수행하는 단계의 성공 또는 실패 했는지를 조건을 통해서 설정할 수 있습니다.

전달 인자

다음은 VolumeOp에서 사용하는 주요 전달 인자입니다.

  • k8s_resource: 쿠버네티시 리소스를 정의한 것입니다. (필수)
  • action: 수행할 작업의 종류입니다. 기본값은 create 입니다.
  • merge_strategy: 수행할 작업의 종류가 patch 일 때 사용할 병합 전략입니다. (선택)
  • success_condition: 단계의 성공을 판별하는 조건입니다.(선택)
  • failure_condition: 단계의 실패를 판별하는 조건입니다. (선택)
  • attribute_outputs: [kfp.dsl.ContainerOp](<https://kubeflow-pipelines.readthedocs.io/en/latest/source/kfp.dsl.html#kfp.dsl.ContainerOp>)file_outputs 과 비슷합니다 . 출력 파리미터 이름을 쿠버네티스 객체의 JSON 경로에 매핑합니다.

출력

ResourceOps는 출력 파라미터를 생성 할 수 있습니다. 사용하는 쿠버네티스 리소스의 필드 값을 출력 할 수 있습니다. 예를 들면 다음과 같습니다.

job = kubernetes_client.V1Job(...)

rop = kfp.dsl.ResourceOp(
    name="create-job",
    k8s_resource=job,
    action="create",
    attribute_outputs={"name": "{.metadata.name}"}
)

기본적으로 ResourceOps는 리소스 이름과 리소스 사양을 출력합니다.

파이프 라인을 구성하고 실행하기

생성할 리소스의 매니페스트를 정의 하였습니다.

_job_manifest = """
{
    "apiVersion": "batch/v1",
    "kind": "Job",
    "metadata": {
        "generateName": "kfp"
    },
    "spec": {
        "template": {
            "metadata": {
                "name": "resource-pipeline"
            },
            "spec": {
                "containers": [{
                    "name": "mnist",
                    "image": "kangwoo/mnist-simple:job",
                    "command": ["python", "/app/mnist-simple.py"]
                }],
                "restartPolicy": "Never"
            }
        }   
    }
}
"""

DSL을 사용하여 파이프 라인을 구성합니다. dsl.ResourceOp() 사용해서, 쿠버네티스 리소소스를 생성하였습니다.

KFP SDK 사용하여 코드에서 파이프 라인을 컴파일하고, 바로 실행하였습니다.

@dsl.pipeline(
    name='Kubernetes Resource',
    description='A pipeline with resource.'
)
def resource_pipeline():
    op = dsl.ResourceOp(
        name='resource-job',
        k8s_resource=json.loads(_job_manifest),
        action='create'
    )

다음은 파이프라인을 구성하고 실행하는 전체 코드입니다.

resource.py

import kfp
from kfp import dsl
import json


_job_manifest = """
{
    "apiVersion": "batch/v1",
    "kind": "Job",
    "metadata": {
        "generateName": "kfp"
    },
    "spec": {
        "template": {
            "metadata": {
                "name": "resource-pipeline"
            },
            "spec": {
                "containers": [{
                    "name": "mnist",
                    "image": "kangwoo/mnist-simple:job",
                    "command": ["python", "/app/mnist-simple.py"]
                }],
                "restartPolicy": "Never"
            }
        }   
    }
}
"""

@dsl.pipeline(
    name='Kubernetes Resource',
    description='A pipeline with resource.'
)
def resource_pipeline():
    op = dsl.ResourceOp(
        name='resource-job',
        k8s_resource=json.loads(_job_manifest),
        action='create'
    )


if __name__ == '__main__':
    kfp.compiler.Compiler().compile(resource_pipeline, __file__ + '.zip')

    client = kfp.Client()
    my_experiment = client.create_experiment(name='Basic Experiment')
    my_run = client.run_pipeline(my_experiment.id, 'Resource pipeline', __file__ + '.zip')

파이프 라인 실행 결과 확인하기

KFP UI 화면에서 결과를 조회해 볼 수 있습니다. 왼쪽 메뉴에서 Experiments 클릭하면 Experiment 목록을 조회할 수 있습니다. 예제에서 사용한 Experiment 이름이 “Basic Experiment” 이므로, Basic Experiment를 클릭하면 실행 목록을 확인 할 수 있습니다.

“Run name”이 “Resource pipeline” 인 것을 클릭하면 실행 결과를 확인할 수 있습니다.


퍼시스턴스 볼륨를 사용하는 파이프 라인 만들기

쿠버네티스 퍼시스턴스 볼륨을 사용하는 파이프 라인을 만들어 보겠습니다. dsl.VolumeOp() 이용하여 퍼시스턴스 볼륨을 쉽게 생성할 수 있습니다.

dsl.VolumeOp

VolumeOp 클래스는 ResourceOp 에서 확장되었습니다. 이 클래스를 사용하면 퍼시스턴스 볼륨을 쉽게 생성할 수 있습니다.

전달 인자

다음은 VolumeOp에서 사용하는 주요 전달 인자입니다.

  • resource_name: 생성할 리소스에 사용할 이름입니다. 이 문자열 앞에 워크플로우 이름이 붙습니다. (필수)
  • size: 요청할 볼륨의 크기입니다. (필수)
  • storage_class: 사용할 스토리지 클래스입니다. (선택)
  • modes: 퍼시스턴스 볼륨의 접근 모드( accessModes) 입니다.기본 값을  VOLUME_MODE_RWM 입니다.
    • VOLUME_MODE_RWO["ReadWriteOnce"]
    • VOLUME_MODE_RWM["ReadWriteMany"]
    • VOLUME_MODE_ROM["ReadOnlyMany"]

출력

쿠버네티스 리소스의 이름과 사양 이외에도 바인딩된 퍼시스턴스 볼륨의 스토리지 크기를 step.outputs [“size”] 로 출력합니다. 하지만 스토리지 제공자가 WaitForFirstConsumer 바인딩 모드를 지원하는 경우, 비어 있을 수 있습니다. 이 값은 비어 있지 않으면, 항상 요청된 크기보다 크거나 같습니다.

파이프 라인을 구성하고 실행하기

dsl.VolumeOp() 이용하여 퍼시스턴스 볼륨을 생성할 수 있습니다.

vop = dsl.VolumeOp(
        name="pipeline-volume",
        resource_name="pipeline-pvc",
        modes=dsl.VOLUME_MODE_RWO,
        size="100Mi"
    )

DSL을 사용하여 파이프 라인을 구성합니다. dsl.dsl.ContainerOp() 의 pvolumes 파라미터를 이용하여 볼륨을 마운트 할 수 있습니다.

step1 에서는 dsl.VolumeOp()으로 생성한 볼륨을 마운트하였고, step2에서는 step1에 마운트된 볼륨을 그대로 다시 마운트 하였습니다.

KFP SDK 사용하여 코드에서 파이프 라인을 컴파일하고, 바로 실행하였습니다.

@dsl.pipeline(
    name='Volume pipeline',
    description='A pipeline with volume.'
)
def volume_pipeline():
    vop = dsl.VolumeOp(
        name="pipeline-volume",
        resource_name="pipeline-pvc",
        modes=dsl.VOLUME_MODE_RWO,
        size="100Mi"
    )

    step1 = dsl.ContainerOp(
        name='Flip coin',
        image='python:alpine3.6',
        command=['sh', '-c'],
        arguments=['python -c "import random; result = \\'heads\\' if random.randint(0,1) == 0 '
                   'else \\'tails\\'; print(result)" | tee /data/output'],
        pvolumes={"/data": vop.volume}
    )

    step2 = dsl.ContainerOp(
        name='Print',
        image='alpine:3.6',
        command=['cat', '/data/output'],
        pvolumes={"/data": step1.pvolume}
    )

다음은 파이프라인을 구성하고 실행하는 전체 코드입니다.

volume.py

import kfp
from kfp import dsl


@dsl.pipeline(
    name='Volume pipeline',
    description='A pipeline with volume.'
)
def volume_pipeline():
    vop = dsl.VolumeOp(
        name="pipeline-volume",
        resource_name="pipeline-pvc",
        modes=dsl.VOLUME_MODE_RWO,
        size="100Mi"
    )

    step1 = dsl.ContainerOp(
        name='Flip coin',
        image='python:alpine3.6',
        command=['sh', '-c'],
        arguments=['python -c "import random; result = \\'heads\\' if random.randint(0,1) == 0 '
                   'else \\'tails\\'; print(result)" | tee /data/output'],
        pvolumes={"/data": vop.volume}
    )

    step2 = dsl.ContainerOp(
        name='Print',
        image='alpine:3.6',
        command=['cat', '/data/output'],
        pvolumes={"/data": step1.pvolume}
    )


if __name__ == '__main__':
    kfp.compiler.Compiler().compile(volume_pipeline, __file__ + '.zip')

    client = kfp.Client()
    my_experiment = client.create_experiment(name='Basic Experiment')
    my_run = client.run_pipeline(my_experiment.id, 'Volume pipeline', __file__ + '.zip')

파이프 라인 실행 결과 확인하기

KFP UI 화면에서 결과를 조회해 볼 수 있습니다. 왼쪽 메뉴에서 Experiments 클릭하면 Experiment 목록을 조회할 수 있습니다. 예제에서 사용한 Experiment 이름이 “Basic Experiment” 이므로, Basic Experiment를 클릭하면 실행 목록을 확인 할 수 있습니다.

“Run name”이 “Volume pipeline” 인 것을 클릭하면 실행 결과를 확인할 수 있습니다.


파이프라인에 환경 변수 사용하기

Kubeflow piepelins 에서 환경 변수를 설정하고 사용하는 방법에 대해서 알아 보겠습니다. 컴포넌트에서 환경 변수를 설정하려면, dsl.ContainerOp()의 add_env_variable() 메소드를 사용하면 됩니다. kubernetes.client.models 패키지에 있는 V1EnvVar 인스턴스를 생성한 후, add_env_variable() 메소드를 이용하여 환경 변수를 컴포넌트에 추가할 수 있습니다.

파이프 라인을 구성하고 실행하기

생성할 리소스의 매니페스트를 정의 하였습니다.

import kfp.dsl as dsl
from kubernetes.client.models import V1EnvVar

@dsl.pipeline(
  name='Env example',
  description='A pipline showing how to use environment variables'
)
def environment_pipeline():
  env_var = V1EnvVar(name='example_env', value='env_variable')
 
  container_op = logg_env_function_op().add_env_variable(env_var)

더 많은 환경 변수를 컴포넌트에 전달하려면 add_env_variable () 더 추가하면 됩니다.

컴포넌트에 추가한 환경 변수를 출력하기 위하여 echo를 사용하였습니다.

def print_env_op():
    return dsl.ContainerOp(
        name='Print',
        image='alpine:3.6',
        command=['sh', '-c', 'echo $example_env'],
    )

다음은 파이프라인을 구성하고 실행하는 전체 코드입니다.

enviroment_variables.py

import kfp
from kfp import dsl

import kfp.dsl as dsl
from kubernetes.client.models import V1EnvVar


def print_env_op():
    return dsl.ContainerOp(
        name='Print',
        image='alpine:3.6',
        command=['sh', '-c', 'echo $example_env'],
    )


@dsl.pipeline(
  name='Env example',
  description='A pipline showing how to use environment variables'
)
def environment_pipeline():
    env_var = V1EnvVar(name='example_env', value='env_variable')

    print_env_op().add_env_variable(env_var)


if __name__ == '__main__':
    kfp.compiler.Compiler().compile(environment_pipeline, __file__ + '.zip')

    client = kfp.Client()
    my_experiment = client.create_experiment(name='Sample Experiment')
    my_run = client.run_pipeline(my_experiment.id, 'Environment pipeline', __file__ + '.zip')

파이프 라인 실행 결과 확인하기

KFP UI 화면에서 결과를 조회해 볼 수 있습니다. 왼쪽 메뉴에서 Experiments 클릭하면 Experiment 목록을 조회할 수 있습니다. 예제에서 사용한 Experiment 이름이 “Sample Experiment” 이므로, Sample Experiment를 클릭하면 실행 목록을 확인 할 수 있습니다.

“Run name”이 “Environment pipeline” 인 것을 클릭하면 실행 결과를 확인할 수 있습니다.


ContainerOp에 사이드카 추가하기

파이프 라인을 구성하고 실행하기

dsl.Sidecar()를 이용하여 사이드카를 생성합니다.

	echo = dsl.Sidecar(
        name="echo",
        image="hashicorp/http-echo:latest",
        args=['-text="hello world"'],
    )

dsl.ContainerOp()의 sidcars 파라미터를 이용하여 생성한 사이드카를 추가합니다.

	op1 = dsl.ContainerOp(
        name="download",
        image="busybox:latest",
        command=["sh", "-c"],
        arguments=[
            "sleep %s; wget localhost:5678 -O /tmp/results.txt" % sleep_sec
        ],  # sleep for X sec and call the sidecar and save results to output
        sidecars=[echo],
        file_outputs={"downloaded": "/tmp/results.txt"},
    )

다음은 파이프라인을 구성하고 실행하는 전체 코드입니다.

sidecar.py

import kfp
import kfp.dsl as dsl

@dsl.pipeline(
    name="pipeline_with_sidecar", 
    description="A pipeline that demonstrates how to add a sidecar to an operation."
)
def pipeline_with_sidecar(sleep_sec: int = 30):

    # sidecar with sevice that reply "hello world" to any GET request
    echo = dsl.Sidecar(
        name="echo",
        image="hashicorp/http-echo:latest",
        args=['-text="hello world"'],
    )

    # container op with sidecar
    op1 = dsl.ContainerOp(
        name="download",
        image="busybox:latest",
        command=["sh", "-c"],
        arguments=[
            "sleep %s; wget localhost:5678 -O /tmp/results.txt" % sleep_sec
        ],  # sleep for X sec and call the sidecar and save results to output
        sidecars=[echo],
        file_outputs={"downloaded": "/tmp/results.txt"},
    )

    op2 = dsl.ContainerOp(
        name="echo",
        image="library/bash",
        command=["sh", "-c"],
        arguments=["echo %s" % op1.output],  # print out content of op1 output
    )

if __name__ == '__main__':
    kfp.compiler.Compiler().compile(pipeline_with_sidecar, __file__ + '.yaml')

파이프 라인 실행 결과 확인하기

KFP UI 화면에서 결과를 조회해 볼 수 있습니다. 왼쪽 메뉴에서 Experiments 클릭하면 Experiment 목록을 조회할 수 있습니다. 예제에서 사용한 Experiment 이름이 “Sample Experiment” 이므로, Sample Experiment를 클릭하면 실행 목록을 확인 할 수 있습니다.

“Run name”이 “Sidecar pipeline” 인 것을 클릭하면 실행 결과를 확인할 수 있습니다.



파이프 라인의 기본 Artifact 저장소를 변경하기

KFP 는 파이프 라인에서 사용하는 아티팩트들은 내부에 설치된 minio에 저장하고 있습니다. 아티팩트의 기본 저장소를 바꾸려면 argo workflow의 workflow-controller configmap을 수정하면 됩니다.

다음 명령어를 실행하면 아티팩트 저장소의 설정 정보를 수정할 수 있습니다.

kubectl -n kubeflow edit configmap workflow-controller-configmap

다음은 아티팩트 저장소의 설정 정보입니다.

...
data:
  config: |
    {
    artifactRepository:
    {
        s3: {
            bucket: mlpipeline,
            keyPrefix: artifacts,
            endpoint: minio-service.kubeflow:9000,
            insecure: true,
            accessKeySecret: {
                name: mlpipeline-minio-artifact,
                key: accesskey
            },
            secretKeySecret: {
                name: mlpipeline-minio-artifact,
                key: secretkey
            }
        }
    }
    }
...

자세한 사항은 https://github.com/argoproj/argo/blob/master/docs/configure-artifact-repository.md 를 참고 하실 수 있습니다.

댓글 남기기

이메일은 공개되지 않습니다. 필수 입력창은 * 로 표시되어 있습니다