Kubeflow Pipelines – 재사용 가능한 컴포넌트

다양한 파이프 라인에서 재사용할 수 있는 컴포넌트에 대해서 알아 보겠습니다. 그리고 재사용 가능한 컴포넌트를 만들기 위한 컴포넌트 프로그램을 작성하는 방법과 컴포넌트를 정의하는 파일에 대해서 알아 보도록 하겠습니다.

재사용 가능한 컴포넌트 이해하기

재사용 가능한 컴포넌트를 만들면, 다양한 파이프라인에서 쉽게 가져다 사용할 수 있습니다. 재사용 가능한 컴포넌트를 만들기 위해서는 먼저 컴포넌트에서 사용할 프로그램을 작성하고 컨테이너화 합니다. 그리고 Kubeflow Pipelines 시스템에서 사용하는 컴포넌트 스펙을 정의하는 파일을 YAML 형식으로 작성하면 됩니다.

재사용 컴포넌트 만드는 단계

다음은 재사용 컴포넌트를 만드는 단계를 요약 한 것입니다.

  • 컴포넌트에서 사용할 프로그램을 작성하십시오. 프로그램은 파일이나 명령 행 인수를 사용하여 다른 컴퍼넌트와 데이터를 주고 받을 수 있습니다.
  • 프로그램을 컨테이너화 하십시오.
  • Kubeflow Pipelines 시스템의 컴포넌트를 설명하는 컴포넌트 스펙을 YAML 형식으로 작성하십시오.
  • Kubeflow Pipelines SDK 를 사용하여 파이프라인에서 사용할 컴포넌트를 로드하고, 파이프라인을 실행하십시오.

컴포넌트 프로그램

컴포넌트에서 사용할 프로그램을 작성해야 합니다. 합니다. 프로그램은 다른 컴포넌트로부터 데이터를 받기 위해서, 파일이나 명령행 인수를 사용해야 합니다.

컴포넌트 컨테이너화

컴포넌트를 만들기 위해서는 작성한 프로그램을 컨테이너 이미지로 패키지해야 합니다. 컨테이너화 된 컴포넌트는 파이프라인에서 독립적으로 실행됩니다. 컴포넌트는 동일한 프로세스에서 실행되지 않기 때문에, 메모리를 사용하여 컴포넌트간에 데이터를 직접 공유 할 수 없습니다. 그래서, 데이터를 네트워크를 통해 이동할 수 있도록 전달하는 데이터를 직렬화 해야합니다. 그러면, 다운스트림 컴포넌트에서 데이터를 역직렬화 해서 사용할 수 있습니다.

컴포넌트 스펙

Kubeflow Pipelines 시스템은 컴포넌트의 데이터 모델을 정의하기 위해서 YAML 형식의 파일을 사용합니다.

다음은 컴포넌트 정의의 주요 부분입니다.

  • Metadata : 이름, 설명등의 메타데이터를 정의합니다.
  • Interface : 입력과 출력을 위한 값들의 이름, 유형, 기본값등을 정의합니다.
  • Implementation : 제공된 컴포넌트 입력 값들을 가지고 컴포넌트를 실행하는 방법을 정의합니다. 컴포넌트의 실행이 완료되면 출력 값을 얻는 방법도 정의해야합니다.

다음은 컨퍼넌트 스펙을 정의한 YAML 파일입니다.

name: Keras - Train classifier
description: Trains classifier using Keras sequential model
inputs:
  - {name: training_set_features_path, type: {GcsPath: {data_type: TSV}}, description: 'Local or GCS path to the training set features table.'}
  - {name: training_set_labels_path, type: {GcsPath: {data_type: TSV}}, description: 'Local or GCS path to the training set labels (each label is a class index from 0 to num-classes - 1).'}
  - {name: output_model_uri, type: {GcsPath: {data_type: Keras model}}, description: 'Local or GCS path specifying where to save the trained model. The model (topology + weights + optimizer state) is saved in HDF5 format and can be loaded back by calling keras.models.load_model'} #Remove GcsUri and move to outputs once artifact passing support is checked in.
  - {name: model_config, type: {GcsPath: {data_type: Keras model config json}}, description: 'JSON string containing the serialized model structure. Can be obtained by calling model.to_json() on a Keras model.'}
  - {name: number_of_classes, type: Integer, description: 'Number of classifier classes.'}
  - {name: number_of_epochs, type: Integer, default: '100', description: 'Number of epochs to train the model. An epoch is an iteration over the entire `x` and `y` data provided.'}
  - {name: batch_size, type: Integer, default: '32', description: 'Number of samples per gradient update.'}
outputs:
  - {name: output_model_uri, type: {GcsPath: {data_type: Keras model}}, description: 'GCS path where the trained model has been saved. The model (topology + weights + optimizer state) is saved in HDF5 format and can be loaded back by calling keras.models.load_model'} #Remove GcsUri and make it a proper output once artifact passing support is checked in.
implementation:
  container:
    image: gcr.io/ml-pipeline/sample/keras/train_classifier
    command: [python3, /pipelines/component/src/train.py]
    args: [
      --training-set-features-path, {inputValue: training_set_features_path},
      --training-set-labels-path, {inputValue: training_set_labels_path},
      --output-model-path, {inputValue: output_model_uri},
      --model-config-json, {inputValue: model_config},
      --num-classes, {inputValue: number_of_classes},
      --num-epochs, {inputValue: number_of_epochs},
      --batch-size, {inputValue: batch_size},

      --output-model-path-file, {outputPath: output_model_uri},
    ]

컴포넌트 사용하기

Kubeflow Pipelines SDK를 이용하여 컴포넌트를 로드하면, 파이프 라인에서 컴포넌트를 사용할 수 있습니다.

#Load the component
train_op = comp.load_component(url='<https://raw.githubusercontent.com/Ark-kun/pipelines/Added-sample-component/components/sample/keras/train_classifier/component.yaml>')

#Use the component as part of the pipeline
def pipeline():
    train_task = train_op(
        training_set_features_path=os.path.join(testdata_root, 'training_set_features.tsv'),
        training_set_labels_path=os.path.join(testdata_root, 'training_set_labels.tsv'),
        output_model_uri=os.path.join(temp_dir_name, 'outputs/output_model/data'),
        model_config=Path(testdata_root).joinpath('model_config.json').read_text(),
        number_of_classes=2,
        number_of_epochs=10,
        batch_size=32,
    )

컴포넌트간에 데이터 전달하기

컴포넌트의 개념은 함수의 개념과 매우 유사합니다. 모든 컴포넌트는 입력 및 출력을 가질 수 있습니다. 컴포넌트 코드는 입력으로 전달 된 데이터를 가져 와서 출력용 데이터를 생성합니다. 파이프 라인은 실행된 컴포넌트의 출력을, 다른 컴포넌트의 입력으로 데이터를 전달하여 컴포넌트의 데이터를 서로 공유합니다. 이는 함수가 다른 함수를 호출하고 그 결과를 전달하는 방법과 매우 유사합니다. 파이프 라인 시스템은 실제 데이터 전달을 처리하는 반면 컴포넌트는 입력 데이터를 소비하고 출력 데이터를 생성합니다.

컨테이너화 된 프로그램들간의 데이터 전달

컴포넌트를 작성할 때 컴포넌트가 업스트림 및 다운 스트림 컴포넌트와 통신하는 방법에 대해 생각해야합니다. 즉, 입력 데이터를 소비하고 출력 데이터를 생성하는 방법입니다.

데이터 생성

데이터를 출력하려면 컴포넌트의 프로그램이 출력 데이터를 특정 위치에 기록하여, 시스템이 다른 컴포넌트에게 데이터를 전달할 수 있도록 해야합니다. 즉 프로그램은 데이터를 저장하는 경로를 하드 코딩해서는 안됩니다. 프로그램은 출력 데이터 경로를 명령행 인수로 받아서 처리해야합니다.

외부 시스템에서 데이터 생성

일부 시나리오에서는 컴포넌트의 출력 데이터를 외부 서비스에 저장할 수도 있습니다. 이 경우에 컴포넌트의 프로그램은 데이터를 외부 서비스에 저장한 후, 해당 데이터의 위치 식별자 정보를 출력해야합니다. 그래야 다른 컴포넌트에서 데이터를 가져올 수 있기 때문입니다. 이 경우에는 데이터가 Kubeflow Pipilines 시스템에 보관되지 않기 때문에, 파이프 라인 시스템은 데이터에 대한 일관 성 및 재현성을 보장 할 수 없습니다.

데이터 소비

명령행 프로그램이 데이터를 소비하는 방법은 일반적으로 두 가지가 있습니다.

  • 데이터가 작을 경우에는 보통 명령행 인수로 바로 전달합니다. program.py --param 100
  • 데이터 크거나, 이진 데이터일 경우에는 파일로 저장한 후, 파일 경로를 명령행 인수로 전달합니다. 파일로 저장한 데이터를 다른 컴포넌트에게 전달하려면, Kubeflow Pipelines 시스템이 이 정보를 알고 있어야 합니다.

재사용 가능한 컴포넌트 만들기

컴포넌트 파일 구성

다음은 Kubeflow Pipielins에서 권장하는, 컴포넌트 파일의 구성입니다. 반드시 이러한 방식으로 파일을 구성할 필요는 없습니다. 하지만 표준을 정해 놓으면, 이미지 작성, 테스트에 동일한 스크립트를 재사용할 수 있는 장정이 있습니다.

components/<component group>/<component name>/

    src/*            #Component source code files
    tests/*          #Unit tests
    run_tests.sh     #Small script that runs the tests
    README.md        #Documentation. Move to docs/ if multiple files needed

    Dockerfile       #Dockerfile to build the component container image
    build_image.sh   #Small script that runs docker build and docker push

    component.yaml   #Component definition in YAML format

프로그램 코드 작성

두 개의 입력 데이터와 하나의 출력 데이터가 있는 프로그램을 작성해 보겠습니다. 두 개의 입력 데이터는 명령행 인수로 바로 전달하는 작은 데이터와, 파일 경로를 명령행 인수로 전달하는 큰 데이테를 사용합니다. 이 예제는 파이썬 3 으로 작성되었습니다.

program.py

#!/usr/bin/env python3
import argparse
from pathlib import Path

def do_work(input1_file, output1_file, param1):
  for x in range(param1):
    line = next(input1_file)
    if not line:
      break
    _ = output1_file.write(line)

# Defining and parsing the command-line arguments
parser = argparse.ArgumentParser(description='Program description')
parser.add_argument('--input-path', type=str, help='Path of the local file containing the Input data.')
parser.add_argument('--param', type=int, default=100, help='Parameter.')
parser.add_argument('--output-path', type=str, help='Path of the local file where the Output data should be written.')
args = parser.parse_args()

Path(args.output1_path).parent.mkdir(parents=True, exist_ok=True)

with open(args.input1_path, 'r') as input1_file:
    with open(args.output1_path, 'w') as output1_file:
        do_work(input1_file, output1_file, args.param1)

이 프로그램의 명령행 호출은 다음과 같습니다

python3 program.py --input1-path <local file path to Input 1 data> \\
                   --param1 <value of Param1 input> \\
                   --output1-path <local file path for the Output 1 data>

Dockerfile을 작성하고, 프로그램을 컨테이너화하기

컨테이너 이미지를 만들기 위해서 Dockerfile 을 생성합니다. 파이썬 코드를 사용하고 있으므로, 베이스 이미지를 파이썬으로 하였습니다.

Dockerfile

FROM python:3.6

COPY ./src /pipelines/component/src

Dockerfile을 기반으로 컨테이너 이미지를 빌드하고, 해당 이미지를 컨테이너 이미지 레지스트리로 푸시하기 위해서 build_image.sh  스크립트를 작성하였습니다.

build_image.sh

#!/bin/bash -e
image_name=kangwoo/kfp-component # Specify the image name here
image_tag=latest
full_image_name=${image_name}:${image_tag}

cd "$(dirname "$0")"
docker build -t "${full_image_name}" .
docker push "$full_image_name"

# Output the strict image name (which contains the sha256 image digest)
docker inspect --format="{{index .RepoDigests 0}}" "${full_image_name}"

스크립트를 실행 가능하게 만듭니다.

chmod +x build_image.sh

build_image.sh  스크립트를 실행하면, 컨테이너 이미지가 빌드되고, 지정한 컨테이너 이미지 레시트리로 이미지가 푸시됩니다.

Sending build context to Docker daemon  7.168kB
Step 1/2 : FROM python:3.6
 ---> 1daf62e8cab5
Step 2/2 : COPY ./src /pipelines/component/src
 ---> Using cache
 ---> 2bc266c5c9d8
Successfully built 2bc266c5c9d8
Successfully tagged kangwoo/kfp-component:latest
The push refers to repository [docker.io/kangwoo/kfp-component]
cce013c10a7c: Preparing 
cce013c10a7c: Layer already exists 
... 
latest: digest: sha256:30ea205b7cb1253a36f82f1ec99f0eec87cadd95938317ee3c802f2b78cec368 size: 2424
kangwoo/kfp-component@sha256:30ea205b7cb1253a36f82f1ec99f0eec87cadd95938317ee3c802f2b78cec368

컴포넌트 정의 파일 작성

To create a component from your containerized program you need to write component specification in YAML format that describes the component for the Kubeflow Pipelines system.

컨테이너화 된 프로그램을 이용해서 컴포넌트로 만들려면, Kubeflow Pipelines 시스템에서 사용하는 YAML 형식의 컴포넌트 스펙을 작성해야합니다.

component.yaml파일을 생성하고, 컴포넌트의 구현(implementation) 섹션에서 사용할 컨테이너 이미지를 지정합니다. 그리고 명령(command) 섹션에서 컨테이에 포함된 프로그램을 실행하기 위해 명령을 지정합니다.

implementation:
  container:
    image: kangwoo/kfp-component:latest
    command: [
      python3, /pipelines/component/src/program.py,
      --input-path,  {inputPath:  input_1},
      --param,       {inputValue: parameter_1},
      --output-path, {outputPath: output_1},
    ]

command 섹션에는 앵글 괄호로 표현되는 플레이스홀더가 포함되어 있습니다. 플레이스홀더는 프로그램이 실행 되기 전에 특정 값 또는 경로로 대체됩니다. component.yaml 에서는 매핑 구문을 사용하여 플레이스홀더를 지정할 수 있습니다.

사용 가능한 플레이스홀더는 세 가지가 있습니다.

  • {inputValue: Some input name} : 이 플레이스홀더는 지정한 입력을 인수의 값으로 대체됩니다. 작은 데이터에 유용합니다.
  • {inputPath: Some input name} : 이 플레이스홀더는 입력 데이터를 컴포넌트로 전달하기 위해서 자동 생성된 로컬 파일의 경로로 대체됩니다. 즉, 파이프라인 시스템이 입력 인수 데이터를 파일로 쓰고, 해당 데이터 파일의 경로를 컴포넌트 프로그램에 전달하게 되는 것입니다.
  • {outputPath: Some output name}: 이 플레이스홀더는 프로그램이 출력 데이터를 저장해야 하는 자동 생성된 로컬 파일 경로로 대체됩니다.

명령행에 플레이스 홀더를 배치하는 것 외에도, 입력(inputs) 및 출력(outputs) 섹션에 해당 입력 및 출력 스펙을 추가해야합니다. 입력/출력 스펙에는 이름, 유형, 설명 및 기본값이 포함되어 있습니다. 이중에서 이름(name)은 반드시 포함되어야합니다. 입력/출력에 사용하는 이름은 자유로운 형식의 문자열이지만 YAML 문법을 따라야 합니다.

inputs:
- {name: input_1, type: String, description: 'Data for Input 1'}
- {name: parameter_1, type: Integer, default: '1', description: 'Parameter 1 description'}
outputs:
- {name: output_1, description: 'Output 1 data'}

컴포넌트의 이름과 설명 같은 메타 데이터를 추가합니다.

name: Multiply component
description: Multiplication.

component.yaml

name: Multiply component
description: Multiplication.
inputs:
- {name: input_1, type: String, description: 'Data for Input 1'}
- {name: parameter_1, type: Integer, default: '1', description: 'Parameter 1 description'}
outputs:
- {name: output_1, description: 'Output 1 data'}
implementation:
  container:
    image: kangwoo/kfp-component:latest
    command: [
      python3, /pipelines/component/src/program.py,
      --input-path,  {inputPath:  input_1},
      --param,       {inputValue: parameter_1},
      --output-path, {outputPath: output_1},
    ]

Kubeflow Pipelines SDK를 사용하여 파이프 라인에서 컴포넌트 사용하기

컴포넌트를 로드하고, 이를 사용하여 파이프 라인을 구성하는 방법을 알아보겠습니다. 예제 파이프라인은 3단계로 이루어져 있습니다.

1단계 number_op()는 1-10까지의 숫자를 출력합니다. 2단계인 multiply_op()는 출력된 숫자를 입력으로 받아서 곱하기 연산을 하고 그 결과를 출력합니다. 그리고 3단계인 print_op()는 출력된 곱하기 결과를 화면에 출력합니다.

2단계인 multiply_op()에서 직접 만든 재사용 컹포넌트를 사용하겠습니다.

1단계에 사용하는 number_op()는 1-10까지의 숫자를 출력합니다.

def number_op():
    return dsl.ContainerOp(
        name='Generate numbers',
        image='python:alpine3.6',
        command=['sh', '-c'],
        arguments=['python -c "print(\\'1\\\\n2\\\\n3\\\\n4\\\\n5\\\\n6\\\\n7\\\\n8\\\\n9\\\\n10\\')" | tee /tmp/output'],
        file_outputs={'output': '/tmp/output'}
    )

3단계에서 사용하는 print_op()는 입력된 결과를 화면에 출력합니다

def print_op(msg):
    return dsl.ContainerOp(
        name='Print',
        image='alpine:3.6',
        command=['echo', msg],
    )

load_component_from_file 호출하여 컴포넌트를 로드합니다. 만약 파일이 아니라 URL을 사용한다면 load_component_from_url을 대신 사용할 수 있습니다. 컴포넌트를 로드하기 위해서는 component.yaml 파일에 접근할 수 있으면 됩니다. 그리고 파이프 라인이 실행되는 쿠버네티스 클러스터는 컴포넌트에 정의된 컨테이너 이미지에 접근할 수 있어야합니다.

component_root = './multiply'
multiply_op = kfp.components.load_component_from_file(os.path.join(component_root, 'component.yaml'))
# multiply_op = kfp.components.load_component_from_url('https://....../component.yaml')

DSL을 사용하여 파이프 라인을 구성합니다. load_component_from_file을 호출하여 로드한 multiply_op 컴포넌트를 사용합니다.

KFP SDK 사용하여 코드에서 파이프 라인을 컴파일하고, 바로 실행하였습니다.

@dsl.pipeline(
    name='My multiply component pipeline',
    description='A pipeline with my component.'
)
def multiply_pipeline():
    numbers = number_op()
    multiply_task = multiply_op(
        input_1=numbers.output,
        parameter_1='6',
    )
    print_op(multiply_task.outputs['output_1'])

multiply_op()의 출력은 multiply_task.outputs [ ‘output_1’] 형식으로 사용할 수 있습니다.

다음은 파이프라인을 구성하고 실행하는 전체 코드입니다.

multiply_pipeline.py

import os
import kfp
from kfp import dsl



def number_op():
    return dsl.ContainerOp(
        name='Generate numbers',
        image='python:alpine3.6',
        command=['sh', '-c'],
        arguments=['python -c "print(\\'1\\\\n2\\\\n3\\\\n4\\\\n5\\\\n6\\\\n7\\\\n8\\\\n9\\\\n10\\')" | tee /tmp/output'],
        file_outputs={'output': '/tmp/output'}
    )


def print_op(msg):
    return dsl.ContainerOp(
        name='Print',
        image='alpine:3.6',
        command=['echo', msg],
    )


component_root = './multiply'
multiply_op = kfp.components.load_component_from_file(os.path.join(component_root, 'component.yaml'))
# multiply_op = kfp.components.load_component_from_url('https://....../component.yaml')

@dsl.pipeline(
    name='My multiply component pipeline',
    description='A pipeline with my component.'
)
def multiply_pipeline():
    numbers = number_op()
    multiply_task = multiply_op(
        input_1=numbers.output,
        parameter_1='6',
    )
    print_op(multiply_task.outputs['output_1'])


if __name__ == '__main__':
    kfp.compiler.Compiler().compile(multiply_pipeline, __file__ + '.zip')

    client = kfp.Client()
    my_experiment = client.create_experiment(name='Basic Experiment')
    my_run = client.run_pipeline(my_experiment.id, 'Multiply component pipeline', __file__ + '.zip')

파이프 라인 실행 결과 확인하기

KFP UI 화면에서 결과를 조회해 볼 수 있습니다. 왼쪽 메뉴에서 Experiments 클릭하면 Experiment 목록을 조회할 수 있습니다. 예제에서 사용한 Experiment 이름이 “Sample Experiment” 이므로, Sample Experiment를 클릭하면 실행 목록을 확인 할 수 있습니다.

“Run name”이 “Multiply component pipeline” 인 것을 클릭하면 실행 결과를 확인할 수 있습니다.

댓글 남기기

이메일은 공개되지 않습니다. 필수 입력창은 * 로 표시되어 있습니다