-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
0 parents
commit 197a9b0
Showing
2 changed files
with
366 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,246 @@ | ||
# -*- coding: utf-8 -*- | ||
"""final | ||
Automatically generated by Colab. | ||
Original file is located at | ||
https://colab.research.google.com/drive/10iQhiCiRcuiDJvoNggByaX8XUMEDpdi7 | ||
""" | ||
|
||
import torch | ||
from torchvision import transforms | ||
from torch.utils.data import random_split | ||
from torch.utils.data import Dataset | ||
from torch.utils.data import DataLoader | ||
from PIL import Image | ||
|
||
import zipfile | ||
import os | ||
import pandas as pd | ||
import numpy as np | ||
import cv2 | ||
import matplotlib.pyplot as plt | ||
|
||
!pip install facenet-pytorch | ||
|
||
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') | ||
print(f'Using device : {device}') | ||
|
||
from google.colab import drive | ||
drive.mount('/content/drive') | ||
|
||
# download processed.zip file | ||
zip_path = '/content/drive/My Drive/processed.zip' | ||
extract_to = '/content/processed/' | ||
|
||
os.makedirs(extract_to, exist_ok=True) | ||
|
||
# unzip the file | ||
with zipfile.ZipFile(zip_path, 'r') as zip_ref: | ||
zip_ref.extractall(extract_to) | ||
|
||
print("Files extracted successfully!") | ||
|
||
# match the index for the preprocessed train data | ||
category_csv_file = '/content/drive/MyDrive/category.csv' | ||
train_csv_file = '/content/drive/MyDrive/train.csv' | ||
image_folder = '/content/processed' | ||
|
||
def map_category(category_csv_file): | ||
category_df = pd.read_csv(category_csv_file) | ||
category_to_index = {category: idx for idx, category in enumerate(category_df['Category'])} | ||
index_to_category = {idx: category for category, idx in category_to_index.items()} | ||
return category_to_index, index_to_category | ||
|
||
category_to_index, index_to_category = map_category(category_csv_file) | ||
category_df = pd.read_csv(category_csv_file) | ||
train_df = pd.read_csv(train_csv_file) | ||
|
||
train_df['exists'] = train_df['File Name'].apply(lambda x: os.path.exists(os.path.join(image_folder, x))) | ||
train_df = train_df[train_df['exists']] | ||
train_df['Label'] = train_df['Category'].map(category_to_index) | ||
|
||
filenames = train_df['File Name'].tolist() | ||
labels = train_df['Label'].tolist() | ||
|
||
# Choose an index to check | ||
index_to_check = 1 | ||
|
||
# Check if the index is valid | ||
if index_to_check < len(filenames): | ||
img_path = os.path.join(image_folder, filenames[index_to_check]) | ||
image = Image.open(img_path).convert('RGB') | ||
plt.imshow(image) | ||
plt.show() | ||
|
||
label = labels[index_to_check] | ||
label_tensor = torch.tensor(label, dtype=torch.long) | ||
print(label_tensor, filenames[index_to_check]) | ||
else: | ||
print(f"Index {index_to_check} is out of range for the number of files.") | ||
|
||
class ImageDataLoader(Dataset): | ||
def __init__(self, images_directory, metadata_file_path, categories_file, transformation=None): | ||
self.images_dir = images_directory | ||
self.transform = transformation | ||
self.metadata_df = pd.read_csv(metadata_file_path) | ||
self.metadata_df['file_exists'] = self.metadata_df['Image_Name'].apply( | ||
lambda name: os.path.isfile(os.path.join(images_directory, name)) | ||
) | ||
self.metadata_df = self.metadata_df[self.metadata_df['file_exists']] | ||
categories_df = pd.read_csv(categories_file) | ||
self.category_map = {cat: index for index, cat in enumerate(categories_df['Category_Name'])} | ||
self.metadata_df['Numeric_Label'] = self.metadata_df['Category'].map(self.category_map) | ||
self.image_names = self.metadata_df['Image_Name'].tolist() | ||
self.labels = self.metadata_df['Numeric_Label'].tolist() | ||
|
||
def __len__(self): | ||
return len(self.image_names) | ||
|
||
def __getitem__(self, index): | ||
file_path = os.path.join(self.images_dir, self.image_names[index]) | ||
img = Image.open(file_path).convert('RGB') | ||
if self.transform: | ||
img = self.transform(img) | ||
label = self.labels[index] | ||
label_as_tensor = torch.tensor(label, dtype=torch.long) | ||
return img, label_as_tensor | ||
|
||
# Define image transformations | ||
image_transforms = transforms.Compose([ | ||
transforms.Resize((224, 224), interpolation=transforms.InterpolationMode.BICUBIC), | ||
transforms.CenterCrop(224), | ||
transforms.ToTensor(), | ||
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]) #ImageNet | ||
]) | ||
|
||
# Create an instance of the dataset | ||
image_dataset = ImageDataLoader(image_folder, train_csv_file, category_csv_file, image_transforms) | ||
|
||
# Constants | ||
NUM_EPOCHS = 30 | ||
BATCH_SIZE = 32 | ||
NUM_CLASSES = 100 | ||
LEARNING_RATE = 1e-3 | ||
STEP_SIZE = 20 | ||
GAMMA = 0.5 | ||
|
||
# Dataset sizes | ||
dataset_count = len(image_dataset) | ||
training_count = int(dataset_count) | ||
validation_count = dataset_count - training_count | ||
|
||
# Split dataset | ||
training_data, validation_data = random_split(image_dataset, [training_count, validation_count]) | ||
|
||
# Loaders | ||
training_loader = DataLoader(training_data, batch_size=BATCH_SIZE, shuffle=True, num_workers=4, pin_memory=True) | ||
validation_loader = DataLoader(validation_data, batch_size=BATCH_SIZE, shuffle=False, num_workers=4, pin_memory=True) | ||
|
||
# Model configuration | ||
network = models.resnet50(weights='IMAGENET1K_V2') | ||
network.fc = nn.Linear(network.fc.in_features, NUM_CLASSES) | ||
network = nn.DataParallel(network).to(device) | ||
|
||
# Training components | ||
loss_function = nn.CrossEntropyLoss() | ||
optimiser = optim.Adam(network.parameters(), lr=LEARNING_RATE) | ||
lr_scheduler = StepLR(optimiser, step_size=STEP_SIZE, gamma=GAMMA) | ||
precision_scaler = GradScaler() | ||
|
||
# Metrics | ||
training_losses = [] | ||
validation_losses = [] | ||
validation_accuracy = [] | ||
|
||
# Training process | ||
for epoch in range(NUM_EPOCHS): | ||
network.train() | ||
total_loss = 0.0 | ||
|
||
for images, targets in training_loader: | ||
images, targets = images.to(device), targets.to(device) | ||
|
||
optimiser.zero_grad() | ||
|
||
with autocast(): | ||
predictions = network(images) | ||
loss = loss_function(predictions, targets) | ||
|
||
precision_scaler.scale(loss).backward() | ||
precision_scaler.step(optimiser) | ||
precision_scaler.update() | ||
|
||
total_loss += loss.item() | ||
|
||
average_train_loss = total_loss / len(training_loader) | ||
print(f'Epoch {epoch + 1}/{NUM_EPOCHS} - Train Loss: {average_train_loss:.2f}') | ||
training_losses.append(average_train_loss) | ||
|
||
class FacialRecognitionDataset(Dataset): | ||
def __init__(self, directory, img_transform=None): | ||
self.directory = directory | ||
self.img_transform = img_transform | ||
self.image_files = sorted([f for f in os.listdir(directory) if os.path.isfile(os.path.join(directory, f))]) | ||
self.detector = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml') | ||
|
||
def __len__(self): | ||
return len(self.image_files) | ||
|
||
def __getitem__(self, index): | ||
file_path = os.path.join(self.directory, self.image_files[index]) | ||
pil_image = Image.open(file_path).convert('RGB') | ||
cv_image = np.array(pil_image)[:, :, ::-1] # Convert RGB to BGR for OpenCV | ||
|
||
gray_image = cv2.cvtColor(cv_image, cv2.COLOR_BGR2GRAY) | ||
detected_faces = self.detector.detectMultiScale(gray_image, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30)) | ||
|
||
if detected_faces: | ||
x, y, width, height = detected_faces[0] | ||
cropped_face = cv_image[y:y+height, x:x+width] | ||
final_image = Image.fromarray(cropped_face[:, :, ::-1]) # Convert BGR back to RGB | ||
else: | ||
final_image = pil_image | ||
|
||
if self.img_transform: | ||
final_image = self.img_transform(final_image) | ||
|
||
return final_image, self.image_files[index] | ||
|
||
zip_path = '/content/drive/My Drive/test.zip' | ||
extract_to = '/content/test/' | ||
|
||
os.makedirs(extract_to, exist_ok=True) | ||
|
||
# Unzip the file | ||
with zipfile.ZipFile(zip_path, 'r') as zip_ref: | ||
zip_ref.extractall(extract_to) | ||
|
||
print("Files extracted successfully!") | ||
|
||
# Setup dataset and loader | ||
dataset_directory = '/content/test' | ||
face_dataset = FacialRecognitionDataset(dataset_directory, image_transforms) | ||
face_loader = DataLoader(face_dataset, batch_size=1, shuffle=False) | ||
|
||
# Model to evaluation mode | ||
model.eval() | ||
all_predictions = [] | ||
all_filenames = [] | ||
|
||
# Processing images | ||
with torch.no_grad(): | ||
for data, names in face_loader: | ||
data = data.to(device) | ||
model_outputs = model(data) | ||
_, predicted_indices = torch.max(model_outputs, 1) | ||
predicted_categories = [index_to_category[index.item()] for index in predicted_indices] | ||
|
||
all_predictions.extend(predicted_categories) | ||
all_filenames.extend(names) | ||
|
||
# Generating CSV file | ||
results_df = pd.DataFrame({'Id': all_filenames, 'Category': all_predictions}) | ||
results_df['Id'] = results_df['Id'].str.extract('(\d+)').astype(int) | ||
results_df.sort_values('Id', inplace=True) | ||
results_df.to_csv('/content/drive/MyDrive/predictions.csv', index=False) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,120 @@ | ||
# -*- coding: utf-8 -*- | ||
"""preprocessing.ipynb | ||
Automatically generated by Colab. | ||
Original file is located at | ||
https://colab.research.google.com/drive/1zHp3b8TUG2thkU3npcljql5mBDNpk63H | ||
""" | ||
|
||
import numpy as np | ||
import cv2 | ||
import os | ||
from PIL import Image | ||
import zipfile | ||
import shutil | ||
|
||
from google.colab import drive | ||
drive.mount('/content/drive') | ||
|
||
# download train.zip file | ||
zip_path = '/content/drive/My Drive/train.zip' | ||
extract_to = '/content/train/' | ||
|
||
os.makedirs(extract_to, exist_ok=True) | ||
|
||
# unzip the file | ||
with zipfile.ZipFile(zip_path, 'r') as zip_ref: | ||
zip_ref.extractall(extract_to) | ||
|
||
print("Files extracted successfully!") | ||
|
||
def cropImg(imgPath): | ||
""" | ||
Crop the image to only include the face detected using OpenCV's Haar Cascades. | ||
Parameters: | ||
imgPath (str): The path to the input image file. | ||
Returns: | ||
np.array: Cropped face area as a numpy array in BGR format, or the original image if no face is detected. | ||
""" | ||
faceCascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml') | ||
try: | ||
image = Image.open(imgPath).convert('RGB') | ||
except Exception as e: | ||
print(f"PIL error: {e}") | ||
return None | ||
|
||
# Convert PIL Image to numpy array for OpenCV to process | ||
open_cv_image = np.array(image) | ||
# Convert RGB to BGR for OpenCV | ||
open_cv_image = open_cv_image[:, :, ::-1].copy() | ||
|
||
gray = cv2.cvtColor(open_cv_image, cv2.COLOR_BGR2GRAY) | ||
faces = faceCascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30)) | ||
|
||
if len(faces) > 0: | ||
x, y, w, h = faces[0] | ||
face = open_cv_image[y:y+h, x:x+w] | ||
return face # Return numpy array in BGR format | ||
else: | ||
return open_cv_image # Return the original image in BGR format as a numpy array | ||
|
||
def normalize_data(data): | ||
""" | ||
Normalize the input image data from BGR format to a 0-1 range in RGB format, improving model performance. | ||
Parameters: | ||
data (np.array): Input BGR image data to be normalized. | ||
Returns: | ||
np.array: Normalized data array with values between 0 and 1 in RGB format. | ||
""" | ||
data = data[:, :, ::-1] # Convert BGR to RGB | ||
return data / 255.0 | ||
|
||
def process_images(image_folder, output_folder): | ||
""" | ||
Process each image in the specified folder: crop to face, normalize, and save to output folder. | ||
Parameters: | ||
image_folder (str): Folder containing the images to process. | ||
output_folder (str): Folder to save processed images. | ||
""" | ||
count = 0 | ||
# Ensure the output directory exists | ||
if not os.path.exists(output_folder): | ||
os.makedirs(output_folder) | ||
|
||
for filename in os.listdir(image_folder): | ||
if filename.lower().endswith(('.png', '.jpg', '.jpeg')): # Check for image files | ||
file_path = os.path.join(image_folder, filename) | ||
cropped_face = cropImg(file_path) | ||
|
||
if cropped_face is not None: | ||
normalized_image = normalize_data(cropped_face) | ||
output_path = os.path.join(output_folder, filename) | ||
cv2.imwrite(output_path, normalized_image * 255) # Convert back to 0-255 range | ||
count += 1 | ||
if count % 100 == 1: | ||
print(f"Processed {count} images.") | ||
else: | ||
print(f"Skipping file due to loading error: {filename}") | ||
|
||
print("Processing complete.") | ||
|
||
# Paths for the image directories | ||
image_folder = '/content/train/train' | ||
output_folder = '/content/processed' | ||
|
||
# Run the processing function | ||
process_images(image_folder, output_folder) | ||
|
||
# compress the processed train data to zip file | ||
!zip -r /content/processed.zip /content/processed | ||
|
||
# upload the processed train data to google drive to avoid running preprocessing step again | ||
source = '/content/processed.zip' | ||
destination= '/content/drive/MyDrive/processed.zip' | ||
shutil.move(source, destination) |