Skip to content
Permalink
main
Switch branches/tags

Name already in use

A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
Go to file
 
 
Cannot retrieve contributors at this time
# -*- coding: utf-8 -*-
"""
HW5_V4.ipynb
"""
# The code for this Mini Challenge Project (HW5) is adapted from the following resources:
# (1) https://github.com/google/automl/tree/master/efficientnetv2
# (2) https://github.com/lukemelas/EfficientNet-PyTorch
# (3) https://arxiv.org/abs/2104.00298
import torch
import torchvision
import os
import random
import numpy as np
import math
import cv2
import pandas as pd
import csv
import glob
import argparse
from argparse import ArgumentParser
from facenet_pytorch import fixed_image_standardization, MTCNN
from torch.optim.lr_scheduler import _LRScheduler
from tqdm import tqdm
from pathlib import Path
import matplotlib.pyplot as plt
from torch.utils.data import Dataset
from torch.optim import Adam, AdamW
from torch.optim.lr_scheduler import StepLR
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter
from PIL import Image
import torchvision.transforms.functional as TF
import torch.nn as nn
import torch.nn.functional as F
from sklearn.metrics import accuracy_score
import warnings
# Silence every warning for the whole process (applies to all libraries imported above).
warnings.filterwarnings("ignore")
# Seed PyTorch's global RNG. Python's `random` module is not seeded anywhere in
# the visible code, so the shuffle below is not covered by this seed.
torch.manual_seed(20)
# List the entries of the large dataset directory (runs at import time and
# raises if the directory does not exist).
directory = 'Mini_Challenge/Large_Dataset/'
img_list = os.listdir(directory)
# Shuffle the listing in place.
# NOTE(review): `img_list` is not referenced by any definition visible in this file — confirm it is still needed.
random.shuffle(img_list)
# convert the image
def Check_Is_img_numpy(I):
    """Tell whether ``I`` is a NumPy array with exactly two or three dimensions.

    Any object that is not an ``np.ndarray`` yields ``False``.
    """
    if not isinstance(I, np.ndarray):
        return False
    return 2 <= I.ndim <= 3
def Img2Tensor(I):
    """Convert a 2-D or 3-D NumPy image array into a float32 torch tensor.

    A 2-D array first receives a trailing singleton axis. Values are cast to
    float32 and divided by 255 (uint8), 65535 (uint16) or 1 (float32/float64).
    The last axis is then moved to the front (channel-last to channel-first,
    assuming the usual H x W x C layout) before wrapping with
    ``torch.from_numpy``.

    Raises:
        TypeError: if ``I`` fails ``Check_Is_img_numpy`` or has an
            unsupported dtype.
    """
    if not Check_Is_img_numpy(I):
        raise TypeError('Error! Format Incorrect')

    # Give single-channel input an explicit trailing axis so the transpose
    # below always sees three dimensions.
    pixels = I[:, :, None] if I.ndim == 2 else I

    if pixels.dtype == np.uint8:
        divisor = 255
    elif pixels.dtype == np.uint16:
        divisor = 65535
    elif pixels.dtype in [np.float32, np.float64]:
        divisor = 1
    else:
        raise TypeError('Error! Img Value Format Incorrect')

    scaled = pixels.astype(np.float32) / divisor
    channels_first = scaled.transpose((2, 0, 1))
    return torch.from_numpy(channels_first)
# Define schedulers
def get_position_from_periods(iteration, cumulative_period):
    """Return the index of the first entry of ``cumulative_period`` that is >= ``iteration``.

    Entries are scanned in order. ``None`` is returned when no entry
    satisfies ``iteration <= entry`` (including an empty sequence).
    """
    matching_indices = (
        idx for idx, bound in enumerate(cumulative_period) if iteration <= bound
    )
    return next(matching_indices, None)
# Define dataset
class FaceCollection(Dataset):
    """Face-image dataset with ``train`` / ``val`` / ``test`` splits.

    Layout expected under ``root``:

    * ``Large_Dataset/*``  - images used for the train and val splits
    * ``test/<i>.jpg``     - test images, ``i`` in ``range(NUM_TEST_IMAGES)``
    * ``category.csv``     - header row, then rows whose 2nd column is a class name
    * ``train.csv``        - header row, then rows whose 2nd column is an image
      file name and whose 3rd column is that image's class name

    Args:
        root: dataset root directory.
        ratio_val: fraction of ``Large_Dataset`` assigned to the val split
            (the first ``int(total * ratio_val)`` files of the sorted listing).
        mode: ``'train'``, ``'val'`` or ``'test'``.
        transform: optional callable applied to the image tensor; when
            ``None`` the image is resized to 160x160 and passed through
            ``fixed_image_standardization`` instead.

    Raises:
        ValueError: if ``mode`` is not one of the three supported values.
    """

    # Number of test images, named ``0.jpg`` .. ``4976.jpg``.
    NUM_TEST_IMAGES = 4977

    def __init__(self, root="Mini_Challenge/", ratio_val=0.15, mode='train', transform=None):
        if mode not in ('train', 'val', 'test'):
            raise ValueError("mode must be 'train', 'val' or 'test', got %r" % (mode,))
        self.root = root
        self.mode = mode
        self.transform = transform

        if mode == 'test':
            # The face detector is only needed to crop test images.
            self.mtcnn = MTCNN(margin=20, keep_all=False, post_process=False)
            self.input_files = [
                os.path.join(root, 'test', f'{i}.jpg')
                for i in range(self.NUM_TEST_IMAGES)
            ]
        else:
            # glob order is filesystem-dependent; sorting makes the train/val
            # split deterministic and guarantees the two splits are disjoint.
            train_files = sorted(glob.glob(os.path.join(root, 'Large_Dataset', '*')))
            idx_stop = int(len(train_files) * ratio_val)
            if mode == 'train':
                self.input_files = train_files[idx_stop:]
            else:
                self.input_files = train_files[:idx_stop]
        self.len = len(self.input_files)

        # Class names, in file order; a name's position is its integer label.
        with open(os.path.join(root, 'category.csv'), 'r', newline='') as f:
            category_rows = list(csv.reader(f))[1:]
        self.labels = [row[1] for row in category_rows if len(row) > 1]

        # Image file name -> class name.
        with open(os.path.join(root, 'train.csv'), 'r', newline='') as f:
            train_rows = list(csv.reader(f))[1:]
        self.train_dict = {row[1]: row[2] for row in train_rows if len(row) > 2}

        # Constant-time replacement for ``self.labels.index(name)``; setdefault
        # keeps the first occurrence, exactly like ``list.index``.
        self._label_to_index = {}
        for idx, name in enumerate(self.labels):
            self._label_to_index.setdefault(name, idx)

    def __len__(self):
        return self.len

    def __getitem__(self, index):
        """Return ``{"img_name", "img_input", "gt"}`` for the sample at ``index``."""
        path = self.input_files[index]
        img_name = os.path.basename(path)

        # Load eagerly inside ``with`` so the file handle is released at once.
        with Image.open(path) as raw:
            image = raw.convert('RGB')
        if self.transform is None:
            image = image.resize((160, 160))

        if self.mode == "test":
            gt = 0  # placeholder: test images carry no label
        else:
            gt = self._label_to_index[self.train_dict[img_name]]
        gt = torch.LongTensor([gt])

        # At test time prefer the MTCNN face crop; fall back to the full image
        # when no face is detected.
        face = self.mtcnn(image) if self.mode == "test" else None
        if face is not None:
            img_input = face / 255
        else:
            img_input = TF.to_tensor(image)

        # Random horizontal flip as train-time augmentation (tensor-safe).
        if self.mode == "train" and random.random() > 0.5:
            img_input = TF.hflip(img_input)

        if self.transform is None:
            # NOTE(review): img_input is in [0, 1] here, while
            # fixed_image_standardization is usually applied to 0-255 data -
            # confirm this scaling is intended.
            img_input = fixed_image_standardization(img_input)
        else:
            img_input = self.transform(img_input)
        return {"img_name": img_name, "img_input": img_input, "gt": gt}
class Trainer:
    """Train, validate and test an EfficientNetV2-S classifier on ``FaceCollection``.

    Every hyper-parameter is a command-line option (see ``_build_parser``)
    and is read through ``self.opt``. Unknown command-line arguments are
    ignored because ``parse_known_args`` is used.
    """

    # Size of the replacement classification head.
    NUM_CLASSES = 100

    @staticmethod
    def _build_parser():
        """Return the parser that declares every option accessed via ``self.opt``."""
        parser = ArgumentParser()
        # Define argument values for training
        parser.add_argument("--mode", type=str, default="train",
                            choices=("train", "val", "test"), help="train/val/test")
        parser.add_argument("--epoch", type=int, default=3,
                            help="checkpoint epoch loaded in val/test mode")
        parser.add_argument("--n_epochs", type=int, default=100)
        parser.add_argument("--lr_decay", type=float, default=0.5)
        parser.add_argument("--step_size", type=int, default=20)
        parser.add_argument("--batch_size", type=int, default=32)
        parser.add_argument("--lr", type=float, default=1e-3)
        parser.add_argument("--ckpt_interval", type=int, default=1)
        parser.add_argument("--log_interval", type=int, default=200)
        parser.add_argument("--output_dir", type=str, default="results")
        parser.add_argument("--tensorboard", type=str, default="train_tb")
        parser.add_argument("--root", type=str, default="./Mini_Challenge/")
        parser.add_argument("--scheduler", type=str, default="step")
        parser.add_argument("--model", type=str, default="efficientnet")
        parser.add_argument("--log_name", type=str, default="default",
                            help="run name used for checkpoint and log sub-directories")
        return parser

    @staticmethod
    def _read_labels(csv_path):
        """Return the 2nd column of every non-header row of ``csv_path``."""
        with open(csv_path, 'r', newline='') as f:
            rows = list(csv.reader(f))[1:]
        return [row[1] for row in rows if len(row) > 1]

    def __init__(self):
        self.opt, _unknown = self._build_parser().parse_known_args()
        self.eps = 1e-6
        self.opt.n_cpu = self.opt.batch_size // 8
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        # TensorBoard writer; only alive while ``train()`` runs in train mode.
        self.writer = None

        # This is the modification code from https://github.com/lukemelas/EfficientNet-PyTorch
        self.transform = torchvision.transforms.Compose([
            torchvision.transforms.Resize(
                size=(224, 224),
                interpolation=torchvision.transforms.InterpolationMode.BICUBIC),
            torchvision.transforms.CenterCrop(224),
            torchvision.transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                             std=[0.229, 0.224, 0.225]),
        ])

        if self.opt.mode != "test":
            self.train_dataset = FaceCollection(
                root=self.opt.root, mode='train', transform=self.transform)
            self.train_loader = DataLoader(
                self.train_dataset, batch_size=self.opt.batch_size, shuffle=True,
                num_workers=self.opt.n_cpu, drop_last=False, pin_memory=True)
        Test_data = FaceCollection(root=self.opt.root, mode='test', transform=self.transform)
        self.test_loader = DataLoader(Test_data, batch_size=1, shuffle=False, pin_memory=True)
        Val_data = FaceCollection(root=self.opt.root, mode='val', transform=self.transform)
        self.val_loader = DataLoader(Val_data, batch_size=1, shuffle=False, pin_memory=True)

        # initialize the model
        self.init_model_optimizer()

    def _checkpoint_path(self, e):
        """Return the checkpoint file path for epoch ``e``."""
        return os.path.join(self.opt.root, self.opt.output_dir, 'ckpts',
                            self.opt.log_name, 'model_' + str(int(e)) + ".pth")

    def load_model(self, e):
        """Load the weights saved for epoch ``e`` into ``self.model``."""
        # torch.load unpickles the file: only load checkpoints you trust.
        state = torch.load(self._checkpoint_path(e), map_location=self.device)
        self.model.load_state_dict(state)

    def Prediction(self, img):
        """Run ``self.model`` on ``img`` and return its raw output."""
        return self.model(img)

    def init_model_optimizer(self):
        """Create the loss, model and (train mode only) optimizer and scheduler.

        Raises:
            ValueError: if ``--model`` names an unsupported architecture.
        """
        self.criterion = nn.CrossEntropyLoss()
        if self.opt.model != "efficientnet":
            raise ValueError("unsupported model: %r" % (self.opt.model,))
        self.model = torchvision.models.efficientnet_v2_s(weights='IMAGENET1K_V1')
        # Swap the ImageNet head for one sized to this task.
        old_head = self.model.classifier[1]
        self.model.classifier[1] = nn.Linear(old_head.in_features, self.NUM_CLASSES)
        self.model.to(self.device)

        self.optimizer = None
        self.scheduler = None
        if self.opt.mode == "train":
            self.optimizer = Adam(self.model.parameters(), lr=self.opt.lr)
            if self.opt.scheduler == "step":
                self.scheduler = StepLR(self.optimizer, step_size=self.opt.step_size,
                                        gamma=self.opt.lr_decay)
        self.Max_Accuracy, self.Max_Epoch = 0, 0

    def train_a_epoch(self, e, data_loader):
        """Train for one epoch, then validate and checkpoint if due.

        Returns:
            The epoch number with the best validation accuracy so far.
        """
        running_loss = 0.0
        n_batches = 0
        self.model.train()
        for train_data in tqdm(data_loader):
            img = train_data["img_input"].to(self.device)
            gt = train_data["gt"].to(self.device)
            self.optimizer.zero_grad()
            pred = self.Prediction(img)
            # back propagation; gt is (batch, 1) and the loss wants (batch,)
            loss = self.criterion(pred, gt.squeeze(1))
            loss.backward()
            self.optimizer.step()
            running_loss += loss.item()
            n_batches += 1

        mean_loss = running_loss / max(n_batches, 1)
        print("Epoch %d mean training loss: %.4f" % (e, mean_loss))
        if self.writer is not None:
            self.writer.add_scalar("train/loss", mean_loss, e)

        # validation, save checkpoint
        best_epoch = self.Max_Epoch
        if e % self.opt.ckpt_interval == 0:
            best_epoch = self.validate(e, self.val_loader)
            # Only the best-so-far model is written to disk.
            if best_epoch == e:
                ckpt_model_path = self._checkpoint_path(e)
                Path(os.path.dirname(ckpt_model_path)).mkdir(parents=True, exist_ok=True)
                torch.save(self.model.state_dict(), ckpt_model_path)
                print("model saved to %s" % ckpt_model_path)
        if self.scheduler is not None:
            self.scheduler.step()
        return best_epoch

    def train(self):
        """Run the workflow selected by ``--mode``.

        * ``train``: train for ``--n_epochs`` epochs, then write test
          predictions using the best checkpoint.
        * ``val``: load the ``--epoch`` checkpoint and print validation accuracy.
        * ``test``: load the ``--epoch`` checkpoint and write test predictions.
        """
        mode = self.opt.mode
        if mode == "train":
            log_dir = os.path.join(self.opt.root, self.opt.output_dir,
                                   self.opt.tensorboard, self.opt.log_name)
            self.writer = SummaryWriter(log_dir)
            best_epoch = self.Max_Epoch
            try:
                for e in range(1, int(self.opt.n_epochs) + 1):
                    best_epoch = self.train_a_epoch(e, self.train_loader)
            finally:
                self.writer.close()
                self.writer = None
            if best_epoch > 0:
                self.test(best_epoch, file_name=self.opt.log_name)
            else:
                # No epoch ever improved on the initial accuracy, so there is
                # no checkpoint file to load.
                print("No checkpoint was saved during training; skipping test.")
        elif mode == "val":
            self.load_model(self.opt.epoch)
            self.validate(self.opt.epoch, self.val_loader)
        elif mode == "test":
            # test() loads the checkpoint itself.
            self.test(self.opt.epoch, file_name=self.opt.log_name)

    def validate(self, e, val_loader):
        """Compute validation accuracy and update the best-epoch record.

        Returns:
            ``self.Max_Epoch`` after taking epoch ``e``'s accuracy into account.
        """
        predictions = []
        targets = []
        self.model.eval()
        with torch.no_grad():
            for val_data in tqdm(val_loader):
                img = val_data["img_input"].to(self.device)
                gt = val_data["gt"]
                pred = self.Prediction(img)
                # argmax over logits equals argmax over softmax probabilities
                predictions.extend(torch.argmax(pred, dim=1).cpu().tolist())
                targets.extend(gt.view(-1).tolist())
        Accuracy = accuracy_score(targets, predictions) if targets else 0.0
        print("Accuracy: ", Accuracy)
        if self.Max_Accuracy < Accuracy:
            self.Max_Accuracy = Accuracy
            self.Max_Epoch = e
        self.model.train()
        return self.Max_Epoch

    def test(self, e, file_name):
        """Predict every test image with the epoch-``e`` checkpoint and write a CSV.

        The output goes to ``<root>/training/results.csv`` with columns ``Id``
        (image name without extension) and ``Category`` (predicted class name).
        ``file_name`` is accepted for interface compatibility and is not used
        to name the output file.
        """
        self.labels = self._read_labels(os.path.join(self.opt.root, 'category.csv'))
        self.load_model(e)
        self.model.eval()
        pred_category = []
        pred_name = []
        with torch.no_grad():
            for test_data in tqdm(self.test_loader):
                img = test_data["img_input"].to(self.device)
                img_name = test_data["img_name"][0]
                pred = self.Prediction(img)
                # test_loader uses batch_size=1, so take the single prediction
                class_idx = int(torch.argmax(pred, dim=1)[0])
                pred_category.append(self.labels[class_idx])
                pred_name.append(os.path.splitext(img_name)[0])
        csv_data = {'Id': pred_name, 'Category': pred_category}
        df = pd.DataFrame(csv_data)
        out_dir = os.path.join(self.opt.root, 'training')
        Path(out_dir).mkdir(parents=True, exist_ok=True)
        df.to_csv(os.path.join(out_dir, 'results.csv'), index=False)
# Script entry point: build a Trainer from the command-line options and run
# the workflow selected by --mode.
if __name__ == '__main__':
    # Bind the instance to a lowercase name so the Trainer class is not shadowed.
    trainer = Trainer()
    trainer.train()