MXNet 알아 보기
MXNet은 카네기 멜론 대학과 워싱톤 대학이 시작한 오픈소스 딥러닝 프레임워크입니다. MXNet은 빠르고 확장 가능한 학습 및 추론 프레임워크로서 다양한 언어와 디바이스를 지원하고 있습니다.
MXJob을 이용해서 MXNet을 학습 할 수 있습니다. MXJob은 쿠버네티스에서 분산 또는 비 분산 MXNet 작업을 쉽게 실행할 수 있는 쿠버네티스 CR(Custom Resource) 입니다.
MXJob
MXJob은 Kubernetes에서 MXNet 학습 작업을 실행하는 데 사용할 수 있는, 쿠버네티스 사용자 리소스 입니다. MXJob의 구현은 mxnet-operator에 있습니다. mxnet-operator는 MXJob을 관리합니다. 쿠버네티스에 MXJob이 등록되면, 필요한 포드들을 생성하여 작업을 실행할 수 있도록 도와줍니다.
MXJob은 다음과 같이 YAML 형식으로 표현할 수 있는 쿠버네티스 사용자 리소스입니다.
apiVersion: "kubeflow.org/v1beta1" kind: "MXJob" metadata: name: "mxnet-job" spec: jobMode: MXTrain mxReplicaSpecs: Scheduler: replicas: 1 restartPolicy: Never template: metadata: annotations: sidecar.istio.io/inject: "false" spec: containers: - name: mxnet image: kangwoo/mxnet:cpu Server: replicas: 1 restartPolicy: Never template: metadata: annotations: sidecar.istio.io/inject: "false" spec: containers: - name: mxnet image: kangwoo/mxnet:cpu Worker: replicas: 1 restartPolicy: Never template: metadata: annotations: sidecar.istio.io/inject: "false" spec: containers: - name: mxnet image: kangwoo/mxnet:cpu
분산 처리 기술을 사용한 모델 학습
쿠버네티스에서 MXJob을 관리하고 있는 mxnet-operator 는 MXNet을 사용한 모델 학습시 분산 처리가 가능하도록 도와주고 있습니다.
MXNet에는 분산 처리 기술을 사용하기 위해서 세 가지 역할의 프로세스가 존재합니다.
- Scheduler (스케줄러) : 스케줄러는 1대만 존재해야만 합니다. 스케줄러의 역할은 클러스터를 설정하는 것입니다. 모든 프로세스에 클러스터의 다른 노드들 알려서 서로 통신 할 수 있도록 도와주고 있습니다.
- Server (서버) : 모델의 매개 변수를 저장하고 작업자와 통신하는 역할을 합니다. 서버는 1대 이상 있을 수 있습니다.
- Worker (작업자) : 작업자 노드는 실제 모델 학습 작업을 수행하는 역할을 합니다. 각 배치를 처리하기 전에 서버에서 가중치를 가지고 오고, 배치 작업이 끝난 후 서버에 가중치를 그라디언트를 보냅니다. 작업자는 1대 이상 있을 수 있습니다.
MXNet은 환경 변수를 사용하여 프로세스의 역할 정의하고, 다른 프로세스가 스케줄러를 찾도록 합니다.
모델 학습을 시작하려면 다음과 같은 환경 변수가 올바르게 설정되어 있어야 합니다.
- DMLC_ROLE : 프로세스의 역할을 설정합니다. 서버, 작업자 또는 스케줄러 일 수 있습니다. 스케줄러는 하나만 있어야 합니다.
- DMLC_PS_ROOT_URI : 스케줄러의 IP를 설정합니다.
- DMLC_PS_ROOT_PORT : 스케줄러의 포트를 설정합니다.
- DMLC_NUM_SERVER : 클러스터에있는 서버 노드 수를 설정합니다.
- DMLC_NUM_WORKER : 클러스터에있는 작업자 노드 수를 설정합니다.
MXJob 이라는 쿠터네티스트 CR을 생성하여 모델 학습을 진행 경우에는, mxnet-operator 가 프로세스의 역할에 맞게 자동으로 환경 변수를 설정해 줍니다. 그래서 분산 작업을 위한 별도의 환경 변수 설정 작업을 하지 않아도 됩니다.
다음은 MXJob에서 생성한 포드의 환경 변수를 일부 출력해 본 것입니다.
spec: containers: - env: - name: MX_CONFIG value: '{"cluster":{"scheduler":[{"url":"mxnet-job-scheduler-0","port":9091}],"server":[{"url":"mxnet-job-server-0","port":9091}],"worker":[{"url":"mxnet-job-worker-0","port":9091}]},"labels":{"scheduler":"","server":"","worker":""},"task":{"type":"server","index":0}}' - name: DMLC_PS_ROOT_PORT value: "9091" - name: DMLC_PS_ROOT_URI value: mxnet-job-scheduler-0 - name: DMLC_NUM_SERVER value: "1" - name: DMLC_NUM_WORKER value: "1" - name: DMLC_ROLE value: server - name: DMLC_USE_KUBERNETES value: "1"
MXJob CRD와 Operator 설치하기
MXJob을 사용하려면 쿠버네티스에 CRD와 Operator가 설치되어 있어야 합니다.
쿠버네티스가 MXJob을 지원하고 있는지 확인하기
다음 명령어를 실행하면 쿠버네티스 클러스터에 MXNet CRD가 설치되어 있는지 확인할 수 있습니다.
kubectl get crd | grep mxjobs
CRD가 설치되어 있다면, 다음과 같은 응답 결과를 얻을 수 있습니다.
mxjobs.kubeflow.org 2020-03-01T08:41:16Z
만약 설치되어 있지 않은 경우에는, 다음과 같은 명령어로 설치할 수 있습니다.
git clone <https://github.com/kubeflow/manifests> cd manifests/mxnet-job/mxnet-operator kubectl apply -k base
참고 : 다음은 MXJob CRD와 mxnet-operator 생성이 필요한 전체 매니페스트 내용입니다.
apiVersion: apiextensions.k8s.io/v1beta1 kind: CustomResourceDefinition metadata: labels: kustomize.component: mxnet-operator name: mxjobs.kubeflow.org spec: group: kubeflow.org names: kind: MXJob plural: mxjobs singular: mxjob scope: Namespaced version: v1beta1 --- apiVersion: v1 kind: ServiceAccount metadata: labels: app: mxnet-operator kustomize.component: mxnet-operator name: mxnet-operator namespace: kubeflow --- apiVersion: rbac.authorization.k8s.io/v1beta1 kind: ClusterRole metadata: labels: app: mxnet-operator kustomize.component: mxnet-operator name: mxnet-operator rules: - apiGroups: - kubeflow.org resources: - mxjobs verbs: - '*' - apiGroups: - apiextensions.k8s.io resources: - customresourcedefinitions verbs: - '*' - apiGroups: - storage.k8s.io resources: - storageclasses verbs: - '*' - apiGroups: - batch resources: - jobs verbs: - '*' - apiGroups: - "" resources: - configmaps - pods - services - endpoints - persistentvolumeclaims - events verbs: - '*' - apiGroups: - apps - extensions resources: - deployments verbs: - '*' --- apiVersion: rbac.authorization.k8s.io/v1beta1 kind: ClusterRoleBinding metadata: labels: app: mxnet-operator kustomize.component: mxnet-operator name: mxnet-operator roleRef: apiGroup: rbac.authorization.k8s.io kind: ClusterRole name: mxnet-operator subjects: - kind: ServiceAccount name: mxnet-operator namespace: kubeflow --- apiVersion: apps/v1 kind: Deployment metadata: labels: kustomize.component: mxnet-operator name: mxnet-operator namespace: kubeflow spec: replicas: 1 selector: matchLabels: kustomize.component: mxnet-operator template: metadata: annotations: sidecar.istio.io/inject: "false" labels: kustomize.component: mxnet-operator name: mxnet-operator spec: containers: - command: - /opt/kubeflow/mxnet-operator.v1beta1 - --alsologtostderr - -v=1 env: - name: MY_POD_NAMESPACE valueFrom: fieldRef: fieldPath: metadata.namespace - name: MY_POD_NAME valueFrom: fieldRef: fieldPath: metadata.name image: mxjob/mxnet-operator:v1beta1 imagePullPolicy: Always name: mxnet-operator serviceAccountName: mxnet-operator
MXNet 학습 작업 실행하기
MXTrain 모드로 MXJob을 정의한 후 학습 작업을 생성해 보겠습니다.
모델 코드 작성하기
MXNet으로 학습할 모델을 작성해 보겠습니다.
다음은 MXNet에서 제공하고 있는 이미지 분류 파이썬 코드인 image_classification.py 와 data.py 입니다. Gluon (https://github.com/apache/incubator-mxnet/tree/master/example/distributed_training) 을 사용해서 분산 학습을 지원하고 있습니다.
image_classification.py
# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # <http://www.apache.org/licenses/LICENSE-2.0> # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. from __future__ import division import argparse, time, os import logging import mxnet as mx from mxnet import gluon from mxnet import profiler from mxnet.gluon import nn from mxnet.gluon.model_zoo import vision as models from mxnet import autograd as ag from mxnet.test_utils import get_mnist_iterator from mxnet.metric import Accuracy, TopKAccuracy, CompositeEvalMetric import numpy as np from data import (get_cifar10_iterator, get_imagenet_iterator, get_caltech101_iterator, dummy_iterator) # logging logging.basicConfig(level=logging.INFO) fh = logging.FileHandler('image-classification.log') logger = logging.getLogger() logger.addHandler(fh) formatter = logging.Formatter('%(message)s') fh.setFormatter(formatter) fh.setLevel(logging.DEBUG) logging.debug('\\n%s', '-' * 100) formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s') fh.setFormatter(formatter) # CLI parser = argparse.ArgumentParser(description='Train a model for image classification.') parser.add_argument('--dataset', type=str, default='cifar10', help='dataset to use. options are mnist, cifar10, caltech101, imagenet and dummy.') parser.add_argument('--data-dir', type=str, default='', help='training directory of imagenet images, contains train/val subdirs.') parser.add_argument('--num-worker', '-j', dest='num_workers', default=4, type=int, help='number of workers for dataloader') parser.add_argument('--batch-size', type=int, default=32, help='training batch size per device (CPU/GPU).') parser.add_argument('--gpus', type=str, default='', help='ordinates of gpus to use, can be "0,1,2" or empty for cpu only.') parser.add_argument('--epochs', type=int, default=120, help='number of training epochs.') parser.add_argument('--lr', type=float, default=0.1, help='learning rate. default is 0.1.') parser.add_argument('--momentum', type=float, default=0.9, help='momentum value for optimizer, default is 0.9.') parser.add_argument('--wd', type=float, default=0.0001, help='weight decay rate. default is 0.0001.') parser.add_argument('--seed', type=int, default=123, help='random seed to use. Default=123.') parser.add_argument('--mode', type=str, help='mode in which to train the model. options are symbolic, imperative, hybrid') parser.add_argument('--model', type=str, required=True, help='type of model to use. see vision_model for options.') parser.add_argument('--use_thumbnail', action='store_true', help='use thumbnail or not in resnet. default is false.') parser.add_argument('--batch-norm', action='store_true', help='enable batch normalization or not in vgg. default is false.') parser.add_argument('--use-pretrained', action='store_true', help='enable using pretrained model from gluon.') parser.add_argument('--prefix', default='', type=str, help='path to checkpoint prefix, default is current working dir') parser.add_argument('--start-epoch', default=0, type=int, help='starting epoch, 0 for fresh training, > 0 to resume') parser.add_argument('--resume', type=str, default='', help='path to saved weight where you want resume') parser.add_argument('--lr-factor', default=0.1, type=float, help='learning rate decay ratio') parser.add_argument('--lr-steps', default='30,60,90', type=str, help='list of learning rate decay epochs as in str') parser.add_argument('--dtype', default='float32', type=str, help='data type, float32 or float16 if applicable') parser.add_argument('--save-frequency', default=10, type=int, help='epoch frequence to save model, best model will always be saved') parser.add_argument('--kvstore', type=str, default='device', help='kvstore to use for trainer/module.') parser.add_argument('--log-interval', type=int, default=50, help='Number of batches to wait before logging.') parser.add_argument('--profile', action='store_true', help='Option to turn on memory profiling for front-end, '\\ 'and prints out the memory usage by python function at the end.') parser.add_argument('--builtin-profiler', type=int, default=0, help='Enable built-in profiler (0=off, 1=on)') opt = parser.parse_args() # global variables logger.info('Starting new image-classification task:, %s',opt) mx.random.seed(opt.seed) model_name = opt.model dataset_classes = {'mnist': 10, 'cifar10': 10, 'caltech101':101, 'imagenet': 1000, 'dummy': 1000} batch_size, dataset, classes = opt.batch_size, opt.dataset, dataset_classes[opt.dataset] context = [mx.gpu(int(i)) for i in opt.gpus.split(',')] if opt.gpus.strip() else [mx.cpu()] num_gpus = len(context) batch_size *= max(1, num_gpus) lr_steps = [int(x) for x in opt.lr_steps.split(',') if x.strip()] metric = CompositeEvalMetric([Accuracy(), TopKAccuracy(5)]) kv = mx.kv.create(opt.kvstore) def get_model(model, ctx, opt): """Model initialization.""" kwargs = {'ctx': ctx, 'pretrained': opt.use_pretrained, 'classes': classes} if model.startswith('resnet'): kwargs['thumbnail'] = opt.use_thumbnail elif model.startswith('vgg'): kwargs['batch_norm'] = opt.batch_norm net = models.get_model(model, **kwargs) if opt.resume: net.load_parameters(opt.resume) elif not opt.use_pretrained: if model in ['alexnet']: net.initialize(mx.init.Normal()) else: net.initialize(mx.init.Xavier(magnitude=2)) net.cast(opt.dtype) return net net = get_model(opt.model, context, opt) def get_data_iters(dataset, batch_size, opt): """get dataset iterators""" if dataset == 'mnist': train_data, val_data = get_mnist_iterator(batch_size, (1, 28, 28), num_parts=kv.num_workers, part_index=kv.rank) elif dataset == 'cifar10': train_data, val_data = get_cifar10_iterator(batch_size, (3, 32, 32), num_parts=kv.num_workers, part_index=kv.rank) elif dataset == 'imagenet': shape_dim = 299 if model_name == 'inceptionv3' else 224 if not opt.data_dir: raise ValueError('Dir containing raw images in train/val is required for imagenet.' 'Please specify "--data-dir"') train_data, val_data = get_imagenet_iterator(opt.data_dir, batch_size, opt.num_workers, shape_dim, opt.dtype) elif dataset == 'caltech101': train_data, val_data = get_caltech101_iterator(batch_size, opt.num_workers, opt.dtype) elif dataset == 'dummy': shape_dim = 299 if model_name == 'inceptionv3' else 224 train_data, val_data = dummy_iterator(batch_size, (3, shape_dim, shape_dim)) return train_data, val_data def test(ctx, val_data): metric.reset() val_data.reset() for batch in val_data: data = gluon.utils.split_and_load(batch.data[0].astype(opt.dtype, copy=False), ctx_list=ctx, batch_axis=0) label = gluon.utils.split_and_load(batch.label[0].astype(opt.dtype, copy=False), ctx_list=ctx, batch_axis=0) outputs = [net(X) for X in data] metric.update(label, outputs) return metric.get() def update_learning_rate(lr, trainer, epoch, ratio, steps): """Set the learning rate to the initial value decayed by ratio every N epochs.""" new_lr = lr * (ratio ** int(np.sum(np.array(steps) < epoch))) trainer.set_learning_rate(new_lr) return trainer def save_checkpoint(epoch, top1, best_acc): if opt.save_frequency and (epoch + 1) % opt.save_frequency == 0: fname = os.path.join(opt.prefix, '%s_%d_acc_%.4f.params' % (opt.model, epoch, top1)) net.save_parameters(fname) logger.info('[Epoch %d] Saving checkpoint to %s with Accuracy: %.4f', epoch, fname, top1) if top1 > best_acc[0]: best_acc[0] = top1 fname = os.path.join(opt.prefix, '%s_best.params' % (opt.model)) net.save_parameters(fname) logger.info('[Epoch %d] Saving checkpoint to %s with Accuracy: %.4f', epoch, fname, top1) def train(opt, ctx): if isinstance(ctx, mx.Context): ctx = [ctx] train_data, val_data = get_data_iters(dataset, batch_size, opt) net.collect_params().reset_ctx(ctx) trainer = gluon.Trainer(net.collect_params(), 'sgd', optimizer_params={'learning_rate': opt.lr, 'wd': opt.wd, 'momentum': opt.momentum, 'multi_precision': True}, kvstore=kv) loss = gluon.loss.SoftmaxCrossEntropyLoss() total_time = 0 num_epochs = 0 best_acc = [0] for epoch in range(opt.start_epoch, opt.epochs): trainer = update_learning_rate(opt.lr, trainer, epoch, opt.lr_factor, lr_steps) tic = time.time() train_data.reset() metric.reset() btic = time.time() for i, batch in enumerate(train_data): data = gluon.utils.split_and_load(batch.data[0].astype(opt.dtype), ctx_list=ctx, batch_axis=0) label = gluon.utils.split_and_load(batch.label[0].astype(opt.dtype), ctx_list=ctx, batch_axis=0) outputs = [] Ls = [] with ag.record(): for x, y in zip(data, label): z = net(x) L = loss(z, y) # store the loss and do backward after we have done forward # on all GPUs for better speed on multiple GPUs. Ls.append(L) outputs.append(z) ag.backward(Ls) trainer.step(batch.data[0].shape[0]) metric.update(label, outputs) if opt.log_interval and not (i+1)%opt.log_interval: name, acc = metric.get() logger.info('Epoch[%d] Batch [%d]\\tSpeed: %f samples/sec\\t%s=%f, %s=%f'%( epoch, i, batch_size/(time.time()-btic), name[0], acc[0], name[1], acc[1])) btic = time.time() epoch_time = time.time()-tic # First epoch will usually be much slower than the subsequent epics, # so don't factor into the average if num_epochs > 0: total_time = total_time + epoch_time num_epochs = num_epochs + 1 name, acc = metric.get() logger.info('[Epoch %d] training: %s=%f, %s=%f'%(epoch, name[0], acc[0], name[1], acc[1])) logger.info('[Epoch %d] time cost: %f'%(epoch, epoch_time)) name, val_acc = test(ctx, val_data) logger.info('[Epoch %d] validation: %s=%f, %s=%f'%(epoch, name[0], val_acc[0], name[1], val_acc[1])) # save model if meet requirements save_checkpoint(epoch, val_acc[0], best_acc) if num_epochs > 1: print('Average epoch time: {}'.format(float(total_time)/(num_epochs - 1))) def main(): if opt.builtin_profiler > 0: profiler.set_config(profile_all=True, aggregate_stats=True) profiler.set_state('run') if opt.mode == 'symbolic': data = mx.sym.var('data') if opt.dtype == 'float16': data = mx.sym.Cast(data=data, dtype=np.float16) out = net(data) if opt.dtype == 'float16': out = mx.sym.Cast(data=out, dtype=np.float32) softmax = mx.sym.SoftmaxOutput(out, name='softmax') mod = mx.mod.Module(softmax, context=context) train_data, val_data = get_data_iters(dataset, batch_size, opt) mod.fit(train_data, eval_data=val_data, num_epoch=opt.epochs, kvstore=kv, batch_end_callback = mx.callback.Speedometer(batch_size, max(1, opt.log_interval)), epoch_end_callback = mx.callback.do_checkpoint('image-classifier-%s'% opt.model), optimizer = 'sgd', optimizer_params = {'learning_rate': opt.lr, 'wd': opt.wd, 'momentum': opt.momentum, 'multi_precision': True}, initializer = mx.init.Xavier(magnitude=2)) mod.save_parameters('image-classifier-%s-%d-final.params'%(opt.model, opt.epochs)) else: if opt.mode == 'hybrid': net.hybridize() train(opt, context) if opt.builtin_profiler > 0: profiler.set_state('stop') print(profiler.dumps()) if __name__ == '__main__': if opt.profile: import hotshot, hotshot.stats prof = hotshot.Profile('image-classifier-%s-%s.prof'%(opt.model, opt.mode)) prof.runcall(main) prof.close() stats = hotshot.stats.load('image-classifier-%s-%s.prof'%(opt.model, opt.mode)) stats.strip_dirs() stats.sort_stats('cumtime', 'calls') stats.print_stats() else: main()
# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # <http://www.apache.org/licenses/LICENSE-2.0> # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # pylint: skip-file """ data iterator for mnist """ import os import random import tarfile import logging import tarfile logging.basicConfig(level=logging.INFO) import mxnet as mx from mxnet.test_utils import get_cifar10 from mxnet.gluon.data.vision import ImageFolderDataset from mxnet.gluon.data import DataLoader from mxnet.contrib.io import DataLoaderIter def get_cifar10_iterator(batch_size, data_shape, resize=-1, num_parts=1, part_index=0): get_cifar10() train = mx.io.ImageRecordIter( path_imgrec = "data/cifar/train.rec", # mean_img = "data/cifar/mean.bin", resize = resize, data_shape = data_shape, batch_size = batch_size, rand_crop = True, rand_mirror = True, num_parts=num_parts, part_index=part_index) val = mx.io.ImageRecordIter( path_imgrec = "data/cifar/test.rec", # mean_img = "data/cifar/mean.bin", resize = resize, rand_crop = False, rand_mirror = False, data_shape = data_shape, batch_size = batch_size, num_parts=num_parts, part_index=part_index) return train, val def get_imagenet_transforms(data_shape=224, dtype='float32'): def train_transform(image, label): image, _ = mx.image.random_size_crop(image, (data_shape, data_shape), 0.08, (3/4., 4/3.)) image = mx.nd.image.random_flip_left_right(image) image = mx.nd.image.to_tensor(image) image = mx.nd.image.normalize(image, mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)) return mx.nd.cast(image, dtype), label def val_transform(image, label): image = mx.image.resize_short(image, data_shape + 32) image, _ = mx.image.center_crop(image, (data_shape, data_shape)) image = mx.nd.image.to_tensor(image) image = mx.nd.image.normalize(image, mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)) return mx.nd.cast(image, dtype), label return train_transform, val_transform def get_imagenet_iterator(root, batch_size, num_workers, data_shape=224, dtype='float32'): """Dataset loader with preprocessing.""" train_dir = os.path.join(root, 'train') train_transform, val_transform = get_imagenet_transforms(data_shape, dtype) logging.info("Loading image folder %s, this may take a bit long...", train_dir) train_dataset = ImageFolderDataset(train_dir, transform=train_transform) train_data = DataLoader(train_dataset, batch_size, shuffle=True, last_batch='discard', num_workers=num_workers) val_dir = os.path.join(root, 'val') if not os.path.isdir(os.path.expanduser(os.path.join(root, 'val', 'n01440764'))): user_warning = 'Make sure validation images are stored in one subdir per category, a helper script is available at <https://git.io/vNQv1>' raise ValueError(user_warning) logging.info("Loading image folder %s, this may take a bit long...", val_dir) val_dataset = ImageFolderDataset(val_dir, transform=val_transform) val_data = DataLoader(val_dataset, batch_size, last_batch='keep', num_workers=num_workers) return DataLoaderIter(train_data, dtype), DataLoaderIter(val_data, dtype) def get_caltech101_data(): url = "<https://s3.us-east-2.amazonaws.com/mxnet-public/101_ObjectCategories.tar.gz>" dataset_name = "101_ObjectCategories" data_folder = "data" if not os.path.isdir(data_folder): os.makedirs(data_folder) tar_path = mx.gluon.utils.download(url, path=data_folder) if (not os.path.isdir(os.path.join(data_folder, "101_ObjectCategories")) or not os.path.isdir(os.path.join(data_folder, "101_ObjectCategories_test"))): tar = tarfile.open(tar_path, "r:gz") tar.extractall(data_folder) tar.close() print('Data extracted') training_path = os.path.join(data_folder, dataset_name) testing_path = os.path.join(data_folder, "{}_test".format(dataset_name)) return training_path, testing_path def get_caltech101_iterator(batch_size, num_workers, dtype): def transform(image, label): # resize the shorter edge to 224, the longer edge will be greater or equal to 224 resized = mx.image.resize_short(image, 224) # center and crop an area of size (224,224) cropped, crop_info = mx.image.center_crop(resized, (224, 224)) # transpose the channels to be (3,224,224) transposed = mx.nd.transpose(cropped, (2, 0, 1)) return transposed, label training_path, testing_path = get_caltech101_data() dataset_train = ImageFolderDataset(root=training_path, transform=transform) dataset_test = ImageFolderDataset(root=testing_path, transform=transform) train_data = DataLoader(dataset_train, batch_size, shuffle=True, num_workers=num_workers) test_data = DataLoader(dataset_test, batch_size, shuffle=False, num_workers=num_workers) return DataLoaderIter(train_data), DataLoaderIter(test_data) class DummyIter(mx.io.DataIter): def __init__(self, batch_size, data_shape, batches = 100): super(DummyIter, self).__init__(batch_size) self.data_shape = (batch_size,) + data_shape self.label_shape = (batch_size,) self.provide_data = [('data', self.data_shape)] self.provide_label = [('softmax_label', self.label_shape)] self.batch = mx.io.DataBatch(data=[mx.nd.zeros(self.data_shape)], label=[mx.nd.zeros(self.label_shape)]) self._batches = 0 self.batches = batches def next(self): if self._batches < self.batches: self._batches += 1 return self.batch else: self._batches = 0 raise StopIteration def dummy_iterator(batch_size, data_shape): return DummyIter(batch_size, data_shape), DummyIter(batch_size, data_shape) class ImagePairIter(mx.io.DataIter): def __init__(self, path, data_shape, label_shape, batch_size=64, flag=0, input_aug=None, target_aug=None): super(ImagePairIter, self).__init__(batch_size) self.data_shape = (batch_size,) + data_shape self.label_shape = (batch_size,) + label_shape self.input_aug = input_aug self.target_aug = target_aug self.provide_data = [('data', self.data_shape)] self.provide_label = [('label', self.label_shape)] is_image_file = lambda fn: any(fn.endswith(ext) for ext in [".png", ".jpg", ".jpeg"]) self.filenames = [os.path.join(path, x) for x in os.listdir(path) if is_image_file(x)] self.count = 0 self.flag = flag random.shuffle(self.filenames) def next(self): from PIL import Image if self.count + self.batch_size <= len(self.filenames): data = [] label = [] for i in range(self.batch_size): fn = self.filenames[self.count] self.count += 1 image = Image.open(fn).convert('YCbCr').split()[0] if image.size[0] > image.size[1]: image = image.transpose(Image.TRANSPOSE) image = mx.nd.expand_dims(mx.nd.array(image), axis=2) target = image.copy() for aug in self.input_aug: image = aug(image) for aug in self.target_aug: target = aug(target) data.append(image) label.append(target) data = mx.nd.concat(*[mx.nd.expand_dims(d, axis=0) for d in data], dim=0) label = mx.nd.concat(*[mx.nd.expand_dims(d, axis=0) for d in label], dim=0) data = [mx.nd.transpose(data, axes=(0, 3, 1, 2)).astype('float32')/255] label = [mx.nd.transpose(label, axes=(0, 3, 1, 2)).astype('float32')/255] return mx.io.DataBatch(data=data, label=label) else: raise StopIteration def reset(self): self.count = 0 random.shuffle(self.filenames)
모델 컨테이너 이미지 만들기
모델 학습용 컨테이너 이미지를 만들기 위해서 Dockerfile을 생성하겠습니다.
다음은 MXNet 1.5를 기반 이미지로 해서, 모델 파일을 추가하는 Dockerfile 입니다.
Dockerfile
FROM mxnet/python:1.5.0_cpu_py3 RUN mkdir -p /app COPY image_classification.py /app/ COPY data.py /app/ WORKDIR /app CMD ["python3", "image_classification.py", "--dataset", "cifar10", "--model", "vgg11", "--epochs", "1", "--kvstore", "dist_sync"]
다음 명령어를 실행하면 kangwoo/mxnet:cpu 라는 이름의 컨테이너 이미지를 빌드 할 수 있습니다.
docker build -t kangwoo/mxnet:cpu .
빌드한 컨테이너 이미지를 컨테이너 레지스트리로 업로드 하겠습니다.
docker push kangwoo/mxnet:cpu
MXJob 생성하기
mxnet-operator를 사용해서 MXNet 모델 학습을 하라면 MXJob을 정의해야합니다.
- jobMode를 MXTrain로 설정합니다. jobMode: MXTrain
- mxReplicaSpecs를 설정합니다. “mxReplicaSpecs”은 MXNet의 분산 학습시 사용하는 프로세스들을 정의하는데 사용합니다. Scheduler와 Server를 1개로 설정하고, Worker도 1개로 설정합니다. Kubeflow 클러스터에 istio가 설치되어 있기 때문에, 자동으로 istio-proxy가 포드에 주입됩니다. 이것을 방지하기 위해서 어노테이션에 sidecar.istio.io/inject: “false” 을 추가해 주었습니다. mxReplicaSpecs: Scheduler: replicas: 1 restartPolicy: Never template: metadata: annotations: sidecar.istio.io/inject: “false” spec: containers: – name: mxnet image: kangwoo/mxnet:cpu Server: replicas: 1 restartPolicy: Never template: metadata: annotations: sidecar.istio.io/inject: “false” spec: containers: – name: mxnet image: kangwoo/mxnet:cpu Worker: replicas: 1 restartPolicy: Never template: metadata: annotations: sidecar.istio.io/inject: “false” spec: containers: – name: mxnet image: kangwoo/mxnet:cpu
다음은 MXJob 을 생성한 위한 메니페스트입니다.
mxnet-job.yaml
apiVersion: "kubeflow.org/v1beta1" kind: "MXJob" metadata: name: "mxnet-job" spec: jobMode: MXTrain mxReplicaSpecs: Scheduler: replicas: 1 restartPolicy: Never template: metadata: annotations: sidecar.istio.io/inject: "false" spec: containers: - name: mxnet image: kangwoo/mxnet:cpu Server: replicas: 1 restartPolicy: Never template: metadata: annotations: sidecar.istio.io/inject: "false" spec: containers: - name: mxnet image: kangwoo/mxnet:cpu Worker: replicas: 1 restartPolicy: Never template: metadata: annotations: sidecar.istio.io/inject: "false" spec: containers: - name: mxnet image: kangwoo/mxnet:cpu
다음 명령어를 실행하면 admin 네임스페이스에 mxnet-job 이라는 이름의 MXJob을 생성할 수 있습니다.
kubectl -n admin apply -f mxnet-job.yaml
MXNet 학습 작업 확인하기
생성한 MXJob은 다음 명령어를 실행해서 확인 해 볼 수 있습니다.
kubectl -n admin get mxjob
생성된 MXJob이 있다면, 다음과 같은 응답 결과를 얻을 수 있습니다.
NAME AGE mxnet-job 5s
MXJob이 생성되면, mxnet-operator 에 의해서 포드들이 생성됩니다. MXjob 매니페스트에 정의한 갯수대로 scheduler, server, worker 포드가 생성되게 됩니다.
생성된 포드들은 다음 명령어를 실행해서 확인 해 볼 수 있습니다.
kubectl -n admin get pod -l mxnet_job_name=mxnet-job
생성된 포드들이 남아 있다면, 다음과 같은 응답 결과를 얻을 수 있습니다.
NAME READY STATUS RESTARTS AGE mxnet-job-scheduler-0 1/1 Running 0 9s mxnet-job-server-0 1/1 Running 0 9s mxnet-job-worker-0 1/1 Running 0 9
MXJob은 작업이 끝난 후, 관련 포드들을 삭제해버립니다. 그래서 작업이 완료되면 포드가 조회되지 않을 수 있습니다. 작업이 완료되어도 포드들을 남겨 두고 싶다면, MXJob 매니페스트의 spec 부분에 “cleanPodPolicy: None” 를 추가하시면 됩니다.
MXJob spec의 CleanPodPolicy는 작업이 종료 될 때 포드 삭제를 제어할 때 사용합니다. 다음 값들 중 하나를 사용할 수 있습니다.
- Running : 작업이 완료되었을 때, 실행이 끝난(Completed) 포드들은 삭제하지 않고, 실행중인(Running) 포드들만 삭제합니다.
- All : 작업이 완료되었을 때, 실행이 끝난 포드들을 즉시 삭제합니다.
- None : 작업이 완료되어도 포드들을 삭제하지 않습니다.
다음은 cleanPodPolicy를 추가한 메니페스트 예제입니다.
apiVersion: "kubeflow.org/v1beta1" kind: "MXJob" metadata: name: "mxnet-job" spec: cleanPodPolicy: None ...
MXJob의 작업 상태를 알고 싶으면 describe 명령어를 사용할 수 있습니다.
다음 명령어를 실행하면 admin 네임스페이스에 mxnet-job 이라는 이름의 MXJob의 상태를 조회할 수 있습니다.
kubectl -n admin describe mxjob mxnet-job
다음은 예제 작업에 대한 샘플 출력입니다.
Name: mxnet-job Namespace: admin Labels: <none> Annotations: ... API Version: kubeflow.org/v1beta1 Kind: MXJob Metadata: ... Spec: ... Status: Completion Time: 2020-03-08T00:44:23Z Conditions: Last Transition Time: 2020-03-07T23:45:12Z Last Update Time: 2020-03-07T23:45:12Z Message: MXJob mxnet-job is created. Reason: MXJobCreated Status: True Type: Created Last Transition Time: 2020-03-07T23:45:12Z Last Update Time: 2020-03-07T23:45:14Z Message: MXJob mxnet-job is running. Reason: MXJobRunning Status: False Type: Running Last Transition Time: 2020-03-07T23:45:12Z Last Update Time: 2020-03-08T00:44:23Z Message: MXJob mxnet-job is successfully completed. Reason: MXJobSucceeded Status: True Type: Succeeded Mx Replica Statuses: Scheduler: Server: Worker: Start Time: 2020-03-07T23:45:14Z Events: <none>
MXNet 학습 작업 삭제하기
작업이 완료되어도 MXJob은 삭제되지 않습니다.
다음 명령어를 실행하면 admin 네임스페이스에 mxnet-job 이라는 이름의 MXJob을 삭제할 수 있습니다.
kubectl -n admin delete mxjob mxnet-job