Kubeflow 1.0.0에서 InferenceService 의 상태 갱신이 실패할때

Kubeflow 1.0.0 이나 1.0.1에서 kfserving이 정상적으로 작동하지 않는 분을 위해서 참고삼아 적어 봅니다.

Kubeflow 설치 환경이 워낙 다양하고, 설치 환경 파일도 다양하기 때문에 이 글이 의미가 없을 수도 있습니다.

그리고, 이 방법은 정상적인 방법이 아니기 때문에, 단순히 테스트 삼아 KFServing을 사용하는게 목적일때만 사용하시기 바랍니다.

혹시 업그레이드가 가능하신 분들은 최신 버전으로 업데이트해 보시기 바랍니다.


KFServing는 InferenceService 라는 사용자 리소스를 사용합니다. InferenceService 는 정의한 추론 서버를 실행시킨 다음, 서버가 정상적으로 구동되었는지를 체크하는데, 환경에 따라서 이 상태 체크가 정상적으로 안되는 문제가 있습니다. 그 이유는 istio 때문입니다. Kubeflow에서 인증/권한을 위해서 istio를 사용합니다. InferenceService 를 관리하는 컨트롤러에서 상태 체크를 위해서 URL을 호출하는데, istio에서 인증/권한 체크 부분 때문에 200 OK가 반환되지 않는 문제가 생기는 것이죠. 그래서 실패한 상태로 인식이 되고, InferenceService 가 정상작동하지 않습니다.

이 문제를 제대로 해결하기 위해서는, Kubeflow에서 해결해 줄때까지 기다리던지, 아니면, kfserving + istio 전문가를 부르면 됩니다.

하지만 그럴 여건이 안되거나, 단순히 취미 생활로 KFServing 을 사용할 예정이라면, 다음과 같이 작동하게는 바꿀 수 있습니다. (권장하는 방법은 아닙니다.)

KFServing 에서 사용하는 istio-ingressgateway를 만들고, 보안 설정 부분을 삭제하는 것입니다.

먼저 kfserving-ingressgateway 을 생성합니다. 만약 생성되어 있다면 무시하시면 됩니다. 보통 istio-system 네임스페이스나, knative-serving 네임스페이스 존재할 수 있습니다. DeploymentService 를 확인해 보시면 됩니다.

 $ kubectl -n istio-system get deploy
NAME                       READY   UP-TO-DATE   AVAILABLE   AGE
cluster-local-gateway      1/1     1            1           54d
istio-citadel              1/1     1            1           54d
istio-galley               1/1     1            1           54d
istio-ingressgateway       1/1     1            1           54d
istio-pilot                1/1     1            1           54d
istio-policy               1/1     1            1           54d
istio-sidecar-injector     1/1     1            1           54d
istio-telemetry            1/1     1            1           54d
kfserving-ingressgateway   1/1     1            1           54d
prometheus                 1/1     1            1           54d
$ kubectl -n istio-system get service
NAME                       TYPE           CLUSTER-IP       EXTERNAL-IP   PORT(S)                                                                                                                                                                                   AGE
authservice                ClusterIP      10.98.242.196    <none>        8080/TCP                                                                                                                                                                                  54d
cluster-local-gateway      ClusterIP      10.109.28.157    <none>        80/TCP,443/TCP,31400/TCP,15011/TCP,8060/TCP,15029/TCP,15030/TCP,15031/TCP,15032/TCP                                                                                                       54d
istio-citadel              ClusterIP      10.102.226.47    <none>        8060/TCP,15014/TCP                                                                                                                                                                        54d
istio-galley               ClusterIP      10.111.115.206   <none>        443/TCP,15014/TCP,9901/TCP                                                                                                                                                                54d
istio-ingressgateway       NodePort       10.103.205.239   <none>        15020:30536/TCP,80:31380/TCP,443:31390/TCP,31400:31400/TCP,15029:32168/TCP,15030:32077/TCP,15031:31505/TCP,15032:32021/TCP,15443:31546/TCP                                                54d
istio-pilot                ClusterIP      10.110.53.9      <none>        15010/TCP,15011/TCP,8080/TCP,15014/TCP                                                                                                                                                    54d
istio-policy               ClusterIP      10.106.248.16    <none>        9091/TCP,15004/TCP,15014/TCP                                                                                                                                                              54d
istio-sidecar-injector     ClusterIP      10.105.132.134   <none>        443/TCP,15014/TCP                                                                                                                                                                         54d
istio-telemetry            ClusterIP      10.105.24.245    <none>        9091/TCP,15004/TCP,15014/TCP,42422/TCP                                                                                                                                                    54d
kfserving-ingressgateway   LoadBalancer   10.101.141.37    <pending>     15020:30543/TCP,80:32380/TCP,443:32390/TCP,31400:32400/TCP,15011:30263/TCP,8060:32119/TCP,853:32180/TCP,15029:32156/TCP,15030:30674/TCP,15031:30230/TCP,15032:32563/TCP,15443:30995/TCP   54d
prometheus                 ClusterIP      10.101.81.54     <none>        9090/TCP                                                                                                                                                                                  54d

만약 없다면 생성해 줍니다.

kubeflow 네임스페이스에 있는 inferenceservice-config라는 ConfigMap을 수정해 줍니다. inferenceservice-configingress 부분의 ingressService 를 앞서 생성했거나, 존재하는 kfserving-ingressgateway 으로 수정해 줍니다. 주소 형식은 “서비스명.네임스페이스.svc.cluster.local” 입니다.

$ kubectl -n kubeflow edit cm inferenceservice-config 

apiVersion: v1
data:
...
  ingress: |-
    {
        "ingressGateway" : "knative-ingress-gateway.knative-serving",
        "ingressService" : "kfserving-ingressgateway.istio-system.svc.cluster.local"
    }
...

설정이 변경이 되었으면, 혹시 모르니 kfserving-controller-manager 를 재시작해 줍니다.

그리고, istio RBAC 설정이 담겨 있는 clusterrbacconfig 을 삭제합니다.

$ kubectl get clusterrbacconfig
$ kubectl delete clusterrbacconfig XXXX

이제 kfserving-ingressgateway 의 주소로 요청을 보낼 수 있고, 상태가 갱신되는것을 확인할 수 있습니다. 아.마.도….

KFServing – SDK를 사용하여 InferenceService 배포하기

지금까지는 쿠버네티스 리소스 매니페스트인 YAML 파일을 이용하여 InferenceService 를 생성하였습니다. 이번에는 KFServing SDK의 client 라이브러리를 이용하여, 파이썬 코드에서 InferenceService 를 생성해 보겠습니다. 편의를 위해서 주피터 노트북에서 예제를 실행하겠습니다.

Kubeflow의 주피터 노트북 서버에 접속해서 노트북을 하나 생성합니다.

KFServing SDK 설치하기

만약 주피터 노트북 서버에 KFServing SDK가 설치되어 있지 않다면, pip를 이용하여 설치합니다. 주피터 노트북 서버에서 터미널을 하나 생성해서 다음 명령어를 실행합니다.

pip install kfserving --user

KFServing SDK 를 사용하여 InferenceService 생성하기

사용할 패키지를 추가 합니다.

In [ ] :

from kubernetes import client

from kfserving import KFServingClient
from kfserving import constants
from kfserving import utils
from kfserving import V1alpha2EndpointSpec
from kfserving import V1alpha2PredictorSpec
from kfserving import V1alpha2TensorflowSpec
from kfserving import V1alpha2InferenceServiceSpec
from kfserving import V1alpha2InferenceService
from kubernetes.client import V1ResourceRequirements

InferenceService를 배포 할 네임스페이스를 정의합니다. SDK 에 있는 utils 함수를 이용하면, 현재 실행중인 노트북의 네임 스페이스 이름을 가져올 수 있습니다. 다른 네임스페이스를 사용하고 싶으면 직접 명시하면 됩니다.

In [ ] :

namespace = utils.get_default_target_namespace()
print('namespace : {}'.format(namespace))

InferenceService 정의하기

V1alpha2EndpointSpec 를 이용하여 기본 엔드포인트 스펙을 정의합니다. 그리고 V1alpha2InferenceService를 이용하여 추론 서비스를 정의합니다. 추론 서비스를 정의할 때 앞서 정의한 기본 엔드포인트 스펙을 파라미터로 추가합니다.

In [ ] :

api_version = constants.KFSERVING_GROUP + '/' + constants.KFSERVING_VERSION
default_endpoint_spec = V1alpha2EndpointSpec(
                          predictor=V1alpha2PredictorSpec(
                            tensorflow=V1alpha2TensorflowSpec(
                              storage_uri='gs://kfserving-samples/models/tensorflow/flowers',
                              resources=V1ResourceRequirements(
                                  requests={'cpu':'100m','memory':'1Gi'},
                                  limits={'cpu':'100m', 'memory':'1Gi'}))))
    
isvc = V1alpha2InferenceService(api_version=api_version,
                          kind=constants.KFSERVING_KIND,
                          metadata=client.V1ObjectMeta(
                              name='flower-sample', namespace=namespace),
                          spec=V1alpha2InferenceServiceSpec(default=default_endpoint_spec))

InferenceService 생성하기

KFServingClient를 호출하여 InferenceService를 작성합니다.

In [ ] :

KFServing = KFServingClient()
KFServing.create(isvc)

InferenceService 조회하기

KFServingClient의 get() 메소드를 호출하여, 생성한 InferenceService를 조회합니다.

In [ ] :

KFServing.get('flower-sample', namespace=namespace, watch=True, timeout_seconds=120)

InferenceService에 카나리아 추가하기

카나리아 엔드포인트 스펙을 정의한 다음, 10% 의 트래픽을 카나리아 버전으로 롤아웃합니다.

In [ ] :

canary_endpoint_spec = V1alpha2EndpointSpec(
                         predictor=V1alpha2PredictorSpec(
                           tensorflow=V1alpha2TensorflowSpec(
                             storage_uri='gs://kfserving-samples/models/tensorflow/flowers-2',
                             resources=V1ResourceRequirements(
                                 requests={'cpu':'100m','memory':'1Gi'},
                                 limits={'cpu':'100m', 'memory':'1Gi'}))))

KFServing.rollout_canary('flower-sample', canary=canary_endpoint_spec, percent=10,
                         namespace=namespace, watch=True, timeout_seconds=120)

InferenceService의 카나리아로 더 많은 트래픽 롤아웃

카나리아 버전의 트래픽을 50 %까지 증가시킵니다.

In [ ] :

KFServing.rollout_canary('flower-sample', percent=50, namespace=namespace,
                         watch=True, timeout_seconds=120)

카나리아를 기본으로 승격하기

카나리아 버전을 기본 버전, 즉 기본 엔드포인트로 승격 시킵니다.

In [ ] :

KFServing.promote('flower-sample', namespace=namespace, watch=True, timeout_seconds=120)

InferenceService 를 삭제합니다.

사용한 InferenceService를 삭제합니다.

In [ ] :

KFServing.delete('flower-sample', namespace=namespace)

KFServing – Canary Rollout

KFServing은 카나리아 롤아웃 기능을 제공하고 있습니다.

Canary Rollout 은 새로운 버전의 모델을 배포할 때, 소규모의 사용자들에게만 먼저 제공함으로써 위험을 빠르게 감지할 수 있는 배포 전략입니다. Canary Rollout 의 장점은 문제 발생 시 언제든지 안전하게 롤백할 수 있고, 이를 바탕으로 운영 환경에서 신규 버전을 테스트할 수 있다는 것이다. 부하를 서서히 증가시키면서 신규 버전이 운영 환경에서 어떤 반응을 보이는지 모니터링하고 수치를 측정하는 것도 가능합니다. 성능 테스트가 실제 운영 환경에서 이루어지기 때문에, 완전히 분리된 다른 성능 테스트 환경을 만들 필요가 없습니다.

먼저 Tensorflow를 사용하는 InferenceService 를 생성합니다. 자세한 내용은 “KFServing InferenceService 배포와 예측 – Tensorflow“를 참고하시기 바랍니다.

InferenceService 를 정의합니다.

tensorflow.yaml

apiVersion: "serving.kubeflow.org/v1alpha2"
kind: "InferenceService"
metadata:
  name: "tensorflow-mnist"
spec:
  default:
    predictor:
      tensorflow:
        storageUri: "pvc://kfserving-models-pvc/models/tensorflow/mnist/"

InferenceService 를 생성합니다.

다음은 admin 네임스페이스 InferenceService 를 생성하는 예제입니다.

kubectl apply -f tensorflow-canary.yaml 

다음은 InferenceService를 조회하는 명령어입니다.

kubectl -n admin get inferenceservice

InferenceService가 생성되었고, DEFAULT TRAFFIC이 100인 것을 확인할 수 있습니다.

NAME                     URL                                                                                READY   DEFAULT TRAFFIC   CANARY TRAFFIC   AGE
tensorflow-mnist         <http://tensorflow-mnist.admin.example.com/v1/models/tensorflow-mnist>               True    100                                10m

카나리아 롤아웃을 적용하기 위해, InferenceService 를 수정합니다. canaryTrafficPercent 필드를 이용하여 카라니아로 트래픽의 10%를 보냅니다. 그리고 canary 필드에 사용할 predictor 를 정의합니다.

tensorflow-canary.yaml

apiVersion: "serving.kubeflow.org/v1alpha2"
kind: "InferenceService"
metadata:
  name: "tensorflow-mnist"
spec:
  default:
    predictor:
      tensorflow:
        storageUri: "pvc://kfserving-models-pvc/models/tensorflow/mnist/"
  canaryTrafficPercent: 10
  canary:
    predictor:
      tensorflow:
        storageUri: "pvc://kfserving-models-pvc/models/tensorflow/mnist/"

수정한 InferenceService 를 적용합니다.

kubectl -n admin apply -f tensorflow-canary.yaml 

트래픽 분할 백분율이 올바르게 적용 되었는지 확인하려면 다음 명령을 사용하십시오.

kubectl -n admin get inferenceservice

DEFAULT TRAFFIC이 90, CANARY TRAFFIC 이 10으로 변경 된 것을 확인할 수 있습니다.

NAME                     URL                                                                                READY   DEFAULT TRAFFIC   CANARY TRAFFIC   AGE
tensorflow-mnist         <http://tensorflow-mnist.admin.example.com/v1/models/tensorflow-mnist>               True    90                10               17m

Knative CLI (kn) 를 사용하여, 다음 명령어를 실행하면 라우트를 조회 할 수 있습니다.

kn -n admin route list

정상적으로 실행되면, 다음과 같은 응답 결과를 얻을 수 있습니다.

NAME                                 URL                                                           READY
tensorflow-mnist-predictor-canary    <http://tensorflow-mnist-predictor-canary.admin.example.com>    True
tensorflow-mnist-predictor-default   <http://tensorflow-mnist-predictor-default.admin.example.com>   True

KFServing – 클라우드 저장소를 이용하여 InferenceService 배포와 예측

S3에 저장된 모델로 InferenceService 배포와 예측하기

S3에 저장된 모델을 이용하여 InferenceService 를 생성해 보겠습니다.

S3 Secret 을 샐성하고 서비스 계정에 추가하기

InferenceService 에서 S3에 접속하여 모델을 가져오려면 접속 정보가 필요합니다. KFServing은 InferenceService에 정의된 서비스 계정의 Secret을 이용하여 접속 정보를 가져옵니다. 그래서 S3 접속 정보를 쿠버테티스 Secret 으로 저장해야합니다. Secret 에는 기본적으로 awsAccessKeyIDawsSecretAccessKey 가 포함되어야 합니다. KFServing은 S3 엔드포인트와 https 사용 여부 등의 S3 옵션을 Secret의 어노테이션으로 설정할 수 있는 기능을 제공합니다. 그래서 변경이 필요한 S3 옵션이 있을 경우에는 어노테이션을 추가 하면 됩니다.

다음은 Kubeflow에서 제공하는 minio를 s3 저장소로 사용하는 Secret 매니페스트 입니다. data 섹션에 있는 awsAccessKeyIDawsSecretAccessKey 필드의 값은 BASE64 로 인코딩 된 값을 사용해야합니다.

apiVersion: v1
kind: Secret
metadata:
  name: s3-secret
  annotations:
     serving.kubeflow.org/s3-endpoint: minio-service.kubeflow:9000 # replace with your s3 endpoint
     serving.kubeflow.org/s3-usehttps: "0" # by default 1, for testing with minio you need to set to 0
     serving.kubeflow.org/s3-verifyssl: "0" # by default 1, for testing with minio you need to set to 0
type: Opaque
data:
  awsAccessKeyID: bWluaW8=
  awsSecretAccessKey: bWluaW8xMjM=

맥이나 리눅스를 사용하고 있다면, 다음 명령어로 BASE64 인코딩을 할 수 있습니다.

echo -n 'minio' | base64

KFServing은 서비스 계정에서 Secret을 가져옵니다. 그렇기 때문에, 생성한 Secret 을 서비스 계정의 secrets 필드에 추가해야 합니다. 만약 기존에 생성했던 Secret 이 있다면, 기존것을 사용해도 됩니다. KFServing은 기본적으로 default 서비스 계정을 사용합니다. 예제에서는 새로운 서비스 계정을 생성한 후, InferenceService 매니페스트에 사용할 서비스 계정을 지정해 주겠습니다.

다음은 서비스 계정을 만드는 매니페스트입니다. 앞서 생성한 s3-secret 을 추가하였습니다.

s3-sa.yaml

apiVersion: v1
kind: ServiceAccount
metadata:
  name: s3-sa
secrets:
- name: s3-secret

admin 네임스페이스에 서비스 계정을 생성하겠습니다.

kubectl -n admin apply -f s3-sa.yaml

텐서플로우 mnist 모델을 학습하고 S3에 저장하기

InferenceService에서 사용할 mnist 모델을 S3에 저장하겠습니다. 코드는 앞서 사용한 Tensorflow 코드와 동일하기 때문에 자세한 설명은 생략하겠습니다.

tensorflow_mnist.py

from __future__ import absolute_import, division, print_function, unicode_literals

import argparse
import os

import tensorflow as tf


def train():
    print("TensorFlow version: ", tf.__version__)

    parser = argparse.ArgumentParser()
    parser.add_argument('--model_path', default='s3://tensorflow/mnist/model', type=str)
    args = parser.parse_args()

    version = 1
    export_path = os.path.join(args.model_path, str(version))

    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
    x_train, x_test = x_train / 255.0, x_test / 255.0

    model = tf.keras.models.Sequential([
        tf.keras.layers.Flatten(input_shape=(28, 28)),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(10, activation='softmax')
    ])

    model.compile(optimizer=tf.keras.optimizers.SGD(learning_rate=0.01),
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])

    print("Training...")
    training_history = model.fit(x_train, y_train, batch_size=64, epochs=10,
                                 validation_split=0.2)

    print('\\nEvaluate on test data')
    results = model.evaluate(x_test, y_test, batch_size=128)
    print('test loss, test acc:', results)

    model.save(export_path)
    print('"Saved model to {}'.format(export_path))


if __name__ == '__main__':
    train()

컨테이너 이미지를 만들기

컨테이너 이미지를 만들기 위한 Dockerfile 입니다. 텐서플로우를 기본 이미지로 사용합니다.

Dockerfile

FROM tensorflow/tensorflow:2.1.0-py3

RUN mkdir -p /app
ADD tensorflow_mnist.py /app/

쿠버네티스 잡 실행하기

컨테이너 이미지를 빌드하고, 컨테이너 이미지 레지스트리에 푸시 한 다음, 쿠버네티스 잡(Job)을 생성하겠습니다.

Job을 생성할 때는 학습한 모델을 S3에 저장하기 위해서 S3 접속 정보를 추가해줍니다. 이 일련의 작업들은 직접 실행 할 수 있습니다. 하지만 좀 더 편하게 하기 위해서 앞서 배운 Kubeflow Fairing을 사용하겠습니다.

InferneceService 에서 학습한 모델을 가져올 때 S3 접속 정보가 필요한 것 처럼, Job을 실행해서 S3로 모델을 저장할 때도 S3 접속 정보가 필요합니다. S3 접속 정보는 Job의 환경 정보(Environment)로 추가해 줄 수 있습니다. 현재 Fairing 에는 Job의 환경 정보를 추가할 수 있는 손 쉬운 방법이 없기때문에, pod_spec_mutator

를 이용하여, 환경 정보를 추가하겠습니다.

다음은 S3 접속 정보를 환경 정보로 추가하는 pod_spec_mutator 코드입니다. 접속 정보를 별도로 입력 받는 것이 번거로워서, 앞서 생성한 s3-secret 에서 정보를 가져와서 환경 변수로 설정해 줍니다. 이 코드는 예외 처리가 안되어 있기 때문에 참고만 하시기 바랍니다.

s3_utils.py

from kubeflow.fairing.constants import constants
from kubernetes import client


def aws_credentials(secret_name=constants.AWS_CREDS_SECRET_NAME):
    def _add_aws_credentials(kube_manager, pod_spec, namespace):
        """add AWS credential

        :param kube_manager: kube manager for handles communication with Kubernetes' client
        :param pod_spec: pod spec like volumes and security context
        :param namespace: The custom resource

        """
        if not kube_manager.secret_exists(secret_name, namespace):
            raise ValueError('Unable to mount credentials: Secret {}} not found in namespace {}'
                             .format(secret_name, namespace))

        secret = client.CoreV1Api().read_namespaced_secret(secret_name, namespace)
        annotations = secret.metadata.annotations
        s3_endpoint = annotations['serving.kubeflow.org/s3-endpoint']
        s3_use_https = annotations['serving.kubeflow.org/s3-usehttps']
        s3_verify_ssl = annotations['serving.kubeflow.org/s3-verifyssl']

        env = [
            client.V1EnvVar(
                name='AWS_ACCESS_KEY_ID',
                value_from=client.V1EnvVarSource(
                    secret_key_ref=client.V1SecretKeySelector(
                        name=secret_name,
                        key='awsAccessKeyID'
                    )
                )
            ),
            client.V1EnvVar(
                name='AWS_SECRET_ACCESS_KEY',
                value_from=client.V1EnvVarSource(
                    secret_key_ref=client.V1SecretKeySelector(
                        name=secret_name,
                        key='awsSecretAccessKey'
                    )
                )
            ),
            client.V1EnvVar(name='S3_ENDPOINT', value=s3_endpoint),
            client.V1EnvVar(name='S3_USE_HTTPS', value=s3_use_https),
            client.V1EnvVar(name='S3_VERIFY_SSL', value=s3_verify_ssl),

        ]

        if pod_spec.containers[0].env:
            pod_spec.containers[0].env.extend(env)
        else:
            pod_spec.containers[0].env = env

    return _add_aws_credentials

다음은 로컬 개발 환경에서 Fairing을 사용하여 컨테이너 이미지를 만들고, 쿠버네티스 잡을 실행하는 예제입니다.

fairing-local-docker.py

import uuid

import s3_utils

from kubeflow import fairing
from kubeflow.fairing.kubernetes import utils as k8s_utils



CONTAINER_REGISTRY = 'kangwoo'

namespace = 'admin'
job_name = f'tensorflow-mnist-s3-job-{uuid.uuid4().hex[:4]}'

command = ["python", "tensorflow_mnist.py", "--model_path", "s3://tensorflow/mnist/model"]
output_map = {
    "Dockerfile": "Dockerfile",
    "tensorflow_mnist.py": "tensorflow_mnist.py"
}
fairing.config.set_preprocessor('python', command=command, path_prefix="/app", output_map=output_map)

fairing.config.set_builder('docker', registry=CONTAINER_REGISTRY, image_name="tensorflow-mnist",
                           dockerfile_path="Dockerfile")

fairing.config.set_deployer('job', namespace=namespace, job_name=job_name,
                            pod_spec_mutators=[
                                s3_utils.aws_credentials(secret_name='s3-secret')
                            ],
                            cleanup=False, stream_log=True)

fairing.config.run()

fairing을 실행하면 쿠버네티스 잡이 생성되고, 학습이 완료된 모델이 지정한 경로에 저장됩니다.

S3을 사용하는 InferenceService 로 예측 하기

InferenceService 생성

InferenceService 매니페스트를 작성합니다. predictor로 tensorflow 를 사용합니다. storageUri 필드로 모델 저장 위치인 s3://tensorflow/mnist/model 를 지정해 줍니다. 그리고 serviceAccountName 필드를 이용하여, S3 접근 권한 정보를 가지고 있는 서비스 계정을 지정해 줍니다.

mnist-s3.yaml

apiVersion: "serving.kubeflow.org/v1alpha2"
kind: "InferenceService"
metadata:
  name: "mnist-s3"
spec:
  default:
    predictor:
      serviceAccountName: s3-sa
      tensorflow:
        storageUri: "s3://tensorflow/fashion-mnist/model"

InferenceService 를 생성합니다.

다음은 admin 네임스페이스 InferenceService 를 생성하는 예제입니다.

kubectl -n admin apply -f mnist-s3.yaml

생성한 InferenceService를 조회해 보겠습니다.

kubectl -n admin get inferenceservice

InferenceService 가 정상적으로 생성되면 다음과 같은 응답 결과를 확인할 수 있습니다.

NAME                  URL                                                                          READY   DEFAULT TRAFFIC   CANARY TRAFFIC   AGE
mnist-s3              <http://mnist-s3.admin.example.com/v1/models/mnist-s3>                         True    100                                20s

예측 실행하기

예측을 요청하기 위해서는 모델 서버에 접근해야 합니다. 모델 서버는 ingressgateway 를 통해서 접근할 수 있습니다. ingressgateway 는 모델 서버들을 구분하기 위해서 호스트 이름을 사용합니다. ingressgateway에 접근하 기 위한 주소는 앞서 정의한 CLUSTER_IP 를 사용하겠습니다.

예측을 요청할 데이터를 json 파일로 작성합니다.

데이터의 크기가 크기 때문에 git 에 있는 파일을 다운받아서 사용해주세요.

mnist-input.json

{
  "instances": [
    [
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0
    ],
...
  ]
}

다음은 admin 네임스페이스의 tensorflow-mnist InferenceService 에 예측을 요청하는 예제입니다.

MODEL_NAME=mnist-s3
SERVICE_HOSTNAME=$(kubectl -n admin get inferenceservice mnist-s3 -o jsonpath='{.status.url}' | cut -d "/" -f 3)

INPUT_PATH=@./mnist-input.json
curl -v -H "Host: ${SERVICE_HOSTNAME}" http://$CLUSTER_IP/v1/models/${MODEL_NAME}:predict -d ${INPUT_PATH}

정상적으로 실행되면 다음과 같은 응답 결과를 확인 할 수 있습니다.

*   Trying 192.168.21.38...
* TCP_NODELAY set
* Connected to 192.168.21.38 (192.168.21.38) port 32380 (#0)
> POST /v1/models/mnist-s3:predict HTTP/1.1
> Host: mnist-s3.admin.example.com
> User-Agent: curl/7.64.1
> Accept: */*
> Content-Length: 9866
> Content-Type: application/x-www-form-urlencoded
> Expect: 100-continue
> 
< HTTP/1.1 100 Continue
* We are completely uploaded and fine
< HTTP/1.1 200 OK
< content-length: 183
< content-type: application/json
< date: Sat, 04 Apr 2020 03:43:16 GMT
< server: istio-envoy
< x-envoy-upstream-service-time: 29
< 
{
    "predictions": [[0.000134299538, 1.87837134e-06, 0.000226354547, 0.00111765205, 1.1317371e-05, 3.8724811e-05, 3.75441829e-07, 0.997255147, 3.78830955e-05, 0.00117637496]
    ]
}

GCS에 저장된 모델로 InferenceService 배포와 예측하기

GCS에 저장된 모델을 이용하여 InferenceService 를 생성해 보겠습니다.

(현재 KFServing의 버그로 인해 아래 방법은 정상 작동하지 않습니다.)

GCS Secret 을 생성하고 서비스 계정에 추가하기

InferenceService 에서 GCS에 접속하여 모델을 가져오려면 접속 정보가 필요합니다. KFServing은 InferenceService에 정의된 서비스 계정의 Secret을 이용하여 접속 정보를 가져옵니다. 그래서 GCS 접속 정보를 쿠버테티스 Secret 으로 저장해야합니다. Secret 에는 기본적으로 awsAccessKeyIDawsSecretAccessKey 가 포함되어야 합니다. KFServing은 S3 엔드포인트와 https 사용 여부 등의 S3 옵션을 Secret의 어노테이션으로 설정할 수 있는 기능을 제공합니다. 그래서 변경이 필요한 S3 옵션이 있을 경우에는 어노테이션을 추가 하면 됩니다.

구글 클라우드의 JSON 서비스 계정 키를 다운로드 합니다. 이 서비스 계정은 사용할 GCS에 접근할 권한이 부여되어 있어야합니다.

다음은 서비스 계정 키를 gcp-sa-credentials.json 파일로 다운로드 하는 명령어 입니다. [SA-NAME] 에는 서비스 계정의 아이디를, [PROJECT-ID] 에는 프로젝트 아이디를 입력하면 됩니다.

gcloud iam service-accounts keys create gcp-sa-credentials.json \\
  --iam-account [SA-NAME]@[PROJECT-ID].iam.gserviceaccount.com

다운로드 받은 구글 클라우드의 서비스 계정 키를, 쿠버네티스 Secret 에 등록합니다.

다음은 admin 네임스페이스 Secret 을 생성하는 명령어 입니다. KFServing의 설정 값을 변경하지 않았다면, Secretdata에 저장되는 파일명은 gcloud-application-credentials.json 을 반드시 사용해야합니다.

kubectl -n admin create secret generic user-gcp-sa \\
  --from-file=gcloud-application-credentials.json=gcp-sa-credentials.json

KFServing은 서비스 계정에서 Secret을 가져옵니다. 그렇기 때문에, 생성한 Secret 을 서비스 계정의 secrets 필드에 추가해야 합니다. 만약 기존에 생성했던 Secret 이 있다면, 기존것을 사용해도 됩니다. KFServing은 기본적으로 default 서비스 계정을 사용합니다. 예제에서는 새로운 서비스 계정을 생성한 후, InferenceService 매니페스트에 사용할 서비스 계정을 지정해 주겠습니다.

다음은 서비스 계정을 만드는 매니페스트입니다. 앞서 생성한 s3-secret 을 추가하였습니다.

gcs-sa.yaml

apiVersion: v1
kind: ServiceAccount
metadata:
  name: gcs-sa
secrets:
- name: user-gcp-sa

admin 네임스페이스에 서비스 계정을 생성하겠습니다.

kubectl -n admin apply -f gcs-sa.yaml

텐서플로우 mnist 모델을 학습하고 GCS에 저장하기

InferenceService에서 사용할 mnist 모델을 GCS에 저장하겠습니다. 코드는 앞서 사용한 Tensorflow 코드와 동일하기 때문에 자세한 설명은 생략하겠습니다.

tensorflow_mnist.py

from __future__ import absolute_import, division, print_function, unicode_literals

import argparse
import os

import tensorflow as tf


def train():
    print("TensorFlow version: ", tf.__version__)

    parser = argparse.ArgumentParser()
    parser.add_argument('--model_path', default='gcs://tensorflow/fashion-mnist/model', type=str)
    args = parser.parse_args()

    version = 1
    export_path = os.path.join(args.model_path, str(version))

    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
    x_train, x_test = x_train / 255.0, x_test / 255.0

    model = tf.keras.models.Sequential([
        tf.keras.layers.Flatten(input_shape=(28, 28)),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(10, activation='softmax')
    ])

    model.compile(optimizer=tf.keras.optimizers.SGD(learning_rate=0.01),
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])

    print("Training...")
    training_history = model.fit(x_train, y_train, batch_size=64, epochs=10,
                                 validation_split=0.2)

    print('\\nEvaluate on test data')
    results = model.evaluate(x_test, y_test, batch_size=128)
    print('test loss, test acc:', results)

    model.save(export_path)
    print('"Saved model to {}'.format(export_path))


if __name__ == '__main__':
    train()

컨테이너 이미지를 만들기

컨테이너 이미지를 만들기 위한 Dockerfile 입니다. 텐서플로우를 기본 이미지로 사용합니다.

Dockerfile

FROM tensorflow/tensorflow:2.1.0-py3

RUN mkdir -p /app
ADD tensorflow_mnist.py /app/

쿠버네티스 잡 실행하기

컨테이너 이미지를 빌드하고, 컨테이너 이미지 레지스트리에 푸시 한 다음, 쿠버네티스 잡(Job)을 생성하겠습니다.

Job을 생성할 때는 학습한 모델을 GCS에 저장하기 위해서 GCS 접속 정보를 추가해줍니다. 이 일련의 작업들은 직접 실행 할 수 있습니다. 하지만 좀 더 편하게 하기 위해서 앞서 배운 Kubeflow Fairing을 사용하겠습니다.

InferneceService 에서 학습한 모델을 가져올 때 GCS 접속 정보가 필요한 것 처럼, Job을 실행해서 GCS로 모델을 저장할 때도 GCS 접속 정보가 필요합니다. GCS 접속 정보는 Job의 환경 정보(Environment)로 추가해 줄 수 있습니다. 현재 Fairing 에는 Job의 환경 정보를 추가할 수 있는 손 쉬운 방법이 없기때문에, pod_spec_mutator

를 이용하여, 환경 정보를 추가하겠습니다.

다음은 GCS 접속 정보를 환경 정보로 추가하는 pod_spec_mutator 코드입니다. 접속 정보를 별도로 입력 받는 것이 번거로워서, 앞서 생성한 user-gcp-sa 를 볼륨 마운트 한 다음, GOOGLE_APPLICATION_CREDENTIALS 라는 환경 변수에 인증 정보가 들어 있는 json 파일의 위치를 설정해 줍니다. 이 코드는 예외 처리가 안되어 있기 때문에 참고만 하시기 바랍니다.

gcs_utils.py

import os

from kubernetes import client

GCSCredentialFileName = "gcloud-application-credentials.json"
GCSCredentialVolumeName = "user-gcp-sa"
GCSCredentialVolumeMountPath = "/var/secrets/"
GCSCredentialEnvKey = "GOOGLE_APPLICATION_CREDENTIALS"


def gcp_credentials(secret_name='user-gcp-sa', credential_file_name=GCSCredentialFileName):
    def _add_gcp_credentials(kube_manager, pod_spec, namespace):
        """add GCP credential

        :param kube_manager: kube manager for handles communication with Kubernetes' client
        :param pod_spec: pod spec like volumes and security context
        :param namespace: The custom resource

        """
        if not kube_manager.secret_exists(secret_name, namespace):
            raise ValueError('Unable to mount credentials: Secret {}} not found in namespace {}'
                             .format(secret_name, namespace))

        # volume_mount
        volume_mount = client.V1VolumeMount(
            name=GCSCredentialVolumeName, mount_path=GCSCredentialVolumeMountPath)
        if pod_spec.containers[0].volume_mounts:
            pod_spec.containers[0].volume_mounts.append(volume_mount)
        else:
            pod_spec.containers[0].volume_mounts = [volume_mount]

        volume = client.V1Volume(
            name=GCSCredentialVolumeName,
            secret=client.V1SecretVolumeSource(secret_name=secret_name))
        if pod_spec.volumes:
            pod_spec.volumes.append(volume)
        else:
            pod_spec.volumes = [volume]

        # environment
        credential_file_path = os.path.join(GCSCredentialVolumeMountPath, GCSCredentialFileName)
        env = [
            client.V1EnvVar(name=GCSCredentialEnvKey, value=credential_file_path),
        ]

        if pod_spec.containers[0].env:
            pod_spec.containers[0].env.extend(env)
        else:
            pod_spec.containers[0].env = env

    return _add_gcp_credentials

다음은 로컬 개발 환경에서 Fairing을 사용하여 컨테이너 이미지를 만들고, 쿠버네티스 잡을 실행하는 예제입니다.

fairing-local-docker.py

import uuid

import gcs_utils
from kubeflow import fairing

CONTAINER_REGISTRY = 'kangwoo'

namespace = 'admin'
job_name = f'tensorflow-mnist-gcs-job-{uuid.uuid4().hex[:4]}'

command = ["python", "tensorflow_mnist.py", "--model_path", "gs://kfp-bucket/mnist/model"]
output_map = {
    "Dockerfile": "Dockerfile",
    "tensorflow_mnist.py": "tensorflow_mnist.py"
}
fairing.config.set_preprocessor('python', command=command, path_prefix="/app", output_map=output_map)

fairing.config.set_builder('docker', registry=CONTAINER_REGISTRY, image_name="tensorflow-mnist",
                           dockerfile_path="Dockerfile")

fairing.config.set_deployer('job', namespace=namespace, job_name=job_name,
                            pod_spec_mutators=[
                                gcs_utils.gcp_credentials(secret_name='user-gcp-sa')
                            ],
                            cleanup=False, stream_log=True)

fairing.config.run()

fairing을 실행하면 쿠버네티스 잡이 생성되고, 학습이 완료된 모델이 지정한 경로에 저장됩니다.

GCS을 사용하는 InferenceService 로 예측 하기

InferenceService 생성

InferenceService 매니페스트를 작성합니다. predictor로 tensorflow 를 사용합니다. storageUri 필드로 모델 저장 위치인 gs://kfp-bucket/mnist/model 를 지정해 줍니다. 그리고 serviceAccountName 필드를 이용하여, GCS 접근 권한 정보를 가지고 있는 서비스 계정을 지정해 줍니다.

mnist-gcs.yaml

apiVersion: "serving.kubeflow.org/v1alpha2"
kind: "InferenceService"
metadata:
  name: "mnist-gcs"
spec:
  default:
    predictor:
      serviceAccountName: gcs-sa
      tensorflow:
        storageUri: "gs://kfp-bucket/mnist/model"

InferenceService 를 생성합니다.

다음은 admin 네임스페이스 InferenceService 를 생성하는 예제입니다.

kubectl -n admin apply -f mnist-gcs.yaml

생성한 InferenceService를 조회해 보겠습니다.

kubectl -n admin get inferenceservice

InferenceService 가 정상적으로 생성되면 다음과 같은 응답 결과를 확인할 수 있습니다.

NAME                  URL                                                                          READY   DEFAULT TRAFFIC   CANARY TRAFFIC   AGE
mnist-gcs             <http://mnist-gcs.admin.example.com/v1/models/mnist-gcs>                      True    100                                20s

예측 실행하기

예측을 요청하기 위해서는 모델 서버에 접근해야 합니다. 모델 서버는 ingressgateway 를 통해서 접근할 수 있습니다. ingressgateway 는 모델 서버들을 구분하기 위해서 호스트 이름을 사용합니다. ingressgateway에 접근하 기 위한 주소는 앞서 정의한 CLUSTER_IP 를 사용하겠습니다.

예측을 요청할 데이터를 json 파일로 작성합니다.

데이터의 크기가 크기 때문에 git 에 있는 파일을 다운받아서 사용해주세요.

mnist-input.json

{
  "instances": [
    [
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0
    ],
...
  ]
}

다음은 admin 네임스페이스의 tensorflow-mnist InferenceService 에 예측을 요청하는 예제입니다.

MODEL_NAME=mnist-gcs
SERVICE_HOSTNAME=$(kubectl -n admin get inferenceservice mnist-gcs -o jsonpath='{.status.url}' | cut -d "/" -f 3)

INPUT_PATH=@./mnist-input.json
curl -v -H "Host: ${SERVICE_HOSTNAME}" http://$CLUSTER_IP/v1/models/${MODEL_NAME}:predict -d ${INPUT_PATH}

정상적으로 실행되면 다음과 같은 응답 결과를 확인 할 수 있습니다.

*   Trying 192.168.21.38...
* TCP_NODELAY set
* Connected to 192.168.21.38 (192.168.21.38) port 32380 (#0)
> POST /v1/models/mnist-gcs:predict HTTP/1.1
> Host: mnist-gcs.admin.example.com
> User-Agent: curl/7.64.1
> Accept: */*
> Content-Length: 9866
> Content-Type: application/x-www-form-urlencoded
> Expect: 100-continue
> 
< HTTP/1.1 100 Continue
* We are completely uploaded and fine
< HTTP/1.1 200 OK
< content-length: 183
< content-type: application/json
< date: Sat, 04 Apr 2020 11:43:16 GMT
< server: istio-envoy
< x-envoy-upstream-service-time: 29
< 
{
    "predictions": [[0.000134299538, 1.87837134e-06, 0.000226354547, 0.00111765205, 1.1317371e-05, 3.8724811e-05, 3.75441829e-07, 0.997255147, 3.78830955e-05, 0.00117637496]
    ]
}

KFServing – Explainer를 포함한 InferenceService

Anchors

KFServing 은 Alibi의 Anchors 알고리즘을 이용한 Explainer를 제공하고 있습니다.

앵커 알고리즘은 Ribeiro et al.의 Anchors : High-Precision Model-Agnostic Descriptions 논문을 기반으로 하고 있습니다. 이 알고리즘은 이미지, 텍스트 및 테이블 형식 데이터에 적용되는 분류 모델에 적합한 모델 독립적 (블랙 박스) 인 설명을 제공합니다. 앵커의 기본 개념은 앵커라는 고정밀 규칙을 사용하여 복잡한 모델의 동작을 설명하는 것입니다.

Alibi

Alibi는 학습한 머신 러닝 모델이 왜 그런 결정을 내었는지에 대해 설명하기 위한 오픈 소스 파이썬 라이브러리입니다. 블랙 박스인 머신 러닝 모델을 설명하고 해석할 수 있는 알고리즘을 제공합니다. 그리고 해석 가능한 머신 러닝 메소드에 대한 일관된 API를 지원하기 때문에 사용하기 편합니다. Alibi는 테이블 형식(tabular), 텍스트 그리고 이미지 데이터 분류 등 같은 다양한 사용 사례를 지원합니다.

지원하는 알고리즘

Alibi에서 지원하는 알고리즘들은 머신러닝 모델 예측에 대한 설명을 제공합니다. 모델 예측을 요청하면, 왜 이러한 예측 결과를 도출하게 되었는지에 대해 설명합니다. 다음 알고리즘은 모두 블랙 박스 모델에서 작동하기 때문에, 예측 함수(prediction function)에 접근할 수 있어야합니다.

Model explanations

사용한 Alibi 패키비의 버전은 0.2.2 입니다. KFServing에서 Alibi 서빙을 위해서 사용하는 이미지 버전이 0.2.2 이기 때문입니다. 다른 버전을 사용하고 싶다면, KFServing의 설정을 바꾸면 됩니다. Alibi 0.2.2 버전은 텐서플로우 2 버전을 지원하기 않기 때문에, 테스트를 위해서 텐서플로우 1 버전을 사용하였습니다.


Alibi Image Explainer

Alibi Image Explainer 를 사용하는 InferenceService 를 작성해 보겠습니다. Alibi Image Explainer 는 블랙 박스 설명자 알고리즘으로서, Alibi 오픈 소스 라이브러리의 Anchors 이미지 버전입니다.

Anchors 이미지는 먼저 슈퍼 픽셀로 분할되어 로컬 이미지 구조를 유지하고 있습니다다. 해석 가능한 표현은 앵커에 각 수퍼 픽셀의 존재 또는 부재로 구성됩니다. 해석 가능한 설명에 도달하려면 의미있는 수퍼 픽셀을 생성하는 것이 중요합니다. 이 알고리즘은 여러 표준 이미지 분할 알고리즘 (felzenszwalb, slic 및 quickshift)을 지원하며 사용자가 사용자 정의 분할 기능을 제공 할 수 있습니다.

이 알고리즘에 대한 자세한 내용은 Seldon Alibi 설명서를 참조하십시오.

모델 생성

Alibi Image Explainer 를 사용하기 위해서 Explainer 모델을 저장해야합니다. AnchorImage 인스턴스를 생성한 후, dill을 이용해서 저장하면 됩니다. Alibi Image Explainer 를 사용하는 InferenceService를 생성하기 위해서는 예측 모델과 설명 모델 둘 다 필요하기 때문에 같이 작성하겠습니다.

케라스의 데이터셋 중의 하나인 패션 mnist 데이터를 분류하는 모델을 작성해 보겠습니다.

먼저, 예측 모델을 만들고 훈련하기 위한 텐서플로우의 고수준 API인 tf.keras를 사용합니다. tf.saved_model.save() 메소드를 이용하여, 전체 모델을 지정한 위치에 저장합니다. 여기에는 가중치, 모델 구성 등이 포함됩니다. 모델을 저장할 때 주의해야할 점은 모델 저장 위치의 마지막 디렉토리에 모델의 버전이 포함되어야 합니다. 모델 버전은 숫자를 사용해야합니다.

def model():
    x_in = Input(shape=(28, 28, 1))
    x = Conv2D(filters=64, kernel_size=2, padding='same', activation='relu')(x_in)
    x = MaxPooling2D(pool_size=2)(x)
    x = Dropout(0.3)(x)

    x = Conv2D(filters=32, kernel_size=2, padding='same', activation='relu')(x)
    x = MaxPooling2D(pool_size=2)(x)
    x = Dropout(0.3)(x)

    x = Flatten()(x)
    x = Dense(256, activation='relu')(x)
    x = Dropout(0.5)(x)
    x_out = Dense(10, activation='softmax')(x)

    cnn = Model(inputs=x_in, outputs=x_out)
    cnn.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

    return cnn

생성 된 모델을 사용하여 Tensorflow 서버를 실행하고 예측을 수행 할 수 있습니다. 모델은 PV, S3 호환 가능 개체 저장소, Azure Blob 저장소 또는 Google Cloud Storage에 있을 수 있습니다.

그 다음, AnchorImage 인스턴스를 생성합니다. 모델의 예측 함수와 이미지 크기를 지정해 줍니다. 수퍼 픽셀은 표준 이미지 분할 알고리즘인 slic를 사용하여였습니다. 생성한 인스턴스를 dill을 이용해서 지정한 위치에 저장합니다.

predict_fn = lambda x: model.predict(x)

image_shape = (28, 28, 1)
segmentation_fn = 'slic'
explainer = AnchorImage(predict_fn, image_shape=image_shape, segmentation_fn=segmentation_fn)
explainer.predict_fn = None  # Clear explainer predict_fn as its a lambda and will be reset when loaded
with open(explainer_export_path, 'wb') as f:
    dill.dump(explainer, f)

모델 저장하기

쿠버네티스의 퍼시스턴스 볼륨에 모델을 저장해 보겠습니다. PVC 는 앞서 생성한 kfserving-models-pvc 을 사용하겠습니다. 모델을 학습시키기 위해서 쿠버네티스 잡(Job)을 사용하겠습니다. Job을 생성할 때 모델을 저장하기 위한 PVC를 마운트 해줍니다.

모델 코드 작성하기

패션 mnist 이미지를 분류하는 모델입니다. 예측 모델을 저장할 위치를 --model_path 파라미터로 explainer 모델의 저장할 위치를 --explainer_path 파라미터로 입력받게 하였습니다.

tensorflow_fashion_mnist.py

from __future__ import absolute_import, division, print_function, unicode_literals

import argparse
import os

import dill
import numpy as np
import tensorflow as tf
from alibi.explainers import AnchorImage
from tensorflow.keras.layers import Conv2D, Dense, Dropout, Flatten, MaxPooling2D, Input
from tensorflow.keras.models import Model
from tensorflow.keras.utils import to_categorical

print("TensorFlow version: ", tf.__version__)

parser = argparse.ArgumentParser()
parser.add_argument('--model_path', default='/mnt/pv/tensorflow/fashion_mnist/model', type=str)
parser.add_argument('--explainer_path', default='/mnt/pv/tensorflow/fashion_mnist/explainer', type=str)

args = parser.parse_args()

version = 1
export_path = os.path.join(args.model_path, str(version))

(x_train, y_train), (x_test, y_test) = tf.keras.datasets.fashion_mnist.load_data()
print('x_train shape:', x_train.shape, 'y_train shape:', y_train.shape)

x_train = x_train.astype('float32') / 255
x_test = x_test.astype('float32') / 255
x_train = np.reshape(x_train, x_train.shape + (1,))
x_test = np.reshape(x_test, x_test.shape + (1,))
print('x_train shape:', x_train.shape, 'x_test shape:', x_test.shape)
y_train = to_categorical(y_train)
y_test = to_categorical(y_test)
print('y_train shape:', y_train.shape, 'y_test shape:', y_test.shape)


def model():
    x_in = Input(shape=(28, 28, 1))
    x = Conv2D(filters=64, kernel_size=2, padding='same', activation='relu')(x_in)
    x = MaxPooling2D(pool_size=2)(x)
    x = Dropout(0.3)(x)

    x = Conv2D(filters=32, kernel_size=2, padding='same', activation='relu')(x)
    x = MaxPooling2D(pool_size=2)(x)
    x = Dropout(0.3)(x)

    x = Flatten()(x)
    x = Dense(256, activation='relu')(x)
    x = Dropout(0.5)(x)
    x_out = Dense(10, activation='softmax')(x)

    cnn = Model(inputs=x_in, outputs=x_out)
    cnn.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

    return cnn


cnn = model()
cnn.summary()

cnn.fit(x_train, y_train, batch_size=64, epochs=3)

# Evaluate the model on test set
score = cnn.evaluate(x_test, y_test, verbose=0)
print('Test accuracy: ', score[1])

tf.saved_model.save(cnn, export_path)
print('Saved model to {}'.format(export_path))

predict_fn = lambda x: cnn.predict(x)

image_shape = (28, 28, 1)
segmentation_fn = 'slic'
kwargs = {'n_segments': 15, 'compactness': 20, 'sigma': .5}
explainer = AnchorImage(predict_fn, image_shape, segmentation_fn=segmentation_fn, segmentation_kwargs=kwargs)


explainer.predict_fn = None  # Clear explainer predict_fn as its a lambda and will be reset when loaded
explainer_export_path = os.path.join(args.explainer_path, 'explainer.dill')
if not (os.path.isdir(os.path.dirname(explainer_export_path))):
    os.makedirs(os.path.dirname(explainer_export_path))
with open(explainer_export_path, 'wb') as f:
    dill.dump(explainer, f)
print('Saved explainer to {}'.format(explainer_export_path))

컨테이너 이미지를 만들기

컨테이너 이미지를 만들기 위한 Dockerfile 입니다. 텐서플로우 1.15 를 기본 이미지로 사용합니다. 사용한 Alibi 패키비의 버전은 0.3.2 입니다. KFServing에서 Alibi 서빙을 위해서 사용하는 이미지 버전이 0.3.x 이기 때문입니다. 다른 버전을 사용하고 싶다면, KFServing의 설정을 바꾸면 됩니다. Alibi 0.3.2 버전은 텐서플로우 2 버전을 지원하기 않기 때문에, 텐서플로우 1.15 버전을 사용하였습니다.

Dockerfile

FROM tensorflow/tensorflow:1.15.2-py3

RUN pip install alibi==0.3.2
RUN pip install numpy joblib dill

RUN mkdir -p /app
ADD tensorflow_fashion_mnist.py /app/

쿠버네티스 잡 실행하기

컨테이너 이미지를 빌드하고, 컨테이너 이미지 레지스트리에 푸시 한 다음, 쿠버네티스 잡(Job)을 생성하겠습니다.

Job을 생성할 때는 모델을 저장하기 위해서 PVC를 마운트 해줍니다. 이 일련의 작업들은 직접 실행 할 수 있습니다. 하지만 좀 더 편하게 하기 위해서 앞서 배운 Kubeflow Fairing을 사용하겠습니다.

다음은 로컬 개발 환경에서 Fairing을 사용하여 컨테이너 이미지를 만들고, 쿠버네티스 잡을 실행하는 예제입니다.

fairing-local-docker.py

import uuid

from kubeflow import fairing
from kubeflow.fairing.kubernetes import utils as k8s_utils

CONTAINER_REGISTRY = 'kangwoo'

namespace = 'admin'
job_name = f'tensorflow-fashion-mnist-job-{uuid.uuid4().hex[:4]}'

command = ["python", "tensorflow_fashion_mnist.py", "--model_path", "/mnt/pv/tensorflow/fashion-mnist/model",
           "--explainer_path", "/mnt/pv/tensorflow/fashion-mnist/explainer"]
output_map = {
    "Dockerfile": "Dockerfile",
    "tensorflow_fashion_mnist.py": "tensorflow_fashion_mnist.py"
}

fairing.config.set_preprocessor('python', command=command, path_prefix="/app", output_map=output_map)

fairing.config.set_builder('docker', registry=CONTAINER_REGISTRY, image_name="tensorflow-fashion-mnist",
                           dockerfile_path="Dockerfile")

fairing.config.set_deployer('job', namespace=namespace, job_name=job_name,
                            pod_spec_mutators=[
                                k8s_utils.mounting_pvc(pvc_name='kfserving-models-pvc', pvc_mount_path='/mnt/pv')],
                            cleanup=True, stream_log=True)

fairing.config.run()

fairing을 실행하면 쿠버네티스 잡이 생성되고, 학습이 완료된 모델이 지정한 경로에 저장됩니다.

Tensorflow을 사용하는 InferenceService 로 예측 하기

InferenceService 생성

InferenceService 매니페스트를 작성합니다.

predictor로 tensorflow 를 사용합니다. storageUri 필드로 모델 저장 위치를 지정해 줍니다. pvc 의 이름이 kfserving-models-pvc 이고 저장 위치가 tensorflow/fashion-mnist/model 이므로, pvc://kfserving-models-pvc/tensorflow/fashion-mnist/model 라고 지정해 줍니다.

explainer로 alibi 를 사용합니다. typeAnchorImages 로 지정합니다. 그리고 storageUri 필드로 Explainer 의 모델 저장 위치인 pvc://kfserving-models-pvc/tensorflow/fashion-mnist/explainer 를 지정해 줍니다.

alibi-fashion-mnist.yaml

apiVersion: "serving.kubeflow.org/v1alpha2"
kind: "InferenceService"
metadata:
  name: "alibi-fashion-mnist"
spec:
  default:
    predictor:
      tensorflow:
        storageUri: "pvc://kfserving-models-pvc/tensorflow/fashion-mnist/model"
    explainer:
      alibi:
        type: AnchorImages
        storageUri: "pvc://kfserving-models-pvc/tensorflow/fashion-mnist/explainer"
        config:
          batch_size: "25"

InferenceService 를 생성합니다.

다음은 admin 네임스페이스 InferenceService 를 생성하는 예제입니다.

kubectl -n admin apply -f alibi-fashion-mnist.yaml

생성한 InferenceService를 조회해 보겠습니다.

kubectl -n admin get inferenceservice

InferenceService 가 정상적으로 생성되면 다음과 같은 응답 결과를 확인할 수 있습니다.

NAME                  URL                                                                          READY   DEFAULT TRAFFIC   CANARY TRAFFIC   AGE
alibi-fashion-mnist   <http://alibi-fashion-mnist.admin.example.com/v1/models/alibi-fashion-mnist>   True    100                                2m

예측 실행하기

예측을 요청하기 위해서는 모델 서버에 접근해야 합니다. 모델 서버는 ingressgateway 를 통해서 접근할 수 있습니다. ingressgateway 는 모델 서버들을 구분하기 위해서 호스트 이름을 사용합니다. ingressgateway에 접근하 기 위한 주소는 앞서 정의한 CLUSTER_IP 를 사용하겠습니다.

예측을 요청할 데이터를 json 파일로 작성합니다.

데이터의 크기가 크기 때문에 git 에 있는 파일을 다운받아서 사용해주세요.

fashion-mnist-input.json

{
  "instances": [
    [
    
...
    ]
  ]
}

다음은 admin 네임스페이스의 tensorflow-mnist InferenceService 에 예측을 요청하는 예제입니다.

MODEL_NAME=alibi-fashion-mnist
SERVICE_HOSTNAME=$(kubectl -n admin get inferenceservice alibi-fashion-mnist -o jsonpath='{.status.url}' | cut -d "/" -f 3)

INPUT_PATH=@./fashion-mnist-input.json
curl -v -H "Host: ${SERVICE_HOSTNAME}" http://$CLUSTER_IP/v1/models/${MODEL_NAME}:predict -d $INPUT_PATH

정상적으로 실행되면 다음과 같은 응답 결과를 확인 할 수 있습니다.

*   Trying 192.168.21.38...
* TCP_NODELAY set
* Connected to 192.168.21.38 (192.168.21.38) port 32380 (#0)
> POST /v1/models/alibi-fashion-mnist:predict HTTP/1.1
> Host: alibi-fashion-mnist.admin.example.com
> User-Agent: curl/7.64.1
> Accept: */*
> Content-Length: 29557
> Content-Type: application/x-www-form-urlencoded
> Expect: 100-continue
> 
< HTTP/1.1 100 Continue
* We are completely uploaded and fine
< HTTP/1.1 200 OK
< content-length: 181
< content-type: application/json
< date: Sat, 04 Apr 2020 12:10:22 GMT
< server: istio-envoy
< x-envoy-upstream-service-time: 8701
< 
{
    "predictions": [[3.36351741e-06, 6.70785425e-07, 8.033673e-07, 5.26102497e-07, 3.26495325e-07, 0.00148782798, 6.07064408e-07, 0.0196282454, 4.8258451e-05, 0.978829265]
    ]
* Connection #0 to host 192.168.21.38 left intact
}
* Closing connection 0

설명 실행하기

설명을 요청하기 위해서는 모델 서버에 접근해야 합니다. 모델 서버는 ingressgateway 를 통해서 접근할 수 있습니다. ingressgateway 는 모델 서버들을 구분하기 위해서 호스트 이름을 사용합니다. ingressgateway에 접근하 기 위한 주소는 앞서 정의한 CLUSTER_IP 를 사용하겠습니다.

설명을 요청할 데이터를 json 파일로 작성합니다.

데이터의 크기가 크기 때문에 git 에 있는 파일을 다운받아서 사용해주세요.

fashion-mnist-input.json

{
  "instances": [
    [
    
...
    ]
  ]
}

다음은 admin 네임스페이스의 tensorflow-mnist InferenceService 에 설명을 요청하는 예제입니다.

MODEL_NAME=alibi-fashion-mnist
SERVICE_HOSTNAME=$(kubectl -n admin get inferenceservice alibi-fashion-mnist -o jsonpath='{.status.url}' | cut -d "/" -f 3)

INPUT_PATH=@./fashion-mnist-input.json
curl -v -H "Host: ${SERVICE_HOSTNAME}" http://$CLUSTER_IP/v1/models/${MODEL_NAME}:explain -d $INPUT_PATH

정상적으로 실행되면 다음과 같은 응답 결과를 확인 할 수 있습니다.

*   Trying 192.168.21.38...
* TCP_NODELAY set
* Connected to 192.168.21.38 (192.168.21.38) port 32380 (#0)
> POST /v1/models/alibi-fashion-mnist:explain HTTP/1.1
> Host: alibi-fashion-mnist.admin.example.com
> User-Agent: curl/7.64.1
> Accept: */*
> Content-Length: 29557
> Content-Type: application/x-www-form-urlencoded
> Expect: 100-continue
> 
< HTTP/1.1 100 Continue
* We are completely uploaded and fine
< HTTP/1.1 200 OK
< content-length: 101253
< content-type: text/html; charset=UTF-8
< date: Sat, 04 Apr 2020 10:51:23 GMT
< server: istio-envoy
< x-envoy-upstream-service-time: 10601
<
< 
{
  "anchor": [...],
  "segments": [...],
  "precision": 1.0,
  "coverage": 0.5106,
  "raw": {
    "feature": [
      13
    ],
    "mean": [
      1.0
    ],
    "precision": [
      1.0
    ],
    "coverage": [
      0.5106
    ],
    "examples": [
      {
        "covered": [...],
        "covered_true": [...],
        "covered_false": [],
        "uncovered_true": [],
        "uncovered_false": []
      }
    ],
    "all_precision": 0,
    "num_preds": 250026,
    "instance": [...],
    "prediction": 9
  },
  "meta": {
    "name": "AnchorImage"
  }
}
* Closing connection 0

설명 확인하기

설명 요청의 응답 결과를 이미지로 확인하기 위해서 테스트 코드를 작성해 보겠습니다.

test_fashion_mnist.py

import argparse
import json
import os
import ssl

import matplotlib.pyplot as plt
import numpy as np
import requests
import tensorflow as tf

ssl._create_default_https_context = ssl._create_unverified_context

HOST = 'alibi-fashion-mnist.admin.example.com'
PREDICT_TEMPLATE = 'http://{0}/v1/models/alibi-fashion-mnist:predict'
EXPLAIN_TEMPLATE = 'http://{0}/v1/models/alibi-fashion-mnist:explain'


def get_image_data(idx):
    (_, _), (x_test, y_test) = tf.keras.datasets.fashion_mnist.load_data()

    return x_test[idx]


def preprocess_input(input):
    data = input.astype('float32') / 255
    data = np.reshape(data, data.shape + (1,))
    data = data.reshape(1, 28, 28, 1)
    return data


CLASSES = ['T-shirt/top', 'Trouser', 'Pullover', 'Dress', 'Coat', 'Sandal', 'Shirt', 'Sneaker', 'Bag', 'Ankle boot']


def decode_predictions(predictions):
    labels = np.argmax(predictions, axis=1)
    return CLASSES[labels[0]]


def predict(cluster_ip):
    image_data = get_image_data(0)
    data = preprocess_input(image_data)
    payload = json.dumps({"instances": data.tolist()})

    headers = {'Host': HOST}
    url = PREDICT_TEMPLATE.format(cluster_ip)
    print("Calling ", url)
    r = requests.post(url, data=payload, headers=headers)
    resp_json = json.loads(r.content.decode('utf-8'))
    preds = np.array(resp_json["predictions"])
    label = decode_predictions(preds)

    plt.imshow(image_data)
    plt.title(label)
    plt.show()


def explain(cluster_ip):
    image_data = get_image_data(1)
    data = preprocess_input(image_data)
    payload = json.dumps({"instances": data.tolist()})

    headers = {'Host': HOST}
    url = EXPLAIN_TEMPLATE.format(cluster_ip)
    print("Calling ", url)
    r = requests.post(url, data=payload, headers=headers)
    if r.status_code == 200:
        explanation = json.loads(r.content.decode('utf-8'))

        exp_arr = np.array(explanation['anchor'])

        f, axarr = plt.subplots(1, 2)
        axarr[0].imshow(image_data)
        axarr[1].imshow(exp_arr[:, :, 0])
        plt.show()
    else:
        print("Received response code and content", r.status_code, r.content)


parser = argparse.ArgumentParser()
parser.add_argument('--cluster_ip', default=os.environ.get("CLUSTER_IP"), help='Address of istio-ingress-gateway')
parser.add_argument('--op', choices=["predict", "explain"], default="predict",
                    help='Operation to run')
args, _ = parser.parse_known_args()

if __name__ == "__main__":
    if args.op == "predict":
        predict(args.cluster_ip)
    elif args.op == "explain":
        explain(args.cluster_ip)

다음 명령어를 실행하면, 설명을 요청할 수 있습니다.

python test_fashion_mnist.py --luster_ip ${CLUSTER_IP} --op explain

정상적으로 실행되면 다음과 같은 응답 결과를 확인 할 수 있습니다.

KFServing – 사용자 Predictor 를 이용한 InferenceService 배포와 예측

KFServing 은 사용자가 만든 이미지를 이용해서 InferenceService 를 생성할 수 있는 기능을 제공합니다. 이 기능을 이용하면, 사용자는 자신이 만든 모델을 컨테이너 내부로 가져와서 KFServing에서 제공 할 수 있도록 할 수 있습니다.

사용자 이미지를 이용하여 InferenceService 배포와 예측

모델 서버 만들기

웹 서버 코드 작성하기

파이썬의 플라스크를 이용하여 웹 서버를 작성합니다. 예측 엔드포인트인 /v1/models/custom-sample:predict 로 요청을 받아서 “Hello Worl”를 응답으로 반환합니다.

app.py

import os

from flask import Flask

app = Flask(__name__)


@app.route('/v1/models/custom-sample:predict')
def hello_world():
    greeting_target = os.environ.get('GREETING_TARGET', 'World')
    return 'Hello {}!\\n'.format(greeting_target)


if __name__ == "__main__":
    app.run(debug=True, host='0.0.0.0', port=int(os.environ.get('PORT', 8080)))

컨테이너 이미지를 만들기

컨테이너 이미지를 만들기 위한 Dockerfile 입니다. 파이썬을 기본 이미지로 사용하고, flaskgunicorn 패키지를 추가로 설치합니다.

Dockerfile

FROM python:3.7-slim

ENV APP_HOME /app
WORKDIR $APP_HOME
COPY app.py ./

RUN pip install flask==1.1.1 gunicorn==20.0.4

CMD exec gunicorn --bind :$PORT --workers 1 --threads 8 app:app

다음 명령어를 실행하면 kangwoo/mxnet:cpu 라는 이름의 컨테이너 이미지를 빌드 할 수 있습니다.

docker build -t kangwoo/kfserving-custom-hello:0.0.1 .

빌드한 컨테이너 이미지를 컨테이너 레지스트리로 업로드 하겠습니다.

docker push kangwoo/kfserving-custom-hello:0.0.1

custom을 사용하는 InferenceService 로 예측 하기

InferenceService 생성

InferenceService 매니페스트를 작성합니다. predictor로 custom 을 사용합니다. container 섹션에서 사용자 이미지와 환경 변수 지정해 줍니다. containerPort 필드를 이용하여, 컨테이너에서 사용하는 포트도 지정해 줄 수 있습니다. 컨테이너 포드를 별도로 지정하지 않았을 경우에는, 기본값인 8080 포트를 사용합니다.

custom-hello.yaml

apiVersion: serving.kubeflow.org/v1alpha2
kind: InferenceService
metadata:
  name: custom-hello
spec:
  default:
    predictor:
      custom:
        container:
          image: kangwoo/kfserving-custom-hello:0.0.1
          ports:
            - containerPort: 8080
          env:
            - name: GREETING_TARGET
              value: "Python KFServing Sample"

InferenceService 를 생성합니다.

다음은 admin 네임스페이스 InferenceService 를 생성하는 예제입니다.

kubectl -n admin apply -f custom-hello.yaml

생성한 InferenceService를 조회해 보겠습니다.

kubectl -n admin get inferenceservice

InferenceService 가 정상적으로 생성되면 다음과 같은 응답 결과를 확인할 수 있습니다.

NAME               URL                                                                    READY   DEFAULT TRAFFIC   CANARY TRAFFIC   AGE
custom-hello       <http://custom-hello.admin.example.com/v1/models/custom-hello>           True    100                                52s

예측 실행하기

예측을 요청하기 위해서는 모델 서버에 접근해야 합니다. 모델 서버는 ingressgateway 를 통해서 접근할 수 있습니다. ingressgateway 는 모델 서버들을 구분하기 위해서 호스트 이름을 사용합니다. ingressgateway에 접근하 기 위한 주소는 앞서 정의한 CLUSTER_IP 를 사용하겠습니다.

다음은 admin 네임스페이스의 custom-hello InferenceService 에 예측을 요청하는 예제입니다.

MODEL_NAME=custom-hello
SERVICE_HOSTNAME=$(kubectl -n admin get inferenceservice custom-hello -o jsonpath='{.status.url}' | cut -d "/" -f 3)

curl -v -H "Host: ${SERVICE_HOSTNAME}" http://$CLUSTER_IP/v1/models/$MODEL_NAME:predict

정상적으로 실행되면 다음과 같은 응답 결과를 확인 할 수 있습니다.

*   Trying 192.168.21.38...
* TCP_NODELAY set
* Connected to 192.168.21.38 (192.168.21.38) port 32380 (#0)
> GET /v1/models/custom-hello:predict HTTP/1.1
> Host: custom-hello.admin.example.com
> User-Agent: curl/7.64.1
> Accept: */*
> 
< HTTP/1.1 200 OK
< content-length: 31
< content-type: text/html; charset=utf-8
< date: Sat, 04 Apr 2020 15:32:16 GMT
< server: istio-envoy
< x-envoy-upstream-service-time: 7650
< 
Hello Python KFServing Sample!
* Connection #0 to host 192.168.21.38 left intact
* Closing connection 0

사용자 모델을 이용하여 InferenceService 배포와 예측

토네이도 웹 서버를 사용하는 kfserving.KFModel 을 확장하여 사용자 이미지를 생성해 보겠습니다. KFModel을 확장하여 모델을 만드는 경우에는 엔드포인트를 직접 구현할 필요 없이, 사용할 메소드만 구현하면 됩니다. 예를 들어 예측을 처리할 경우 predict() 메소드를 구현하면 됩니다. kfserving.KFModel 은 KFServing SDK에 포함되어 있습니다.

모델 서버 만들기

모델 코드 작성하기

kfserving.KFModel 확장하여 모델 코드를 작성합니다. 앞서 구현한 파이토치를 이용해서 cifar10 데이터를 분류하는 모델을 재사용하겠습니다. 모델은 Net 클래스로 정의하고, 학습한 파라미터 값은 저장된 model.pt 파일로부터 읽어 옵니다. mdoel.pt 파일을 생성하고 싶으면, “KFServing InferenceService 배포와 예측 – PyTorch“를 참고 하시길 바랍니다.

model.py

import base64
import io
from typing import Dict

import kfserving
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as transforms
from PIL import Image


class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(3, 6, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(-1, 16 * 5 * 5)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x


class KFServingSampleModel(kfserving.KFModel):
    def __init__(self, name: str):
        super().__init__(name)
        self.name = name
        self.ready = False

    def load(self):
        self.classes = ['airplane', 'automobile', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck']

        net = Net()
        state_dict = torch.load("model.pt")
        net.load_state_dict(state_dict)
        net.eval()
        self.model = net

        self.ready = True

    def predict(self, request: Dict) -> Dict:
        inputs = request["instances"]

        data = inputs[0]["image"]["b64"]

        raw_img_data = base64.b64decode(data)
        input_image = Image.open(io.BytesIO(raw_img_data))

        preprocess = transforms.Compose([
            transforms.Resize(32),
            transforms.CenterCrop(32),
            transforms.ToTensor(),
            transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])

        input_tensor = preprocess(input_image)
        input_batch = input_tensor.unsqueeze(0)

        output = self.model(input_batch)

        scores = torch.nn.functional.softmax(output, dim=1)[0]

        _, top_3 = torch.topk(output, 3)

        results = {}
        for idx in top_3[0]:
            results[self.classes[idx]] = scores[idx].item()

        return {"predictions": results}


if __name__ == "__main__":
    model = KFServingSampleModel("custom-model")
    model.load()
    kfserving.KFServer(workers=1).start([model])

컨테이너 이미지를 만들기

컨테이너 이미지를 만들기 위한 Dockerfile 입니다. 파이썬을 기본 이미지로 사용하고, torchtorchvision 그리고 kfserving 패키지를 추가로 설치합니다.

Dockerfile

FROM python:3.6-slim

RUN pip install torch torchvision
RUn pip install kfserving==0.3.0

ENV APP_HOME /app
WORKDIR $APP_HOME
ADD model.py /app/
ADD model.pt /app/

CMD ["python", "model.py"]

다음 명령어를 실행하면 kangwoo/mxnet:cpu 라는 이름의 컨테이너 이미지를 빌드 할 수 있습니다.

docker build -t kangwoo/kfserving-custom-model:0.0.1 .

빌드한 컨테이너 이미지를 컨테이너 레지스트리로 업로드 하겠습니다.

docker push kangwoo/kfserving-custom-model:0.0.1

custom을 사용하는 InferenceService 로 예측 하기

InferenceService 생성

InferenceService 매니페스트를 작성합니다. predictor로 custom 을 사용합니다. container 필드로 사용자 이미지를 지정해 줍니다.장 위치를 지정해 줍니다.

custom.yaml

apiVersion: serving.kubeflow.org/v1alpha2
kind: InferenceService
metadata:
  name: custom-hello
spec:
  default:
    predictor:
      custom:
        container:
          image: kangwoo/kfserving-hello:0.0.1
          env:
            - name: GREETING_TARGET
              value: "Python KFServing Sample"

InferenceService 를 생성합니다.

다음은 admin 네임스페이스 InferenceService 를 생성하는 예제입니다.

kubectl -n admin apply -f custom.yaml

생성한 InferenceService를 조회해 보겠습니다.

kubectl -n admin get inferenceservice

InferenceService 가 정상적으로 생성되면 다음과 같은 응답 결과를 확인할 수 있습니다.

NAME               URL                                                                    READY   DEFAULT TRAFFIC   CANARY TRAFFIC   AGE
custom-hello       <http://custom-hello.admin.example.com/v1/models/custom-hello>           True    100                                52s

예측 실행하기

예측을 요청하기 위해서는 모델 서버에 접근해야 합니다. 모델 서버는 ingressgateway 를 통해서 접근할 수 있습니다. ingressgateway 는 모델 서버들을 구분하기 위해서 호스트 이름을 사용합니다. ingressgateway에 접근하 기 위한 주소는 앞서 정의한 CLUSTER_IP 를 사용하겠습니다.

예측을 요청할 데이터를 json 파일로 작성합니다.

데이터의 크기가 크기 때문에 git 에 있는 파일을 다운받아서 사용해주세요.

cifar10-input.json

{
  "instances": [
    {
      "image": {
        "b64": "..."
      }
    }
  ]
}

다음은 admin 네임스페이스의 custom-hello InferenceService 에 예측을 요청하는 예제입니다.

MODEL_NAME=custom-model
SERVICE_HOSTNAME=$(kubectl -n admin get inferenceservice custom-model -o jsonpath='{.status.url}' | cut -d "/" -f 3)

INPUT_PATH=@./cifar10-input.json
curl -v -H "Host: ${SERVICE_HOSTNAME}" http://$CLUSTER_IP/v1/models/$MODEL_NAME:predict -d $INPUT_PATH

정상적으로 실행되면 다음과 같은 응답 결과를 확인 할 수 있습니다.

*   Trying 192.168.21.38...
* TCP_NODELAY set
* Connected to 192.168.21.38 (192.168.21.38) port 32380 (#0)
> POST /v1/models/custom-model:predict HTTP/1.1
> Host: custom-model.admin.example.com
> User-Agent: curl/7.64.1
> Accept: */*
> Content-Length: 2611
> Content-Type: application/x-www-form-urlencoded
> Expect: 100-continue
> 
< HTTP/1.1 100 Continue
* We are completely uploaded and fine
< HTTP/1.1 200 OK
< content-length: 108
< content-type: application/json; charset=UTF-8
< date: Sat, 04 Apr 2020 07:15:56 GMT
< server: istio-envoy
< x-envoy-upstream-service-time: 7492
< 
* Connection #0 to host 192.168.21.38 left intact
{"predictions": {"airplane": 0.8815429210662842, "ship": 0.09650199115276337, "truck": 0.01203653123229742}}
* Closing connection 0

KFServing – Transformer를 포함한 InferenceService 배포

대부분의 모델 서버는 텐서를 입력 데이터로 이용합니다. 다시 말해서, 모델을 학습할 때 사용한 데이터의 형태로 이용하는 것입니다. 이 데이터들은 효율적인 학습을 위해서 사전 처리 단계를 거칩니다. 그래서 원시 데이터와는 다릅니다. 예를 들어서 원시 데이터를 이미지로 사용한다고 가정하면, 학습을 위해서 이미지의 크기를 조절하고, 각 필셀 값의 범위를 조정합니다. 그래서 모델 서버에 예측을 요청할 때도 사전 처리를 거친 데이터를 입력해야만 정상적인 결과가 나오게 되는 것입니다. 즉 사용자가 원시 입력 형식으로 전송하는 경우 예측 요청을 하기 전에 사전 처리 단계가 필요합니다. 이럴 경우 Transformer를 사용할 수 있습니다.

Transformer를 이용하면, 사전/사후 처리 코드를 사용자가 구현할 수 있습니다. 앞서 구현한 파이토치 예제에서는 텐서를 입력 받아서 예측을 하였습니다. 이 예제에 전처리 단계를 추가한 Transformer를 사용하여, 사용자가 원시 이미지 데이터를 보내 예측을 할 수 있도록 하겠습니다.

Transformer 이미지 생성하기

사전/사후 처리를 할 Transformer 이미지를 생성해 보겠습니다.

사전/사후 처리 코드 작성하기

먼저 사전 처리에서 사용할 image_transform() 메소드를 구현합니다. 사용자가 보내온 BASE64 로 인코딩된 원시 이미지 데이터를 텐서로 변환합니다. transforms.Compose()를 이용하여 이미지 크기도 조절하고, 정규화도 시킵니다.

사후 처리에서 사용할 top_5() 메소드를 구현합니다. 이미지 분류값을 사람이 보기 쉽게 레이블을 붙이주고, TOP 5 로 정리해서 보여줍니다.

kfserving.KFModel 를 상속 받아서 preprocess() 와 postprocess() 메소드를 구현해 줍니다.

image_transformer.py

import argparse
import base64
import io
import logging
from typing import Dict

import kfserving
import numpy as np
import torch
import torchvision.transforms as transforms
from PIL import Image

logging.basicConfig(level=kfserving.constants.KFSERVING_LOGLEVEL)

transform = transforms.Compose([
    transforms.Resize(32),
    transforms.CenterCrop(32),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])


def image_transform(instance):
    byte_array = base64.b64decode(instance['image']['b64'])
    image = Image.open(io.BytesIO(byte_array))
    im = Image.fromarray(np.asarray(image))
    res = transform(im)
    logging.info(res)
    return res.tolist()


CLASSES = ['airplane', 'automobile', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck']


def top_5(prediction):
    pred = torch.as_tensor(prediction)
    scores = torch.nn.functional.softmax(pred, dim=0)

    _, top_5 = torch.topk(pred, 5)

    results = {}
    for idx in top_5:
        results[CLASSES[idx]] = scores[idx].item()

    return results


class ImageTransformer(kfserving.KFModel):
    def __init__(self, name: str, predictor_host: str):
        super().__init__(name)
        self.predictor_host = predictor_host

    def preprocess(self, inputs: Dict) -> Dict:
        return {'instances': [image_transform(instance) for instance in inputs['instances']]}

    def postprocess(self, inputs: Dict) -> Dict:
        return {'predictions': [top_5(prediction) for prediction in inputs['predictions']]}


if __name__ == "__main__":
    DEFAULT_MODEL_NAME = "model"

    parser = argparse.ArgumentParser(parents=[kfserving.kfserver.parser])
    parser.add_argument('--model_name', default=DEFAULT_MODEL_NAME,
                        help='The name that the model is served under.')
    parser.add_argument('--predictor_host', help='The URL for the model predict function', required=True)

    args, _ = parser.parse_known_args()

    transformer = ImageTransformer(args.model_name, predictor_host=args.predictor_host)
    kfserver = kfserving.KFServer()
    kfserver.start(models=[transformer])

컨테이너 이미지를 만들기

컨테이너 이미지를 만들기 위한 Dockerfile 입니다. 파이썬을 기본 이미지로 사용하고, torchtorchvision 패키지를 추가로 설치합니다.

ENTRYPOINT를 이용하여 image_transformer.py 를 실행합니다. Transformer 컨테이너가 실행 될때 --model_name--predictor_host 같은 인자 값들이 넘어오기 때문에, 꼭 ENTRYPOINT를 사용해야합니다.

Dockerfile

FROM python:3.6-slim

RUN pip install torch torchvision
RUn pip install kfserving==0.3.0 image numpy

ENV APP_HOME /app
WORKDIR $APP_HOME
ADD image_transformer.py /app/

ENTRYPOINT ["python", "image_transformer.py"]

다음 명령어를 실행하면 kangwoo/kfserving-transformer:0.0.1 라는 이름의 컨테이너 이미지를 빌드 할 수 있습니다.

docker build -t kangwoo/kfserving-transformer:0.0.1 .

빌드한 컨테이너 이미지를 컨테이너 레지스트리로 업로드 하겠습니다.

docker push kangwoo/kfserving-transformer:0.0.1

Transoformer를 포함한 InferenceService 로 예측 하기

InferenceService 생성

InferenceService 매니페스트를 작성합니다. 예측을 위한 predictor 는 파이토치에서 사용한 것을 그대로 재사용하겠습니다. transformer 필드를 이용하여서 생성한 Transformer 이미지를 지정해 줍니다.

transformer.yaml

apiVersion: "serving.kubeflow.org/v1alpha2"
kind: "InferenceService"
metadata:
  name: "pytorch-cifar10-transformer"
spec:
  default:
    predictor:
      pytorch:
        storageUri: "pvc://kfserving-models-pvc/models/pytorch/cifar10/"
        modelClassName: "Net"
    transformer:
      custom:
        container:
          image: kangwoo/kfserving-transformer:0.0.1

InferenceService 를 생성합니다.

다음은 admin 네임스페이스 InferenceService 를 생성하는 예제입니다.

kubectl -n admin apply -f transformer.yaml

생성한 InferenceService를 조회해 보겠습니다.

kubectl -n admin get inferenceservice

InferenceService 가 정상적으로 생성되면 다음과 같은 응답 결과를 확인할 수 있습니다.

NAME                          URL                                                                                          READY   DEFAULT TRAFFIC   CANARY TRAFFIC   AGE
pytorch-cifar10-transformer   <http://pytorch-cifar10-transformer.admin.example.com/v1/models/pytorch-cifar10-transformer>   True    100                                21s

예측 실행하기

예측을 요청하기 위해서는 모델 서버에 접근해야 합니다. 모델 서버는 ingressgateway 를 통해서 접근할 수 있습니다. ingressgateway 는 모델 서버들을 구분하기 위해서 호스트 이름을 사용합니다. ingressgateway에 접근하 기 위한 주소는 앞서 정의한 CLUSTER_IP 를 사용하겠습니다.

예측을 요청할 데이터를 json 파일로 작성합니다.

데이터의 크기가 크기 때문에 git 에 있는 파일을 다운받아서 사용해주세요.

input.json

{
  "instances": [
    {
      "image": {
        "b64": "..."
      }
    }
  ]
}

b64 필드에는 BASE64로 인코딩된 이미지가 들어 있습니다. 만약 다른 이미지로 테스트해 보고 싶다면, 다음 코드를 참조하여 값을 변경하면 됩니다.

import base64
with open("airplane.jpg", "rb") as image_file:
    encoded_string = base64.b64encode(image_file.read())

print(encoded_string.decode())

다음은 admin 네임스페이스의 pytorch-cifar10-transformer InferenceService 에 예측을 요청하는 예제입니다.

MODEL_NAME=pytorch-cifar10-transformer
SERVICE_HOSTNAME=$(kubectl -n admin get inferenceservice pytorch-cifar10-transformer -o jsonpath='{.status.url}' | cut -d "/" -f 3)

INPUT_PATH=@./input.json
curl -v -H "Host: ${SERVICE_HOSTNAME}" http://$CLUSTER_IP/v1/models/$MODEL_NAME:predict -d $INPUT_PATH

정상적으로 실행되면 다음과 같은 응답 결과를 확인 할 수 있습니다.

*   Trying 192.168.21.38...
* TCP_NODELAY set
* Connected to 192.168.21.38 (192.168.21.38) port 32380 (#0)
> POST /v1/models/pytorch-cifar10-transformer:predict HTTP/1.1
> Host: pytorch-cifar10-transformer.admin.example.com
> User-Agent: curl/7.64.1
> Accept: */*
> Content-Length: 4551
> Content-Type: application/x-www-form-urlencoded
> Expect: 100-continue
> 
< HTTP/1.1 100 Continue
* We are completely uploaded and fine
< HTTP/1.1 200 OK
< content-length: 176
< content-type: application/json; charset=UTF-8
< date: Sat, 04 Apr 2020 09:43:33 GMT
< server: istio-envoy
< x-envoy-upstream-service-time: 45
< 
* Connection #0 to host 192.168.21.38 left intact
{"predictions": [{"airplane": 0.9271695017814636, "ship": 0.06180185452103615, "bird": 0.004641484934836626, "deer": 0.003963686991482973, "automobile": 0.000884002773091197}]}
* Closing connection 0

KFServing InferenceService 배포와 예측 – NVIDIA Triton Inference Server

Triton은 이전에 TensorRT Inference Server로 알려져 있습니다.

이번에는 모델을 직접 생성하지 않고, 기존에 만들어진 모델을 사용하겠습니다.

TensorRT을 사용하는 InferenceService 로 예측 하기

InferenceService 생성

InferenceService 매니페스트를 작성합니다. predictor로 tensorrt 를 사용합니다. storageUri 필드로 모델 저장 위치를 지정해 줍니다.

tensorrt.yaml

apiVersion: "serving.kubeflow.org/v1alpha2"
kind: "InferenceService"
metadata:
  name: "tensorrt-simple-string"
spec:
  default:
    predictor:
      tensorrt:
        storageUri: "gs://kfserving-samples/models/tensorrt"

InferenceService 를 생성합니다.

다음은 admin 네임스페이스 InferenceService 를 생성하는 예제입니다.

kubectl -n admin apply -f tensorrt.yaml

생성한 InferenceService를 조회해 보겠습니다.

kubectl -n admin get inferenceservice

InferenceService 가 정상적으로 생성되면 다음과 같은 응답 결과를 확인할 수 있습니다.

NAME                     URL                                                                                READY   DEFAULT TRAFFIC   CANARY TRAFFIC   AGE
tensorrt-simple-string   <http://tensorrt-simple-string.admin.example.com/v1/models/tensorrt-simple-string>   True    100                                33s

예측 실행하기

예측을 요청하기 위해서는 모델 서버에 접근해야 합니다. 모델 서버는 ingressgateway 를 통해서 접근할 수 있습니다. ingressgateway 는 모델 서버들을 구분하기 위해서 호스트 이름을 사용합니다. ingressgateway에 접근하 기 위한 주소는 앞서 정의한 CLUSTER_IP 를 사용하겠습니다.

https://docs.nvidia.com/deeplearning/sdk/tensorrt-inference-server-guide/docs/client.html#section-client-api 에 나와 있는 클라이언트를 사용해서 요청을 하겠습니다. tensorrtserver_client 컨테이너를 실행시킨 후, 컨테이너 안에서 클라이언트는 사용합니다. 그래서 컨테이너 안에서 CLUSTER_IP 로 접근이 가능해야 합니다.

/etc/hosts 파일을 편집해서 CLUSTER_IP 를 tensorrt-simple-string.default.example.com 과 맵핑시킵니다.

클라이언트를 사용하기 위해서, 컨테이너를 실행시킵니다.

docker run -it --rm --net=host kcorer/tensorrtserver_client:19.05

컨테이너 안에서 다음 명령어를 실행시킵니다.

root@trantor:/workspace# ./build/simple_string_client -u tensorrt-simple-string.default.example.com

정상적으로 실행되면 다음과 같은 응답 결과를 확인 할 수 있습니다.

0 + 1 = 1
0 - 1 = -1
1 + 1 = 2
1 - 1 = 0
2 + 1 = 3
2 - 1 = 1
3 + 1 = 4
3 - 1 = 2
4 + 1 = 5
4 - 1 = 3
5 + 1 = 6
5 - 1 = 4
6 + 1 = 7
6 - 1 = 5
7 + 1 = 8
7 - 1 = 6
8 + 1 = 9
8 - 1 = 7
9 + 1 = 10
9 - 1 = 8

KFServing InferenceService 배포와 예측 – Tensorflow

Tensorflow를 사용하는 InferenceService

학습이 완료된 Tensorflow 모델을 저장 한 경우, KFServing은 TensorFlow Serving을 사용하여 모델 서버를 배포 해 줍니다.

모델 생성

Tensorflow 서버를 테스트 하려면 먼저 파이썬을 사용하여 간단한 Tensorflow 모델을 생성해야 합니다.

텐서플로우 모델을 만들고 훈련하기 위한 고수준 API인 tf.keras를 사용합니다. 모델의 save() 메소드를 이용하여, 전체 모델을 지정한 위치에 저장합니다. 여기에는 가중치, 모델 구성 등이 포함됩니다. 모델을 저장할 때 주의해야할 점은 모델 저장 위치의 마지막 디렉토리에 모델의 버전이 포함되어야 합니다. 모델 버전은 숫자를 사용해야합니다.

케라스의 데이터셋 중의 하나인 mnist 데이터를 분류하는 모델을 작성해 보겠습니다.

from __future__ import absolute_import, division, print_function, unicode_literals

import argparse
import os

import tensorflow as tf


def train():
    print("TensorFlow version: ", tf.__version__)

    parser = argparse.ArgumentParser()
    parser.add_argument('--model_path', default='/mnt/pv/models/tensorflow/mnist', type=str)
    args = parser.parse_args()

    version = 1
    export_path = os.path.join(args.model_path, str(version))

    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
    x_train, x_test = x_train / 255.0, x_test / 255.0

    model = tf.keras.models.Sequential([
        tf.keras.layers.Flatten(input_shape=(28, 28)),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(10, activation='softmax')
    ])

    model.compile(optimizer=tf.keras.optimizers.SGD(learning_rate=0.01),
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])

    print("Training...")
    training_history = model.fit(x_train, y_train, batch_size=64, epochs=10,
                                 validation_split=0.2)

    print('\\nEvaluate on test data')
    results = model.evaluate(x_test, y_test, batch_size=128)
    print('test loss, test acc:', results)

    model.save(export_path)
    print('"Saved model to {}'.format(export_path))


if __name__ == '__main__':
    train()

생성 된 모델을 사용하여 Tensorflow 서버를 실행하고 예측을 수행 할 수 있습니다. 모델은 PV, S3 호환 가능 개체 저장소, Azure Blob 저장소 또는 Google Cloud Storage에 있을 수 있습니다.

모델 저장하기

쿠버네티스의 퍼시스턴스 볼륨에 모델을 저장해 보겠습니다. PVC 는 앞서 생성한 kfserving-models-pvc 을 사용하겠습니다. 모델을 학습시키기 위해서 쿠버네티스 잡(Job)을 사용하겠습니다. Job을 생성할 때 모델을 저장하기 위한 PVC를 마운트 해줍니다.

모델 코드 작성하기

mnist 이미지를 분류하는 모델입니다. 모델을 저장할 위치를 --model_path 파라미터로 입력받게 하였습니다.

tensorflow_mnist.py

from __future__ import absolute_import, division, print_function, unicode_literals

import argparse
import os

import tensorflow as tf


def train():
    print("TensorFlow version: ", tf.__version__)

    parser = argparse.ArgumentParser()
    parser.add_argument('--model_path', default='/mnt/pv/models/tensorflow/mnist', type=str)
    args = parser.parse_args()

    version = 1
    export_path = os.path.join(args.model_path, str(version))

    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
    x_train, x_test = x_train / 255.0, x_test / 255.0

    model = tf.keras.models.Sequential([
        tf.keras.layers.Flatten(input_shape=(28, 28)),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(10, activation='softmax')
    ])

    model.compile(optimizer=tf.keras.optimizers.SGD(learning_rate=0.01),
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])

    print("Training...")
    training_history = model.fit(x_train, y_train, batch_size=64, epochs=10,
                                 validation_split=0.2)

    print('\\nEvaluate on test data')
    results = model.evaluate(x_test, y_test, batch_size=128)
    print('test loss, test acc:', results)

    model.save(export_path)
    print('"Saved model to {}'.format(export_path))


if __name__ == '__main__':
    train()

컨테이너 이미지를 만들기

컨테이너 이미지를 만들기 위한 Dockerfile 입니다. 텐서플로우를 기본 이미지로 사용합니다.

Dockerfile

FROM tensorflow/tensorflow:2.1.0-py3

RUN mkdir -p /app
ADD tensorflow_mnist.py /app/

쿠버네티스 잡 실행하기

컨테이너 이미지를 빌드하고, 컨테이너 이미지 레지스트리에 푸시 한 다음, 쿠버네티스 잡(Job)을 생성하겠습니다.

Job을 생성할 때는 모델을 저장하기 위해서 PVC를 마운트 해줍니다. 이 일련의 작업들은 직접 실행 할 수 있습니다. 하지만 좀 더 편하게 하기 위해서 앞서 배운 Kubeflow Fairing을 사용하겠습니다.

다음은 로컬 개발 환경에서 Fairing을 사용하여 컨테이너 이미지를 만들고, 쿠버네티스 잡을 실행하는 예제입니다.

fairing-local-docker.py

import uuid

from kubeflow import fairing
from kubeflow.fairing.kubernetes import utils as k8s_utils

CONTAINER_REGISTRY = 'kangwoo'

namespace = 'admin'
job_name = f'tensorflow-mnist-job-{uuid.uuid4().hex[:4]}'

command = ["python", "tensorflow_mnist.py", "--model_path", "/mnt/pv/models/tensorflow/mnist"]
output_map = {
    "Dockerfile": "Dockerfile",
    "tensorflow_mnist.py": "tensorflow_mnist.py"
}

fairing.config.set_preprocessor('python', command=command, path_prefix="/app", output_map=output_map)

fairing.config.set_builder('docker', registry=CONTAINER_REGISTRY, image_name="tensorflow-mnist",
                           dockerfile_path="Dockerfile")

fairing.config.set_deployer('job', namespace=namespace, job_name=job_name,
                            pod_spec_mutators=[
                                k8s_utils.mounting_pvc(pvc_name='kfserving-models-pvc', pvc_mount_path='/mnt/pv')],
                            cleanup=True, stream_log=True)

fairing.config.run()

fairing을 실행하면 쿠버네티스 잡이 생성되고, 학습이 완료된 모델이 지정한 경로에 저장됩니다.

Tensorflow을 사용하는 InferenceService 로 예측 하기

InferenceService 생성

InferenceService 매니페스트를 작성합니다. predictor로 tensorflow 를 사용합니다. storageUri 필드로 모델 저장 위치를 지정해 줍니다. pvc 의 이름이 kfserving-models-pvc 이고 저장 위치가 models/tensorflow/mnist/ 이므로, pvc://kfserving-models-pvc/models/tensorflow/mnist/ 라고 지정해 줍니다.

tensorflow.yaml

apiVersion: "serving.kubeflow.org/v1alpha2"
kind: "InferenceService"
metadata:
  name: "tensorflow-mnist"
spec:
  default:
    predictor:
      tensorflow:
        storageUri: "pvc://kfserving-models-pvc/models/tensorflow/mnist/"

만약 GPU 리소스를 사용하려고 한다면, resource 필드에 GPU 를 할당해 주면됩니다.

tensorflow_gpu.yaml

apiVersion: "serving.kubeflow.org/v1alpha2"
kind: "InferenceService"
metadata:
  name: "tensorflow-mnist"
spec:
  default:
    predictor:
      tensorflow:
        storageUri: "pvc://kfserving-models-pvc/models/tensorflow/mnist/"
        resources:
          limits:
            cpu: 100m
            memory: 1Gi
            nvidia.com/gpu: "1"

InferenceService 를 생성합니다.

다음은 admin 네임스페이스 InferenceService 를 생성하는 예제입니다.

kubectl -n admin apply -f tensorflow.yaml

생성한 InferenceService를 조회해 보겠습니다.

kubectl -n admin get inferenceservice

InferenceService 가 정상적으로 생성되면 다음과 같은 응답 결과를 확인할 수 있습니다.

NAME               URL                                                                    READY   DEFAULT TRAFFIC   CANARY TRAFFIC   AGE
tensorflow-mnist   <http://tensorflow-mnist.admin.example.com/v1/models/tensorflow-mnist>   True    100                                70s

예측 실행하기

예측을 요청하기 위해서는 모델 서버에 접근해야 합니다. 모델 서버는 ingressgateway 를 통해서 접근할 수 있습니다. ingressgateway 는 모델 서버들을 구분하기 위해서 호스트 이름을 사용합니다. ingressgateway에 접근하 기 위한 주소는 앞서 정의한 CLUSTER_IP 를 사용하겠습니다.

예측을 요청할 데이터를 json 파일로 작성합니다.

데이터의 크기가 크기 때문에 git 에 있는 파일을 다운받아서 사용해주세요.

mnist-input.json

{
  "instances": [
    [
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0,
      0.0
    ],
...
  ]
}

다음은 admin 네임스페이스의 tensorflow-mnist InferenceService 에 예측을 요청하는 예제입니다.

MODEL_NAME=tensorflow-mnist
SERVICE_HOSTNAME=$(kubectl -n admin get inferenceservice tensorflow-mnist -o jsonpath='{.status.url}' | cut -d "/" -f 3)

INPUT_PATH=@./mnist-input.json
curl -v -H "Host: ${SERVICE_HOSTNAME}" http://$CLUSTER_IP/v1/models/$MODEL_NAME:predict -d $INPUT_PATH

정상적으로 실행되면 다음과 같은 응답 결과를 확인 할 수 있습니다.

*   Trying ::1...
* TCP_NODELAY set
* Connected to localhost (::1) port 8080 (#0)
> POST /v1/models/tensorflow-mnist:predict HTTP/1.1
> Host: tensorflow-mnist.admin.example.com
> User-Agent: curl/7.64.1
> Accept: */*
> Content-Length: 9866
> Content-Type: application/x-www-form-urlencoded
> Expect: 100-continue
> 
< HTTP/1.1 100 Continue
* We are completely uploaded and fine
< HTTP/1.1 200 OK
< content-length: 185
< content-type: application/json
< date: Sat, 04 Apr 2020 06:38:13 GMT
< server: istio-envoy
< x-envoy-upstream-service-time: 8726
< 
{
    "predictions": [[0.000235743544, 1.21317851e-06, 0.000527939294, 0.00202324125, 1.88633135e-06, 3.09452735e-05, 2.20991225e-07, 0.995925665, 2.64503233e-05, 0.00122672203]
    ]
}

KFServing InferenceService 배포와 예측 – ONNX Model with ONNX Runtime

ONNX를 사용하는 InferenceService

학습이 완료된 ONNX 모델을 저장 한 경우, KFServing에서 제공하는 ONNX 서버를 사용하여 간단히 배포 할 수 있습니다.

모델 생성

ONNX 서버를 테스트하려면 먼저 ONNX 모델을 생성해야 합니다.

파이토치로 학습한 모델을 ONNX 모델로 저장하겠습니다. 파이토치는 동적 그래프를 사용하므로, ONNX 로 export 할때 신경망 계산을 실제로 한 번 실행해야 합니다. 이를 위해서 실제 이미지 데이터 대신, 차원이 동일한 더미 데이터를 사용할 수도 있습니다.

앞서 PyTorch 에서 저장한 모델을 읽어와서 ONNX 모델로 저장하는코드를 작성해 보겠습니다. 파이토치의 모델 저장 경로는 /mnt/pv/models/pytorch/cifar10/model.pt 입니다.

import argparse
import os

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.onnx as onnx


class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(3, 6, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(-1, 16 * 5 * 5)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x


if __name__ == "__main__":

    parser = argparse.ArgumentParser()
    parser.add_argument('--model_path', default='/mnt/pv/models/onnx/cifar10', type=str)
    args = parser.parse_args()

    model_path = args.model_path
    if not (os.path.isdir(model_path)):
        os.makedirs(model_path)

    model_file = os.path.join(model_path, 'model.onnx')

    net = Net()
    state_dict = torch.load("/mnt/pv/models/pytorch/cifar10/model.pt")
    net.load_state_dict(state_dict)
    net.eval()

    x = torch.empty(1, 3, 32, 32)
    torch.onnx.export(net, x, model_file)

생성 된 모델을 사용하여 ONNX 서버를 실행하고 예측을 수행 할 수 있습니다. 모델은 PV, S3 호환 가능 개체 저장소, Azure Blob 저장소 또는 Google Cloud Storage에 있을 수 있습니다.

모델 저장하기

쿠버네티스의 퍼시스턴스 볼륨에 모델을 저장해 보겠습니다. PVC 는 앞서 생성한 kfserving-models-pvc 을 사용하겠습니다. 모델을 학습시키기 위해서 쿠버네티스 잡(Job)을 사용하겠습니다. Job을 생성할 때 모델을 저장하기 위한 PVC를 마운트 해줍니다.

모델 코드 작성하기

cifa10 이미지를 분류하는 모델입니다. 모델을 저장할 위치를 --model_path 파라미터로 입력받게 하였습니다.

먼저 학습이 완료된 모델을 불러옵니다. 모델을 변환하기 전에 모델을 추론 모드로 바꾸기 위해서 torch_model.eval() 을 호출합니다. 모델을 변환하기 위해서는 torch.onnx.export() 함수를 호출합니다. 이 함수는 모델을 실행하여 어떤 연산자들이 출력값을 계산하는데 사용되었는지를 기록해줍니다. export 함수가 모델을 실행하기 때문에, 직접 텐서를 입력값으로 넘겨주어야 합니다. 이 텐서의 값은 알맞은 자료형과 모양이라면 더미 데이터를 사용해도 상관없습니다.

ipytorch_cifar10.py

import argparse
import os

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.onnx as onnx


class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(3, 6, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(-1, 16 * 5 * 5)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x


if __name__ == "__main__":

    parser = argparse.ArgumentParser()
    parser.add_argument('--model_path', default='/mnt/pv/models/onnx/cifar10', type=str)
    args = parser.parse_args()

    model_path = args.model_path
    if not (os.path.isdir(model_path)):
        os.makedirs(model_path)

    model_file = os.path.join(model_path, 'model.onnx')

    net = Net()
    state_dict = torch.load("/mnt/pv/models/pytorch/cifar10/model.pt")
    net.load_state_dict(state_dict)
    net.eval()

    x = torch.empty(1, 3, 32, 32)
    torch.onnx.export(net, x, model_file, input_names=['input1'], output_names=['output1'])

컨테이너 이미지를 만들기

컨테이너 이미지를 만들기 위한 Dockerfile 입니다. 파이썬을 기본 이미지로 사용하고, torchtorchvision 패키지를 추가로 설치합니다.

Dockerfile

FROM python:3.6-slim

RUN pip install torch torchvision

RUN mkdir -p /app
ADD onnx_cifar10.py /app/

쿠버네티스 잡 실행하기

컨테이너 이미지를 빌드하고, 컨테이너 이미지 레지스트리에 푸시 한 다음, 쿠버네티스 잡(Job)을 생성하겠습니다.

Job을 생성할 때는 모델을 저장하기 위해서 PVC를 마운트 해줍니다. 이 일련의 작업들은 직접 실행 할 수 있습니다. 하지만 좀 더 편하게 하기 위해서 앞서 배운 Kubeflow Fairing을 사용하겠습니다.

다음은 로컬 개발 환경에서 Fairing을 사용하여 컨테이너 이미지를 만들고, 쿠버네티스 잡을 실행하는 예제입니다.

fairing-local-docker.py

import uuid

from kubeflow import fairing
from kubeflow.fairing.kubernetes import utils as k8s_utils

CONTAINER_REGISTRY = 'kangwoo'

namespace = 'admin'
job_name = f'onnx-cifar10-job-{uuid.uuid4().hex[:4]}'

command = ["python", "onnx_cifar10.py", "--model_path", "/mnt/pv/models/onnx/cifar10"]
output_map = {
    "Dockerfile": "Dockerfile",
    "onnx_cifar10.py": "onnx_cifar10.py"
}

fairing.config.set_preprocessor('python', command=command, path_prefix="/app", output_map=output_map)

fairing.config.set_builder('docker', registry=CONTAINER_REGISTRY, image_name="onnx-cifar10",
                           dockerfile_path="Dockerfile")

fairing.config.set_deployer('job', namespace=namespace, job_name=job_name,
                            pod_spec_mutators=[
                                k8s_utils.mounting_pvc(pvc_name='kfserving-models-pvc', pvc_mount_path='/mnt/pv')],
                            cleanup=True, stream_log=True)

fairing.config.run()

fairing을 실행하면 쿠버네티스 잡이 생성되고, 학습이 완료된 모델이 지정한 경로에 저장됩니다.

PyTorch을 사용하는 InferenceService 로 예측 하기

InferenceService 생성

InferenceService 매니페스트를 작성합니다. predictor로 pytorch 을 사용합니다. storageUri 필드로 모델 저장 위치를 지정해 줍니다. pvc 의 이름이 kfserving-models-pvc 이고 저장 위치가 models/onnx/cifar10/ 이므로, pvc://kfserving-models-pvc/models/onnx/cifar10/model.onnx 라고 지정해 줍니다.

onnx.yaml

apiVersion: "serving.kubeflow.org/v1alpha2"
kind: "InferenceService"
metadata:
  name: "onnx-cifar10"
spec:
  default:
    predictor:
      onnx:
        storageUri: "pvc://kfserving-models-pvc/models/onnx/cifar10/model.onnx"

InferenceService 를 생성합니다.

다음은 admin 네임스페이스 InferenceService 를 생성하는 예제입니다.

kubectl -n admin apply -f onnx.yaml

생성한 InferenceService를 조회해 보겠습니다.

kubectl -n admin get inferenceservice

InferenceService 가 정상적으로 생성되면 다음과 같은 응답 결과를 확인할 수 있습니다.

NAME              URL                                                                  READY   DEFAULT TRAFFIC   CANARY TRAFFIC   AGE
onnx-cifar10      <http://onnx-cifar10.admin.example.com/v1/models/onnx-cifar10>         True    100                                56s

예측 실행하기

예측을 요청하기 위해서는 모델 서버에 접근해야 합니다. 모델 서버는 ingressgateway 를 통해서 접근할 수 있습니다. ingressgateway 는 모델 서버들을 구분하기 위해서 호스트 이름을 사용합니다. ingressgateway에 접근하 기 위한 주소는 앞서 정의한 CLUSTER_IP 를 사용하겠습니다.

예측을 요청할 데이터를 json 파일로 작성합니다.

데이터의 크기가 크기 때문에 git 에 있는 파일을 다운받아서 사용해주세요.

cifar10-input.json

{
  "inputs": {
    "input1": {
      "dims": [
        "1",
        "3",
        "32",
        "32"
      ],
      "dataType": 1,
      "rawData": "..."
    }
  }
}

다음은 admin 네임스페이스의 xgboost-iris InferenceService 에 예측을 요청하는 예제입니다.

MODEL_NAME=onnx-cifar10
SERVICE_HOSTNAME=$(kubectl -n admin get inferenceservice onnx-cifar10 -o jsonpath='{.status.url}' | cut -d "/" -f 3)

INPUT_PATH=@./cifar10-input.json
curl -v -H "Host: ${SERVICE_HOSTNAME}" http://$CLUSTER_IP/v1/models/$MODEL_NAME:predict -d $INPUT_PATH

정상적으로 실행되면 다음과 같은 응답 결과를 확인 할 수 있습니다.

*   Trying ::1...
* TCP_NODELAY set
* Connected to localhost (::1) port 8080 (#0)
> POST /v1/models/onnx-cifar10:predict HTTP/1.1
> Host: onnx-cifar10.admin.example.com
> User-Agent: curl/7.64.1
> Content-Type: application/json
> Accept: application/json
> Content-Length: 16533
> Expect: 100-continue
> 
< HTTP/1.1 100 Continue
* We are completely uploaded and fine
< HTTP/1.1 200 OK
< content-length: 145
< content-type: application/json
< date: Sat, 04 Apr 2020 18:19:07 GMT
< server: istio-envoy
< x-envoy-upstream-service-time: 10068
< x-ms-request-id: 3f6c43cf-586a-416e-a55a-d0b3ac35057e
< 
* Connection #0 to host localhost left intact
{"outputs":{"output1":{"dims":["1","10"],"dataType":1,"rawData":"ejmIv2h5BsAdRVQ/PswYQCSLZL/Midc/rBSTP2QqU79gRCi/8NJQPQ==","dataLocation":"DEFAULT"}}}