diff --git a/README.md b/README.md index 5927087..fc4ad71 100644 --- a/README.md +++ b/README.md @@ -37,4 +37,4 @@ pip install -U pip # Install the webqueue2-api package in editable mode with all extra packages. pip install -e .[all] ``` -4. Make changes and create a PR. \ No newline at end of file +4. Make changes and create a PR. diff --git a/setup.py b/setup.py index b22ce74..83a43c5 100644 --- a/setup.py +++ b/setup.py @@ -8,8 +8,8 @@ "docs": [ "mkdocs", "mkdocs-material", - "mkdocs-awesome-pages-plugin", - "mkdocs-macros-plugin" + "mkautodoc", + "mkdocs-awesome-pages-plugin" ], } @@ -21,11 +21,12 @@ def get_all_dependencies(): return dependencies setup( - name="webqueue2_api", + name="webqueue2api", version="0.9.1", description="A library for managing Purdue ECN's queue system.", python_requires='>=3.6', - packages=find_packages(), + packages=find_packages(where="src"), + package_dir={"": "src"}, install_requires = [ "gunicorn", "python-dotenv", @@ -37,6 +38,7 @@ def get_all_dependencies(): # Custom version of python-ldap without SASL requirements "python-ldap @ git+https://github.itap.purdue.edu/ECN/python-ldap/@python-ldap-3.3.1", "easyad", + "dataclasses" ], extras_require={ "dev": conditional_dependencies["dev"], diff --git a/src/webqueue2api/__init__.py b/src/webqueue2api/__init__.py new file mode 100644 index 0000000..ba7b477 --- /dev/null +++ b/src/webqueue2api/__init__.py @@ -0,0 +1,2 @@ +from webqueue2api.parser import Item, Queue, load_queues +from .config import config \ No newline at end of file diff --git a/src/webqueue2api/api/__init__.py b/src/webqueue2api/api/__init__.py new file mode 100644 index 0000000..a78f240 --- /dev/null +++ b/src/webqueue2api/api/__init__.py @@ -0,0 +1,5 @@ +# WSGI App +from .app import app + +# Configuration +from .config import config \ No newline at end of file diff --git a/src/webqueue2api/api/app.py b/src/webqueue2api/api/app.py new file mode 100644 index 0000000..fdc00a9 --- /dev/null +++ b/src/webqueue2api/api/app.py @@ -0,0 +1,37 @@ +from flask import Flask +from flask_restful import Api +from flask_jwt_extended import JWTManager +from .config import config +from .resources import Login, RefreshAccessToken, Item, Queue, QueueList + +app = Flask(__name__) +api = Api(app) + +# Set JWT secret key and create JWT manager +app.config["JWT_SECRET_KEY"] = config.jwt_secret_key +# The JWT RFC uses the "sub" key for identity claims. However, +# Flask-JWT-Extended uses "identity" by default for compatibility reasons so +# we ovverride the default claim key to comply with the RFC +app.config["JWT_IDENTITY_CLAIM"] = "sub" +# Set the key for error messages generated by Flask-JWT-Extended +app.config["JWT_ERROR_MESSAGE_KEY"] = "message" + +# Look for JWTs in headers (for access) then cookies (for refresh) +app.config["JWT_TOKEN_LOCATION"] = ["headers", "cookies"] +# Restrict cookies to HTTPS in prod, allow HTTP in dev +app.config["JWT_COOKIE_SECURE"] = False if config.jwt_secret_key == "dev" else True +# Restrict cookies using SameSite=strict flag +app.config["JWT_COOKIE_SAMESITE"] = "strict" +# Restrict refresh tokens to /token/refresh endpoint +app.config["JWT_REFRESH_COOKIE_PATH"] = '/tokens/refresh' +# Set the cookie key for CRSF validation string +# This is the default value. Adding it for easy reference +app.config["JWT_REFRESH_CSRF_HEADER_NAME"] = "X-CSRF-TOKEN" + +tokenManager = JWTManager(app) + +api.add_resource(Login, "/api/login") +api.add_resource(RefreshAccessToken, "/api/tokens/refresh") +api.add_resource(Item, "/api/data//") +api.add_resource(Queue, "/api/data/") +api.add_resource(QueueList, "/api/data/get_queues") \ No newline at end of file diff --git a/src/webqueue2api/api/auth.py b/src/webqueue2api/api/auth.py new file mode 100644 index 0000000..02e5158 --- /dev/null +++ b/src/webqueue2api/api/auth.py @@ -0,0 +1,55 @@ +from easyad import EasyAD +from ldap.filter import escape_filter_chars +# pylint says this is an error but it works so ¯\_(ツ)_/¯ +from ldap import INVALID_CREDENTIALS as LDAP_INVALID_CREDENTIALS + + + +def user_is_valid(username: str, password: str) -> bool: + """Checks if user is valid and in webqueue2 login group. + + Args: + username (str): Career account username. + password (str): Career account passphrase. + + Returns: + bool: True if user is valid, otherwise False. + """ + + # Check for empty arguments + if (username == "" or password == ""): + return False + + # Initialize EasyAD + config = { + "AD_SERVER": "boilerad.purdue.edu", + "AD_DOMAIN": "boilerad.purdue.edu" + } + ad = EasyAD(config) + + # Prepare search critiera for Active Directory + credentials = { + "username": escape_filter_chars(username), + "password": password + } + attributes = [ 'cn', "memberOf" ] + filter_string = f'(&(objectClass=user)(|(sAMAccountName={username})))' + + # Do user search + try: + user = ad.search(credentials=credentials, attributes=attributes, filter_string=filter_string)[0] + except LDAP_INVALID_CREDENTIALS: + return False + + # Isolate group names + # Example: + # 'CN=00000227-ECNStuds,OU=BoilerADGroups,DC=BoilerAD,DC=Purdue,DC=edu' becomes + # `00000227-ECNStuds` + user_groups = [ group.split(',')[0].split('=')[1] for group in user["memberOf"] ] + + # Check group membership + webqueue_login_group = "00000227-ECN-webqueue" + if webqueue_login_group not in user_groups: + return False + + return True \ No newline at end of file diff --git a/src/webqueue2api/api/config.py b/src/webqueue2api/api/config.py new file mode 100644 index 0000000..d16f099 --- /dev/null +++ b/src/webqueue2api/api/config.py @@ -0,0 +1,39 @@ +"""Stores API configuartion data.""" +from dataclasses import dataclass +import random, string + + + +def generate_random_string(length=16) -> str: + """Generate random string of letters and numbers of specified length. + + Example: + generate_random_string() -> "aud04ki947rrje3k9" + + Args: + length (int, optional): Number of characters to generate. Defaults to 16. + + Returns: + str: Random string. + """ + possible_characters = string.ascii_letters + string.digits + "!@#$%^&*" + random_string = '' + for number in range(length): + random_string += random.choice(possible_characters) + return random_string + +@dataclass +class Configuraton: + """Stores API configuration. + + Args: + jwt_secret_key (str): The key used to confirm JWT validity. + environment (str): The type of environment to run in. "prod" or "dev" + """ + jwt_secret_key: str + environment: str + +config = Configuraton( + jwt_secret_key = generate_random_string(), + environment = "prod" +) \ No newline at end of file diff --git a/src/webqueue2api/api/resources/__init__.py b/src/webqueue2api/api/resources/__init__.py new file mode 100644 index 0000000..c8d303a --- /dev/null +++ b/src/webqueue2api/api/resources/__init__.py @@ -0,0 +1,5 @@ +from .login import Login +from .refresh_access_token import RefreshAccessToken +from .item import Item +from .queue import Queue +from .queue_list import QueueList \ No newline at end of file diff --git a/src/webqueue2api/api/resources/item.py b/src/webqueue2api/api/resources/item.py new file mode 100644 index 0000000..9f1b147 --- /dev/null +++ b/src/webqueue2api/api/resources/item.py @@ -0,0 +1,48 @@ +from flask import request +from flask_restful import Resource +from flask_jwt_extended import jwt_required +# To avoid naming conflicts +from webqueue2api.parser import Item as _Item +from webqueue2api.parser.errors import ItemDoesNotExistError + +class Item(Resource): + @jwt_required + def get(self, queue: str, number: int) -> tuple: + """Returns the JSON representation of the item requested. + + Return Codes: + 200 (OK): On success. + 404 (Not Found): When an Item does not exist. + + Example: + /api/ce/100 returns: + { + "lastUpdated": "07-23-20 10:11 PM", + "headers": [...], + "content": [...], + "isLocked": "ce 100 is locked by knewell using qvi", + "userEmail": "campb303@purdue.edu", + "userName": "Justin Campbell", + "userAlias": "campb303", + "assignedTo": "campb303", + "subject": "Beepboop", + "status": "Dont Delete", + "priority": "", + "deparment": "", + "building": "", + "dateReceived": "Tue, 23 Jun 2020 13:25:51 -0400" + } + + Args: + queue (str): The queue of the item requested. + item (int): The number of the item requested. + + Returns: + tuple: Item as JSON and HTTP response code. + """ + headers_only = True if request.args.get("headers_only") == "True" else False + + try: + return (_Item(queue, number, headers_only=headers_only).to_json(), 200) + except ItemDoesNotExistError: + return ({"message": f"Item {queue}{number} not found."}, 404) \ No newline at end of file diff --git a/src/webqueue2api/api/resources/login.py b/src/webqueue2api/api/resources/login.py new file mode 100644 index 0000000..29430f8 --- /dev/null +++ b/src/webqueue2api/api/resources/login.py @@ -0,0 +1,51 @@ +from flask import request, after_this_request +from flask_restful import Resource +from flask_jwt_extended import create_access_token, create_refresh_token, set_refresh_cookies +from ..auth import user_is_valid + + + +class Login(Resource): + def post(self) -> tuple: + """Validates username/password, sets refresh token cookie and returns access token. + + Return Codes: + 200 (OK): On success. + 401 (Unauthroized): When username or password are incorrect. + 422 (Unprocessable Entitiy): When the username or password can't be parsed. + + Example: + curl -X POST + -H "Content-Type: application/json" + -d '{"username": "bob", "password": "super_secret"}' + + { "access_token": fjr09hfp09h932jp9ruj3.3r8ihf8h0w8hr08ifhj804h8i.8h48ith08ity409hip0t4 } + + Returns: + tuple: Response containing tokens and HTTP response code. + """ + if not request.is_json: + return ({ "message": "JSON missing from request body"}, 422) + + data = request.json + + fields_to_check = ["username", "password"] + for field in fields_to_check: + if field not in data.keys(): + return ({ "message": f"{field} missing from request body"}, 422) + + if not user_is_valid(data["username"], data["password"]): + return ({ "message": "Username or password is invalid"}, 401) + + access_token = create_access_token(data["username"]) + refresh_token = create_refresh_token(data["username"]) + + # This decorator is needed because Flask-RESTful's 'resourceful routing` + # doesn't allow for direct modification to the Flask response object. + # See: https://flask-restful.readthedocs.io/en/latest/quickstart.html#resourceful-routing + @after_this_request + def set_refresh_cookie_callback(response): + set_refresh_cookies(response, refresh_token) + return response + + return ({ "access_token": access_token }, 200) \ No newline at end of file diff --git a/src/webqueue2api/api/resources/queue.py b/src/webqueue2api/api/resources/queue.py new file mode 100644 index 0000000..f79c041 --- /dev/null +++ b/src/webqueue2api/api/resources/queue.py @@ -0,0 +1,33 @@ +from flask import request +from flask_restful import Resource +from flask_jwt_extended import jwt_required +# To avoid naming conflicts +from webqueue2api.parser import Queue as _Queue +from webqueue2api.parser.errors import QueueDoesNotExistError + + + +class Queue(Resource): + @jwt_required + def get(self, queues: str) -> tuple: + """Returns the JSON representation of the queue requested. + + Return Codes: + 200 (OK): On success. + 404 (Not Found): When a Queue does not exist. + Args: + queues (str): Plus (+) deliminited list of queues. + + Returns: + tuple: Queues as JSON and HTTP response code. + """ + queues_requested = queues.split("+") + headers_only = False if request.args.get("headers_only") == "False" else True + + try: + queue_list = [] + for queue in queues_requested: + queue_list.append(_Queue(queue, headers_only=headers_only).to_json()) + return (queue_list, 200) + except QueueDoesNotExistError: + return ({"message": f"Queue {queue} not found."}, 404) \ No newline at end of file diff --git a/src/webqueue2api/api/resources/queue_list.py b/src/webqueue2api/api/resources/queue_list.py new file mode 100644 index 0000000..46b7af2 --- /dev/null +++ b/src/webqueue2api/api/resources/queue_list.py @@ -0,0 +1,29 @@ +from flask_restful import Resource +from flask_jwt_extended import jwt_required +from webqueue2api.parser import get_queue_counts + + +class QueueList(Resource): + @jwt_required + def get(self) -> tuple: + """Returns a list of dictionaries with the number of items in each queue. + + Return Codes: + 200 (OK): On success. + + Example: + [ + { + name: "me", + number_of_items: 42 + }, + { + name: "bidc", + number_of_items: 3 + } + ] + + Returns: + tuple: Queues and item counts as JSON and HTTP response code. + """ + return (get_queue_counts(), 200) \ No newline at end of file diff --git a/src/webqueue2api/api/resources/refresh_access_token.py b/src/webqueue2api/api/resources/refresh_access_token.py new file mode 100644 index 0000000..4a60081 --- /dev/null +++ b/src/webqueue2api/api/resources/refresh_access_token.py @@ -0,0 +1,9 @@ +from flask_restful import Resource +from flask_jwt_extended import jwt_refresh_token_required, get_jwt_identity, create_access_token + +class RefreshAccessToken(Resource): + @jwt_refresh_token_required + def post(self): + username = get_jwt_identity() + access_token = create_access_token(username) + return ({"access_token": access_token}, 200) \ No newline at end of file diff --git a/src/webqueue2api/config.py b/src/webqueue2api/config.py new file mode 100644 index 0000000..20da74f --- /dev/null +++ b/src/webqueue2api/config.py @@ -0,0 +1,13 @@ +from dataclasses import dataclass +import webqueue2api.parser.config +import webqueue2api.api.config + +@dataclass +class Configuration: + parser: dataclass + api: dataclass + +config = Configuration( + parser = webqueue2api.parser.config, + api = webqueue2api.api.config +) \ No newline at end of file diff --git a/src/webqueue2api/parser/__init__.py b/src/webqueue2api/parser/__init__.py new file mode 100644 index 0000000..3d312e7 --- /dev/null +++ b/src/webqueue2api/parser/__init__.py @@ -0,0 +1,9 @@ +# Classes +from .item import Item +from .queue import Queue + +# Utilities +from .queue import load_queues, get_queue_counts + +# Configuration +from .config import config \ No newline at end of file diff --git a/src/webqueue2api/parser/config.py b/src/webqueue2api/parser/config.py new file mode 100644 index 0000000..882ef68 --- /dev/null +++ b/src/webqueue2api/parser/config.py @@ -0,0 +1,18 @@ +"""Stores parser configuartion data.""" +from dataclasses import dataclass + +@dataclass +class Configuraton: + """Stores parser configuration. + + Args: + queue_directory (str): The absolute file path to the queues directory. + queues_to_ignore (list): List of queues to ignore when loading all queues. + """ + queue_directory: str + queues_to_ignore: list + +config = Configuraton( + queue_directory = "/home/pier/e/queue/Mail", + queues_to_ignore = ["archives", "drafts", "inbox", "coral"] +) \ No newline at end of file diff --git a/src/webqueue2api/parser/errors.py b/src/webqueue2api/parser/errors.py new file mode 100644 index 0000000..ef1ac48 --- /dev/null +++ b/src/webqueue2api/parser/errors.py @@ -0,0 +1,9 @@ +class ItemDoesNotExistError(Exception): + def __init__(self, path: str): + self.message = f"File {path} not found." + super().__init__(self.message) + +class QueueDoesNotExistError(Exception): + def __init__(self, path: str): + self.message = f"Directory {path} not found." + super().__init__(self.message) \ No newline at end of file diff --git a/src/webqueue2api/parser/item.py b/src/webqueue2api/parser/item.py new file mode 100644 index 0000000..27ea817 --- /dev/null +++ b/src/webqueue2api/parser/item.py @@ -0,0 +1,1206 @@ +import os +import time +import email +import re +import datetime +from dateutil.parser import parse +from dateutil import tz +from typing import Union +from pathlib import Path +from .config import config +from .errors import ItemDoesNotExistError + + + +#------------------------------------------------------------------------------# +# Classes +#------------------------------------------------------------------------------# +class Item: + """A single issue. + + Args: + queue (str): The name of the Item's queue. + number (int): The number of the Item. + headers_only (bool, optional): Whether or not to parse headers only. Defaults to False. + + Example: + # Create an Item (ce100) + `item = Item("ce", 100)` + + Attributes: + queue: The name of the queue the Item is in. + number: The number of the Item. + path: The path to the Item on the filesystem. + last_updated: An ISO 8601 formatted time string showing the last time the file was updated according to the filesystem. + headers: A list of dictionaries containing header keys and values. + content: A list of section dictionaries. + is_locked: A boolean showing whether or not a lockfile for the item is present. + user_email: The email address of the person who this item is from. + user_name: The real name of the person who this item is from. + user_alias: The Purdue career account alias of the person this item is from. + assigned_to: The Purdue career account alias of the person this item is assigned to + subject: The subject of the original message for this item. + status: The most recent status update for the item. + priority: The most recent priority for this item. + department: The most recent department for this item. + date_received: The date this item was created. + json_data: A JSON serializable representation of the Item. + + Raises: + ItemDoesNotExistError: If an item does not exist on the filesystem. + """ + + def __init__(self, queue: str, number: int, headers_only: bool = False) -> None: + self.queue = queue + + try: + self.number = int(number) + except ValueError: + raise ValueError(f"Could not convert {number} to an integer") + + self.path = Path(config.queue_directory, self.queue, str(self.number)) + if not self.path.exists(): + raise ItemDoesNotExistError(str(self.path)) + + self.last_updated = self.__get_time_last_updated() + self.__raw_tem = self.__get_raw_item() + self.headers = self.__parse_headers() + if not headers_only: self.content = self.__parseSections() + self.is_locked = self.__check_is_locked() + self.user_email = self.__parse_from_data(data="user_email") + self.user_name = self.__parse_from_data(data="user_name") + self.user_alias = self.__get_user_alias() + self.assigned_to = self.__get_most_recent_header_by_type("Assigned-To") + self.subject = self.__get_most_recent_header_by_type("Subject") + self.status = self.__get_most_recent_header_by_type("Status") + self.priority = self.__get_most_recent_header_by_type("Priority") + self.department = self.__get_most_recent_header_by_type("Department") + self.building = self.__get_most_recent_header_by_type("Building") + self.date_received = self.__get_formatted_date(self.__get_most_recent_header_by_type("Date")) + self.json_data = self.__generate_json_data() + + def __generate_json_data(self) -> dict: + """Generates a JSON serializable data dictionary of class attributes. + + Example: + __generate_json_data() returns: + { + "assigned_to": 'bekuma', + "building": '', + "content": [...], + "date_received": '2020-08-20T22: 06: 08+0000', + "departmen"': '', + "headers": [...], + "is_locked"': False, + "last_updated": '2021-06-04T11: 01: 00-0400', + "number": 1, + "path": '/home/pier/e/queue/Mail/ce/1', + "priority": '', + "queue": 'ce', + "status": 'archiving from zsite', + "subject": 'RE: Updating my computer', + "user_alias": 'govind', + "user_email": 'govind@purdue.edu', + "user_name": 'Govindaraju, Rao S' + } + + Returns: + dict: JSON serializable data dictionary of class attributes. + """ + json_data = {} + + # List of attributes that need processing before being added to json_data + attributes_to_process = ["path"] + for attribute in attributes_to_process: + if attribute == "path": + json_data[attribute] = str(self.path) + + # List of attributes to be ignored + attributes_to_ignore = ["to_json", "json_data"] + attributes_to_process + for attribute in self.__dir__(): + if not attribute.startswith("_") and attribute not in attributes_to_ignore: + json_data[attribute] = self.__getattribute__(attribute) + + return json_data + + def __get_time_last_updated(self) -> str: + """Returns last modified time of item reported by the filesystem in mm-dd-yy hh:mm am/pm format. + + Example: + 07-23-20 10:34 AM + + Returns: + str: last modified time of item reported by the filesystem in mm-dd-yy hh:mm am/pm format. + """ + unix_time = os.path.getmtime(self.path) + formatted_time = time.strftime('%m-%d-%y %I:%M %p', time.localtime(unix_time)) + return self.__get_formatted_date(formatted_time) + + def __get_raw_item(self) -> list: + """Returns a list of all lines in the item file + + Returns: + list: List of all the lines in the item file + """ + with open(self.path, errors="replace") as file: + return file.readlines() + + def __get_header_boundary(self) -> int: + """Returns the 0 based line number where the Item headers stop. + + Example: The header end would be on line 13 + 12: X-ECN-Queue-Original-URL: + 13: + 14: I need help. + + Returns: + int: line number where the Item headers end + """ + for line_number, line in enumerate(self.__raw_tem): + if line == "\n": + return line_number + + def __parse_headers(self) -> list: + """Returns a list containing dictionaries of header type and data. + Removes queue prefixes and whitespace. + + Examples: + "[ce] QStatus: Dont Delete\\nFrom: Justin Campbell \\n" + becomes + [ + {"QStatus": "Don't Delete"}, + {"From": "Justin Campbell "} + ] + + Returns: + list: Header dicts + """ + header_string = "" + + # Remove '[queue] ' prefixes: + # Example: + # [ce] QTime-Updated-By: campb303 becomes + # QTime-Updated-By: campb303 + queue_prefix_pattern = re.compile(r"\[.*?\] {1}") + for line_number in range(self.__get_header_boundary()): + line = self.__raw_tem[line_number] + line_has_queue_prefix = queue_prefix_pattern.match(line) + + if line_has_queue_prefix: + queue_prefix = line[line_has_queue_prefix.regs[0][0]: line_has_queue_prefix.regs[0][1]] + line = line.replace(queue_prefix, "") + + header_string += line + + message = email.message_from_string(header_string) + + headers = [] + date_headers=[ + "QStatus-Updated-Time", + "Status-Updated-Time", + "Edited-Time", + "QTime-Updated-Time", + "Merged-Time", + "Time-Updated-Time", + "Replied-Time", + "Assigned-To-Updated-Time", + "QAssigned-To-Updated-Time", + "Date", + "Sent" + ] + + for key in message.keys(): + headers.append({"type": key, "content": self.__get_formatted_date(message[key]) if key in date_headers else message[key]}) + + return headers + + def __parseSections(self) -> list: + # List of all item events + sections = [] + + contentStart = self.__get_header_boundary() + 1 + contentEnd = len(self.__raw_tem) - 1 + + # List of assignments for the item + assignementLsit = self.__assignmentParsing(contentStart) + + # Appends each assignment individually to sections + for assignment in assignementLsit: + sections.append(assignment) + + # Checks for empty content within an item and returns and + if contentEnd <= contentStart: + blankInitialMessage = self.__initialMessageParsing([""]) + sections.append(blankInitialMessage) + return sections + + # Checks for Directory Identifiers + if self.__raw_tem[contentStart] == "\n" and self.__raw_tem[contentStart + 1].startswith("\t"): + + directoryStartLine = contentStart + 1 + + # Parses the directory information and returns a dictionary of directory values + directoryInfo = self.__directoryParsing(directoryStartLine) + + # Appends Directory Information into the sections array + sections.append(directoryInfo) + + # Sets the initial message start to the next line after all directory lines and newlines + contentStart = contentStart + len(directoryInfo) + 1 + + # The start line, type, and end line for item events + sectionBoundaries = [] + + # Delimiter info + delimiters = [ + {"name": "edit", "pattern": "*** Edited"}, + {"name": "status", "pattern": "*** Status"}, + {"name": "replyToUser", "pattern": "*** Replied"}, + {"name": "replyFromUser", "pattern": "=== "}, + ] + + # Signifies that there is an initial message to parse + initialMessageSection = True + + # Parses the entire contents of the message, stores everything before any delimiter as the initial message + # and the line number of any delimiters as well as the type + for lineNumber in range(contentStart, contentEnd + 1): + + line = self.__raw_tem[lineNumber] + + # Looks for a starting delimiter and explicity excludes the reply-from-user ending delimiter + if (line.startswith("*** Edited by: ") or + line.startswith("*** Replied by: ") or + line.startswith("*** Status updated by: ") or + line == "=== Additional information supplied by user ===\n" and not + line == "===============================================\n" + ): + + # Sets the delimiter type based on the pattern within the delimiters list + for delimiter in delimiters: + + if line.startswith(delimiter["pattern"]): + sectionBoundaries.append( + {"start": lineNumber, "type": delimiter["name"]}) + break + + # If a starting delimiter was encountered, then there is no initial message + if initialMessageSection: + initialMessageSection = False + + elif initialMessageSection == True: + # Delimiter not encountered yet, so append initial message starting line as the current lin number + sectionBoundaries.append( + {"start": lineNumber, "type": "initial_message"}) + initialMessageSection = False + + # Used to set the end line of the last delimiter + sectionBoundaries.append({"start": contentEnd + 1}) + + # Sets the end of the section boundary to the begining of the next section boundary + for boundaryIndex in range(0, len(sectionBoundaries) - 1): + + sectionBoundaries[boundaryIndex]["end"] = sectionBoundaries[boundaryIndex + 1]["start"] + + # Remove End of File boundary since the line number has been assigned to the last delimiter + del sectionBoundaries[-1] + + # Parses through all the boundaries in section boundaries + for boundary in sectionBoundaries: + + # Sets line to the first line of the boundary (which is always the delimiter) + line = self.__raw_tem[boundary["start"]] + + # Returns all of the lines within the current section + sectionContent = self.__raw_tem[boundary["start"]: boundary["end"]] + + # Appends an initial message dictionary to sections + if boundary["type"] == "initial_message": + initialMessageDictionary = self.__initialMessageParsing( + sectionContent) + sections.append(initialMessageDictionary) + + elif boundary["type"] == "edit": + # Returns a dictionary with edit information + editInfo = self.__editParsing( + sectionContent, boundary["start"]) + + # Checks for a parse error and appends it, returning the sections list which stops the parsing + if editInfo["type"] == "parse_error": + sections.append(editInfo) + return self.__getSortedSections(sections) + + # Appends the edit dictionary to sections + sections.append(editInfo) + + elif boundary["type"] == "replyToUser": + # Returns a dictionary with reply-to information + replyToInfo = self.__replyToParsing( + sectionContent, boundary["start"]) + + # Checks for a parse error and appends it, returning the sections list which stops the parsing + if replyToInfo["type"] == "parse_error": + sections.append(replyToInfo) + return self.__getSortedSections(sections) + + # Appends the reply-to to sections + sections.append(replyToInfo) + + elif boundary["type"] == "status": + # Returns a dictionary with status information + statusInfo = self.__statusParsing( + sectionContent, boundary["start"]) + + if statusInfo["type"] == "parse_error": + sections.append(statusInfo) + return self.__getSortedSections(sections) + + # Appends the status to sections + sections.append(statusInfo) + + elif boundary["type"] == "replyFromUser": + # Returns a dictionary with userReply information + replyFromInfo = self.__userReplyParsing( + sectionContent, boundary["start"]) + + if replyFromInfo["type"] == "parse_error": + sections.append(replyFromInfo) + return self.__getSortedSections(sections) + + # Appends the replyFrom to sections + sections.append(replyFromInfo) + + sortedSections = self.__getSortedSections(sections) + + return sortedSections + # return sections + + def __directoryParsing(self, directoryStartLine: int) -> dict: + """Returns a dictionary with directory information + + Example: + Name: Nestor Fabian Rodriguez Buitrago + Login: rodri563 + Computer: ce-205-38 (128.46.205.67) + Location: HAMP G230 + Email: rodri563@purdue.edu + Phone: 7654766893 + Office: HAMP G230 + UNIX Dir: /home/bridge/b/rodri563 + Zero Dir: U=\\bridge.ecn.purdue.edu\rodri563 + User ECNDB: http://eng.purdue.edu/jump/2e8399a + Host ECNDB: http://eng.purdue.edu/jump/2e83999 + Subject: Autocad installation + + Args: + directoryStartLine (int): line number within the item that the directory starts on + + Returns: + dict: dictionary that splits each line within the directory into a key and a value + """ + directoryInformation = {"type": "directory_information"} + + directoryPossibleKeys = [ + "Name", + "Login", + "Computer", + "Location", + "Email", + "Phone", + "Office", + "UNIX Dir", + "Zero Dir", + "User ECNDB", + "Host ECNDB", + "Subject" + ] + # Executies until the directory start line is greater than the directory ending line + while True: + + # Returns the line number at directory start line + info = self.__raw_tem[directoryStartLine] + + # Breaks the loop if it encountrs a newline, signifying the end of the directory information + if info == "\n": + + break + + else: + + # Removes white including space, newlines, and tabs from the directory info line + strippedInfo = info.strip() + + # Attempts to find ": " but will accept ":", denoting a blank entry for a directory item + if ": " in strippedInfo: + + # Seperates the directory info line into two variables, the first variable being the key, the second being the value + # swt1 + key, value = strippedInfo.split(": ", 1) + + if key in directoryPossibleKeys: + # Adds the key value pair to the directory info dictionary + directoryInformation[key] = value + else: + # Casts the list type on to a dictionary + dictionaryList = list(directoryInformation) + # Length of dictionary list + lenDictionaryList = len(dictionaryList) + # The last key appended to the directory dictionary + lastKeyAppended = dictionaryList[lenDictionaryList - 1] + + directoryInformation[lastKeyAppended] = directoryInformation[lastKeyAppended] + \ + " " + strippedInfo + + elif ":" in strippedInfo: + + # Seperates the directory info line into two variables, the first variable being the key, the second being the value + key, value = strippedInfo.split(":", 1) + + if key in directoryPossibleKeys: + # Adds the key value pair to the directory info dictionary + directoryInformation[key] = value + else: + # Casts the list type on to a dictionary + dictionaryList = list(directoryInformation) + # Length of dictionary list + lenDictionaryList = len(dictionaryList) + # The last key appended to the directory dictionary + lastKeyAppended = dictionaryList[lenDictionaryList - 1] + + directoryInformation[lastKeyAppended] = directoryInformation[lastKeyAppended] + \ + " " + strippedInfo + + # Signifies that this line belongs to the most previous line + elif ": " not in strippedInfo and ":" not in strippedInfo: + # Casts the list type on to a dictionary + dictionaryList = list(directoryInformation) + # Length of dictionary list + lenDictionaryList = len(dictionaryList) + # The last key appended to the directory dictionary + lastKeyAppended = dictionaryList[lenDictionaryList - 1] + + directoryInformation[lastKeyAppended] = directoryInformation[lastKeyAppended] + \ + " " + strippedInfo + # Counter to denote the end of the directory + directoryStartLine = directoryStartLine + 1 + + # Returns the directory information dictionary + return directoryInformation + + def __assignmentParsing(self, contentStart: int) -> list: + """Returns a list with assignment information dictionaries + + Example: + Assigned-To: campb303 + Assigned-To-Updated-Time: Tue, 23 Jun 2020 13:27:00 EDT + Assigned-To-Updated-By: campb303 + + Args: + contentStart (int): line number where the content starts + + Returns: + list: [ + {"type": "assignment", + "datetime": datetime of the assignment, + "by": user who initiated the assignment, + "to": user who was assigned + }, + ] + """ + assignmentList = [] + + # Assignment Information + assignedBy = "" + assignedDateTime = "" + assignedTo = "" + + # Parses the header looking for assignment delimeters and stores info into their respective variables + for headerContent in range(0, contentStart): + + line = self.__raw_tem[headerContent] + + # Gets who the Item was assigned to + if line.startswith("Assigned-To: "): + + assignedTo = ( + re.search("(?<=Assigned-To: )(.*)", line)).group() + + # Gets the date the Item was assigned + elif line.startswith("Assigned-To-Updated-Time: "): + + dateFromLine = ( + re.search("(?<=Assigned-To-Updated-Time: )(.*)", line)).group() + + assignedDateTime = self.__get_formatted_date(dateFromLine) + + # Gets who assigned the Item + elif line.startswith("Assigned-To-Updated-By: "): + + assignedBy = ( + re.search("(?<=Assigned-To-Updated-By: )(.*)", line)).group() + + # Appends the assignment to the sections list + assignmentList.append( + {"type": "assignment", + "datetime": assignedDateTime, + "by": assignedBy, + "to": assignedTo} + ) + + return assignmentList + + def __initialMessageParsing(self, content: list) -> dict: + """Returns a dictionary with initial message information + + Example: + \n + Testtest\n + \n + + Args: + content (list): content of the initial message + + Returns: + dict: + "type": "initial_message", + "datetime": datetime the initial message was sent, + "from_name": from_name, + "from_email": user_email, + "to": [{email, name}], + "cc": [{email, name}], + "subject": initial message subject + "content": content of the initial message + """ + initialMessageDictionary = {} + + initialMessageDictionary["type"] = "initial_message" + + # Gets the initial message date from the header + rawMessageDateStr = self.__get_most_recent_header_by_type("Date") + + # Sets datetime in the intialMessage dictionary to UTC formatted date + initialMessageDictionary["datetime"] = self.__get_formatted_date( + rawMessageDateStr) + + initialMessageDictionary["from_name"] = self.__parse_from_data( + data="user_name") + + initialMessageDictionary["from_email"] = self.__parse_from_data( + data="user_email") + + # Stores list of dictionaries for the recipients of the initial message + initialMessageDictionary["to"] = [] + + # Parses the header looking for recipients of the initial message and stores it in a list of tuples + rawMessageRecipientsList = email.utils.getaddresses( + [self.__get_most_recent_header_by_type("To")]) + + # Parses the CC list and stores the cc recipient information in a list of dictionaries + for recipients in rawMessageRecipientsList: + + initialMessageDictionary["to"].append( + {"name": recipients[0], + "email": recipients[1]} + ) + + # Stores list of dictionaries for CC information + initialMessageDictionary["cc"] = [] + + # Parses the header looking for CC recipients of the initial message and stores it in a list of tuples + rawMessageCCList = email.utils.getaddresses( + [self.__get_most_recent_header_by_type("CC")]) + + # Parses the CC list and stores the cc recipient information in a list of dictionaries + for ccRecipients in rawMessageCCList: + + initialMessageDictionary["cc"].append( + {"name": ccRecipients[0], + "email": ccRecipients[1]} + ) + + initialMessageDictionary["subject"] = self.__get_most_recent_header_by_type( + "Subject") + + # Removes unecessary newlines from the begining and the end of the initial message + initialMessageDictionary["content"] = self.__getFormattedSectionContent( + content) + + return initialMessageDictionary + + def __editParsing(self, content: list, lineNum: int) -> dict: + """Returns a dictionary with edit information + + Example: + *** Edited by: campb303 at: 06/23/20 13:27:56 ***\n + \n + This be an edit my boy\n + \n + \n + \n + + Args: + content (list): content of an edit + lineNum (int): line number of an edit within an item + + Returns: + dict: a dictionary with these keys, + "type": "edi", + "by": initiator of the edit, + "datetime": datetime of the edit, + "content": content of the edit + """ + + # Edit Info dictionary + editInfo = {} + + for count, line in enumerate(content): + if line == "===============================================\n": + errorMessage = "Reply-from-user ending delimter encountered without Reply-from-user starting delimter" + return self.__errorParsing(line, lineNum + count + 1, errorMessage) + + editInfo["type"] = "edit" + + delimiterLine = content[0] + # Parses for the author of the edit, which is located between the "*** Edited by: " and " at:" substrings + try: + editInfo["by"] = ( + re.search("(?<=\*{3} Edited by: )(.*)(?= at:)", delimiterLine)).group() + except: + errorMessage = "*** Edited by: [username] at: [date and time] ***\n" + return self.__errorParsing(delimiterLine, lineNum, errorMessage) + + try: + # Parses for the date and time of the edit, which is located between the " at: " and "***\n" substrings + dateTimeString = ( + re.search("(?<= at: )(.*)(?= \*\*\*\n)", delimiterLine)).group() + except: + # Returns an error message if there is no space after "at:" + errorMessage = "*** Edited by: [username] at: [date and time] ***\n" + return self.__errorParsing(delimiterLine, lineNum, errorMessage) + + # Attempts to format the date and time into utc format + editInfo["datetime"] = self.__get_formatted_date(dateTimeString) + + # Remove the delimiter String and unecessary newlines + editInfo["content"] = self.__getFormattedSectionContent(content) + + return editInfo + + def __replyToParsing(self, content: list, lineNum: int) -> dict: + """Returns a dictionary with reply to user information + + Example: + *** Replied by: campb303 at: 06/23/20 13:28:18 ***\n + \n + This be a reply my son\n + \n + Justin\n + ECN\n + \n + + Args: + content (list): content of a reply to user + lineNum (int): line number of a reply to user in an item + + Returns: + dict: a dictionary with these keys, + "type": "reply_to_user", + "by": initiator of the reply to user, + "datetime": datetime of the reply to user, + "content": content of the reply to user + """ + replyInfo = {} + + replyInfo["type"] = "reply_to_user" + + delimiterLine = content[0] + + for count, line in enumerate(content): + if line == "===============================================\n": + errorMessage = "Reply-from-user ending delimter encountered without Reply-from-user starting delimter" + return self.__errorParsing(line, lineNum + count + 1, errorMessage) + + try: + # Parses for the author of the reply, which is located between the "*** Replied by: " and " at:" substrings + replyInfo["by"] = ( + re.search("(?<=\*{3} Replied by: )(.*)(?= at:)", delimiterLine)).group() + except: + errorMessage = "*** Replied by: [username] at: [date and time] ***\n" + return self.__errorParsing(delimiterLine, lineNum, errorMessage) + + # Parses for the date and time of the reply, which is located between the " at: " and "***\n" substrings + try: + dateTimeString = ( + re.search("(?<= at: )(.*)(?= \*\*\*\n)", delimiterLine)).group() + except: + errorMessage = "*** Replied by: [username] at: [date and time] ***\n" + return self.__errorParsing(delimiterLine, lineNum, errorMessage) + + # Formats date to UTC + replyInfo["datetime"] = self.__get_formatted_date(dateTimeString) + + replyInfo["content"] = self.__getFormattedSectionContent(content) + + return replyInfo + + def __statusParsing(self, content: list, lineNum: int) -> dict: + """Returns a dictionary with status information + + Example: + *** Status updated by: campb303 at: 6/23/2020 13:26:55 ***\n + Dont Delete\n + + Args: + content (list): The content of a status update + lineNum (int): The line number of a status update in an item + + Returns: + dict: a dictionary with these keys, + "type": "status", + "by": initiator of the status update, + "datetime": datetime of the status update, + "content": content of the status update + """ + statusInfo = {} + + statusInfo["type"] = "status" + + delimiterLine = content[0] + + for count, line in enumerate(content): + if line == "===============================================\n": + errorMessage = "Reply-from-user ending delimter encountered without Reply-from-user starting delimter" + return self.__errorParsing(line, lineNum + count + 1, errorMessage) + + # Parses for the author of the status change, which is located between the "*** Status updated by: " and " at:" substrings + try: + statusInfo["by"] = ( + re.search("(?<=\*{3} Status updated by: )(.*)(?= at:)", delimiterLine)).group() + except: + errorMessage = "*** Status updated by: [username] at: [date and time] ***\n" + + return self.__errorParsing(delimiterLine, lineNum, errorMessage) + + # Parses for the date and time of the status change, which is located between the " at: " and "***\n" substrings + try: + dateTimeString = re.search( + "(?<= at: )(.*)(?= \*\*\*\n)", delimiterLine).group() + except: + errorMessage = "*** Status updated by: [username] at: [date and time] ***\n" + + return self.__errorParsing(delimiterLine, lineNum, errorMessage) + + # Formats the date to UTC + statusInfo["datetime"] = self.__get_formatted_date(dateTimeString) + + # Remove the delimiter String and unecessary newlines + statusInfo["content"] = self.__getFormattedSectionContent(content) + + return statusInfo + + def __userReplyParsing(self, replyContent: list, lineNumber: int) -> dict: + """Returns a dictionary with user reply information + + Example: + === Additional information supplied by user ===\n + \n + Subject: Re: Beepboop\n + From: Justin Campbell \n + Date: Tue, 23 Jun 2020 13:30:45 -0400\n + X-ECN-Queue-Original-Path: /home/pier/e/queue/Attachments/inbox/2020-06-23/212-original.txt\n + X-ECN-Queue-Original-URL: https://engineering.purdue.edu/webqueue/Attachments/inbox/2020-06-23/212-original.txt\n + \n + Huzzah!\n + \n + ===============================================\n + \n + Args: + replyContent (list): The entire section of a reply-from-user + lineNumber (int): The line number of the begining of a reply-from-user section within and item + + Returns: + dict: a dictionary with these keys, + "type": "reply_from_user", + "from_name": name of the user that sent the reply, + "from_email": email of the user that sent the reply, + "subject": subject of the reply, + "datetime": the datetime of the reply, + "cc": [ + {"name": name of the carbon copied recipient, + "email": email of the carbon copied recipient + }, + ] + "content": content of the reply + "headers": [ + {"type": headerType, + "content": content + }, + ] + """ + replyFromInfo = {} + + replyFromInfo["type"] = "reply_from_user" + + replyFromHeaders = [] + newLineCounter = 0 + endingDelimiterCount = 0 + + # Delimiter information line numbers to remove from reply from user + linesToRemove = [] + + # Parses the section content looking for any line that starts with a metadata, also tracks the line + # number with the enumerate function + for lineNum, line in enumerate(replyContent): + + if endingDelimiterCount == 0 and lineNum == len(replyContent) - 1: + errorMessage = "Did not encounter a reply-from-user ending delimiter" + return self.__errorParsing(line, lineNumber + lineNum + 1, errorMessage) + + if newLineCounter == 1 and line != "\n": + + try: + # Append header information for each headr line + headerType, content = line.split(": ", 1) + replyFromHeaders.append( + {"type": headerType, + "content": content + } + ) + except: + lenReplyFromHeaders = len(replyFromHeaders) + if lenReplyFromHeaders == 0: + errorMessage = ("Expected reply-from-user header information:\n" + + "=== Additional information supplied by user ===\n" + + "\n" + + "[Header Type]: [Header Value]\n" + + "\n" + ) + return self.__errorParsing(line, lineNumber + lineNum + 1, errorMessage) + + else: + replyFromHeaders[lenReplyFromHeaders - + 1]["content"] = replyFromHeaders[lenReplyFromHeaders - 1]["content"] + " " + line + + linesToRemove.append(lineNum) + # Checks for a newline and breaks for loop on second occurance of a newline + if line == "\n": + newLineCounter = newLineCounter + 1 + + if newLineCounter == 2 and "datetime" not in replyFromInfo.keys(): + errorMessage = "Expected \"Date: [datetime]\" in the header info" + return self.__errorParsing(line, lineNumber + lineNum + 1, errorMessage) + + elif line == "===============================================\n": + endingDelimiterCount = endingDelimiterCount + 1 + + elif line.startswith("From: ") and newLineCounter == 1: + # Returns a list of one tuples with a name stored in the first index of the tuple and an email stored in the second index of the tuple + emailList = email.utils.getaddresses([line]) + replyFromInfo["from_name"] = emailList[0][0] + replyFromInfo["from_email"] = emailList[0][1] + + elif line.startswith("Subject: ") and newLineCounter == 1: + # Matches everything after "Subject: " + try: + subjectStr = ( + re.search("(?<=Subject: )(.*)", line)).group() + except: + errorMessage = "Expeted syntax of \"Subject: [subject]\"" + return self.__errorParsing(line, lineNumber + lineNum + 1, errorMessage) + + # Formatts the date to UTC + replyFromInfo["subject"] = subjectStr + + elif line.startswith("Date: ") and newLineCounter == 1: + # Matches everything after "Date: " + try: + dateStr = (re.search("(?<=Date: )(.*)", line)).group() + except: + errorMessage = "\"Date: [datetime]\"" + return self.__errorParsing(line, lineNumber + lineNum + 1, errorMessage) + + # Formatts the date to UTC + replyFromInfo["datetime"] = self.__get_formatted_date(dateStr) + + elif line.startswith("Cc: ") and newLineCounter == 1: + + replyFromInfo["cc"] = [] + + # Returns a list of tuples with email information + recipientsList = email.utils.getaddresses([line]) + + # Parses through the cc tuple list + for cc in recipientsList: + # Stores the cc information in a dictionary and appends it to the ccRecipientsList + replyFromInfo["cc"].append( + {"name": cc[0], + "email": cc[1]} + ) + + # Deletes reduntant lines from the message content in reverse order + for lineNum in sorted(linesToRemove, reverse=True): + replyContent.pop(lineNum) + + # Strips any unnecessary newlines or any delimiters frm the message content + replyFromInfo["content"] = self.__getFormattedSectionContent( + replyContent) + + replyFromInfo["headers"] = replyFromHeaders + + return replyFromInfo + + def __getFormattedSectionContent(self, sectionContent: list) -> list: + """Returns a list with message content that is stripped of unnecessary newlines and begining delimiters + + Example: + *** Edited by: mph at: 02/21/20 10:27:16 ***\n + \n + Still need to rename machines - but the networking issue now seems to \n + be resolved via another ticket.\n + \n + \n + \n + \n + \n + + Args: + sectionContent (list): The section content of a parsed section + + Returns: + list: the section content of a parsed section without any delimiters and unnecessary newlines + """ + # Continually removes the first line of sectionContent if it is a newline or delimiter in each iteration + while len(sectionContent) > 1: + if (sectionContent[0] == "\n" or + sectionContent[0].startswith("*** Edited by: ") or + sectionContent[0].startswith("*** Replied by: ") or + sectionContent[0].startswith("*** Status updated by: ") or + sectionContent[0] == "=== Additional information supplied by user ===\n" or + sectionContent[0] == "===============================================\n" + ): + sectionContent.pop(0) + else: + # Breaks the loop if the first line isn't a newline or delimiter + break + + # Continually removes the last line of sectionContent if it is a newline or delimiter in each iteration + while len(sectionContent) > 1: + # Initializes the Length of sectionContent each iteration of the loop + sectionContentLength = len(sectionContent) + + if (sectionContent[sectionContentLength - 1] == "\n" or + sectionContent[sectionContentLength - + 1] == "===============================================\n" + ): + sectionContent.pop(sectionContentLength - 1) + else: + # Breaks the loop if the last line isn't a newline or delimiter + break + + return sectionContent + + def __errorParsing(self, line: str, lineNum: int, expectedSyntax: str) -> dict: + """Returns a dictionary with error parse information when a line is malformed + + Example: + "*** Status updated by: ewhile at: 5/7/2020 10:59:11 *** sharing between\n" + + Args: + line (str): line of that threw error + lineNum (int): line number in the item that threw error + expectedSyntax (str): a message stating the syntax the line should follow + + Returns: + dict: a dictionary with these keys, + "type": "parse_error", + "datetime": time the error was encountered, + "file_path": path of the item with erroneos line, + "expected": expectedSyntax, + "got": line, + "line_num": lineNum + """ + errorDictionary = {} + + # Type + errorDictionary["type"] = "parse_error" + + # Dateime of the parse error + errorDictionary["datetime"] = self.__get_formatted_date( + str(datetime.datetime.now())) + + # Item filepath + errorDictionary["file_path"] = str(self.path) + + # Expected value + errorDictionary["expected"] = expectedSyntax + + # line that threw error + errorDictionary["got"] = line + + # line number that threw error + errorDictionary["line_num"] = lineNum + + # returns the error dictionary + return errorDictionary + + def __getSortedSections(self, sectionsList: list) -> list: + """Sorts the sections chronologically by datetime + + Example: + [example] need to do + + Args: + sections (list): the list of sections to be sorted + + Returns: + list: a list of sections sorted by datetime + """ + sectionsLength = len(sectionsList) + sortedSections = [] + oldestSection = {} + + while len(sortedSections) < sectionsLength: + + for iteration, currentSection in enumerate(sectionsList): + + if currentSection["type"] == "directory_information": + sortedSections.append(currentSection) + sectionsList.remove(currentSection) + break + + if iteration == 0: + oldestSection = currentSection + + #datetime.datetime.strptime(date_time_str, '%Y-%m-%d %H:%M:%S.%f') + + elif parse(currentSection["datetime"]) < parse(oldestSection["datetime"]): + oldestSection = currentSection + + if iteration == len(sectionsList) - 1: + sortedSections.append(oldestSection) + sectionsList.remove(oldestSection) + + return sortedSections + + def __check_is_locked(self) -> Union[str, bool]: + """Returns a string info about the lock if true and a bool False if false + + Example: A file is locked + "CE 100 is locked by campb303 using qvi" + + Example: a file is not locked + False + + Returns: + Union[str, bool]: String with info about lock if true, bool False if false + """ + lock_file = str(self.path) + ".lck" + if os.path.exists(lock_file): + with open(lock_file) as file: + lock_info = file.readline().split(" ") + locked_by = lock_info[4] + locked_using = lock_info[1] + return f"{self.queue} {self.number} is locked by {locked_by} using {locked_using}" + else: + return False + + def __get_most_recent_header_by_type(self, header_type: str) -> str: + """Return the data of most recent header of the given type. + If no header of that type exists, return an empty string. + + Example: Requesting a Status header that does exist + __get_most_recent_header_by_type("Status") + becomes "Waiting for Reply" + + Example: Requesting a Status header that doesn't exist + __get_most_recent_header_by_type("Status") + becomes "" + + Args: + header_type (str): Type of header to return. + + Returns: + str: data of most recent header of the given type or empty string. + """ + for header in self.headers: + if header["type"] == header_type: + return header["content"] + return "" + + def __parse_from_data(self, data: str) -> str: + """Parse From header and return requested data. + Returns empty string if requested data is unavailable. + + Examples: From data is "From: Campbell, Justin " + __parse_from_data(data="userName") returns "Campbell, Justin" + __parse_from_data(data="userEmail") returns "campb303@purdue.edu" + + Args: + data (str): The data desired; can be "user_name" or "user_email". + + Returns: + str: userName, userEmail or empty string. + """ + from_header = self.__get_most_recent_header_by_type("From") + user_name, user_email = email.utils.parseaddr(from_header) + + if data == "user_name": + return user_name + elif data == "user_email": + return user_email + else: + raise ValueError( + "data='" + str(data) + "' is not a valid option. data must be \"user_name\" or \"user_email\".") + + def __get_user_alias(self) -> str: + """Returns user's Career Account alias if present. + If Career Account alias isn't present, returns empty string. + + Example: Email from campb303@purdue.edu + user_alias = "campb303" + + Example: Email from spam@spammer.net + user_alias = "" + + Returns: + str: User's Career Account alias if present or empty string + """ + try: + email_user, email_domain = self.user_email.split("@") + except ValueError: + for line_num, header in enumerate(self.headers): + if header["type"] == "From": + header_string = header["type"] + ": " + header["content"] + return self.__errorParsing(header_string, line_num + 1, "Expected valid email Address") + + return email_user if email_domain.endswith("purdue.edu") else "" + + def __get_formatted_date(self, date: str) -> str: + """Returns the date/time formatted as RFC 8601 YYYY-MM-DDTHH:MM:SS+00:00. + Returns empty string if the string argument passed to the function is not a datetime. + See: https://en.wikipedia.org/wiki/ISO_8601 + + Returns: + str: Properly formatted date/time recieved or empty string. + """ + try: + # This date is never meant to be used. The default attribute is just to set timezone. + parsed_date = parse(date, default=datetime.datetime( + 1970, 1, 1, tzinfo=tz.gettz('EDT'))) + except: + return "" + + parsed_date_string = parsed_date.strftime("%Y-%m-%dT%H:%M:%S%z") + + return parsed_date_string + + def to_json(self) -> dict: + """Returns a JSON safe representation of the item. + + Returns: + dict: JSON safe representation of the item. + """ + return self.json_data + + def __repr__(self) -> str: + return self.queue + str(self.number) \ No newline at end of file diff --git a/src/webqueue2api/parser/queue.py b/src/webqueue2api/parser/queue.py new file mode 100644 index 0000000..36c22eb --- /dev/null +++ b/src/webqueue2api/parser/queue.py @@ -0,0 +1,192 @@ +import os, re +from pathlib import Path +from .item import Item +from .config import config +from .errors import QueueDoesNotExistError + + + +#------------------------------------------------------------------------------# +# Classes +#------------------------------------------------------------------------------# +class Queue: + """A collection of items. + + Example: + # Create a queue (ce) + >>> queue = Queue("ce") + + Attributes: + name (str): The name of the queue. + path: (pathlib.Path): The path of the queue directory. + items (list): A list of Items in the queue. + json_data (dict): A JSON serializable representation of the Queue. + + Raises: + QueueDoesNotExistError: If a queue's directory does not exist on the filesystem. + """ + + def __init__(self, name: str, headers_only: bool = True) -> None: + self.name = name + self.path = Path(config.queue_directory, self.name) + if not self.path.exists(): + raise QueueDoesNotExistError(str(self.path)) + + self.items = self.__get_items(headers_only=headers_only) + self.json_data = self.__generate_json_data() + + def __generate_json_data(self) -> dict: + """Generates a JSON serializable data dictionary of class attributes. + + Example: + __generate_json_data() returns: + ... + + Returns: + dict: JSON serializable data dictionary of class attributes. + """ + json_data = {} + + # List of attributes that need processing before being added to json_data + attributes_to_process = ["path", "items"] + for attribute in attributes_to_process: + if attribute == "path": + json_data[attribute] = str(self.path) + if attribute == "items": + json_data[attribute] = [item.to_json() for item in self.items] + + # List of attributes to be ignored + attributes_to_ignore = ["to_json"] + attributes_to_process + for attribute in self.__dir__(): + if not attribute.startswith("_") and attribute not in attributes_to_ignore: + json_data[attribute] = self.__getattribute__(attribute) + + return json_data + + def __get_items(self, headers_only: bool) -> list: + """Returns a list of items for this Queue + + Args: + headers_only (bool): If True, loads Item headers. + + Returns: + list: a list of items for this Queue + """ + items = [] + + for item in os.listdir(self.path): + item_path = Path(self.path, item) + + is_file = True if os.path.isfile(item_path) else False + + if is_file and is_valid_item_name(item): + items.append(Item(self.name, item, headers_only)) + + return items + + def to_json(self) -> dict: + """Return JSON safe representation of the Queue + + The JSON representation of every item in the Queue is added to the + Queue's JSON data then the Queue's JSON data is returned. + + Returns: + dict: JSON safe representation of the Queue + """ + items = [] + for item in self.items: + items.append(item.to_json()) + self.json_data["items"] = items + + return self.json_data + + def __len__(self) -> int: + return len(self.items) + + def __repr__(self) -> str: + return f'{self.name}_queue' + + + +#------------------------------------------------------------------------------# +# Utilities +#------------------------------------------------------------------------------# +def is_valid_item_name(name: str) -> bool: + """Returns true if file name is a valid item name + + Example: + is_valid_item_name("21") -> true + is_valid_item_name("twentyone") -> false + + Args: + name (str): The name to test. + headers_only (bool, optional): Whether or not to parse headers only. Defaults to True. + + Returns: + bool: Name is valid item name. + """ + item_pattern = re.compile("^[0123456789]{1,3}$") + return True if item_pattern.match(name) else False + +def get_valid_queues() -> list: + """Returns a list of queues on the filesystem excluding ignored queues. + + Example: + ["bidc", "me", "ce"] + + Returns: + list: Valid queues + """ + queues = [] + + for file in os.listdir(config.queue_directory): + current_file = config.queue_directory + "/" + file + is_directory = os.path.isdir(current_file) + is_valid = file not in config.queues_to_ignore + + if is_directory and is_valid: + queues.append(file) + + return queues + +def get_queue_counts() -> list: + """Returns a list of dictionaries with the number of items in each queue. + + Example: + [ + { + name: "me", + number_of_items: 42 + }, + { + name: "bidc", + number_of_items: 3 + } + ] + + Returns: + list: Dictionaries with the number of items in each queue. + """ + queue_info = [] + for queue in get_valid_queues(): + possible_items = os.listdir(config.queue_directory + "/" + queue) + valid_items = [is_valid_item_name for file in possible_items] + queue_info.append( {"name": queue, "number_of_items": len(valid_items)} ) + + # Sorts list of queue info alphabetically + sorted_queue_info = sorted(queue_info, key = lambda queue_info_list: queue_info_list['name']) + + return sorted_queue_info + +def load_queues() -> list: + """Return a list of Queues for each queue. + + Returns: + list: list of Queues for each queue. + """ + queues = [] + + for queue in get_valid_queues(): + queues.append(Queue(queue)) + + return queues \ No newline at end of file