Anomaly Detection

[논문 리뷰 및 코드구현] Deep One-Class Classification(Deep SVDD)

Barca 2020. 10. 30. 23:14

[Review] Deep One-Class Classification, ICML 2018

 

 이번 포스팅은 Unsupervised Anomaly Detection 방법론인 Deep SVDD를 다루는 Deep One-Class Classification 논문을 살펴보겠습니다. 먼저, 이 포스팅은 Deep One-Class Classification논문과 고려대학교 산업경영공학부 강필성 교수님의 강의, 그리고 DSBA 최희정님의 세미나 발표자료를 참고하여 작성하였음을 밝힙니다.

 

먼저, SVDD(Support Vector Data Description)의 근간을 이루는 SVM(Support Vector Machine)은 아래 그림과 같이 서로 다른 Class의 Sample들을 잘 분류하는 Classifier를 찾는 것입니다. 데이터의 차원이 2차원일때는 직선 Classifier, 3차원일때는 평면 Classifider, ..., d차원일때는 d-1차원의 Hyperplane Classifier를 찾는 것입니다.

Support Vector Machine

 

그리고, SVDD는 비선형 SVM을 응용한 One-Class Classification을 위한 대표적인 방법으로 이상치를 분류하는 기법입니다.

그림1. SVDD

위의 그림과 같이 SVDD는 기존 데이터 Point들의 Feature space에서 정상 데이터를 둘러싸는 Hypersphere(초구, Boundary)를 찾고, 해당 Boundary를 기준으로 이상치를 탐지합니다.

이렇게 Hypersphere를 찾을 때에는 최적화된 초구 즉, 반지름을 최소화하면서 많은 정상 데이터를 둘러싸는 것을 목표로합니다.

 

SVDD의 Objective 함수는 다음과 같습니다.

Parameters

R : radius(반지름),    c : center point,   \({\xi}\): penalty term for soft margin

Hyper-parameters

\({\phi_k}\): Kernel Function(기존 Feature들의 차원을 변환)

v: Trade-off between volume of sphere and violations of boundary

 

D차원 입력 공간이 n개의 데이터(X)로 구성되어 있을 때, 중심이 c이고 반경이 R인 구를 이용하여 그림1과 같이 학습 클래스의 영역을 표현합니다. 이후, 각 학습 데이터 \(X_i\)를 Kernel Function으로 맵핑시킨 \({\phi_k}\)(\(x_i)\)와 중심 c사이의 거리가 반지름인 R을 초과하는 경우 Penalty를 부과합니다. 따라서, 위 식을 최적화하여 최소한의 반지름(R)을 가지는 구를 찾습니다.

v는 구의 크기를 결정하며, 반지름(R)과 오차간의 Trade-off 관계에 있습니다. v가 작아질수록 구의 크기가 커져서 대부분의 정상 클래스가 해당 구안에 들어가게 되지만, 비정상 클래스가 섞여 들어갈 수 있게 됩니다.

반면에, v가 커질수록 구가 작아지게 되고 이로인해 비정상 클래스를 잘 검출할 수 있지만 몇몇 정상 클래스가 구 경계면 바깥으로 나갈 수 있게 됩니다.

따라서, 위 식을 최적화하여 얻은 초구(Hypersphere)의 경계면을 기준으로 정상과 비정상을 분류합니다.

 

 


Deep SVDD

Deep SVDD는 기존에 Kernel을 기반으로한 SVDD와는 다르게 딥러닝을 기반으로 Feature space를 학습하고, 해당 Feature space에서 정상 데이터를 둘러싸는 최적의 구(반지름이 작은)를 찾습니다. 이후 해당 경계면을 기반으로 이상치를 탐지합니다.

Deep SVDD 중 One-Class Deep SVDD의 목적식은 다음과 같습니다.

먼저, 왼쪽항 같은 경우에는 구의 중심 c와 변환된 차원의 데이터 point사이의 거리를 최소화합니다.

오른쪽 항은 weight decay regularizer항으로 특정 가중치가 비정상적으로 커지는 것을 방지하고 결과적으로 최적화를 통해 각 데이터 point들이 구의 중심 c에 가깝게 Mapping되도록 학습합니다.

 

저도 처음에는 위 수식만으로는 DeepSVDD가 결국 무엇을 하려는건지 잘 이해가 되지 않았으나, 아래 그림을 보고 해당 내용의 의미를 알 수 있었습니다.

위 그림을 보면, 왼쪽에 있는 기본 차원의 데이터 포인트들이 정상과 비정상 클래스로 나뉘어 분포하고 있습니다. SVDD에서는 이것을 \({\phi}\)라는 Kernel Function을 통해 오른쪽과 같이 정상 비정상 클래스를 잘 구분할 수 있는 다른 차원으로 데이터 포인트들을 Mapping하였습니다.

Deep SVDD는 이 Kernel Function 부분을 Deep Learning으로 대체하여, 기존 데이터 포인트들을 Mapping하는 Weight들을 학습시키겠다는 것입니다.

그리고, 그렇게 학습된 Weight들을 이용하여 데이터포인트들을 Mapping시킨 차원에서 Classifier를 이용해 분류를 하는 것입니다.

 

 


Deep SVDD 코드구현

1. Import 라이브러리

import numpy as np
import easydict 
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils import data
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from PIL import Image
from sklearn.metrics import roc_auc_score

 

2. 데이터셋 불러오기 및 전처리

class MNIST_loader(data.Dataset):
    """Preprocessing을 포함한 dataloader를 구성"""
    def __init__(self, data, target, transform):
        self.data = data
        self.target = target
        self.transform = transform

    def __getitem__(self, index):
        x = self.data[index]
        y = self.target[index]
        if self.transform:
            x = Image.fromarray(x.numpy(), mode='L')
            x = self.transform(x)
        return x, y

    def __len__(self):
        return len(self.data)


def get_mnist(args, data_dir='../data/'):
    """get dataloders"""
    # min, max values for each class after applying GCN (as the original implementation)
    min_max = [(-0.8826567065619495, 9.001545489292527),
                (-0.6661464580883915, 20.108062262467364),
                (-0.7820454743183202, 11.665100841080346),
                (-0.7645772083211267, 12.895051191467457),
                (-0.7253923114302238, 12.683235701611533),
                (-0.7698501867861425, 13.103278415430502),
                (-0.778418217980696, 10.457837397569108),
                (-0.7129780970522351, 12.057777597673047),
                (-0.8280402650205075, 10.581538445782988),
                (-0.7369959242164307, 10.697039838804978)]

    transform = transforms.Compose([transforms.ToTensor(),
                                    transforms.Lambda(lambda x: global_contrast_normalization(x)),
                                    transforms.Normalize([min_max[args.normal_class][0]],
                                                         [min_max[args.normal_class][1] \
                                                         -min_max[args.normal_class][0]])])
    train = datasets.MNIST(root=data_dir, train=True, download=True)
    test = datasets.MNIST(root=data_dir, train=False, download=True)

    x_train = train.data
    y_train = train.targets

    x_train = x_train[np.where(y_train==args.normal_class)]
    y_train = y_train[np.where(y_train==args.normal_class)]
                                    
    data_train = MNIST_loader(x_train, y_train, transform)
    dataloader_train = DataLoader(data_train, batch_size=args.batch_size, 
                                  shuffle=True, num_workers=0)
    
    x_test = test.data
    y_test = test.targets
    
    # Normal class인 경우 0으로 바꾸고, 나머지는 1로 변환 (정상 vs 비정상 class)
    y_test = np.where(y_test==args.normal_class, 0, 1)

    data_test = MNIST_loader(x_test, y_test, transform)
    dataloader_test = DataLoader(data_test, batch_size=args.batch_size, 
                                  shuffle=False, num_workers=0)
    return dataloader_train, dataloader_test
    
    
def global_contrast_normalization(x):
    """Apply global contrast normalization to tensor. """
    mean = torch.mean(x)  # mean over all features (pixels) per sample
    x -= mean
    x_scale = torch.mean(torch.abs(x))
    x /= x_scale
    return x

논문에서는 MNIST 데이터셋과 CIFAR-10 데이터셋을 실험하였고, 이번 포스팅에서는 MNIST 데이터셋을 가지고 실험하였습니다. 위 코드는 MNIST 데이터셋을 Load하고 약간의 전처리를 한 것입니다.

 

3. 모델 구축

class DeepSVDD_network(nn.Module):
    def __init__(self, z_dim=32):
        super(DeepSVDD_network, self).__init__()
        self.pool = nn.MaxPool2d(2, 2)

        self.conv1 = nn.Conv2d(1, 8, 5, bias=False, padding=2)
        self.bn1 = nn.BatchNorm2d(8, eps=1e-04, affine=False)
        self.conv2 = nn.Conv2d(8, 4, 5, bias=False, padding=2)
        self.bn2 = nn.BatchNorm2d(4, eps=1e-04, affine=False)
        self.fc1 = nn.Linear(4 * 7 * 7, z_dim, bias=False)

    def forward(self, x):
        x = self.conv1(x)
        x = self.pool(F.leaky_relu(self.bn1(x)))
        x = self.conv2(x)
        x = self.pool(F.leaky_relu(self.bn2(x)))
        x = x.view(x.size(0), -1)
        return self.fc1(x)


class pretrain_autoencoder(nn.Module):
    def __init__(self, z_dim=32):
        super(pretrain_autoencoder, self).__init__()
        self.z_dim = z_dim
        self.pool = nn.MaxPool2d(2, 2)

        self.conv1 = nn.Conv2d(1, 8, 5, bias=False, padding=2)
        self.bn1 = nn.BatchNorm2d(8, eps=1e-04, affine=False)
        self.conv2 = nn.Conv2d(8, 4, 5, bias=False, padding=2)
        self.bn2 = nn.BatchNorm2d(4, eps=1e-04, affine=False)
        self.fc1 = nn.Linear(4 * 7 * 7, z_dim, bias=False)

        self.deconv1 = nn.ConvTranspose2d(2, 4, 5, bias=False, padding=2)
        self.bn3 = nn.BatchNorm2d(4, eps=1e-04, affine=False)
        self.deconv2 = nn.ConvTranspose2d(4, 8, 5, bias=False, padding=3)
        self.bn4 = nn.BatchNorm2d(8, eps=1e-04, affine=False)
        self.deconv3 = nn.ConvTranspose2d(8, 1, 5, bias=False, padding=2)
        
    def encoder(self, x):
        x = self.conv1(x)
        x = self.pool(F.leaky_relu(self.bn1(x)))
        x = self.conv2(x)
        x = self.pool(F.leaky_relu(self.bn2(x)))
        x = x.view(x.size(0), -1)
        return self.fc1(x)
   
    def decoder(self, x):
        x = x.view(x.size(0), int(self.z_dim / 16), 4, 4)
        x = F.interpolate(F.leaky_relu(x), scale_factor=2)
        x = self.deconv1(x)
        x = F.interpolate(F.leaky_relu(self.bn3(x)), scale_factor=2)
        x = self.deconv2(x)
        x = F.interpolate(F.leaky_relu(self.bn4(x)), scale_factor=2)
        x = self.deconv3(x)
        return torch.sigmoid(x)
        

    def forward(self, x):
        z = self.encoder(x)
        x_hat = self.decoder(z)
        return x_hat

위 코드는 네트워크의 구조를 선언한 것입니다. DeepSVDD_network와 pretrain_autoencoder 두 모델로 구성되어 있습니다. pretrain_autoencoder는 위에서 언급했던 데이터포인트를 맵핑하는 Kernel Function을 딥러닝으로 대체하는 부분입니다. 즉, pretrain_autoencoder로 기존 데이터 포인트들의 Feature space를 잘 Mapping할 수 있도록 하는 Weight를 먼저 학습하는 것이라고 생각하시면 됩니다.

그리고, DeepSVDD_network는 pretrain 모델로 학습된 Mapping Weight들을 가지고, 분류를 하는 Classifier입니다.

코드를 자세히 살펴보시면, DeepSVDD의 모델과 pretrain_autoencoder구조의 Encoder부분은 완전히 동일한 것을 볼 수 있습니다. 이 부분을 기억하시면서 아래 Train 함수 부분을 살펴봐주시면 감사하겠습니다.

 

4. Train 함수 정의

class TrainerDeepSVDD:
    def __init__(self, args, data_loader, device):
        self.args = args
        self.train_loader = data_loader
        self.device = device

    def pretrain(self):
        """ DeepSVDD 모델에서 사용할 가중치를 학습시키는 AutoEncoder 학습 단계"""
        ae = pretrain_autoencoder(self.args.latent_dim).to(self.device)
        ae.apply(weights_init_normal)
        optimizer = torch.optim.Adam(ae.parameters(), lr=self.args.lr_ae,
                               weight_decay=self.args.weight_decay_ae)
        scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, 
                    milestones=self.args.lr_milestones, gamma=0.1)
        
        ae.train()
        for epoch in range(self.args.num_epochs_ae):
            total_loss = 0
            for x, _ in self.train_loader:
                x = x.float().to(self.device)
                
                optimizer.zero_grad()
                x_hat = ae(x)
                reconst_loss = torch.mean(torch.sum((x_hat - x) ** 2, dim=tuple(range(1, x_hat.dim()))))
                reconst_loss.backward()
                optimizer.step()
                
                total_loss += reconst_loss.item()
            scheduler.step()
            print('Pretraining Autoencoder... Epoch: {}, Loss: {:.3f}'.format(
                   epoch, total_loss/len(self.train_loader)))
        self.save_weights_for_DeepSVDD(ae, self.train_loader) 
    

    def save_weights_for_DeepSVDD(self, model, dataloader):
        """학습된 AutoEncoder 가중치를 DeepSVDD모델에 Initialize해주는 함수"""
        c = self.set_c(model, dataloader)
        net = DeepSVDD_network(self.args.latent_dim).to(self.device)
        state_dict = model.state_dict()
        net.load_state_dict(state_dict, strict=False)
        torch.save({'center': c.cpu().data.numpy().tolist(),
                    'net_dict': net.state_dict()}, '../weights/pretrained_parameters.pth')
    

    def set_c(self, model, dataloader, eps=0.1):
        """Initializing the center for the hypersphere"""
        model.eval()
        z_ = []
        with torch.no_grad():
            for x, _ in dataloader:
                x = x.float().to(self.device)
                z = model.encoder(x)
                z_.append(z.detach())
        z_ = torch.cat(z_)
        c = torch.mean(z_, dim=0)
        c[(abs(c) < eps) & (c < 0)] = -eps
        c[(abs(c) < eps) & (c > 0)] = eps
        return c

    def train(self):
        """Deep SVDD model 학습"""
        net = DeepSVDD_network().to(self.device)
        
        if self.args.pretrain==True:
            state_dict = torch.load('../weights/pretrained_parameters.pth')
            net.load_state_dict(state_dict['net_dict'])
            c = torch.Tensor(state_dict['center']).to(self.device)
        else:
            net.apply(weights_init_normal)
            c = torch.randn(self.args.latent_dim).to(self.device)
        
        optimizer = torch.optim.Adam(net.parameters(), lr=self.args.lr,
                               weight_decay=self.args.weight_decay)
        scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, 
                    milestones=self.args.lr_milestones, gamma=0.1)

        net.train()
        for epoch in range(self.args.num_epochs):
            total_loss = 0
            for x, _ in self.train_loader:
                x = x.float().to(self.device)

                optimizer.zero_grad()
                z = net(x)
                loss = torch.mean(torch.sum((z - c) ** 2, dim=1))
                loss.backward()
                optimizer.step()

                total_loss += loss.item()
            scheduler.step()
            print('Training Deep SVDD... Epoch: {}, Loss: {:.3f}'.format(
                   epoch, total_loss/len(self.train_loader)))
        self.net = net
        self.c = c

        return self.net, self.c
        
def weights_init_normal(m):
    classname = m.__class__.__name__
    if classname.find("Conv") != -1 and classname != 'Conv':
        torch.nn.init.normal_(m.weight.data, 0.0, 0.02)
    elif classname.find("Linear") != -1:
        torch.nn.init.normal_(m.weight.data, 0.0, 0.02)

위 코드는 Train 함수를 정의하는 부분입니다. 하나씩 자세히 살펴보겠습니다.

    def pretrain(self):
        """ DeepSVDD 모델에서 사용할 가중치를 학습시키는 AutoEncoder 학습 단계"""
        ae = pretrain_autoencoder(self.args.latent_dim).to(self.device)
        ae.apply(weights_init_normal)
        optimizer = torch.optim.Adam(ae.parameters(), lr=self.args.lr_ae,
                               weight_decay=self.args.weight_decay_ae)
        scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, 
                    milestones=self.args.lr_milestones, gamma=0.1)
        
        ae.train()
        for epoch in range(self.args.num_epochs_ae):
            total_loss = 0
            for x, _ in self.train_loader:
                x = x.float().to(self.device)
                
                optimizer.zero_grad()
                x_hat = ae(x)
                reconst_loss = torch.mean(torch.sum((x_hat - x) ** 2, dim=tuple(range(1, x_hat.dim()))))
                reconst_loss.backward()
                optimizer.step()
                
                total_loss += reconst_loss.item()
            scheduler.step()
            print('Pretraining Autoencoder... Epoch: {}, Loss: {:.3f}'.format(
                   epoch, total_loss/len(self.train_loader)))
        self.save_weights_for_DeepSVDD(ae, self.train_loader) 

먼저 pretrain 부분은, 앞서 정의했었던 pretrain_autoencoder, 즉 Kernel Function대신 데이터 포인트를 맵핑해주는 딥러닝 모델입니다. 해당 모델은 MNIST(정상:0, 비정상:1~9) 이미지가 주어졌을 때, 정상 이미지(0번)만을 입력으로 하여 해당 이미지를 다시 복원시킨 후 원본 이미지와의 픽셀별로 차이를 구해 Loss로 사용합니다. 즉, AutoEncoder 구조를 이용하여 Reconstruction Error를 최소화하는 방향으로 학습이 됩니다. 이 Reconstruction Error를 최소화한다는 의미는 정상 클래스 이미지의 Feature space를 저차원의 공간에서도 잘 표현(Representation)될 수 있도록 학습한다는 것을 의미합니다. 따라서, 이렇게 다른 공간에서 해당 정상 클래스들이 Representation이 잘 된다면 분류 경계면이 다른 클래스들과 뚜렷해질 수 있음을 의미합니다.

 

    def save_weights_for_DeepSVDD(self, model, dataloader):
        """학습된 AutoEncoder 가중치를 DeepSVDD모델에 Initialize해주는 함수"""
        c = self.set_c(model, dataloader)
        net = DeepSVDD_network(self.args.latent_dim).to(self.device)
        state_dict = model.state_dict()
        net.load_state_dict(state_dict, strict=False)
        torch.save({'center': c.cpu().data.numpy().tolist(),
                    'net_dict': net.state_dict()}, '../weights/pretrained_parameters.pth')

    def train(self):
        """Deep SVDD model 학습"""    
        net = DeepSVDD_network().to(self.device)
        
        if self.args.pretrain==True:
            state_dict = torch.load('../weights/pretrained_parameters.pth')
            net.load_state_dict(state_dict['net_dict'])
            c = torch.Tensor(state_dict['center']).to(self.device)
        else:
            net.apply(weights_init_normal)
            c = torch.randn(self.args.latent_dim).to(self.device)
        
        optimizer = torch.optim.Adam(net.parameters(), lr=self.args.lr,
                               weight_decay=self.args.weight_decay)
        scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, 
                    milestones=self.args.lr_milestones, gamma=0.1)

        net.train()
        for epoch in range(self.args.num_epochs):
            total_loss = 0
            for x, _ in self.train_loader:
                x = x.float().to(self.device)

                optimizer.zero_grad()
                z = net(x)
                loss = torch.mean(torch.sum((z - c) ** 2, dim=1))
                loss.backward()
                optimizer.step()

                total_loss += loss.item()
            scheduler.step()
            print('Training Deep SVDD... Epoch: {}, Loss: {:.3f}'.format(
                   epoch, total_loss/len(self.train_loader)))
        self.net = net
        self.c = c

        return self.net, self.c

위 함수는 AutoEncoder로 학습된 Weight를 불러와서 분류하는 Classifier를 학습시키는 함수입니다.

아까 위에서 DeepSVDD와 pretrain모델의 Encoder구조가 서로 동일하다고 했었습니다. save_weights_for_DeepSVDD 함수의 중간에 net.load_state_dict(state_dict, strict=False)부분을 보시면 net은 DeepSVDD 모델이고 model은 pretrain_autoencoder 모델입니다. 즉, model의 state_dict (Weights)를 DeepSVDD모델인 net에 적용하겠다는 것이고 strict=False 옵션은 pretrain 모델의 Decoder부분은 제외하고 Weights들의 형식이 맞는 부분인 Encoder부분의 가중치만을 사용하겠다는 것입니다. 그렇게 가중치를 불러온 후 train 함수를 통해 모델을 학습시킵니다.

 

 

5. 학습

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
args = easydict.EasyDict({
       'num_epochs':50,
       'num_epochs_ae':50,
       'lr':1e-3,
       'lr_ae':1e-3,
       'weight_decay':5e-7,
       'weight_decay_ae':5e-3,
       'lr_milestones':[50],
       'batch_size':1024,
       'pretrain':True,
       'latent_dim':32,
       'normal_class':0
                })

if __name__ == '__main__':

    # Train/Test Loader 불러오기
    dataloader_train, dataloader_test = get_mnist(args)

    # Network 학습준비, 구조 불러오기
    deep_SVDD = TrainerDeepSVDD(args, dataloader_train, device)

    # DeepSVDD를 위한 DeepLearning pretrain 모델로 Weight 학습
    if args.pretrain:
        deep_SVDD.pretrain()

    # 학습된 가중치로 Deep_SVDD모델 Train
    net, c = deep_SVDD.train()

이후, pretrain모델과 DeepSVDD모델을 학습할 Hyper-Parameter를 설정합니다.

pretrain옵션이 True인 경우에 pretrain 모델을 학습한 가중치로 Deep SVDD모델을 학습하고, False인 경우는 DeepSVDD Classifier를 처음부터 학습하게 됩니다.

 

6. 평가 및 결과

def eval(net, c, dataloader, device):
    """Testing the Deep SVDD model"""

    scores = []
    labels = []
    net.eval()
    print('Testing...')
    with torch.no_grad():
        for x, y in dataloader:
            x = x.float().to(device)
            z = net(x)
            score = torch.sum((z - c) ** 2, dim=1)

            scores.append(score.detach().cpu())
            labels.append(y.cpu())
    labels, scores = torch.cat(labels).numpy(), torch.cat(scores).numpy()
    print('ROC AUC score: {:.2f}'.format(roc_auc_score(labels, scores)*100))
    return labels, scores

평가를 측정하는 Metric은 AUC Score를 사용하였고, Deep SVDD Classifier를 Pretrain한 가중치를 사용하는 것과 Pretrain을 하지 않고 Classifier를 학습한 것의 성능을 비교하였습니다.

 

Pretrain=False

 

Pretrain=True

비슷하게 비교하기 위해 Pretrain을 하지 않은 모델은 150Epoch을 학습하였고, Pretrain 가중치를 사용한 모델은 Pretrain에 75Epoch, Classifier학습에 75Epoch을 진행하였습니다. 위 결과에서 볼 수 있듯이 Pretrain 가중치를 사용한 아래의 모델이 ROC AUC score가 98.67로 98.25보다 약간 높은 것을 확인할 수 있었습니다.

AUC score뿐만 아니라 Loss를 확인해봐도 0.013 > 0.004로 적은 Epoch을 학습했음에도 더 낮은 Loss를 가지는 것을 확인할 수 있었습니다.