Skip to content
Permalink
main
Switch branches/tags

Name already in use

A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
Go to file
 
 
Cannot retrieve contributors at this time
# -*- coding: utf-8 -*-
"""ML_FR(1).ipynb
Automatically generated by Colab.
Original file is located at
https://colab.research.google.com/drive/1Uhi5Fj55FUNv6glB-e4-GSjtPRP6YG_x
"""
import torchvision
import matplotlib.pyplot as plt
import os, random
# Preview 5 random training faces in a single row.
# (Fixed: restored the loop-body indentation lost in the notebook export and
# removed the unused `cnt` counter plus its dead commented-out increment.)
directory = 'ML_FR/train_face/'
image_list = os.listdir(directory)
random.shuffle(image_list)
fig, axs = plt.subplots(1, 5, figsize=(22, 22))
for i in range(5):
    sample = plt.imread(directory + image_list[i])
    axs[i].imshow(sample)
    axs[i].axis('off')
plt.show()
import random
import torch
from torch.utils.data import Dataset
import glob
from PIL import Image
import torchvision.transforms.functional as TF
from facenet_pytorch import fixed_image_standardization, MTCNN
import csv
import cv2
import os
class Celeb70K(Dataset):
    """Face-recognition dataset over the ML_FR folder layout.

    Splits ``root/train_face/*`` into train/val by ``val_rate``; in ``test``
    mode, enumerates ``root/test/0.jpg`` .. ``root/test/4976.jpg`` and runs
    MTCNN face cropping per sample.

    Args:
        root: dataset root directory (must end with a path separator).
        val_rate: fraction of training files held out for validation.
        mode: 'train', 'val' or 'test'.
        transform: optional transform applied to the image tensor; when None,
            images are resized to 160x160 and facenet's
            fixed_image_standardization is applied instead.
    """

    def __init__(self, root="ML_FR/", val_rate=0.15, mode='train', transform=None):
        if mode == "test":
            # MTCNN detector is only needed to crop faces from raw test images.
            self.mtcnn = MTCNN(margin=20, keep_all=False, post_process=False)
        self.root = root
        self.mode = mode
        self.transform = transform
        train_files = glob.glob(self.root + 'train_face/*')
        length = len(train_files)
        # first cut_index files -> validation, the rest -> training
        self.cut_index = int(length * val_rate)
        if mode == 'train':
            self.input_files = train_files[self.cut_index:]
            self.len = length - self.cut_index
        elif mode == 'val':
            self.input_files = train_files[:self.cut_index]
            self.len = self.cut_index
        elif mode == 'test':
            # test images are named 0.jpg .. 4976.jpg
            self.input_files = [self.root + 'test/' + str(i) + '.jpg'
                                for i in range(0, 4977)]
            self.len = len(self.input_files)
        # Category index -> celebrity name.
        # Fixed: use self.root instead of the hard-coded 'ML_FR/' prefix
        # (identical for the default root), and rstrip('\n') instead of [:-1],
        # which truncated the last row when the file had no trailing newline.
        with open(self.root + 'category.csv', 'r') as f:
            self.labels = f.readlines()[1:]
        self.labels = [l.split(',')[1].rstrip('\n') for l in self.labels]
        # File name -> celebrity name (fixed 'trian_labels' typo).
        with open(self.root + 'train.csv', 'r') as f:
            self.train_labels = f.readlines()[1:]
        self.train_dict = {}
        for l in self.train_labels:
            parts = l.split(',')
            self.train_dict[parts[1]] = parts[2].rstrip('\n')

    def __len__(self):
        """Number of samples in the selected split."""
        return self.len

    def __getitem__(self, index):
        """Return {'img_name': str, 'img_input': Tensor, 'gt': LongTensor[1]}."""
        # basename instead of split('/')[-1]: also correct on Windows paths
        img_name = os.path.basename(self.input_files[index])
        if self.transform is None:
            # facenet path: the network expects fixed 160x160 inputs
            img_input = Image.open(self.input_files[index]).convert('RGB').resize((160, 160))
        else:
            img_input = Image.open(self.input_files[index]).convert('RGB')
        if self.mode == "test":
            gt = 0  # labels unknown at test time; placeholder
        else:
            gt = self.labels.index(self.train_dict[img_name])
        gt = torch.LongTensor([gt])
        if self.mode == "train":
            # random horizontal-flip augmentation with p = 0.5
            if random.random() > 0.5:
                img_input = img_input.transpose(Image.FLIP_LEFT_RIGHT)
        if self.mode == "test":
            # crop the face with MTCNN; fall back to the whole image
            face = self.mtcnn(img_input)
            if face is not None:
                # MTCNN(post_process=False) returns values in 0..255
                img_input = face / 255
            else:
                img_input = TF.to_tensor(img_input)
        else:
            img_input = TF.to_tensor(img_input)
        if self.transform is None:
            img_input = fixed_image_standardization(img_input)
        else:
            img_input = self.transform(img_input)
        return {"img_name": img_name, "img_input": img_input, "gt": gt}
import os
import random
import argparse
from tqdm import tqdm
from pathlib import Path
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import Adam, AdamW
from torch.optim.lr_scheduler import StepLR
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter
import torchvision
import pandas as pd
from facenet_pytorch import InceptionResnetV1
from sklearn.metrics import accuracy_score
import warnings
# Silence library warnings to keep notebook output readable.
warnings.filterwarnings("ignore")
# Seed both the torch and python RNGs for reproducible runs.
torch.manual_seed(13)
random.seed(13)
class Trainer:
    """End-to-end harness: parses CLI options, builds the data loaders, and
    runs training, validation and test for the face-recognition classifier."""

    def __init__(self):
        # Fixed: use the module-level `argparse` import rather than the bare
        # ArgumentParser name, which was only imported at the bottom of the
        # file (a fragile import-order dependency).
        parser = argparse.ArgumentParser()
        parser.add_argument("--epoch", type=int, default=3)          # checkpoint epoch to resume from (0 = scratch)
        parser.add_argument("--n_epochs", type=int, default=100)     # total training epochs
        parser.add_argument("--lr_decay", type=float, default=0.5)   # StepLR gamma
        parser.add_argument("--step_size", type=int, default=20)     # StepLR step size (epochs)
        parser.add_argument("--batch_size", type=int, default=32)
        parser.add_argument("--lr", type=float, default=1e-3)
        parser.add_argument("--ckpt_interval", type=int, default=1)  # validate/checkpoint every N epochs
        parser.add_argument("--log_interval", type=int, default=200) # log every N iterations
        parser.add_argument("--output_dir", type=str, default="results")
        parser.add_argument("--log_name", type=str, default="train_log")
        parser.add_argument("--tensorboard", type=str, default="train_tb")
        parser.add_argument("--root", type=str, default="./ML_FR/")
        parser.add_argument("--mode", type=str, default="train")     # train | test | other = validation sweep
        parser.add_argument("--scheduler", type=str, default="step")
        parser.add_argument("--model", type=str, default="efficientnet")  # efficientnet | facenet
        # parse_known_args so stray Colab/Jupyter argv entries are ignored
        self.opt, unknown = parser.parse_known_args()
        self.eps = 1e-6
        # heuristic: one DataLoader worker per 8 images in a batch
        self.opt.n_cpu = self.opt.batch_size // 8
        # EfficientNet-V2-S preprocessing, per
        # https://pytorch.org/vision/main/models/generated/torchvision.models.efficientnet_v2_s.html
        # (applied to tensors the dataset already produced, hence no ToTensor)
        transform = torchvision.transforms.Compose([
            torchvision.transforms.Resize(
                size=(224, 224),
                interpolation=torchvision.transforms.InterpolationMode.BICUBIC),
            torchvision.transforms.CenterCrop(224),
            torchvision.transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                             std=[0.229, 0.224, 0.225]),
        ])
        # facenet instead uses fixed_image_standardization inside the dataset
        if self.opt.model == "efficientnet":
            self.transform = transform
        else:
            self.transform = None
        if self.opt.mode != "test":
            self.train_dataset = Celeb70K(mode='train', transform=self.transform)
            self.train_loader = DataLoader(self.train_dataset, batch_size=self.opt.batch_size,
                                           shuffle=True, num_workers=self.opt.n_cpu,
                                           drop_last=False, pin_memory=True)
            self.writer = SummaryWriter(os.path.join(self.opt.root, self.opt.output_dir,
                                                     'logs', self.opt.tensorboard))
        val_dataset = Celeb70K(mode='val', transform=self.transform)
        self.val_loader = DataLoader(val_dataset, batch_size=self.opt.batch_size, shuffle=False,
                                     num_workers=self.opt.n_cpu, drop_last=False, pin_memory=True)
        test_dataset = Celeb70K(mode='test', transform=self.transform)
        self.test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False, pin_memory=True)
        self.init_model_optimizer()
        self.prep_model()

    def init_model_optimizer(self):
        """Build loss, model, optimizer and (optionally) LR scheduler."""
        self.criterion = nn.CrossEntropyLoss()
        if self.opt.model == "facenet":
            # InceptionResnetV1 pre-trained on VGGFace2, new 100-way head
            self.model = InceptionResnetV1(pretrained='vggface2', classify=True, num_classes=100)
        elif self.opt.model == "efficientnet":
            self.model = torchvision.models.efficientnet_v2_s(weights='IMAGENET1K_V1')
            # replace the ImageNet classifier head with a 100-class one
            self.model.classifier[1] = nn.Linear(1280, 100)
        if self.opt.mode == "train":
            self.optimizer = Adam(self.model.parameters(), lr=self.opt.lr)
            if self.opt.scheduler == "step":
                self.scheduler = StepLR(self.optimizer, step_size=self.opt.step_size,
                                        gamma=self.opt.lr_decay)
        # best validation accuracy seen so far, and the epoch it occurred at
        self.max_acc, self.max_epoch = 0, 0

    def prep_model(self):
        """Wrap the model for multi-GPU and optionally resume from --epoch."""
        self.model = nn.DataParallel(self.model)
        if self.opt.epoch != 0:
            self.load_model(self.opt.epoch)
        self.model.train()

    def load_model(self, e):
        """Load the checkpoint saved at epoch ``e``."""
        self.model.load_state_dict(torch.load(os.path.join(
            self.opt.root, self.opt.output_dir, 'ckpts', self.opt.log_name,
            'model_' + str(int(e)) + ".pth")))

    def inference(self, img):
        """Forward pass; returns raw (pre-softmax) class scores."""
        pred = self.model(img)
        return pred

    def train_a_epoch(self, e, data_loader):
        """Train one epoch with periodic logging, then validate/checkpoint.

        Returns the epoch with the best validation accuracy so far.
        """
        loss_print = 0
        # Fixed: initialise the return value so epochs where no validation
        # runs (e % ckpt_interval != 0) no longer raise UnboundLocalError.
        best_epoch = self.max_epoch
        self.model.train()
        for i, train_data in enumerate(tqdm(data_loader)):
            self.optimizer.zero_grad()
            img = train_data["img_input"]
            gt = train_data["gt"]
            pred = self.inference(img)
            loss = self.criterion(pred, gt.squeeze(1))
            loss.backward()
            loss_print += loss.item()
            self.optimizer.step()
            if (i + 1) % self.opt.log_interval == 0:
                self.writer.add_scalar("Loss/train", loss_print / self.opt.log_interval,
                                       i + e * len(data_loader))
                print(self.optimizer.param_groups[-1]['lr'], e, i, ': ',
                      loss_print / self.opt.log_interval)
                with open('./ML_FR/results/logs/' + self.opt.log_name + '.txt', 'a') as f:
                    f.write("train: lr %.8f|%d|%d|%4.7f\n" % (
                        self.optimizer.param_groups[-1]['lr'], e, i,
                        loss_print / self.opt.log_interval))
                loss_print = 0.0
        if e % self.opt.ckpt_interval == 0:
            best_epoch = self.validate(e, self.val_loader)
            # only persist checkpoints that improve validation accuracy
            if best_epoch == e:
                ckpt_model_filename = 'model_' + str(e) + ".pth"
                ckpt_model_path = os.path.join(self.opt.root, self.opt.output_dir,
                                               'ckpts', self.opt.log_name, ckpt_model_filename)
                directory = os.path.join(self.opt.root, self.opt.output_dir,
                                         'ckpts', self.opt.log_name)
                if not os.path.exists(directory):
                    Path(directory).mkdir(parents=True, exist_ok=True)
                state = self.model.state_dict()
                torch.save(state, ckpt_model_path)
                print("model saved to %s" % ckpt_model_path)
        # Fixed: only step the scheduler when one was actually created;
        # the original crashed for any --scheduler value other than "step".
        if self.opt.scheduler == "step":
            self.scheduler.step()
        # NOTE(review): closing the SummaryWriter every epoch looks odd, but
        # add_scalar re-opens it, so logging still works — confirm intended.
        self.writer.close()
        return best_epoch

    def train(self):
        """Dispatch on --mode: full training, single test, or validation sweep."""
        if self.opt.mode == "train":
            for e in range(1, int(self.opt.n_epochs) + 1):
                best_epoch = self.train_a_epoch(e, self.train_loader)
            # final test with the best-validation checkpoint
            self.test(best_epoch, file_name=self.opt.log_name)
        elif self.opt.mode == "test":
            # load model and test
            self.load_model(self.opt.epoch)
            print("test start")
            self.test(self.opt.epoch, file_name=self.opt.log_name)
            print("test end")
        else:
            # pure validation sweep over stored checkpoints; rarely used
            for e in range(60, 161):
                self.load_model(e)
                # Fixed: %-placeholders were passed as a second print argument
                # and never formatted; apply the % operator instead.
                print("validation %d start" % e)
                self.validate(self.opt.epoch, self.train_loader)
                print("validation %d end" % e)

    def validate(self, e, val_loader):
        """Compute accuracy on ``val_loader`` and track/log the best epoch.

        Returns the epoch with the highest validation accuracy so far.
        """
        self.model.eval()
        with torch.no_grad():
            label_total = []
            output_total = []
            for i, val_data in enumerate(tqdm(val_loader)):
                img = val_data["img_input"]
                gt = val_data["gt"]
                pred = self.inference(img)
                pred = F.softmax(pred, dim=1)
                prediction = [int(torch.argmax(output).cpu()) for output in pred]
                labels = [int(label.cpu()) for label in gt]
                label_total = label_total + labels
                output_total = output_total + prediction
            acc = accuracy_score(label_total, output_total)
            print("val acc: ", acc)
            if self.max_acc < acc:
                self.max_acc = acc
                self.max_epoch = e
            print("Best: %f %d/%d" % (self.max_acc, self.max_epoch, e))
            with open('./ML_FR/results/logs/' + self.opt.log_name + '.txt', 'a') as f:
                f.write("Current: %f |EP %d \n" % (acc, e))
                f.write("Best: %f %d |EP %d \n" % (self.max_acc, self.max_epoch, e))
        self.model.train()
        return self.max_epoch

    def test(self, e, file_name):
        """Run inference on the test set and write predictions to a CSV."""
        # category index -> celebrity name.
        # Fixed: rstrip('\n') instead of unconditional [:-1], which chopped a
        # real character off the last row when the file lacked a trailing
        # newline.
        with open('ML_FR/category.csv', 'r') as f:
            self.labels = f.readlines()[1:]
        self.labels = [l.split(',')[1] for l in self.labels]
        self.labels = [l.rstrip('\n') for l in self.labels]
        with torch.no_grad():
            self.load_model(e)
            self.model.eval()
            prediction_id = []
            prediction_name = []
            for _, test_data in enumerate(tqdm(self.test_loader)):
                img = test_data["img_input"]
                img_name = test_data["img_name"][0]
                pred = self.inference(img)
                pred = F.softmax(pred, dim=1)
                # batch_size is 1, so take the single row's argmax
                pred_id = [int(torch.argmax(output).cpu()) for output in pred][0]
                prediction_id.append(self.labels[pred_id])
                prediction_name.append(img_name[:-4])  # drop the ".jpg" suffix
            # Fixed: the result dict was named `csv`, shadowing the csv module
            # imported at the top of the file.
            results = {'Id': prediction_name, 'Category': prediction_id}
            df = pd.DataFrame(results)
            df.to_csv('./ML_FR/results/logs/' + file_name + '2.csv', index=False)
        print("test done")
from argparse import ArgumentParser
import torchvision
if __name__ == '__main__':
    # Fixed: restored the guard-body indentation and stopped shadowing the
    # Trainer class with its own instance (was `Trainer = Trainer()`).
    trainer = Trainer()
    trainer.train()
# IPython "!" shell magics from the original notebook are not valid Python in
# a plain .py file; kept here as a record of the directory listing:
#   !ls ./ML_FR/results/
#   !ls ./ML_FR/results/logs/

# Inspect the generated predictions. In a script, a bare `.head(100)`
# expression is discarded, so print it explicitly.
predictions = pd.read_csv('./ML_FR/results/logs/train_log.csv')
print(predictions.head(100))