잠시 빌려쓰고, 낡아지면 버려지는 이 한몸. 빈손으로 태어나 결국 빈손으로 털고 돌아가는, 이 인생의 고갯길은, 그 어떤것도 내 것이 될 수 없고, 누구의 것도 될 수 없는, 구름과 바람같은 덧없는 인생살이인데, 하물며 이 마음은 오죽하랴. 그냥 잠시 허허 웃고 살다가리
이 설정 파일을 이용하여, Kubeflow를 설치하면 멀티 사용자 기능을 사용할 수 있습니다. 사용자 인증을 위하여 dex 라는 것을 이용하고 있습니다. dex 는 OpenID Connect 를 지원하는 식별 서비스로서, LDAP, Github, SAML 등의 여러 가지의 인증 방식과 연동이 가능합니다. 기존에 사용하고 있는 인증 방식을 Kubeflow에 연동할 수 있습니다. 즉, 회사에서 LDAP 이라는 인증 방식을 사용하고 있었다면, dex와 LDAP 을 연동해서, LDAP에서 사용하던 계정으로, kubeflow에 로그인할 수 있다는 것입니다.
사용자 확인하기
Kubeflow와 함께 설치되는 dex 는 사용자 정보를 파일에 직접 저장하고 있습니다. 이 파일은 ConfigMap에 저장되어 있습니다.
dex는 쿠버네티스의 auth 라는 네임스페이스 설치됩니다. dex 라는 ConfigMap을 조회하면 설정 정보를 볼 수 있습니다.
staticPasswords 필드를 보면 사용자 이메일과 패스워드의 해시값을 확인할 수 있습니다. 패스워드는 “BCrypt”를 이용하여 해시값을 생성한 후 사용하고 있습니다. 참고로 설치할때 기본으로 생성되는 이메일은 “admin@kubeflow.org“이고, 패스워드는 “12341234”입니다.
이 정보를 이용해서 다음과 같은 화면에서 로그인할 수 있습니다.
만약, kubeflow를 외부망에 노출시킬것이라면, 꼭 이 기본 계정의 패스워드를 변경해야합니다.
패스워드 해시값 생성하기
패스워드 해시값은 다양한 방법으로 만들수 있습니다.
만약 파이썬을 사용하고 있다면 bcrypt 라이브러리를 사용할 수 있습니다.
다음은 bcrypt 패키지를 설치하고, “PASSWORD”의 해시값을 생성하는 예제입니다.
즉, 두 개의 Pod은 같은 것이지만, DeepEqual 을 이용하여 비교를 하면 다른것으로 판별됩니다.
두 개의 Pod가 진짜 다른것인지를 판별하기 위해서는 다음과 같이 할 수도 있습니다.
if sameName(expected, actual) &&
sameContainers(expected, actual) && ...
필요한 부분만 직접 비교를 하는 것입니다. 비교할 필드가 1,2개 정도면 이 방법도 사용할 수 있습니다만, 현실세계에서는 대부분 많은 필드를 가지고 있기 때문에 효율적이지 않습니다.
두 리소스를 비교하는 가장 스마트한 방법은 리소스의 Hash값을 이용하는 것입니다. 다음은 HashObject 함수를 이용하여, 리소스의 해시값을 계산한 후 비교하는 코드입니다.
hash := HashObject(expected)
expected.Annotations["ResourceHash"] = hash
actualHash := actual.Annotations["ResourceHash"]
if actualHash != hash {
// need to update...
}
다음은 HashObject 함수입니다.
// HashObject writes the specified object to a hash using the spew library
// which follows pointers and prints actual values of the nested objects
// ensuring the hash does not change when a pointer changes.
// The returned hash can be used for object comparisons.
//
// This is inspired by controller revisions in StatefulSets:
// <https://github.com/kubernetes/kubernetes/blob/8de1569ddae62e8fab559fe6bd210a5d6100a277/pkg/controller/history/controller_history.go#L89-L101>
func HashObject(object interface{}) string {
hf := fnv.New32()
printer := spew.ConfigState{
Indent: " ",
SortKeys: true,
DisableMethods: true,
SpewKeys: true,
}
_, _ = printer.Fprintf(hf, "%#v", object)
return fmt.Sprint(hf.Sum32())
}
ARP 캐시는 로컬 네트워크에서, 패킷을 효율적으로 전환할 수 있도록 도와줍니다. 시스템이 어떤 IPv4 주소가 어떤 MAC 또는 하드웨어 주소를 가지는지를 파악하는 방법입니다. 예를 든다면, “10.233.134.150” 라는 IP는 “fe:16:48:a9:a6:5b”는 하드웨어 주소를 가지고 있는것입니다.
이 캐시의 내용은 arp -n 또는 /proc/net/arp 또는 ip -4 neigh 를 실행하면 확인할 수 있습니다.
명령어 실행 결과
다음 명령어를 실행하면 결과입니다.
arp -n
# arp -n
Address HWtype HWaddress Flags Mask Iface
10.233.134.149 ether fe:16:48:a9:a6:5b C eth0
10.233.176.254 ether fe:16:48:a9:a6:5b C eth0
...
10.233.134.150 ether fe:16:48:a9:a6:5b C eth0
10.233.134.119 ether fe:16:48:a9:a6:5b C eth0
10.244.4.0 ether 8e:c4:59:6d:ee:83 CM flannel.1
...
# ip -4 neigh
10.233.134.149 dev eth0 lladdr fe:16:48:a9:a6:5b REACHABLE
10.233.176.254 dev eth0 lladdr fe:16:48:a9:a6:5b STALE
...
10.233.134.119 dev eth0 lladdr fe:16:48:a9:a6:5b REACHABLE
10.244.4.0 dev flannel.1 lladdr 8e:c4:59:6d:ee:83 PERMANENT
...
캐시 크기 기본값
대부분의 경우 이 캐시의 크기는 중요하지 않습니다. 하지만, 많은 디바이스가 있는 네트워크 세그먼트의 경우, 이 옵션을 조정해줘야 원활한 서비스를 할 수 있습니다.
쿠버네티스 같은 컨테이너 환경에서 많은 컨티이너를 사용하거나, 많은 대상과 네트워크 통신이 필요할 때 이 값을 조정해줘야합니다.
(저의 경우 쿠버네티스에서 flannel.1 TX dropped 이 발생하여, 원인을 찾던 중에 발견하게 되었습니다.)
ARP 캐시의 기본값을 1024입니다. /proc/sys/net/ipv4/neigh/default/gc_thresh3 에서 확인할 수 있습니다.
LENS(https://k8slens.dev/) 를 사용하다가, 쿠버네티스 노드에 root 권한의 shell로 접속할 수 있는 기능을 발견하였습니다.
이게 어떻게 가능한 일이지 궁금해서, 간단히 조사를 해보았습니다.
알고 보니 원리는 간단하였습니다.
권한이 있는 컨테이너(privileged containers)를 실행시키고, 그 컨테이너로 접속을 하는 것이었습니다.
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
8f5f3c2b4c08 alpine "nsenter -t 1 -m -u …" 53 seconds ago Up 52 seconds k8s_shell_node-shell-1fa79f9a-837e-4f46-a66a-c98e05fb5e5e_kube-system_2f3e6447-3046-11eb-951e-fa164857d866_0
TFX는 파이프라인 컴포넌트를 만드는 데 사용할 수 있는 파이썬 라이브러리를 제공하고 있습니다. 이 라이브러리를 사용하여 파이프라인의 컴포넌트를 생성하면, 보다 쉽게 파이프라인을 구성할 수 있으며, 사용자는 사용자 코드를 작성하는데 보다 더 집중할 수 있습니다.
TFX 라이브러리는 다음과 같습니다.
TensorFlow Data Validation (TFDV) : TFDV는 머신러닝 데이터를 분석하고 검증하는 것을 도와주는 라이브러리 입니다. 확장성이 뛰어나고 텐서플로우, TFX와 잘 작동하도록 설계되었습니다. TFDV는 다음을 포함하고 있습니다.
학습 및 테스트 데이터의 요약 통계에 대한 계산.
데이터 분산 및 통계를 위한 뷰어와의 통합 및 데이터 세트 쌍(Facets)의 측면 비교.
필요한 값, 범위 및 어휘와 같은 데이터에 대한 기대치를 설명하는 자동화된 데이터 스키마 생성
스키마를 검사하는 데 도움이 되는 스키마 뷰어.
누락된 피처, 범위를 벗어난 값 또는 잘못된 기능 유형과 같은 이상 징후를 식별하기 위한 이상 징후 탐지.
이상 징후 뷰어를 통해 이상 징후가 있는 피처를 확인.
TensorFlow Transform (TFT) : TFT는 TensorFlow로 데이터를 사전 처리하는 라이브러리 입니다. TFT는 다음과 같이 전체 데이터를 변환할 때 유용합니다.
평균 및 표준 편차로 입력 값을 정규화.
모든 입력 값에 대한 어휘를 생성하여 문자열을 정수로 변환.
관측된 데이터 분포를 기반으로 소수 데이터를 정수로 변환.
TensorFlow : 텐서플로우는 TFX에서 모델을 학습할 때 사용합니다. 학습 데이터와 모델링 코드를 가지고, SavedModel 이라는 결과를 생성합니다.
TensorFlow Model Analysis (TFMA) : TFMA는 텐서플로우 모델을 평가하기 위한 라이브러리 입니다. 텐서플로우로 EvalSavedModel을 생성하는데 사용되며, 이는 분석의 기반이 됩니다. 사용자들은 학습에 정의된 것과 동일한 지표를 사용하여, 분산된 방식으로 대량의 데이터에 대한 모델을 평가할 수 있습니다. 이러한 측정지표는 여러 조각의 데이터를 통해 계산할 수 있으며 주피터 노트북에서 시각화할 수 있습니다.
TensorFlow Metadata (TFMD) : TFMD는 텐서플로우로 머신러닝 모델을 학습할 때 유용한 메타데이터에 대한 표준 표현을 제공합니다. 메타데이터는 입력 데이터 분석 중에 생성할 수 있으며, 데이터 검증, 탐색 및 변환을 위해 사용될 수 있습니다. 메타데이터 직렬화 형식은 다음과 같습니다.
표 형식의 데이터를 설명하는 스키마 (예 tf.Examples)
데이터 세트에 대한 통계 요약 모음
ML Metadata (MLMD) : MLMD는 머신 러닝 개발자 및 데이터 사이언티스트가 워크플로우와 관련된 메타데이터를 기록하고 검색하기 위한 라이브러리입니다. 대부분의 메타데이터는 TFMD 표현을 사용합니다. MLMD는 SQL-Lite, MySQL 같은 데이터 저장소를 사용하여 데이터를 저장합니다.
데이터 세트
예제에서 사용한 데이터 세트는 시카고 시가 발표한 택시 여행 데이터 세트 입니다. 해당 사이트는 애플리케이션에서 데이터를 사용하기 위해, 원본 데이터를 수정한 데이터를 제공하고 있습니다. 원본 소스는 시카고 시의 공식 웹사이트인 www.cityofchicago.org 에서 확인할 수 있습니다.
Kubeflow에서 TFX 파이프라인 실행하기
TFX는 다양한 환경에서 실행할 수 있습니다. 로컬에서도 실행할 수 있고, Airflow 환경에서 실행할 수 있으며, Kubeflow 에서도 실행할 수 있습니다. 예제에서는 Kubeflow 환경에서 시카고 택시 파이프라인을 실행해 볼 것입니다. 만약 다양환 환경에 대해 관심 있으면, “Chicago Taxi Example” 페이지를 참고하시기 바랍니다.
TFX 컴포넌트 사용하기
설정
필요한 패키지를 추가하고, 사용할 데이터를 다운로드하겠습니다.
먼저 주피터 노트북을 생성하겠습니다.
패키지 설치하기
tfx 패키지를 설치합니다.
In []:
!pip install "tfx>=0.21.1,<0.22" --user
패키지 추가하기
TFX 컴포넌트와 필요한 패키지를 추가합니다.
In []:
import os
import pprint
import tempfile
import urllib
import absl
import tensorflow as tf
import tensorflow_model_analysis as tfma
tf.get_logger().propagate = False
pp = pprint.PrettyPrinter()
import tfx
from tfx.components import CsvExampleGen
from tfx.components import Evaluator
from tfx.components import ExampleValidator
from tfx.components import Pusher
from tfx.components import ResolverNode
from tfx.components import SchemaGen
from tfx.components import StatisticsGen
from tfx.components import Trainer
from tfx.components import Transform
from tfx.components.base import executor_spec
from tfx.components.trainer.executor import GenericExecutor
from tfx.dsl.experimental import latest_blessed_model_resolver
from tfx.orchestration import metadata
from tfx.orchestration import pipeline
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
from tfx.proto import pusher_pb2
from tfx.proto import trainer_pb2
from tfx.types import Channel
from tfx.types.standard_artifacts import Model
from tfx.types.standard_artifacts import ModelBlessing
from tfx.utils.dsl_utils import external_input
%load_ext tfx.orchestration.experimental.interactive.notebook_extensions.skip
# This is the root directory for your TFX pip package installation.
_tfx_root = tfx.__path__[0]
# This is the directory containing the TFX Chicago Taxi Pipeline example.
_taxi_root = os.path.join(_tfx_root, 'examples/chicago_taxi_pipeline')
# This is the path where your model will be pushed for serving.
# Set up logging.
absl.logging.set_verbosity(absl.logging.INFO)
["train", "eval"] /tmp/tfx-interactive-2020-05-07T09_40_15.820980-gawc9si3/CsvExampleGen/examples/1
This cell will be skipped during export to pipeline.
그리고 다음과 같이 학습에 사용될 데이터 세트를 조회해 볼 수 있습니다.
In []:
# Get the URI of the output artifact representing the training examples, which is a directory
train_uri = os.path.join(example_gen.outputs['examples'].get()[0].uri, 'train')
# Get the list of files in this directory (all compressed TFRecord files)
tfrecord_filenames = [os.path.join(train_uri, name)
for name in os.listdir(train_uri)]
# Create a `TFRecordDataset` to read these files
dataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type="GZIP")
# Iterate over the first 3 records and decode them.
for tfrecord in dataset.take(3):
serialized_example = tfrecord.numpy()
example = tf.train.Example()
example.ParseFromString(serialized_example)
pp.pprint(example)
features {
feature {
key: "company"
value {
bytes_list {
value: "Taxi Affiliation Services"
}
}
}
feature {
key: "dropoff_census_tract"
value {
}
}
feature {
key: "dropoff_community_area"
value {
}
}
feature {
key: "dropoff_latitude"
value {
}
}
...
StatisticsGen
StatisticsGen 컴포넌트는 다운스트림 컴포넌트로서, 데이터 분석을 위해 데이터 세트에 대한 통계를 계산합니다. 텐서플로우 데이터 검증 라이브러리(TFDV:TensorFlow Data Validation)를 사용합니다.
StatisticsGen은 앞서 ExampleGen 에서 생성한 데이터 세트를 입력값으로 사용합니다.
Transform 은 ExampleGen 에서 생성한 데이터와 SchemaGen의 스키마 그리고 사용자가 정의 변환 코드를 입력값으로 사용합니다.
다음은 사용자 정의 변환 코드 입니다. 먼저 피처 엔지니어링을 위하여 몇가지 상수를 정의하겠습니다.
In []:
_taxi_constants_module_file = 'taxi_constants.py'
In []:
%%writefile {_taxi_constants_module_file}
# Categorical features are assumed to each have a maximum value in the dataset.
MAX_CATEGORICAL_FEATURE_VALUES = [24, 31, 12]
CATEGORICAL_FEATURE_KEYS = [
'trip_start_hour', 'trip_start_day', 'trip_start_month',
'pickup_census_tract', 'dropoff_census_tract', 'pickup_community_area',
'dropoff_community_area'
]
DENSE_FLOAT_FEATURE_KEYS = ['trip_miles', 'fare', 'trip_seconds']
# Number of buckets used by tf.transform for encoding each feature.
FEATURE_BUCKET_COUNT = 10
BUCKET_FEATURE_KEYS = [
'pickup_latitude', 'pickup_longitude', 'dropoff_latitude',
'dropoff_longitude'
]
# Number of vocabulary terms used for encoding VOCAB_FEATURES by tf.transform
VOCAB_SIZE = 1000
# Count of out-of-vocab buckets in which unrecognized VOCAB_FEATURES are hashed.
OOV_SIZE = 10
VOCAB_FEATURE_KEYS = [
'payment_type',
'company',
]
# Keys
LABEL_KEY = 'tips'
FARE_KEY = 'fare'
def transformed_name(key):
return key + '_xf'
그리고 데이터를 입력 받아 변환 작업을 하는 코드를 작성하겠습니다.
In []:
_taxi_transform_module_file = 'taxi_transform.py'
In []:
%%writefile {_taxi_transform_module_file}
import tensorflow as tf
import tensorflow_transform as tft
import taxi_constants
_DENSE_FLOAT_FEATURE_KEYS = taxi_constants.DENSE_FLOAT_FEATURE_KEYS
_VOCAB_FEATURE_KEYS = taxi_constants.VOCAB_FEATURE_KEYS
_VOCAB_SIZE = taxi_constants.VOCAB_SIZE
_OOV_SIZE = taxi_constants.OOV_SIZE
_FEATURE_BUCKET_COUNT = taxi_constants.FEATURE_BUCKET_COUNT
_BUCKET_FEATURE_KEYS = taxi_constants.BUCKET_FEATURE_KEYS
_CATEGORICAL_FEATURE_KEYS = taxi_constants.CATEGORICAL_FEATURE_KEYS
_FARE_KEY = taxi_constants.FARE_KEY
_LABEL_KEY = taxi_constants.LABEL_KEY
_transformed_name = taxi_constants.transformed_name
def preprocessing_fn(inputs):
"""tf.transform's callback function for preprocessing inputs.
Args:
inputs: map from feature keys to raw not-yet-transformed features.
Returns:
Map from string feature key to transformed feature operations.
"""
outputs = {}
for key in _DENSE_FLOAT_FEATURE_KEYS:
# Preserve this feature as a dense float, setting nan's to the mean.
outputs[_transformed_name(key)] = tft.scale_to_z_score(
_fill_in_missing(inputs[key]))
for key in _VOCAB_FEATURE_KEYS:
# Build a vocabulary for this feature.
outputs[_transformed_name(key)] = tft.compute_and_apply_vocabulary(
_fill_in_missing(inputs[key]),
top_k=_VOCAB_SIZE,
num_oov_buckets=_OOV_SIZE)
for key in _BUCKET_FEATURE_KEYS:
outputs[_transformed_name(key)] = tft.bucketize(
_fill_in_missing(inputs[key]), _FEATURE_BUCKET_COUNT,
always_return_num_quantiles=False)
for key in _CATEGORICAL_FEATURE_KEYS:
outputs[_transformed_name(key)] = _fill_in_missing(inputs[key])
# Was this passenger a big tipper?
taxi_fare = _fill_in_missing(inputs[_FARE_KEY])
tips = _fill_in_missing(inputs[_LABEL_KEY])
outputs[_transformed_name(_LABEL_KEY)] = tf.where(
tf.math.is_nan(taxi_fare),
tf.cast(tf.zeros_like(taxi_fare), tf.int64),
# Test if the tip was > 20% of the fare.
tf.cast(
tf.greater(tips, tf.multiply(taxi_fare, tf.constant(0.2))), tf.int64))
return outputs
def _fill_in_missing(x):
"""Replace missing values in a SparseTensor.
Fills in missing values of `x` with '' or 0, and converts to a dense tensor.
Args:
x: A `SparseTensor` of rank 2. Its dense shape should have size at most 1
in the second dimension.
Returns:
A rank 1 tensor where missing values of `x` have been filled in.
"""
default_value = '' if x.dtype == tf.string else 0
return tf.squeeze(
tf.sparse.to_dense(
tf.SparseTensor(x.indices, x.values, [x.dense_shape[0], 1]),
default_value),
axis=1)
이제 변환 작업을 하는 코드를 Transform 컴포넌트에 전달하여, 데이터를 변환하겠습니다.
transform_graph : transform_graph는 사전 처리 작업을 수행할 수 있는 그래프입니다.
transformed_examples : transformed_examples 는 사전 처리된 학습 데이터와 평가 데이터를 나타냅니다.
Trainer
Trainer 컴포넌트는 텐서플로우로 정의한 모델을 학습 시킵니다. Trainer 는 SchemaGen의 스키마와 Transform 에서 변환된 데이터와 그래프, 학습 파라미터 그리고 사용자 정의 모델 코드를 입력값으로 사용합니다.
다음은 사용자 정의 모델 코드입니다.
In []:
_taxi_trainer_module_file = 'taxi_trainer.py'
In []:
%%writefile {_taxi_trainer_module_file}
from typing import List, Text
import os
import absl
import datetime
import tensorflow as tf
import tensorflow_transform as tft
from tfx.components.trainer.executor import TrainerFnArgs
import taxi_constants
_DENSE_FLOAT_FEATURE_KEYS = taxi_constants.DENSE_FLOAT_FEATURE_KEYS
_VOCAB_FEATURE_KEYS = taxi_constants.VOCAB_FEATURE_KEYS
_VOCAB_SIZE = taxi_constants.VOCAB_SIZE
_OOV_SIZE = taxi_constants.OOV_SIZE
_FEATURE_BUCKET_COUNT = taxi_constants.FEATURE_BUCKET_COUNT
_BUCKET_FEATURE_KEYS = taxi_constants.BUCKET_FEATURE_KEYS
_CATEGORICAL_FEATURE_KEYS = taxi_constants.CATEGORICAL_FEATURE_KEYS
_MAX_CATEGORICAL_FEATURE_VALUES = taxi_constants.MAX_CATEGORICAL_FEATURE_VALUES
_LABEL_KEY = taxi_constants.LABEL_KEY
_transformed_name = taxi_constants.transformed_name
def _transformed_names(keys):
return [_transformed_name(key) for key in keys]
def _gzip_reader_fn(filenames):
"""Small utility returning a record reader that can read gzip'ed files."""
return tf.data.TFRecordDataset(
filenames,
compression_type='GZIP')
def _get_serve_tf_examples_fn(model, tf_transform_output):
"""Returns a function that parses a serialized tf.Example and applies TFT."""
model.tft_layer = tf_transform_output.transform_features_layer()
@tf.function
def serve_tf_examples_fn(serialized_tf_examples):
"""Returns the output to be used in the serving signature."""
feature_spec = tf_transform_output.raw_feature_spec()
feature_spec.pop(_LABEL_KEY)
parsed_features = tf.io.parse_example(serialized_tf_examples, feature_spec)
transformed_features = model.tft_layer(parsed_features)
transformed_features.pop(_transformed_name(_LABEL_KEY))
return model(transformed_features)
return serve_tf_examples_fn
def _input_fn(file_pattern: Text,
tf_transform_output: tft.TFTransformOutput,
batch_size: int = 200) -> tf.data.Dataset:
"""Generates features and label for tuning/training.
Args:
file_pattern: input tfrecord file pattern.
tf_transform_output: A TFTransformOutput.
batch_size: representing the number of consecutive elements of returned
dataset to combine in a single batch
Returns:
A dataset that contains (features, indices) tuple where features is a
dictionary of Tensors, and indices is a single Tensor of label indices.
"""
transformed_feature_spec = (
tf_transform_output.transformed_feature_spec().copy())
dataset = tf.data.experimental.make_batched_features_dataset(
file_pattern=file_pattern,
batch_size=batch_size,
features=transformed_feature_spec,
reader=_gzip_reader_fn,
label_key=_transformed_name(_LABEL_KEY))
return dataset
def _build_keras_model(hidden_units: List[int] = None) -> tf.keras.Model:
"""Creates a DNN Keras model for classifying taxi data.
Args:
hidden_units: [int], the layer sizes of the DNN (input layer first).
Returns:
A keras Model.
"""
real_valued_columns = [
tf.feature_column.numeric_column(key, shape=())
for key in _transformed_names(_DENSE_FLOAT_FEATURE_KEYS)
]
categorical_columns = [
tf.feature_column.categorical_column_with_identity(
key, num_buckets=_VOCAB_SIZE + _OOV_SIZE, default_value=0)
for key in _transformed_names(_VOCAB_FEATURE_KEYS)
]
categorical_columns += [
tf.feature_column.categorical_column_with_identity(
key, num_buckets=_FEATURE_BUCKET_COUNT, default_value=0)
for key in _transformed_names(_BUCKET_FEATURE_KEYS)
]
categorical_columns += [
tf.feature_column.categorical_column_with_identity( # pylint: disable=g-complex-comprehension
key,
num_buckets=num_buckets,
default_value=0) for key, num_buckets in zip(
_transformed_names(_CATEGORICAL_FEATURE_KEYS),
_MAX_CATEGORICAL_FEATURE_VALUES)
]
indicator_column = [
tf.feature_column.indicator_column(categorical_column)
for categorical_column in categorical_columns
]
model = _wide_and_deep_classifier(
wide_columns=indicator_column,
deep_columns=real_valued_columns,
dnn_hidden_units=hidden_units or [100, 70, 50, 25])
return model
def _wide_and_deep_classifier(wide_columns, deep_columns, dnn_hidden_units):
"""Build a simple keras wide and deep model.
Args:
wide_columns: Feature columns wrapped in indicator_column for wide (linear)
part of the model.
deep_columns: Feature columns for deep part of the model.
dnn_hidden_units: [int], the layer sizes of the hidden DNN.
Returns:
A Wide and Deep Keras model
"""
# Following values are hard coded for simplicity in this example,
# However prefarably they should be passsed in as hparams.
# Keras needs the feature definitions at compile time.
input_layers = {
colname: tf.keras.layers.Input(name=colname, shape=(), dtype=tf.float32)
for colname in _transformed_names(_DENSE_FLOAT_FEATURE_KEYS)
}
input_layers.update({
colname: tf.keras.layers.Input(name=colname, shape=(), dtype='int32')
for colname in _transformed_names(_VOCAB_FEATURE_KEYS)
})
input_layers.update({
colname: tf.keras.layers.Input(name=colname, shape=(), dtype='int32')
for colname in _transformed_names(_BUCKET_FEATURE_KEYS)
})
input_layers.update({
colname: tf.keras.layers.Input(name=colname, shape=(), dtype='int32')
for colname in _transformed_names(_CATEGORICAL_FEATURE_KEYS)
})
deep = tf.keras.layers.DenseFeatures(deep_columns)(input_layers)
for numnodes in dnn_hidden_units:
deep = tf.keras.layers.Dense(numnodes)(deep)
wide = tf.keras.layers.DenseFeatures(wide_columns)(input_layers)
output = tf.keras.layers.Dense(
1, activation='sigmoid')(
tf.keras.layers.concatenate([deep, wide]))
model = tf.keras.Model(input_layers, output)
model.compile(
loss='binary_crossentropy',
optimizer=tf.keras.optimizers.Adam(lr=0.001),
metrics=[tf.keras.metrics.BinaryAccuracy()])
model.summary(print_fn=absl.logging.info)
return model
# TFX Trainer will call this function.
def run_fn(fn_args: TrainerFnArgs):
"""Train the model based on given args.
Args:
fn_args: Holds args used to train the model as name/value pairs.
"""
# Number of nodes in the first layer of the DNN
first_dnn_layer_size = 100
num_dnn_layers = 4
dnn_decay_factor = 0.7
tf_transform_output = tft.TFTransformOutput(fn_args.transform_output)
train_dataset = _input_fn(fn_args.train_files, tf_transform_output, 40)
eval_dataset = _input_fn(fn_args.eval_files, tf_transform_output, 40)
model = _build_keras_model(
# Construct layers sizes with exponetial decay
hidden_units=[
max(2, int(first_dnn_layer_size * dnn_decay_factor**i))
for i in range(num_dnn_layers)
])
# This log path might change in the future.
log_dir = os.path.join(os.path.dirname(fn_args.serving_model_dir), 'logs')
tensorboard_callback = tf.keras.callbacks.TensorBoard(
log_dir=log_dir, update_freq='batch')
model.fit(
train_dataset,
steps_per_epoch=fn_args.train_steps,
validation_data=eval_dataset,
validation_steps=fn_args.eval_steps,
callbacks=[tensorboard_callback])
signatures = {
'serving_default':
_get_serve_tf_examples_fn(model,
tf_transform_output).get_concrete_function(
tf.TensorSpec(
shape=[None],
dtype=tf.string,
name='examples')),
}
model.save(fn_args.serving_model_dir, save_format='tf', signatures=signatures)
Evaluator 컴포넌트는 평가 데이터 세트를 이용하여 모델 성능 지표를 계산합니다. TFMA(TensorFlow Model Analysis) 라이브러리를 사용합니다. Evaluator 는 새로 학습된 모델이 이전 모델보다 더 나은지 선택적으로 검증할 수 있습니다. 이는 매일 모델을 자동으로 학습하고 검증할 수 있는 생산 파이프라인 설정에서 유용합니다. 현재 노트북에서는 하나의 모델만 학습하므로 Evaluator가 자동으로 모델에 “good”라고 라벨을 붙입니다.
Evaluator 는 ExampleGen 에서 생성한 데이터와 Trainer 가 학습한 모델 그리고 슬라이싱 설정을 입력값으로 사용합니다. 슬라이싱 설정을 통해 피처 값에 대한 메트릭을 슬라이스할 수 있습니다.
In []:
eval_config = tfma.EvalConfig(
model_specs=[
# This assumes a serving model with signature 'serving_default'. If
# using estimator based EvalSavedModel, add signature_name: 'eval' and
# remove the label_key.
tfma.ModelSpec(label_key='tips')
],
metrics_specs=[
tfma.MetricsSpec(
# The metrics added here are in addition to those saved with the
# model (assuming either a keras model or EvalSavedModel is used).
# Any metrics added into the saved model (for example using
# model.compile(..., metrics=[...]), etc) will be computed
# automatically.
metrics=[
tfma.MetricConfig(class_name='ExampleCount')
],
# To add validation thresholds for metrics saved with the model,
# add them keyed by metric name to the thresholds map.
thresholds = {
'binary_accuracy': tfma.MetricThreshold(
value_threshold=tfma.GenericValueThreshold(
lower_bound={'value': 0.5}),
change_threshold=tfma.GenericChangeThreshold(
direction=tfma.MetricDirection.HIGHER_IS_BETTER,
absolute={'value': -1e-10}))
}
)
],
slicing_specs=[
# An empty slice spec means the overall slice, i.e. the whole dataset.
tfma.SlicingSpec(),
# Data can be sliced along a feature column. In this case, data is
# sliced along feature column trip_start_hour.
tfma.SlicingSpec(feature_keys=['trip_start_hour'])
])
Evaluator 에 설정값을 넘겨서 실행 시키겠습니다.
In []:
# Use TFMA to compute a evaluation statistics over features of a model and
# validate them against a baseline.
# The model resolver is only required if performing model validation in addition
# to evaluation. In this case we validate against the latest blessed model. If
# no model has been blessed before (as in this case) the evaluator will make our
# candidate the first blessed model.
model_resolver = ResolverNode(
instance_name='latest_blessed_model_resolver',
resolver_class=latest_blessed_model_resolver.LatestBlessedModelResolver,
model=Channel(type=Model),
model_blessing=Channel(type=ModelBlessing))
context.run(model_resolver)
evaluator = Evaluator(
examples=example_gen.outputs['examples'],
model=trainer.outputs['model'],
baseline_model=model_resolver.outputs['model'],
# Change threshold will be ignored if there is no baseline (first run).
eval_config=eval_config)
context.run(evaluator)
Pusher
Pusher 컴포넌트는 일반적으로 TFX 파이프라인의 마지막 단계에 위치해 있습니다. 모델이 유효성 검사를 통과했는지 여부를 확인하고, 합격하면 _serving_model_dir로 모델을 내보냅니다.
앞서 작성한 코드들을 이용하여, TFX 파이프라인을 생성해 보겠습니다. 그리고 KubeflowDagRunner 를 이용하여 TFX 파이프라인을 Kubeflow 파이프라인 패키지로 변환하겠습니다.
모듈 파일 생성하기
TFX 파이프라인 컴포넌트에서 사용할 모듈 파일을 생성하겠습니다. 앞서 작성한 코드를 묶어 하나의 파일로 생성하겠습니다. Transform 컴포넌트에서 사용했던 taxi_constants.py 와 taxi_transform.py 파일 그리고 Trainer 컴포넌트에서 사용했던 taxi_trainer.py 파일을 합쳐서 taxi_utils.py 파일을 생성하겠습니다.
taxi_utils.py
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from typing import List, Text
import absl
import tensorflow as tf
import tensorflow_transform as tft
from tfx.components.trainer.executor import TrainerFnArgs
# Categorical features are assumed to each have a maximum value in the dataset.
_MAX_CATEGORICAL_FEATURE_VALUES = [24, 31, 12]
_CATEGORICAL_FEATURE_KEYS = [
'trip_start_hour', 'trip_start_day', 'trip_start_month',
'pickup_census_tract', 'dropoff_census_tract', 'pickup_community_area',
'dropoff_community_area'
]
_DENSE_FLOAT_FEATURE_KEYS = ['trip_miles', 'fare', 'trip_seconds']
# Number of buckets used by tf.transform for encoding each feature.
_FEATURE_BUCKET_COUNT = 10
_BUCKET_FEATURE_KEYS = [
'pickup_latitude', 'pickup_longitude', 'dropoff_latitude',
'dropoff_longitude'
]
# Number of vocabulary terms used for encoding VOCAB_FEATURES by tf.transform
_VOCAB_SIZE = 1000
# Count of out-of-vocab buckets in which unrecognized VOCAB_FEATURES are hashed.
_OOV_SIZE = 10
_VOCAB_FEATURE_KEYS = [
'payment_type',
'company',
]
# Keys
_LABEL_KEY = 'tips'
_FARE_KEY = 'fare'
def _transformed_name(key):
return key + '_xf'
def _transformed_names(keys):
return [_transformed_name(key) for key in keys]
def _gzip_reader_fn(filenames):
"""Small utility returning a record reader that can read gzip'ed files."""
return tf.data.TFRecordDataset(
filenames,
compression_type='GZIP')
def _fill_in_missing(x):
"""Replace missing values in a SparseTensor.
Fills in missing values of `x` with '' or 0, and converts to a dense tensor.
Args:
x: A `SparseTensor` of rank 2. Its dense shape should have size at most 1
in the second dimension.
Returns:
A rank 1 tensor where missing values of `x` have been filled in.
"""
default_value = '' if x.dtype == tf.string else 0
return tf.squeeze(
tf.sparse.to_dense(
tf.SparseTensor(x.indices, x.values, [x.dense_shape[0], 1]),
default_value),
axis=1)
def _get_serve_tf_examples_fn(model, tf_transform_output):
"""Returns a function that parses a serialized tf.Example and applies TFT."""
model.tft_layer = tf_transform_output.transform_features_layer()
@tf.function
def serve_tf_examples_fn(serialized_tf_examples):
"""Returns the output to be used in the serving signature."""
feature_spec = tf_transform_output.raw_feature_spec()
feature_spec.pop(_LABEL_KEY)
parsed_features = tf.io.parse_example(serialized_tf_examples, feature_spec)
transformed_features = model.tft_layer(parsed_features)
# TODO(b/148082271): Remove this line once TFT 0.22 is used.
transformed_features.pop(_transformed_name(_LABEL_KEY), None)
return model(transformed_features)
return serve_tf_examples_fn
def _input_fn(file_pattern: Text,
tf_transform_output: tft.TFTransformOutput,
batch_size: int = 200) -> tf.data.Dataset:
"""Generates features and label for tuning/training.
Args:
file_pattern: input tfrecord file pattern.
tf_transform_output: A TFTransformOutput.
batch_size: representing the number of consecutive elements of returned
dataset to combine in a single batch
Returns:
A dataset that contains (features, indices) tuple where features is a
dictionary of Tensors, and indices is a single Tensor of label indices.
"""
transformed_feature_spec = (
tf_transform_output.transformed_feature_spec().copy())
dataset = tf.data.experimental.make_batched_features_dataset(
file_pattern=file_pattern,
batch_size=batch_size,
features=transformed_feature_spec,
reader=_gzip_reader_fn,
label_key=_transformed_name(_LABEL_KEY))
return dataset
def _build_keras_model(hidden_units: List[int] = None) -> tf.keras.Model:
"""Creates a DNN Keras model for classifying taxi data.
Args:
hidden_units: [int], the layer sizes of the DNN (input layer first).
Returns:
A keras Model.
"""
real_valued_columns = [
tf.feature_column.numeric_column(key, shape=())
for key in _transformed_names(_DENSE_FLOAT_FEATURE_KEYS)
]
categorical_columns = [
tf.feature_column.categorical_column_with_identity(
key, num_buckets=_VOCAB_SIZE + _OOV_SIZE, default_value=0)
for key in _transformed_names(_VOCAB_FEATURE_KEYS)
]
categorical_columns += [
tf.feature_column.categorical_column_with_identity(
key, num_buckets=_FEATURE_BUCKET_COUNT, default_value=0)
for key in _transformed_names(_BUCKET_FEATURE_KEYS)
]
categorical_columns += [
tf.feature_column.categorical_column_with_identity( # pylint: disable=g-complex-comprehension
key,
num_buckets=num_buckets,
default_value=0) for key, num_buckets in zip(
_transformed_names(_CATEGORICAL_FEATURE_KEYS),
_MAX_CATEGORICAL_FEATURE_VALUES)
]
indicator_column = [
tf.feature_column.indicator_column(categorical_column)
for categorical_column in categorical_columns
]
model = _wide_and_deep_classifier(
wide_columns=indicator_column,
deep_columns=real_valued_columns,
dnn_hidden_units=hidden_units or [100, 70, 50, 25])
return model
def _wide_and_deep_classifier(wide_columns, deep_columns, dnn_hidden_units):
"""Build a simple keras wide and deep model.
Args:
wide_columns: Feature columns wrapped in indicator_column for wide (linear)
part of the model.
deep_columns: Feature columns for deep part of the model.
dnn_hidden_units: [int], the layer sizes of the hidden DNN.
Returns:
A Wide and Deep Keras model
"""
# Following values are hard coded for simplicity in this example,
# However prefarably they should be passsed in as hparams.
# Keras needs the feature definitions at compile time.
input_layers = {
colname: tf.keras.layers.Input(name=colname, shape=(), dtype=tf.float32)
for colname in _transformed_names(_DENSE_FLOAT_FEATURE_KEYS)
}
input_layers.update({
colname: tf.keras.layers.Input(name=colname, shape=(), dtype='int32')
for colname in _transformed_names(_VOCAB_FEATURE_KEYS)
})
input_layers.update({
colname: tf.keras.layers.Input(name=colname, shape=(), dtype='int32')
for colname in _transformed_names(_BUCKET_FEATURE_KEYS)
})
input_layers.update({
colname: tf.keras.layers.Input(name=colname, shape=(), dtype='int32')
for colname in _transformed_names(_CATEGORICAL_FEATURE_KEYS)
})
# TODO(b/144500510): SparseFeatures for feature columns + Keras.
deep = tf.keras.layers.DenseFeatures(deep_columns)(input_layers)
for numnodes in dnn_hidden_units:
deep = tf.keras.layers.Dense(numnodes)(deep)
wide = tf.keras.layers.DenseFeatures(wide_columns)(input_layers)
output = tf.keras.layers.Dense(
1, activation='sigmoid')(
tf.keras.layers.concatenate([deep, wide]))
model = tf.keras.Model(input_layers, output)
model.compile(
loss='binary_crossentropy',
optimizer=tf.keras.optimizers.Adam(lr=0.001),
metrics=[tf.keras.metrics.BinaryAccuracy()])
model.summary(print_fn=absl.logging.info)
return model
# TFX Transform will call this function.
def preprocessing_fn(inputs):
"""tf.transform's callback function for preprocessing inputs.
Args:
inputs: map from feature keys to raw not-yet-transformed features.
Returns:
Map from string feature key to transformed feature operations.
"""
outputs = {}
for key in _DENSE_FLOAT_FEATURE_KEYS:
# Preserve this feature as a dense float, setting nan's to the mean.
outputs[_transformed_name(key)] = tft.scale_to_z_score(
_fill_in_missing(inputs[key]))
for key in _VOCAB_FEATURE_KEYS:
# Build a vocabulary for this feature.
outputs[_transformed_name(key)] = tft.compute_and_apply_vocabulary(
_fill_in_missing(inputs[key]),
top_k=_VOCAB_SIZE,
num_oov_buckets=_OOV_SIZE)
for key in _BUCKET_FEATURE_KEYS:
outputs[_transformed_name(key)] = tft.bucketize(
_fill_in_missing(inputs[key]),
_FEATURE_BUCKET_COUNT,
always_return_num_quantiles=False)
for key in _CATEGORICAL_FEATURE_KEYS:
outputs[_transformed_name(key)] = _fill_in_missing(inputs[key])
# Was this passenger a big tipper?
taxi_fare = _fill_in_missing(inputs[_FARE_KEY])
tips = _fill_in_missing(inputs[_LABEL_KEY])
outputs[_transformed_name(_LABEL_KEY)] = tf.where(
tf.math.is_nan(taxi_fare),
tf.cast(tf.zeros_like(taxi_fare), tf.int64),
# Test if the tip was > 20% of the fare.
tf.cast(
tf.greater(tips, tf.multiply(taxi_fare, tf.constant(0.2))), tf.int64))
return outputs
# TFX Trainer will call this function.
def trainer_fn(fn_args: TrainerFnArgs):
"""Train the model based on given args.
Args:
fn_args: Holds args used to train the model as name/value pairs.
"""
# Number of nodes in the first layer of the DNN
first_dnn_layer_size = 100
num_dnn_layers = 4
dnn_decay_factor = 0.7
tf_transform_output = tft.TFTransformOutput(fn_args.transform_output)
train_dataset = _input_fn(fn_args.train_files, tf_transform_output, 40)
eval_dataset = _input_fn(fn_args.eval_files, tf_transform_output, 40)
mirrored_strategy = tf.distribute.MirroredStrategy()
with mirrored_strategy.scope():
model = _build_keras_model(
# Construct layers sizes with exponetial decay
hidden_units=[
max(2, int(first_dnn_layer_size * dnn_decay_factor**i))
for i in range(num_dnn_layers)
])
model.fit(
train_dataset,
steps_per_epoch=fn_args.train_steps,
validation_data=eval_dataset,
validation_steps=fn_args.eval_steps)
signatures = {
'serving_default':
_get_serve_tf_examples_fn(model,
tf_transform_output).get_concrete_function(
tf.TensorSpec(
shape=[None],
dtype=tf.string,
name='examples')),
}
model.save(fn_args.serving_model_dir, save_format='tf', signatures=signatures)
패키지 추가하기
새로운 주피터 노트북을 생성하고, TFX 컴포넌트와 필요한 패키지를 추가합니다.
In []:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
from typing import Text
from kfp import onprem
import tensorflow_model_analysis as tfma
from tfx.components import CsvExampleGen
from tfx.components import Evaluator
from tfx.components import ExampleValidator
from tfx.components import InfraValidator
from tfx.components import Pusher
from tfx.components import ResolverNode
from tfx.components import SchemaGen
from tfx.components import StatisticsGen
from tfx.components import Trainer
from tfx.components import Transform
from tfx.components.base import executor_spec
from tfx.components.trainer.executor import GenericExecutor
from tfx.dsl.experimental import latest_blessed_model_resolver
from tfx.orchestration import pipeline
from tfx.orchestration.kubeflow import kubeflow_dag_runner
from tfx.proto import infra_validator_pb2
from tfx.proto import pusher_pb2
from tfx.proto import trainer_pb2
from tfx.types import Channel
from tfx.types.standard_artifacts import Model
from tfx.types.standard_artifacts import ModelBlessing
from tfx.utils.dsl_utils import external_input
def _create_pipeline(pipeline_name: Text, pipeline_root: Text, data_root: Text,
module_file: Text, serving_model_dir: Text,
direct_num_workers: int) -> pipeline.Pipeline:
"""Implements the chicago taxi pipeline with TFX and Kubeflow Pipelines."""
examples = external_input(data_root)
# Brings data into the pipeline or otherwise joins/converts training data.
example_gen = CsvExampleGen(input=examples)
# Computes statistics over data for visualization and example validation.
statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])
# Generates schema based on statistics files.
schema_gen = SchemaGen(
statistics=statistics_gen.outputs['statistics'],
infer_feature_shape=False)
# Performs anomaly detection based on statistics and data schema.
example_validator = ExampleValidator(
statistics=statistics_gen.outputs['statistics'],
schema=schema_gen.outputs['schema'])
# Performs transformations and feature engineering in training and serving.
transform = Transform(
examples=example_gen.outputs['examples'],
schema=schema_gen.outputs['schema'],
module_file=module_file)
# Uses user-provided Python function that implements a model using TF-Learn
# to train a model on Google Cloud AI Platform.
trainer = Trainer(
module_file=module_file,
custom_executor_spec=executor_spec.ExecutorClassSpec(GenericExecutor),
examples=transform.outputs['transformed_examples'],
transform_graph=transform.outputs['transform_graph'],
schema=schema_gen.outputs['schema'],
train_args=trainer_pb2.TrainArgs(num_steps=10000),
eval_args=trainer_pb2.EvalArgs(num_steps=5000))
# Uses TFMA to compute a evaluation statistics over features of a model and
# perform quality validation of a candidate model (compared to a baseline).
eval_config = tfma.EvalConfig(
model_specs=[
# This assumes a serving model with signature 'serving_default'. If
# using estimator based EvalSavedModel, add signature_name: 'eval' and
# remove the label_key.
tfma.ModelSpec(label_key='tips')
],
metrics_specs=[
tfma.MetricsSpec(
# The metrics added here are in addition to those saved with the
# model (assuming either a keras model or EvalSavedModel is used).
# Any metrics added into the saved model (for example using
# model.compile(..., metrics=[...]), etc) will be computed
# automatically.
metrics=[
tfma.MetricConfig(class_name='ExampleCount')
],
# To add validation thresholds for metrics saved with the model,
# add them keyed by metric name to the thresholds map.
thresholds = {
'binary_accuracy': tfma.MetricThreshold(
value_threshold=tfma.GenericValueThreshold(
lower_bound={'value': 0.5}),
change_threshold=tfma.GenericChangeThreshold(
direction=tfma.MetricDirection.HIGHER_IS_BETTER,
absolute={'value': -1e-10}))
}
)
],
slicing_specs=[
# An empty slice spec means the overall slice, i.e. the whole dataset.
tfma.SlicingSpec(),
# Data can be sliced along a feature column. In this case, data is
# sliced along feature column trip_start_hour.
tfma.SlicingSpec(feature_keys=['trip_start_hour'])
])
# Get the latest blessed model for model validation.
model_resolver = ResolverNode(
instance_name='latest_blessed_model_resolver',
resolver_class=latest_blessed_model_resolver.LatestBlessedModelResolver,
model=Channel(type=Model),
model_blessing=Channel(type=ModelBlessing))
evaluator = Evaluator(
examples=example_gen.outputs['examples'],
model=trainer.outputs['model'],
baseline_model=model_resolver.outputs['model'],
# Change threshold will be ignored if there is no baseline (first run).
eval_config=eval_config)
# Performs infra validation of a candidate model to prevent unservable model
# from being pushed. In order to use InfraValidator component, persistent
# volume and its claim that the pipeline is using should be a ReadWriteMany
# access mode.
infra_validator = InfraValidator(
model=trainer.outputs['model'],
examples=example_gen.outputs['examples'],
serving_spec=infra_validator_pb2.ServingSpec(
tensorflow_serving=infra_validator_pb2.TensorFlowServing(
tags=['latest']),
kubernetes=infra_validator_pb2.KubernetesConfig()),
request_spec=infra_validator_pb2.RequestSpec(
tensorflow_serving=infra_validator_pb2.TensorFlowServingRequestSpec())
)
# Checks whether the model passed the validation steps and pushes the model
# to Google Cloud AI Platform if check passed.
pusher = Pusher(
model=trainer.outputs['model'],
model_blessing=evaluator.outputs['blessing'],
infra_blessing=infra_validator.outputs['blessing'],
push_destination=pusher_pb2.PushDestination(
filesystem=pusher_pb2.PushDestination.Filesystem(
base_directory=serving_model_dir)))
return pipeline.Pipeline(
pipeline_name=pipeline_name,
pipeline_root=pipeline_root,
components=[
example_gen,
statistics_gen,
schema_gen,
example_validator,
transform,
trainer,
model_resolver,
evaluator,
infra_validator,
pusher,
],
beam_pipeline_args=['--direct_num_workers=%d' % direct_num_workers],
)
TFX 에서 사용할 메타데이터 저장소의 정보를 지정합니다. Kubeflow의 메타데이터를 사용하겠습니다.
KubeflowDagRunner 를 사용하여, 작성한 TFX 파이프라인을 Kubeflow 파이프라인으로 생성합니다. 해당 셀이 실행되면, chicago_taxi_pipeline_kubeflow_pvc.tar.gz 라는 Kubeflow 파이프라인 패키지가 생성됩니다.
In []:
if __name__ == '__main__':
# This pipeline automatically injects the Kubeflow TFX image if the
# environment variable 'KUBEFLOW_TFX_IMAGE' is defined. Currently, the tfx
# cli tool exports the environment variable to pass to the pipelines.
tfx_image = os.environ.get('KUBEFLOW_TFX_IMAGE', None)
runner_config = kubeflow_dag_runner.KubeflowDagRunnerConfig(
kubeflow_metadata_config=metadata_config,
# Specify custom docker image to use.
tfx_image=tfx_image,
pipeline_operator_funcs=(
[
onprem.mount_pvc(_persistent_volume_claim, _persistent_volume,
_persistent_volume_mount)
]))
kubeflow_dag_runner.KubeflowDagRunner(config=runner_config).run(
_create_pipeline(
pipeline_name=_pipeline_name,
pipeline_root=_pipeline_root,
data_root=_data_root,
module_file=_module_file,
serving_model_dir=_serving_model_dir,
# 0 means auto-detect based on the number of CPUs available during
# execution time.
direct_num_workers=0))
파이프라인을 실행하기에 앞서, 필요한 파일을 퍼시스턴스 볼륨에 복사하도록 하겠습니다. 학습을 위한 데이터 파일과 사용자 정의 코드가 들어 있는 파이썬 파일을 복사합니다.
퍼시스턴스 볼륨에 파일을 업로드하는 방법은 다양하게 존재하기 때문에, 편하신 방법을 사용하시면 됩니다. 예제에서는 퍼시스턴스 볼륨에 파일을 업로드하기 위해서, “PHP File Manager” 를 사용하였습니다. 파일 매니저를 POD로 실행한 다음, 웹 브라우저를 이용해서 파일을 업로드하겠습니다.
먼저 파일 매니저 POD 매니페스트를 작성합니다. chicago-taxi-pvc 를 이용하여 볼륨을 마운트 합니다.
그리고 앞서 생성한 taxi_utils.py 파일을 /tfx/taxi_utils.py 경로로 업로드 합니다.
포트 포워딩을 종료하고, 파일 매니저를 삭제하겠습니다.
kubectl -n kubeflow delete pod filemanager
파이프라인 실행하기
KFP SDK를 이용하여, 생성한 파이프라인 패키지를 실행해 보겠습니다.
In []:
import kfp
run_result = kfp.Client(
host=None # replace with Kubeflow Pipelines endpoint if this notebook is run outside of the Kubeflow cluster.
).create_run_from_pipeline_package('chicago_taxi_pipeline_kubeflow_pvc.tar.gz', arguments={})
먼저, 모델 학습을 위하여 데이터를 학습 세트와 테스트 세트로 나누겠습니다. sklearn의 train_test_split() 를 사용하면, 간단히 나눌 수 있습니다.
In []:
from sklearn.model_selection import train_test_split
X = train_data.drop(['Id', 'SalePrice'], axis=1)
y = train_data['SalePrice']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
xgboost 의 XGBRegressor을 사용하여, 모델을 학습 시켜 보겠습니다.
In []:
from xgboost import XGBRegressor
model = XGBRegressor()
model.fit(X_train, y_train, eval_set=[(X_test, y_test)])
캐글의 “My Submissions” 탭을 클릭하면, 제출한 내용들을 확인할 수 있습니다.
파이프라인 만들기
앞서 작성한 코드들을 바탕으로 하여 파이프라인을 구성해 보겠습니다. 각각의 단계를 컴포넌트로 구성한 다음, 파이프라인을 작성하고 실행해 보겠습니다.
파이프라인의 단계는 다음과 같습니다.
데이터 다운로드 : 캐글에서 데이터를 다운로드 합니다.
데이터 압축풀기 : 캐글에서 다운로드한 데이터의 압축을 풉니다.
데이터 변환 : train.csv 과 test.csv 의 데이터를 머신러닝의 학습에 사용할 수 있도록 변환합니다.
모델 학습 : 변환된 train.csv 데이터를 이용하여 모델을 학습니다.
예측 : 변환된 test.csv 데이터를 이용하여 예측합니다. 그리고 예측한 결과를 submission.csv 파일로 저장합니다.
캐글 제출 : 생성된 submission.csv 파일을 캐글에 제출합니다.
기본 이미지 만들기
데이터 변환, 모덱 학습, 예측 컴포넌트에서 사용할 기본 이미지를 만들어 보겠습니다. 파이프라인에서도 필요한 이미지를 빌드할 수 있지만, 기본 이미지를 만들어서 사용하는게 더 효율적입니다. docker 명령어를 이용하여, 컨테이너 이미지를 빌드하기 때문에, docker 명령어가 실행 가능한 곳에서 작업을 해야합니다.
먼저 베이스 이미지 디렉토리를 만듭니다.
mkdir -p ~/workspace/base/xgboost
cd ~/workspace/base/xgboost
필요한 파이썬 패키지 목록을 requirements.txt 파일로 작성합니다.
requirements.txt
scikit-learn
joblib
numpy
pandas
fire
xgboost
컨테이너 이미지 빌드를 위하여 Dockerfile 파일을 작성합니다. 파이썬 패키지 목록인 requirements.txt 파일을 추가하고, 해당 파일을 이용하여 패키지를 설치합니다.
Dockerfile
FROM python:3.6-slim
RUN apt-get update -y && \\
apt-get install -y libgomp1
WORKDIR /app
COPY requirements.txt /app
RUN pip --no-cache-dir install -r requirements.txt
컨테이너 이미지를 빌드하고, 컨테이너 레지스트리에 푸시하기 위하여 build_image.sh 파일을 작성합니다.
주피터 노트북을 생성합니다. 파일 이름은 transform_pipeline_s3.ipynb 입니다. 노트북의 위치는 ~/workspace/house-prices입니다. 앞으로 작성할 예제에서는 노트북이 ~/workspace/house-prices 에 위치하고 있다고 가정하여, 파일 경로를 설정할 것입니다.
패키지 추가
파이프라인과 컴포넌트를 사용하기 위한 패키지를 추가합니다.
In []:
from kfp import components
from kfp import dsl
캐글 데이터 다운로드 컴포넌트
캐글에서 데이터를 다운로드 하는 컴포넌트를 정의합니다. 앞서 “타이타닉 생존 예측”에서 만들 컴포넌트를 재사용 합니다.
이번에는 [transform.py](<http://transform.py>) 라는 파이썬 파일을 생성한 후, 페어링을 이용하여 컨테이너 이미지를 빌드한 다음, 파이프라인에서 사용하도록 하겠습니다.
train.csv 과 test.csv 의 데이터를 머신러닝의 학습에 사용할 수 있도록 변환합니다. 캐글 데이터의 경로를 input_path 파라미터로 입력 받아 데이터를 변환합니다. 데이터 변환에 쓰이는 코드들은 앞에서 작성한 코드와 동일합니다. 변환할 데이터를 저장할 경로를 output_path 파라미터로 입력 받아 변환된 데이터를 저장합니다. 컨테이너 이미지를 빠르게 빌드하기 위하여, 필요한 패키지가 포함된 기본 이미지를 미리 만들어서 사용하였습니다.
주피터 노트북 셀에서 %%writefile transform.py 을 이용하여transform.py 파일을 생성합니다.
S3 저장소의 해당 버킷을 조회해 보면, 다음과 같이 2개의 데이터 파일이 생성된 것을 확인할 수 있습니다.
모델 학습 및 캐글 제출 파이프라인 컴포넌트 작성하기
모델 학습 및 캐글 제출 파이프라인에서는 세개의 컴포넌트를 사용합니다.
Katib Launcher: 하이퍼파리미터 튜닝을 위하여, Experiment 를 생성해 줍니다.
HP Out: 하이퍼파리미터 튜닝 결과 값을 출력해 줍니다.
Train: 하이퍼파리미터 값을 입력 받아 모델을 학습합니다. 학습한 모델은 S3에 저장합니다.
Predict : S3에서 모델을 가져와서, 예측을 수행합니다. 예측 결과 값을 퍼시스턴스 볼륨에 저장합니다.
Submit : 예측 결가 값을 캐글에 제출합니다.
모든 데이터를 S3에 저장할 수 있으나, 앞서 만든 Submit 컴포넌트가 S3를 지원하기 않기 때문에, 퍼시스턴스 볼륨과 S3를 같이 사용하였습니다. 그리고 하이퍼파라미터 튜닝을 위한 Experiment 리소스를 파이프라인이 실행되는 kubeflow 네임스가 아닌, 별도의 admin 네임스페이스에 생성하였습니다.
파이프라인의 전체적인 흐름은 다음과 같습니다.
하이퍼 파리미터 튜닝 작업이 admin 네임스페이스 실행되므로, 해당 작업들이 S3에 접근할 수 있도록, admin 네임스페이스에도 aws-secret 을 생성해줘야합니다.
주피터 노트북을 생성합니다. 파일 이름은 train_pipeline_s3.ipynb 입니다. 노트북의 위치는 ~/workspace/house-prices입니다. 앞으로 작성할 예제에서는 노트북이 ~/workspace/house-prices 에 위치하고 있다고 가정하여, 파일 경로를 설정할 것입니다.
모델 학습과 예측 컴포넌트
train.py 파일을 생성한 후, 페어링을 이용하여 컨테이너 이미지를 빌드한 다음, 파이프라인에서 사용하도록 하겠습니다.
변환된 train.csv 데이터를 이용하여 모델을 학습니다. 변환된 데이터의 경로를 path 파리미터로 입력받에 데이터를 읽어옵니다. 그리고 데이터를 학습 세트와 테스트 세트로 나눈다음, 모델을 학습니다. 그리고 하이퍼 파라미터로서 n_estimators 와 learning_rate 를 입력 받아, 모델 학습시 사용합니다. 그리고 mode 를 입력 받아, train 일 경우에는 모델 학습을, predict 일 경우에는 예측을 실행합니다. S3에서 데이터를 읽고, 저장하기 위하여 minio 라이브러리를 사용하였습니다.
주피터 노트북 셀에서 %%writefile train.py 을 이용하여 train.py 파일을 생성합니다.
In []:
%%writefile train.py
import argparse
import json
import os
from datetime import datetime, timezone
from tempfile import TemporaryDirectory
import pandas as pd
from sklearn.metrics import r2_score
from sklearn.model_selection import train_test_split
from xgboost import XGBRegressor
def read_train_input(input_path):
minio_client = get_minio_client()
bucket_name, object_name = get_bucket_name_and_object_name(input_path)
with TemporaryDirectory() as tmpdir:
tmp_train_data_file = os.path.join(tmpdir, 'train.csv')
minio_client.fget_object(bucket_name, os.path.join(object_name, 'train.csv'), tmp_train_data_file)
data = pd.read_csv(tmp_train_data_file)
X = data.drop(['Id', 'SalePrice'], axis=1)
y = data['SalePrice']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
return X_train, y_train, X_test, y_test
def read_test_input(input_path='./input'):
minio_client = get_minio_client()
bucket_name, object_name = get_bucket_name_and_object_name(input_path)
with TemporaryDirectory() as tmpdir:
tmp_test_data_file = os.path.join(tmpdir, 'test.csv')
minio_client.fget_object(bucket_name, os.path.join(object_name, 'test.csv'), tmp_test_data_file)
data = pd.read_csv(tmp_test_data_file)
X = data.drop(['Id'], axis=1)
id = data['Id']
return X, id
def train_model(X_train, y_train,
X_test, y_test,
n_estimators, learning_rate, early_stopping_rounds):
model = XGBRegressor(n_estimators=n_estimators, learning_rate=learning_rate)
model.fit(X_train, y_train,
early_stopping_rounds=early_stopping_rounds, eval_set=[(X_test, y_test)])
print('Best RMSE on eval: {} with {} rounds'.format(model.best_score, model.best_iteration + 1))
return model
def eval_model(model, X_test, y_test):
predictions = model.predict(X_test)
local_time = datetime.now(timezone.utc).astimezone().isoformat()
score = r2_score(predictions, y_test)
print('{} r2_score={}'.format(local_time, score))
return score
MODE_FILENAME = 'model.bst'
def load_model(model_path):
minio_client = get_minio_client()
bucket_name, object_name = get_bucket_name_and_object_name(model_path)
model = XGBRegressor()
with TemporaryDirectory() as tmpdir:
tmp_model_file = os.path.join(tmpdir, MODE_FILENAME)
minio_client.fget_object(bucket_name, os.path.join(object_name, MODE_FILENAME), tmp_model_file)
model.load_model(tmp_model_file)
print('Load modelfrom ', os.path.join(model_path, MODE_FILENAME))
return model
def save_model(model, model_path):
minio_client = get_minio_client()
bucket_name, object_name = get_bucket_name_and_object_name(model_path)
with TemporaryDirectory() as tmpdir:
tmp_model_file = os.path.join(tmpdir, MODE_FILENAME)
model.save_model(tmp_model_file)
minio_client.fput_object(bucket_name, os.path.join(object_name, MODE_FILENAME), tmp_model_file)
print('Save model to', os.path.join(model_path, MODE_FILENAME))
def get_minio_client():
access_key = os.environ['AWS_ACCESS_KEY_ID']
secret_key = os.environ['AWS_SECRET_ACCESS_KEY']
from minio import Minio
return Minio('s3.amazonaws.com',access_key=access_key, secret_key=secret_key)
def get_bucket_name_and_object_name(path):
from urllib.parse import urlparse
url = urlparse(path, allow_fragments=False)
bucket_name = url.netloc
object_name = url.path.lstrip('/')
return bucket_name, object_name
class ModelServe(object):
def __init__(self, model_path=None, n_estimators=100, learning_rate=0.1, early_stopping_rounds=40):
self.model_path = model_path
self.n_estimators = n_estimators
self.learning_rate = learning_rate
self.early_stopping_rounds = early_stopping_rounds
print("model_path={}".format(self.model_path))
print("n_estimators={}".format(self.n_estimators))
print("learning_rate={}".format(self.learning_rate))
print("early_stopping_rounds={}".format(self.early_stopping_rounds))
self.model = None
# self._workspace = None
# self.exec = self.create_execution()
def train(self, X_train, y_train, X_test, y_test):
# self.exec.log_input(metadata.DataSet(
# description="xgboost synthetic data",
# name="synthetic data",
# owner="someone@kubeflow.org",
# uri="file://path/to/dataset",
# version="v0.0.1"))
model = train_model(X_train,
y_train,
X_test,
y_test,
self.n_estimators,
self.learning_rate,
self.early_stopping_rounds)
score = eval_model(model, X_test, y_test)
# self.exec.log_output(metadata.Metrics(
# name="traing- valuation",
# owner="someone@kubeflow.org",
# description="training evaluation for xgboost synthetic",
# uri="file://path/to/metrics",
# metrics_type=metadata.Metrics.VALIDATION,
# values={"mean_absolute_error": mae}))
if self.model_path:
save_model(model, self.model_path)
# self.exec.log_output(metadata.Model(
# name="model",
# description="prediction model using synthetic data",
# owner="someone@kubeflow.org",
# uri='file://path/to/model',
# model_type="linear_regression",
# training_framework={
# "name": "xgboost",
# "version": "0.90"
# },
# hyperparameters={
# "learning_rate": self.learning_rate,
# "n_estimators": self.n_estimators
# },
# version=datetime.utcnow().isoformat("T")))
def predict(self, X, feature_names):
if not self.model:
self.model = load_model(self.model_path)
# Do any preprocessing
prediction = self.model.predict(data=X)
# Do any postprocessing
return prediction
# @property
# def workspace(self):
# if not self._workspace:
# self._workspace = create_workspace()
# return self._workspace
# def create_execution(self):
# r = metadata.Run(
# workspace=self.workspace,
# name="xgboost-synthetic-run" + datetime.utcnow().isoformat("T"),
# description="a notebook run")
# return metadata.Execution(
# name = "execution" + datetime.utcnow().isoformat("T"),
# workspace=self.workspace,
# run=r,
# description="execution for training xgboost-synthetic")
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--input_path', default='./input', type=str)
parser.add_argument('--hyperparameters', required=False, type=str)
parser.add_argument('--n_estimators', default='100', type=int)
parser.add_argument('--learning_rate', default='0.1', type=float)
parser.add_argument('--model_path', required=False, type=str)
parser.add_argument('--mode', default='train', choices=['train', 'predict'])
parser.add_argument('--submission_path', default='./', type=str)
args = parser.parse_args()
if args.hyperparameters:
hp_json = json.loads(args.hyperparameters)
print("use json hyperparameters", hp_json)
hyperparameters = {}
for n in hp_json:
hyperparameters[n['name']] = n['value']
n_estimators = int(hyperparameters['--n_estimators'])
learning_rate = float(hyperparameters['--learning_rate'])
else:
n_estimators = args.n_estimators
learning_rate = args.learning_rate
if args.mode == 'predict':
test, id = read_test_input(args.input_path)
model = ModelServe(model_path=args.model_path)
pred = model.predict(test, None)
submission = pd.concat([id, pd.Series(pred, name='SalePrice')], axis=1)
print(submission)
if not os.path.exists(args.submission_path):
os.makedirs(args.submission_path)
submission.to_csv(os.path.join(args.submission_path, 'submission.csv'), index=False)
else:
X_train, y_train, X_test, y_test = read_train_input(args.input_path)
model = ModelServe(model_path=args.model_path, n_estimators=n_estimators, learning_rate=learning_rate)
model.train(X_train, y_train, X_test, y_test)
페어링을 이용하여, [train.py](<http://train.py>) 파일이 포함된 컨테이너 이미지를 빌드하고 푸시하겠습니다. 주피터 노트북에서 페어링을 이용하여, 컨테이너 이미지를 푸시하려면, 별도의 설정이 되어 있야 합니다. 만약 설정이 되어 있지 않다면, “05-Kubeflow Fairing – 주피터 노트북에서 Kubeflow Fairing 설정하기“를 참고하셔서 설정하시기 바랍니다.
In []:
import uuid
from kubeflow import fairing
from kubeflow.fairing.kubernetes import utils as k8s_utils
from kubeflow.fairing.preprocessors import base
from kubeflow.fairing.builders.append import append
CONTAINER_REGISTRY = 'kangwoo'
preprocessor = base.BasePreProcessor(executable="train.py")
builder = append.AppendBuilder(registry=CONTAINER_REGISTRY, image_name="house-prices-train",
base_image="kangwoo/xgboost:0.82", preprocessor=preprocessor)
builder.build()
train_image = builder.image_tag
print(train_image)
파이프라인 생성하기
파이프라인 작성하기
파이프라인을 작성해 보겠습니다. Kubeflow 에서 제공하는 katib-launcher 컴포넌트를 이용하여, 하이퍼 파리미터 튜닝 작업을 실행합니다. 튜닝 작업이 끝난 후, 가장 좋은 점수의 하이퍼 파라미터값을 넘겨 받습니다. 이 값을 이용하여 모델을 다시 학습한 후, 캐글에 제출할 예측 데이터를 생성합니다.
간혹 쿠버네티스의 리소스의 삭제가 불가능해지는 경우가 있습니다. 예를 들어, 커스텀 리소스를 등록하여 사용하다가, 의존성을 무시한채 컴포넌트를 제거해버려서 특정 리소스를 삭제하지 못하는 경우 입니다. 아무런 생각없이 다시 설치할 생각에, 네임스페이스를 제거해버렸다가, 영원히 삭제되지 않는 고통을 경험해 볼 수 있습니다.
이럴 경우 최후의 방법을 사용해야 하는데, 바로 etcd 의 리소스 데이터를 삭제해 버리는 것입니다. 정상적인 방법은 아니니, 꼭 필요한 경우가 아니면 사용하지 않는 것을 추천 드립니다.
다음은 etcdctl을 이용해서, 특정 리소스를 삭제하는 예제입니다.
tekton-pipelines 이라는 네임스페이스에 있는 echo-hello-world-task-run 라는 taskruns 커스텀 리소스를 삭제하는 것입니다.
etcd는 쿠버네티스 클러스터의 중요한 컴포넌트로서, 클러스터의 상태를 저장하고 있습니다. 클러스터에 필요한 설정 정보와, 포드와 서비스 같은 각 리소스들의 명세와 상태 정보 등을 저장하고 있습니다. 이 글에서는 etcd가 이러한 정보들을 어떻게 저장하는지에 대해서 간단히 알아볼 것입니다.
etcd
etcd 는 분산 시스템에서 사용할 수 있는 분산형 키-값 (key-value) 저장소 입니다 CoreOS에서 클러스터를 관리하기 위해서 사용했으며, 요즘은 쿠버네티스의 기본 데이터 저장소로 많이 사용하고 있습니다. etcd는 고가용성을 위하여 클러스터로 설치됩니다. 여러 노드의 통신은 래프트(Raft) 알고리즘에 의해 처리합니다. 연결된 노드들 중 리더를 선정하여 클러스터를 관리합니다. 데이터는 분산되어 저장하기 때문에 시스템 오류에 대한 내성을 가지고 있습니다. 클러스터의 노드는 홀수개로 이루어져야 하며, 최소 3개 이상의 노드가 필요합니다.
쿠버네티스는 etcd를 기본 데이터 저장소로 사용합니다. 그래서 쿠버네티스를 설치하기 위해서는 etcd 가 필요합니다. 별도의 etcd 클러스터를 쿠버네티스 외부에 설치한 후 쿠버네티스에서 사용할 수 있습니다. 또는 쿠버네티스를 설치할때 컨트롤플레인 노드에 etcd를 포드로 같이 설치할 수 있습니다.
etcd 클러스터를 별도로 설치한 경우
다음은 그림은 etcd 클러스터를 외부에 설치한 경우를 나타낸 것입니다. 쿠버네티스 외부에 etcd 클러스터가 존재하고 kube-apiserver에서 해당 etcd 클러스터에 접속하여 데이터를 저장하고 읽어옵니다.
etcd 클러스터를 컨트롤 플레인 노드에 포드를 설치한 경우
다음 그림은 etcd 클러스터를 쿠버네티스 컨트롤 플레인 노드에 같이 설치한 경우를 나타낸 것입니다. 컨트롤 플레인 노드에 스태틱 포드(static pod) 형태로 etcd가 실행됩니다.
kube-apiserver 는 포드를 생성하는 요청을 받은 후, 포드의 매니페스트를 etcd로 저장합니다.
kube-scheduler 는 kube-apiserver 를 통해 포드 정보를 감시(watch)하고 있다가, 새로운 포드가 등록 된것을 감지하고, 포드가 실행 가능한 노드로 배치합니다. 포드의 정보에 배치될 노드의 정보를 추가한 후,kube-apiserver 를 통해 정보를 업데이트 합니다. kube-apiserver 는 업데이트 된 정보를 다시 etcd에 저장합니다.
kubelet 은 자신의 노드에 배치된 파드의 정보를 감지하고, 컨테이너를 실행시킵니다. 그리고, kube-apiserver 를 통해 상태 정보를 업데이트 합니다. kube-apiserver 는 업데이트 된 정보를 다시 etcd에 저장합니다.
쿠버네티스에서 etcd 정보 조회하기
쿠버네티스에서 사용되고 있는 etcd의 데이터를 조회하는 방법에 대해서 알아보겠습니다.
예제에 사용하는 쿠버네티스는 etcd가 컨트롤 플레인 노드에 스태틱 포드(static pod)로 같이 설치된 클러스터입니다.
쿠버네티스 클러스터가 정상적으로 설치되었다면, kube-system 네임스페이스에서 etcd 포드를 확인할 수 있습니다. 포드 이름은 일반적으로 etcd-노드명 입니다.
$ kubectl -n kube-system get pod
NAME READY STATUS RESTARTS AGE
...
etcd-node001 1/1 Running 0 229d
etcd-node002 1/1 Running 0 229d
etcd-node003 1/1 Running 1 229d
...
미러 포드(mirror pod)로 등록되어 있다면, etcd 포드를 확인할 수 있습니다.
etcdctl 사용하기
etcd 포드들 중 하나에 ps aux 명령어를 실행하여 etcd 서버의 주소를 확인해 보겠습니다.
$ kubectl -n kube-system exec -it etcd-node001 -- ps aux
PID USER TIME COMMAND
1 root 9d20 etcd --advertise-client-urls=https://10.203.163.116:2379,h
39283 root 0:00 ps aux
--advertise-client-urls 플래그의 값을 사용하여, etcd 클러스터에 명령을 실행해 보겠습니다. etcd 클러스터에 명령을 실행하기 위해서 etcdctl 이라는 유틸리티를 사용하겠습니다.
etcd의 데이터를 파일로 저장하기
etcdctl 실행할 때, v3 API를 사용하기 위해 ETCDCTL_API=3 을 붙입니다. 그리고, 접속 주소랑 인증을 위한 인증서 파일 경로를 플래그로 넘겨줍니다.
포드의 이벤트는 kubectl describe pod 포드네임 명령어를 이용하여 확인해 볼 수 있습니다.
$ kubectl describe pod nginx
“Events:”라는 부분에 포드 관련 이벤트가 조회되는 것을 확인할 수 있습니다. 이 이벤트는 시간이 지나면 자동으로 삭제되기 때문에, 안 보일수도 있습니다. (이벤트 지속 시간의 기본 설정 값은 1시간입니다.)
Name: nginx
Namespace: default
Priority: 0
...
Events:
Type Reason Age From Message
---- ------ ---- ---- -------
Normal Scheduled 2m37s default-scheduler Successfully assigned default/nginx to worker-001
Normal Pulling 2m36s kubelet, worker-001 Pulling image "nginx:1.19.2"
Normal Pulled 2m28s kubelet, worker-001 Successfully pulled image "nginx:1.19.2"
Normal Created 2m28s kubelet, worker-001 Created container nginx
Normal Started 2m28s kubelet, worker-001 Started container nginx
특정 키로 조회해보기
특정 키의 값을 바로 조회해 볼 수도 있습니다. etcdctl 의 get 플래그에 키를 지정하면 됩니다. default 네임스페이스에 있는 nginx 라는 포드의 정보를 조회해 보겠습니다.
다음은 검색바를 이용하여, “Titanic: Machine Learning from Disaster” 을 검색한 결과입니다.
“Titanic: Machine Learning from Disaster” Competition 페이지 접속하면 다음과 같은 화면을 볼 수 있습니다.
데이터는 “Data” 탭에서 받을 수 있습니다. “Data” 탭을 클릭하면, 데이터에 대한 자세한 설명과 다운로드 받는 방법을 볼 수 있습니다.
주피터 노트북
주피터에서 새로운 Terminal 을 엽니다.
kaggle 을 이용하여 데이터를 받기 위해서, kaggle 패키지를 설치합니다.
pip install kaggle --user
정상적으로 설치되면, 다음과 같은 응답 결과를 확인할 수 있습니다.
Collecting kaggle
Downloading <https://files.pythonhosted.org/packages/62/ab/bb20f9b9e24f9a6250f95a432f8d9a7d745f8d24039d7a5a6eaadb7783ba/kaggle-1.5.6.tar.gz> (58kB)
....
Successfully built kaggle python-slugify
Installing collected packages: urllib3, tqdm, text-unidecode, python-slugify, kaggle
WARNING: The script tqdm is installed in '/home/jovyan/.local/bin' which is not on PATH.
Consider adding this directory to PATH or, if you prefer to suppress this warning, use --no-warn-script-location.
WARNING: The script slugify is installed in '/home/jovyan/.local/bin' which is not on PATH.
Consider adding this directory to PATH or, if you prefer to suppress this warning, use --no-warn-script-location.
WARNING: The script kaggle is installed in '/home/jovyan/.local/bin' which is not on PATH.
Consider adding this directory to PATH or, if you prefer to suppress this warning, use --no-warn-script-location.
Successfully installed kaggle-1.5.6 python-slugify-4.0.0 text-unidecode-1.3 tqdm-4.45.0 urllib3-1.24.3
kaggle 명령어를 실행하기 위해서 PATH 에 추가해 줍니다. kaggle 를 user 디렉토리에 설치하였기 때문에, PATH 에 안잡혀 있을 수 있습니다. 사용하는 주피터 노트북 이미지에 따라서 다르기 때문에, 이미 PATH 에 추가되어 있다면, 다음 단계로 넘어가셔도 됩니다.
export PATH=${PATH}:/home/jovyan/.local/bin
캐글 API 토큰 생성하기
주피터 터미널에서 kaggle 명령어를 이용하여, 데이터를 받으려면, API 토큰이 필요합니다. 캐글 사이트의 우측 상단에 있는 “My Profile”을 클릭합니다.
My Account
“My Profile” 페이지 접속하면, 토큰을 생성할 수 있습니다.
“Create New API Token”을 클릭하여 토큰을 생성합니다. 토큰이 생성되면 “kaggle.json” 파일로 자동 다운로드 됩니다.
주어진 데이터를 바탕으로, 타이타닉호에 탑승했던 승객이 타이타닉 침몰에서 살아 남았는지를 예측하는 문제입니다.
데이터 수집(Data Ingestion)
Data Ingestion은 사용하거나 저장하기 위해서 데이터를 입수하고 가져오는 과정입니다. 간단히 얘기해서 머신 러닝에 사용할 데이터를 가져온다고 할 수 있습니다. 예제에서는 캐글에서 제공하는 데이터를 다운로드 받아 사용합니다.
데이터 분석 및 검증 (Data Analysis and Validation)
데이터 분석이란 데이터의 분포를 이해한다는 것을 의미합니다. 데이터에 대한 통계 정보들, 예를 들어, 각 컬럼들이 어떤한 값들을 얼마 만큼 가지고 있는지, 어떤 컬럼이 포함하거나 포함하지 않는 값들 얼마만큼 가지고 있는지 같은 정보를 파악하는것을 의미합니다. 이런 분석 작업을 통하여 데이터를 검증하게 됩니다.
데이터 검증은 데이터의 품질을 높이기 위하여, 데이터의 오류를 파악하여 수정하는 것을 의미합니다. 데이터 분석에서 얻은 정보들을 기반으로 데이터 검증이 이루어 집니다.
데이터 분석과 검증을 통하여 유효하지 않거나, 유실된 데이터를 처리해야만 데이터의 품질이 좋아질 수 있습니다.
데이터의 품질을 높이기 위해서는 다음과 같은 여러 가지 기능을 사용합니다.
기본 사항(선택, 필터, 중복 제거 등)
샘플링(균형, 계층화 등)
데이터 파티셔닝(학습 세트 + 검증 세트 + 테스트 세트)
변환(일반화, 표준화, 스케일링, 피벗, 등)
Binning (결측값 처리 등)
타이타닉 데이터 분석 및 검증
먼저 타이타닉 데이터에 대해 이해하기 위하여, 캐글에서 제공하고 있는 Data Dictionary 와 Variable Notes를 살펴 보겠습니다. Data 탭의 Data Description 부분을 펼치면 다음과 같은 내용을 확인할 수 있습니다.
Data Dictionary
변수 노트
각 변수에 대해서 좀 더 자세한 내용이 적혀 있습니다.
pclass :사회 경제적 지위 (SES)
1st = Upper
2nd = Middle
3rd = Lower
age : 나이가 1보다 작은 경우는 분수입니다. 추정된 나이일 경우에는 xx.5 형식입니다.
sibsp : 데이터 세트는 다음과 같은 방식으로 가족 관계를 정의합니다.
형제 자매 = 형제, 자매, 의붓 형제, 이복 누이
배우자 = 남편, 아내 (정부와 약혼자는 무시 합니다.)
parch : 데이터 세트는 다음과 같은 방식으로 가족 관계를 정의합니다.
부모 = 어머니, 아버지
아이 = 딸, 아들, 의붓 딸, 의붓 아들
보모와 같이 여행한 어린이는 parch=0 으로 처리합니다.
이제 각 변수들에 대한 의미를 알게되었으니, 주어진 데이터에 대해 간략하게 살펴보겠습니다.
캐글에서 제공하는 타이타닉 데이터 세트는 분석 모델을 만드는데에는 아직 적합하지 않습니다. 결측 값들이 존재하고, 학습에 사용하기 어려운 문자열 값들이 존재하고 있습니다. 이러한 값들을 잘 처리하여야만, 머신 러닝 알고리즘을 적용할 때 최상의 결과를 얻을 수 있습니다.
주피터 노트북을 생성합니다. 노트북의 위치는 ~/workspace/titanic 입니다. 앞으로 작성할 예제에서는 노트북이 ~/workspace/titanic 에 위치하고 있다고 가정하여, 파일 경로를 설정할 것입니다.
주피터 노트북을 이용하여 데이터를 분석 및 검증해 보겠습니다.
판다스를 이용해서 데이터를 읽어 오겠습니다.
In []:
import pandas as pd
train = pd.read_csv('~/workspace/titanic/kaggle/train.csv')
test = pd.read_csv('~/workspace/titanic/kaggle/test.csv')
head() 메소드를 이용하여 학습용 데이터를 조회해 봅니다.
In []:
train.head()
Cabin 에 Nan(Not a Number) 가 존재하는 것을 알 수가 있습니다.
테스트 데이터도 조회해 봅니다.
In[]:
test.head()
테스트 데이터에는 Survived 컬럼이 없다는 것을 알 수 있습니다. 파일명이 test 이지만 테스트에는 사용할 수 없습니다. 이 데이터는 캐글에 예측 결과값을 제출할때 사용하는 입력 데이터 입니다.
Non-Null 의 값들이 다른 것을 알 수 있습니다. 즉 Null 인 값들이 존재한다는 것입니다.
Null 값들의 개수를 조회해 보겠습니다.
train.isnull().sum()
정상적으로 실행되면, 다음과 같은 결과를 확인할 수 있습니다.
PassengerId 0
Survived 0
Pclass 0
Name 0
Sex 0
Age 177
SibSp 0
Parch 0
Ticket 0
Fare 0
Cabin 687
Embarked 2
dtype: int64
Age 와 Cabin 그리고 Embarked 에 결측값이 존재하는 것을 알 수 있습니다.
데이터에 대한 간단한 통계 정보를 보고 싶으면 describe() 메소드를 사용할 수 도 있습니다.
train.describe(include='all')
정상적으로 실행되면, 다음과 같은 결과를 확인할 수 있습니다.
조회해 본 결과를 통해서 알 수 있듯이, Age, Cabin, Embarked 는 결측값이 존재하는 것을 확인할 수 있습니다. 그리고 여러 유형의 문자열 데이터가 존재하는 것도 확인할 수 있습니다.
데이터는 크게 숫자형 데이터(Numerical Type)와 범주형 데이터(Categorical Type)로 나눌 수 있습니다. 숫자형 데이터는 연속성을 가지는 숫자로 이루어진 데이터를 의미합니다. 예제에서는 Age와 Fare 같은 것이 여기에 속합니다. 범주형 데이터는 연속적이지 않는 값을 갖는 데이터를 의미합니다. 대부분의 경우 문자형 데이터가 여기에 속합니다. 하지만, 어떤 경우에는 숫자형 데이터도 개념적으로 범주형으로 처리해야 할 경우도 있습니다. 예제에서는 Sex, Embarked, Pclass 를 범주형 데이터라고 볼 수 있습니다. Pclass 의 경우 숫자형 데이터로 보이지만, 개념적으로 범주형으로 처리하겠습니다.
데이터 변환 (Data Transformation )
머신 러닝 모델을 학습할 때 사용할 수 있도록 데이터를 변환하고, 결측값들을 처리하도록 하겠습니다
범주형 데이터 변환
One-Hot Encoding 을 사용하겠습니다. 판다스에서는 get_dummies() 메소드를 이용하면 One-Hot Encoding 을 손쉽게 할 수 있습니다.
One-Hot Encoding 은 문자를 숫자로 바꾸어 주는 방법 중의 하나로서, 가변수(dummy variable)을 0과 1로 이루어진 가변수를 만들어 주는 것입니다. 1은 있다는 것을, 0은 없다는것을 나타냅니다.
예를 들어 과일이라는 컬럼이 있습니다. 해당 컬럼은 사과, 바나나, 체리라는 세가지 종류의 값을 가지고 있습니다. 이 값을 One-Hot Encoding 할 경우 사과라는 값은 “1,0,0” 같은 형태로 변환시킬수 있습니다.
그림 출처 : (?)
판다스의 get_dummies() 메소드를 이용하여 Sex, Embarked, Pclass 를 One-Hot Encoding 하겠습니다.
결측값들을 처리하는 방법은 크게 두 가지가 있습니다. 결측값이 포함된 데이터를 삭제하거나, 다른값으로 치환하는 것입니다. 판다스에서 결측값이 포함된 데이터를 삭제하고 싶으면 dropna() 메소드를 사용하면 됩니다. 만약 다른값으로 치환하고 싶다면 fillna() 메소드를 사용하면 됩니다.
Age 의 결측값을 처리해 보겠습니다. 간단하게 생존자와 사망자의 나이 평균값을 구한다음, 그 값으로 치환하겠습니다.
Cabin 즉 객실 번호는 생존 여부에 영향을 미칠 수 있을거 같습니다. 하지만 결측값이 너무 많기 때문에, 예제에서는 값들을 사용하지 않도록 하겠습니다. Name 과 Ticket 도 삭제하도록 하겠습니다. 좋은 모델을 만드는게 목적이 아니라, Kubeflow 파이프라인을 만드는것이 목적이므로, 과감히 삭제 하도록 하겠습니다.
모델의 성능은 데이터의 품질에 좌우됩니다. 그래서 위의 상관 그래프나 컬럼별 통계분포 등을 파악하여 보다 많은 데이터의 정제 과정을 거쳐야 합니다. 그리고 피쳐 엔지니어링 과정을 통해서 데이터의 품질을 높여야합니다. 하지만 해당 내용들은 이 책의 범위에서 벗어나기 때문에 다루지 않겠습니다.
모델 학습 (Train)
준비한 데이터를 이용하여 모델을 학습해 보겠습니다.
먼저, 모델 학습을 위하여 데이터를 학습 세트와 테스트 세트로 나누겠습니다. sklearn의 train_test_split() 를 사용하면, 간단히 나눌 수 있습니다.
In []:
from sklearn.model_selection import train_test_split
X = train.drop('Survived', axis=1)
y = train['Survived']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, random_state=30)
sklearn 의 LogisticRegression 을 사용하여, 모델을 학습 시켜 보겠습니다.
In []:
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report
model = LogisticRegression()
model.fit(X_train, y_train)
prediction = model.predict(X_test)
cr = classification_report(y_test, prediction, output_dict=True)
print('accuracy =', cr['accuracy'])
정상적으로 실행되면, 다음과 같은 결과를 확인할 수 있습니다.
accuracy = 0.7892376681614349
이제 캐글에서 제공한 test 데이터를 가지고 예측해 보도록 하겠습니다.
pred = model.predict(test)
예측한 결과물을 가지고, 캐글에 제공할 submission.csv 파일을 생성해 보겠습니다.
캐글의 “My Submissions” 탭을 클릭하면, 제출한 내용들을 확인할 수 있습니다.
파이프라인 만들기
앞서 작성한 코드들을 바탕으로 하여 파이프라인을 구성해 보겠습니다. 각각의 단계를 컴포넌트로 구성한 다음, 파이프라인을 작성하고 실행해 보겠습니다.
파이프라인의 단계는 다음과 같습니다.
데이터 다운로드 : 캐글에서 데이터를 다운로드 합니다.
데이터 압축풀기 : 캐글에서 다운로드한 데이터의 압축을 풉니다.
데이터 변환 : train.csv 과 test.csv 의 데이터를 머신러닝의 학습에 사용할 수 있도록 변환합니다.
모델 학습 : 변환된 train.csv 데이터를 이용하여 모델을 학습니다.
예측 : 변환된 test.csv 데이터를 이용하여 예측합니다. 그리고 예측한 결과를 submission.csv 파일로 저장합니다.
캐글 제출 : 생성된 submission.csv 파일을 캐글에 제출합니다.
파이프라인의 전체적인 흐름은 다음과 같습니다.
기본 이미지 만들기
데이터 변환, 모덱 학습, 예측 컴포넌트에서 사용할 기본 이미지를 만들어 보겠습니다. 파이프라인에서도 필요한 이미지를 빌드할 수 있지만, 기본 이미지를 만들어서 사용하는게 더 효율적입니다. docker 명령어를 이용하여, 컨테이너 이미지를 빌드하기 때문에, docker 명령어가 실행 가능한 곳에서 작업을 해야합니다.
먼저 베이스 이미지 디렉토리를 만듭니다.
mkdir -p ~/workspace/base/sklearn
cd ~/workspace/base/sklearn
필요한 파이썬 패키지 목록을 requirements.txt 파일로 작성합니다.
requirements.txt
scikit-learn
joblib
numpy
pandas
fire
컨테이너 이미지 빌드를 위하여 Dockerfile 파일을 작성합니다. 파이썬 패키지 목록인 requirements.txt 파일을 추가하고, 해당 파일을 이용하여 패키지를 설치합니다.
Dockerfile
FROM python:3.6-slim
WORKDIR /app
COPY requirements.txt /app
RUN pip --no-cache-dir install -r requirements.txt
컨테이너 이미지를 빌드하고, 컨테이너 레지스트리에 푸시하기 위하여 build_image.sh 파일을 작성합니다.
build_image.sh 파일을 실행하여, 컨테이너 이미지를 빌드하고, 컨테이너 레지스트리에 푸시하겠습니다.
chmod +x build_image.sh
./build_image.sh
컴포넌트 만들기
캐글 데이터 다운로드 컴포넌트
캐글에서 데이터 다운로드하는 컴포넌트를 만들어 보겠습니다. docker 명령어를 이용하여, 컨테이너 이미지를 빌드하기 때문에, docker 명령어가 실행 가능한 곳에서 작업을 해야합니다.
먼저 컴포넌트 디렉토리를 만듭니다.
mkdir -p ~/workspace/components/kaggle/competitions_download
mkdir -p ~/workspace/components/kaggle/competitions_download/src
cd ~/workspace/components/kaggle/competitions_download
컴포넌트 디렉토리의 하위 src 디렉토리에, 컴포넌트에서 사용할 애플리케이션 코드인 [download.py](<http://download.py>) 파일을 작성합니다. kaggle 명령어를 실행하여, 데이터를 다운로드 합니다. 이미 데이터가 저장 경로에 존재하면 다운로드 하지 않습니다. 데이터를 다운로드 했는지 여부를 출력 결과값으로 반환합니다.
src/download.py
from __future__ import absolute_import, division, print_function, unicode_literals
import argparse
import os
import subprocess
from distutils.util import strtobool
parser = argparse.ArgumentParser()
parser.add_argument('--competition', required=True, type=str)
parser.add_argument('--path', default='.', type=str)
parser.add_argument('--force', default='False', type=strtobool)
parser.add_argument('--downloaded', default='/tmp/outputs/downloaded', type=str)
args = parser.parse_args()
if not os.path.exists(args.path):
os.makedirs(args.path)
kaggle_args = ['kaggle', 'competitions', 'download', '--path', args.path]
if args.force:
kaggle_args.append('--force')
kaggle_args.append(args.competition)
print(kaggle_args)
result = subprocess.check_output(kaggle_args, encoding='utf-8')
print('result:', result)
downloaded = False
if 'Downloading' in result:
downloaded = True
print('downloaded:', downloaded)
if not os.path.exists(os.path.dirname(args.downloaded)):
os.makedirs(os.path.dirname(args.downloaded))
with open(args.downloaded, 'w') as writer:
writer.write(str(downloaded))
컴포넌트 디렉토리에 컴포넌트 설정 정보가 있는 component.yaml 파일을 작성합니다.
component.yaml
name: Kaggle - Competitions downloader
description: Download competition files
inputs:
- {name: competition, type: String, description: 'Competition URL suffix'}
- {name: path, type: String, default: '.', description: 'Folder where file(s) will be download, defaults to current working directory'}
- {name: force, type: String, default: 'False', description: 'Skip check whether local version of files is up to date, force file download'}
outputs:
- {name: downloaded, type: String, description: 'Downloaded'}
implementation:
container:
image: kangwoo/kaggle-competitions-download@sha256:e0c585eaa50d880a0e0ab2245077c9ec487ffc7c5b8c910c7a88798314d6eab9
command: ['python', 'download.py']
args: [
--competition, {inputValue: competition},
--path, {inputValue: path},
--force, {inputValue: force},
--downloaded, {outputPath: downloaded},
]
컨테이너 이미지 빌드를 위하여 Dockerfile 파일을 작성합니다. 파이썬을 기본 이미지로 사용하고 있으며, 앞서 작성한 src/download.py 파일을 추가합니다.
Dockerfile
ARG BASE_IMAGE_TAG=3.6-slim
FROM python:$BASE_IMAGE_TAG
RUN pip install kaggle
WORKDIR /app
ADD src/download.py /app/
ENTRYPOINT ['python', 'download.py']
컨테이너 이미지를 빌드하고, 컨테이너 레지스트리에 푸시하기 위하여 build_image.sh 파일을 작성합니다.
build_image.sh
#!/bin/bash -e
image_name=kangwoo/kaggle-competitions-download
image_tag=0.0.1
full_image_name=${image_name}:${image_tag}
base_image_tag=3.6-slim
cd "$(dirname "$0")"
docker build --build-arg BASE_IMAGE_TAG=$base_image_tag -t "$full_image_name" .
docker push "$full_image_name"
# Output the strict image name (which contains the sha256 image digest)
# This name can be used by the subsequent steps to refer to the exact image that was built even if another image with the same name was pushed
image_name_with_digest=$(docker inspect --format="{{index .RepoDigests 0}}" "$full_image_name")
strict_image_name_output_file=./versions/image_digests_for_tags/$image_tag
mkdir -p "$(dirname "$strict_image_name_output_file")"
echo $image_name_with_digest | tee "$strict_image_name_output_file"
build_image.sh 파일을 실행하여, 컨테이너 이미지를 빌드하고, 컨테이너 레지스트리에 푸시하겠습니다.
chmod +x build_image.sh
./build_image.sh
생성한 로컬 개발 환경의 ~/workspace/components/kaggle/competitions_downloader/component.yaml 파일을, 주피터 노트북의 ~/workspace/components/kaggle/competitions_downloader/component.yaml 로 복사합니다.
캐글 제출 컴포넌트
예측 결과를 캐글에 제출하는 컴포넌트를 만들어 보겠습니다. docker 명령어를 이용하여, 컨테이너 이미지를 빌드하기 때문에, docker 명령어가 실행 가능한 곳에서 작업을 해야합니다.
컨테이너 이미지 빌드를 위하여 Dockerfile 파일을 작성합니다. 파이썬을 기본 이미지로 사용하고 있으며, 앞서 작성한 src/submit.py 파일을 추가합니다.
Dockerfile
ARG BASE_IMAGE_TAG=3.6-slim
FROM python:$BASE_IMAGE_TAG
RUN pip install kaggle
WORKDIR /app
ADD src/submit.py /app/
ENTRYPOINT ['python', '/app/submit.py']
컨테이너 이미지를 빌드하고, 컨테이너 레지스트리에 푸시하기 위하여 build_image.sh 파일을 작성합니다.
build_image.sh
#!/bin/bash -e
image_name=kangwoo/kaggle-competitions-submit
image_tag=0.0.1
full_image_name=${image_name}:${image_tag}
base_image_tag=3.6-slim
cd "$(dirname "$0")"
docker build --build-arg BASE_IMAGE_TAG=$base_image_tag -t "$full_image_name" .
docker push "$full_image_name"
# Output the strict image name (which contains the sha256 image digest)
# This name can be used by the subsequent steps to refer to the exact image that was built even if another image with the same name was pushed
image_name_with_digest=$(docker inspect --format="{{index .RepoDigests 0}}" "$full_image_name")
strict_image_name_output_file=./versions/image_digests_for_tags/$image_tag
mkdir -p "$(dirname "$strict_image_name_output_file")"
echo $image_name_with_digest | tee "$strict_image_name_output_file"
build_image.sh 파일을 실행하여, 컨테이너 이미지를 빌드하고, 컨테이너 레지스트리에 푸시하겠습니다.
chmod +x build_image.sh
./build_image.sh
생성한 로컬 개발 환경의 ~/workspace/components/kaggle/competitions_submit/component.yaml 파일을, 주피터 노트북의 ~/workspace/components/kaggle/competitions_submit/component.yaml 로 복사합니다.
컴포넌트 작성하기
주피터 노트북을 생성합니다. 파일 이름은 titanic_pipeline.ipynb 입니다. 노트북의 위치는 ~/workspace/titanic입니다. 앞으로 작성할 예제에서는 노트북이 ~/workspace/titanic 에 위치하고 있다고 가정하여, 파일 경로를 설정할 것입니다.
패키지 추가
파이프라인과 컴포넌트를 사용하기 위한 패키지를 추가합니다.
In []:
import os
import kfp
from kfp import dsl
from kfp import onprem
from kfp import components
from kfp.components import func_to_container_op, InputPath, OutputPath
from kubernetes.client.models import V1Volume
from kubernetes.client.models import V1SecretVolumeSource
from kubernetes.client.models import V1VolumeMount
캐글 데이터 다운로드 컴포넌트
캐글에서 데이터를 다운로드 하는 컴포넌트를 만들어 보겠습니다. 이 기능은 필요에 따라 다른 곳에서도 사용기 가능할거 같기 때문에, 재사용 컴포넌트로 만들겠습니다.
train.csv 과 test.csv 의 데이터를 머신러닝의 학습에 사용할 수 있도록 변환합니다. 캐글 데이터의 경로를 input_path 파라미터로 입력 받아 데이터를 변환합니다. 데이터 변환에 쓰이는 코드들은 앞에서 작성한 코드와 동일합니다. 변환할 데이터를 저장할 경로를 output_path 파라미터로 입력 받아 변환된 데이터를 저장합니다. 컨테이너 이미지를 빠르게 빌드하기 위하여, 필요한 패키지가 포함된 기본 이미지를 미리 만들어서 사용하였습니다.
변환된 train.csv 데이터를 이용하여 모델을 학습니다. 변환된 데이터의 경로를 path 파리미터로 입력받에 데이터를 읽어옵니다. 그리고 데이터를 학습 세트와 테스트 세트로 나눈다음, 모델을 학습니다. 사용할 모델 이름은 model_name 파라미터로 입력 받게 하였습니다. 학습된 모델의 분류 결과를 classification_report.json 파일로 저장하고, 학습된 모델을 model.joblib 파일로 저장하였습니다. 각 파일든은 export_path 파라미터로 입력하 경로에 저장됩니다.
In []:
def train_op(input_path, model_name, model_path):
import os
import joblib
import pandas as pd
train = pd.read_csv(os.path.join(input_path, 'train.csv'))
print(train.info())
from sklearn.model_selection import train_test_split
X = train.drop('Survived', axis=1)
y = train['Survived']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, random_state=30)
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report
model = LogisticRegression()
model.fit(X_train, y_train)
prediction = model.predict(X_test)
cr = classification_report(y_test, prediction, output_dict=True)
output_path = os.path.join(model_path, model_name)
if not os.path.exists(output_path):
os.makedirs(output_path)
mode_file = os.path.join(output_path, 'model.joblib')
joblib.dump(model, mode_file)
cr_file = os.path.join(output_path, 'classification_report.csv')
cr_df = pd.DataFrame(cr).transpose()
cr_df.to_csv(cr_file)
train_op = components.func_to_container_op(train_op, base_image='kangwoo/sklearn:0.0.1')
예측 컴포넌트
변환된 test.csv 데이터를 이용하여 예측합니다. 그리고 예측한 결과를 submission.csv 파일로 저장합니다.
In []:
def predict_op(model_path, test_data_path, submission_path):
import argparse
import json
import os
import joblib
import pandas as pd
reports = {}
model_names = os.listdir(model_path)
for model_name in model_names:
file = os.path.join(model_path, model_name, 'classification_report.csv')
print(file)
if os.path.isfile(file):
with open(file) as csv_file:
report = pd.read_csv(csv_file, index_col=0)
reports[model_name] = report
print('{} found'.format(len(reports)))
for item in reports.items():
print('item :', item)
print('{} : accuracy={}'.format(item[0], item[1].loc['accuracy']['f1-score']))
def score(x):
return reports[x].loc['accuracy']['f1-score']
best_model = max(reports.keys(), key=score)
print('Best model is', best_model, reports[best_model].loc['accuracy']['f1-score'])
model = joblib.load(os.path.join(model_path, best_model, 'model.joblib'))
print(model)
test = pd.read_csv(os.path.join(test_data_path, 'test.csv'))
pred = model.predict(test)
submission_file = os.path.join(submission_path, 'submission.csv')
if not os.path.isdir(os.path.dirname(submission_file)):
os.makedirs(os.path.dirname(submission_file))
submission = pd.DataFrame({'PassengerId': test['PassengerId'], 'Survived': pred})
submission.to_csv(submission_file, index=False)
print("Saved submission :", submission_file)
predict_op= components.func_to_container_op(predict_op, base_image='kangwoo/sklearn:0.0.1')
$ ip addr
...
3: docker0: <NO-CARRIER,BROADCAST,MULTICAST,UP> mtu 1500 qdisc noqueue state DOWN group default
link/ether 02:42:6c:3d:0f:04 brd ff:ff:ff:ff:ff:ff
inet 172.17.0.1/16 brd 172.17.255.255 scope global docker0
valid_lft forever preferred_lft forever
이 브리지는 컨테이너를 기본 브리지 모드로 실행할 때 사용되면, CIRD 표기법으로 172.17.0.1/16 의 주소 범위를 가지고 있습니다. 172.17.0.1 부터 172.17.255.254 까지의 아이피를 사용할 수 있습니다. 그래서 컨테이너가 기본 브리지 모드로 실행될 때, 해당 범위에서 아이피를 할당받습니다.
만약 이 범위를 변경하고 싶다면, 도커 설정 파일인 /etc/docker/daemon.json 에 "bip" 항목을 추가 하면 됩니다.
Bridge Mode Networking
Docker는 연결된 다른 네트워크 인터페이스 간에 패킷을 자동으로 전달하는 가상 이더넷 브리지인 docker0을 생성합니다. 기본적으로 호스트의 모든 컨테이너는 이 브리지를 이용하여 내부 네트워크에 연결이 됩니다. 이 모드는 컨테이너를 분리된 네트워크 네임스페이스에 배치하고, 네트워크 주소 변환을 사용하여 여러 컨테이너 간에 호스트의 외부 IP 주소를 공유합니다.
브리지 모드 네트워킹은 동일한 호스트에서 여러 컨테이너를 실행할 때 네트워크 포트 충돌을 일으키지 않습니다. 즉, 동일한 포트를 사용하는 다수의 컨테이너를 하나의 호스트에서 실행할 수 있습니다. 각 컨테이너는 호스트와 분리된 전용 네트워크 네임스페이스를 소유하고 있습니다. 그래서 이 모드는 NAT의 사용으로 인해 네트워크 처리량과 지연 시간에 영향을 미치고, 호스트와 컨테이너 간의 네트워크 포트 매핑을 제어해야하는 단점이 있습니다.
컨테이너가 생성되면, 해당 컨테이너를 위해서 페어 인터페이스(pair interfaces)가 생성됩니다. 이 인터페이스들은 두 개가 한 쌍으로 구성되어 있는데, 마치 직접 연결된 것 처럼 서로 패킷을 주고 받습니다.
컨테이너가 생성되면, 페어 인터페이스의 한쪽은 컨테이너 내부 네임스페이스에 eth0 이라는 이름으로 할당됩니다. 나머자 하나는 vethXXXX 라는 이름으로 docker0 브리지에 바인딩 됩니다.
컨테이너를 실행할 때 브리지 네트워킹 모드를 사용하려면 별다른 설정을 추가할 필요 없습니다. 기본값이 브리지 네트워킹 모드이기 때문입니다.
docker run -i -t --rm --name network_bridge ubuntu:18.04
정상적으로 실행되면, 쉘이 나타나고, 명령어를 입력할 수 있습니다. 우분투 이미지에서 네트워크 관련 도구가 설치되어 있지 않기 때문에, 필요한 도구들을 설치해 줍니다.
docker run -i -t --name mybridge_container --net mybrdige ubuntu:18.04
Host Mode Networking
호스트 모드는 컨테이너가 호스트의 네트워킹 네임스페이스를 공유하고 있으며, 외부 네트워크에 직접 노출됩니다. 호스트의 IP 주소와 호스트의 TCP 포트 공간을 사용하여, 컨테이너 내부에서 실행 중인 서비스를 노출합니다.
컨테이너를 실행할 때 호스트 네트워킹 모드를 사용하려면 다음과 같이 --net=host 라고 설정하면 됩니다.
$ docker run -i -t --rm --net=host --name network_host ubuntu:18.04
이 네트워킹 모드는 간단하기 때문에, 개발자가 이해하기 쉽고, 사용하기 쉽습니다. 하지만 호스트 네트워크를 그대로 사용하기 때문에 동일한 네트워크 포트를 사용할 경우 충돌이 발생합니다. 동일한 포트를 사용하는 다수의 컨테이너를 하나의 호스트에서 실행할 경우, 포트 충돌이 발생하여 서비스가 시작되지 않을 수 있습니다.