출처 및 코드 : https://youtu.be/U0s0f995w14

 

Transformer 인코더 만들기


 먼저 워드 임베딩을 통해 단어를 256의 임베딩 차원으로 나타내고, 트렌스포머의 멀티 헤드 어텐션 헤드의 개수를 8로 설정하면 각 헤드마다의 차원은 256을 8로 나눈 몫인 32차원이다. V, K, Q는 각각 value, key, query값이고 만약, N = 10, embed_size = 256, src_vocab_size = 7 라고 하면, 10개의 문장이 임베딩 레이어를 거쳐 (10, 7, 256)이 되고 8개의 헤드로 쪼개지기 위해 (10, 7, 8, 32)로 reshape한다. 그리고 나서 value, key, query 각각 Linear 레이어를 통과하고 나면 차원은 마찬가지로 (10, 7, 8, 32)이다. 

 이제 어텐션을 구하기 위해서 QK^t를 torch.einsum으로 행렬곱하여 구하고 임베딩 크기의 제곱근 한 값으로 나눠 주어 softmax를 취한다. 이 값에 마찬가지로 torch.einsum을 이용하여 V를 곱하면 8개의 head마다 attention이 구해진다. 이것을 다시 rshape으로 concat해주고 마지막 Linear 레이어를 통과시킨다.

import torch
import torch.nn as nn

class SelfAttention(nn.Module):
    def __init__(self, embed_size, heads):
        super(SelfAttention, self).__init__()
        self.embed_size = embed_size # 256
        self.heads = heads # 8
        self.head_dim = embed_size // heads # 32

        assert (self.head_dim * heads == embed_size), "Embed size needs to be div by heads"

        self.values = nn.Linear(self.head_dim, self.head_dim, bias=False) # 32 -> 32
        self.keys = nn.Linear(self.head_dim, self.head_dim, bias=False) # 32 -> 32
        self.queries = nn.Linear(self.head_dim, self.head_dim, bias=False)  # 32 -> 32
        self.fc_out = nn.Linear(heads * self.head_dim, embed_size) # 8 * 32 = 256 -> 256

    def forward(self, values, keys, query, mask):
        N = query.shape[0]
        value_len, key_len, query_len = values.shape[1], keys.shape[1], query.shape[1]

        # Split embedding into self.heads pieces
        values = values.reshape(N, value_len, self.heads, self.head_dim)
        keys = keys.reshape(N, key_len, self.heads, self.head_dim)
        queries = query.reshape(N, query_len, self.heads, self.head_dim)

        values = self.values(values)
        keys = self.keys(keys)
        queries = self.queries(queries)

        energy = torch.einsum("nqhd, nkhd -> nhqk", [queries, keys]) # QK^t
        # queries shape : (N, query_len, heads, head_dim)
        # keys shape : (N, key_len, heads, head_dim)
        # energy shape : (N, heads, query_len, key_len)

        if mask is not None:
            energy = energy.masked_fill(mask==0, float("-1e20"))

        attention = torch.softmax(energy / (self.embed_size ** 0.5), dim=3)

        out = torch.einsum('nhql, nlhd -> nqhd', [attention, values]).reshape(N, query_len, self.heads * self.head_dim)
        # attention shape : (N, heads, query_len, key_len)
        # values shape : (N, value_len, heads, head_dim)
        # after einsum (N, query_len, heads, head_dim) then flatten last two dimensions
        out = self.fc_out(out)
        return out

 위에서 만든 파란색 박스친 멀티 헤드 셀프 어텐션 부분은 가져다 쓴다. value, key query가 어텐션 레이어로 들어가고 이때 query는 residual connection이 들어가는데 어텐션을 통과한 값과 query를 더하여 LayerNormalization해준다. 다음으로 forward_expansion배 만큼 커졌다가 다시 줄어드는 Feed Forward 레이어를 통과하고 그 전의 값과 더하여 두 번째 LayerNormalization 해준다. TransformerBolck 모듈이 완성되었다.

class TransformerBlock(nn.Module):
    def __init__(self, embed_size, heads, dropout, forward_expansion):
        super(TransformerBlock, self).__init__()
        self.attention = SelfAttention(embed_size, heads)
        self.norm1 = nn.LayerNorm(embed_size)
        self.norm2 = nn.LayerNorm(embed_size)

        self.feed_forward = nn.Sequential(
            nn.Linear(embed_size, forward_expansion * embed_size),
            nn.ReLU(),
            nn.Linear(forward_expansion * embed_size, embed_size)
        )
        self.dropout = nn.Dropout(dropout)

    def forward(self, value, key, query, mask):
        attention = self.attention(value, key, query, mask)

        x = self.dropout(self.norm1(attention + query))
        forward = self.feed_forward(x)
        out = self.dropout(self.norm2(forward + x))
        return out

위에서 TransformerBolck 모듈을 만들었으니 이제 인코더 부분을 완성할 차례이다. Transformer는 RNN을 사용하지 않기 때문에 순서에 대한 정보를 주기 위하여 위치 인코딩(Positional Encoding) 과정을 거친다. 먼저 input으로 문장이 들어오고 워드 임베딩 레이어를 거친다. 이 때 단어의 순서를 나타내기 위하여 1부터 단어길이까지 arange하고 문장개수 만큼 복사한다. 그 다음 임베딩 레이어에 통과 시켜 위치 인코딩을 한다. 논문에서는 사인과 코사인 함수를 이용하여 했지만 이렇게 해도 잘 동작한다고 한다. 이제 워드 임베딩 레이어를 통과한 값과 위치 인코딩 레이어를 통과한 값을 더하여 TrnasformerBolck에 넘겨줄 value, key query로 사용한다. 그리고 이 TrnasformerBlock은 위로 N번 만큼 반복하여 통과 시킬수 있다.

 

class Encoder(nn.Module):
    def __init__(self, src_vocab_size, embed_size, num_layers, heads, device,
                 forward_expansion, dropout, max_len):
        super(Encoder, self).__init__()
        self.embed_size = embed_size
        self.device = device
        self.word_embedding = nn.Embedding(src_vocab_size, embed_size)
        self.positional_embedding = nn.Embedding(max_len, embed_size)

        self.layers = nn.ModuleList([
            TransformerBlock(embed_size, heads, dropout=dropout, forward_expansion=forward_expansion)
            for _ in range(num_layers)
        ])
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask):
        N, seq_length = x.shape
        positions = torch.arange(0, seq_length).expand(N, seq_length).to(self.device)

        out = self.dropout(self.word_embedding(x) + self.positional_embedding(positions))

        for layer in self.layers:
            out = layer(out, out, out, mask) # value, key, query
        return out

Transformer 디코더 만들기


타겟 문장에서 임베딩 레이어와 위치 인코딩을 거쳐나온 값 x가 디코더의 SelfAttention의 value, key, query로 들어가고 mask도 들어간다. attention 모듈을 통과한 값을 다시 x와 더해 LayerNormalization 해주고 이 값을 다시 파란색 박스에 들어갈 query로 사용한다. 이때 인코더로부터 value와 key를 넘겨 받는다. 준비된 value, key, query를 위에서만든 TransformerBolck에 통과시킨다.

class DecoderBlock(nn.Module):
    def __init__(self, embed_size, heads, forward_expansion, dropout, device):
        super(DecoderBlock, self).__init__()
        self.attention = SelfAttention(embed_size, heads)
        self.norm = nn.LayerNorm(embed_size)
        self.transformer_block = TransformerBlock(embed_size, heads, dropout, forward_expansion)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, value, key, src_mask, trg_mask):
        attention = self.attention(x, x, x, trg_mask)
        query = self.dropout(self.norm(attention + x))
        out = self.transformer_block(value, key, query, src_mask)
        return out

 


타겟 문장에 대해서도 워드 임베딩과 위치 인코딩을 진행하고 DecoderBlock을 N회 반복하여 통과 시킨다. 그런다음 마지막으로 Linear 레이어를 통과시킨다.

class Decoder(nn.Module):
    def __init__(self, trg_vocab_size, embed_size, num_layers, heads, forward_expansion, dropout, device, max_lenght):
        super(Decoder, self).__init__()
        self.device = device
        self.word_embedding = nn.Embedding(trg_vocab_size, embed_size)
        self.position_embedding = nn.Embedding(max_lenght, embed_size)

        self.layers = nn.ModuleList([
            DecoderBlock(embed_size, heads, forward_expansion, dropout, device)
            for _ in range(num_layers)
        ])

        self.fc_out = nn.Linear(embed_size, trg_vocab_size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, enc_out, src_mask, trg_mask):
        N, seq_length = x.shape
        positions = torch.arange(0, seq_length).expand(N, seq_length).to(self.device)
        x = self.dropout(self.word_embedding(x) + self.position_embedding(positions))

        for layer in self.layers:
            x = layer(x, enc_out, enc_out, src_mask, trg_mask)

        out = self.fc_out(x)
        return out

인코더와 디코더를 결합시킨다. 마스크 부분은 잘 모르겠다;

class Transformer(nn.Module):
    def __init__(self, src_vocab_size, trg_vocab_size, src_pad_idx, trg_pad_idx,
                 embed_size=256, num_layers=6, forward_expansion=4, heads=8, dropout=0.,
                 device="cpu", max_length=100):
        super(Transformer, self).__init__()
        self.encoder = Encoder(src_vocab_size,
                               embed_size,
                               num_layers,
                               heads,
                               device,
                               forward_expansion,
                               dropout,
                               max_length)

        self.decoder = Decoder(trg_vocab_size,
                               embed_size,
                               num_layers,
                               heads,
                               forward_expansion,
                               dropout,
                               device,
                               max_length)

        self.src_pad_idx = src_pad_idx
        self.trg_pad_idx = trg_pad_idx
        self.device = device

    def make_src_mask(self, src):
        src_mask = (src != self.src_pad_idx).unsqueeze(1).unsqueeze(2)
        # (N, 1, 1, src_len)
        return src_mask.to(self.device)

    def make_trg_mask(self, trg):
        N, trg_len = trg.shape
        trg_mask = torch.tril(torch.ones((trg_len, trg_len))).expand(N, 1, trg_len, trg_len)
        return trg_mask.to(self.device)

    def forward(self, src, trg):
        src_mask = self.make_src_mask(src)
        trg_mask = self.make_trg_mask(trg)
        enc_src = self.encoder(src, src_mask)
        out = self.decoder(trg, enc_src, src_mask, trg_mask)
        return out

if __name__ == "__main__":
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    x = torch.tensor([[1, 5, 6, 4, 3, 9, 5, 2, 0], [1, 8, 7, 3, 4, 5, 6, 7, 2]]).to(device)
    trg = torch.tensor([[1,7, 4, 3, 5, 9, 2, 0], [1, 5, 6, 2, 4, 7, 6, 2]]).to(device)

    src_pad_idx = 0
    trg_pad_idx = 0
    src_vocab_size = 10
    trg_vocab_size = 10
    model = Transformer(src_vocab_size, trg_vocab_size, src_pad_idx, trg_pad_idx).to(device)
    out = model(x, trg[:, :-1])
    print(out.shape)

'NLP' 카테고리의 다른 글

Seq2seq with Attention 도식화  (0) 2021.05.23

Seq2Seq


 Seq2seq는 크게 인코더와 디코더 두 개로 구성된 아키텍처로 인코더는 입력 시퀀스(Sequence)를 순차적으로 입력 받아 순환신경망을 통해 입력 시퀀스의 특징을 반영하는 값을 추출하여 하나의 벡터로 만든다. 이 하나의 벡터를 context Vector라고 한다. 디코더에서는 인코더에서 압축한 context vector를 전송받아 순환신경망의 초기 상태(Initial State)로 활용하여 순차적으로 시퀀스를 출력한다.

 

 

 디코더의 순환신경망 첫 번째 초기 입력으로 시퀀스의 시작을 의미하는 <sos>를 받고 인코더로부터 넘어온 context vector를 첫 번째 셀의 hidden state로 사용하여 다음에 등장할 확률이 높은 단어를 예측한다. 이 예측된 단어는 다음 시점인 두 번째 셀의 입력 값이 되고 또 다시 다음에 등장할 단어를 예측하게 된다.  

 

Seq2Seq2를 활용하여 시퀀스 'I am a student'를 시퀀스 '난 학생입니다'를 출력하는 과정 도식화

 

 한편 인코더의 길이가 길어지면 인코더가 압축하는 context vector가 입력 시쿼스의 정보를 제대로 압축하기 어려워지는 병목현상이 발생한다. 이를 해결하기 위해 Attention 기반 Seq2seq 모델이 연구되었다.

 

 

Seq2seq with Attetion


 기본적인 Seq2seq 모델이 디코딩과정에서 context vector만으로 모든 출력을 생성했다면 Attention기반 Seq2seq는 디코딩 과정에서 각 시퀀스마다 서로 다른 context vector를 사용한다. 먼저 인코더에서 시퀀스를 순차적으로 입력받아 순환신경망을 거쳐 Attention layer로 전달한다. Attention layer에서는 입력 시퀀스의 어떤 단어에 집중할지에 대하여 가중치를 조정하는 방식으로 Context vector를 만들어 디코더에 전달하게 되고, 디코더는 Context vector의 정보를 각 step에서 활용하여 시퀀스를 출력한다.

 

Attention layer에 앞서 순환신경망의 인코더의 각 step의 hidden state(hEt)와 디코더의 hidden state(hDt, 처음에는 인코더의 마지막 hidden state hEj값을 사용)를 통해 Attention score를 계산하고 Softmax함수를 통해 정규화하여 Attension wight를 만든다. 다음으로 인코더의 hidden sate와 Attention weights의 가중 합을 계산하여 가중치가 적용된 context vector를 만든다. 마지막으로 디코더에서 Context vector와 디코더의 hidden state 그리고 이전 디코더의 출력값을 이용해 디코더의 다음 hidden state를 출력한다.

 

i = 디코더의 현재 출력 인덱스

j = 인코더의 현재 입력 인덱스

 

에너지(Energy)

$$ e_{ij}=v_{a}^{T}tanh(W_{a}h_{i-1}^{D}+U_{a}h_{j}^{E}) $$

가중치(Weight)

$$ a_{ij}=\frac{exp(e_{ij})}{\sum_{k}exp(e_{ik})} $$

Context Vector

$$ c_{i}=\sum_{j}a_{ij}h_{j}^{E} $$

 

 

구체적인 과정을 살펴보면, 에너지를 구하기 위하여 인코더의 각 스텝의 모든 hidden state hEj를 Fully Connected layer에 통과시킨다 (Ua*hEj). 다음으로 디코더의 현재 step의 hidden sate를 가져오는데, 초기에는 디코더에서 출력한 값이 없으므로 인코더의 마지막 셀의 hidden state(hETx)를 가져와 Fully Connected layer에 통과시킨다 (Wa*h_i-1). 그 다음 두 값을  더한 후 하이퍼블릭 탄젠트 활성화함수를 통과시키고 다시 Fully Connected layer에 통과 시킨다 (vT*tanh(Wa*h_i-1 + Ua*hEj)). 이렇게 구한 에너지 eij를 softmax 함수를 통해 정규화하여 attention score aij를 구한다. 최종적으로 인코더의 hidden state에 attention score를 곱해 context vector ci를 만들게 된다.

Attention의 context vector는 이처럼 시퀀스에서 더 중요한 부분이라고 생각되는 state에 더 높은 가중치를 주게 되어 예측할 때 더 중요한 부분에 집중하게 된다.

 

'NLP' 카테고리의 다른 글

Transformer  (0) 2021.06.30

+ Recent posts