대부분의 상황에서 분류기를 학습할 때, 객체 클래스에 비해 매우 많은 배경 클래스가 존재하는 클래스 불균형(Class Imbalance) 문제를 만나게 된다. 큰 차이의 클래스 불균형은 학습시 교차 엔트로피 손실(Cross Entropy Loss)에 영향을 줘 분류기가 쉽게 다수 클래스를 선택하게 만든다. Focal Loss는 교차 엔트로피 손실 함수를 다수 클래스의 가중치를 줄이도록 재구성하여 훈련 중 소수 클래스에 더 집중하게 만든다.

 

$$ FL(p_{t})=-(1-p_{t})^{\gamma}\log(p_{t}) $$

 

 여기서 pt 는 신경망에서 softmax activation을 통과 후 예측된 확률값이고, 조정가능한 γ≥0 focusing 파라미터이다. Focal Loss는 기존 교차 엔트로피 손실에 Modulating factor (1-pt)^γ 를 추가하여 교차 엔트로피에 대한 가중치를 조정한다.

 

 신경망이 잘못 분류하여 pt가 작으면, modulating factor 1에 가까워지고 Focal Loss는 영향을 받지 않는다. 그러나 신경망이 잘 분류하여 pt1에 가까워지면 modulating factor0에 가까워지고 잘 분류한 클래스에 대한 손실의 가중치는 줄어들게 된다. 한편, Focusing parameter 는 다수 클래스의 가중치를 줄이는 비율을 부드럽게 조정한다. γ=0이면 FL는 교차 엔트로피 손실과 동일하고, γ가 증가하면 modulating factor에 대한 영향도 마찬가지로 증가하게 된다. 결론적으로 moduling factor는 쉬운 샘플에 대한 손실 기여도를 줄이게 된다.

 

pytorch 이용한 Focal Loss 구현

import torch
import torch.nn as nn
import torch.nn.functional as F

class FocalLoss(nn.Module):
    def __init__(self, gamma=0, alpha=None, size_average=True, device='cpu'):
        super(FocalLoss, self).__init__()
        """
        gamma(int) : focusing parameter.
        alpha(list) : alpha-balanced term.
        size_average(bool) : whether to apply reduction to the output.
        """
        self.gamma = gamma
        self.alpha = alpha
        self.size_average = size_average
        self.device = device

    def forward(self, input, target):
        # input : N * C (btach_size, num_class)
        # target : N (batch_size)

        CE = F.cross_entropy(input, target, reduction='none')  # -log(pt)
        pt = torch.exp(-CE)  # pt
        loss = (1 - pt) ** self.gamma * CE  # -(1-pt)^rlog(pt)

        if self.alpha is not None:
            alpha = torch.tensor(self.alpha, dtype=torch.float).to(self.device)
            # in case that a minority class is not selected when mini-batch sampling
            if len(self.alpha) != len(torch.unique(target)):
                temp = torch.zeros(len(self.alpha)).to(self.device)
                temp[torch.unique(target)] = alpha.index_select(0, torch.unique(target))
                alpha_t = temp.gather(0, target)
                loss = alpha_t * loss
            else:
                alpha_t = alpha.gather(0, target)
                loss = alpha_t * loss

        if self.size_average:
            loss = torch.mean(loss)

        return loss

gamma = 0이면 Focal Loss는 CrossEntropyLoss와 동일

 

gamma = 5일때 Focal Loss
alpha-balanced term을 적용한 Focal Loss

 

 

LSTM

import torch
from torch import nn

input = torch.randn(5, 3, 10) # (seq_len, batch, input_size)

 input 데이터는 3개, LSTM에 순차적으로 들어갈 시간은 5개, 입력 노드의 개수(설명변수)는 10개라고 하자. torch.randn로 rnn에 들어갈 input을 만들면 첫 번째 원소가 sequence length = 5, 두 번째 원소가 batch size = 3, 세 번째 원소가 input size = 10 인 차원으로 생성한다.

 PyTorch에서 다음과 같이 sequence 길이는 5, 출력 노드의 개수는 20개, LSTM층이 2개인 RNN 구조를 만들어보자.

 

 nn.LSTM에서 입력층의 크기(input_size)는 10 (주황색 화살표), 은닉층의 크기(hidden_size)는 20 (파란색 화살표), 레이어의 개수(num_layers)는 2개인 순환신경망이다. input 데이터를 rnn에 집어 넣으면 output과 (hn, cn)의 튜플을 반환한다.

rnn = nn.LSTM(input_size = 10, hidden_size = 20, num_layers = 2)
output, (hn, cn) = rnn(input)

 

output은 2층 layer의 시간 순서에 따른 출력이다. 첫 번째 원소는 sequence 길이인 5이고, 두 번째 원소는 batch 크기, 세 번째 원소는 20으로 은닉층의 크기(출력층의 크기)이다.

output.size()

> torch.Size([5, 3, 20])

 

 

 

다음과 같이 last sequence의 출력만 사용하려면 output에서 마지막 값을 가져온다. output[-1]의 shape를 확인해보면 batch가 3개, 은닉층(출력)의 크기가 20이 됨을 알 수 있다.

output[-1].size()

> torch.Size([3, 20])

 

 

 다음으로 (hn, cn)은 각각 다음 LSTMcell로 넘겨주는 hidden state, cell sate를 말하고, input으로 (hn, cn)이 주어지지 않으면 c0와 h0는 zeros로 초기화한다. hn과 cn의 shape를 살펴보자.

hn과 cn의 차원의 첫 번째 원소는 2층 레이어이므로 2, 두 번째원 소는 batch 3개, 은닉층의 크기이므로 20이다.

print(hn.size())
print(cn.size())

> torch.Size([2, 3, 20])

> torch.Size([2, 3, 20])

 

 

Bidirectional LSTM

nn.LSTM에서는 bidirectional = True로 설정하므로써 간단하게 위와 같은 biderectional LSTM을 구현할 수 있다.

import torch
from torch import nn

input = torch.randn(5, 3, 10) # (seq_len, batch, input_size)
rnn = nn.LSTM(input_size = 10, hidden_size = 20, num_layers = 2, bidirectional = True)
output, (hn, cn) = rnn(input)

 이 때 output은 왼쪽에서 오른쪽으로 한 개, 오른쪽에서 왼쪽으로 한 개가 concatenate 되어 출력이 된다. 따라서 output의 hidden_size(출력층의 크기)는 두 배인 2 * 20 = 40이 된다.

 

output.size()

> torch.Size([5, 3, 40])

 

 

 hn과 cn의 첫 번째 원소는 bidirection이므로 2개 * 2층 레이어이므로 2개 = 총 4개이다. 다음 LSTM cell로 넘겨주는 hn과 cn은 hidden_size가 concatenate 되지 않으므로 20인 것을 알 수 있다.

print(hn.size())
print(cn.size())

> torch.Size([4, 3, 20])

> torch.Size([4, 3, 20])

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch import optim
import tqdm
import copy
from sklearn.metrics import confusion_matrix, classification_report

class Trainer:
    def __init__(self, net, train_loader, test_loader, criterion, epochs=100, lr = 0.001, l2_norm = None, device=None):
        self.net = net
        self.train_loader = train_loader
        self.test_loader = test_loader
        self.criterion = criterion
        self.epochs = epochs
        self.lr = lr
        self.l2_norm = l2_norm

        if device is not None:
            self.device = device
        else:
            self.device = 'cuda:0' if torch.cuda.is_available() else 'cpu'

        self.train_losses = []
        self.train_acc = []
        # self.val_losses = []
        self.val_acc = []

        self.best_model_wts = copy.deepcopy(self.net.state_dict())
        self.best_acc = 0.

    def train_net(self):
        optimizer = optim.Adam(self.net.parameters(), lr = self.lr)
        for epoch in range(self.epochs):
            running_loss = 0.
            self.net.train()
            n = 0
            n_acc = 0

            for i, (X_batch, y_batch) in enumerate(tqdm.tqdm(self.train_loader, total = len(self.train_loader))):
                X_batch = X_batch.to(self.device, dtype=torch.float)
                y_batch = y_batch.to(self.device, dtype=torch.int64)
                y_pred_batch = self.net(X_batch)

                # regularization
                if self.l2_norm is not None:
                    lambda2 = self.l2_norm
                    fc_params = torch.cat([x.view(-1) for x in self.net.out.parameters()])
                    l2_regularization = lambda2 * torch.norm(fc_params, p=2)
                else:
                    l2_regularization = 0.

                loss = self.criterion(y_pred_batch, y_batch) + l2_regularization
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

                running_loss += loss.item()
                n += len(X_batch)
                _, y_pred_batch = y_pred_batch.max(1)
                n_acc += (y_batch == y_pred_batch).float().sum().item()
            self.train_losses.append(running_loss / i)
            self.train_acc.append(n_acc / n)

            # 검증 데이터의 예측 정확도
            self.val_acc.append(self.eval_net(self.test_loader, self.device))

            # epoch 결과 표시
            print('epoch: {}/{}, train_loss: {:.4f}, train_acc: {:.2f}%, test_acc: {:.2f}%'.format(epoch + 1, self.epochs,
                                                                                                 self.train_losses[-1],
                                                                                                 self.train_acc[-1] * 100,
                                                                                                 self.val_acc[-1] * 100))
        print('best acc : {:.2f}%'.format(self.best_acc * 100))

    def eval_net(self, data_loader, device):
        self.net.eval()
        ys = []
        y_preds = []
        for X_batch, y_batch in data_loader:
            X_batch = X_batch.to(device, dtype=torch.float)
            y_batch = y_batch.to(device, dtype=torch.int64)

            with torch.no_grad():
                _, y_pred_batch = self.net(X_batch).max(1)
            ys.append(y_batch)
            y_preds.append(y_pred_batch)

        ys = torch.cat(ys)
        y_preds = torch.cat(y_preds)

        acc = (ys == y_preds).float().sum() / len(ys)

        if acc.item() > self.best_acc:
            self.best_acc = acc
            self.best_model_wts = copy.deepcopy(self.net.state_dict())

        return acc.item()

    def evaluation(self, data_loader, device):
        model = self.get_best_model()
        ys = []
        y_preds = []
        for X_batch, y_batch in data_loader:
            X_batch = X_batch.to(device, dtype=torch.float)
            y_batch = y_batch.to(device, dtype=torch.int64)

            with torch.no_grad():
                _, y_pred_batch = self.net(X_batch).max(1)
            ys.append(y_batch)
            y_preds.append(y_pred_batch)

        ys = torch.cat(ys)
        y_preds = torch.cat(y_preds)

        acc = (ys == y_preds).float().sum() / len(ys)
		
        print("Confusion Matrix")
        print(confusion_matrix(ys.to('cpu'), y_preds.to('cpu')))
        print("Classification Report")
        print(classification_report(ys.to('cpu'), y_preds.to('cpu'), digits = 4))

        return acc.item()


    def get_best_model(self):
        self.net.load_state_dict(self.best_model_wts)
        return self.net

 

완전연결층 fully connected layer가 포함된 신경망을 준비한다.

import torch
from torch import nn, optim
from torch.nn import functional as F

class net(nn.Module):
    def __init__(self):
        super(net, self).__init__()
        self.conv1_1 = nn.Conv2d(in_channels = 3, out_channels = 32, kernel_size = 7, stride = 3)
        self.bn1_1 = nn.BatchNorm2d(32)
        self.conv_dropout1_1 = nn.Dropout2d(0.1)
        
        self.conv2_1 = nn.Conv2d(in_channels = 32, out_channels = 64, kernel_size = 5, stride = 2)
        self.bn2_1 = nn.BatchNorm2d(64)
        self.pool1 = nn.MaxPool2d(2, 2)
        self.conv_dropout2_1 = nn.Dropout2d(0.1)
        
        self.conv3_1 = nn.Conv2d(in_channels = 64, out_channels = 128, kernel_size = 3, stride = 2)
        self.bn3_1 = nn.BatchNorm2d(128)
        self.conv_dropout3_1 = nn.Dropout2d(0.25)
        self.conv3_2 = nn.Conv2d(in_channels = 128, out_channels = 128, kernel_size = 3, stride = 1)
        self.bn3_2 = nn.BatchNorm2d(128)
        self.conv_dropout3_2 = nn.Dropout2d(0.25)
        
        self.conv4_1 = nn.Conv2d(in_channels = 128, out_channels = 256, kernel_size = 2, stride = 1)
        self.bn4_1 = nn.BatchNorm2d(256)
        self.conv_dropout4_1 = nn.Dropout2d(0.3)
        self.conv4_2 = nn.Conv2d(in_channels = 256, out_channels = 256, kernel_size = 2, stride = 1)
        self.bn4_2 = nn.BatchNorm2d(256)
        self.conv_dropout4_2 = nn.Dropout2d(0.3)
        self.conv4_3 = nn.Conv2d(in_channels = 256, out_channels = 512, kernel_size = 2, stride = 1)
        self.bn4_3 = nn.BatchNorm2d(512)
        self.pool2 = nn.MaxPool2d(2, 2)
        self.conv_dropout4_3 = nn.Dropout2d(0.3)
        
                     
        self.fc1 = nn.Linear(512*7*7, 1000)
        self.fc_dropout1 = nn.Dropout(0.5)
        self.fc2 = nn.Linear(1000, 1000)
        self.fc_dropout2 = nn.Dropout(0.5)
        self.fc3 = nn.Linear(1000, 256)
        self.fc_dropout3 = nn.Dropout(0.1)
        self.out = nn.Linear(256, 1)

    def forward(self, x):
        x = self.bn1_1(F.relu(self.conv1_1(x)))
        x = self.conv_dropout1_1(x)
        
        x = self.bn2_1(F.relu(self.conv2_1(x)))
        x = self.conv_dropout2_1(self.pool1(x))
        
        x = self.bn3_1(F.leaky_relu(self.conv3_1(x)))
        x = self.conv_dropout3_1(x)
        x = self.bn3_2(F.leaky_relu(self.conv3_2(x)))
        x = self.conv_dropout3_2(x)
        
        x = self.bn4_1(F.leaky_relu(self.conv4_1(x)))
        x = self.conv_dropout4_1(x)        
        x = self.bn4_2(F.leaky_relu(self.conv4_2(x)))
        x = self.conv_dropout4_2(x)
        x = self.bn4_3(F.leaky_relu(self.conv4_3(x)))
        x = self.pool2(x)
        x = self.conv_dropout4_3(x)
        
        x = x.view(-1, 512*7*7)
        
        x = self.fc_dropout1(F.relu(self.fc1(x)))
        x = self.fc_dropout2(F.relu(self.fc2(x)))
        x = self.fc_dropout3(F.relu(self.fc3(x)))
        out = self.out(x)
        
        return out
    
net = net()

 

Θ = {β,W}이 신경망 net의 모든 parameter일 때, β는 마지막 완전연결층(fc3)에서 예측값 out 사이의 가중치(절편 포함), W는 이 layer 전의 모든 weights의 집합이라고 하자. 이때 모든 parameter와 fc3의 weights의 regularization을 추가한 Loss를 최적화한다.

 

CrossEntropyLoss에 각각 l1규제와 l2규제 추가한 Loss

lambda1과 lambda2는 과적합을 방지하기 위한 정규화 모수이다.

lambda1 = 0.01
lambda2 = 0.005

fc_params = torch.cat([x.view(-1) for x in net.out.parameters()])
l1_regularization = lambda1 * torch.norm(fc_params, p=1)
l2_regularization = lambda2 * torch.norm(fc_params, p=2)

print(fc_params.size())

last fully conncected layer에서 prediction layer 사이의 가중치는 256개의 coefficients와 1개의 bias로 총 257개이다

 

 

loss_fn = nn.CrossEntropyLoss()
loss = loss_fn(ypred, y) + l1_regularization # loss에 마지막 fc layer의 parameter l1 규제 추가
optimizer.zero_grad()
loss.backward()
optimizer.step()

 

'PyTorch' 카테고리의 다른 글

[PyTorch] Focal Loss  (0) 2021.04.22
[PyTorch] nn.LSTM input, output shape  (0) 2020.06.30
[PyTorch] Training 헬퍼 함수 만들기  (0) 2020.06.17
[PyTorch] PyTorch 1.5.0 설치하기  (0) 2020.06.16

1. PyTorch 설치를 위해 Anaconda에 새로운 가상환경("pytorch")을 생성하고 활성화시킨다

 

2. PyTorch 홈페이지에서 설치할 버전을 선택하여 설치 명령어를 찾는다. GPU 가속을 사용하기 위해 CUDA 10.2 버전으로 선택한다.

https://pytorch.org/get-started/locally/

 

PyTorch

An open source deep learning platform that provides a seamless path from research prototyping to production deployment.

pytorch.org

conda install pytorch torchvision cudatoolkit=10.2 -c pytorch

 

아래 NVIDIA 홈페이지에서 CUDA도 찾아서 설치한다

https://developer.nvidia.com/cuda-10.2-download-archive

 

CUDA Toolkit 10.2 Download

Select Target Platform Click on the green buttons that describe your target platform. Only supported platforms will be shown. Operating System Architecture Distribution Version Installer Type Do you want to cross-compile? Yes No Select Host Platform Click

developer.nvidia.com

 

3. 활성화된 가상환경에서 conda 명령어를 입력하여 PyTorch 설치

 

4. 주피터 노트북을 실행하고 PyTorch가 설치된 가상환경 커널로 선택하여 빈 노트를 작성한다.

PyToch를 import 한다. cuda가 정상적으로 사용가능한지 확인해본다.

import torch

torch.cuda.is_available()

변수를 생성해서 GPU로 보내본다.

x = torch.randn(5).to("cuda:0")
print(x)

cuda가 정상적으로 사용가능하다

주피터 노트북에서 파이토치 설치가 끝났다

+ Recent posts