Permalink
Cannot retrieve contributors at this time
Name already in use
A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
Mini-Challenge/final.py
Go to fileThis commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
246 lines (192 sloc)
8.34 KB
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*-
"""final

Automatically generated by Colab.

Original file is located at
    https://colab.research.google.com/drive/10iQhiCiRcuiDJvoNggByaX8XUMEDpdi7
"""
import os
import zipfile

import cv2
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from PIL import Image
from torch.cuda.amp import GradScaler, autocast
from torch.optim.lr_scheduler import StepLR
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
from torch.utils.data import random_split
from torchvision import models
from torchvision import transforms
# NOTE: the Colab export carried the IPython shell magic
# `!pip install facenet-pytorch` at this point. That syntax is only valid in
# a notebook cell and is a SyntaxError in a plain .py file, and nothing in
# this file imports facenet-pytorch, so it is kept here as a comment only.

# Pick the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Using device : {device}')

# Mount Google Drive so the archives and CSV files below are reachable.
from google.colab import drive
drive.mount('/content/drive')

# Location of the preprocessed training archive and where to unpack it.
zip_path = '/content/drive/My Drive/processed.zip'
extract_to = '/content/processed/'
os.makedirs(extract_to, exist_ok=True)

# Unzip the archive into the extraction directory.
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
    zip_ref.extractall(extract_to)
print("Files extracted successfully!")

# Inputs used to match the preprocessed training images to their labels.
category_csv_file = '/content/drive/MyDrive/category.csv'
train_csv_file = '/content/drive/MyDrive/train.csv'
image_folder = '/content/processed'
def map_category(category_csv_file):
    """Build the category <-> index lookup tables from a CSV file.

    The CSV is read with pandas and its 'Category' column is enumerated in
    row order; a value that occurs more than once keeps the position of its
    last occurrence.

    Returns:
        A tuple ``(category_to_index, index_to_category)`` where the second
        dict is the inverse of the first.
    """
    names = pd.read_csv(category_csv_file)['Category']

    forward = {}
    for position, name in enumerate(names):
        forward[name] = position

    backward = {}
    for name, position in forward.items():
        backward[position] = name

    return forward, backward
# Build the label lookup tables and load the raw CSV files.
category_to_index, index_to_category = map_category(category_csv_file)
category_df = pd.read_csv(category_csv_file)
train_df = pd.read_csv(train_csv_file)

# Keep only the rows whose image file is actually present on disk.
train_df['exists'] = train_df['File Name'].apply(
    lambda x: os.path.exists(os.path.join(image_folder, x))
)
# .copy() makes the filtered frame independent of the original, so the column
# assignment below does not write into a slice (pandas SettingWithCopyWarning).
train_df = train_df[train_df['exists']].copy()
train_df['Label'] = train_df['Category'].map(category_to_index)

filenames = train_df['File Name'].tolist()
labels = train_df['Label'].tolist()

# Sanity check: display one sample together with its label.
index_to_check = 1
if index_to_check < len(filenames):
    img_path = os.path.join(image_folder, filenames[index_to_check])
    image = Image.open(img_path).convert('RGB')
    plt.imshow(image)
    plt.show()
    label = labels[index_to_check]
    label_tensor = torch.tensor(label, dtype=torch.long)
    print(label_tensor, filenames[index_to_check])
else:
    print(f"Index {index_to_check} is out of range for the number of files.")
class ImageDataLoader(Dataset):
    """Dataset of labelled images described by a metadata CSV.

    Despite its name this is a ``torch.utils.data.Dataset``; wrap it in a
    ``DataLoader`` to obtain batches.

    Args:
        images_directory: Folder that contains the image files.
        metadata_file_path: CSV with one row per image; it must provide the
            ``file_column`` and ``category_column`` columns.
        categories_file: CSV whose ``category_column`` lists every category;
            the row position of a category becomes its numeric label.
        transformation: Optional callable applied to each loaded PIL image.
        file_column: Name of the file-name column in the metadata CSV.
            Defaults to 'File Name', the column the rest of this script
            reads from the same CSV.
        category_column: Name of the category column in both CSV files.
            Defaults to 'Category', as used by ``map_category``.

    Rows whose image file does not exist in ``images_directory`` are dropped.
    """

    def __init__(self, images_directory, metadata_file_path, categories_file,
                 transformation=None, *, file_column='File Name',
                 category_column='Category'):
        self.images_dir = images_directory
        self.transform = transformation

        metadata = pd.read_csv(metadata_file_path)
        metadata['file_exists'] = metadata[file_column].apply(
            lambda name: os.path.isfile(os.path.join(images_directory, name))
        )
        # .copy() so the label column below is added to an independent frame
        # rather than to a slice of the unfiltered one.
        self.metadata_df = metadata[metadata['file_exists']].copy()

        categories_df = pd.read_csv(categories_file)
        self.category_map = {
            cat: index
            for index, cat in enumerate(categories_df[category_column])
        }
        self.metadata_df['Numeric_Label'] = (
            self.metadata_df[category_column].map(self.category_map)
        )

        self.image_names = self.metadata_df[file_column].tolist()
        self.labels = self.metadata_df['Numeric_Label'].tolist()

    def __len__(self):
        """Return the number of images that were found on disk."""
        return len(self.image_names)

    def __getitem__(self, index):
        """Return ``(image, label)`` for the sample at ``index``.

        The image is opened as RGB and passed through the transform when one
        was given; the label is a scalar ``torch.long`` tensor.
        """
        file_path = os.path.join(self.images_dir, self.image_names[index])
        img = Image.open(file_path).convert('RGB')
        if self.transform:
            img = self.transform(img)
        label_as_tensor = torch.tensor(self.labels[index], dtype=torch.long)
        return img, label_as_tensor
# Image preprocessing: resize to 224x224, convert to a tensor and normalise
# with the ImageNet channel statistics.
image_transforms = transforms.Compose([
    transforms.Resize((224, 224), interpolation=transforms.InterpolationMode.BICUBIC),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # ImageNet
])

# Create an instance of the dataset
image_dataset = ImageDataLoader(image_folder, train_csv_file, category_csv_file, image_transforms)

# Constants
NUM_EPOCHS = 30
BATCH_SIZE = 32
NUM_CLASSES = 100
LEARNING_RATE = 1e-3
STEP_SIZE = 20
GAMMA = 0.5

# Dataset sizes. int(dataset_count) assigns every sample to the training
# split, so the validation split (and its loader) is empty.
dataset_count = len(image_dataset)
training_count = int(dataset_count)
validation_count = dataset_count - training_count

# Split dataset
training_data, validation_data = random_split(image_dataset, [training_count, validation_count])

# Loaders
training_loader = DataLoader(training_data, batch_size=BATCH_SIZE, shuffle=True, num_workers=4, pin_memory=True)
validation_loader = DataLoader(validation_data, batch_size=BATCH_SIZE, shuffle=False, num_workers=4, pin_memory=True)

# Model configuration: pretrained ResNet-50 whose final layer is replaced by a
# NUM_CLASSES-way linear classifier.
network = models.resnet50(weights='IMAGENET1K_V2')
network.fc = nn.Linear(network.fc.in_features, NUM_CLASSES)
network = nn.DataParallel(network).to(device)

# Training components
loss_function = nn.CrossEntropyLoss()
optimiser = optim.Adam(network.parameters(), lr=LEARNING_RATE)
lr_scheduler = StepLR(optimiser, step_size=STEP_SIZE, gamma=GAMMA)
precision_scaler = GradScaler()

# Metrics
training_losses = []
validation_losses = []
validation_accuracy = []

# Training process
for epoch in range(NUM_EPOCHS):
    network.train()
    total_loss = 0.0
    for images, targets in training_loader:
        images, targets = images.to(device), targets.to(device)
        optimiser.zero_grad()
        # Mixed-precision forward pass; the scaler handles the backward pass
        # and the optimiser step.
        with autocast():
            predictions = network(images)
            loss = loss_function(predictions, targets)
        precision_scaler.scale(loss).backward()
        precision_scaler.step(optimiser)
        precision_scaler.update()
        total_loss += loss.item()

    # Advance the schedule once per epoch; without this call the StepLR
    # scheduler above never changes the learning rate.
    lr_scheduler.step()

    average_train_loss = total_loss / len(training_loader)
    print(f'Epoch {epoch + 1}/{NUM_EPOCHS} - Train Loss: {average_train_loss:.2f}')
    training_losses.append(average_train_loss)
class FacialRecognitionDataset(Dataset):
    """Unlabelled image dataset that crops each image to its first detected face.

    Args:
        directory: Folder whose regular files are treated as images; they are
            served in sorted file-name order.
        img_transform: Optional callable applied to the (possibly cropped)
            PIL image.

    Each item is ``(image, file_name)``. Faces are located with OpenCV's
    frontal-face Haar cascade; when no face is found the full image is used.
    """

    def __init__(self, directory, img_transform=None):
        self.directory = directory
        self.img_transform = img_transform
        self.image_files = sorted(
            f for f in os.listdir(directory)
            if os.path.isfile(os.path.join(directory, f))
        )
        self.detector = cv2.CascadeClassifier(
            cv2.data.haarcascades + 'haarcascade_frontalface_default.xml'
        )

    def __len__(self):
        """Return the number of files found in the directory."""
        return len(self.image_files)

    @staticmethod
    def _first_face(detections):
        """Return the first ``(x, y, width, height)`` box, or None if empty.

        ``detections`` may be a NumPy array of boxes or an empty sequence.
        The emptiness test uses ``len`` because the truth value of a
        multi-element NumPy array is ambiguous and raises ValueError.
        """
        if len(detections) == 0:
            return None
        x, y, width, height = (int(value) for value in detections[0])
        return x, y, width, height

    def __getitem__(self, index):
        """Load the image at ``index``, crop to the first face and transform it."""
        file_path = os.path.join(self.directory, self.image_files[index])
        pil_image = Image.open(file_path).convert('RGB')

        # The detector works on a grayscale image; convert straight from RGB.
        gray_image = cv2.cvtColor(np.array(pil_image), cv2.COLOR_RGB2GRAY)
        detected_faces = self.detector.detectMultiScale(
            gray_image, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30)
        )

        face_box = self._first_face(detected_faces)
        if face_box is not None:
            x, y, width, height = face_box
            final_image = pil_image.crop((x, y, x + width, y + height))
        else:
            final_image = pil_image

        if self.img_transform:
            final_image = self.img_transform(final_image)
        return final_image, self.image_files[index]
# Location of the test archive and where to unpack it.
zip_path = '/content/drive/My Drive/test.zip'
extract_to = '/content/test/'
os.makedirs(extract_to, exist_ok=True)

# Unzip the file
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
    zip_ref.extractall(extract_to)
print("Files extracted successfully!")

# Setup dataset and loader
dataset_directory = '/content/test'
face_dataset = FacialRecognitionDataset(dataset_directory, image_transforms)
face_loader = DataLoader(face_dataset, batch_size=1, shuffle=False)

# Switch the trained network to evaluation mode. The model is bound to the
# name `network`; `model` is never defined in this file.
network.eval()
all_predictions = []
all_filenames = []

# Processing images
with torch.no_grad():
    for data, names in face_loader:
        data = data.to(device)
        model_outputs = network(data)
        _, predicted_indices = torch.max(model_outputs, 1)
        predicted_categories = [index_to_category[index.item()] for index in predicted_indices]
        all_predictions.extend(predicted_categories)
        all_filenames.extend(names)

# Generating CSV file: the Id is the first run of digits in the file name,
# and rows are written in ascending Id order.
results_df = pd.DataFrame({'Id': all_filenames, 'Category': all_predictions})
results_df['Id'] = results_df['Id'].str.extract(r'(\d+)').astype(int)
results_df.sort_values('Id', inplace=True)
results_df.to_csv('/content/drive/MyDrive/predictions.csv', index=False)