출처 및 코드 : https://youtu.be/U0s0f995w14
Transformer 인코더 만들기
먼저 워드 임베딩을 통해 단어를 256의 임베딩 차원으로 나타내고, 트렌스포머의 멀티 헤드 어텐션 헤드의 개수를 8로 설정하면 각 헤드마다의 차원은 256을 8로 나눈 몫인 32차원이다. V, K, Q는 각각 value, key, query값이고 만약, N = 10, embed_size = 256, src_vocab_size = 7 라고 하면, 10개의 문장이 임베딩 레이어를 거쳐 (10, 7, 256)이 되고 8개의 헤드로 쪼개지기 위해 (10, 7, 8, 32)로 reshape한다. 그리고 나서 value, key, query 각각 Linear 레이어를 통과하고 나면 차원은 마찬가지로 (10, 7, 8, 32)이다.
이제 어텐션을 구하기 위해서 QK^t를 torch.einsum으로 행렬곱하여 구하고 임베딩 크기의 제곱근 한 값으로 나눠 주어 softmax를 취한다. 이 값에 마찬가지로 torch.einsum을 이용하여 V를 곱하면 8개의 head마다 attention이 구해진다. 이것을 다시 rshape으로 concat해주고 마지막 Linear 레이어를 통과시킨다.
import torch
import torch.nn as nn
class SelfAttention(nn.Module):
def __init__(self, embed_size, heads):
super(SelfAttention, self).__init__()
self.embed_size = embed_size # 256
self.heads = heads # 8
self.head_dim = embed_size // heads # 32
assert (self.head_dim * heads == embed_size), "Embed size needs to be div by heads"
self.values = nn.Linear(self.head_dim, self.head_dim, bias=False) # 32 -> 32
self.keys = nn.Linear(self.head_dim, self.head_dim, bias=False) # 32 -> 32
self.queries = nn.Linear(self.head_dim, self.head_dim, bias=False) # 32 -> 32
self.fc_out = nn.Linear(heads * self.head_dim, embed_size) # 8 * 32 = 256 -> 256
def forward(self, values, keys, query, mask):
N = query.shape[0]
value_len, key_len, query_len = values.shape[1], keys.shape[1], query.shape[1]
# Split embedding into self.heads pieces
values = values.reshape(N, value_len, self.heads, self.head_dim)
keys = keys.reshape(N, key_len, self.heads, self.head_dim)
queries = query.reshape(N, query_len, self.heads, self.head_dim)
values = self.values(values)
keys = self.keys(keys)
queries = self.queries(queries)
energy = torch.einsum("nqhd, nkhd -> nhqk", [queries, keys]) # QK^t
# queries shape : (N, query_len, heads, head_dim)
# keys shape : (N, key_len, heads, head_dim)
# energy shape : (N, heads, query_len, key_len)
if mask is not None:
energy = energy.masked_fill(mask==0, float("-1e20"))
attention = torch.softmax(energy / (self.embed_size ** 0.5), dim=3)
out = torch.einsum('nhql, nlhd -> nqhd', [attention, values]).reshape(N, query_len, self.heads * self.head_dim)
# attention shape : (N, heads, query_len, key_len)
# values shape : (N, value_len, heads, head_dim)
# after einsum (N, query_len, heads, head_dim) then flatten last two dimensions
out = self.fc_out(out)
return out
위에서 만든 파란색 박스친 멀티 헤드 셀프 어텐션 부분은 가져다 쓴다. value, key query가 어텐션 레이어로 들어가고 이때 query는 residual connection이 들어가는데 어텐션을 통과한 값과 query를 더하여 LayerNormalization해준다. 다음으로 forward_expansion배 만큼 커졌다가 다시 줄어드는 Feed Forward 레이어를 통과하고 그 전의 값과 더하여 두 번째 LayerNormalization 해준다. TransformerBolck 모듈이 완성되었다.
class TransformerBlock(nn.Module):
def __init__(self, embed_size, heads, dropout, forward_expansion):
super(TransformerBlock, self).__init__()
self.attention = SelfAttention(embed_size, heads)
self.norm1 = nn.LayerNorm(embed_size)
self.norm2 = nn.LayerNorm(embed_size)
self.feed_forward = nn.Sequential(
nn.Linear(embed_size, forward_expansion * embed_size),
nn.ReLU(),
nn.Linear(forward_expansion * embed_size, embed_size)
)
self.dropout = nn.Dropout(dropout)
def forward(self, value, key, query, mask):
attention = self.attention(value, key, query, mask)
x = self.dropout(self.norm1(attention + query))
forward = self.feed_forward(x)
out = self.dropout(self.norm2(forward + x))
return out
위에서 TransformerBolck 모듈을 만들었으니 이제 인코더 부분을 완성할 차례이다. Transformer는 RNN을 사용하지 않기 때문에 순서에 대한 정보를 주기 위하여 위치 인코딩(Positional Encoding) 과정을 거친다. 먼저 input으로 문장이 들어오고 워드 임베딩 레이어를 거친다. 이 때 단어의 순서를 나타내기 위하여 1부터 단어길이까지 arange하고 문장개수 만큼 복사한다. 그 다음 임베딩 레이어에 통과 시켜 위치 인코딩을 한다. 논문에서는 사인과 코사인 함수를 이용하여 했지만 이렇게 해도 잘 동작한다고 한다. 이제 워드 임베딩 레이어를 통과한 값과 위치 인코딩 레이어를 통과한 값을 더하여 TrnasformerBolck에 넘겨줄 value, key query로 사용한다. 그리고 이 TrnasformerBlock은 위로 N번 만큼 반복하여 통과 시킬수 있다.
class Encoder(nn.Module):
def __init__(self, src_vocab_size, embed_size, num_layers, heads, device,
forward_expansion, dropout, max_len):
super(Encoder, self).__init__()
self.embed_size = embed_size
self.device = device
self.word_embedding = nn.Embedding(src_vocab_size, embed_size)
self.positional_embedding = nn.Embedding(max_len, embed_size)
self.layers = nn.ModuleList([
TransformerBlock(embed_size, heads, dropout=dropout, forward_expansion=forward_expansion)
for _ in range(num_layers)
])
self.dropout = nn.Dropout(dropout)
def forward(self, x, mask):
N, seq_length = x.shape
positions = torch.arange(0, seq_length).expand(N, seq_length).to(self.device)
out = self.dropout(self.word_embedding(x) + self.positional_embedding(positions))
for layer in self.layers:
out = layer(out, out, out, mask) # value, key, query
return out
Transformer 디코더 만들기
타겟 문장에서 임베딩 레이어와 위치 인코딩을 거쳐나온 값 x가 디코더의 SelfAttention의 value, key, query로 들어가고 mask도 들어간다. attention 모듈을 통과한 값을 다시 x와 더해 LayerNormalization 해주고 이 값을 다시 파란색 박스에 들어갈 query로 사용한다. 이때 인코더로부터 value와 key를 넘겨 받는다. 준비된 value, key, query를 위에서만든 TransformerBolck에 통과시킨다.
class DecoderBlock(nn.Module):
def __init__(self, embed_size, heads, forward_expansion, dropout, device):
super(DecoderBlock, self).__init__()
self.attention = SelfAttention(embed_size, heads)
self.norm = nn.LayerNorm(embed_size)
self.transformer_block = TransformerBlock(embed_size, heads, dropout, forward_expansion)
self.dropout = nn.Dropout(dropout)
def forward(self, x, value, key, src_mask, trg_mask):
attention = self.attention(x, x, x, trg_mask)
query = self.dropout(self.norm(attention + x))
out = self.transformer_block(value, key, query, src_mask)
return out
타겟 문장에 대해서도 워드 임베딩과 위치 인코딩을 진행하고 DecoderBlock을 N회 반복하여 통과 시킨다. 그런다음 마지막으로 Linear 레이어를 통과시킨다.
class Decoder(nn.Module):
def __init__(self, trg_vocab_size, embed_size, num_layers, heads, forward_expansion, dropout, device, max_lenght):
super(Decoder, self).__init__()
self.device = device
self.word_embedding = nn.Embedding(trg_vocab_size, embed_size)
self.position_embedding = nn.Embedding(max_lenght, embed_size)
self.layers = nn.ModuleList([
DecoderBlock(embed_size, heads, forward_expansion, dropout, device)
for _ in range(num_layers)
])
self.fc_out = nn.Linear(embed_size, trg_vocab_size)
self.dropout = nn.Dropout(dropout)
def forward(self, x, enc_out, src_mask, trg_mask):
N, seq_length = x.shape
positions = torch.arange(0, seq_length).expand(N, seq_length).to(self.device)
x = self.dropout(self.word_embedding(x) + self.position_embedding(positions))
for layer in self.layers:
x = layer(x, enc_out, enc_out, src_mask, trg_mask)
out = self.fc_out(x)
return out
인코더와 디코더를 결합시킨다. 마스크 부분은 잘 모르겠다;
class Transformer(nn.Module):
def __init__(self, src_vocab_size, trg_vocab_size, src_pad_idx, trg_pad_idx,
embed_size=256, num_layers=6, forward_expansion=4, heads=8, dropout=0.,
device="cpu", max_length=100):
super(Transformer, self).__init__()
self.encoder = Encoder(src_vocab_size,
embed_size,
num_layers,
heads,
device,
forward_expansion,
dropout,
max_length)
self.decoder = Decoder(trg_vocab_size,
embed_size,
num_layers,
heads,
forward_expansion,
dropout,
device,
max_length)
self.src_pad_idx = src_pad_idx
self.trg_pad_idx = trg_pad_idx
self.device = device
def make_src_mask(self, src):
src_mask = (src != self.src_pad_idx).unsqueeze(1).unsqueeze(2)
# (N, 1, 1, src_len)
return src_mask.to(self.device)
def make_trg_mask(self, trg):
N, trg_len = trg.shape
trg_mask = torch.tril(torch.ones((trg_len, trg_len))).expand(N, 1, trg_len, trg_len)
return trg_mask.to(self.device)
def forward(self, src, trg):
src_mask = self.make_src_mask(src)
trg_mask = self.make_trg_mask(trg)
enc_src = self.encoder(src, src_mask)
out = self.decoder(trg, enc_src, src_mask, trg_mask)
return out
if __name__ == "__main__":
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
x = torch.tensor([[1, 5, 6, 4, 3, 9, 5, 2, 0], [1, 8, 7, 3, 4, 5, 6, 7, 2]]).to(device)
trg = torch.tensor([[1,7, 4, 3, 5, 9, 2, 0], [1, 5, 6, 2, 4, 7, 6, 2]]).to(device)
src_pad_idx = 0
trg_pad_idx = 0
src_vocab_size = 10
trg_vocab_size = 10
model = Transformer(src_vocab_size, trg_vocab_size, src_pad_idx, trg_pad_idx).to(device)
out = model(x, trg[:, :-1])
print(out.shape)
'NLP' 카테고리의 다른 글
Seq2seq with Attention 도식화 (0) | 2021.05.23 |
---|