Permalink
Cannot retrieve contributors at this time
Name already in use
A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
HT-Kaggle_Challenge/ece500_kaggle_challenge.py
Go to fileThis commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
995 lines (864 sloc)
36.3 KB
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
"""ECE500 Kaggle Challenge.ipynb | |
Automatically generated by Colaboratory. | |
Original file is located at | |
https://colab.research.google.com/drive/1m4GA5Ps-wkehiUpBwbj-v4_NGIKfelEH | |
# Create Functions | |
## Imports | |
""" | |
import os | |
import cv2 | |
import time | |
import datetime | |
import numpy as np | |
import pandas as pd | |
import torch | |
import torch.nn as nn | |
import torch.optim as optim | |
import torchvision | |
import torchvision.transforms as tvt | |
import matplotlib.pyplot as plt | |
from tqdm import tqdm | |
import warnings | |
from PIL import Image | |
from google.colab import files | |
"""## Create Face Detector""" | |
def img_size(img):
    """Return the number of elements in ``img`` (the product of ``img.shape``)."""
    remaining = list(img.shape)
    count = 1
    # Fold every dimension into the running product; an empty shape yields 1
    while remaining:
        count *= remaining.pop()
    return count
def visualize(img, faces, thickness=2):
    """Draw every detection in ``faces`` onto ``img`` in place.

    ``faces[1]`` holds one row per detection, or None when there is nothing
    to draw. Each row is read as x, y, w, h followed by five (x, y) point
    pairs; the last element of the row is dropped before drawing.
    """
    detections = faces[1]
    if detections is None:
        return
    # One colour per point pair, in the order the pairs appear in the row
    point_colors = (
        (255, 0, 0),
        (0, 0, 255),
        (0, 255, 0),
        (255, 0, 255),
        (0, 255, 255),
    )
    for detection in detections:
        pts = detection[:-1].astype(np.int32)
        top_left = (pts[0], pts[1])
        bottom_right = (pts[0]+pts[2], pts[1]+pts[3])
        cv2.rectangle(img, top_left, bottom_right, (0, 255, 0), thickness)
        for k, color in enumerate(point_colors):
            cv2.circle(img, (pts[4 + 2*k], pts[5 + 2*k]), 2, color, thickness)
## Modified version of example given here:
## https://docs.opencv.org/4.x/d0/dd4/tutorial_dnn_face.html
def extract_faces(img, img_shape=None, img_name='', no_faces='Img'):
    """Detect faces in ``img`` and return the cropped face images.

    Arguments:
        img -- image array indexed as [row, column, ...].
        img_shape -- None (keep each crop's own size), an int (square output),
            or a 2-element tuple/list handed to ``cv2.resize`` as ``dsize``.
        img_name -- unused; kept so existing callers keep working.
        no_faces -- when 'Img', the (possibly downscaled) full image is returned
            as the only entry if no usable face crop was found. Any other value
            gives an empty list in that case.

    Returns:
        list of image arrays. When several faces are found, a larger crop whose
        score is within 0.025 of the best score is moved to position 0.

    Raises:
        ValueError -- img_shape is a tuple/list whose length is not 2.
        TypeError -- img_shape is not None, an int, a tuple or a list.
    """
    if img_shape is None:
        pass
    elif isinstance(img_shape, int):
        img_shape = (img_shape, img_shape)
    elif isinstance(img_shape, (tuple, list)):
        if len(img_shape) != 2:
            raise ValueError('img_shape must be None, int, or 2 element iterable')
        img_shape = tuple(img_shape)  # cv2.resize expects a tuple for dsize
    else:
        raise TypeError('img_shape must be None, int, or 2 element iterable')

    # Scale down very large images
    while (img.shape[1] > 800 or img.shape[0] > 800):
        new_size = (int(img.shape[0]*0.9), int(img.shape[1]*0.9))
        img = cv2.resize(img, dsize=(new_size[1], new_size[0]))

    # Define face detector
    score_threshold = 0.8  # Filtering out faces of score < score_threshold. (default=0.9)
    nms_threshold = 0.3    # Suppress bounding boxes of iou >= nms_threshold. (default=0.3)
    top_k = 5000           # Keep top_k bounding boxes before NMS. (default=5000)
    detector = cv2.FaceDetectorYN.create('./face_detection_yunet_2023mar.onnx',
                                         "", (320, 320), score_threshold,
                                         nms_threshold, top_k)
    detector.setInputSize((img.shape[1], img.shape[0]))
    detections = detector.detect(img)[1]

    face_imgs = []
    scores = []
    # Crop out and store each face in a list
    if detections is not None:
        for face in detections:
            face = [max(0, n) for n in face]  # clamp negative coordinates to the image
            (x, y, w, h) = face[:4]
            crop = img[int(y):int(y+h), int(x):int(x+w)]
            if crop.size == 0:
                # A zero-area crop cannot be resized or saved; drop it
                continue
            face_imgs.append(crop)
            scores.append(face[-1])
        if scores:
            max_score = max(scores)
            # Prefer a larger face when its score is nearly as good as the best one
            for i, s in enumerate(scores[1:], start=1):
                if ((s >= (max_score - 0.025)) and
                        (img_size(face_imgs[0]) < img_size(face_imgs[i]))):
                    face_imgs[0], face_imgs[i] = face_imgs[i], face_imgs[0]

    if not face_imgs and no_faces == 'Img':  # Use full image if no face detected
        face_imgs.append(img)

    # Resize image to specified shape if shape is specified.
    # The list is rebuilt: assigning to a loop variable would leave it unchanged.
    if img_shape is not None:
        face_imgs = [cv2.resize(face, dsize=img_shape) for face in face_imgs]
    return face_imgs
from PIL import Image | |
from tqdm import tqdm | |
import warnings | |
def extract_dataset_faces(src_root, dest_root, data_shape):
    """Crop the first returned face of every .jpg in ``src_root`` into ``dest_root``.

    Each image is opened in RGB mode and passed to extract_faces(). When at
    least one face comes back, the first one is resized to ``data_shape`` and
    saved under the same file name. Images without a detected face are skipped.
    """
    def _as_dir(path):
        # Expand '~' and make sure the path ends with '/' so names can be appended
        path = os.path.expanduser(path)
        return path if path[-1] == '/' else path + '/'

    ## Ensure usability of root paths
    src_root = _as_dir(src_root)
    dest_root = _as_dir(dest_root)

    if os.path.exists(dest_root):
        ## Remove any files in the root to create clean dataset in folder
        for entry in os.listdir(dest_root):
            stale = dest_root + entry
            if os.path.isfile(stale):
                os.remove(stale)
    else:
        ## Create destination root folder if needed
        os.makedirs(dest_root)

    ## Loop for all .jpg images in src_root
    for fn in tqdm(os.listdir(src_root)):
        if not fn.endswith('.jpg'):
            continue
        with warnings.catch_warnings():  # Suppress warning about RGBA
            warnings.simplefilter("ignore")
            rgb = Image.open(src_root + fn).convert('RGB')  # Open image in RGB mode
        pixels = np.asarray(rgb)  # Convert to numpy array for face detection
        found = extract_faces(pixels, img_shape=None,
                              img_name=fn, no_faces='None')  # Detect face(s)
        if len(found) > 0:  # If face(s) detected save the best face
            best = cv2.resize(found[0], dsize=data_shape)
            # Save extracted face with same filename
            Image.fromarray(np.asarray(best)).save(dest_root + fn)
"""## Create Custom Dataset""" | |
## Custom Dataset Loads from preformatted data input | |
class MyDataset(torch.utils.data.Dataset):
    """Dataset pairing every .jpg in ``root`` with a label read from a CSV file.

    Arguments:
        root -- folder containing the .jpg images.
        name2num -- dict mapping a category name to its integer class number.
        csv_file -- CSV with a 'Category' column; labels are stored under the
            DataFrame row index of each row.
        transform -- callable applied to the normalised image array.
        label_format -- 'int' for a class number, 'one_hot' for a 0/1 list.

    Raises:
        ValueError -- label_format is neither 'int' nor 'one_hot'.
    """
    def __init__(self, root, name2num, csv_file, transform=tvt.ToTensor(),
                 label_format='int'):
        super().__init__()
        if label_format not in ('int', 'one_hot'):
            # Previously an unknown format silently produced an empty label table
            raise ValueError("label_format must be 'int' or 'one_hot'")
        if not root[-1] == '/':
            root = root+'/'
        self.root = root
        self.transform = transform

        ## Add all .jpg files in the Dataset folder to filename list.
        ## os.listdir returns names in arbitrary order; sorting keeps positional
        ## indices (e.g. a train/val split saved to disk) stable between runs.
        self.filenames = sorted(fn for fn in os.listdir(self.root)
                                if fn.endswith('.jpg'))

        self.labels = {}
        df = pd.read_csv(csv_file, dtype=str)
        num_classes = len(name2num)
        # Single pass over the CSV; rows whose category is not in name2num get no label
        for row_idx, name in df['Category'].items():
            if name not in name2num:
                continue
            if label_format == 'int':
                # Store int label
                self.labels[row_idx] = name2num[name]
            else:
                # Store one hot label
                one_hot = [0] * num_classes
                one_hot[name2num[name]] = 1
                self.labels[row_idx] = one_hot

    def __len__(self):
        return len(self.filenames)  # length of filenames is the number of images

    def _label_key(self, index):
        """Return the key into ``self.labels`` for the image at position ``index``."""
        # Labels are keyed by CSV row index, while ``index`` is a position in the
        # file list, and the two drift apart as soon as any image is missing.
        # NOTE(review): assumes images are named '<csv row index>.jpg' (the
        # notebook opens '0.jpg') -- confirm against the dataset.
        stem = os.path.splitext(self.filenames[index])[0]
        if stem.isdigit():
            return int(stem)
        # Non-numeric names: keep the original positional lookup
        return index

    def __getitem__(self, index):
        if torch.is_tensor(index):
            index = index.item()
        with warnings.catch_warnings():  # Suppress RGBA warning
            warnings.simplefilter("ignore")
            # Open image in RGB mode
            img = Image.open(self.root + self.filenames[index]).convert('RGB')
        img = (np.asarray(img) - 127.5) / 128  # -> np.array and compress values
        img_tensor = self.transform(img)  # convert to tensor image
        ## Return image and label
        return img_tensor, self.labels[self._label_key(index)]
def train_val_split(train_dataset, classes, percent_val=10,
                    train_indices_path="/content/gdrive/MyDrive/Colab Notebooks/ECE50024/Kaggle Challenge/train_indices.txt",
                    val_indices_path="/content/gdrive/MyDrive/Colab Notebooks/ECE50024/Kaggle Challenge/val_indices.txt"):
    """Split dataset positions into train and validation index lists.

    The validation set takes the first ``val_per_class`` samples of every class,
    where ``val_per_class`` is ``percent_val`` percent of the dataset divided
    evenly over ``classes``. The split is cached in two text files (one index
    per line) and reused when the cached validation size still matches.

    Arguments:
        train_dataset -- indexable dataset whose items are (image, int label).
        classes -- sized collection of classes; only its length is used.
        percent_val -- percentage of the dataset reserved for validation.
        train_indices_path -- cache file for the training indices.
        val_indices_path -- cache file for the validation indices.

    Returns:
        (train_indices, val_indices) as two lists of ints.
    """
    def _read_indices(path):
        # One integer per line; blank lines are ignored
        with open(path, "r") as f:
            return [int(line.strip()) for line in f if line.strip()]

    train_indices = None
    val_indices = []
    if (os.path.isfile(train_indices_path) and os.path.isfile(val_indices_path)):
        train_indices = _read_indices(train_indices_path)
        val_indices = _read_indices(val_indices_path)

    num_samples = len(train_dataset)
    val_size = int((num_samples * percent_val)//100)
    val_per_class = int(val_size / len(classes))
    expected_val = val_per_class * len(classes)

    if train_indices is None or expected_val != len(val_indices):
        # Rebuild from scratch: a stale cached list must not be appended to
        val_indices = []
        val_counts = [0] * len(classes)
        for index in range(num_samples):
            if len(val_indices) == expected_val:
                break  # every class is full; no need to load the remaining samples
            label = int(train_dataset[index][1])
            if val_counts[label] < val_per_class:
                val_indices.append(index)
                val_counts[label] += 1
        held_out = set(val_indices)  # O(1) membership instead of scanning a list
        train_indices = [x for x in range(num_samples) if x not in held_out]
        with open(train_indices_path, "w") as f:
            f.writelines(str(idx) + "\n" for idx in train_indices)
        with open(val_indices_path, "w") as f:
            f.writelines(str(idx) + "\n" for idx in val_indices)
    return train_indices, val_indices
"""## Create Inception-ResNetV1 Model | |
### Blocks | |
""" | |
## Version of the Inception-ResNetV1 layers based on: | |
##https://github.com/timesler/facenet-pytorch/blob/master/models/inception_resnet_v1.py | |
class BasicConv2d(nn.Module):
    """Bias-free 2D convolution followed by batch normalisation and ReLU."""

    def __init__(self, in_planes, out_planes, kernel_size, stride, padding=0):
        super().__init__()
        conv_kwargs = dict(kernel_size=kernel_size, stride=stride,
                           padding=padding, bias=False)  # verify bias false
        self.conv = nn.Conv2d(in_planes, out_planes, **conv_kwargs)
        # eps: value found in tensorflow; momentum: default pytorch value
        self.bn = nn.BatchNorm2d(out_planes, eps=0.001, momentum=0.1, affine=True)
        self.relu = nn.ReLU(inplace=False)

    def forward(self, x):
        return self.relu(self.bn(self.conv(x)))
class Block35(nn.Module):
    """Residual block for 256-channel maps.

    Three parallel branches (32 channels each) are concatenated, projected back
    to 256 channels by a 1x1 convolution, scaled by ``scale``, added to the
    input and passed through ReLU.
    """

    def __init__(self, scale=1.0):
        super().__init__()
        self.scale = scale

        def reduce_1x1():
            return BasicConv2d(256, 32, kernel_size=1, stride=1)

        def conv_3x3():
            return BasicConv2d(32, 32, kernel_size=3, stride=1, padding=1)

        self.branch0 = reduce_1x1()
        self.branch1 = nn.Sequential(reduce_1x1(), conv_3x3())
        self.branch2 = nn.Sequential(reduce_1x1(), conv_3x3(), conv_3x3())
        self.conv2d = nn.Conv2d(96, 256, kernel_size=1, stride=1)
        self.relu = nn.ReLU(inplace=False)

    def forward(self, x):
        branch_outputs = (self.branch0(x), self.branch1(x), self.branch2(x))
        projected = self.conv2d(torch.cat(branch_outputs, 1))
        return self.relu(projected * self.scale + x)
class Block17(nn.Module):
    """Residual block for 896-channel maps.

    A 1x1 branch and a 1x1 -> 1x7 -> 7x1 branch (128 channels each) are
    concatenated, projected back to 896 channels, scaled by ``scale``, added to
    the input and passed through ReLU.
    """

    def __init__(self, scale=1.0):
        super().__init__()
        self.scale = scale
        self.branch0 = BasicConv2d(896, 128, kernel_size=1, stride=1)
        factorised = [
            BasicConv2d(896, 128, kernel_size=1, stride=1),
            BasicConv2d(128, 128, kernel_size=(1,7), stride=1, padding=(0,3)),
            BasicConv2d(128, 128, kernel_size=(7,1), stride=1, padding=(3,0)),
        ]
        self.branch1 = nn.Sequential(*factorised)
        self.conv2d = nn.Conv2d(256, 896, kernel_size=1, stride=1)
        self.relu = nn.ReLU(inplace=False)

    def forward(self, x):
        merged = torch.cat((self.branch0(x), self.branch1(x)), 1)
        projected = self.conv2d(merged)
        return self.relu(projected * self.scale + x)
class Block8(nn.Module):
    """Residual block for 1792-channel maps.

    A 1x1 branch and a 1x1 -> 1x3 -> 3x1 branch (192 channels each) are
    concatenated, projected back to 1792 channels, scaled by ``scale`` and added
    to the input. The final ReLU is skipped when ``noReLU`` is True.
    """

    def __init__(self, scale=1.0, noReLU=False):
        super().__init__()
        self.scale = scale
        self.noReLU = noReLU
        self.branch0 = BasicConv2d(1792, 192, kernel_size=1, stride=1)
        factorised = [
            BasicConv2d(1792, 192, kernel_size=1, stride=1),
            BasicConv2d(192, 192, kernel_size=(1,3), stride=1, padding=(0,1)),
            BasicConv2d(192, 192, kernel_size=(3,1), stride=1, padding=(1,0)),
        ]
        self.branch1 = nn.Sequential(*factorised)
        self.conv2d = nn.Conv2d(384, 1792, kernel_size=1, stride=1)
        # The activation module only exists when it is going to be used
        if not self.noReLU:
            self.relu = nn.ReLU(inplace=False)

    def forward(self, x):
        merged = torch.cat((self.branch0(x), self.branch1(x)), 1)
        summed = self.conv2d(merged) * self.scale + x
        if self.noReLU:
            return summed
        return self.relu(summed)
class Mixed_6a(nn.Module):
    """Stride-2 reduction block for 256-channel maps.

    Concatenates a 3x3 stride-2 convolution (384 channels), a three-convolution
    branch ending in a 3x3 stride-2 convolution (256 channels) and a 3x3
    stride-2 max pool of the input along the channel dimension.
    """

    def __init__(self):
        super().__init__()
        self.branch0 = BasicConv2d(256, 384, kernel_size=3, stride=2)
        conv_stack = [
            BasicConv2d(256, 192, kernel_size=1, stride=1),
            BasicConv2d(192, 192, kernel_size=3, stride=1, padding=1),
            BasicConv2d(192, 256, kernel_size=3, stride=2),
        ]
        self.branch1 = nn.Sequential(*conv_stack)
        self.branch2 = nn.MaxPool2d(3, stride=2)

    def forward(self, x):
        branches = (self.branch0, self.branch1, self.branch2)
        return torch.cat(tuple(branch(x) for branch in branches), 1)
class Mixed_7a(nn.Module):
    """Stride-2 reduction block for 896-channel maps.

    Concatenates three convolutional branches (384, 256 and 256 channels, each
    ending in a 3x3 stride-2 convolution) with a 3x3 stride-2 max pool of the
    input along the channel dimension.
    """

    def __init__(self):
        super().__init__()

        def reduce_1x1():
            return BasicConv2d(896, 256, kernel_size=1, stride=1)

        self.branch0 = nn.Sequential(
            reduce_1x1(),
            BasicConv2d(256, 384, kernel_size=3, stride=2),
        )
        self.branch1 = nn.Sequential(
            reduce_1x1(),
            BasicConv2d(256, 256, kernel_size=3, stride=2),
        )
        self.branch2 = nn.Sequential(
            reduce_1x1(),
            BasicConv2d(256, 256, kernel_size=3, stride=1, padding=1),
            BasicConv2d(256, 256, kernel_size=3, stride=2),
        )
        self.branch3 = nn.MaxPool2d(3, stride=2)

    def forward(self, x):
        branches = (self.branch0, self.branch1, self.branch2, self.branch3)
        return torch.cat(tuple(branch(x) for branch in branches), 1)
"""### Model""" | |
## Version of the Inception-ResNetV1 layers based on: | |
##https://github.com/timesler/facenet-pytorch/blob/master/models/inception_resnet_v1.py | |
class InceptionResnetV1(nn.Module):
    """Inception Resnet V1 model with optional loading of pretrained weights.

    When 'pretrained' is given, a logits layer sized for that dataset is created
    and the weights are read through load_weights(). If 'classify' is True and
    'num_classes' is given, the logits layer is then replaced by a freshly
    initialised one with 'num_classes' outputs.

    Keyword Arguments:
        pretrained {str} -- Optional pretraining dataset. Either 'vggface2' or 'casia-webface'.
            Whether weights for a given name can actually be loaded is decided by
            load_weights(). (default: {None})
        classify {bool} -- Whether the model should output class logits or L2-normalised
            feature embeddings. (default: {False})
        num_classes {int} -- Number of output classes. (default: {None})
        dropout_prob {float} -- Dropout probability. (default: {0.6})
        device {torch.device} -- Optional device the model is moved to. (default: {None})

    Raises:
        ValueError -- 'pretrained' is not None, 'vggface2' or 'casia-webface', or
            'classify' is True with neither 'pretrained' nor 'num_classes' given.
    """
    def __init__(self, pretrained=None, classify=False, num_classes=None, dropout_prob=0.6, device=None):
        super().__init__()

        # Set simple attributes
        self.pretrained = pretrained
        self.classify = classify
        self.num_classes = num_classes

        # Size of the logits layer stored with each pretrained weight set
        pretrained_classes = {'vggface2': 8631, 'casia-webface': 10575}
        if pretrained is None:
            if self.classify and self.num_classes is None:
                raise ValueError('If "pretrained" is not specified and "classify" is True, "num_classes" must be specified')
        elif pretrained not in pretrained_classes:
            # Fail early with a clear message instead of hitting an unbound local below
            raise ValueError('"pretrained" must be None, "vggface2" or "casia-webface"')

        # Define layers
        self.conv2d_1a = BasicConv2d(3, 32, kernel_size=3, stride=2)
        self.conv2d_2a = BasicConv2d(32, 32, kernel_size=3, stride=1)
        self.conv2d_2b = BasicConv2d(32, 64, kernel_size=3, stride=1, padding=1)
        self.maxpool_3a = nn.MaxPool2d(3, stride=2)
        self.conv2d_3b = BasicConv2d(64, 80, kernel_size=1, stride=1)
        self.conv2d_4a = BasicConv2d(80, 192, kernel_size=3, stride=1)
        self.conv2d_4b = BasicConv2d(192, 256, kernel_size=3, stride=2)
        # Sequential keeps the sub-module names '0'..'n-1', so state_dict keys are unchanged
        self.repeat_1 = nn.Sequential(*[Block35(scale=0.17) for _ in range(5)])
        self.mixed_6a = Mixed_6a()
        self.repeat_2 = nn.Sequential(*[Block17(scale=0.10) for _ in range(10)])
        self.mixed_7a = Mixed_7a()
        self.repeat_3 = nn.Sequential(*[Block8(scale=0.20) for _ in range(5)])
        self.block8 = Block8(noReLU=True)
        self.avgpool_1a = nn.AdaptiveAvgPool2d(1)
        self.dropout = nn.Dropout(dropout_prob)
        self.last_linear = nn.Linear(1792, 512, bias=False)
        self.last_bn = nn.BatchNorm1d(512, eps=0.001, momentum=0.1, affine=True)

        if pretrained is not None:
            # The logits layer must exist with the pretrained size before loading
            self.logits = nn.Linear(512, pretrained_classes[pretrained])
            load_weights(self, pretrained)

        if self.classify and self.num_classes is not None:
            self.logits = nn.Linear(512, self.num_classes)

        self.device = torch.device('cpu')
        if device is not None:
            self.device = device
            self.to(device)

    def forward(self, x):
        """Calculate embeddings or logits given a batch of input image tensors.

        Arguments:
            x {torch.tensor} -- Batch of image tensors representing faces.

        Returns:
            torch.tensor -- Batch of embedding vectors or multinomial logits.
        """
        x = self.conv2d_1a(x)
        x = self.conv2d_2a(x)
        x = self.conv2d_2b(x)
        x = self.maxpool_3a(x)
        x = self.conv2d_3b(x)
        x = self.conv2d_4a(x)
        x = self.conv2d_4b(x)
        x = self.repeat_1(x)
        x = self.mixed_6a(x)
        x = self.repeat_2(x)
        x = self.mixed_7a(x)
        x = self.repeat_3(x)
        x = self.block8(x)
        x = self.avgpool_1a(x)
        x = self.dropout(x)
        x = self.last_linear(x.view(x.shape[0], -1))  # flatten to (batch, features)
        x = self.last_bn(x)
        if self.classify:
            x = self.logits(x)
        else:
            x = nn.functional.normalize(x, p=2, dim=1)
        return x
def load_weights(mdl, name):
    """Load a locally stored pretrained state_dict into a model.

    Nothing is downloaded: the weights are read from a fixed Google Drive path.

    Arguments:
        mdl {torch.nn.Module} -- Pytorch model.
        name {str} -- Name of dataset that was used to generate pretrained state_dict.

    Raises:
        ValueError: If 'name' is not equal to 'vggface2'.
    """
    if name != 'vggface2':
        raise ValueError('Pretrained model only exists for "vggface2"')
    path = '/content/gdrive/MyDrive/Colab Notebooks/ECE50024/Kaggle Challenge/20180402-114759-vggface2.pt'
    # map_location lets the file load on CPU-only machines whatever device it was
    # saved from; load_state_dict copies the values into mdl's own parameters.
    state_dict = torch.load(path, map_location='cpu')
    mdl.load_state_dict(state_dict)
"""## Create extra helper functions""" | |
## Create a list of the categories in the data | |
def get_categories(file_loc):
    """Read the category CSV and build lookups in both directions.

    The first two columns are treated as 'ID' and 'Category'; a column in one of
    those positions whose header contains 'Unnamed' is renamed accordingly.

    Arguments:
        file_loc -- path (or anything pandas.read_csv accepts) of the CSV file.

    Returns:
        (categories, name2num): ``categories`` maps int ID -> category name and
        ``name2num`` maps category name -> int ID.
    """
    df = pd.read_csv(file_loc)

    # Rename columns if unnamed. DataFrame.rename is used instead of writing into
    # df.columns.values, which mutates the Index's backing array in place.
    fallback_names = {0: 'ID', 1: 'Category'}
    renames = {col: fallback_names[pos]
               for pos, col in enumerate(df.columns)
               if pos in fallback_names and 'Unnamed' in str(col)}
    df = df.rename(columns=renames)

    # Extract category IDs and labels
    categories = {}
    name2num = {}
    for ident, name in zip(df['ID'], df['Category']):
        ident = int(ident)  # plain Python int in both lookups
        categories[ident] = name
        name2num[name] = ident
    return categories, name2num
## Plot any given statistic with a single line of code | |
def plot_statistic(vals, title=None, xlabel=None, ylabel=None, legend=None):
    """Plot one series, or several series, in a single figure and show it.

    Arguments:
        vals -- a sequence of values, or a sequence whose first element is a
            list, in which case every element is drawn as its own line.
        title, xlabel, ylabel -- applied only when given as str.
        legend -- a str (single entry) or a list of entries; ignored otherwise.
    """
    # Guard the vals[0] probe so an empty input draws an empty figure
    # instead of raising IndexError.
    if len(vals) > 0 and isinstance(vals[0], list):
        for series in vals:
            plt.plot(series)
    else:
        plt.plot(vals)

    for apply_text, text in ((plt.title, title),
                             (plt.xlabel, xlabel),
                             (plt.ylabel, ylabel)):
        if isinstance(text, str):
            apply_text(text)

    if isinstance(legend, str):
        plt.legend([legend])
    elif isinstance(legend, list):
        plt.legend(legend)
    plt.show()
    plt.close()
"""## Create Model Training Function""" | |
def _build_face_model(model_name, pretrained, num_classes):
    """Create the classifier and return (model, optimizer_state, initial_epoch)."""
    if model_name != 'InceptionResnetV1':
        raise ValueError('Model '+model_name+' not available')

    optimizer_state = None
    initial_epoch = 0
    if pretrained == 'vggface2':
        model = InceptionResnetV1(pretrained=pretrained, classify=True, num_classes=num_classes)
        return model, optimizer_state, initial_epoch

    model = InceptionResnetV1(classify=True, num_classes=num_classes)
    # pretrained defaults to None, so it must be a str before testing substrings
    if isinstance(pretrained, str) and '.pt' in pretrained:
        # NOTE(review): torch.load unpickles the file; only load files you trust.
        # Recent PyTorch releases may need weights_only=False to unpickle a whole
        # module -- confirm against the installed version.
        loaded = torch.load(pretrained, map_location='cpu')
        if 'checkpoint' in pretrained:  # Load a checkpoint
            model.load_state_dict(loaded['model_state_dict'])
            optimizer_state = loaded['optimizer_state_dict']
            initial_epoch = loaded['epoch']
        elif isinstance(loaded, nn.Module):
            # 'FaceNet.pt' is written with torch.save(model, ...), i.e. a whole module
            model.load_state_dict(loaded.state_dict())
        else:  # plain state_dict
            model.load_state_dict(loaded)
    return model, optimizer_state, initial_epoch


def _run_face_epoch(model, loader, loss_func, device, opt=None):
    """Run one pass over ``loader``; train when ``opt`` is given, else evaluate.

    Returns (summed loss weighted by batch size, number correct, number seen).
    """
    training = opt is not None
    model.train(training)
    loss_sum, correct, seen = 0.0, 0, 0
    for (images, labels) in tqdm(loader):
        ## Prepare Data
        images = torch.as_tensor(images).to(device, dtype=torch.float)
        # Class indices give the same cross-entropy as one-hot float targets
        labels = torch.as_tensor(labels).to(device, dtype=torch.long)
        if training:
            model.zero_grad(set_to_none=True)
        ## Apply model to Data
        with torch.set_grad_enabled(training):
            outputs = model(images)
            loss = loss_func(outputs, labels)
        if training:
            loss.backward()
            opt.step()
        loss_sum += loss.item() * outputs.size(0)
        correct += (outputs.argmax(dim=1) == labels).sum().item()
        seen += labels.size(0)
    return loss_sum, correct, seen


def train_face_id(epochs, categories, name2num, data_root, data_file, batch_size,
                  model_name='InceptionResnetV1', data_type='precropped',
                  pretrained=None, use_validation_set=True,
                  save_dir='/content/gdrive/MyDrive/Colab Notebooks/ECE50024/Kaggle Challenge/'):
    """Train the face classifier and return the trained model.

    Arguments:
        epochs -- number of epochs to run.
        categories -- collection of classes; its length sets the output size.
        name2num -- dict mapping category name to class number (for MyDataset).
        data_root -- folder with the face images.
        data_file -- CSV file with the labels.
        batch_size -- batch size for both loaders.
        model_name -- only 'InceptionResnetV1' is available.
        data_type -- unused; kept for call compatibility.
        pretrained -- None, 'vggface2', or a path containing '.pt'. A path that
            also contains 'checkpoint' is read as a training checkpoint.
        use_validation_set -- hold out 10 percent of the data for validation.
        save_dir -- folder that receives 'FaceNet.pt' and the checkpoints.

    Raises:
        ValueError -- model_name is not available.
    """
    # Set training variables
    learning_rate = 5e-6  # 1e-3

    # Set Transforms for Training Data and Val Data.
    # Augmentations (ColorJitter, RandomEqualize, RandomGrayscale,
    # RandomHorizontalFlip, GaussianBlur, RandomPerspective) are not applied.
    train_transform = tvt.Compose([tvt.ToTensor()])
    val_transform = tvt.Compose([tvt.ToTensor()])

    ## Create dataloader for training
    train_dataset = MyDataset(data_root, name2num, data_file,
                              transform=train_transform, label_format='int')
    val_loader = None
    if use_validation_set:  # Split off a validation set
        val_dataset = MyDataset(data_root, name2num, data_file,
                                transform=val_transform, label_format='int')
        # Use the 'categories' argument, not a notebook-level global
        train_indices, val_indices = train_val_split(train_dataset, categories, percent_val=10)
        train_dataset = torch.utils.data.Subset(train_dataset, train_indices)
        val_dataset = torch.utils.data.Subset(val_dataset, val_indices)
        val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=batch_size,
                                                 shuffle=True)
    dataloader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size,
                                             shuffle=True)

    # Create model to train
    model, optimizer_state, initial_epoch = _build_face_model(
        model_name, pretrained, len(categories))

    # Use GPU if available
    device = torch.device('cuda' if torch.cuda.is_available() else "cpu")
    model = model.to(device)

    opt = optim.Adam(model.parameters(), lr=learning_rate, weight_decay=0, amsgrad=True)
    # NOTE: optimizer_state from a checkpoint is read but not loaded into opt;
    # call opt.load_state_dict(optimizer_state) here to resume the Adam state.
    loss_func = nn.CrossEntropyLoss()

    losses, acc_per_epoch = [], []
    val_losses, val_acc_per_epoch = [], []
    best_val_acc = 0.0
    start = time.time()

    # Train model
    for epoch in range(epochs):
        loss_sum, acc, total = _run_face_epoch(model, dataloader, loss_func, device, opt=opt)
        if val_loader is not None:
            val_loss_sum, val_acc, val_total = _run_face_epoch(
                model, val_loader, loss_func, device)

        # Print progress per epoch
        curr_time = time.time()-start
        print(("\n[epoch: %3d/%3d] Avg Train loss: %.3f | ") %
              (epoch + 1 + initial_epoch, epochs + initial_epoch, loss_sum / total), end='')
        if val_loader is not None:
            print("Avg Val loss: %.3f | "%(val_loss_sum/val_total),end='')
            val_acc_epoch = int((val_acc / val_total) * 10000) / 100
            val_acc_per_epoch.append(val_acc_epoch)
            val_losses.append(val_loss_sum / val_total)
        print(("Time: %d:%02d:%02d") % (int(curr_time / 3600),
                                        int(int(curr_time / 60) % 60),
                                        int(curr_time % 60)))
        acc_epoch = int((acc / total) * 10000) / 100
        acc_per_epoch.append(acc_epoch)
        losses.append(loss_sum / total)
        print('Accuracy in epoch: (Train) ', acc_epoch, end='')

        training_state = {'epoch': epoch + initial_epoch + 1,
                          'model_state_dict': model.state_dict(),
                          'optimizer_state_dict': opt.state_dict()}
        if val_loader is not None:
            print(' | (Val) ', val_acc_epoch, end='')
            if best_val_acc < val_acc_epoch:
                best_val_acc = val_acc_epoch
                torch.save(model, os.path.join(save_dir, 'FaceNet.pt'))
                # Save the training state with the highest Validation Accuracy
                torch.save(training_state,
                           os.path.join(save_dir, 'FaceNet_checkpoint_best.pt'))
        print('\n')
        # Save Latest training state
        torch.save(training_state, os.path.join(save_dir, 'FaceNet_checkpoint_last.pt'))

    # Plot one curve per data split so the legend matches what is drawn
    has_val = val_loader is not None
    legend = ["Training", "Validation"] if has_val else ["Training"]
    # Plot loss
    plot_statistic([losses, val_losses] if has_val else [losses],
                   title="Loss at "+str(learning_rate), xlabel="Epoch",
                   ylabel="Loss", legend=legend)
    # Plot accuracy
    plot_statistic([acc_per_epoch, val_acc_per_epoch] if has_val else [acc_per_epoch],
                   title="Accuracy at "+str(learning_rate),
                   xlabel="Epoch", ylabel="Accuracy(%)",
                   legend=legend)
    return model
"""## Create Model Test Function""" | |
def identify_face(img, model, preprocessed=True):
    """Predict the class number of the face in ``img``.

    Arguments:
        img -- image (array or PIL image). With ``preprocessed=False`` it is first
            passed to extract_faces() and the first returned face is used.
        model -- path of a model saved with ``torch.save(model, ...)``, or an
            already loaded ``nn.Module`` (avoids re-reading the file per call).
        preprocessed -- True when ``img`` is already a cropped face.

    Returns:
        int -- index of the highest scoring output of the model.
    """
    # Use GPU if available
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

    # Load model from file specified by string in model
    if not isinstance(model, nn.Module):
        # NOTE(review): torch.load unpickles the file; only load trusted files
        model = torch.load(model, map_location=device)
    model = model.to(device)
    model.eval()  # inference: fixed batch-norm statistics, dropout disabled

    if not preprocessed:
        faces = extract_faces(img, img_shape=None, no_faces='Img')
        face = faces[0]
    else:
        face = img

    face = np.asarray(face)
    if face.dtype == np.uint8:
        # MyDataset feeds the network (pixel - 127.5) / 128; raw 8-bit input would
        # otherwise be scaled to [0, 1] by ToTensor and not match the training data.
        # NOTE(review): float input is assumed to be normalised already -- confirm
        # against the callers.
        face = (face - 127.5) / 128
    transform = tvt.Compose([tvt.ToTensor()])
    face = transform(face)

    ## Prepare Data
    face = face.type(torch.FloatTensor).unsqueeze(dim=0)
    face = face.to(device)

    ## Apply model to Data
    with torch.no_grad():
        prediction = model(face)
    identity = torch.argmax(prediction)
    return identity.item()
"""## Select Dataset to use""" | |
# Select the dataset used by the cells below; switch by (un)commenting.
# dataset_name = 'train_small'
dataset_name = 'train'
"""# Training Functions
## Load Raw Data and Run Dataset Alteration
"""
## Extract dataset to current working directory from Google Drive
# NOTE: the `!unzip` lines are IPython/Colab shell escapes, so this section
# only runs inside a notebook, not as a plain Python script.
from google.colab import drive
drive.mount('/content/gdrive')
if dataset_name == 'train_small':
    !unzip -q -n '/content/gdrive/MyDrive/Colab Notebooks/ECE50024/Kaggle Challenge/train_small.zip'
if dataset_name == 'train':
    !unzip -q -n '/content/gdrive/MyDrive/Colab Notebooks/ECE50024/Kaggle Challenge/train.zip'
# Raw images are read from ./<dataset_name>/ and face crops written to ./<dataset_name>_face/
src_root = './'+dataset_name+'/'
dest_root = './'+dataset_name+'_face/'
data_shape = (128,128)  # size every saved face crop is resized to
extract_dataset_faces(src_root, dest_root, data_shape)
"""Timing: | |
~ 1 hour 20 mins for train | |
## Load preprocessed dataset | |
""" | |
import zipfile | |
from shutil import rmtree | |
def resize_face_set(set_name='train', target_size=(160,160)):
    """Rebuild ``./<set_name>_face/`` from the 256x256 face archive at a new size.

    The archive matching ``set_name`` ('train_small', 'train' or 'test') is
    extracted into ``./data_tmp/``. Every .jpg found there is resized to
    ``target_size`` and saved into the destination folder, then ``./data_tmp/``
    is removed. For any other ``set_name`` nothing is extracted and only images
    already present in ``./data_tmp/`` are processed.

    Arguments:
        set_name -- dataset name; selects the archive and the output folder.
        target_size -- (width, height) passed to ``Image.resize``.
    """
    drive_dir = '/content/gdrive/MyDrive/Colab Notebooks/ECE50024/Kaggle Challenge/'
    archives = {
        'train_small': 'train_small_face256x256.zip',
        'train': 'train_face256x256.zip',
        'test': 'test_face256x256.zip',
    }
    tmp_root = './data_tmp/'

    # Select the archive from the set_name argument, not a notebook-level global
    if set_name in archives:
        with zipfile.ZipFile(drive_dir + archives[set_name]) as archive:
            for member in archive.namelist():
                # Same as `unzip -n`: never overwrite a file that already exists
                if not os.path.exists(os.path.join(tmp_root, member)):
                    archive.extract(member, tmp_root)

    dest_path = './' + set_name + '_face/'
    ## Start from an empty destination folder
    if os.path.exists(dest_path):
        rmtree(dest_path)
    os.makedirs(dest_path)

    ## Loop for all .jpg images in the temporary folder
    for dirpath, dirnames, filenames in os.walk(tmp_root):
        for fn in tqdm(filenames):
            if fn.endswith('.jpg'):
                with warnings.catch_warnings():  # Suppress warning about Lazy Modules
                    warnings.simplefilter("ignore")
                    img = Image.open(os.path.join(dirpath, fn)).convert('RGB')  # Open image in RGB mode
                img = img.resize(size=target_size)
                img.save(dest_path + fn)  # Save resized face with same filename

    # Remove the temporary folder in one call; it may not exist if nothing was extracted
    rmtree(tmp_root, ignore_errors=True)
## Extract preprocessed dataset to current working directory from Google Drive
# NOTE: the `!unzip` lines are IPython/Colab shell escapes, so this section
# only runs inside a notebook, not as a plain Python script.
from google.colab import drive
drive.mount('/content/gdrive')
data_shape = (160,160)  # face size to load
# Unzip the archive for (data_shape, dataset_name) when one is listed below;
# for any other shape fall back to resize_face_set().
if data_shape == (256,256) and dataset_name == 'train_small':
    !unzip -q -n '/content/gdrive/MyDrive/Colab Notebooks/ECE50024/Kaggle Challenge/train_small_face256x256.zip'
elif data_shape == (256,256) and dataset_name == 'train':
    !unzip -q -n '/content/gdrive/MyDrive/Colab Notebooks/ECE50024/Kaggle Challenge/train_face256x256.zip'
elif data_shape == (160,160) and dataset_name == 'train_small':
    !unzip -q -n '/content/gdrive/MyDrive/Colab Notebooks/ECE50024/Kaggle Challenge/train_small_face.zip'
elif data_shape == (160,160) and dataset_name == 'train':
    !unzip -q -n '/content/gdrive/MyDrive/Colab Notebooks/ECE50024/Kaggle Challenge/train_face.zip'
elif dataset_name == 'train_small':
    resize_face_set(set_name=dataset_name, target_size=data_shape)
elif dataset_name == 'train':
    resize_face_set(set_name=dataset_name, target_size=data_shape)
# Sanity check: open one image from the loaded set and print its array shape
with warnings.catch_warnings(): # Suppress warning about RGBA
    warnings.simplefilter("ignore")
    img = Image.open('./'+dataset_name+'_face/0.jpg').convert('RGB') # Open image in RGB mode
img = np.asarray(img) # Convert to numpy array for face detection
print(img.shape) # Print input shape
"""Timing: | |
~ 15 secs for train_face pure loading | |
~ 2 mins for train_face with resizing | |
## Run Training | |
""" | |
## Train on the extracted faces; labels come from <dataset_name>.csv on Google Drive
identities, name2num = get_categories(
    '/content/gdrive/MyDrive/Colab Notebooks/ECE50024/Kaggle Challenge/category.csv'
)
train_csv_loc = f'/content/gdrive/MyDrive/Colab Notebooks/ECE50024/Kaggle Challenge/{dataset_name}.csv'
train_data_loc = f'./{dataset_name}_face'
epochs = 100
batchsize = 256  # 200
model = train_face_id(
    epochs=epochs,
    categories=identities,
    name2num=name2num,
    batch_size=batchsize,
    data_root=train_data_loc,
    data_file=train_csv_loc,
    model_name='InceptionResnetV1',
    pretrained='vggface2',
    ## Alternative starting points (saved weights / checkpoints on Drive):
    # pretrained='/content/gdrive/MyDrive/Colab Notebooks/ECE50024/Kaggle Challenge/FaceNet.pt',
    # pretrained='/content/gdrive/MyDrive/Colab Notebooks/ECE50024/Kaggle Challenge/FaceNet_checkpoint_best.pt',
    # pretrained='/content/gdrive/MyDrive/Colab Notebooks/ECE50024/Kaggle Challenge/FaceNet_checkpoint_last.pt',
    # use_validation_set=False
)

## Release the Colab runtime once training has returned
from google.colab import runtime
runtime.unassign()
"""## Continue training""" | |
## Resume training, passing the last saved checkpoint as `pretrained`
identities, name2num = get_categories(
    '/content/gdrive/MyDrive/Colab Notebooks/ECE50024/Kaggle Challenge/category.csv'
)
train_csv_loc = f'/content/gdrive/MyDrive/Colab Notebooks/ECE50024/Kaggle Challenge/{dataset_name}.csv'
train_data_loc = f'./{dataset_name}_face'
epochs = 50
batchsize = 320  # 256
model = train_face_id(
    epochs=epochs,
    categories=identities,
    name2num=name2num,
    batch_size=batchsize,
    data_root=train_data_loc,
    data_file=train_csv_loc,
    model_name='InceptionResnetV1',
    pretrained='/content/gdrive/MyDrive/Colab Notebooks/ECE50024/Kaggle Challenge/FaceNet_checkpoint_last.pt',
)

## Release the Colab runtime once training has returned
from google.colab import runtime
runtime.unassign()
"""# Testing Functions | |
## Load Unprocessed Test Data and Process the Data | |
""" | |
## Extract dataset to current working directory from Google Drive | |
from google.colab import drive | |
drive.mount('/content/gdrive') | |
!unzip -q -n '/content/gdrive/MyDrive/Colab Notebooks/ECE50024/Kaggle Challenge/test.zip' | |
src_root = './test/' | |
dest_root = './test_face/' | |
data_shape = (160,160) | |
extract_dataset_faces(src_root, dest_root, data_shape, no_faces='Img') | |
"""## Load Preprocessed Test Data""" | |
## Extract dataset to current working directory from Google Drive | |
from google.colab import drive | |
drive.mount('/content/gdrive') | |
data_shape = (160,160) | |
dataset_name = 'test' | |
if data_shape == (256,256) and dataset_name == 'test': | |
!unzip -q -n '/content/gdrive/MyDrive/Colab Notebooks/ECE50024/Kaggle Challenge/test_face256x256.zip' | |
elif data_shape == (160,160) and dataset_name == 'test': | |
!unzip -q -n '/content/gdrive/MyDrive/Colab Notebooks/ECE50024/Kaggle Challenge/test_face.zip' | |
elif dataset_name == 'test': | |
resize_face_set(set_name=dataset_name, target_size=data_shape) | |
with warnings.catch_warnings(): # Suppress warning about RGBA | |
warnings.simplefilter("ignore") | |
img = Image.open('./'+dataset_name+'_face/0.jpg').convert('RGB') # Open image in RGB mode | |
img = np.asarray(img) # Convert to numpy array for face detection | |
print(img.shape) # Print input shape | |
"""## Run Test""" | |
import csv

## Predict a category for every test face and write the (Id, Category) CSV to Google Drive
identities, name2num = get_categories('/content/gdrive/MyDrive/Colab Notebooks/ECE50024/Kaggle Challenge/category.csv')

## Saved model location; passed as-is to identify_face
model = '/content/gdrive/MyDrive/Colab Notebooks/ECE50024/Kaggle Challenge/FaceNet.pt'

## Collect all .jpg files in the test folder
test_root = './test_face/'
testfiles = [fn for fn in os.listdir(test_root) if fn.endswith('.jpg')]

## Rows are keyed by numeric image id rather than written into a pre-sized list,
## so gaps in the id sequence cannot cause an IndexError or leave placeholder rows.
rows_by_id = {}
for fn in tqdm(testfiles):
    img_id = fn.removesuffix('.jpg')
    with warnings.catch_warnings():  # Silence warnings raised while opening/converting
        warnings.simplefilter("ignore")
        img = Image.open(test_root + fn).convert('RGB')
    img = (np.asarray(img) - 127.5) / 128  # 8-bit RGB -> floats in roughly [-1, 1]
    category = identities[identify_face(img, model)]
    rows_by_id[int(img_id)] = [img_id, category]

## One header row, then predictions in ascending numeric id order
predicted_IDs = [['Id', 'Category']]
predicted_IDs.extend(rows_by_id[key] for key in sorted(rows_by_id))

filename = '/content/gdrive/MyDrive/Colab Notebooks/ECE50024/Kaggle Challenge/test_predictions.csv'
if os.path.isfile(filename):  # Remove any existing file
    os.remove(filename)
with open(filename, 'w', newline='') as fp:
    csv.writer(fp).writerows(predicted_IDs)