From 197a9b06df9c7b60ec9a753eb1b99eb0a5d880c4 Mon Sep 17 00:00:00 2001
From: "Park, Hyun Soo"
Date: Fri, 12 Apr 2024 03:14:42 -0400
Subject: [PATCH] Add files via upload

---
 final.py         | 246 +++++++++++++++++++++++++++++++++++++++++++++++
 preprocessing.py | 120 +++++++++++++++++++++++
 2 files changed, 366 insertions(+)
 create mode 100644 final.py
 create mode 100644 preprocessing.py

diff --git a/final.py b/final.py
new file mode 100644
index 0000000..4c08041
--- /dev/null
+++ b/final.py
@@ -0,0 +1,246 @@
+# -*- coding: utf-8 -*-
+"""final
+
+Automatically generated by Colab.
+
+Original file is located at
+    https://colab.research.google.com/drive/10iQhiCiRcuiDJvoNggByaX8XUMEDpdi7
+"""
+
+import torch
+import torch.nn as nn
+import torch.optim as optim
+from torch.optim.lr_scheduler import StepLR
+from torch.cuda.amp import GradScaler, autocast
+from torchvision import models, transforms
+from torch.utils.data import Dataset, DataLoader, random_split
+from PIL import Image
+
+import zipfile
+import os
+import pandas as pd
+import numpy as np
+import cv2
+import matplotlib.pyplot as plt
+
+!pip install facenet-pytorch
+
+device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
+print(f'Using device: {device}')
+
+from google.colab import drive
+drive.mount('/content/drive')
+
+# Download the processed.zip file from Google Drive
+zip_path = '/content/drive/My Drive/processed.zip'
+extract_to = '/content/processed/'
+
+os.makedirs(extract_to, exist_ok=True)
+
+# Unzip the file
+with zipfile.ZipFile(zip_path, 'r') as zip_ref:
+    zip_ref.extractall(extract_to)
+
+print("Files extracted successfully!")
+
+# Map category names to integer indices for the preprocessed training data
+category_csv_file = '/content/drive/MyDrive/category.csv'
+train_csv_file = '/content/drive/MyDrive/train.csv'
+image_folder = '/content/processed'
+
+def map_category(category_csv_file):
+    category_df = pd.read_csv(category_csv_file)
+    category_to_index = {category: idx for idx, category in enumerate(category_df['Category'])}
+    index_to_category = {idx: category for category, idx in category_to_index.items()}
+    return category_to_index, index_to_category
+
+category_to_index, index_to_category = map_category(category_csv_file)
+category_df = pd.read_csv(category_csv_file)
+train_df = pd.read_csv(train_csv_file)
+
+# Keep only the rows whose image file actually exists on disk
+train_df['exists'] = train_df['File Name'].apply(lambda x: os.path.exists(os.path.join(image_folder, x)))
+train_df = train_df[train_df['exists']]
+train_df['Label'] = train_df['Category'].map(category_to_index)
+
+filenames = train_df['File Name'].tolist()
+labels = train_df['Label'].tolist()
+
+# Choose an index to check
+index_to_check = 1
+
+# Check that the index is valid before displaying the sample
+if index_to_check < len(filenames):
+    img_path = os.path.join(image_folder, filenames[index_to_check])
+    image = Image.open(img_path).convert('RGB')
+    plt.imshow(image)
+    plt.show()
+
+    label = labels[index_to_check]
+    label_tensor = torch.tensor(label, dtype=torch.long)
+    print(label_tensor, filenames[index_to_check])
+else:
+    print(f"Index {index_to_check} is out of range for the number of files.")
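+
+# A quick sanity check on the label mapping (a minimal sketch of my own, not
+# part of the uploaded script): the two dictionaries from map_category should
+# invert each other, and every category in train.csv should be mapped.
+assert all(index_to_category[idx] == cat for cat, idx in category_to_index.items())
+assert train_df['Label'].notna().all(), "Some categories in train.csv are missing from category.csv"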
+
+class ImageDataLoader(Dataset):
+    def __init__(self, images_directory, metadata_file_path, categories_file, transformation=None):
+        self.images_dir = images_directory
+        self.transform = transformation
+        self.metadata_df = pd.read_csv(metadata_file_path)
+        # Keep only the rows whose image file actually exists on disk
+        self.metadata_df['file_exists'] = self.metadata_df['File Name'].apply(
+            lambda name: os.path.isfile(os.path.join(images_directory, name))
+        )
+        self.metadata_df = self.metadata_df[self.metadata_df['file_exists']]
+        # Column names match category.csv and train.csv as read above
+        categories_df = pd.read_csv(categories_file)
+        self.category_map = {cat: index for index, cat in enumerate(categories_df['Category'])}
+        self.metadata_df['Numeric_Label'] = self.metadata_df['Category'].map(self.category_map)
+        self.image_names = self.metadata_df['File Name'].tolist()
+        self.labels = self.metadata_df['Numeric_Label'].tolist()
+
+    def __len__(self):
+        return len(self.image_names)
+
+    def __getitem__(self, index):
+        file_path = os.path.join(self.images_dir, self.image_names[index])
+        img = Image.open(file_path).convert('RGB')
+        if self.transform:
+            img = self.transform(img)
+        label = self.labels[index]
+        label_as_tensor = torch.tensor(label, dtype=torch.long)
+        return img, label_as_tensor
+
+# Define image transformations (ImageNet statistics, matching the pretrained backbone)
+image_transforms = transforms.Compose([
+    transforms.Resize((224, 224), interpolation=transforms.InterpolationMode.BICUBIC),
+    transforms.CenterCrop(224),
+    transforms.ToTensor(),
+    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
+])
+
+# Create an instance of the dataset
+image_dataset = ImageDataLoader(image_folder, train_csv_file, category_csv_file, image_transforms)
+
+# Constants
+NUM_EPOCHS = 30
+BATCH_SIZE = 32
+NUM_CLASSES = 100
+LEARNING_RATE = 1e-3
+STEP_SIZE = 20
+GAMMA = 0.5
+
+# Dataset sizes: hold out 10% of the data for validation
+dataset_count = len(image_dataset)
+training_count = int(dataset_count * 0.9)
+validation_count = dataset_count - training_count
+
+# Split dataset
+training_data, validation_data = random_split(image_dataset, [training_count, validation_count])
+
+# Loaders
+training_loader = DataLoader(training_data, batch_size=BATCH_SIZE, shuffle=True, num_workers=4, pin_memory=True)
+validation_loader = DataLoader(validation_data, batch_size=BATCH_SIZE, shuffle=False, num_workers=4, pin_memory=True)
+
+# Model configuration: ResNet-50 pretrained on ImageNet, classifier head replaced
+network = models.resnet50(weights='IMAGENET1K_V2')
+network.fc = nn.Linear(network.fc.in_features, NUM_CLASSES)
+network = nn.DataParallel(network).to(device)
+
+# Training components
+loss_function = nn.CrossEntropyLoss()
+optimiser = optim.Adam(network.parameters(), lr=LEARNING_RATE)
+lr_scheduler = StepLR(optimiser, step_size=STEP_SIZE, gamma=GAMMA)
+precision_scaler = GradScaler()
+
+# Metrics
+training_losses = []
+validation_losses = []
+validation_accuracy = []
+
+# Training process with mixed precision
+for epoch in range(NUM_EPOCHS):
+    network.train()
+    total_loss = 0.0
+
+    for images, targets in training_loader:
+        images, targets = images.to(device), targets.to(device)
+
+        optimiser.zero_grad()
+
+        with autocast():
+            predictions = network(images)
+            loss = loss_function(predictions, targets)
+
+        precision_scaler.scale(loss).backward()
+        precision_scaler.step(optimiser)
+        precision_scaler.update()
+
+        total_loss += loss.item()
+
+    # Advance the learning-rate schedule once per epoch
+    lr_scheduler.step()
+
+    average_train_loss = total_loss / len(training_loader)
+    print(f'Epoch {epoch + 1}/{NUM_EPOCHS} - Train Loss: {average_train_loss:.4f}')
+    training_losses.append(average_train_loss)
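+
+    # The validation_losses / validation_accuracy lists above are meant to
+    # track held-out performance, but the uploaded script stops at the train
+    # loss. A minimal per-epoch validation pass could look like this sketch:
+    network.eval()
+    val_loss, correct, seen = 0.0, 0, 0
+    with torch.no_grad():
+        for images, targets in validation_loader:
+            images, targets = images.to(device), targets.to(device)
+            predictions = network(images)
+            val_loss += loss_function(predictions, targets).item()
+            correct += (predictions.argmax(dim=1) == targets).sum().item()
+            seen += targets.size(0)
+    validation_losses.append(val_loss / max(len(validation_loader), 1))
+    validation_accuracy.append(correct / max(seen, 1))
+    print(f'Epoch {epoch + 1}/{NUM_EPOCHS} - Val Loss: {validation_losses[-1]:.4f}, Val Acc: {validation_accuracy[-1]:.4f}')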
+
+class FacialRecognitionDataset(Dataset):
+    def __init__(self, directory, img_transform=None):
+        self.directory = directory
+        self.img_transform = img_transform
+        self.image_files = sorted([f for f in os.listdir(directory) if os.path.isfile(os.path.join(directory, f))])
+        self.detector = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
+
+    def __len__(self):
+        return len(self.image_files)
+
+    def __getitem__(self, index):
+        file_path = os.path.join(self.directory, self.image_files[index])
+        pil_image = Image.open(file_path).convert('RGB')
+        # Convert RGB to BGR for OpenCV; copy() makes the array contiguous
+        cv_image = np.array(pil_image)[:, :, ::-1].copy()
+
+        gray_image = cv2.cvtColor(cv_image, cv2.COLOR_BGR2GRAY)
+        detected_faces = self.detector.detectMultiScale(gray_image, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30))
+
+        # detectMultiScale returns an array; test its length, not its truth value
+        if len(detected_faces) > 0:
+            x, y, width, height = detected_faces[0]
+            cropped_face = cv_image[y:y+height, x:x+width]
+            final_image = Image.fromarray(cropped_face[:, :, ::-1].copy())  # Convert BGR back to RGB
+        else:
+            final_image = pil_image
+
+        if self.img_transform:
+            final_image = self.img_transform(final_image)
+
+        return final_image, self.image_files[index]
+
+zip_path = '/content/drive/My Drive/test.zip'
+extract_to = '/content/test/'
+
+os.makedirs(extract_to, exist_ok=True)
+
+# Unzip the file
+with zipfile.ZipFile(zip_path, 'r') as zip_ref:
+    zip_ref.extractall(extract_to)
+
+print("Files extracted successfully!")
+
+# Set up the test dataset and loader
+dataset_directory = '/content/test'
+face_dataset = FacialRecognitionDataset(dataset_directory, image_transforms)
+face_loader = DataLoader(face_dataset, batch_size=1, shuffle=False)
+
+# Switch the model to evaluation mode
+network.eval()
+all_predictions = []
+all_filenames = []
+
+# Run inference on the test images
+with torch.no_grad():
+    for data, names in face_loader:
+        data = data.to(device)
+        model_outputs = network(data)
+        _, predicted_indices = torch.max(model_outputs, 1)
+        predicted_categories = [index_to_category[index.item()] for index in predicted_indices]
+
+        all_predictions.extend(predicted_categories)
+        all_filenames.extend(names)
+
+# Generate the submission CSV file
+results_df = pd.DataFrame({'Id': all_filenames, 'Category': all_predictions})
+results_df['Id'] = results_df['Id'].str.extract(r'(\d+)').astype(int)
+results_df.sort_values('Id', inplace=True)
+results_df.to_csv('/content/drive/MyDrive/predictions.csv', index=False)
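+
+# The uploaded script never saves the trained weights; a minimal sketch so
+# inference can be re-run without retraining (the checkpoint path is an
+# assumption on my part):
+torch.save(network.module.state_dict(), '/content/drive/MyDrive/resnet50_faces.pth')  # .module unwraps DataParallel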
\ No newline at end of file

diff --git a/preprocessing.py b/preprocessing.py
new file mode 100644
index 0000000..fefbe8c
--- /dev/null
+++ b/preprocessing.py
@@ -0,0 +1,120 @@
+# -*- coding: utf-8 -*-
+"""preprocessing.ipynb
+
+Automatically generated by Colab.
+
+Original file is located at
+    https://colab.research.google.com/drive/1zHp3b8TUG2thkU3npcljql5mBDNpk63H
+"""
+
+import numpy as np
+import cv2
+import os
+from PIL import Image
+import zipfile
+import shutil
+
+from google.colab import drive
+drive.mount('/content/drive')
+
+# Download the train.zip file from Google Drive
+zip_path = '/content/drive/My Drive/train.zip'
+extract_to = '/content/train/'
+
+os.makedirs(extract_to, exist_ok=True)
+
+# Unzip the file
+with zipfile.ZipFile(zip_path, 'r') as zip_ref:
+    zip_ref.extractall(extract_to)
+
+print("Files extracted successfully!")
+
+def cropImg(imgPath):
+    """
+    Crop the image to the first face detected by OpenCV's Haar cascade.
+
+    Parameters:
+        imgPath (str): The path to the input image file.
+
+    Returns:
+        np.array: Cropped face area as a numpy array in BGR format, the full
+        image in BGR format if no face is detected, or None if the image
+        cannot be loaded.
+    """
+    faceCascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
+    try:
+        image = Image.open(imgPath).convert('RGB')
+    except Exception as e:
+        print(f"PIL error: {e}")
+        return None
+
+    # Convert the PIL Image to a numpy array for OpenCV to process
+    open_cv_image = np.array(image)
+    # Convert RGB to BGR for OpenCV; copy() makes the array contiguous
+    open_cv_image = open_cv_image[:, :, ::-1].copy()
+
+    gray = cv2.cvtColor(open_cv_image, cv2.COLOR_BGR2GRAY)
+    faces = faceCascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30))
+
+    if len(faces) > 0:
+        x, y, w, h = faces[0]
+        face = open_cv_image[y:y+h, x:x+w]
+        return face  # Return numpy array in BGR format
+    else:
+        return open_cv_image  # Return the original image in BGR format as a numpy array
+
+def normalize_data(data):
+    """
+    Normalize the input image data from BGR format to a 0-1 range in RGB format.
+
+    Parameters:
+        data (np.array): Input BGR image data to be normalized.
+
+    Returns:
+        np.array: Normalized data array with values between 0 and 1 in RGB format.
+    """
+    data = data[:, :, ::-1]  # Convert BGR to RGB
+    return data / 255.0
+
+def process_images(image_folder, output_folder):
+    """
+    Process each image in the specified folder: crop to the face, normalize,
+    and save to the output folder.
+
+    Parameters:
+        image_folder (str): Folder containing the images to process.
+        output_folder (str): Folder to save processed images.
+    """
+    count = 0
+    # Ensure the output directory exists
+    os.makedirs(output_folder, exist_ok=True)
+
+    for filename in os.listdir(image_folder):
+        if filename.lower().endswith(('.png', '.jpg', '.jpeg')):  # Check for image files
+            file_path = os.path.join(image_folder, filename)
+            cropped_face = cropImg(file_path)
+
+            if cropped_face is not None:
+                normalized_image = normalize_data(cropped_face)
+                output_path = os.path.join(output_folder, filename)
+                # Scale back to 0-255, cast to uint8, and convert RGB back to
+                # BGR so cv2.imwrite stores the channels correctly
+                cv2.imwrite(output_path, cv2.cvtColor((normalized_image * 255).astype(np.uint8), cv2.COLOR_RGB2BGR))
+                count += 1
+                if count % 100 == 0:
+                    print(f"Processed {count} images.")
+            else:
+                print(f"Skipping file due to loading error: {filename}")
+
+    print("Processing complete.")
+
+# Paths for the image directories
+image_folder = '/content/train/train'
+output_folder = '/content/processed'
+
+# Run the processing function
+process_images(image_folder, output_folder)
+
+# Compress the processed train data into a zip file
+!zip -r /content/processed.zip /content/processed
+
+# Upload the processed train data to Google Drive so the preprocessing step
+# does not have to be run again
+source = '/content/processed.zip'
+destination = '/content/drive/MyDrive/processed.zip'
+shutil.move(source, destination)
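+
+# A quick sanity check that the crop step did not silently drop images (a
+# minimal sketch of my own, using the folders defined above; the processed
+# folder itself is untouched by the shutil.move of the zip):
+print(f"Original images:  {len(os.listdir(image_folder))}")
+print(f"Processed images: {len(os.listdir(output_folder))}")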