Retry를 사용하는 파이프 라인 만들기
파이프 라인을 구성하고 실행하기
Retry를 사용하는 파이프 라인을 만들어 보겠습니다. Retry를 사용하면 작업이 실패로 끝났을 때, 재시도 할 수 있도록 할 수 있습니다. 재시도 횟수는 사용자가 설정할 수 있습니다.
입력받은 시스템 종료 코드들 중에서 하나를 랜덤으로 선택해서 반환합니다.
def flip_coin_op(): return dsl.ContainerOp( name='Flip coin', image='python:alpine3.6', command=['sh', '-c'], arguments=['python -c "import random; result = \\'heads\\' if random.randint(0,1) == 0 ' 'else \\'tails\\'; print(result)" | tee /tmp/output'], file_outputs={'output': '/tmp/output'} )
DSL을 사용하여 파이프 라인을 구성합니다. set_retry() 사용해서, 해당 작업이 실패했을 경우 재시작 하도록 설정하였습니다.
KFP SDK 사용하여 코드에서 파이프 라인을 컴파일하고, 바로 실행하였습니다.
@dsl.pipeline( name='Retry random failures', description='A pipeline with retry.' ) def retry_pipeline(): random_exit_op('0,1,2,3').set_retry(10)
다음은 파이프라인을 구성하고 실행하는 전체 코드입니다.
import kfp from kfp import dsl def random_exit_op(exit_codes): return dsl.ContainerOp( name='random_failure', image='python:alpine3.6', command=['python', '-c'], arguments=['import random; import sys; exit_code = int(random.choice(sys.argv[1].split(","))); print(exit_code); sys.exit(exit_code)', exit_codes] ) @dsl.pipeline( name='Retry random failures', description='A pipeline with retry.' ) def retry_pipeline(): random_exit_op('0,1,2,3').set_retry(10) if __name__ == '__main__': kfp.compiler.Compiler().compile(retry_pipeline, __file__ + '.zip') client = kfp.Client() my_experiment = client.create_experiment(name='Basic Experiment') my_run = client.run_pipeline(my_experiment.id, 'Retry pipeline', __file__ + '.zip')
파이프 라인 실행 결과 확인하기
KFP UI 화면에서 결과를 조회해 볼 수 있습니다. 왼쪽 메뉴에서 Experiments 클릭하면 Experiment 목록을 조회할 수 있습니다. 예제에서 사용한 Experiment 이름이 “Basic Experiment” 이므로, Basic Experiment를 클릭하면 실행 목록을 확인 할 수 있습니다.

“Run name”이 “Retry pipeline” 인 것을 클릭하면 실행 결과를 확인할 수 있습니다.

쿠버네티스 리소스를 사용하는 파이프 라인 만들기
쿠버네티스 리소스를 사용하는 파이프 라인을 만들어 보겠습니다. 사용자는 쿠버네티스의 모든 리소스를 사용할 수 있습니다.
kfp.dsl.ResourceOp
이 클래스는 쿠버네티스 리소스를 사용하는 단계를 나타냅니다. Argo의 리소스 템플릿을 구현하고 있습니다.
classkfp.dsl.ResourceOp(k8s_resource=None, action: str = 'create', merge_strategy: str = None, success_condition: str = None, failure_condition: str = None, attribute_outputs: Dict[str, str] = None, **kwargs)
사용자는 이 기능을 이용하여 쿠버네티스 리소스의 일부 작업(get, create, apply, delete, replace, place)을 수행할 수 있습니다. 그리고 해당 작업을 수행하는 단계의 성공 또는 실패 했는지를 조건을 통해서 설정할 수 있습니다.
전달 인자
다음은 VolumeOp에서 사용하는 주요 전달 인자입니다.
k8s_resource
: 쿠버네티시 리소스를 정의한 것입니다. (필수)action
: 수행할 작업의 종류입니다. 기본값은create
입니다.merge_strategy
: 수행할 작업의 종류가patch
일 때 사용할 병합 전략입니다. (선택)success_condition
: 단계의 성공을 판별하는 조건입니다.(선택)failure_condition
: 단계의 실패를 판별하는 조건입니다. (선택)attribute_outputs
:[kfp.dsl.ContainerOp](<https://kubeflow-pipelines.readthedocs.io/en/latest/source/kfp.dsl.html#kfp.dsl.ContainerOp>)
의file_outputs
과 비슷합니다 . 출력 파리미터 이름을 쿠버네티스 객체의 JSON 경로에 매핑합니다.
출력
ResourceOps는 출력 파라미터를 생성 할 수 있습니다. 사용하는 쿠버네티스 리소스의 필드 값을 출력 할 수 있습니다. 예를 들면 다음과 같습니다.
job = kubernetes_client.V1Job(...) rop = kfp.dsl.ResourceOp( name="create-job", k8s_resource=job, action="create", attribute_outputs={"name": "{.metadata.name}"} )
기본적으로 ResourceOps는 리소스 이름과 리소스 사양을 출력합니다.

파이프 라인을 구성하고 실행하기
생성할 리소스의 매니페스트를 정의 하였습니다.
_job_manifest = """ { "apiVersion": "batch/v1", "kind": "Job", "metadata": { "generateName": "kfp" }, "spec": { "template": { "metadata": { "name": "resource-pipeline" }, "spec": { "containers": [{ "name": "mnist", "image": "kangwoo/mnist-simple:job", "command": ["python", "/app/mnist-simple.py"] }], "restartPolicy": "Never" } } } } """
DSL을 사용하여 파이프 라인을 구성합니다. dsl.ResourceOp() 사용해서, 쿠버네티스 리소소스를 생성하였습니다.
KFP SDK 사용하여 코드에서 파이프 라인을 컴파일하고, 바로 실행하였습니다.
@dsl.pipeline( name='Kubernetes Resource', description='A pipeline with resource.' ) def resource_pipeline(): op = dsl.ResourceOp( name='resource-job', k8s_resource=json.loads(_job_manifest), action='create' )
다음은 파이프라인을 구성하고 실행하는 전체 코드입니다.
import kfp from kfp import dsl import json _job_manifest = """ { "apiVersion": "batch/v1", "kind": "Job", "metadata": { "generateName": "kfp" }, "spec": { "template": { "metadata": { "name": "resource-pipeline" }, "spec": { "containers": [{ "name": "mnist", "image": "kangwoo/mnist-simple:job", "command": ["python", "/app/mnist-simple.py"] }], "restartPolicy": "Never" } } } } """ @dsl.pipeline( name='Kubernetes Resource', description='A pipeline with resource.' ) def resource_pipeline(): op = dsl.ResourceOp( name='resource-job', k8s_resource=json.loads(_job_manifest), action='create' ) if __name__ == '__main__': kfp.compiler.Compiler().compile(resource_pipeline, __file__ + '.zip') client = kfp.Client() my_experiment = client.create_experiment(name='Basic Experiment') my_run = client.run_pipeline(my_experiment.id, 'Resource pipeline', __file__ + '.zip')
파이프 라인 실행 결과 확인하기
KFP UI 화면에서 결과를 조회해 볼 수 있습니다. 왼쪽 메뉴에서 Experiments 클릭하면 Experiment 목록을 조회할 수 있습니다. 예제에서 사용한 Experiment 이름이 “Basic Experiment” 이므로, Basic Experiment를 클릭하면 실행 목록을 확인 할 수 있습니다.

“Run name”이 “Resource pipeline” 인 것을 클릭하면 실행 결과를 확인할 수 있습니다.

퍼시스턴스 볼륨를 사용하는 파이프 라인 만들기
쿠버네티스 퍼시스턴스 볼륨을 사용하는 파이프 라인을 만들어 보겠습니다. dsl.VolumeOp() 이용하여 퍼시스턴스 볼륨을 쉽게 생성할 수 있습니다.
dsl.VolumeOp
VolumeOp 클래스는 ResourceOp 에서 확장되었습니다. 이 클래스를 사용하면 퍼시스턴스 볼륨을 쉽게 생성할 수 있습니다.
전달 인자
다음은 VolumeOp에서 사용하는 주요 전달 인자입니다.
resource_name
: 생성할 리소스에 사용할 이름입니다. 이 문자열 앞에 워크플로우 이름이 붙습니다. (필수)size
: 요청할 볼륨의 크기입니다. (필수)storage_class
: 사용할 스토리지 클래스입니다. (선택)modes
: 퍼시스턴스 볼륨의 접근 모드(accessModes
) 입니다.기본 값을VOLUME_MODE_RWM
입니다.VOLUME_MODE_RWO
:["ReadWriteOnce"]
VOLUME_MODE_RWM
:["ReadWriteMany"]
VOLUME_MODE_ROM
:["ReadOnlyMany"]
출력
쿠버네티스 리소스의 이름과 사양 이외에도 바인딩된 퍼시스턴스 볼륨의 스토리지 크기를 step.outputs [“size”] 로 출력합니다. 하지만 스토리지 제공자가 WaitForFirstConsumer 바인딩 모드를 지원하는 경우, 비어 있을 수 있습니다. 이 값은 비어 있지 않으면, 항상 요청된 크기보다 크거나 같습니다.

파이프 라인을 구성하고 실행하기
dsl.VolumeOp() 이용하여 퍼시스턴스 볼륨을 생성할 수 있습니다.
vop = dsl.VolumeOp( name="pipeline-volume", resource_name="pipeline-pvc", modes=dsl.VOLUME_MODE_RWO, size="100Mi" )
DSL을 사용하여 파이프 라인을 구성합니다. dsl.dsl.ContainerOp() 의 pvolumes 파라미터를 이용하여 볼륨을 마운트 할 수 있습니다.
step1 에서는 dsl.VolumeOp()으로 생성한 볼륨을 마운트하였고, step2에서는 step1에 마운트된 볼륨을 그대로 다시 마운트 하였습니다.
KFP SDK 사용하여 코드에서 파이프 라인을 컴파일하고, 바로 실행하였습니다.
@dsl.pipeline( name='Volume pipeline', description='A pipeline with volume.' ) def volume_pipeline(): vop = dsl.VolumeOp( name="pipeline-volume", resource_name="pipeline-pvc", modes=dsl.VOLUME_MODE_RWO, size="100Mi" ) step1 = dsl.ContainerOp( name='Flip coin', image='python:alpine3.6', command=['sh', '-c'], arguments=['python -c "import random; result = \\'heads\\' if random.randint(0,1) == 0 ' 'else \\'tails\\'; print(result)" | tee /data/output'], pvolumes={"/data": vop.volume} ) step2 = dsl.ContainerOp( name='Print', image='alpine:3.6', command=['cat', '/data/output'], pvolumes={"/data": step1.pvolume} )
다음은 파이프라인을 구성하고 실행하는 전체 코드입니다.
import kfp from kfp import dsl @dsl.pipeline( name='Volume pipeline', description='A pipeline with volume.' ) def volume_pipeline(): vop = dsl.VolumeOp( name="pipeline-volume", resource_name="pipeline-pvc", modes=dsl.VOLUME_MODE_RWO, size="100Mi" ) step1 = dsl.ContainerOp( name='Flip coin', image='python:alpine3.6', command=['sh', '-c'], arguments=['python -c "import random; result = \\'heads\\' if random.randint(0,1) == 0 ' 'else \\'tails\\'; print(result)" | tee /data/output'], pvolumes={"/data": vop.volume} ) step2 = dsl.ContainerOp( name='Print', image='alpine:3.6', command=['cat', '/data/output'], pvolumes={"/data": step1.pvolume} ) if __name__ == '__main__': kfp.compiler.Compiler().compile(volume_pipeline, __file__ + '.zip') client = kfp.Client() my_experiment = client.create_experiment(name='Basic Experiment') my_run = client.run_pipeline(my_experiment.id, 'Volume pipeline', __file__ + '.zip')
파이프 라인 실행 결과 확인하기
KFP UI 화면에서 결과를 조회해 볼 수 있습니다. 왼쪽 메뉴에서 Experiments 클릭하면 Experiment 목록을 조회할 수 있습니다. 예제에서 사용한 Experiment 이름이 “Basic Experiment” 이므로, Basic Experiment를 클릭하면 실행 목록을 확인 할 수 있습니다.

“Run name”이 “Volume pipeline” 인 것을 클릭하면 실행 결과를 확인할 수 있습니다.

파이프라인에 환경 변수 사용하기
Kubeflow piepelins 에서 환경 변수를 설정하고 사용하는 방법에 대해서 알아 보겠습니다. 컴포넌트에서 환경 변수를 설정하려면, dsl.ContainerOp()의 add_env_variable() 메소드를 사용하면 됩니다. kubernetes.client.models 패키지에 있는 V1EnvVar 인스턴스를 생성한 후, add_env_variable() 메소드를 이용하여 환경 변수를 컴포넌트에 추가할 수 있습니다.
파이프 라인을 구성하고 실행하기
생성할 리소스의 매니페스트를 정의 하였습니다.
import kfp.dsl as dsl from kubernetes.client.models import V1EnvVar @dsl.pipeline( name='Env example', description='A pipline showing how to use environment variables' ) def environment_pipeline(): env_var = V1EnvVar(name='example_env', value='env_variable') container_op = logg_env_function_op().add_env_variable(env_var)
더 많은 환경 변수를 컴포넌트에 전달하려면 add_env_variable () 더 추가하면 됩니다.
컴포넌트에 추가한 환경 변수를 출력하기 위하여 echo를 사용하였습니다.
def print_env_op(): return dsl.ContainerOp( name='Print', image='alpine:3.6', command=['sh', '-c', 'echo $example_env'], )
다음은 파이프라인을 구성하고 실행하는 전체 코드입니다.
enviroment_variables.py
import kfp from kfp import dsl import kfp.dsl as dsl from kubernetes.client.models import V1EnvVar def print_env_op(): return dsl.ContainerOp( name='Print', image='alpine:3.6', command=['sh', '-c', 'echo $example_env'], ) @dsl.pipeline( name='Env example', description='A pipline showing how to use environment variables' ) def environment_pipeline(): env_var = V1EnvVar(name='example_env', value='env_variable') print_env_op().add_env_variable(env_var) if __name__ == '__main__': kfp.compiler.Compiler().compile(environment_pipeline, __file__ + '.zip') client = kfp.Client() my_experiment = client.create_experiment(name='Sample Experiment') my_run = client.run_pipeline(my_experiment.id, 'Environment pipeline', __file__ + '.zip')
파이프 라인 실행 결과 확인하기
KFP UI 화면에서 결과를 조회해 볼 수 있습니다. 왼쪽 메뉴에서 Experiments 클릭하면 Experiment 목록을 조회할 수 있습니다. 예제에서 사용한 Experiment 이름이 “Sample Experiment” 이므로, Sample Experiment를 클릭하면 실행 목록을 확인 할 수 있습니다.

“Run name”이 “Environment pipeline” 인 것을 클릭하면 실행 결과를 확인할 수 있습니다.

ContainerOp에 사이드카 추가하기
파이프 라인을 구성하고 실행하기
dsl.Sidecar()를 이용하여 사이드카를 생성합니다.
echo = dsl.Sidecar( name="echo", image="hashicorp/http-echo:latest", args=['-text="hello world"'], )
dsl.ContainerOp()의 sidcars 파라미터를 이용하여 생성한 사이드카를 추가합니다.
op1 = dsl.ContainerOp( name="download", image="busybox:latest", command=["sh", "-c"], arguments=[ "sleep %s; wget localhost:5678 -O /tmp/results.txt" % sleep_sec ], # sleep for X sec and call the sidecar and save results to output sidecars=[echo], file_outputs={"downloaded": "/tmp/results.txt"}, )
다음은 파이프라인을 구성하고 실행하는 전체 코드입니다.
import kfp import kfp.dsl as dsl @dsl.pipeline( name="pipeline_with_sidecar", description="A pipeline that demonstrates how to add a sidecar to an operation." ) def pipeline_with_sidecar(sleep_sec: int = 30): # sidecar with sevice that reply "hello world" to any GET request echo = dsl.Sidecar( name="echo", image="hashicorp/http-echo:latest", args=['-text="hello world"'], ) # container op with sidecar op1 = dsl.ContainerOp( name="download", image="busybox:latest", command=["sh", "-c"], arguments=[ "sleep %s; wget localhost:5678 -O /tmp/results.txt" % sleep_sec ], # sleep for X sec and call the sidecar and save results to output sidecars=[echo], file_outputs={"downloaded": "/tmp/results.txt"}, ) op2 = dsl.ContainerOp( name="echo", image="library/bash", command=["sh", "-c"], arguments=["echo %s" % op1.output], # print out content of op1 output ) if __name__ == '__main__': kfp.compiler.Compiler().compile(pipeline_with_sidecar, __file__ + '.yaml')
파이프 라인 실행 결과 확인하기
KFP UI 화면에서 결과를 조회해 볼 수 있습니다. 왼쪽 메뉴에서 Experiments 클릭하면 Experiment 목록을 조회할 수 있습니다. 예제에서 사용한 Experiment 이름이 “Sample Experiment” 이므로, Sample Experiment를 클릭하면 실행 목록을 확인 할 수 있습니다.

“Run name”이 “Sidecar pipeline” 인 것을 클릭하면 실행 결과를 확인할 수 있습니다.

파이프 라인의 기본 Artifact 저장소를 변경하기
KFP 는 파이프 라인에서 사용하는 아티팩트들은 내부에 설치된 minio에 저장하고 있습니다. 아티팩트의 기본 저장소를 바꾸려면 argo workflow의 workflow-controller configmap을 수정하면 됩니다.
다음 명령어를 실행하면 아티팩트 저장소의 설정 정보를 수정할 수 있습니다.
kubectl -n kubeflow edit configmap workflow-controller-configmap
다음은 아티팩트 저장소의 설정 정보입니다.
... data: config: | { artifactRepository: { s3: { bucket: mlpipeline, keyPrefix: artifacts, endpoint: minio-service.kubeflow:9000, insecure: true, accessKeySecret: { name: mlpipeline-minio-artifact, key: accesskey }, secretKeySecret: { name: mlpipeline-minio-artifact, key: secretkey } } } } ...
자세한 사항은 https://github.com/argoproj/argo/blob/master/docs/configure-artifact-repository.md 를 참고 하실 수 있습니다.