서두
- 이 글은 pytorch DDP(DistributedDataParallel) 로 기본 병렬처리를 짜고, 이를 k8s 위에서 pytorchjob 으로 돌리는 예시로 변환하는 방식을 설명한다. 또 아주 기초적으로 수박 겉핥기로 설명한다. 필자도 아주 깊게는 이제 공부하는 중이다...;;
- pytorch 의 병렬처리에 대한 깊은 이해가 필요하다면 공식문서를 참조하시길 바란다.
- pytorch 코드를 pytorchjob으로 돌리기 위해서 필요한 작업은 간단하다
- 기존에 직접 정해주던 rank, world_size 등을 init_process_group 에서 삭제
- model 을 몇번째 gpu 에 할당할지 정하는 코드를 삭제
- 위 두가지만 해주면 끝이다... pytorchjob 에서 master - worker 를 띄울때 이를 자동으로 할당하기 때문에 신경 쓸 필요가 없어진다
Pytorch 분산 처리 코드 짜기
- (기본만 할때는...) 분산 처리 코드 짜는 것을 어렵지 않게 생각하면 좋겠다. pytorch를 사용하여 분산 처리 코드를 짤때는 world_size, rank 만 알면 반은 성공이라 생각한다. 하나의 서버가 아닌 경우는 global_rank, local_rank 로 나뉜다. 쉽게 설명하자면
- world_size: 사용할 전체 gpu 개수
- rank: 사용할 gpu 중 몇 번째 인지
- 그리고 global 과 local 차이는 만약 2개 노드에 각각 2개씩 총 4개 gpu 가 붙어있으면 local_rank 는 [0, 1], [0, 1] 이 되지만, global_rank 는 [0, 1, 2, 3] 이 된다. (process id지만.. 일단 쉽게 생각하고 익숙해지면 어렵게 들어가시길 추천한다)
- 새로운 개념은 끝이다! 이제는 model 을 각 gpu 로 분배해주고, data를 나눠서 학습하고, 모델 sync를 맞추는 작업을 할 수 있도록 세팅하면 된다.
- mp.spawn : 학습을 여러 프로세스에서 돌릴 수 있도록 torch.multiprocessing 을 통해 여러개의 프로세스를 생성하자. 그리고 학습을 시작하자.
- init_process_group 가 가장 중요한 세팅이다!
- backend: gloo, nccl, mpi 중 선택가능. (gpu 학습을 할거니 nccl 을 쓴다고 생각하자. gloo 는 cpu 분산학습, mpi 는 고성능 컴퓨팅에 사용된다고 한다)
- rank: 분산 처리는 다른 프로세서로 돌아가기 때문에 지금 어떤 gpu 인지 (이말인즉 이 부분이 돌아가는 시점은 mp.spawn을 통해 여러 프로세스가 뜬 시점이다.)
- world_size: 학습 할 전체 프로세스가 몇개인지
- DistributedSampler : 데이터를 여러 gpu 에 나누는 역할
- 기존의 DataLoader 에서 sampler 인자에 DistributedSampler 로 감싼 데이터셋을 넘겨주면 pytorch 가 알아서 데이터를 분배한다. (참 똑똑하다.. 쉽다..)
- 이렇게 하면 끝이다!! 정말 어려운 개념이라 깊게 들어가면 한도끝도 없지만 코드를 돌리기 위해서는 이정도면 충분하다. 이제 아래 코드의 주석을 통해 부분 부분을 설명해보고자 한다. (실제로 돌려보고, 각 gpu 마다 데이터가 n 등분 되어 학습이 진행되는지 확인해보자!)
- 아래의 예제는 당연히 n 개의 gpu 가 붙어있는 서버에서 코드를 돌린다고 가정한다. (없으면 cpu로 돌리고 이런 기본적인 코드도 안넣었다...;;)
import os
import torch
import torch.multiprocessing as mp
from torch import nn
from torch.distributed import init_process_group, destroy_process_group
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, Dataset
from torch.utils.data.distributed import DistributedSampler
from torchvision import datasets
from torchvision.transforms import ToTensor
MAX_EPOCHS = 10
SAVE_EVERY = 1
def ddp_setup(rank, world_size):
os.environ["MASTER_ADDR"] = "localhost" # 1 node, N gpu 인 경우에는 localhost를 사용해도 됨
os.environ["MASTER_PORT"] = "12345" # 임의 숫자 할당해도 됨
init_process_group(backend="nccl", rank=rank, world_size=world_size) # backend 는 Gloo, NCCL, MPI 선택 가능
class Trainer:
def __init__(
self,
model: torch.nn.Module,
train_data: DataLoader,
optimizer: torch.optim.Optimizer,
gpu_id: int,
) -> None:
self.gpu_id = gpu_id
self.train_data = train_data # 이 train_data는 한 gpu 에서 할당해야 하는 데이터 수. 즉 total_dataset / n_gpus 한 숫자
self.optimizer = optimizer
self.model = DDP(model.to(gpu_id), device_ids=[self.gpu_id]) # master - worker 에 모델 복사
def _run_batch(self, source, targets):
self.optimizer.zero_grad()
output = self.model(source)
loss = torch.nn.CrossEntropyLoss()(output, targets)
loss.backward()
self.optimizer.step()
def _run_epoch(self):
b_sz = len(next(iter(self.train_data))[0])
print(f"b_sz : {b_sz}, gpu_id : {self.gpu_id}, len(train_data) : {len(self.train_data)}")
for source, targets in self.train_data:
source = source.to(self.gpu_id)
targets = targets.to(self.gpu_id)
self._run_batch(source, targets)
def _save_checkpoint(self, epoch):
ckp = self.model.module.state_dict()
torch.save(ckp, "ckpt.pt")
print(f"Epoch {epoch} | Training ckpt saved at ckpt.pt")
def train(self):
for epoch in range(MAX_EPOCHS):
self._run_epoch()
if self.gpu_id == 0 and epoch % SAVE_EVERY == 0:
self._save_checkpoint(epoch)
class NeuralNetwork(nn.Module):
def __init__(self):
super().__init__()
self.flatten = nn.Flatten()
self.linear_relu_stack = nn.Sequential(
nn.Linear(28 * 28, 512), nn.ReLU(), nn.Linear(512, 512), nn.ReLU(), nn.Linear(512, 10)
)
def forward(self, x):
x = self.flatten(x)
logits = self.linear_relu_stack(x)
return logits
def load_dataset_model_and_optimizer():
train_set = datasets.FashionMNIST(
root="data",
train=True,
download=True,
transform=ToTensor(),
)
model = NeuralNetwork()
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
return train_set, model, optimizer
def prepare_dataloader(dataset: Dataset, batch_size: int):
return DataLoader(
dataset, batch_size=batch_size, pin_memory=True, shuffle=False, sampler=DistributedSampler(dataset)
)
def main(rank: int, world_size: int): # rank 는 자동으로 mp.spawn에서 할딩
ddp_setup(rank, world_size) # 분산 처리를 하기 위해 필요한 master-worker 간의 네트워크 설정
dataset, model, optimizer = load_dataset_model_and_optimizer()
print(f" len(dataset) : {len(dataset)}") # 데이터가 나눠지기 전 전체 데이터 수를 확인하기 위한 프린팅
train_data = prepare_dataloader(
dataset, batch_size=32
) # 분산 처리를 위해 데이터 나누기. DistributedSampler 사용. batch_size는 한 gpu가 처리하는 숫자. (이 숫자를 / n_gpu 해서 최종 batch_size 되는 것이 아님)
trainer = Trainer(model, train_data, optimizer, rank) # DDP를 사용해 모델을 master, worker 에 복사
trainer.train() # 실제 학습 시작
destroy_process_group()
if __name__ == "__main__":
world_size = torch.cuda.device_count() # main 파일이 도는 환경의 gpu 수 확인
mp.spawn(
main,
args=(world_size,),
nprocs=world_size,
)
Pytorchjob 분산 처리 코드로 변환
- 위의 코드가 이해가 되었다면 이제는 이를 k8s 클러스터 위에서 돌릴 수 있도록 변환해보자
- 변환은 크게 init_process_group 부분 삭제, gpu_id 할당부분 삭제만 하면 된다.
- 그러면 위의 두가지를 실행한 코드를 만들고 이를 도커라이징 해보자.
# main.py
import os
import torch
import torch.multiprocessing as mp
from torch import nn
from torch.distributed import init_process_group, destroy_process_group
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, Dataset
from torch.utils.data.distributed import DistributedSampler
from torchvision import datasets
from torchvision.transforms import ToTensor
MAX_EPOCHS = 10
SAVE_EVERY = 1
device = torch.device("cuda")
class Trainer:
def __init__(
self,
model: torch.nn.Module,
train_data: DataLoader,
optimizer: torch.optim.Optimizer,
) -> None:
self.model = DDP(model.to(device))
self.train_data = train_data
self.optimizer = optimizer
def _run_epoch(self, epoch):
b_sz = len(next(iter(self.train_data))[0])
print(f"b_sz : {b_sz} / epoch : {epoch} / len data : {len(self.train_data)}")
for source, targets in self.train_data:
source = source.to(device)
targets = targets.to(device)
self.optimizer.zero_grad()
output = self.model(source)
loss = torch.nn.CrossEntropyLoss()(output, targets)
loss.backward()
self.optimizer.step()
def _save_checkpoint(self, epoch):
ckp = self.model.module.state_dict()
torch.save(ckp, "ckpt.pt")
print(f"Epoch {epoch} | Training ckpt saved at ckpt.pt")
def train(self):
for epoch in range(MAX_EPOCHS):
self._run_epoch(epoch)
if epoch % SAVE_EVERY == 0:
self._save_checkpoint(epoch)
class NeuralNetwork(nn.Module):
def __init__(self):
super().__init__()
self.flatten = nn.Flatten()
self.linear_relu_stack = nn.Sequential(
nn.Linear(28 * 28, 512), nn.ReLU(), nn.Linear(512, 512), nn.ReLU(), nn.Linear(512, 10)
)
def forward(self, x):
x = self.flatten(x)
logits = self.linear_relu_stack(x)
return logits
def load_train_dataset_model_and_opt():
train_set = datasets.FashionMNIST(
root="data",
train=True,
download=True,
transform=ToTensor(),
)
model = NeuralNetwork()
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
return train_set, model, optimizer
def prepare_dataloader(dataset: Dataset, batch_size: int):
return DataLoader(
dataset, batch_size=batch_size, pin_memory=True, shuffle=False, sampler=DistributedSampler(dataset)
)
def main(rank: int): # rank 는 mp.spawn에서 자동으로 할당하기 때문에 사용하지 않아도 받아야 한다. 그렇지 않으면 에러가 난다.
init_process_group(backend="nccl") # 이부분이 가장 큰 변화이다! 기존의 rank 와 world_size를 직접 넘겨주던 부분이 더이상 불필요해졌다!
dataset, model, optimizer = load_train_dataset_model_and_opt()
print(f"WORLD_SIZE : {os.environ['WORLD_SIZE']} / len(dataset) : {len(dataset)}") # pytorchjob 이 분산처리에 필요한 인자들을 os.environ 에 자동으로 할당한다
train_data = prepare_dataloader(dataset, batch_size=32)
trainer = Trainer(model, train_data, optimizer) # 기존과 달리 모델을 특정 gpu_id에 올려줄 필요가 없다.
trainer.train()
destroy_process_group()
if __name__ == "__main__":
torch.cuda.empty_cache()
mp.spawn(
main,
)
#Dockerfile
FROM pytorch/pytorch:1.2-cuda10.0-cudnn7-devel
RUN apt-get update -y
RUN apt-get install -y python-pip python-dev build-essential
RUN mkdir /app
COPY . /app
WORKDIR /app
ENV PYTHONPATH=/app
CMD ["python", "main.py"]
- 도커라이징이 되었다면 본인의 registry 에 올리고, 이를 실행할 수 있는 pytorchjob yaml 을 만들어보자
apiVersion: kubeflow.org/v1
kind: PyTorchJob
metadata:
name: dist-fashion-mnist
namespace: my-ns
annotations:
sidecar.istio.io/inject: "false"
spec:
runPolicy:
cleanPodPolicy: None
pytorchReplicaSpecs:
Master:
replicas: 1
restartPolicy: OnFailure
template:
spec:
containers:
- name: pytorch # 학습 container의 이름은 pytorch이어야 한다
image: my-registry/mnist-dist-test
command: ["bash", "-c"]
args:
["python main.py"]
resources:
limits:
memory: "32Gi"
cpu: 4
nvidia.com/gpu: 1
requests:
memory: "32Gi"
cpu: 4
nvidia.com/gpu: 1
Worker:
replicas: 3
restartPolicy: OnFailure
template:
spec:
containers:
- name: pytorch # 학습 container의 이름은 pytorch이어야 한다
image: rmy-registry/mnist-dist-test
command: ["bash", "-c"]
args:
["python main.py"]
resources:
limits:
memory: "32Gi"
cpu: 4
nvidia.com/gpu: 1
requests:
memory: "32Gi"
cpu: 4
nvidia.com/gpu: 1
- 이제 이 yaml 을 kubectl 명령어를 통해 실행하면 kubectl 이 바라보고 있는 클러스터에 총 4개 (master 1, worker 3) 의 학습이 실행된다.
- 그리고 FashionMNIST 데이터의 경우 60000개 인데, 각각 12500개씩 나뉘어져서 데이터가 학습되게 된다.
- 이때 worker 의 replicas를 늘리면 데이터가 자동으로 master + worker 개수만큼 등분되어 학습이 진행된다.
- 아래 그림은 os.environ 을 프린트한 일부인데 기존에 설정하던
MASTER_ADDR
, MASTER_ADDR
, MASTER_PORT
, RANK
등이 자동으로 생성되고, 서로 통신하게 됨을 알 수 있다. 쿠버네티스에서는 MASTER_ADDR
를 보면 master-0
, worker-0,
worker-1
등을 통해 서로 통신 할 수 있음을 알 수 있다.
- 이상으로 아주아주아주아주 기초적인 분산 병렬 처리에 대해 알아보았다. 코드 리팩토링도 안하고, 그냥 막 정리한 글이라 두서가 없는데... 양해를 부탁드리면서 글을 마무리한다.
댓글