본문 바로가기
IT/python

[python - kubernetes] pytorchjob을 활용한 분산 데이터 병렬 처리 기초

by 통섭이 2023. 2. 3.

서두

  • 이 글은 pytorch DDP(DistributedDataParallel) 로 기본 병렬처리를 짜고, 이를 k8s 위에서 pytorchjob 으로 돌리는 예시로 변환하는 방식을 설명한다. 또 아주 기초적으로 수박 겉핥기로 설명한다. 필자도 아주 깊게는 이제 공부하는 중이다...;;
  • pytorch 의 병렬처리에 대한 깊은 이해가 필요하다면 공식문서를 참조하시길 바란다.
  • pytorch 코드를 pytorchjob으로 돌리기 위해서 필요한 작업은 간단하다
    • 기존에 직접 정해주던 rank, world_size 등을 init_process_group 에서 삭제
    • model 을 몇번째 gpu 에 할당할지 정하는 코드를 삭제
  • 위 두가지만 해주면 끝이다... pytorchjob 에서 master - worker 를 띄울때 이를 자동으로 할당하기 때문에 신경 쓸 필요가 없어진다

Pytorch 분산 처리 코드 짜기

  • (기본만 할때는...) 분산 처리 코드 짜는 것을 어렵지 않게 생각하면 좋겠다. pytorch를 사용하여 분산 처리 코드를 짤때는 world_size, rank 만 알면 반은 성공이라 생각한다. 하나의 서버가 아닌 경우는 global_rank, local_rank 로 나뉜다. 쉽게 설명하자면
    • world_size: 사용할 전체 gpu 개수
    • rank: 사용할 gpu 중 몇 번째 인지
      • 그리고 global 과 local 차이는 만약 2개 노드에 각각 2개씩 총 4개 gpu 가 붙어있으면 local_rank 는 [0, 1], [0, 1] 이 되지만, global_rank 는 [0, 1, 2, 3] 이 된다. (process id지만.. 일단 쉽게 생각하고 익숙해지면 어렵게 들어가시길 추천한다)
  • 새로운 개념은 끝이다! 이제는 model 을 각 gpu 로 분배해주고, data를 나눠서 학습하고, 모델 sync를 맞추는 작업을 할 수 있도록 세팅하면 된다.
    • mp.spawn : 학습을 여러 프로세스에서 돌릴 수 있도록 torch.multiprocessing 을 통해 여러개의 프로세스를 생성하자. 그리고 학습을 시작하자.
    • init_process_group 가 가장 중요한 세팅이다!
      • backend: gloo, nccl, mpi 중 선택가능. (gpu 학습을 할거니 nccl 을 쓴다고 생각하자. gloo 는 cpu 분산학습, mpi 는 고성능 컴퓨팅에 사용된다고 한다)
      • rank: 분산 처리는 다른 프로세서로 돌아가기 때문에 지금 어떤 gpu 인지 (이말인즉 이 부분이 돌아가는 시점은 mp.spawn을 통해 여러 프로세스가 뜬 시점이다.)
      • world_size: 학습 할 전체 프로세스가 몇개인지
    • DistributedSampler : 데이터를 여러 gpu 에 나누는 역할
      • 기존의 DataLoader 에서 sampler 인자에 DistributedSampler 로 감싼 데이터셋을 넘겨주면 pytorch 가 알아서 데이터를 분배한다. (참 똑똑하다.. 쉽다..)
  • 이렇게 하면 끝이다!! 정말 어려운 개념이라 깊게 들어가면 한도끝도 없지만 코드를 돌리기 위해서는 이정도면 충분하다. 이제 아래 코드의 주석을 통해 부분 부분을 설명해보고자 한다. (실제로 돌려보고, 각 gpu 마다 데이터가 n 등분 되어 학습이 진행되는지 확인해보자!)
  • 아래의 예제는 당연히 n 개의 gpu 가 붙어있는 서버에서 코드를 돌린다고 가정한다. (없으면 cpu로 돌리고 이런 기본적인 코드도 안넣었다...;;)
import os  
import torch  
import torch.multiprocessing as mp  
from torch import nn  
from torch.distributed import init_process_group, destroy_process_group  
from torch.nn.parallel import DistributedDataParallel as DDP  
from torch.utils.data import DataLoader, Dataset  
from torch.utils.data.distributed import DistributedSampler  
from torchvision import datasets  
from torchvision.transforms import ToTensor  

MAX_EPOCHS = 10  
SAVE_EVERY = 1  


def ddp_setup(rank, world_size):  
    os.environ["MASTER_ADDR"] = "localhost"  # 1 node, N gpu 인 경우에는 localhost를 사용해도 됨  
    os.environ["MASTER_PORT"] = "12345"  # 임의 숫자 할당해도 됨  
    init_process_group(backend="nccl", rank=rank, world_size=world_size)  # backend 는 Gloo, NCCL, MPI 선택 가능  


class Trainer:  
    def __init__(  
        self,  
        model: torch.nn.Module,  
        train_data: DataLoader,  
        optimizer: torch.optim.Optimizer,  
        gpu_id: int,  
    ) -> None:  
        self.gpu_id = gpu_id  
        self.train_data = train_data  # 이 train_data는 한 gpu 에서 할당해야 하는 데이터 수. 즉 total_dataset / n_gpus 한 숫자  
        self.optimizer = optimizer  
        self.model = DDP(model.to(gpu_id), device_ids=[self.gpu_id])  # master - worker 에 모델 복사  

    def _run_batch(self, source, targets):  
        self.optimizer.zero_grad()  
        output = self.model(source)  
        loss = torch.nn.CrossEntropyLoss()(output, targets)  
        loss.backward()  
        self.optimizer.step()  

    def _run_epoch(self):  
        b_sz = len(next(iter(self.train_data))[0])  
        print(f"b_sz : {b_sz}, gpu_id : {self.gpu_id}, len(train_data) : {len(self.train_data)}")  

        for source, targets in self.train_data:  
            source = source.to(self.gpu_id)  
            targets = targets.to(self.gpu_id)  
            self._run_batch(source, targets)  

    def _save_checkpoint(self, epoch):  
        ckp = self.model.module.state_dict()  
        torch.save(ckp, "ckpt.pt")  
        print(f"Epoch {epoch} | Training ckpt saved at ckpt.pt")  

    def train(self):  
        for epoch in range(MAX_EPOCHS):  
            self._run_epoch()  
            if self.gpu_id == 0 and epoch % SAVE_EVERY == 0:  
                self._save_checkpoint(epoch)  


class NeuralNetwork(nn.Module):  
    def __init__(self):  
        super().__init__()  
        self.flatten = nn.Flatten()  
        self.linear_relu_stack = nn.Sequential(  
            nn.Linear(28 * 28, 512), nn.ReLU(), nn.Linear(512, 512), nn.ReLU(), nn.Linear(512, 10)  
        )  

    def forward(self, x):  
        x = self.flatten(x)  
        logits = self.linear_relu_stack(x)  
        return logits  


def load_dataset_model_and_optimizer():  
    train_set = datasets.FashionMNIST(  
        root="data",  
        train=True,  
        download=True,  
        transform=ToTensor(),  
    )  

    model = NeuralNetwork()  
    optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)  
    return train_set, model, optimizer  


def prepare_dataloader(dataset: Dataset, batch_size: int):  
    return DataLoader(  
        dataset, batch_size=batch_size, pin_memory=True, shuffle=False, sampler=DistributedSampler(dataset)  
    )  


def main(rank: int, world_size: int):  # rank 는 자동으로 mp.spawn에서 할딩  
    ddp_setup(rank, world_size)  # 분산 처리를 하기 위해 필요한 master-worker 간의 네트워크 설정  
    dataset, model, optimizer = load_dataset_model_and_optimizer()  
    print(f" len(dataset) : {len(dataset)}")  # 데이터가 나눠지기 전 전체 데이터 수를 확인하기 위한 프린팅  
    train_data = prepare_dataloader(  
        dataset, batch_size=32  
    )  # 분산 처리를 위해 데이터 나누기. DistributedSampler 사용. batch_size는 한 gpu가 처리하는 숫자. (이 숫자를 / n_gpu 해서 최종 batch_size 되는 것이 아님)  
    trainer = Trainer(model, train_data, optimizer, rank)  # DDP를 사용해 모델을 master, worker 에 복사  
    trainer.train()  # 실제 학습 시작  
    destroy_process_group()  


if __name__ == "__main__":  
    world_size = torch.cuda.device_count()  # main 파일이 도는 환경의 gpu 수 확인  
    mp.spawn(  
        main,  
        args=(world_size,),  
        nprocs=world_size,  
    )

Pytorchjob 분산 처리 코드로 변환

  • 위의 코드가 이해가 되었다면 이제는 이를 k8s 클러스터 위에서 돌릴 수 있도록 변환해보자
  • 변환은 크게 init_process_group 부분 삭제, gpu_id 할당부분 삭제만 하면 된다.
  • 그러면 위의 두가지를 실행한 코드를 만들고 이를 도커라이징 해보자.
# main.py
import os  
import torch  
import torch.multiprocessing as mp  
from torch import nn  
from torch.distributed import init_process_group, destroy_process_group  
from torch.nn.parallel import DistributedDataParallel as DDP  
from torch.utils.data import DataLoader, Dataset  
from torch.utils.data.distributed import DistributedSampler  
from torchvision import datasets  
from torchvision.transforms import ToTensor  

MAX_EPOCHS = 10  
SAVE_EVERY = 1  
device = torch.device("cuda")  


class Trainer:  
    def __init__(  
        self,  
        model: torch.nn.Module,  
        train_data: DataLoader,  
        optimizer: torch.optim.Optimizer,  
    ) -> None:  
        self.model = DDP(model.to(device))  
        self.train_data = train_data  
        self.optimizer = optimizer  

    def _run_epoch(self, epoch):  
        b_sz = len(next(iter(self.train_data))[0])  
        print(f"b_sz : {b_sz} / epoch : {epoch} / len data : {len(self.train_data)}")  
        for source, targets in self.train_data:  
            source = source.to(device)  
            targets = targets.to(device)  
            self.optimizer.zero_grad()  
            output = self.model(source)  
            loss = torch.nn.CrossEntropyLoss()(output, targets)  
            loss.backward()  
            self.optimizer.step()  

    def _save_checkpoint(self, epoch):  
        ckp = self.model.module.state_dict()  
        torch.save(ckp, "ckpt.pt")  
        print(f"Epoch {epoch} | Training ckpt saved at ckpt.pt")  

    def train(self):  
        for epoch in range(MAX_EPOCHS):  
            self._run_epoch(epoch)  
            if epoch % SAVE_EVERY == 0:  
                self._save_checkpoint(epoch)  


class NeuralNetwork(nn.Module):  
    def __init__(self):  
        super().__init__()  
        self.flatten = nn.Flatten()  
        self.linear_relu_stack = nn.Sequential(  
            nn.Linear(28 * 28, 512), nn.ReLU(), nn.Linear(512, 512), nn.ReLU(), nn.Linear(512, 10)  
        )  

    def forward(self, x):  
        x = self.flatten(x)  
        logits = self.linear_relu_stack(x)  
        return logits  


def load_train_dataset_model_and_opt():  
    train_set = datasets.FashionMNIST(  
        root="data",  
        train=True,  
        download=True,  
        transform=ToTensor(),  
    )  

    model = NeuralNetwork()  
    optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)  
    return train_set, model, optimizer  


def prepare_dataloader(dataset: Dataset, batch_size: int):  
    return DataLoader(  
        dataset, batch_size=batch_size, pin_memory=True, shuffle=False, sampler=DistributedSampler(dataset)  
    )  


def main(rank: int): # rank 는 mp.spawn에서 자동으로 할당하기 때문에 사용하지 않아도 받아야 한다. 그렇지 않으면 에러가 난다.  
    init_process_group(backend="nccl") # 이부분이 가장 큰 변화이다! 기존의 rank 와 world_size를 직접 넘겨주던 부분이 더이상 불필요해졌다!  
    dataset, model, optimizer = load_train_dataset_model_and_opt()  
    print(f"WORLD_SIZE : {os.environ['WORLD_SIZE']} / len(dataset) : {len(dataset)}") # pytorchjob 이 분산처리에 필요한 인자들을 os.environ 에 자동으로 할당한다  
    train_data = prepare_dataloader(dataset, batch_size=32)  
    trainer = Trainer(model, train_data, optimizer) # 기존과 달리 모델을 특정 gpu_id에 올려줄 필요가 없다.  
    trainer.train()  
    destroy_process_group()  


if __name__ == "__main__":  
    torch.cuda.empty_cache()  
    mp.spawn(  
        main,
    )
  • 이제는 이 파일을 도커라이징 해보자
#Dockerfile
FROM pytorch/pytorch:1.2-cuda10.0-cudnn7-devel  

RUN apt-get update -y  
RUN apt-get install -y python-pip python-dev build-essential  

RUN mkdir /app  
COPY . /app  
WORKDIR /app  

ENV PYTHONPATH=/app  
CMD ["python", "main.py"]
  • 도커라이징이 되었다면 본인의 registry 에 올리고, 이를 실행할 수 있는 pytorchjob yaml 을 만들어보자
apiVersion: kubeflow.org/v1  
kind: PyTorchJob  
metadata:  
  name: dist-fashion-mnist  
  namespace: my-ns  
  annotations:  
    sidecar.istio.io/inject: "false"  
spec:  
  runPolicy:  
    cleanPodPolicy: None  
  pytorchReplicaSpecs:  
    Master:  
      replicas: 1  
      restartPolicy: OnFailure  
      template:  
        spec:  
          containers:  
            - name: pytorch # 학습 container의 이름은 pytorch이어야 한다
              image: my-registry/mnist-dist-test  
              command: ["bash", "-c"]  
              args:   
                ["python main.py"]  
              resources:  
                limits:  
                  memory: "32Gi"  
                  cpu: 4  
                  nvidia.com/gpu: 1  
                requests:  
                  memory: "32Gi"  
                  cpu: 4  
                  nvidia.com/gpu: 1  
    Worker:  
      replicas: 3
      restartPolicy: OnFailure  
      template:  
        spec:  
          containers:  
            - name: pytorch  # 학습 container의 이름은 pytorch이어야 한다
              image: rmy-registry/mnist-dist-test  
              command: ["bash", "-c"]  
              args:  
                ["python main.py"]  
              resources:  
                limits:  
                  memory: "32Gi"  
                  cpu: 4  
                  nvidia.com/gpu: 1  
                requests:  
                  memory: "32Gi"  
                  cpu: 4  
                  nvidia.com/gpu: 1
  • 이제 이 yaml 을 kubectl 명령어를 통해 실행하면 kubectl 이 바라보고 있는 클러스터에 총 4개 (master 1, worker 3) 의 학습이 실행된다.
  • 그리고 FashionMNIST 데이터의 경우 60000개 인데, 각각 12500개씩 나뉘어져서 데이터가 학습되게 된다.
  • 이때 worker 의 replicas를 늘리면 데이터가 자동으로 master + worker 개수만큼 등분되어 학습이 진행된다.
  • 아래 그림은 os.environ 을 프린트한 일부인데 기존에 설정하던 MASTER_ADDR, MASTER_ADDR, MASTER_PORT, RANK 등이 자동으로 생성되고, 서로 통신하게 됨을 알 수 있다. 쿠버네티스에서는 MASTER_ADDR 를 보면 master-0, worker-0, worker-1 등을 통해 서로 통신 할 수 있음을 알 수 있다.
  • 이상으로 아주아주아주아주 기초적인 분산 병렬 처리에 대해 알아보았다. 코드 리팩토링도 안하고, 그냥 막 정리한 글이라 두서가 없는데... 양해를 부탁드리면서 글을 마무리한다.

댓글