파이프라인 컴포넌트에서 PVC 사용하기
파이프라인 컴포넌트에서 모델을 학습하고 PV에 저장하려면, PVC를 사용하면 됩니다. PV이 마운트 되면, 내부 파일시스템처럼 접근하여 데이터를 저장하고 읽을 수 있습니다.
PVC 를 생성하기
PV를 사용하기 위하여, 쿠버네티스 리소스인 PVC
를 생성해 보겠습니다.
다음은 100Mi의 용량을 가지는 PersistentVolumeClaim
매니페스트입니다.
kfp-pvc.yaml
kind: PersistentVolumeClaim apiVersion: v1 metadata: name: kfp-pvc spec: accessModes: - ReadWriteOnce resources: requests: storage: 100Mi
kubectl
을 사용하여 kubeflow
네임스페이스 PVC 를 생성하겠습니다.
kubectl -n kubeflow apply -f kfp-pvc.yaml
텐서플로우 mnist 모델을 학습하고 S3에 저장하기
mnist 모델을 S3에 저장하겠습니다. 코드는 앞서 사용한 Tensorflow 코드와 동일하기 때문에 자세한 설명은 생략하겠습니다.
tensorflow_mnist.py
from __future__ import absolute_import, division, print_function, unicode_literals import argparse import os import tensorflow as tf def train(): print("TensorFlow version: ", tf.__version__) parser = argparse.ArgumentParser() parser.add_argument('--model_path', default='./model', type=str) args = parser.parse_args() version = 1 export_path = os.path.join(args.model_path, str(version)) (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data() x_train, x_test = x_train / 255.0, x_test / 255.0 model = tf.keras.models.Sequential([ tf.keras.layers.Flatten(input_shape=(28, 28)), tf.keras.layers.Dense(128, activation='relu'), tf.keras.layers.Dropout(0.2), tf.keras.layers.Dense(10, activation='softmax') ]) model.compile(optimizer=tf.keras.optimizers.SGD(learning_rate=0.01), loss='sparse_categorical_crossentropy', metrics=['accuracy']) print("Training...") training_history = model.fit(x_train, y_train, batch_size=64, epochs=10, validation_split=0.2) print('\\nEvaluate on test data') results = model.evaluate(x_test, y_test, batch_size=128) print('test loss, test acc:', results) model.save(export_path) print('"Saved model to {}'.format(export_path)) if __name__ == '__main__': train()
컨테이너 이미지를 만들기
컨테이너 이미지를 만들기 위한 Dockerfile 입니다. 텐서플로우를 기본 이미지로 사용합니다.
Dockerfile
FROM tensorflow/tensorflow:2.1.0-py3 RUN mkdir -p /app ADD tensorflow_mnist.py /app/
컨테이너 이미지를 빌드하겠습니다.
docker build -t kangwoo/kfp-mnist-storage:0.0.1 .
빌드한 컨테이너 이미지를 컨테이너 이미지 레지스트리에 업로드 합니다.
docker push kangwoo/kfp-mnist-storage:0.0.1
컴포넌트 작성
Kubeflow Pipelines DSL을 사용하여 컴포넌트를 작성합니다. 컴포넌트에서 사용하는 컨테이너 이미지를 정의합니다. 텐서플로우가 실행되는 컴포트넌트에서 PV에 접근할 수 있도록, PVC를 마운트 합니다.
pvc_name = "kfp-pvc" volume_name = 'pipeline' volume_mount_path = '/mnt/pipeline' dsl.ContainerOp( name='mnist_pvc', image='kangwoo/kfp-mnist-storage:0.0.1', arguments=['--model', '/mnt/pipeline/kfp/mnist/model'] ).add_volume(V1Volume(name=volume_name, persistent_volume_claim=V1PersistentVolumeClaimVolumeSource(claim_name=pvc_name))) \\ .add_volume_mount(V1VolumeMount(mount_path=volume_mount_path, name=volume_name))
파이프라인 작성
Kubeflow Pipelines DSL을 사용하여 파이프 라인 함수를 작성합니다. 파이프 라인을 정의하고 사용하는 컴포넌트들을 추가합니다. Kubeflow Pipelines SDK 를 사용하여 파이프라인을 빌드 한 후, 업로드하고 실행합니다.
import kfp from kfp import dsl from kubernetes.client.models import V1PersistentVolumeClaimVolumeSource, \\ V1Volume, V1VolumeMount def pipeline_pvc(): pvc_name = "kfp-pvc" volume_name = 'pipeline' volume_mount_path = '/mnt/pipeline' dsl.ContainerOp( name='mnist_pvc', image='kangwoo/kfp-mnist-storage:0.0.1', arguments=['--model', '/mnt/pipeline/kfp/mnist/model'] ).add_volume(V1Volume(name=volume_name, persistent_volume_claim=V1PersistentVolumeClaimVolumeSource(claim_name=pvc_name))) \\ .add_volume_mount(V1VolumeMount(mount_path=volume_mount_path, name=volume_name)) if __name__ == '__main__': my_run = kfp.Client().create_run_from_pipeline_func(pipeline_pvc, arguments={}, experiment_name='Sample Experiment')
파이프라인을 실행하면 학습된 모델이 지정한 위치에 저장됩니다.
onprem.mount_pvc() 사용하기
앞 예제에서는 PVC를 직접 마운트해 주었습니다. kfp
에서 제공하는 onprem.mount_pvc()
메소드를 사용하면 보다 간단히 사용할 수 있습니다.
다음은 onprem.mount_pvc()
를 사용하는 컴포넌트 예제입니다.
import kfp from kfp import dsl from kfp import onprem def pipeline_pmount_pvc(): pvc_name = "kfp-pvc" volume_name = 'pipeline' volume_mount_path = '/mnt/pipeline' dsl.ContainerOp( name='mnist_mount_pvc', image='kangwoo/kfp-mnist-storage:0.0.1', arguments=['--model', '/mnt/pipeline/kfp/mnist/model'] ).apply(onprem.mount_pvc(pvc_name, volume_name=volume_name, volume_mount_path=volume_mount_path)) if __name__ == '__main__': my_run = kfp.Client().create_run_from_pipeline_func(pipeline_pmount_pvc, arguments={}, experiment_name='Sample Experiment')
파이프라인 컴포넌트에서 S3 사용하기
파이프라인 컴포넌트에서 모델을 학습하고 S3에 저장하려면, S3에 접속할 수 있는 정보가 필요합니다. 환경 변수를 이용하여 접속 정보를 컴포넌트에 전달할 수 있습니다.
S3 Secret 을 생성하기
S3에 접속하여 데이터를 저장하거나 가져오려면 접속 정보가 필요합니다. 중요한 정보인 AWS_ACCESS_KEY_ID
와 AWS_SECRET_ACCESS_KEY
는 쿠버네티스트 Secret
리소스에 저장하겠습니다.
다음은 Kubeflow에서 제공하는 minio를 s3 저장소로 사용하는 Secret
매니페스트 입니다. data
섹션에 있는 AWS_ACCESS_KEY_ID
와 AWS_SECRET_ACCESS_KEY
필드의 값은 BASE64 로 인코딩 된 값을 사용해야합니다.
export AWS_ACCESS_KEY_ID=minio export AWS_SECRET_ACCESS_KEY=minio123 kubectl -n kubeflow create secret generic kfp-aws-secret \\ --from-literal=AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID} \\ --from-literal=AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
생성한 kfp-aws-secret
을 조회해 보겠습니다.
kubectl -n kubeflow get secret kfp-aws-secret
정상적으로 조회되면, 다음과 같은 결과를 확인할 수 있습니다.
apiVersion: v1 kind: Secret metadata: name: kfp-aws-secret type: Opaque data: AWS_ACCESS_KEY_ID: bWluaW8= AWS_SECRET_ACCESS_KEY: bWluaW8xMjM=
텐서플로우 mnist 모델을 학습하고 S3에 저장하기
mnist 모델을 S3에 저장하겠습니다. 코드와 컨테이너 이미지는 앞서 사용한 PVC 저장 코드와 동일하기 때문에, “파이프라인 작성” 단계로 건너 뛰어도 됩니다.
tensorflow_mnist.py
from __future__ import absolute_import, division, print_function, unicode_literals import argparse import os import tensorflow as tf def train(): print("TensorFlow version: ", tf.__version__) parser = argparse.ArgumentParser() parser.add_argument('--model_path', default='./model', type=str) args = parser.parse_args() version = 1 export_path = os.path.join(args.model_path, str(version)) (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data() x_train, x_test = x_train / 255.0, x_test / 255.0 model = tf.keras.models.Sequential([ tf.keras.layers.Flatten(input_shape=(28, 28)), tf.keras.layers.Dense(128, activation='relu'), tf.keras.layers.Dropout(0.2), tf.keras.layers.Dense(10, activation='softmax') ]) model.compile(optimizer=tf.keras.optimizers.SGD(learning_rate=0.01), loss='sparse_categorical_crossentropy', metrics=['accuracy']) print("Training...") training_history = model.fit(x_train, y_train, batch_size=64, epochs=10, validation_split=0.2) print('\\nEvaluate on test data') results = model.evaluate(x_test, y_test, batch_size=128) print('test loss, test acc:', results) model.save(export_path) print('"Saved model to {}'.format(export_path)) if __name__ == '__main__': train()
컨테이너 이미지를 만들기
컨테이너 이미지를 만들기 위한 Dockerfile 입니다. 텐서플로우를 기본 이미지로 사용합니다.
Dockerfile
FROM tensorflow/tensorflow:2.1.0-py3 RUN mkdir -p /app ADD tensorflow_mnist.py /app/
컨테이너 이미지를 빌드하겠습니다.
docker build -t kangwoo/kfp-mnist-storage:0.0.1 .
빌드한 컨테이너 이미지를 컨테이너 이미지 레지스트리에 업로드 합니다.
docker push kangwoo/kfp-mnist-storage:0.0.1
컴포넌트 작성
Kubeflow Pipelines DSL을 사용하여 컴포넌트를 작성합니다. 컴포넌트에서 사용하는 컨테이너 이미지를 정의합니다. 텐서플로우가 실행되는 컴포트넌트에서 s3에 접근할 수 있도록, s3 설정 정보를 환경 변수로 넘겨 주었습니다.
secret_name = "kfp-aws-secret" s3_endpoint = 'minio-service.kubeflow.svc.cluster.local:9000' minio_endpoint = "http://" + s3_endpoint minio_region = "us-east-1" dsl.ContainerOp( name='tensorboard', image='kangwoo/kfp-mnist-s3:0.0.1', arguments=['--model', 's3://tensorflow/kfp/mnist/model'] ).add_env_variable(V1EnvVar(name='S3_ENDPOINT', value=s3_endpoint)) \\ .add_env_variable(V1EnvVar(name='AWS_ENDPOINT_URL', value=minio_endpoint)) \\ .add_env_variable(V1EnvVar(name='AWS_ACCESS_KEY_ID', value_from=V1EnvVarSource( secret_key_ref=V1SecretKeySelector(name=secret_name, key='AWS_ACCESS_KEY_ID')))) \\ .add_env_variable(V1EnvVar(name='AWS_SECRET_ACCESS_KEY', value_from=V1EnvVarSource(secret_key_ref=V1SecretKeySelector(name=secret_name, key='AWS_SECRET_ACCESS_KEY')))) \\ .add_env_variable(V1EnvVar(name='AWS_REGION', value=minio_region)) \\ .add_env_variable(V1EnvVar(name='S3_USE_HTTPS', value='0')) \\ .add_env_variable(V1EnvVar(name='S3_VERIFY_SSL', value='0'))
파이프라인 작성
Kubeflow Pipelines DSL을 사용하여 파이프 라인 함수를 작성합니다. 파이프 라인을 정의하고 사용하는 컴포넌트들을 추가합니다. Kubeflow Pipelines SDK 를 사용하여 파이프라인을 빌드 한 후, 업로드하고 실행합니다.
import kfp from kfp import dsl from kubernetes.client.models import V1EnvVar, V1EnvVarSource, V1SecretKeySelector def pipeline_s3(): secret_name = "kfp-aws-secret" s3_endpoint = 'minio-service.kubeflow.svc.cluster.local:9000' minio_endpoint = "http://" + s3_endpoint minio_region = "us-east-1" dsl.ContainerOp( name='mnist-s3', image='kangwoo/kfp-mnist-storage:0.0.1', arguments=['--model', 's3://tensorflow/kfp/mnist/model'] ).add_env_variable(V1EnvVar(name='S3_ENDPOINT', value=s3_endpoint)) \\ .add_env_variable(V1EnvVar(name='AWS_ENDPOINT_URL', value=minio_endpoint)) \\ .add_env_variable(V1EnvVar(name='AWS_ACCESS_KEY_ID', value_from=V1EnvVarSource( secret_key_ref=V1SecretKeySelector(name=secret_name, key='AWS_ACCESS_KEY_ID')))) \\ .add_env_variable(V1EnvVar(name='AWS_SECRET_ACCESS_KEY', value_from=V1EnvVarSource(secret_key_ref=V1SecretKeySelector(name=secret_name, key='AWS_SECRET_ACCESS_KEY')))) \\ .add_env_variable(V1EnvVar(name='AWS_REGION', value=minio_region)) \\ .add_env_variable(V1EnvVar(name='S3_USE_HTTPS', value='0')) \\ .add_env_variable(V1EnvVar(name='S3_VERIFY_SSL', value='0')) if __name__ == '__main__': my_run = kfp.Client().create_run_from_pipeline_func(pipeline_s3, arguments={}, experiment_name='Sample Experiment')
파이프라인을 실행하면 학습된 모델이 지정한 위치에 저장됩니다.
aws.use_aws_secret() 사용하기
앞 예제에서는 환경 변수에 직접 값들을 설정해 주었습니다. kfp
에서 제공하는 aws.use_aws_secret()
메소드를 사용하면 보다 간단히 인증 정보를 설정할 수 있습니다. 하지만 현재 버전에서는 AWS_ACCESS_KEY_ID
와 AWS_SECRET_ACCESS_KEY
값 만을 변경할 수 있기 때문에 aws 에서 제공하는 s3만 사용할 수 있습니다. 다시 말해서 minio는 사용할 수 없습니다.
다음은 aws.use_aws_secret()
를 사용하는 컴포넌트 예제입니다.
import kfp from kfp import aws from kfp import dsl def pipeline_use_aws_secret(): secret_name = "kfp-aws-secret" dsl.ContainerOp( name='mnist_use_aws_secret', image='kangwoo/kfp-mnist-storage:0.0.1', arguments=['--model', 's3://tensorflow/kfp/mnist/model'] ).apply(aws.use_aws_secret(secret_name, aws_access_key_id_name='AWS_ACCESS_KEY_ID', aws_secret_access_key_name='AWS_SECRET_ACCESS_KEY')) if __name__ == '__main__': my_run = kfp.Client().create_run_from_pipeline_func(pipeline_use_aws_secret, arguments={}, experiment_name='Sample Experiment')
파이프라인 컴포넌트에서 GCS 사용하기
파이프라인 컴포넌트에서 모델을 학습하고 GCS에 저장하려면, GCS에 접속할 수 있는 정보가 필요합니다. 환경 변수를 이용하여 접속 정보를 컴포넌트에 전달할 수 있습니다.
GCS Secret 을 생성하고 서비스 계정에 추가하기
컴포넌트에서 GCS에 접속하여 데이터를 저장하거나 가져오려면 접속 정보가 필요합니다.
구글 클라우드의 JSON 서비스 계정 키를 다운로드 합니다. 이 서비스 계정은 사용할 GCS에 접근할 권한이 부여되어 있어야합니다.
다음은 서비스 계정 키를 gcp-sa-credentials.json
파일로 다운로드 하는 명령어 입니다. [SA-NAME] 에는 서비스 계정의 아이디를, [PROJECT-ID] 에는 프로젝트 아이디를 입력하면 됩니다.
gcloud iam service-accounts keys create gcp-sa-credentials.json \\ --iam-account [SA-NAME]@[PROJECT-ID].iam.gserviceaccount.com
다운로드 받은 구글 클라우드의 서비스 계정 키를, 쿠버네티스 Secret
에 등록합니다.
다음은 kubeflow 네임스페이스 Secret
을 생성하는 명령어 입니다.
kubectl -n kubeflow create secret generic kfp-gcp-sa \\ --from-file=gcloud-application-credentials.json=gcp-sa-credentials.json
텐서플로우 mnist 모델을 학습하고 GCS에 저장하기
mnist 모델을 GCS에 저장하겠습니다. 코드와 컨테이너 이미지는 앞서 사용한 S3 저장 코드와 동일하기 때문에, “파이프라인 작성” 단계로 건너 뛰어도 됩니다.
tensorflow_mnist.py
from __future__ import absolute_import, division, print_function, unicode_literals import argparse import os import tensorflow as tf def train(): print("TensorFlow version: ", tf.__version__) parser = argparse.ArgumentParser() parser.add_argument('--model_path', default='./model', type=str) args = parser.parse_args() version = 1 export_path = os.path.join(args.model_path, str(version)) (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data() x_train, x_test = x_train / 255.0, x_test / 255.0 model = tf.keras.models.Sequential([ tf.keras.layers.Flatten(input_shape=(28, 28)), tf.keras.layers.Dense(128, activation='relu'), tf.keras.layers.Dropout(0.2), tf.keras.layers.Dense(10, activation='softmax') ]) model.compile(optimizer=tf.keras.optimizers.SGD(learning_rate=0.01), loss='sparse_categorical_crossentropy', metrics=['accuracy']) print("Training...") training_history = model.fit(x_train, y_train, batch_size=64, epochs=10, validation_split=0.2) print('\\nEvaluate on test data') results = model.evaluate(x_test, y_test, batch_size=128) print('test loss, test acc:', results) model.save(export_path) print('"Saved model to {}'.format(export_path)) if __name__ == '__main__': train()
컨테이너 이미지를 만들기
컨테이너 이미지를 만들기 위한 Dockerfile 입니다. 텐서플로우를 기본 이미지로 사용합니다.
Dockerfile
FROM tensorflow/tensorflow:2.1.0-py3 RUN mkdir -p /app ADD tensorflow_mnist.py /app/
컨테이너 이미지를 빌드하겠습니다.
docker build -t kangwoo/kfp-mnist-storage:0.0.1 .
빌드한 컨테이너 이미지를 컨테이너 이미지 레지스트리에 업로드 합니다.
docker push kangwoo/kfp-mnist-storage:0.0.1
컴포넌트 작성
Kubeflow Pipelines DSL을 사용하여 컴포넌트를 작성합니다. 컴포넌트에서 사용하는 컨테이너 이미지를 정의합니다. 텐서플로우가 실행되는 컴포트넌트에서 gcs에 접근할 수 있도록, gcs 설정 정보를 환경 변수로 넘겨 주었습니다. kfp-gcp-sa
를 볼륨 마운트 한 다음, GOOGLE_APPLICATION_CREDENTIALS
라는 환경 변수에 인증 정보가 들어 있는 json 파일의 위치를 설정해 줍니다.
GCSCredentialFileName = "gcloud-application-credentials.json" GCSCredentialVolumeName = "user-gcp-sa" GCSCredentialVolumeMountPath = "/var/secrets/" GCSCredentialEnvKey = "GOOGLE_APPLICATION_CREDENTIALS" GCSCredentialFilePath = os.path.join(GCSCredentialVolumeMountPath, GCSCredentialFileName) secret_name = 'kfp-gcp-sa' dsl.ContainerOp( name='mnist-gcs', image='kangwoo/kfp-mnist-storage:0.0.1', arguments=['--model', 'gcs://kfp-bucket/kfp/mnist/model'] ).add_volume(V1Volume(name=GCSCredentialVolumeName, secret=V1SecretVolumeSource(secret_name=secret_name))) \\ .add_volume_mount(V1VolumeMount(name=GCSCredentialVolumeName, mount_path=GCSCredentialVolumeMountPath)) \\ .add_env_variable(V1EnvVar(name=GCSCredentialEnvKey, value=GCSCredentialFilePath))
파이프라인 작성
Kubeflow Pipelines DSL을 사용하여 파이프 라인 함수를 작성합니다. 파이프 라인을 정의하고 사용하는 컴포넌트들을 추가합니다. Kubeflow Pipelines SDK 를 사용하여 파이프라인을 빌드 한 후, 업로드하고 실행합니다.
import os import kfp from kfp import dsl from kubernetes.client.models import V1EnvVar, V1VolumeMount, V1Volume, \\ V1SecretVolumeSource def pipeline_gcs(): GCSCredentialFileName = "gcloud-application-credentials.json" GCSCredentialVolumeName = "user-gcp-sa" GCSCredentialVolumeMountPath = "/var/secrets/" GCSCredentialEnvKey = "GOOGLE_APPLICATION_CREDENTIALS" GCSCredentialFilePath = os.path.join(GCSCredentialVolumeMountPath, GCSCredentialFileName) secret_name = 'kfp-gcp-sa' dsl.ContainerOp( name='mnist-gcs', image='kangwoo/kfp-mnist-storage:0.0.1', arguments=['--model', 'gs://kfp-bucket/kfp/mnist/model'] ).add_volume(V1Volume(name=GCSCredentialVolumeName, secret=V1SecretVolumeSource(secret_name=secret_name))) \\ .add_volume_mount(V1VolumeMount(name=GCSCredentialVolumeName, mount_path=GCSCredentialVolumeMountPath)) \\ .add_env_variable(V1EnvVar(name=GCSCredentialEnvKey, value=GCSCredentialFilePath)) if __name__ == '__main__': my_run = kfp.Client().create_run_from_pipeline_func(pipeline_gcs, arguments={}, experiment_name='Sample Experiment')
gcp.use_gcp_secret() 사용하기
앞 예제에서는 환경 변수에 직접 값들을 설정해 주었습니다. kfp
에서 제공하는 gcp.use_gcp_secret()
메소드를 사용하면 보다 간단히 인증 정보를 설정할 수 있습니다.
다음은 gcp.use_gcp_secret()
를 사용하는 컴포넌트 예제입니다.
import kfp from kfp import dsl from kfp import gcp def pipeline_use_gcp_secret(): secret_name = 'kfp-gcp-sa' secret_file_path_in_volume = '/gcloud-application-credentials.json' dsl.ContainerOp( name='mnist_use_gcp_secret', image='kangwoo/kfp-mnist-storage:0.0.1', arguments=['--model', 'gs://kfp-bucket/kfp/mnist/model'] ).apply(gcp.use_gcp_secret(secret_name, secret_file_path_in_volume=secret_file_path_in_volume)) if __name__ == '__main__': my_run = kfp.Client().create_run_from_pipeline_func(pipeline_use_gcp_secret, arguments={}, experiment_name='Sample Experiment')
한꺼번에 외부 저장소 추가하기
파이프라인의 단계가 많아지면 일일히 외부 저장소를 추가해 주는 작업이 번거로울 수 있습니다. 그럴 경우에는 다음 예제처럼 반복문을 이용해서 저장소를 추가해 줄 수 있습니다.
ingest = dsl.ContainerOp(...) transformation = dsl.ContainerOp(...) train = dsl.ContainerOp(...) steps = [ingest, transformation, train] for step in steps: step.apply(oprem.mount_pvc(pvc_name, 'local-storage', '/mnt'))