머신 러닝 파이프라인을 만들기 위하여, 캐글의 집 값 데이터를 사용해 보도록 하겠습니다.
집 값 예측은, 미국 아이오와 주의 에임스에 있는 주거용 주택의 정보를 이용하여 주택의 판매가격을 예측하는 Competition입니다.
이 장에서는 집 값 예측 결과를 캐글에 제출하기 위한 파이프라인을 구성해 볼 것입니다. 그리고 파이프라인에서 Katib를 이용하여 모델의 하이퍼 파라미터 튜닝을 할 것입니다. 데이터를 저장하기 위해서, PVC와 S3 두 개 모두 사용하겠습니다.
데이터 분석 보다는 전체적인 파이프라인을 만드는 것에 중점을 두고 설명하겠습니다.
사전 준비
House Prices: Advanced Regression Techniques 접속
예제에서 사용할 데이터가 있는 집 값예측 Competition 의 주소는 다음과 같습니다.
<https://www.kaggle.com/c/house-prices-advanced-regression-techniques>
해당 주소로 직접 접속하셔도 되고, 상단에 있는 검색바를 이용하여 검색하셔도 됩니다.
다음은 검색바를 이용하여, “House Prices: Advanced Regression Techniques” 을 검색한 결과입니다.

“House Prices: Advanced Regression Techniques” Competition 페이지 접속하면 다음과 같은 화면을 볼 수 있습니다.

데이터는 “Data” 탭에서 받을 수 있습니다. “Data” 탭을 클릭하면, 데이터에 대한 자세한 설명과 다운로드 받는 방법을 볼 수 있습니다.

주피터 노트북
주피터에서 새로운 Terminal
을 엽니다.
주피터 노트북에서 타이타닉 데이터 다운로드하기
작업 디렉토리를 생성한 다음, 타이타닉 데이터를 다운로드 합니다. house-prices-advanced-regression-techniques.zip
라는 파일이 다운로드 됩니다.
mkdir -p ~/workspace/house-prices/kaggle cd ~/workspace/house-prices/kaggle kaggle competitions download -c house-prices-advanced-regression-techniques
Downloading house-prices-advanced-regression-techniques.zip to /home/jovyan/workspace/house-prices/kaggle 0%| | 0.00/34.1k [00:00<?, ?B/s] 100%|██████████████████████████████████████████████████████████████████████████████████| 34.1k/34.1k [00:00<00:00, 842kB/s]
다운로드한 파일의 압축을 풀겠습니다. 모델 학습을 위한 train.csv
파일과, 예측에 사용할 데이터인 test.csv
그리고, 캐글에 제출할 파일의 형식을 보여주는 sample_submission.csv
파일이 생성됩니다.
unzip house-prices-advanced-regression-techniques.zip
Archive: house-prices-advanced-regression-techniques.zip inflating: data_description.txt inflating: sample_submission.csv inflating: test.csv inflating: train.csv
파이썬 패키지 설치
머신 러닝 모델 코드를 작성하기 위한 패키지들을 설치합니다. 이 장에서는 xgboost
패키지와 pandas_profiling
패키지를 추가로 사용합니다. 만약 설치되어 있지 않다면, 다음 명령어를 실행하여 패키지를 설치합니다.
pip install xgboost pandas_profiling --user
데이터 전처리와 모델 작성
문제 정의하기
주어진 데이터를 바탕으로, 집 값을 예측하는 문제입니다.
데이터 전처리
머신 러닝 모델에서 데이터를 사용할 수 있도록, 데이터를 전처리 해 보겠습니다. 먼저 캐글에서 제공한 데이터를 살펴보도록 하겠습니다. 이번에는 pandas_profiling
라는 패키지를 사용해 보겠습니다.
집 값 데이터 분석 및 검증
먼저 집 값 데이터에 대해 이해하기 위하여, 간단히 탐색해보도록 하겠습니다.
주피터 노트북을 생성합니다. 노트북의 위치는 ~/workspace/house-prices
입니다. 앞으로 작성할 예제에서는 노트북이 ~/workspace/house-prices
에 위치하고 있다고 가정하여, 파일 경로를 설정할 것입니다.
주피터 노트북을 이용하여 데이터를 분석 및 검증해 보겠습니다.
판다스를 이용하여, 다운받은 캐글 데이터를 읽어 오겠습니다.
In[] :
import os import pandas as pd input_path='./kaggle' train_data = pd.read_csv(os.path.join(input_path, 'train.csv')) test_data = pd.read_csv(os.path.join(input_path, 'test.csv'))
pandas_profiling
을 이용하여, 데이터의 프로파일링 리포트를 출력하겠습니다.
In[] :
import pandas_profiling train_data.profile_report()
프로파일링 리포트의 결과는 다음과 같습니다. 단 한 줄의 명령어로 데이터를 분석해 볼 수 있습니다.

리포트 결과를 보면 알 수 있듯이, 여러 타입의 데이터가 존재하고, 결측값도 존재하는 것을 확인할 수 있습니다.
데이터 처리
캐글에서 좋은 점수를 받는게 목적이 아니기 때문에, 과감히 ojbect
타입의 데이터를 삭제하겠습니다.
In []:
train_data = train_data.drop([], axis=1).select_dtypes(exclude=['object']) train_X = train_data.drop(['SalePrice'], axis=1) train_y = train_data['SalePrice'] test_data = test_data.drop([], axis=1).select_dtypes(exclude=['object'])
결측값들도 SimpleImputer
를 이용하여 간단히 처리하겠습니다. strategy='median'
을 지정하였기 때문에 결측값들은 중간값으로 대치됩니다.
In []:
imputer = SimpleImputer(strategy='median') imputer.fit(train_X) train_X = pd.DataFrame(imputer.transform(train_X), columns=train_X.columns, index=train_X.index) train_data = pd.concat([train_X, train_y], axis=1) test_data = pd.DataFrame(imputer.transform(test_data), columns=test_data.columns, index=test_data.index) train_data = train_data.astype({'Id': int}) test_data = test_data.astype({'Id': int})
모델 학습 (Train)
준비한 데이터를 이용하여 모델을 학습해 보겠습니다.
먼저, 모델 학습을 위하여 데이터를 학습 세트와 테스트 세트로 나누겠습니다. sklearn
의 train_test_split()
를 사용하면, 간단히 나눌 수 있습니다.
In []:
from sklearn.model_selection import train_test_split X = train_data.drop(['Id', 'SalePrice'], axis=1) y = train_data['SalePrice'] X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
xgboost
의 XGBRegressor
을 사용하여, 모델을 학습 시켜 보겠습니다.
In []:
from xgboost import XGBRegressor model = XGBRegressor() model.fit(X_train, y_train, eval_set=[(X_test, y_test)])
정상적으로 실행되면, 다음과 같은 결과를 확인할 수 있습니다.
[0] validation_0-rmse:180608 [1] validation_0-rmse:164048 ... [98] validation_0-rmse:25150.7 [99] validation_0-rmse:25156.1
이제 캐글에서 제공한 test
데이터를 가지고 예측해 보도록 하겠습니다.
pred = model.predict(test_data.drop(['Id'], axis=1))
예측한 결과물을 가지고, 캐글에 제공할 submission.csv
파일을 생성해 보겠습니다.
submission = pd.DataFrame({'Id': test_data['Id'], 'SalePrice': pred}) submission.to_csv('submission.csv', index=False)
submission
을 조회해 보겠습니다.
submission.head()
정상적으로 실행되면, 다음과 같은 결과를 확인할 수 있습니다.

생성한 submission.csv
파일을 캐글에 제출하겠습니다. kaggle
명령어를 사용하여 제출합니다. kaggle
명령어가 PATH
에 포함되어 있지 않기 때문에 전체 경로를 적어 주었습니다.
!/home/jovyan/.local/bin/kaggle competitions submit -c house-prices-advanced-regression-techniques -f submission.csv -m "Message"
캐글의 “My Submissions” 탭을 클릭하면, 제출한 내용들을 확인할 수 있습니다.

파이프라인 만들기
앞서 작성한 코드들을 바탕으로 하여 파이프라인을 구성해 보겠습니다. 각각의 단계를 컴포넌트로 구성한 다음, 파이프라인을 작성하고 실행해 보겠습니다.
파이프라인의 단계는 다음과 같습니다.
- 데이터 다운로드 : 캐글에서 데이터를 다운로드 합니다.
- 데이터 압축풀기 : 캐글에서 다운로드한 데이터의 압축을 풉니다.
- 데이터 변환 :
train.csv
과test.csv
의 데이터를 머신러닝의 학습에 사용할 수 있도록 변환합니다. - 모델 학습 : 변환된
train.csv
데이터를 이용하여 모델을 학습니다. - 예측 : 변환된
test.csv
데이터를 이용하여 예측합니다. 그리고 예측한 결과를submission.csv
파일로 저장합니다. - 캐글 제출 : 생성된
submission.csv
파일을 캐글에 제출합니다.
기본 이미지 만들기
데이터 변환, 모덱 학습, 예측 컴포넌트에서 사용할 기본 이미지를 만들어 보겠습니다. 파이프라인에서도 필요한 이미지를 빌드할 수 있지만, 기본 이미지를 만들어서 사용하는게 더 효율적입니다. docker
명령어를 이용하여, 컨테이너 이미지를 빌드하기 때문에, docker
명령어가 실행 가능한 곳에서 작업을 해야합니다.
먼저 베이스 이미지 디렉토리를 만듭니다.
mkdir -p ~/workspace/base/xgboost cd ~/workspace/base/xgboost
필요한 파이썬 패키지 목록을 requirements.txt
파일로 작성합니다.
requirements.txt
scikit-learn joblib numpy pandas fire xgboost
컨테이너 이미지 빌드를 위하여 Dockerfile
파일을 작성합니다. 파이썬 패키지 목록인 requirements.txt
파일을 추가하고, 해당 파일을 이용하여 패키지를 설치합니다.
Dockerfile
FROM python:3.6-slim RUN apt-get update -y && \\ apt-get install -y libgomp1 WORKDIR /app COPY requirements.txt /app RUN pip --no-cache-dir install -r requirements.txt
컨테이너 이미지를 빌드하고, 컨테이너 레지스트리에 푸시하기 위하여 build_image.sh
파일을 작성합니다.
build_image.sh
#!/bin/bash -e image_name=kangwoo/xgboost image_tag=0.0.1 full_image_name=${image_name}:${image_tag} base_image_tag=3.6-slim cd "$(dirname "$0")" docker build --build-arg BASE_IMAGE_TAG=$base_image_tag -t "$full_image_name" . docker push "$full_image_name"
build_image.sh
파일을 실행하여, 컨테이너 이미지를 빌드하고, 컨테이너 레지스트리에 푸시하겠습니다.
chmod +x build_image.sh ./build_image.sh
데이터 전처리 파이프라인 작성하기
데이터 전처리 파이프라인에서는 세개의 컴포넌트를 사용합니다.
- Download : 캐글에서 데이터를 다운로드 받습니다.
- Unzip : 압축을 풉니다.
- Transofrm : 데이터를 전처리한 후 S3에 저장합니다.
모든 데이터를 S3에 저장할 수 있으나, 앞서 만든 Download
컴포넌트와 Unzip
컴포넌트가 S3를 지원하기 않기 때문에, 퍼시스턴스 볼륨과 S3를 같이 사용하였습니다. AWS 접속하여 사용할 S3 버킷을 생성합니다.
파이프라인의 전체적인 흐름은 다음과 같습니다.

파이프라인의 데이터 변환 작업이 kubeflow
네임스페이스 실행되므로, 해당 작업이 S3에 접근할 수 있도록, kubeflow
네임스페이스에 aws-secret
을 생성해줘야합니다.
export AWS_ACCESS_KEY_ID=<YOUR_AWS_ACCESS_KEY_ID> export AWS_SECRET_ACCESS_KEY=<YOUR_AWS_SECRET_ACCESS_KEY> kubectl -n kubeflow create secret generic aws-secret \\ --from-literal=AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID} \\ --from-literal=AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
데이터 전처리 파이프라인 컴포넌트 작성하기
주피터 노트북을 생성합니다. 파일 이름은 transform_pipeline_s3.ipynb
입니다. 노트북의 위치는 ~/workspace/house-prices
입니다. 앞으로 작성할 예제에서는 노트북이 ~/workspace/house-prices
에 위치하고 있다고 가정하여, 파일 경로를 설정할 것입니다.
패키지 추가
파이프라인과 컴포넌트를 사용하기 위한 패키지를 추가합니다.
In []:
from kfp import components from kfp import dsl
캐글 데이터 다운로드 컴포넌트
캐글에서 데이터를 다운로드 하는 컴포넌트를 정의합니다. 앞서 “타이타닉 생존 예측”에서 만들 컴포넌트를 재사용 합니다.
In []:
download_op = components.load_component_from_file('../components/kaggle/competitions_downloader/component.yaml')
데이터 압축풀기 컴포넌트
unzip
명령어를 사용하여 압축을 풀겠습니다.
In []:
def unzip_op(filename, exdir): return dsl.ContainerOp(name='Unzip', image='garthk/unzip:latest', command=['unzip', '-o', filename, '-d', exdir])
데이터 변환 컴포넌트
이번에는 [transform.py](<http://transform.py>)
라는 파이썬 파일을 생성한 후, 페어링을 이용하여 컨테이너 이미지를 빌드한 다음, 파이프라인에서 사용하도록 하겠습니다.
train.csv
과 test.csv
의 데이터를 머신러닝의 학습에 사용할 수 있도록 변환합니다. 캐글 데이터의 경로를 input_path
파라미터로 입력 받아 데이터를 변환합니다. 데이터 변환에 쓰이는 코드들은 앞에서 작성한 코드와 동일합니다. 변환할 데이터를 저장할 경로를 output_path
파라미터로 입력 받아 변환된 데이터를 저장합니다. 컨테이너 이미지를 빠르게 빌드하기 위하여, 필요한 패키지가 포함된 기본 이미지를 미리 만들어서 사용하였습니다.
주피터 노트북 셀에서 %%writefile transform.py
을 이용하여transform.py
파일을 생성합니다.
In []:
%%writefile transform.py import argparse import os from tempfile import TemporaryDirectory import pandas as pd from sklearn.impute import SimpleImputer def transform(input_path, output_path): train_data = pd.read_csv(os.path.join(input_path, 'train.csv')) test_data = pd.read_csv(os.path.join(input_path, 'test.csv')) train_data.dropna(axis=0, subset=['SalePrice'], inplace=True) train_data = train_data.drop([], axis=1).select_dtypes(exclude=['object']) train_X = train_data.drop(['SalePrice'], axis=1) train_y = train_data['SalePrice'] test_data = test_data.drop([], axis=1).select_dtypes(exclude=['object']) imputer = SimpleImputer(strategy='median') imputer.fit(train_X) train_X = pd.DataFrame(imputer.transform(train_X), columns=train_X.columns, index=train_X.index) train_data = pd.concat([train_X, train_y], axis=1) test_data = pd.DataFrame(imputer.transform(test_data), columns=test_data.columns, index=test_data.index) train_data = train_data.astype({'Id': int}) test_data = test_data.astype({'Id': int}) access_key = os.environ['AWS_ACCESS_KEY_ID'] secret_key = os.environ['AWS_SECRET_ACCESS_KEY'] from minio import Minio minio_client = Minio('s3.amazonaws.com', access_key=access_key, secret_key=secret_key) from urllib.parse import urlparse url = urlparse(output_path, allow_fragments=False) bucket_name = url.netloc object_name = url.path.lstrip('/') with TemporaryDirectory() as tmpdir: tmp_train_data = os.path.join(tmpdir, 'train.csv') tmp_test_data = os.path.join(tmpdir, 'test.csv') train_data.to_csv(tmp_train_data, index=False) test_data.to_csv(tmp_test_data, index=False) minio_client.fput_object(bucket_name, os.path.join(object_name, 'train.csv'), tmp_train_data) minio_client.fput_object(bucket_name, os.path.join(object_name, 'test.csv'), tmp_test_data) if __name__ == '__main__': parser = argparse.ArgumentParser() parser.add_argument('--input_path', default='./kaggle', type=str) parser.add_argument('--output_path', default='./input', type=str) args = parser.parse_args() transform(args.input_path, args.output_path)
변환이 완료된 데이터를 S3에 저장하기 위하여, minio
라이브러리를 사용하였습니다. 그리고 S3에 접속하기 위한 자격 증명 정보는 AWS_ACCESS_KEY_ID
와 AWS_SECRET_ACCESS_KEY
라는 환경 변수를 이용하여 넘겨받습니다.
페어링을 이용하여, transform.py
파일이 포함된 컨테이너 이미지를 빌드하고 푸시하겠습니다. 주피터 노트북에서 페어링을 이용하여, 컨테이너 이미지를 푸시하려면, 별도의 설정이 되어 있야 합니다. 만약 설정이 되어 있지 않다면, “05-Kubeflow Fairing – 주피터 노트북에서 Kubeflow Fairing 설정하기“를 참고하셔서 설정하시기 바랍니다.
In []:
import uuid from kubeflow import fairing from kubeflow.fairing.kubernetes import utils as k8s_utils from kubeflow.fairing.preprocessors import base from kubeflow.fairing.builders.append import append CONTAINER_REGISTRY = 'kangwoo' preprocessor = base.BasePreProcessor(executable="transform.py") builder = append.AppendBuilder(registry=CONTAINER_REGISTRY, image_name="house-prices-transform", base_image="kangwoo/xgboost:0.82", preprocessor=preprocessor) builder.build() transform_image = builder.image_tag print(transform_image)
데이터 전처리 파이프라인 생성하기
PVC와 캐글 토큰 확인하기
파이프라인 컴포넌트에서 이 PVC를 이용하여, 데이터를 저장하고 읽어 올 것입니다. PVC는 dsl.VolumeOp()
를 이용하여 생성한 후, 작업이 끝나면 삭제하도록 하겠습니다.
그리고 캐글 API 사용을 위한 토큰도 앞서 “타이타닉 생존 예측”에서 생성한 kaggle-secret
을 사용하겠습니다.
파이프라인 작성하기
주피터 노트북으로 돌아가서 파이프라인을 작성해 보겠습니다.
In []:
import os import string import kfp import kfp.dsl as dsl from kfp import components from kfp import onprem from kfp import aws from kubernetes.client.models import V1Volume from kubernetes.client.models import V1SecretVolumeSource from kubernetes.client.models import V1VolumeMount @dsl.pipeline( name='House Prices Transofrm Pipeline', description='House Prices Transofrm Pipeline' ) def house_prices_transform_pipeline(): secret_name = "aws-secret" competition_name = 'house-prices-advanced-regression-techniques' kaggle_data_path = os.path.join('/data/competitions', competition_name, 'kaggle') version = 'v0.0.1' ouput_data_path = os.path.join('s3://kfp-bucket/data/competitions', competition_name, 'input', version) volume_task = dsl.VolumeOp( name="house-prices-volume", resource_name="house-prices-pvc", modes=dsl.VOLUME_MODE_RWO, size="100Mi" ) download_task = download_op(competition=competition_name, path=kaggle_data_path)\\ .add_pvolumes({"/data": volume_task.volume})\\ .add_volume(V1Volume(name='kaggle-secret', secret=V1SecretVolumeSource(secret_name='kaggle-secret')))\\ .add_volume_mount(V1VolumeMount(name='kaggle-secret', mount_path='/root/.kaggle')) with dsl.Condition(download_task.outputs['downloaded'] == 'True'): unzip_task = unzip_op(os.path.join(kaggle_data_path, competition_name + '.zip'), kaggle_data_path)\\ .add_pvolumes({"/data": download_task.pvolume})\\ transform_task = dsl.ContainerOp( name='Transform', image=transform_image, command=['python', '/app/transform.py'], arguments=['--input_path', kaggle_data_path, '--output_path', ouput_data_path], pvolumes={"/data": unzip_task.pvolume} ).apply(aws.use_aws_secret(secret_name, aws_access_key_id_name='AWS_ACCESS_KEY_ID', aws_secret_access_key_name='AWS_SECRET_ACCESS_KEY')) volume_task.delete().after(transform_task) if __name__ == '__main__': kfp.compiler.Compiler().compile(house_prices_transform_pipeline, 'house-prices-transform.zip') client = kfp.Client() my_experiment = client.create_experiment(name='House Prices Experiment') my_run = client.run_pipeline(my_experiment.id, 'House Prices Transform Pipeline', 'house-prices-transform.zip')

생성된 링크를 클릭하시면, 다음과 같은 화면을 확인할 수 있습니다.

S3 저장소의 해당 버킷을 조회해 보면, 다음과 같이 2개의 데이터 파일이 생성된 것을 확인할 수 있습니다.

모델 학습 및 캐글 제출 파이프라인 컴포넌트 작성하기
모델 학습 및 캐글 제출 파이프라인에서는 세개의 컴포넌트를 사용합니다.
- Katib Launcher: 하이퍼파리미터 튜닝을 위하여,
Experiment
를 생성해 줍니다. - HP Out: 하이퍼파리미터 튜닝 결과 값을 출력해 줍니다.
- Train: 하이퍼파리미터 값을 입력 받아 모델을 학습합니다. 학습한 모델은 S3에 저장합니다.
- Predict : S3에서 모델을 가져와서, 예측을 수행합니다. 예측 결과 값을 퍼시스턴스 볼륨에 저장합니다.
- Submit : 예측 결가 값을 캐글에 제출합니다.
모든 데이터를 S3에 저장할 수 있으나, 앞서 만든 Submit
컴포넌트가 S3를 지원하기 않기 때문에, 퍼시스턴스 볼륨과 S3를 같이 사용하였습니다. 그리고 하이퍼파라미터 튜닝을 위한 Experiment
리소스를 파이프라인이 실행되는 kubeflow
네임스가 아닌, 별도의 admin
네임스페이스에 생성하였습니다.
파이프라인의 전체적인 흐름은 다음과 같습니다.

하이퍼 파리미터 튜닝 작업이 admin
네임스페이스 실행되므로, 해당 작업들이 S3에 접근할 수 있도록, admin
네임스페이스에도 aws-secret
을 생성해줘야합니다.
export AWS_ACCESS_KEY_ID=<YOUR_AWS_ACCESS_KEY_ID> export AWS_SECRET_ACCESS_KEY=<YOUR_AWS_SECRET_ACCESS_KEY> kubectl -n admin create secret generic aws-secret \\ --from-literal=AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID} \\ --from-literal=AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
모델 학습 및 캐글 제출 파이프라인 컴포넌트 작성하기
주피터 노트북을 생성합니다. 파일 이름은 train_pipeline_s3.ipynb
입니다. 노트북의 위치는 ~/workspace/house-prices
입니다. 앞으로 작성할 예제에서는 노트북이 ~/workspace/house-prices
에 위치하고 있다고 가정하여, 파일 경로를 설정할 것입니다.
모델 학습과 예측 컴포넌트
train.py
파일을 생성한 후, 페어링을 이용하여 컨테이너 이미지를 빌드한 다음, 파이프라인에서 사용하도록 하겠습니다.
변환된 train.csv
데이터를 이용하여 모델을 학습니다. 변환된 데이터의 경로를 path
파리미터로 입력받에 데이터를 읽어옵니다. 그리고 데이터를 학습 세트와 테스트 세트로 나눈다음, 모델을 학습니다. 그리고 하이퍼 파라미터로서 n_estimators
와 learning_rate
를 입력 받아, 모델 학습시 사용합니다. 그리고 mode
를 입력 받아, train
일 경우에는 모델 학습을, predict
일 경우에는 예측을 실행합니다. S3에서 데이터를 읽고, 저장하기 위하여 minio
라이브러리를 사용하였습니다.
주피터 노트북 셀에서 %%writefile train.py
을 이용하여 train.py
파일을 생성합니다.
In []:
%%writefile train.py import argparse import json import os from datetime import datetime, timezone from tempfile import TemporaryDirectory import pandas as pd from sklearn.metrics import r2_score from sklearn.model_selection import train_test_split from xgboost import XGBRegressor def read_train_input(input_path): minio_client = get_minio_client() bucket_name, object_name = get_bucket_name_and_object_name(input_path) with TemporaryDirectory() as tmpdir: tmp_train_data_file = os.path.join(tmpdir, 'train.csv') minio_client.fget_object(bucket_name, os.path.join(object_name, 'train.csv'), tmp_train_data_file) data = pd.read_csv(tmp_train_data_file) X = data.drop(['Id', 'SalePrice'], axis=1) y = data['SalePrice'] X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2) return X_train, y_train, X_test, y_test def read_test_input(input_path='./input'): minio_client = get_minio_client() bucket_name, object_name = get_bucket_name_and_object_name(input_path) with TemporaryDirectory() as tmpdir: tmp_test_data_file = os.path.join(tmpdir, 'test.csv') minio_client.fget_object(bucket_name, os.path.join(object_name, 'test.csv'), tmp_test_data_file) data = pd.read_csv(tmp_test_data_file) X = data.drop(['Id'], axis=1) id = data['Id'] return X, id def train_model(X_train, y_train, X_test, y_test, n_estimators, learning_rate, early_stopping_rounds): model = XGBRegressor(n_estimators=n_estimators, learning_rate=learning_rate) model.fit(X_train, y_train, early_stopping_rounds=early_stopping_rounds, eval_set=[(X_test, y_test)]) print('Best RMSE on eval: {} with {} rounds'.format(model.best_score, model.best_iteration + 1)) return model def eval_model(model, X_test, y_test): predictions = model.predict(X_test) local_time = datetime.now(timezone.utc).astimezone().isoformat() score = r2_score(predictions, y_test) print('{} r2_score={}'.format(local_time, score)) return score MODE_FILENAME = 'model.bst' def load_model(model_path): minio_client = get_minio_client() bucket_name, object_name = get_bucket_name_and_object_name(model_path) model = XGBRegressor() with TemporaryDirectory() as tmpdir: tmp_model_file = os.path.join(tmpdir, MODE_FILENAME) minio_client.fget_object(bucket_name, os.path.join(object_name, MODE_FILENAME), tmp_model_file) model.load_model(tmp_model_file) print('Load modelfrom ', os.path.join(model_path, MODE_FILENAME)) return model def save_model(model, model_path): minio_client = get_minio_client() bucket_name, object_name = get_bucket_name_and_object_name(model_path) with TemporaryDirectory() as tmpdir: tmp_model_file = os.path.join(tmpdir, MODE_FILENAME) model.save_model(tmp_model_file) minio_client.fput_object(bucket_name, os.path.join(object_name, MODE_FILENAME), tmp_model_file) print('Save model to', os.path.join(model_path, MODE_FILENAME)) def get_minio_client(): access_key = os.environ['AWS_ACCESS_KEY_ID'] secret_key = os.environ['AWS_SECRET_ACCESS_KEY'] from minio import Minio return Minio('s3.amazonaws.com',access_key=access_key, secret_key=secret_key) def get_bucket_name_and_object_name(path): from urllib.parse import urlparse url = urlparse(path, allow_fragments=False) bucket_name = url.netloc object_name = url.path.lstrip('/') return bucket_name, object_name class ModelServe(object): def __init__(self, model_path=None, n_estimators=100, learning_rate=0.1, early_stopping_rounds=40): self.model_path = model_path self.n_estimators = n_estimators self.learning_rate = learning_rate self.early_stopping_rounds = early_stopping_rounds print("model_path={}".format(self.model_path)) print("n_estimators={}".format(self.n_estimators)) print("learning_rate={}".format(self.learning_rate)) print("early_stopping_rounds={}".format(self.early_stopping_rounds)) self.model = None # self._workspace = None # self.exec = self.create_execution() def train(self, X_train, y_train, X_test, y_test): # self.exec.log_input(metadata.DataSet( # description="xgboost synthetic data", # name="synthetic data", # owner="someone@kubeflow.org", # uri="file://path/to/dataset", # version="v0.0.1")) model = train_model(X_train, y_train, X_test, y_test, self.n_estimators, self.learning_rate, self.early_stopping_rounds) score = eval_model(model, X_test, y_test) # self.exec.log_output(metadata.Metrics( # name="traing- valuation", # owner="someone@kubeflow.org", # description="training evaluation for xgboost synthetic", # uri="file://path/to/metrics", # metrics_type=metadata.Metrics.VALIDATION, # values={"mean_absolute_error": mae})) if self.model_path: save_model(model, self.model_path) # self.exec.log_output(metadata.Model( # name="model", # description="prediction model using synthetic data", # owner="someone@kubeflow.org", # uri='file://path/to/model', # model_type="linear_regression", # training_framework={ # "name": "xgboost", # "version": "0.90" # }, # hyperparameters={ # "learning_rate": self.learning_rate, # "n_estimators": self.n_estimators # }, # version=datetime.utcnow().isoformat("T"))) def predict(self, X, feature_names): if not self.model: self.model = load_model(self.model_path) # Do any preprocessing prediction = self.model.predict(data=X) # Do any postprocessing return prediction # @property # def workspace(self): # if not self._workspace: # self._workspace = create_workspace() # return self._workspace # def create_execution(self): # r = metadata.Run( # workspace=self.workspace, # name="xgboost-synthetic-run" + datetime.utcnow().isoformat("T"), # description="a notebook run") # return metadata.Execution( # name = "execution" + datetime.utcnow().isoformat("T"), # workspace=self.workspace, # run=r, # description="execution for training xgboost-synthetic") if __name__ == '__main__': parser = argparse.ArgumentParser() parser.add_argument('--input_path', default='./input', type=str) parser.add_argument('--hyperparameters', required=False, type=str) parser.add_argument('--n_estimators', default='100', type=int) parser.add_argument('--learning_rate', default='0.1', type=float) parser.add_argument('--model_path', required=False, type=str) parser.add_argument('--mode', default='train', choices=['train', 'predict']) parser.add_argument('--submission_path', default='./', type=str) args = parser.parse_args() if args.hyperparameters: hp_json = json.loads(args.hyperparameters) print("use json hyperparameters", hp_json) hyperparameters = {} for n in hp_json: hyperparameters[n['name']] = n['value'] n_estimators = int(hyperparameters['--n_estimators']) learning_rate = float(hyperparameters['--learning_rate']) else: n_estimators = args.n_estimators learning_rate = args.learning_rate if args.mode == 'predict': test, id = read_test_input(args.input_path) model = ModelServe(model_path=args.model_path) pred = model.predict(test, None) submission = pd.concat([id, pd.Series(pred, name='SalePrice')], axis=1) print(submission) if not os.path.exists(args.submission_path): os.makedirs(args.submission_path) submission.to_csv(os.path.join(args.submission_path, 'submission.csv'), index=False) else: X_train, y_train, X_test, y_test = read_train_input(args.input_path) model = ModelServe(model_path=args.model_path, n_estimators=n_estimators, learning_rate=learning_rate) model.train(X_train, y_train, X_test, y_test)
페어링을 이용하여, [train.py](<http://train.py>)
파일이 포함된 컨테이너 이미지를 빌드하고 푸시하겠습니다. 주피터 노트북에서 페어링을 이용하여, 컨테이너 이미지를 푸시하려면, 별도의 설정이 되어 있야 합니다. 만약 설정이 되어 있지 않다면, “05-Kubeflow Fairing – 주피터 노트북에서 Kubeflow Fairing 설정하기“를 참고하셔서 설정하시기 바랍니다.
In []:
import uuid from kubeflow import fairing from kubeflow.fairing.kubernetes import utils as k8s_utils from kubeflow.fairing.preprocessors import base from kubeflow.fairing.builders.append import append CONTAINER_REGISTRY = 'kangwoo' preprocessor = base.BasePreProcessor(executable="train.py") builder = append.AppendBuilder(registry=CONTAINER_REGISTRY, image_name="house-prices-train", base_image="kangwoo/xgboost:0.82", preprocessor=preprocessor) builder.build() train_image = builder.image_tag print(train_image)
파이프라인 생성하기
파이프라인 작성하기
파이프라인을 작성해 보겠습니다. Kubeflow 에서 제공하는 katib-launcher
컴포넌트를 이용하여, 하이퍼 파리미터 튜닝 작업을 실행합니다. 튜닝 작업이 끝난 후, 가장 좋은 점수의 하이퍼 파라미터값을 넘겨 받습니다. 이 값을 이용하여 모델을 다시 학습한 후, 캐글에 제출할 예측 데이터를 생성합니다.
In []:
import os import string import kfp import kfp.dsl as dsl from kfp import components from kfp import onprem from kfp import aws from kubernetes.client.models import V1Volume from kubernetes.client.models import V1SecretVolumeSource from kubernetes.client.models import V1VolumeMount @dsl.pipeline( name='House Prices Train Pipeline', description='House Prices Train Pipeline' ) def house_prices_train_pipeline(name="house-prices-train", namespace="admin", goal=0.95, parallel_trial_count=2, max_trial_count=12, experiment_timeout_minutes=60, delete_after_done=True): secret_name = "aws-secret" competition_name = 'house-prices-advanced-regression-techniques' data_version='v0.0.1' input_data_path = os.path.join('s3://kfp-bucket/data/competitions', competition_name, 'input', data_version) objective_config = { "type": "maximize", "goal": goal, "objectiveMetricName": "r2_score" } algorithm_config = {'algorithmName' : 'random'} parameters = [ {"name": "--learning_rate", "parameterType": "double", "feasibleSpace": {"min": "0.01","max": "0.2"}}, {"name": "--n_estimators", "parameterType": "int", "feasibleSpace": {"min": "10", "max": "200"}} ] metrics_collector = { "collector": { "kind": "StdOut" } } trial_template_params = {'train_image':train_image, 'secret_name': secret_name, 'input_data_path': input_data_path} trial_template = string.Template(''' goTemplate: rawTemplate: |- apiVersion: batch/v1 kind: Job metadata: name: {{.Trial}} namespace: {{.NameSpace}} spec: template: spec: containers: - name: {{.Trial}} image: $train_image command: - "python" - "/app/train.py" - "--input_path" - $input_data_path {{- with .HyperParameters}} {{- range .}} - "{{.Name}}={{.Value}}" {{- end}} {{- end}} env: - name: AWS_ACCESS_KEY_ID valueFrom: secretKeyRef: name: $secret_name key: AWS_ACCESS_KEY_ID - name: AWS_SECRET_ACCESS_KEY valueFrom: secretKeyRef: name: $secret_name key: AWS_SECRET_ACCESS_KEY restartPolicy: Never ''').safe_substitute(trial_template_params) katib_experiment_launcher_op = components.load_component_from_url('<https://raw.githubusercontent.com/kubeflow/pipelines/master/components/kubeflow/katib-launcher/component.yaml>') hp_task = katib_experiment_launcher_op( experiment_name=name, experiment_namespace=namespace, parallel_trial_count=parallel_trial_count, max_trial_count=max_trial_count, objective=str(objective_config), algorithm=str(algorithm_config), trial_template=str(trial_template), parameters=str(parameters), metrics_collector=str(metrics_collector), experiment_timeout_minutes=experiment_timeout_minutes, delete_finished_experiment=delete_after_done) hp_out_task = dsl.ContainerOp( name="hp out", image="library/bash:4.4.23", command=["sh", "-c"], arguments=["echo hyperparameter: %s" % hp_task.output], ) model_version = 'v0.0.1' model_path = os.path.join('s3://kfp-bucket/data/competitions', competition_name, 'models', model_version) train_task = dsl.ContainerOp( name='train', image=train_image, command=['python', '/app/train.py'], arguments=['--input_path', input_data_path, '--hyperparameters', hp_task.output, '--model_path', model_path] ).apply(aws.use_aws_secret(secret_name, aws_access_key_id_name='AWS_ACCESS_KEY_ID', aws_secret_access_key_name='AWS_SECRET_ACCESS_KEY')) submission_path = os.path.join('/data/competitions', competition_name, 'submissions', model_version) volume_task = dsl.VolumeOp( name="house-prices-volume", resource_name="house-prices-pvc", modes=dsl.VOLUME_MODE_RWO, size="100Mi" ) predict_task = dsl.ContainerOp( name='predict', image=train_image, command=['python', '/app/train.py'], arguments=['--input_path', input_data_path, '--mode', 'predict', '--model_path', model_path, '--submission_path', submission_path], pvolumes={"/data": volume_task.volume} ).apply(aws.use_aws_secret(secret_name, aws_access_key_id_name='AWS_ACCESS_KEY_ID', aws_secret_access_key_name='AWS_SECRET_ACCESS_KEY')).after(train_task) submit_op = components.load_component('../components/kaggle/competitions_submit/component.yaml') submit_task = submit_op(competition=competition_name, path=submission_path, message='RunId {{workflow.uid}}')\\ .add_pvolumes({"/data": predict_task.pvolume})\\ .add_volume(V1Volume(name='kaggle-secret', secret=V1SecretVolumeSource(secret_name='kaggle-secret')))\\ .add_volume_mount(V1VolumeMount(name='kaggle-secret', mount_path='/root/.kaggle')).after(predict_task) volume_task.delete().after(submit_task) if __name__ == '__main__': kfp.compiler.Compiler().compile(house_prices_train_pipeline, 'house-prices-train.zip') client = kfp.Client() my_experiment = client.create_experiment(name='House Prices Experiment') my_run = client.run_pipeline(my_experiment.id, 'House Prices Train Pipeline', 'house-prices-train.zip')

생성된 링크를 클릭하시면, 다음과 같은 화면을 확인할 수 있습니다.

Katib 모니터 화면에서 튜닝 결과도 확인할 수 있습니다.

“hp-out” 단계를 클릭하면, 다음과 같은 하이퍼파리미터 튜닝 결과 값을 확인해 볼 수 있습니다.
hyperparameter: [{name: --learning_rate, value: 0.1985601684459975}, {name: --n_estimators, value: 183}]
파이프라인 실행이 완료되면, 예측 결과가 캐글에 제출 됩니다.
캐글의 “My Submissions” 탭을 클릭하면, 제출한 내용들을 확인할 수 있습니다.
