Kubeflow – MXNet 학습하기

MXNet 알아 보기

MXNet은 카네기 멜론 대학과 워싱톤 대학이 시작한 오픈소스 딥러닝 프레임워크입니다. MXNet은 빠르고 확장 가능한 학습 및 추론 프레임워크로서 다양한 언어와 디바이스를 지원하고 있습니다.

MXJob을 이용해서 MXNet을 학습 할 수 있습니다. MXJob은 쿠버네티스에서 분산 또는 비 분산 MXNet 작업을 쉽게 실행할 수 있는 쿠버네티스 CR(Custom Resource) 입니다.

MXJob

MXJob은 Kubernetes에서 MXNet 학습 작업을 실행하는 데 사용할 수 있는, 쿠버네티스 사용자 리소스 입니다. MXJob의 구현은 mxnet-operator에 있습니다. mxnet-operator는 MXJob을 관리합니다. 쿠버네티스에 MXJob이 등록되면, 필요한 포드들을 생성하여 작업을 실행할 수 있도록 도와줍니다.

MXJob은 다음과 같이 YAML 형식으로 표현할 수 있는 쿠버네티스 사용자 리소스입니다.

apiVersion: "kubeflow.org/v1beta1"
kind: "MXJob"
metadata:
  name: "mxnet-job"
spec:
  jobMode: MXTrain
  mxReplicaSpecs:
    Scheduler:
      replicas: 1
      restartPolicy: Never
      template:
        metadata:
          annotations:
            sidecar.istio.io/inject: "false"
        spec:
          containers:
            - name: mxnet
              image: kangwoo/mxnet:cpu
    Server:
      replicas: 1
      restartPolicy: Never
      template:
        metadata:
          annotations:
            sidecar.istio.io/inject: "false"
        spec:
          containers:
            - name: mxnet
              image: kangwoo/mxnet:cpu
    Worker:
      replicas: 1
      restartPolicy: Never
      template:
        metadata:
          annotations:
            sidecar.istio.io/inject: "false"
        spec:
          containers:
            - name: mxnet
              image: kangwoo/mxnet:cpu

분산 처리 기술을 사용한 모델 학습

쿠버네티스에서 MXJob을 관리하고 있는 mxnet-operator 는 MXNet을 사용한 모델 학습시 분산 처리가 가능하도록 도와주고 있습니다.

MXNet에는 분산 처리 기술을 사용하기 위해서 세 가지 역할의 프로세스가 존재합니다.

  • Scheduler (스케줄러) : 스케줄러는 1대만 존재해야만 합니다. 스케줄러의 역할은 클러스터를 설정하는 것입니다. 모든 프로세스에 클러스터의 다른 노드들 알려서 서로 통신 할 수 있도록 도와주고 있습니다.
  • Server (서버) : 모델의 매개 변수를 저장하고 작업자와 통신하는 역할을 합니다. 서버는 1대 이상 있을 수 있습니다.
  • Worker (작업자) : 작업자 노드는 실제 모델 학습 작업을 수행하는 역할을 합니다. 각 배치를 처리하기 전에 서버에서 가중치를 가지고 오고, 배치 작업이 끝난 후 서버에 가중치를 그라디언트를 보냅니다. 작업자는 1대 이상 있을 수 있습니다.

MXNet은 환경 변수를 사용하여 프로세스의 역할 정의하고, 다른 프로세스가 스케줄러를 찾도록 합니다.

모델 학습을 시작하려면 다음과 같은 환경 변수가 올바르게 설정되어 있어야 합니다.

  • DMLC_ROLE : 프로세스의 역할을 설정합니다. 서버, 작업자 또는 스케줄러 일 수 있습니다. 스케줄러는 하나만 있어야 합니다.
  • DMLC_PS_ROOT_URI : 스케줄러의 IP를 설정합니다.
  • DMLC_PS_ROOT_PORT : 스케줄러의 포트를 설정합니다.
  • DMLC_NUM_SERVER : 클러스터에있는 서버 노드 수를 설정합니다.
  • DMLC_NUM_WORKER : 클러스터에있는 작업자 노드 수를 설정합니다.

MXJob 이라는 쿠터네티스트 CR을 생성하여 모델 학습을 진행 경우에는, mxnet-operator 가 프로세스의 역할에 맞게 자동으로 환경 변수를 설정해 줍니다. 그래서 분산 작업을 위한 별도의 환경 변수 설정 작업을 하지 않아도 됩니다.

다음은 MXJob에서 생성한 포드의 환경 변수를 일부 출력해 본 것입니다.

spec:
  containers:
  - env:
    - name: MX_CONFIG
      value: '{"cluster":{"scheduler":[{"url":"mxnet-job-scheduler-0","port":9091}],"server":[{"url":"mxnet-job-server-0","port":9091}],"worker":[{"url":"mxnet-job-worker-0","port":9091}]},"labels":{"scheduler":"","server":"","worker":""},"task":{"type":"server","index":0}}'
    - name: DMLC_PS_ROOT_PORT
      value: "9091"
    - name: DMLC_PS_ROOT_URI
      value: mxnet-job-scheduler-0
    - name: DMLC_NUM_SERVER
      value: "1"
    - name: DMLC_NUM_WORKER
      value: "1"
    - name: DMLC_ROLE
      value: server
    - name: DMLC_USE_KUBERNETES
      value: "1"

MXJob CRD와 Operator 설치하기

MXJob을 사용하려면 쿠버네티스에 CRD와 Operator가 설치되어 있어야 합니다.

쿠버네티스가 MXJob을 지원하고 있는지 확인하기

다음 명령어를 실행하면 쿠버네티스 클러스터에 MXNet CRD가 설치되어 있는지 확인할 수 있습니다.

kubectl get crd | grep mxjobs

CRD가 설치되어 있다면, 다음과 같은 응답 결과를 얻을 수 있습니다.

mxjobs.kubeflow.org                                  2020-03-01T08:41:16Z

만약 설치되어 있지 않은 경우에는, 다음과 같은 명령어로 설치할 수 있습니다.

git clone <https://github.com/kubeflow/manifests>
cd manifests/mxnet-job/mxnet-operator
kubectl apply -k base

참고 : 다음은 MXJob CRD와 mxnet-operator 생성이 필요한 전체 매니페스트 내용입니다.

apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
  labels:
    kustomize.component: mxnet-operator
  name: mxjobs.kubeflow.org
spec:
  group: kubeflow.org
  names:
    kind: MXJob
    plural: mxjobs
    singular: mxjob
  scope: Namespaced
  version: v1beta1
---
apiVersion: v1
kind: ServiceAccount
metadata:
  labels:
    app: mxnet-operator
    kustomize.component: mxnet-operator
  name: mxnet-operator
  namespace: kubeflow
---
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRole
metadata:
  labels:
    app: mxnet-operator
    kustomize.component: mxnet-operator
  name: mxnet-operator
rules:
- apiGroups:
  - kubeflow.org
  resources:
  - mxjobs
  verbs:
  - '*'
- apiGroups:
  - apiextensions.k8s.io
  resources:
  - customresourcedefinitions
  verbs:
  - '*'
- apiGroups:
  - storage.k8s.io
  resources:
  - storageclasses
  verbs:
  - '*'
- apiGroups:
  - batch
  resources:
  - jobs
  verbs:
  - '*'
- apiGroups:
  - ""
  resources:
  - configmaps
  - pods
  - services
  - endpoints
  - persistentvolumeclaims
  - events
  verbs:
  - '*'
- apiGroups:
  - apps
  - extensions
  resources:
  - deployments
  verbs:
  - '*'
---
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRoleBinding
metadata:
  labels:
    app: mxnet-operator
    kustomize.component: mxnet-operator
  name: mxnet-operator
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: mxnet-operator
subjects:
- kind: ServiceAccount
  name: mxnet-operator
  namespace: kubeflow
---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    kustomize.component: mxnet-operator
  name: mxnet-operator
  namespace: kubeflow
spec:
  replicas: 1
  selector:
    matchLabels:
      kustomize.component: mxnet-operator
  template:
    metadata:
      annotations:
        sidecar.istio.io/inject: "false"
      labels:
        kustomize.component: mxnet-operator
        name: mxnet-operator
    spec:
      containers:
      - command:
        - /opt/kubeflow/mxnet-operator.v1beta1
        - --alsologtostderr
        - -v=1
        env:
        - name: MY_POD_NAMESPACE
          valueFrom:
            fieldRef:
              fieldPath: metadata.namespace
        - name: MY_POD_NAME
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        image: mxjob/mxnet-operator:v1beta1
        imagePullPolicy: Always
        name: mxnet-operator
      serviceAccountName: mxnet-operator

MXNet 학습 작업 실행하기

MXTrain 모드로 MXJob을 정의한 후 학습 작업을 생성해 보겠습니다.

모델 코드 작성하기

MXNet으로 학습할 모델을 작성해 보겠습니다.

다음은 MXNet에서 제공하고 있는 이미지 분류 파이썬 코드인 image_classification.py 와 data.py 입니다. Gluon (https://github.com/apache/incubator-mxnet/tree/master/example/distributed_training) 을 사용해서 분산 학습을 지원하고 있습니다.

image_classification.py

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import division

import argparse, time, os
import logging

import mxnet as mx
from mxnet import gluon
from mxnet import profiler
from mxnet.gluon import nn
from mxnet.gluon.model_zoo import vision as models
from mxnet import autograd as ag
from mxnet.test_utils import get_mnist_iterator
from mxnet.metric import Accuracy, TopKAccuracy, CompositeEvalMetric
import numpy as np

from data import (get_cifar10_iterator, get_imagenet_iterator,
                  get_caltech101_iterator, dummy_iterator)

# logging
logging.basicConfig(level=logging.INFO)
fh = logging.FileHandler('image-classification.log')
logger = logging.getLogger()
logger.addHandler(fh)
formatter = logging.Formatter('%(message)s')
fh.setFormatter(formatter)
fh.setLevel(logging.DEBUG)
logging.debug('\\n%s', '-' * 100)
formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
fh.setFormatter(formatter)

# CLI
parser = argparse.ArgumentParser(description='Train a model for image classification.')
parser.add_argument('--dataset', type=str, default='cifar10',
                    help='dataset to use. options are mnist, cifar10, caltech101, imagenet and dummy.')
parser.add_argument('--data-dir', type=str, default='',
                  help='training directory of imagenet images, contains train/val subdirs.')
parser.add_argument('--num-worker', '-j', dest='num_workers', default=4, type=int,
                    help='number of workers for dataloader')
parser.add_argument('--batch-size', type=int, default=32,
                    help='training batch size per device (CPU/GPU).')
parser.add_argument('--gpus', type=str, default='',
                    help='ordinates of gpus to use, can be "0,1,2" or empty for cpu only.')
parser.add_argument('--epochs', type=int, default=120,
                    help='number of training epochs.')
parser.add_argument('--lr', type=float, default=0.1,
                    help='learning rate. default is 0.1.')
parser.add_argument('--momentum', type=float, default=0.9,
                    help='momentum value for optimizer, default is 0.9.')
parser.add_argument('--wd', type=float, default=0.0001,
                    help='weight decay rate. default is 0.0001.')
parser.add_argument('--seed', type=int, default=123,
                    help='random seed to use. Default=123.')
parser.add_argument('--mode', type=str,
                    help='mode in which to train the model. options are symbolic, imperative, hybrid')
parser.add_argument('--model', type=str, required=True,
                    help='type of model to use. see vision_model for options.')
parser.add_argument('--use_thumbnail', action='store_true',
                    help='use thumbnail or not in resnet. default is false.')
parser.add_argument('--batch-norm', action='store_true',
                    help='enable batch normalization or not in vgg. default is false.')
parser.add_argument('--use-pretrained', action='store_true',
                    help='enable using pretrained model from gluon.')
parser.add_argument('--prefix', default='', type=str,
                    help='path to checkpoint prefix, default is current working dir')
parser.add_argument('--start-epoch', default=0, type=int,
                    help='starting epoch, 0 for fresh training, > 0 to resume')
parser.add_argument('--resume', type=str, default='',
                    help='path to saved weight where you want resume')
parser.add_argument('--lr-factor', default=0.1, type=float,
                    help='learning rate decay ratio')
parser.add_argument('--lr-steps', default='30,60,90', type=str,
                    help='list of learning rate decay epochs as in str')
parser.add_argument('--dtype', default='float32', type=str,
                    help='data type, float32 or float16 if applicable')
parser.add_argument('--save-frequency', default=10, type=int,
                    help='epoch frequence to save model, best model will always be saved')
parser.add_argument('--kvstore', type=str, default='device',
                    help='kvstore to use for trainer/module.')
parser.add_argument('--log-interval', type=int, default=50,
                    help='Number of batches to wait before logging.')
parser.add_argument('--profile', action='store_true',
                    help='Option to turn on memory profiling for front-end, '\\
                         'and prints out the memory usage by python function at the end.')
parser.add_argument('--builtin-profiler', type=int, default=0, help='Enable built-in profiler (0=off, 1=on)')
opt = parser.parse_args()

# global variables
logger.info('Starting new image-classification task:, %s',opt)
mx.random.seed(opt.seed)
model_name = opt.model
dataset_classes = {'mnist': 10, 'cifar10': 10, 'caltech101':101, 'imagenet': 1000, 'dummy': 1000}
batch_size, dataset, classes = opt.batch_size, opt.dataset, dataset_classes[opt.dataset]
context = [mx.gpu(int(i)) for i in opt.gpus.split(',')] if opt.gpus.strip() else [mx.cpu()]
num_gpus = len(context)
batch_size *= max(1, num_gpus)
lr_steps = [int(x) for x in opt.lr_steps.split(',') if x.strip()]
metric = CompositeEvalMetric([Accuracy(), TopKAccuracy(5)])
kv = mx.kv.create(opt.kvstore)

def get_model(model, ctx, opt):
    """Model initialization."""
    kwargs = {'ctx': ctx, 'pretrained': opt.use_pretrained, 'classes': classes}
    if model.startswith('resnet'):
        kwargs['thumbnail'] = opt.use_thumbnail
    elif model.startswith('vgg'):
        kwargs['batch_norm'] = opt.batch_norm

    net = models.get_model(model, **kwargs)
    if opt.resume:
        net.load_parameters(opt.resume)
    elif not opt.use_pretrained:
        if model in ['alexnet']:
            net.initialize(mx.init.Normal())
        else:
            net.initialize(mx.init.Xavier(magnitude=2))
    net.cast(opt.dtype)
    return net

net = get_model(opt.model, context, opt)

def get_data_iters(dataset, batch_size, opt):
    """get dataset iterators"""
    if dataset == 'mnist':
        train_data, val_data = get_mnist_iterator(batch_size, (1, 28, 28),
                                                  num_parts=kv.num_workers, part_index=kv.rank)
    elif dataset == 'cifar10':
        train_data, val_data = get_cifar10_iterator(batch_size, (3, 32, 32),
                                                    num_parts=kv.num_workers, part_index=kv.rank)
    elif dataset == 'imagenet':
        shape_dim = 299 if model_name == 'inceptionv3' else 224

        if not opt.data_dir:
            raise ValueError('Dir containing raw images in train/val is required for imagenet.'
                             'Please specify "--data-dir"')

        train_data, val_data = get_imagenet_iterator(opt.data_dir, batch_size,
                                                                opt.num_workers, shape_dim, opt.dtype)
    elif dataset == 'caltech101':
        train_data, val_data = get_caltech101_iterator(batch_size, opt.num_workers, opt.dtype)
    elif dataset == 'dummy':
        shape_dim = 299 if model_name == 'inceptionv3' else 224
        train_data, val_data = dummy_iterator(batch_size, (3, shape_dim, shape_dim))
    return train_data, val_data

def test(ctx, val_data):
    metric.reset()
    val_data.reset()
    for batch in val_data:
        data = gluon.utils.split_and_load(batch.data[0].astype(opt.dtype, copy=False),
                                          ctx_list=ctx, batch_axis=0)
        label = gluon.utils.split_and_load(batch.label[0].astype(opt.dtype, copy=False),
                                           ctx_list=ctx, batch_axis=0)
        outputs = [net(X) for X in data]
        metric.update(label, outputs)
    return metric.get()

def update_learning_rate(lr, trainer, epoch, ratio, steps):
    """Set the learning rate to the initial value decayed by ratio every N epochs."""
    new_lr = lr * (ratio ** int(np.sum(np.array(steps) < epoch)))
    trainer.set_learning_rate(new_lr)
    return trainer

def save_checkpoint(epoch, top1, best_acc):
    if opt.save_frequency and (epoch + 1) % opt.save_frequency == 0:
        fname = os.path.join(opt.prefix, '%s_%d_acc_%.4f.params' % (opt.model, epoch, top1))
        net.save_parameters(fname)
        logger.info('[Epoch %d] Saving checkpoint to %s with Accuracy: %.4f', epoch, fname, top1)
    if top1 > best_acc[0]:
        best_acc[0] = top1
        fname = os.path.join(opt.prefix, '%s_best.params' % (opt.model))
        net.save_parameters(fname)
        logger.info('[Epoch %d] Saving checkpoint to %s with Accuracy: %.4f', epoch, fname, top1)

def train(opt, ctx):
    if isinstance(ctx, mx.Context):
        ctx = [ctx]

    train_data, val_data = get_data_iters(dataset, batch_size, opt)
    net.collect_params().reset_ctx(ctx)
    trainer = gluon.Trainer(net.collect_params(), 'sgd',
                            optimizer_params={'learning_rate': opt.lr,
                                              'wd': opt.wd,
                                              'momentum': opt.momentum,
                                              'multi_precision': True},
                            kvstore=kv)
    loss = gluon.loss.SoftmaxCrossEntropyLoss()

    total_time = 0
    num_epochs = 0
    best_acc = [0]
    for epoch in range(opt.start_epoch, opt.epochs):
        trainer = update_learning_rate(opt.lr, trainer, epoch, opt.lr_factor, lr_steps)
        tic = time.time()
        train_data.reset()
        metric.reset()
        btic = time.time()
        for i, batch in enumerate(train_data):
            data = gluon.utils.split_and_load(batch.data[0].astype(opt.dtype), ctx_list=ctx, batch_axis=0)
            label = gluon.utils.split_and_load(batch.label[0].astype(opt.dtype), ctx_list=ctx, batch_axis=0)
            outputs = []
            Ls = []
            with ag.record():
                for x, y in zip(data, label):
                    z = net(x)
                    L = loss(z, y)
                    # store the loss and do backward after we have done forward
                    # on all GPUs for better speed on multiple GPUs.
                    Ls.append(L)
                    outputs.append(z)
                ag.backward(Ls)
            trainer.step(batch.data[0].shape[0])
            metric.update(label, outputs)
            if opt.log_interval and not (i+1)%opt.log_interval:
                name, acc = metric.get()
                logger.info('Epoch[%d] Batch [%d]\\tSpeed: %f samples/sec\\t%s=%f, %s=%f'%(
                               epoch, i, batch_size/(time.time()-btic), name[0], acc[0], name[1], acc[1]))
            btic = time.time()

        epoch_time = time.time()-tic

        # First epoch will usually be much slower than the subsequent epics,
        # so don't factor into the average
        if num_epochs > 0:
          total_time = total_time + epoch_time
        num_epochs = num_epochs + 1

        name, acc = metric.get()
        logger.info('[Epoch %d] training: %s=%f, %s=%f'%(epoch, name[0], acc[0], name[1], acc[1]))
        logger.info('[Epoch %d] time cost: %f'%(epoch, epoch_time))
        name, val_acc = test(ctx, val_data)
        logger.info('[Epoch %d] validation: %s=%f, %s=%f'%(epoch, name[0], val_acc[0], name[1], val_acc[1]))

        # save model if meet requirements
        save_checkpoint(epoch, val_acc[0], best_acc)
    if num_epochs > 1:
        print('Average epoch time: {}'.format(float(total_time)/(num_epochs - 1)))

def main():
    if opt.builtin_profiler > 0:
        profiler.set_config(profile_all=True, aggregate_stats=True)
        profiler.set_state('run')
    if opt.mode == 'symbolic':
        data = mx.sym.var('data')
        if opt.dtype == 'float16':
            data = mx.sym.Cast(data=data, dtype=np.float16)
        out = net(data)
        if opt.dtype == 'float16':
            out = mx.sym.Cast(data=out, dtype=np.float32)
        softmax = mx.sym.SoftmaxOutput(out, name='softmax')
        mod = mx.mod.Module(softmax, context=context)
        train_data, val_data = get_data_iters(dataset, batch_size, opt)
        mod.fit(train_data,
                eval_data=val_data,
                num_epoch=opt.epochs,
                kvstore=kv,
                batch_end_callback = mx.callback.Speedometer(batch_size, max(1, opt.log_interval)),
                epoch_end_callback = mx.callback.do_checkpoint('image-classifier-%s'% opt.model),
                optimizer = 'sgd',
                optimizer_params = {'learning_rate': opt.lr, 'wd': opt.wd, 'momentum': opt.momentum, 'multi_precision': True},
                initializer = mx.init.Xavier(magnitude=2))
        mod.save_parameters('image-classifier-%s-%d-final.params'%(opt.model, opt.epochs))
    else:
        if opt.mode == 'hybrid':
            net.hybridize()
        train(opt, context)
    if opt.builtin_profiler > 0:
        profiler.set_state('stop')
        print(profiler.dumps())

if __name__ == '__main__':
    if opt.profile:
        import hotshot, hotshot.stats
        prof = hotshot.Profile('image-classifier-%s-%s.prof'%(opt.model, opt.mode))
        prof.runcall(main)
        prof.close()
        stats = hotshot.stats.load('image-classifier-%s-%s.prof'%(opt.model, opt.mode))
        stats.strip_dirs()
        stats.sort_stats('cumtime', 'calls')
        stats.print_stats()
    else:
        main()

data.py

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

# pylint: skip-file
""" data iterator for mnist """
import os
import random
import tarfile
import logging
import tarfile
logging.basicConfig(level=logging.INFO)

import mxnet as mx
from mxnet.test_utils import get_cifar10
from mxnet.gluon.data.vision import ImageFolderDataset
from mxnet.gluon.data import DataLoader
from mxnet.contrib.io import DataLoaderIter

def get_cifar10_iterator(batch_size, data_shape, resize=-1, num_parts=1, part_index=0):
    get_cifar10()

    train = mx.io.ImageRecordIter(
        path_imgrec = "data/cifar/train.rec",
        # mean_img    = "data/cifar/mean.bin",
        resize      = resize,
        data_shape  = data_shape,
        batch_size  = batch_size,
        rand_crop   = True,
        rand_mirror = True,
        num_parts=num_parts,
        part_index=part_index)

    val = mx.io.ImageRecordIter(
        path_imgrec = "data/cifar/test.rec",
        # mean_img    = "data/cifar/mean.bin",
        resize      = resize,
        rand_crop   = False,
        rand_mirror = False,
        data_shape  = data_shape,
        batch_size  = batch_size,
        num_parts=num_parts,
        part_index=part_index)

    return train, val

def get_imagenet_transforms(data_shape=224, dtype='float32'):
    def train_transform(image, label):
        image, _ = mx.image.random_size_crop(image, (data_shape, data_shape), 0.08, (3/4., 4/3.))
        image = mx.nd.image.random_flip_left_right(image)
        image = mx.nd.image.to_tensor(image)
        image = mx.nd.image.normalize(image, mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225))
        return mx.nd.cast(image, dtype), label

    def val_transform(image, label):
        image = mx.image.resize_short(image, data_shape + 32)
        image, _ = mx.image.center_crop(image, (data_shape, data_shape))
        image = mx.nd.image.to_tensor(image)
        image = mx.nd.image.normalize(image, mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225))
        return mx.nd.cast(image, dtype), label
    return train_transform, val_transform

def get_imagenet_iterator(root, batch_size, num_workers, data_shape=224, dtype='float32'):
    """Dataset loader with preprocessing."""
    train_dir = os.path.join(root, 'train')
    train_transform, val_transform = get_imagenet_transforms(data_shape, dtype)
    logging.info("Loading image folder %s, this may take a bit long...", train_dir)
    train_dataset = ImageFolderDataset(train_dir, transform=train_transform)
    train_data = DataLoader(train_dataset, batch_size, shuffle=True,
                            last_batch='discard', num_workers=num_workers)
    val_dir = os.path.join(root, 'val')
    if not os.path.isdir(os.path.expanduser(os.path.join(root, 'val', 'n01440764'))):
        user_warning = 'Make sure validation images are stored in one subdir per category, a helper script is available at <https://git.io/vNQv1>'
        raise ValueError(user_warning)
    logging.info("Loading image folder %s, this may take a bit long...", val_dir)
    val_dataset = ImageFolderDataset(val_dir, transform=val_transform)
    val_data = DataLoader(val_dataset, batch_size, last_batch='keep', num_workers=num_workers)
    return DataLoaderIter(train_data, dtype), DataLoaderIter(val_data, dtype)

def get_caltech101_data():
    url = "<https://s3.us-east-2.amazonaws.com/mxnet-public/101_ObjectCategories.tar.gz>"
    dataset_name = "101_ObjectCategories"
    data_folder = "data"
    if not os.path.isdir(data_folder):
        os.makedirs(data_folder)
    tar_path = mx.gluon.utils.download(url, path=data_folder)
    if (not os.path.isdir(os.path.join(data_folder, "101_ObjectCategories")) or
        not os.path.isdir(os.path.join(data_folder, "101_ObjectCategories_test"))):
        tar = tarfile.open(tar_path, "r:gz")
        tar.extractall(data_folder)
        tar.close()
        print('Data extracted')
    training_path = os.path.join(data_folder, dataset_name)
    testing_path = os.path.join(data_folder, "{}_test".format(dataset_name))
    return training_path, testing_path

def get_caltech101_iterator(batch_size, num_workers, dtype):
    def transform(image, label):
        # resize the shorter edge to 224, the longer edge will be greater or equal to 224
        resized = mx.image.resize_short(image, 224)
        # center and crop an area of size (224,224)
        cropped, crop_info = mx.image.center_crop(resized, (224, 224))
        # transpose the channels to be (3,224,224)
        transposed = mx.nd.transpose(cropped, (2, 0, 1))
        return transposed, label

    training_path, testing_path = get_caltech101_data()
    dataset_train = ImageFolderDataset(root=training_path, transform=transform)
    dataset_test = ImageFolderDataset(root=testing_path, transform=transform)

    train_data = DataLoader(dataset_train, batch_size, shuffle=True, num_workers=num_workers)
    test_data = DataLoader(dataset_test, batch_size, shuffle=False, num_workers=num_workers)
    return DataLoaderIter(train_data), DataLoaderIter(test_data)

class DummyIter(mx.io.DataIter):
    def __init__(self, batch_size, data_shape, batches = 100):
        super(DummyIter, self).__init__(batch_size)
        self.data_shape = (batch_size,) + data_shape
        self.label_shape = (batch_size,)
        self.provide_data = [('data', self.data_shape)]
        self.provide_label = [('softmax_label', self.label_shape)]
        self.batch = mx.io.DataBatch(data=[mx.nd.zeros(self.data_shape)],
                                     label=[mx.nd.zeros(self.label_shape)])
        self._batches = 0
        self.batches = batches

    def next(self):
        if self._batches < self.batches:
            self._batches += 1
            return self.batch
        else:
            self._batches = 0
            raise StopIteration

def dummy_iterator(batch_size, data_shape):
    return DummyIter(batch_size, data_shape), DummyIter(batch_size, data_shape)

class ImagePairIter(mx.io.DataIter):
    def __init__(self, path, data_shape, label_shape, batch_size=64, flag=0, input_aug=None, target_aug=None):
        super(ImagePairIter, self).__init__(batch_size)
        self.data_shape = (batch_size,) + data_shape
        self.label_shape = (batch_size,) + label_shape
        self.input_aug = input_aug
        self.target_aug = target_aug
        self.provide_data = [('data', self.data_shape)]
        self.provide_label = [('label', self.label_shape)]
        is_image_file = lambda fn: any(fn.endswith(ext) for ext in [".png", ".jpg", ".jpeg"])
        self.filenames = [os.path.join(path, x) for x in os.listdir(path) if is_image_file(x)]
        self.count = 0
        self.flag = flag
        random.shuffle(self.filenames)

    def next(self):
        from PIL import Image
        if self.count + self.batch_size <= len(self.filenames):
            data = []
            label = []
            for i in range(self.batch_size):
                fn = self.filenames[self.count]
                self.count += 1
                image = Image.open(fn).convert('YCbCr').split()[0]
                if image.size[0] > image.size[1]:
                    image = image.transpose(Image.TRANSPOSE)
                image = mx.nd.expand_dims(mx.nd.array(image), axis=2)
                target = image.copy()
                for aug in self.input_aug:
                    image = aug(image)
                for aug in self.target_aug:
                    target = aug(target)
                data.append(image)
                label.append(target)

            data = mx.nd.concat(*[mx.nd.expand_dims(d, axis=0) for d in data], dim=0)
            label = mx.nd.concat(*[mx.nd.expand_dims(d, axis=0) for d in label], dim=0)
            data = [mx.nd.transpose(data, axes=(0, 3, 1, 2)).astype('float32')/255]
            label = [mx.nd.transpose(label, axes=(0, 3, 1, 2)).astype('float32')/255]

            return mx.io.DataBatch(data=data, label=label)
        else:
            raise StopIteration

    def reset(self):
        self.count = 0
        random.shuffle(self.filenames)

모델 컨테이너 이미지 만들기

모델 학습용 컨테이너 이미지를 만들기 위해서 Dockerfile을 생성하겠습니다.

다음은 MXNet 1.5를 기반 이미지로 해서, 모델 파일을 추가하는 Dockerfile 입니다.

Dockerfile

FROM mxnet/python:1.5.0_cpu_py3

RUN mkdir -p /app
COPY image_classification.py /app/
COPY data.py /app/

WORKDIR /app

CMD ["python3", "image_classification.py", "--dataset", "cifar10", "--model", "vgg11", "--epochs", "1", "--kvstore", "dist_sync"]

다음 명령어를 실행하면 kangwoo/mxnet:cpu 라는 이름의 컨테이너 이미지를 빌드 할 수 있습니다.

docker build -t kangwoo/mxnet:cpu .

빌드한 컨테이너 이미지를 컨테이너 레지스트리로 업로드 하겠습니다.

docker push kangwoo/mxnet:cpu

MXJob 생성하기

mxnet-operator를 사용해서 MXNet 모델 학습을 하라면 MXJob을 정의해야합니다.

  1. jobMode를 MXTrain로 설정합니다. jobMode: MXTrain
  2. mxReplicaSpecs를 설정합니다. “mxReplicaSpecs”은 MXNet의 분산 학습시 사용하는 프로세스들을 정의하는데 사용합니다. Scheduler와 Server를 1개로 설정하고, Worker도 1개로 설정합니다. Kubeflow 클러스터에 istio가 설치되어 있기 때문에, 자동으로 istio-proxy가 포드에 주입됩니다. 이것을 방지하기 위해서 어노테이션에 sidecar.istio.io/inject: “false” 을 추가해 주었습니다. mxReplicaSpecs: Scheduler: replicas: 1 restartPolicy: Never template: metadata: annotations: sidecar.istio.io/inject: “false” spec: containers: – name: mxnet image: kangwoo/mxnet:cpu Server: replicas: 1 restartPolicy: Never template: metadata: annotations: sidecar.istio.io/inject: “false” spec: containers: – name: mxnet image: kangwoo/mxnet:cpu Worker: replicas: 1 restartPolicy: Never template: metadata: annotations: sidecar.istio.io/inject: “false” spec: containers: – name: mxnet image: kangwoo/mxnet:cpu

다음은 MXJob 을 생성한 위한 메니페스트입니다.

mxnet-job.yaml

apiVersion: "kubeflow.org/v1beta1"
kind: "MXJob"
metadata:
  name: "mxnet-job"
spec:
  jobMode: MXTrain
  mxReplicaSpecs:
    Scheduler:
      replicas: 1
      restartPolicy: Never
      template:
        metadata:
          annotations:
            sidecar.istio.io/inject: "false"
        spec:
          containers:
            - name: mxnet
              image: kangwoo/mxnet:cpu
    Server:
      replicas: 1
      restartPolicy: Never
      template:
        metadata:
          annotations:
            sidecar.istio.io/inject: "false"
        spec:
          containers:
            - name: mxnet
              image: kangwoo/mxnet:cpu
    Worker:
      replicas: 1
      restartPolicy: Never
      template:
        metadata:
          annotations:
            sidecar.istio.io/inject: "false"
        spec:
          containers:
            - name: mxnet
              image: kangwoo/mxnet:cpu

다음 명령어를 실행하면 admin 네임스페이스에 mxnet-job 이라는 이름의 MXJob을 생성할 수 있습니다.

kubectl -n admin apply -f mxnet-job.yaml

MXNet 학습 작업 확인하기

생성한 MXJob은 다음 명령어를 실행해서 확인 해 볼 수 있습니다.

kubectl -n admin get mxjob

생성된 MXJob이 있다면, 다음과 같은 응답 결과를 얻을 수 있습니다.

NAME        AGE
mxnet-job   5s

MXJob이 생성되면, mxnet-operator 에 의해서 포드들이 생성됩니다. MXjob 매니페스트에 정의한 갯수대로 scheduler, server, worker 포드가 생성되게 됩니다.

생성된 포드들은 다음 명령어를 실행해서 확인 해 볼 수 있습니다.

kubectl -n admin get pod -l mxnet_job_name=mxnet-job

생성된 포드들이 남아 있다면, 다음과 같은 응답 결과를 얻을 수 있습니다.

NAME                    READY   STATUS    RESTARTS   AGE
mxnet-job-scheduler-0   1/1     Running   0          9s
mxnet-job-server-0      1/1     Running   0          9s
mxnet-job-worker-0      1/1     Running   0          9

MXJob은 작업이 끝난 후, 관련 포드들을 삭제해버립니다. 그래서 작업이 완료되면 포드가 조회되지 않을 수 있습니다. 작업이 완료되어도 포드들을 남겨 두고 싶다면, MXJob 매니페스트의 spec 부분에 “cleanPodPolicy: None” 를 추가하시면 됩니다.

MXJob spec의 CleanPodPolicy는 작업이 종료 될 때 포드 삭제를 제어할 때 사용합니다. 다음 값들 중 하나를 사용할 수 있습니다.

  • Running : 작업이 완료되었을 때, 실행이 끝난(Completed) 포드들은 삭제하지 않고, 실행중인(Running) 포드들만 삭제합니다.
  • All : 작업이 완료되었을 때, 실행이 끝난 포드들을 즉시 삭제합니다.
  • None : 작업이 완료되어도 포드들을 삭제하지 않습니다.

다음은 cleanPodPolicy를 추가한 메니페스트 예제입니다.

apiVersion: "kubeflow.org/v1beta1"
kind: "MXJob"
metadata:
  name: "mxnet-job"
spec:
  cleanPodPolicy: None
...

MXJob의 작업 상태를 알고 싶으면 describe 명령어를 사용할 수 있습니다.

다음 명령어를 실행하면 admin 네임스페이스에 mxnet-job 이라는 이름의 MXJob의 상태를 조회할 수 있습니다.

kubectl -n admin describe mxjob mxnet-job

다음은 예제 작업에 대한 샘플 출력입니다.

Name:         mxnet-job
Namespace:    admin
Labels:       <none>
Annotations:  ...
API Version:  kubeflow.org/v1beta1
Kind:         MXJob
Metadata:
...
Spec:
...
Status:
  Completion Time:  2020-03-08T00:44:23Z
  Conditions:
    Last Transition Time:  2020-03-07T23:45:12Z
    Last Update Time:      2020-03-07T23:45:12Z
    Message:               MXJob mxnet-job is created.
    Reason:                MXJobCreated
    Status:                True
    Type:                  Created
    Last Transition Time:  2020-03-07T23:45:12Z
    Last Update Time:      2020-03-07T23:45:14Z
    Message:               MXJob mxnet-job is running.
    Reason:                MXJobRunning
    Status:                False
    Type:                  Running
    Last Transition Time:  2020-03-07T23:45:12Z
    Last Update Time:      2020-03-08T00:44:23Z
    Message:               MXJob mxnet-job is successfully completed.
    Reason:                MXJobSucceeded
    Status:                True
    Type:                  Succeeded
  Mx Replica Statuses:
    Scheduler:
    Server:
    Worker:
  Start Time:  2020-03-07T23:45:14Z
Events:        <none>

MXNet 학습 작업 삭제하기

작업이 완료되어도 MXJob은 삭제되지 않습니다.

다음 명령어를 실행하면 admin 네임스페이스에 mxnet-job 이라는 이름의 MXJob을 삭제할 수 있습니다.

kubectl -n admin delete mxjob mxnet-job

참고