Permalink
Cannot retrieve contributors at this time
Name already in use
A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
minichallenge/minichallenge.py
Go to file. This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
445 lines (280 sloc)
12.2 KB
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# coding: utf-8 | |
# In[1]: | |
import cv2 | |
import torch | |
import torchvision | |
from torch.utils.data import Dataset, DataLoader | |
from torchvision import transforms | |
from PIL import Image | |
import pandas as pd | |
import os | |
import matplotlib.image as mpimg | |
import matplotlib.pyplot as plt | |
from torch import nn | |
from torch.utils.data import random_split | |
from tqdm.auto import tqdm | |
from torchvision.transforms import v2 | |
import torch.nn.functional as F | |
from shutil import copyfile | |
from sklearn.model_selection import train_test_split | |
import dlib | |
# In[2]: | |
# df = pd.read_csv('train.csv') | |
# # Split the dataset into training and validation sets | |
# train_df, val_df = train_test_split(df, test_size=0.1, random_state=42) | |
# # Save the datasets to csv files | |
# train_df.to_csv('train_set.csv', index=False) | |
# val_df.to_csv('val_set.csv', index=False) | |
# print("The train.csv file was successfully split into train_set.csv and val_set.csv.") | |
# In[22]: | |
# Preprocess the raw training images: detect the face with a Haar cascade,
# crop to the largest detection, convert BGR -> RGB, and save into
# train_preprocessed/. Files OpenCV cannot decode are skipped; images with no
# detected face are saved whole (uncropped).
face_cascade = cv2.CascadeClassifier('haarcascade_frontalface_default.xml')

# Portable path construction instead of the hard-coded 'train\\train\\'
# backslash paths, which only work on Windows.
train_dir = os.path.join('train', 'train')
out_dir = 'train_preprocessed'
for filename in os.listdir(train_dir):
    path = os.path.join(train_dir, filename)
    image = cv2.imread(path)
    if image is None:
        # Unreadable / non-image file: skip it.
        # copyfile(path, os.path.join(out_dir, filename))
        continue
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    # scaleFactor=1.05, minNeighbors=10: fairly strict detection to limit
    # false positives.
    faces = face_cascade.detectMultiScale(gray, 1.05, 10)
    if len(faces) > 1:
        # Keep only the largest detection (assumed to be the subject's face).
        faces = [max(faces, key=lambda rect: rect[2] * rect[3])]
    for (x, y, w, h) in faces:
        # Clamp the crop rectangle to the image bounds.
        if y + h > image.shape[0]:
            h = image.shape[0] - y
        if x + w > image.shape[1]:
            w = image.shape[1] - x
        image = image[y:y + h, x:x + w]
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    Image.fromarray(image).save(os.path.join(out_dir, filename))
# In[86]: | |
class SimpleCNN(nn.Module):
    """Small CNN classifier for 3x128x128 RGB face crops.

    Two conv+pool stages (128 -> 64 -> 32 spatial) followed by two fully
    connected layers producing one logit per category.

    Args:
        category_dict: mapping of category name -> index; only its length is
            used, to size the output layer.
    """

    def __init__(self, category_dict):
        super(SimpleCNN, self).__init__()
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, stride=1, padding=1)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)
        self.dropout = nn.Dropout(0.2)
        # 64 channels * 32 * 32 assumes 128x128 input (two 2x2 poolings).
        self.fc1 = nn.Linear(64 * 32 * 32, 500)
        self.fc2 = nn.Linear(500, len(category_dict))

    def forward(self, x):
        """Return raw class logits for a batch of images."""
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(x.size(0), -1)  # flatten per sample, robust to batch size
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        # BUGFIX: return raw logits. The original applied F.relu here, which
        # zeroes all negative logits and cripples CrossEntropyLoss training.
        return self.fc2(x)
# In[3]: | |
# Directory holding the face-cropped training images produced by the
# preprocessing step above.
prep_dir = 'train_preprocessed\\'
device = "cuda" if torch.cuda.is_available() else "cpu"#running on cuda
# category.csv is expected to have a 'Category' name column plus a pandas
# auto-index column ('Unnamed: 0'); build category name -> integer index.
# NOTE(review): relying on the 'Unnamed: 0' column name is fragile — verify
# against the actual CSV header.
classes=pd.read_csv('category.csv')
category_dict = classes.set_index('Category')['Unnamed: 0'].to_dict()
# model = SimpleCNN(category_dict)
# model.to(device)
# Load the trained model state dict
# state_dict = torch.load('simple_cnn_model.pth')
# model.load_state_dict(state_dict)
class FaceDataset(Dataset):
    """Dataset of face images listed in a CSV annotation file.

    The CSV is read with the image file name taken from column 1 and the
    category name from column 2 (column 0 presumably being an index).
    Labels are mapped to integers via the module-level ``category_dict``.

    Args:
        csv_file: path to the annotation CSV.
        root_dir: directory containing the (preprocessed) images.
        transform: optional callable applied to each PIL image.
    """

    def __init__(self, csv_file, root_dir, transform=None):
        self.annotations = pd.read_csv(csv_file)
        self.root_dir = root_dir
        self.transform = transform
        # ImageNet normalization constants (not applied in __getitem__).
        self.normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        self.count = 0  # NOTE(review): appears unused; kept for compatibility

    def __len__(self):
        return len(self.annotations)

    def __getitem__(self, index):
        # Some annotated files were dropped during preprocessing; skip
        # forward (wrapping around) until an existing image is found.
        while True:
            img_path = os.path.join(self.root_dir, str(self.annotations.iloc[index, 1]))
            if not os.path.exists(img_path):
                index = (index + 1) % len(self)
                continue
            # BUGFIX: was ``Imadie.open`` (a NameError at runtime); the PIL
            # class imported at the top of the file is ``Image``.
            image = Image.open(img_path)
            if self.transform:
                image = self.transform(image)
            label = self.annotations.iloc[index, 2]
            label = category_dict[label]  # category name -> integer index
            return image, label
# transform_train = transforms.Compose([ | |
# transforms.RandomHorizontalFlip(), | |
# transforms.RandomRotation(10), | |
# transforms.Resize((64, 64)), | |
# transforms.ToTensor(), | |
# transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)) | |
# ]) | |
# transform_train = transforms.Compose([ | |
# transforms.TrivialAugmentWide(), | |
# transforms.Resize((128, 128)), | |
# transforms.ToTensor(), | |
# ]) | |
# train_set = FaceDataset(csv_file='train_set.csv', root_dir=prep_dir,transform=transform_train) | |
# test_set = FaceDataset(csv_file='val_set.csv', root_dir=prep_dir,transform=transform_train) | |
# train_loader = DataLoader(train_set, batch_size=128, shuffle=True) | |
# test_loader = DataLoader(test_set, batch_size=128, shuffle=False) | |
# In[3]: | |
#train function | |
def train(model: torch.nn.Module,
          dataloader: torch.utils.data.DataLoader,
          loss_fn: torch.nn.Module,
          optimizer: torch.optim.Optimizer,
          device: torch.device):
    """Run one training epoch.

    Returns a ``(mean_loss, mean_accuracy)`` tuple, both averaged over the
    number of batches. Progress is printed every 100 batches.
    """
    model.train()
    total_loss = 0.0
    total_acc = 0.0
    for step, (inputs, targets) in enumerate(dataloader):
        inputs, targets = inputs.to(device), targets.to(device)
        logits = model(inputs)
        loss = loss_fn(logits, targets)
        total_loss += loss.item()
        # Standard backward pass.
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        # Softmax does not change the argmax, but mirrors probability space.
        predicted = torch.argmax(torch.softmax(logits, dim=1), dim=1)
        step_acc = (predicted == targets).sum().item() / len(logits)
        total_acc += step_acc
        if step % 100 == 0:
            print(step, loss.item(), str(step_acc * 100) + "%")
    n_batches = len(dataloader)
    return total_loss / n_batches, total_acc / n_batches
# In[4]: | |
#test function | |
def test(model: torch.nn.Module,
         dataloader: torch.utils.data.DataLoader,
         loss_fn: torch.nn.Module,
         device: torch.device):
    """Evaluate the model over one pass of ``dataloader``.

    Runs under ``torch.inference_mode`` (no gradients) and returns a
    ``(mean_loss, mean_accuracy)`` tuple averaged over batches. Progress is
    printed every 100 batches.
    """
    model.eval()
    total_loss = 0.0
    total_acc = 0.0
    with torch.inference_mode():
        for step, (inputs, targets) in enumerate(dataloader):
            inputs, targets = inputs.to(device), targets.to(device)
            logits = model(inputs)
            loss = loss_fn(logits, targets)
            total_loss += loss.item()
            predicted = logits.argmax(dim=1)
            step_acc = (predicted == targets).sum().item() / len(predicted)
            total_acc += step_acc
            if step % 100 == 0:
                print(step, loss.item(), str(step_acc * 100) + "%")
    n_batches = len(dataloader)
    return total_loss / n_batches, total_acc / n_batches
# In[132]: | |
# Optimizer and loss for the SimpleCNN run.
# NOTE(review): `model` is not defined at this point — the SimpleCNN
# construction above is commented out. Uncomment it (or load a model) before
# running this cell, otherwise this line raises NameError.
optimizer = torch.optim.Adam(model.parameters(),0.001)
loss_fn = nn.CrossEntropyLoss()
# In[133]: | |
# Early-stopping bookkeeping for the SimpleCNN training run below.
best_val_loss = float('inf')
patience = 0
# Training-time augmentation. NOTE(review): RandomErasing and
# TrivialAugmentWide are placed after ToTensor, so both operate on tensors;
# TrivialAugmentWide therefore augments the already-erased image — confirm
# this ordering is intended.
transform_train = transforms.Compose([
    transforms.Resize((128, 128)),
    transforms.ToTensor(),
    # v2.AutoAugment(),
    transforms.RandomErasing(),
    v2.TrivialAugmentWide()
])
# Validation pipeline: resize + tensor conversion only, no augmentation.
transform_test = transforms.Compose([
    transforms.Resize((128, 128)),
    transforms.ToTensor(),
])
# Datasets over the 90/10 split CSVs, images read from the preprocessed dir.
train_set = FaceDataset(csv_file='train_set.csv', root_dir=prep_dir,transform=transform_train)
test_set = FaceDataset(csv_file='val_set.csv', root_dir=prep_dir,transform=transform_test)
train_loader = DataLoader(train_set, batch_size=128, shuffle=True)
test_loader = DataLoader(test_set, batch_size=128, shuffle=False)
# In[75]: | |
# Jupyter %%time cell (exported as a run_cell_magic call): trains `model` for
# up to 100 epochs, tracking train loss/accuracy and early-stopping once
# validation loss has not improved for more than 5 consecutive epochs.
get_ipython().run_cell_magic('time', '', '\nmodel_results = {\n "train_loss": [],\n "train_acc": [],\n }\nmodel = model.to(device)\nfor epoch in tqdm(range(100)):\n\n train_loss, train_acc = train(model=model,\n dataloader=train_loader,\n loss_fn=loss_fn,\n optimizer=optimizer,\n device=device)\n \n val_loss, val_acc = test(model=model,\n dataloader=test_loader, \n loss_fn=loss_fn,\n device=device)\n \n\n if val_loss < best_val_loss:\n best_val_loss = val_loss\n patience = 0\n\n else:\n patience += 1\n \n print(f"Epoch: {epoch+1} | train_loss: {train_loss:.4f} | train_acc: {train_acc:.4f} | val_loss: {val_loss:.4f} | val_acc: {val_acc:.4f}")\n \n if patience > 5:\n print("Early stopping due to validation loss not improving")\n break\n model_results["train_loss"].append(train_loss)\n model_results["train_acc"].append(train_acc)')
# In[136]: | |
# torch.save(model.state_dict(), 'simple_cnn_model.pth') | |
# In[5]: | |
from torchvision import transforms | |
# ImageNet-pretrained ResNet-50 with its classification head replaced by a
# fresh 100-way linear layer (one output per face category).
# NOTE(review): `pretrained=True` is deprecated in newer torchvision in
# favor of `weights=...`; it still works but emits a warning.
resnet50 = torchvision.models.resnet50(pretrained=True)
resnet50.fc=nn.Linear(in_features=2048, out_features=100, bias=True)
# Standard ImageNet preprocessing for training (augmentation experiments
# left commented out).
resnet50_transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    # v2.RandAugment()
    # v2.TrivialAugmentWide(),
    # v2.TrivialAugmentWide()
    # v2.TrivialAugmentWide()
])
# Identical preprocessing for validation/testing.
resnet50_transform_test = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    # v2.TrivialAugmentWide()
])
# In[12]: | |
# Datasets/loaders for ResNet-50 fine-tuning; smaller batch size (64) than
# the SimpleCNN run, since 224x224 inputs are heavier.
train_set = FaceDataset(csv_file='train_set.csv', root_dir=prep_dir,transform=resnet50_transform)
test_set = FaceDataset(csv_file='val_set.csv', root_dir=prep_dir,transform=resnet50_transform_test)
train_loader = DataLoader(train_set, batch_size=64, shuffle=True)
test_loader = DataLoader(test_set, batch_size=64, shuffle=False)
# In[13]: | |
import copy  # local import: needed to snapshot the best model below

# Fine-tune the ResNet-50, resuming from the epoch-15 checkpoint, and save a
# checkpoint whenever validation accuracy improves.
optimizer = torch.optim.Adam(resnet50.parameters(), lr=0.001)
state_dict = torch.load('resnet50_epoch15.pth')
resnet50.load_state_dict(state_dict)
loss_fn = nn.CrossEntropyLoss()
resnet50 = resnet50.to(device)

best_val_acc = 0.0
best_model = None
for epoch in tqdm(range(100)):
    train_loss, train_acc = train(model=resnet50,
                                  dataloader=train_loader,
                                  loss_fn=loss_fn,
                                  optimizer=optimizer,
                                  device=device)
    val_loss, val_acc = test(model=resnet50,
                             dataloader=test_loader,
                             loss_fn=loss_fn,
                             device=device)
    print(f"Epoch: {epoch+1} | train_loss: {train_loss:.4f} | train_acc: {train_acc:.4f} | val_loss: {val_loss:.4f} | val_acc: {val_acc:.4f}")
    if val_acc > best_val_acc:
        best_val_acc = val_acc
        # BUGFIX: the original did `best_model = resnet50`, which merely
        # aliases the live model — after further training epochs best_model
        # would no longer hold the best weights. Snapshot a deep copy so
        # best_model really is the best-so-far model.
        best_model = copy.deepcopy(resnet50)
        model_path = f"resnet50_epppppp{epoch+1}.pth"
        torch.save(best_model.state_dict(), model_path)
# In[45]: | |
# torch.save(resnet50.state_dict(), 'resnet50.pth') | |
# In[4]: | |
from torchvision import datasets | |
from PIL import Image | |
import random | |
# Inference: rebuild the ResNet-50, load the epoch-15 checkpoint, and predict
# a class index for every image in test_preprocessed/.
resnet50 = torchvision.models.resnet50(pretrained=True)
resnet50.fc=nn.Linear(in_features=2048, out_features=100, bias=True)
# ImageNet preprocessing — must match what the checkpoint was trained with.
resnet50_transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])
# NOTE(review): `t` appears unused below.
t= transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
state_dict = torch.load('resnet50_epoch15.pth')
resnet50.load_state_dict(state_dict)
resnet50=resnet50.to(device)
predictions = []  # list of (file stem, predicted class index) pairs
resnet50.eval()
test_dir = 'test_preprocessed'
for i, filename in enumerate(os.listdir(test_dir)):
    img_path = os.path.join(test_dir, filename)
    image = Image.open(img_path)
    image=image.convert("RGB")  # ensure 3 channels for the network
    image = resnet50_transform(image).unsqueeze(0).to(device)
    with torch.no_grad():
        output = resnet50(image)
        # probabilities = F.softmax(output, dim=1)
        _, predicted_class = torch.max(output, 1)
        # The file stem (text before the first '.') is used as the image id.
        predictions.append((filename.split('.')[0],predicted_class.item()))
# In[5]: | |
# Build the submission file: map predicted class indices back to category
# names, sort rows by numeric image id, and write pppp.csv.
print(len(predictions))
index_to_category = {idx: name for name, idx in category_dict.items()}
submission = pd.DataFrame({
    'id': [int(file_id) for file_id, _ in predictions],
    'category': [index_to_category[cls] for _, cls in predictions]
})
submission.sort_values(by='id').to_csv('pppp.csv', index=False)
# In[ ]: | |