Autoencoders

Autoencoders were proposed quite early, so the earliest versions were just three-layer fully connected networks; later improvements deepened the network, introduced convolutional layers, and added various kinds of regularization (denoising, sparse, contractive), among others.

References:
PyTorch study notes: autoencoders
Illustrating latent space

https://github.com/nathanhubens/Autoencoders
https://github.com/L1aoXingyu/pytorch-beginner/tree/master/08-AutoEncoder

Simple_Autoencoder_Solution
Variational Autoencoders Explained
https://pytorch.org/tutorials/beginner/transfer_learning_tutorial.html

Preparation: download the data and define a plotting function

import matplotlib.pyplot as plt
from torchvision.utils import make_grid
from torchvision.datasets import MNIST
from torchvision.transforms import ToTensor
# ToTensor: PIL Image / ndarray to Tensor
# (HxWxC) in range [0, 255] to (CxHxW) in range [0., 1.]

train_data = MNIST(root='CVdata', train=True, download=True, transform=ToTensor())

def show_img(img_tensor, nrow=8, normalized=False, **kwargs):
    data = img_tensor.detach().cpu()  # Tensor data (detached) on CPU
    if normalized:
        data = (data + 1) / 2         # [-1, 1] -> [0, 1]
    imgs = data.view(-1, 1, 28, 28)   # (NxCxHxW): N, 1, 28, 28
    img_grid = make_grid(imgs, nrow)  # (NxCxHxW) to (C'xH'xW')
    img = img_grid.permute(1, 2, 0)   # (CxHxW) to (HxWxC)
    #from torchvision.transforms.functional import to_pil_image
    #img = to_pil_image(img_grid)     # Tensor to PIL Image
    plt.imshow(img, **kwargs)
    plt.show()
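
As a quick sanity check (not part of the original), show_img can be tried on the first few training images:

import torch

# Display the first 32 training images in an 8-column grid
sample = torch.stack([train_data[i][0] for i in range(32)])  # (32, 1, 28, 28)
show_img(sample, nrow=8)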

AutoEncoder

Autoencoder: a fully connected network with mean squared error loss

import torch.nn as nn

# Vanilla AutoEncoder
class AutoEncoder(nn.Module):
    def __init__(self):
        super().__init__()

        self.encoder = nn.Sequential(
            nn.Linear(28*28, 2),
            nn.Tanh())  # or nn.ReLU()

        self.decoder = nn.Sequential(
            nn.Linear(2, 28*28),
            nn.Sigmoid())

    def forward(self, x):
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)

        return encoded, decoded

loss_func = nn.MSELoss()  # or nn.L1Loss() for MAE

This is a simple autoencoder: essentially a three-layer fully connected network. What sets it apart is that the input and output share the same dimension and value range, and the hidden layer typically has a lower dimension than the input. The model is deliberately split into an encoder and a decoder, which at first glance looks more elaborate than a plain fully connected network but is in fact no different. After training, usually only one half is used: the encoder for dimensionality reduction, the decoder for data generation.
Concretely, the input and output layers both have dimension 28*28 (MNIST images). Since the inputs lie in (0, 1) after the ToTensor transform, Sigmoid is chosen as the output activation so that the output range matches the input, making direct comparison possible. The hidden dimension is set to 2 so the latent space can be visualized later. Under these constraints little about the architecture itself can be changed; only the loss function and the hidden layer's activation remain adjustable.
In the model above, the loss is mean squared error (MSE) and the hidden activation is Tanh. Performance can then only be tuned through training hyperparameters: batch size, initialization, optimizer, learning rate, number of epochs, and so on. Finally, the inputs can instead be normalized to (-1, 1); in that case the output activation must be changed to Tanh so that the output range again matches the input (a sketch follows the note below).

  • With Tanh as the encoder activation, the decoder's input lies in (-1, 1); with ReLU it is always non-negative.
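
A minimal sketch of the (-1, 1) variant just described, assuming inputs normalized to (-1, 1); AutoEncoderTanh is a hypothetical name, not part of the original:

# Hypothetical (-1, 1) variant: Tanh output to match inputs in (-1, 1)
class AutoEncoderTanh(nn.Module):
    def __init__(self):
        super().__init__()
        self.encoder = nn.Sequential(nn.Linear(28*28, 2), nn.Tanh())
        self.decoder = nn.Sequential(nn.Linear(2, 28*28), nn.Tanh())  # output in (-1, 1)

    def forward(self, x):
        encoded = self.encoder(x)
        return encoded, self.decoder(encoded)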

Training setup: compute device and data batching

import torch
from torch import optim
from torch.utils.data import DataLoader

torch.manual_seed(0)  # fix random seed for reproducibility
device = torch.device("cpu")
#device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
train_loader = DataLoader(dataset=train_data, batch_size=128, shuffle=True)
num_train_data = len(train_data)

def training(model, loss_func, optimizer, lr_scheduler=None,
             epochs=10, show_plot=True, normalized=False):
    avg_losses = []
    for epoch in range(epochs):
        running_loss = 0
        for imgs, label in train_loader:
            batch_data = imgs.view(-1, 28*28).to(device)  # flatten images to (N, 784)
            encoded, decoded = model(batch_data)
            loss = loss_func(decoded, batch_data)
            running_loss += loss.detach() * imgs.shape[0]

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        if lr_scheduler is not None:
            lr_scheduler.step()  # step once per epoch

        avg_loss = running_loss / num_train_data  # avg loss of each epoch
        avg_losses.append(avg_loss)
        if (epoch+1) % (epochs//5) == 0:  # display training info (5 times in total)
            print(f'Epoch:{epoch+1}/{epochs} | train loss: {avg_loss:.4f}')
            if show_plot:
                print(label.view(16, -1))
                dataset = torch.hstack([batch_data, decoded])  # originals next to reconstructions
                show_img(dataset, nrow=16, normalized=normalized)
    return avg_losses

Model training

model = AutoEncoder().to(device)
learning_rate = 1e-2
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
losses = training(model, loss_func, optimizer)
plt.plot(losses)
plt.show()

#https://zhuanlan.zhihu.com/p/83626029
#https://www.zhihu.com/question/32673260
#https://pytorch.org/docs/stable/optim.html
epochs = 100
for learning_rate in (10, 1, 1e-1, 1e-2, 1e-3):
    model = AutoEncoder().to(device)  # re-initialize so each run starts fresh
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    losses = training(model, loss_func, optimizer, epochs=epochs, show_plot=False)
    plt.plot(losses, label=f"lr:{learning_rate};optim:Adam")

for learning_rate in (10, 1, 1e-1, 1e-2, 1e-3):
    model = AutoEncoder().to(device)  # re-initialize so each run starts fresh
    optimizer = optim.SGD(model.parameters(), lr=learning_rate)
    losses = training(model, loss_func, optimizer, epochs=epochs, show_plot=False)
    plt.plot(losses, label=f"lr:{learning_rate};optim:SGD")
plt.legend()
plt.show()


epochs = 100
learning_rate = 1e-2
model = AutoEncoder().to(device)  # fresh model for the scheduler experiment
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
lr_scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=30, gamma=0.1)
losses = training(model, loss_func, optimizer, lr_scheduler, epochs)
plt.plot(losses)
plt.show()

Model performance is driven directly by the dimension of the latent variable; it is set to 2 here so that the latent space can be visualized.
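
To experiment with other latent sizes, the latent dimension can be exposed as a constructor argument; a minimal sketch (AutoEncoderZ is a hypothetical name, not in the original):

# Hypothetical variant with a configurable latent dimension
class AutoEncoderZ(nn.Module):
    def __init__(self, z_dim=2):
        super().__init__()
        self.encoder = nn.Sequential(nn.Linear(28*28, z_dim), nn.Tanh())
        self.decoder = nn.Sequential(nn.Linear(z_dim, 28*28), nn.Sigmoid())

    def forward(self, x):
        encoded = self.encoder(x)
        return encoded, self.decoder(encoded)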

Data compression

test_data = MNIST(root='CVdata', train=False, download=True, transform=ToTensor())
encode_loader = DataLoader(dataset=test_data, batch_size=len(test_data))
imgs, labels = next(iter(encode_loader))
encode_batch_data = imgs.view(-1, 28*28)

encoded = model.encoder(encode_batch_data.to(device))
code = encoded.detach()
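
Since no gradients are needed when encoding for analysis, the same computation can equivalently be wrapped in torch.no_grad() (a minor note, not from the original):

# Equivalent, without building the autograd graph
with torch.no_grad():
    code = model.encoder(encode_batch_data.to(device))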

Data generation

num = 21
x = torch.linspace(-5, 5, num)
y = torch.linspace(5, -5, num)
# X, Y = torch.meshgrid(x, y)  # result img transposed
Y, X = torch.meshgrid(y, x, indexing='ij')  # rows follow y, columns follow x
seed_code = torch.stack([X.flatten(), Y.flatten()], dim=1)
# seed_code = 10*torch.rand(15, 2) - 5
decoded = model.decoder(seed_code.to(device))  # Tensor(num*num, 28*28)
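
Besides decoding a regular grid, another common way to generate images is to interpolate between the latent codes of two real test images; a minimal sketch using the code tensor from the previous section (indices 0 and 1 are arbitrary choices):

# Interpolate between the latent codes of two test images
z0, z1 = code[0], code[1]
alphas = torch.linspace(0, 1, 8).unsqueeze(1)  # (8, 1) blend weights
interp = (1 - alphas) * z0 + alphas * z1       # (8, 2) codes on the line z0 -> z1
show_img(model.decoder(interp.to(device)), nrow=8)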

Latent space visualization

plt.figure(figsize=(15, 6))
plt.subplot(1, 2, 1)
plt.gca().set_aspect('equal')
plt.scatter(code[:, 0], code[:, 1], c=labels, s=1)
plt.xlim([-5, 5])
plt.ylim([-5, 5])
plt.colorbar()
plt.grid()

dx = (x[1]-x[0])/2
dy = (y[1]-y[0])/2
extent = [x[0]-dx, x[-1]+dx, y[-1]+dy, y[0]-dy]
plt.subplot(1, 2, 2)
plt.gca().set_aspect('equal')
show_img(decoded, nrow=num, extent=extent)
# Manual alternative to show_img: tile the decoded digits onto one canvas
# data = decoded.detach().cpu()
# imgs = data.view(-1, 28, 28)
# canvas = torch.empty((28*num, 28*num))
# for k in range(num):
#     for l in range(num):
#         canvas[k*28:(k+1)*28, l*28:(l+1)*28] = imgs[k*num+l]
# plt.imshow(canvas, extent=extent, cmap="gray")
# plt.show()

ReLU

# Vanilla AutoEncoder with ReLU as the encoder activation
class AutoEncoder(nn.Module):
    def __init__(self):
        super().__init__()

        self.encoder = nn.Sequential(
            nn.Linear(28*28, 2),
            nn.ReLU())

        self.decoder = nn.Sequential(
            nn.Linear(2, 28*28),
            nn.Sigmoid())

    def forward(self, x):
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)

        return encoded, decoded

loss_func = nn.MSELoss()  # or nn.L1Loss() for MAE
model = AutoEncoder().to(device)

learning_rate = 5e-3
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
losses = training(model, loss_func, optimizer)

Data Normalization

from torchvision.transforms import Compose, ToTensor, Normalize
transform = Compose([
    ToTensor(),
    Normalize([0.5,], [0.5,])  # mean, std: maps [0, 1] to [-1, 1]
])
train_data = MNIST(root='CVdata', train=True, transform=transform)
train_loader = DataLoader(dataset=train_data, batch_size=128, shuffle=True)  # rebuild the loader on the normalized data

# With inputs in (-1, 1), the decoder's output activation should be
# nn.Tanh() instead of nn.Sigmoid(), as discussed above
model = AutoEncoder().to(device)
learning_rate = 1e-2
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
losses = training(model, loss_func, optimizer, normalized=True)
plt.plot(losses)
plt.show()

Deep/Conv AE

Deepening the network

import torch.nn as nn

class DeepAE(nn.Module):
    def __init__(self):
        super().__init__()

        self.encoder = nn.Sequential(
            nn.Linear(28*28, 1000),
            nn.ReLU(),
            nn.Linear(1000, 1000),
            nn.ReLU(),
            nn.Linear(1000, 2),
            #nn.Tanh()
        )

        self.decoder = nn.Sequential(
            nn.Linear(2, 1000),
            nn.ReLU(),
            nn.Linear(1000, 1000),
            nn.ReLU(),
            nn.Linear(1000, 28*28),
            nn.Sigmoid()
        )

    def forward(self, x):
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)

        return encoded, decoded

loss_func = nn.MSELoss()  # or nn.L1Loss() for MAE

model = DeepAE().to(device)
learning_rate = 5e-3
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
losses = training(model, loss_func, optimizer)
class DeepAE(nn.Module):
    def __init__(self):
        super().__init__()

        self.encoder = nn.Sequential(
            nn.Linear(28*28, 256),
            nn.ReLU(),
            nn.Linear(256, 64),
            nn.ReLU(),
            nn.Linear(64, 16),
            nn.ReLU(),
            nn.Linear(16, 2),
            #nn.Tanh()
        )

        self.decoder = nn.Sequential(
            nn.Linear(2, 16),
            nn.ReLU(),
            nn.Linear(16, 64),
            nn.ReLU(),
            nn.Linear(64, 256),
            nn.ReLU(),
            nn.Linear(256, 28*28),
            nn.Sigmoid()
        )

    def forward(self, x):
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)

        return encoded, decoded
loss_func = nn.MSELoss()  # or nn.L1Loss() for MAE
model = DeepAE().to(device)
optimizer = optim.Adam(model.parameters(), lr=5e-3)  # fresh optimizer for the new model
losses = training(model, loss_func, optimizer)

Using a convolutional network

class ConvAE(nn.Module):
    def __init__(self):
        super().__init__()

        # layer parameters follow the cited pytorch-beginner repo so that
        # the decoder exactly inverts the encoder's spatial shapes
        self.encoder = nn.Sequential(
            nn.Conv2d(1, 16, 3, stride=3, padding=1),  # 28->10; C_in, C_out, kernel
            nn.ReLU(),
            nn.MaxPool2d(2, stride=2),                 # 10->5
            nn.Conv2d(16, 8, 3, stride=2, padding=1),  # 5->3
            nn.ReLU(),
            nn.MaxPool2d(2, stride=1))                 # 3->2

        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(8, 16, 3, stride=2),             # 2->5
            nn.ReLU(),
            nn.ConvTranspose2d(16, 8, 5, stride=3, padding=1),  # 5->15
            nn.ReLU(),
            nn.ConvTranspose2d(8, 1, 2, stride=2, padding=1),   # 15->28
            nn.Sigmoid())

    def forward(self, x):
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)

        return encoded, decoded

loss_func = nn.MSELoss()  # or nn.L1Loss() for MAE
model = ConvAE().to(device)
optimizer = optim.Adam(model.parameters(), lr=5e-3)  # fresh optimizer for the new model
training(model, loss_func, optimizer)
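
Note that training() flattens every batch to (N, 28*28) vectors, which does not fit a convolutional model; a minimal sketch of one adapted training step, under the same train_loader/device assumptions:

# One adapted training step for ConvAE: keep the (N, 1, 28, 28) image shape
for imgs, _ in train_loader:
    batch = imgs.to(device)  # no flattening
    encoded, decoded = model(batch)
    loss = loss_func(decoded, batch)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    break  # a single step, for illustration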

Regularization

Adding regularization
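
This section is only a stub; as one concrete example of the regularizers mentioned in the introduction, a denoising autoencoder corrupts each input with noise but reconstructs the clean image. A minimal sketch reusing AutoEncoder and the original [0, 1] ToTensor data (the noise level 0.3 is an arbitrary choice):

# Denoising autoencoder: reconstruct clean images from noisy inputs
model = AutoEncoder().to(device)
optimizer = optim.Adam(model.parameters(), lr=1e-2)
for epoch in range(10):
    for imgs, _ in train_loader:
        clean = imgs.view(-1, 28*28).to(device)
        noisy = (clean + 0.3 * torch.randn_like(clean)).clamp(0, 1)  # additive Gaussian noise
        _, decoded = model(noisy)
        loss = nn.MSELoss()(decoded, clean)  # the target is the clean image
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()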

Variational AE

import torch.nn.functional as F

class VAE(nn.Module):
    def __init__(self):
        super().__init__()

        self.fc1 = nn.Linear(784, 400)
        self.fc21 = nn.Linear(400, 20)  # mu
        self.fc22 = nn.Linear(400, 20)  # log variance
        self.fc3 = nn.Linear(20, 400)
        self.fc4 = nn.Linear(400, 784)

    def encode(self, x):
        h1 = F.relu(self.fc1(x))
        return self.fc21(h1), self.fc22(h1)

    def reparametrize(self, mu, logvar):
        std = logvar.mul(0.5).exp_()
        eps = torch.randn_like(std)  # noise on the same device/dtype as std
        return eps.mul(std).add_(mu)

    def decode(self, z):
        h3 = F.relu(self.fc3(z))
        return torch.sigmoid(self.fc4(h3))

    def forward(self, x):
        mu, logvar = self.encode(x)
        z = self.reparametrize(mu, logvar)
        return self.decode(z), mu, logvar

reconstruction_function = nn.BCELoss(reduction='sum')  # sum over batch and pixels

def loss_func(recon_x, x, mu, logvar):
    """
    recon_x: generated images
    x: original images
    mu: latent mean
    logvar: latent log variance
    """
    BCE = reconstruction_function(recon_x, x)
    # KL divergence: -0.5 * sum(1 + log(sigma^2) - mu^2 - sigma^2)
    KLD_element = mu.pow(2).add_(logvar.exp()).mul_(-1).add_(1).add_(logvar)
    KLD = torch.sum(KLD_element).mul_(-0.5)
    return BCE + KLD

model = VAE().to(device)
optimizer = optim.Adam(model.parameters(), lr=1e-3)  # fresh optimizer for the VAE
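
The generic training() loop assumes model(x) returns (encoded, decoded) and calls loss_func(decoded, x), so it does not fit the VAE's three outputs. A minimal adapted loop under the same assumptions (global train_loader, device, flattened 28*28 inputs); vae_training is a hypothetical helper, not from the original:

def vae_training(model, loss_func, optimizer, epochs=10):
    for epoch in range(epochs):
        running_loss = 0
        for imgs, _ in train_loader:
            batch = imgs.view(-1, 28*28).to(device)
            recon, mu, logvar = model(batch)  # three outputs instead of two
            loss = loss_func(recon, batch, mu, logvar)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            running_loss += loss.detach()
        print(f'Epoch:{epoch+1}/{epochs} | train loss: {running_loss/num_train_data:.4f}')

vae_training(model, loss_func, optimizer)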
# VAE model
class VAE(nn.Module):
    def __init__(self, image_size=784, h_dim=400, z_dim=20):
        super().__init__()
        self.fc1 = nn.Linear(image_size, h_dim)
        self.fc2 = nn.Linear(h_dim, z_dim)  # mu
        self.fc3 = nn.Linear(h_dim, z_dim)  # log variance
        self.fc4 = nn.Linear(z_dim, h_dim)
        self.fc5 = nn.Linear(h_dim, image_size)

    def encode(self, x):
        h = F.relu(self.fc1(x))
        return self.fc2(h), self.fc3(h)

    def reparameterize(self, mu, log_var):
        std = torch.exp(log_var/2)
        eps = torch.randn_like(std)
        return mu + eps * std

    def decode(self, z):
        h = F.relu(self.fc4(z))
        return torch.sigmoid(self.fc5(h))

    def forward(self, x):
        mu, log_var = self.encode(x)
        z = self.reparameterize(mu, log_var)
        x_reconst = self.decode(z)
        return x_reconst, mu, log_var

def loss_func(x_reconst, x, mu, log_var):
    reconst_loss = nn.BCELoss(reduction='sum')(x_reconst, x)
    kl_div = -0.5 * torch.sum(1 + log_var - mu.pow(2) - log_var.exp())
    return reconst_loss + kl_div

model = VAE().to(device)
optimizer = optim.Adam(model.parameters(), lr=1e-3)  # fresh optimizer for this model
vae_training(model, loss_func, optimizer)  # reuses the adapted loop sketched above
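
Once trained, new digits can be generated by decoding samples drawn from the standard normal prior; a short sketch (z_dim=20 matches the model defaults):

# Generate new digits by sampling the latent prior N(0, I)
with torch.no_grad():
    z = torch.randn(64, 20).to(device)
    samples = model.decode(z)
show_img(samples, nrow=8)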

Stable Diffusion is a latent diffusion model which combines an autoencoder with a diffusion model that is trained in the latent space of the autoencoder.
In other words, Stable Diffusion trains its diffusion model in the latent space of an autoencoder.