import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
import spacy
import torch
from torch.nn import *
from torch.nn import functional as F
import torchvision as tv
import torchvision.transforms as T
from torchtext.datasets import LanguageModelingDataset
from torchtext.data import Field, LabelField, TabularDataset, BucketIterator, BPTTIterator
torch.backends.cudnn.deterministic = True
def train(model, train_loader, epochs=1):
    criterion = CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters())
    model.train()  # make sure layers such as BatchNorm/Dropout are in training mode
    for epoch in range(epochs):
        for inputs, targets in train_loader:
            inputs, targets = inputs.cuda(), targets.cuda()
            loss = criterion(model(inputs), targets)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
def test(model, test_loader):
    model.eval()
    with torch.no_grad():
        correct = 0
        for inputs, targets in test_loader:
            inputs, targets = inputs.cuda(), targets.cuda()
            preds = model(inputs).argmax(1)
            correct += (preds==targets).sum().item()
    print(f'Test Acc: {correct / len(test_loader.dataset):.3f}')
train_set = tv.datasets.MNIST('', True, T.ToTensor(), download=True)
test_set = tv.datasets.MNIST('', False, T.ToTensor(), download=True)
train_loader = torch.utils.data.DataLoader(train_set, 32, shuffle=True, num_workers=4)
test_loader = torch.utils.data.DataLoader(test_set, 32, num_workers=4)
cnn = Sequential(
    Conv2d(1,16,5), BatchNorm2d(16), ReLU(), MaxPool2d(2),    # 28x28 -> 24x24 -> 12x12
    Conv2d(16,32,5), BatchNorm2d(32), ReLU(), MaxPool2d(2),   # 12x12 -> 8x8 -> 4x4
    Flatten(), Linear(32*4*4,128), ReLU(), Linear(128,10)     # hence 32*4*4 flattened features
).cuda()
train(cnn, train_loader)
test(cnn, test_loader)
transform = T.Compose([T.Resize((224,224),1), T.ToTensor(), T.Normalize([0.485,0.456,0.406],[0.229,0.224,0.225])])
img = transform(Image.open('resnet_test.jpg')).unsqueeze(0)
resnet50 = tv.models.resnet50(True).eval()
probs = F.softmax(resnet50(img),1)[0]
display(Image.open('resnet_test.jpg').resize((320,180)))
classes = [s[s.find(' ')+1:-1] for s in open('imagenet_classes.txt').readlines()]
for i in probs.sort(descending=True)[1][:5]:
    print(f'Pred/prob: {classes[i]}, {probs[i].item():.3f}')
Tip: the DataLoader reads an ImageFolder dataset in class order, so set [shuffle] to [True] for training.
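A minimal sketch of the same call with the keyword spelled out (loader is just an illustrative name; the loaders below pass shuffle as the third positional argument, and it defaults to False):
loader = torch.utils.data.DataLoader(train_set, batch_size=32, shuffle=True, num_workers=4)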
trans_train = T.Compose([T.RandomResizedCrop(224) , T.ToTensor(), T.Normalize([0.485,0.456,0.406],[0.229,0.224,0.225])])
trans_test = T.Compose([T.Resize((224,224),1), T.ToTensor(), T.Normalize([0.485,0.456,0.406],[0.229,0.224,0.225])])
train_set = tv.datasets.ImageFolder('Ant&Bee/train', trans_train)
test_set = tv.datasets.ImageFolder('Ant&Bee/test', trans_test)
train_loader = torch.utils.data.DataLoader(train_set, 32, True, num_workers=4)
test_loader = torch.utils.data.DataLoader(test_set, 32, True, num_workers=4)
model = tv.models.resnet50(True).cuda()
for p in model.parameters(): p.requires_grad=False    # freeze the pretrained backbone
model.fc = Linear(model.fc.in_features, 2).cuda()     # new 2-class head (ants vs. bees), trainable by default
train(model, train_loader, 5)
test(model, test_loader)
img = trans_test(Image.open('ant.jpg')).unsqueeze(0).cuda()
probs = F.softmax(model(img),1)[0]
display(Image.open('ant.jpg').resize((280,160)))
print(f'Names: {train_set.classes}\nProbs: {[round(p,3) for p in probs.tolist()]}')
print(f'Result: {train_set.classes[probs.argmax()]}')
def train_gan(D, G, data_loader, epochs=20):
    criterion = BCELoss()
    d_optimizer = torch.optim.Adam(D.parameters(), 0.0002)
    g_optimizer = torch.optim.Adam(G.parameters(), 0.0002)
    for epoch in range(epochs):
        for images, _ in data_loader:
            batch_size = images.shape[0]
            images = images.reshape(batch_size, -1).cuda()
            real_labels = torch.ones(batch_size, 1).cuda()
            fake_labels = torch.zeros(batch_size, 1).cuda()
            fake_images = G(torch.rand(batch_size, 64).cuda())
            # Discriminator step: real images -> 1, generated images -> 0
            # (detach so this loss does not backpropagate into the generator)
            d_loss_real = criterion(D(images), real_labels)
            d_loss_fake = criterion(D(fake_images.detach()), fake_labels)
            d_loss = d_loss_real + d_loss_fake
            d_optimizer.zero_grad()
            d_loss.backward()
            d_optimizer.step()
            # Generator step: try to make the discriminator output 1 for fakes
            g_loss = criterion(D(fake_images), real_labels)
            g_optimizer.zero_grad()
            g_loss.backward()
            g_optimizer.step()
transform = T.Compose([T.ToTensor(), T.Normalize([0.5],[0.5])])
data_set = tv.datasets.MNIST('',True, transform, download=True)
data_loader = torch.utils.data.DataLoader(data_set, 100, num_workers=4)
D = Sequential(Linear(28*28,256), LeakyReLU(0.2), Linear(256,256), LeakyReLU(0.2), Linear(256,1), Sigmoid()).cuda()
G = Sequential(Linear(64,256), ReLU(), Linear(256,256), ReLU(), Linear(256,28*28), Tanh()).cuda()
train_gan(D, G, data_loader)
fake_image = G(torch.rand(1,64).cuda())
plt.imshow(fake_image.view(28,28).data.cpu().numpy())
print(f'Discriminator result: {D(fake_image).item():.3f}')
Tip: check that the text data is encoded as [utf-8] before loading it.
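A minimal sketch of such a check (quora.csv is the file loaded further below; reading with an explicit encoding fails fast if the bytes are not valid UTF-8):
with open('quora.csv', encoding='utf-8') as f:
    f.read()   # raises UnicodeDecodeError if the file is not UTF-8 encoded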
def train_sentiment(model, train_iter, epochs=5):
    criterion = BCEWithLogitsLoss().cuda()
    optimizer = torch.optim.Adam(model.parameters())
    for epoch in range(epochs):
        for batch in train_iter:
            text, lengths = batch.text   # include_lengths=True makes batch.text a (padded tensor, lengths) pair
            preds = model(text, lengths).squeeze()
            loss = criterion(preds, batch.label)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
def test_sentiment(model, test_iter):
    model.eval()
    with torch.no_grad():
        correct = 0
        for batch in test_iter:
            text, lengths = batch.text
            preds = model(text, lengths).squeeze()
            rounded_preds = torch.round(torch.sigmoid(preds))
            correct += (rounded_preds==batch.label).sum().item()
    print(f'Test Acc: {correct / len(test_iter.dataset):.3f}')
def pred_sentiment(model, sentence):
    tokens = [tok.text for tok in nlp.tokenizer(sentence.lower())]
    sequence = [TEXT.vocab.stoi[t] for t in tokens]
    text = torch.LongTensor(sequence).unsqueeze(1).T.cuda()
    length = torch.LongTensor([len(sequence)])
    pred = torch.sigmoid(model(text, length)).item()
    result = 'Insincere' if pred>0.5 else 'Sincere'
    print(f'\nSentence: {sentence}\nPrediction: {pred:.3f}\nResult: {result}')
class RNN(Module):
    def __init__(self, vocab_size, emb_dim, hid_dim, n_layers, out_dim):
        super().__init__()
        self.emb = Embedding(vocab_size, emb_dim)
        self.lstm = LSTM(emb_dim, hid_dim, n_layers, batch_first=True, dropout=0.2, bidirectional=True)
        self.fc = Linear(hid_dim*2, out_dim)
    def forward(self, text, lengths):
        embed = self.emb(text)
        # pack_padded_sequence expects the lengths on the CPU
        packed_emb = torch.nn.utils.rnn.pack_padded_sequence(embed, lengths.cpu(), batch_first=True)
        packed_out, (hidden, cell) = self.lstm(packed_emb)
        # concatenate the final forward and backward hidden states of the top layer
        hidden = torch.cat((hidden[-2,:,:], hidden[-1,:,:]), 1)
        return self.fc(hidden)
TEXT = Field(lower=True, tokenize='spacy', include_lengths=True, batch_first=True)
LABEL = LabelField(dtype=torch.float, batch_first=True)
dataset = TabularDataset('quora.csv', 'csv', [('text',TEXT),('label',LABEL)], True)
train_set, test_set = dataset.split()
TEXT.build_vocab(train_set, vectors='glove.6B.100d')
LABEL.build_vocab(train_set)
train_iter = BucketIterator(train_set, 32, lambda x:len(x.text), 'cuda', sort_within_batch=True)
test_iter = BucketIterator(test_set, 32, lambda x:len(x.text), 'cuda', sort_within_batch=True)
rnn = RNN(len(TEXT.vocab), 100, 32, 2, 1).cuda()
rnn.emb.weight.data.copy_(TEXT.vocab.vectors)
train_sentiment(rnn, train_iter)
test_sentiment(rnn, test_iter)
nlp = spacy.load('en')
pred_sentiment(rnn, 'Which framework is better TensorFlow or PyTorch?')
pred_sentiment(rnn, 'Why does that Chinese guy prefer TensorFlow?')
Perplexity: the exponential of the cross-entropy; it measures how well a probability distribution fits the observed samples and is the standard metric for evaluating language models.
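A minimal worked sketch of the relationship used by test_lm below (the logits and targets here are random placeholders):
logits = torch.randn(8, 10)              # hypothetical: 8 predictions over a 10-word vocabulary
targets = torch.randint(0, 10, (8,))     # hypothetical target word indices
ce = F.cross_entropy(logits, targets)    # average cross-entropy in nats
ppl = torch.exp(ce)                      # perplexity = exp(cross-entropy)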
def train_lm(model, train_iter, epochs=20):
    criterion = CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters())
    for epoch in range(epochs):
        states = model.init_state(train_iter.batch_size)
        for batch in train_iter:
            inputs, targets = batch.text.cuda(), batch.target.cuda()
            outputs, states = model(inputs, states)
            # detach the hidden state so gradients stop at the BPTT window boundary
            states = tuple(state.detach() for state in states)
            loss = criterion(outputs, targets.reshape(-1))
            optimizer.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 0.5)
            optimizer.step()
def test_lm(model, test_iter):
    model.eval()
    with torch.no_grad():
        total_loss, correct, count = 0, 0, 0
        states = model.init_state(test_iter.batch_size)
        for batch in test_iter:
            inputs, targets = batch.text.cuda(), batch.target.cuda()
            outputs, states = model(inputs, states)
            correct += (outputs.argmax(1)==targets.reshape(-1)).sum().item()
            loss = F.cross_entropy(outputs, targets.reshape(-1))
            count += np.multiply(*inputs.size())
            total_loss += loss.item()*np.multiply(*inputs.size())
    print(f'Test Acc: {correct/count:.3f} Test PPL: {np.exp(total_loss/count):.3f}')
def generate(model, seed, length):
    seed_idx = torch.tensor(TEXT.vocab.stoi[seed.lower()])
    inputs = torch.stack([seed_idx]).unsqueeze(1).cuda()
    states = model.init_state(1)
    for i in range(length):
        output, states = model(inputs, states)
        word_weights = output.squeeze().exp()               # unnormalised probabilities over the vocabulary
        word_idx = torch.multinomial(word_weights, 1)[0]    # sample the next word
        word = '\n' if word_idx==2 else ' '+TEXT.vocab.itos[word_idx]   # index 2 is assumed to be the newline token
        seed += word
        inputs.fill_(word_idx)
    print(f'\nGenerated Sonnet:\n\n{seed}')
class RNN(Module):
    def __init__(self, vocab_size, emb_dim, hid_dim, n_layers):
        super().__init__()
        self.hid_dim = hid_dim
        self.n_layers = n_layers
        self.emb = Embedding(vocab_size, emb_dim)
        self.lstm = LSTM(emb_dim, hid_dim, n_layers, batch_first=True)
        self.fc = Linear(hid_dim, vocab_size)
    def forward(self, inputs, states):
        embed = self.emb(inputs)
        out, states = self.lstm(embed, states)
        out = out.reshape(out.size(0)*out.size(1), out.size(2))
        return self.fc(out), states
    def init_state(self, batch_size):
        return (torch.zeros(self.n_layers, batch_size, self.hid_dim, device='cuda'),
                torch.zeros(self.n_layers, batch_size, self.hid_dim, device='cuda'))
TEXT = Field(lower=True, tokenize='spacy', batch_first=True)
train_set = LanguageModelingDataset('sonnets_train.txt', TEXT)
test_set = LanguageModelingDataset('sonnets_test.txt', TEXT)
TEXT.build_vocab(train_set)
train_iter = BPTTIterator(train_set, 32, 64, shuffle=True, device='cuda')
test_iter = BPTTIterator(test_set, 32, 64, device='cuda')
rnn = RNN(len(TEXT.vocab), 64, 32, 2).cuda()
train_lm(rnn, train_iter)
test_lm(rnn, test_iter)
generate(rnn, 'Look', 100)