diff --git a/setup.py b/setup.py index 5dbb036..f963642 100644 --- a/setup.py +++ b/setup.py @@ -39,7 +39,8 @@ def get_all_dependencies(): # Custom version of python-ldap without SASL requirements "python-ldap @ git+https://github.itap.purdue.edu/ECN/python-ldap/@python-ldap-3.3.1", "easyad", - "dataclasses" + "dataclasses", + "pyparsing" ], extras_require={ "dev": conditional_dependencies["dev"], diff --git a/src/webqueue2api/parser/errors.py b/src/webqueue2api/parser/errors.py index ef1ac48..9cdb754 100644 --- a/src/webqueue2api/parser/errors.py +++ b/src/webqueue2api/parser/errors.py @@ -6,4 +6,9 @@ def __init__(self, path: str): class QueueDoesNotExistError(Exception): def __init__(self, path: str): self.message = f"Directory {path} not found." + super().__init__(self.message) + +class ParseError(Exception): + def __init__(self, line_number: int, message: str = "Unable to parse item."): + self.message = f"{message} at line {line_number}" super().__init__(self.message) \ No newline at end of file diff --git a/src/webqueue2api/parser/item.py b/src/webqueue2api/parser/item.py index 928ddc4..4ff5d57 100644 --- a/src/webqueue2api/parser/item.py +++ b/src/webqueue2api/parser/item.py @@ -1,14 +1,16 @@ import os import time import email +from email.policy import Policy import re -import datetime from dateutil.parser import parse from dateutil import tz from typing import Union from pathlib import Path from .config import config from .errors import ItemDoesNotExistError +from .utils import format_date_string +from .parser import parse_item @@ -61,11 +63,10 @@ def __init__(self, queue: str, number: int, headers_only: bool = False) -> None: self.path = Path(config.queue_directory, self.queue, str(self.number)) if not self.path.exists(): raise ItemDoesNotExistError(str(self.path)) - + self.last_updated = self.__get_time_last_updated() - self.__raw_tem = self.__get_raw_item() + self.__raw_item = self.__get_raw_item() self.headers = self.__parse_headers() - if not 
headers_only: self.content = self.__parseSections() self.is_locked = self.__check_is_locked() self.user_email = self.__parse_from_data(data="user_email") self.user_name = self.__parse_from_data(data="user_name") @@ -76,7 +77,8 @@ def __init__(self, queue: str, number: int, headers_only: bool = False) -> None: self.priority = self.__get_most_recent_header_by_type("Priority") self.department = self.__get_most_recent_header_by_type("Department") self.building = self.__get_most_recent_header_by_type("Building") - self.date_received = self.__get_formatted_date(self.__get_most_recent_header_by_type("Date")) + self.date_received = format_date_string(self.__get_most_recent_header_by_type("Date")) + if not headers_only: self.content = self.__parse_sections() self.json_data = self.__generate_json_data() def __generate_json_data(self) -> dict: @@ -134,7 +136,7 @@ def __get_time_last_updated(self) -> str: """ unix_time = os.path.getmtime(self.path) formatted_time = time.strftime('%m-%d-%y %I:%M %p', time.localtime(unix_time)) - return self.__get_formatted_date(formatted_time) + return format_date_string(formatted_time) def __get_raw_item(self) -> list: """Returns a list of all lines in the item file @@ -156,7 +158,7 @@ def __get_header_boundary(self) -> int: Returns: int: line number where the Item headers end """ - for line_number, line in enumerate(self.__raw_tem): + for line_number, line in enumerate(self.__raw_item): if line == "\n": return line_number @@ -183,7 +185,7 @@ def __parse_headers(self) -> list: # QTime-Updated-By: campb303 queue_prefix_pattern = re.compile(r"\[.*?\] {1}") for line_number in range(self.__get_header_boundary()): - line = self.__raw_tem[line_number] + line = self.__raw_item[line_number] line_has_queue_prefix = queue_prefix_pattern.match(line) if line_has_queue_prefix: @@ -210,850 +212,79 @@ def __parse_headers(self) -> list: ] for key in message.keys(): - headers.append({"type": key, "content": self.__get_formatted_date(message[key]) if key in 
date_headers else message[key]}) + headers.append({"type": key, "content": format_date_string(message[key]) if key in date_headers else message[key]}) return headers - def __parseSections(self) -> list: - # List of all item events - sections = [] - - contentStart = self.__get_header_boundary() + 1 - contentEnd = len(self.__raw_tem) - 1 - - # List of assignments for the item - assignementLsit = self.__assignmentParsing(contentStart) - - # Appends each assignment individually to sections - for assignment in assignementLsit: - sections.append(assignment) - - # Checks for empty content within an item and returns and - if contentEnd <= contentStart: - blankInitialMessage = self.__initialMessageParsing([""]) - sections.append(blankInitialMessage) - return sections - - # Checks for Directory Identifiers - if self.__raw_tem[contentStart] == "\n" and self.__raw_tem[contentStart + 1].startswith("\t"): - - directoryStartLine = contentStart + 1 - - # Parses the directory information and returns a dictionary of directory values - directoryInfo = self.__directoryParsing(directoryStartLine) - - # Appends Directory Information into the sections array - sections.append(directoryInfo) - - # Sets the initial message start to the next line after all directory lines and newlines - contentStart = contentStart + len(directoryInfo) + 1 - - # The start line, type, and end line for item events - sectionBoundaries = [] - - # Delimiter info - delimiters = [ - {"name": "edit", "pattern": "*** Edited"}, - {"name": "status", "pattern": "*** Status"}, - {"name": "replyToUser", "pattern": "*** Replied"}, - {"name": "replyFromUser", "pattern": "=== "}, - ] - - # Signifies that there is an initial message to parse - initialMessageSection = True - - # Parses the entire contents of the message, stores everything before any delimiter as the initial message - # and the line number of any delimiters as well as the type - for lineNumber in range(contentStart, contentEnd + 1): - - line = 
self.__raw_tem[lineNumber] - - # Looks for a starting delimiter and explicity excludes the reply-from-user ending delimiter - if (line.startswith("*** Edited by: ") or - line.startswith("*** Replied by: ") or - line.startswith("*** Status updated by: ") or - line == "=== Additional information supplied by user ===\n" and not - line == "===============================================\n" - ): - - # Sets the delimiter type based on the pattern within the delimiters list - for delimiter in delimiters: - - if line.startswith(delimiter["pattern"]): - sectionBoundaries.append( - {"start": lineNumber, "type": delimiter["name"]}) - break - - # If a starting delimiter was encountered, then there is no initial message - if initialMessageSection: - initialMessageSection = False - - elif initialMessageSection == True: - # Delimiter not encountered yet, so append initial message starting line as the current lin number - sectionBoundaries.append( - {"start": lineNumber, "type": "initial_message"}) - initialMessageSection = False - - # Used to set the end line of the last delimiter - sectionBoundaries.append({"start": contentEnd + 1}) - - # Sets the end of the section boundary to the begining of the next section boundary - for boundaryIndex in range(0, len(sectionBoundaries) - 1): - - sectionBoundaries[boundaryIndex]["end"] = sectionBoundaries[boundaryIndex + 1]["start"] - - # Remove End of File boundary since the line number has been assigned to the last delimiter - del sectionBoundaries[-1] - - # Parses through all the boundaries in section boundaries - for boundary in sectionBoundaries: - - # Sets line to the first line of the boundary (which is always the delimiter) - line = self.__raw_tem[boundary["start"]] - - # Returns all of the lines within the current section - sectionContent = self.__raw_tem[boundary["start"]: boundary["end"]] - - # Appends an initial message dictionary to sections - if boundary["type"] == "initial_message": - initialMessageDictionary = 
self.__initialMessageParsing( - sectionContent) - sections.append(initialMessageDictionary) - - elif boundary["type"] == "edit": - # Returns a dictionary with edit information - editInfo = self.__editParsing( - sectionContent, boundary["start"]) - - # Checks for a parse error and appends it, returning the sections list which stops the parsing - if editInfo["type"] == "parse_error": - sections.append(editInfo) - return self.__getSortedSections(sections) - - # Appends the edit dictionary to sections - sections.append(editInfo) - - elif boundary["type"] == "replyToUser": - # Returns a dictionary with reply-to information - replyToInfo = self.__replyToParsing( - sectionContent, boundary["start"]) - - # Checks for a parse error and appends it, returning the sections list which stops the parsing - if replyToInfo["type"] == "parse_error": - sections.append(replyToInfo) - return self.__getSortedSections(sections) - - # Appends the reply-to to sections - sections.append(replyToInfo) - - elif boundary["type"] == "status": - # Returns a dictionary with status information - statusInfo = self.__statusParsing( - sectionContent, boundary["start"]) - - if statusInfo["type"] == "parse_error": - sections.append(statusInfo) - return self.__getSortedSections(sections) - - # Appends the status to sections - sections.append(statusInfo) - - elif boundary["type"] == "replyFromUser": - # Returns a dictionary with userReply information - replyFromInfo = self.__userReplyParsing( - sectionContent, boundary["start"]) - - if replyFromInfo["type"] == "parse_error": - sections.append(replyFromInfo) - return self.__getSortedSections(sections) - - # Appends the replyFrom to sections - sections.append(replyFromInfo) - - sortedSections = self.__getSortedSections(sections) - - return sortedSections - # return sections - - def __directoryParsing(self, directoryStartLine: int) -> dict: - """Returns a dictionary with directory information + def __parse_sections(self) -> list: + """Generates a list of 
dictionaries which represent all the sections in an item Example: - Name: Nestor Fabian Rodriguez Buitrago - Login: rodri563 - Computer: ce-205-38 (128.46.205.67) - Location: HAMP G230 - Email: rodri563@purdue.edu - Phone: 7654766893 - Office: HAMP G230 - UNIX Dir: /home/bridge/b/rodri563 - Zero Dir: U=\\bridge.ecn.purdue.edu\rodri563 - User ECNDB: http://eng.purdue.edu/jump/2e8399a - Host ECNDB: http://eng.purdue.edu/jump/2e83999 - Subject: Autocad installation - - Args: - directoryStartLine (int): line number within the item that the directory starts on + [example] Returns: - dict: dictionary that splits each line within the directory into a key and a value - """ - directoryInformation = {"type": "directory_information"} - - directoryPossibleKeys = [ - "Name", - "Login", - "Computer", - "Location", - "Email", - "Phone", - "Office", - "UNIX Dir", - "Zero Dir", - "User ECNDB", - "Host ECNDB", - "Subject" - ] - # Executies until the directory start line is greater than the directory ending line - while True: - - # Returns the line number at directory start line - info = self.__raw_tem[directoryStartLine] - - # Breaks the loop if it encountrs a newline, signifying the end of the directory information - if info == "\n": - + list: list of dictionaries, which all contain a type key to denote what type of section the dictionary represents + """ + # Convert list of lines to single string + raw_item_as_string = "".join(self.__raw_item) + + # Temporarily make the email package raise an error upon encountering a defect in the headers + Policy.raise_on_defect = True + + # Parse body + body_sections = parse_item(raw_item_as_string) + + # Return the email package to its normal parsing state + Policy.raise_on_defect = False + + # Add initial message headers to initial message section + for index, section in enumerate(body_sections): + if section["type"] == "initial_message": + body_sections[index] = self.__add_initial_message_headers(section) break + + # Add assignment 
sections to all the other sections + body_sections.extend(self.__get_assignments()) - else: - - # Removes white including space, newlines, and tabs from the directory info line - strippedInfo = info.strip() - - # Attempts to find ": " but will accept ":", denoting a blank entry for a directory item - if ": " in strippedInfo: - - # Seperates the directory info line into two variables, the first variable being the key, the second being the value - # swt1 - key, value = strippedInfo.split(": ", 1) - - if key in directoryPossibleKeys: - # Adds the key value pair to the directory info dictionary - directoryInformation[key] = value - else: - # Casts the list type on to a dictionary - dictionaryList = list(directoryInformation) - # Length of dictionary list - lenDictionaryList = len(dictionaryList) - # The last key appended to the directory dictionary - lastKeyAppended = dictionaryList[lenDictionaryList - 1] - - directoryInformation[lastKeyAppended] = directoryInformation[lastKeyAppended] + \ - " " + strippedInfo - - elif ":" in strippedInfo: - - # Seperates the directory info line into two variables, the first variable being the key, the second being the value - key, value = strippedInfo.split(":", 1) - - if key in directoryPossibleKeys: - # Adds the key value pair to the directory info dictionary - directoryInformation[key] = value - else: - # Casts the list type on to a dictionary - dictionaryList = list(directoryInformation) - # Length of dictionary list - lenDictionaryList = len(dictionaryList) - # The last key appended to the directory dictionary - lastKeyAppended = dictionaryList[lenDictionaryList - 1] - - directoryInformation[lastKeyAppended] = directoryInformation[lastKeyAppended] + \ - " " + strippedInfo - - # Signifies that this line belongs to the most previous line - elif ": " not in strippedInfo and ":" not in strippedInfo: - # Casts the list type on to a dictionary - dictionaryList = list(directoryInformation) - # Length of dictionary list - 
lenDictionaryList = len(dictionaryList) - # The last key appended to the directory dictionary - lastKeyAppended = dictionaryList[lenDictionaryList - 1] - - directoryInformation[lastKeyAppended] = directoryInformation[lastKeyAppended] + \ - " " + strippedInfo - # Counter to denote the end of the directory - directoryStartLine = directoryStartLine + 1 - - # Returns the directory information dictionary - return directoryInformation - - def __assignmentParsing(self, contentStart: int) -> list: - """Returns a list with assignment information dictionaries + body_sections = self.__get_sorted_sections(body_sections) - Example: - Assigned-To: campb303 - Assigned-To-Updated-Time: Tue, 23 Jun 2020 13:27:00 EDT - Assigned-To-Updated-By: campb303 + return body_sections - Args: - contentStart (int): line number where the content starts - - Returns: - list: [ - {"type": "assignment", - "datetime": datetime of the assignment, - "by": user who initiated the assignment, - "to": user who was assigned - }, - ] - """ - assignmentList = [] - - # Assignment Information - assignedBy = "" - assignedDateTime = "" - assignedTo = "" - - # Parses the header looking for assignment delimeters and stores info into their respective variables - for headerContent in range(0, contentStart): - - line = self.__raw_tem[headerContent] - - # Gets who the Item was assigned to - if line.startswith("Assigned-To: "): - - assignedTo = ( - re.search("(?<=Assigned-To: )(.*)", line)).group() - - # Gets the date the Item was assigned - elif line.startswith("Assigned-To-Updated-Time: "): - - dateFromLine = ( - re.search("(?<=Assigned-To-Updated-Time: )(.*)", line)).group() - - assignedDateTime = self.__get_formatted_date(dateFromLine) - - # Gets who assigned the Item - elif line.startswith("Assigned-To-Updated-By: "): - - assignedBy = ( - re.search("(?<=Assigned-To-Updated-By: )(.*)", line)).group() - - # Appends the assignment to the sections list - assignmentList.append( - {"type": "assignment", - "datetime": 
assignedDateTime, - "by": assignedBy, - "to": assignedTo} - ) - - return assignmentList - - def __initialMessageParsing(self, content: list) -> dict: - """Returns a dictionary with initial message information - - Example: - \n - Testtest\n - \n + def __add_initial_message_headers(self, initial_message: dict) -> dict: + """Adds header information to the initial message. Args: - content (list): content of the initial message + initial_message (dict): The initial message dictionary without headers. Returns: - dict: - "type": "initial_message", - "datetime": datetime the initial message was sent, - "from_name": from_name, - "from_email": user_email, - "to": [{email, name}], - "cc": [{email, name}], - "subject": initial message subject - "content": content of the initial message + dict: Initial message dictionary with headers. """ - initialMessageDictionary = {} - - initialMessageDictionary["type"] = "initial_message" - - # Gets the initial message date from the header - rawMessageDateStr = self.__get_most_recent_header_by_type("Date") - - # Sets datetime in the intialMessage dictionary to UTC formatted date - initialMessageDictionary["datetime"] = self.__get_formatted_date( - rawMessageDateStr) - - initialMessageDictionary["from_name"] = self.__parse_from_data( - data="user_name") - - initialMessageDictionary["from_email"] = self.__parse_from_data( - data="user_email") - - # Stores list of dictionaries for the recipients of the initial message - initialMessageDictionary["to"] = [] - - # Parses the header looking for recipients of the initial message and stores it in a list of tuples - rawMessageRecipientsList = email.utils.getaddresses( - [self.__get_most_recent_header_by_type("To")]) - - # Parses the CC list and stores the cc recipient information in a list of dictionaries - for recipients in rawMessageRecipientsList: - - initialMessageDictionary["to"].append( - {"name": recipients[0], - "email": recipients[1]} - ) - - # Stores list of dictionaries for CC information - 
initialMessageDictionary["cc"] = [] - - # Parses the header looking for CC recipients of the initial message and stores it in a list of tuples - rawMessageCCList = email.utils.getaddresses( - [self.__get_most_recent_header_by_type("CC")]) - - # Parses the CC list and stores the cc recipient information in a list of dictionaries - for ccRecipients in rawMessageCCList: - - initialMessageDictionary["cc"].append( - {"name": ccRecipients[0], - "email": ccRecipients[1]} - ) - - initialMessageDictionary["subject"] = self.__get_most_recent_header_by_type( - "Subject") - - # Removes unecessary newlines from the begining and the end of the initial message - initialMessageDictionary["content"] = self.__getFormattedSectionContent( - content) - - return initialMessageDictionary - - def __editParsing(self, content: list, lineNum: int) -> dict: - """Returns a dictionary with edit information - - Example: - *** Edited by: campb303 at: 06/23/20 13:27:56 ***\n - \n - This be an edit my boy\n - \n - \n - \n - - Args: - content (list): content of an edit - lineNum (int): line number of an edit within an item - - Returns: - dict: a dictionary with these keys, - "type": "edi", - "by": initiator of the edit, - "datetime": datetime of the edit, - "content": content of the edit - """ - - # Edit Info dictionary - editInfo = {} - - for count, line in enumerate(content): - if line == "===============================================\n": - errorMessage = "Reply-from-user ending delimter encountered without Reply-from-user starting delimter" - return self.__errorParsing(line, lineNum + count + 1, errorMessage) - - editInfo["type"] = "edit" - - delimiterLine = content[0] - # Parses for the author of the edit, which is located between the "*** Edited by: " and " at:" substrings - try: - editInfo["by"] = ( - re.search("(?<=\*{3} Edited by: )(.*)(?= at:)", delimiterLine)).group() - except: - errorMessage = "*** Edited by: [username] at: [date and time] ***\n" - return 
self.__errorParsing(delimiterLine, lineNum, errorMessage) - - try: - # Parses for the date and time of the edit, which is located between the " at: " and "***\n" substrings - dateTimeString = ( - re.search("(?<= at: )(.*)(?= \*\*\*\n)", delimiterLine)).group() - except: - # Returns an error message if there is no space after "at:" - errorMessage = "*** Edited by: [username] at: [date and time] ***\n" - return self.__errorParsing(delimiterLine, lineNum, errorMessage) - - # Attempts to format the date and time into utc format - editInfo["datetime"] = self.__get_formatted_date(dateTimeString) - - # Remove the delimiter String and unecessary newlines - editInfo["content"] = self.__getFormattedSectionContent(content) - - return editInfo - - def __replyToParsing(self, content: list, lineNum: int) -> dict: - """Returns a dictionary with reply to user information - - Example: - *** Replied by: campb303 at: 06/23/20 13:28:18 ***\n - \n - This be a reply my son\n - \n - Justin\n - ECN\n - \n - - Args: - content (list): content of a reply to user - lineNum (int): line number of a reply to user in an item - - Returns: - dict: a dictionary with these keys, - "type": "reply_to_user", - "by": initiator of the reply to user, - "datetime": datetime of the reply to user, - "content": content of the reply to user - """ - replyInfo = {} - - replyInfo["type"] = "reply_to_user" - - delimiterLine = content[0] - - for count, line in enumerate(content): - if line == "===============================================\n": - errorMessage = "Reply-from-user ending delimter encountered without Reply-from-user starting delimter" - return self.__errorParsing(line, lineNum + count + 1, errorMessage) - - try: - # Parses for the author of the reply, which is located between the "*** Replied by: " and " at:" substrings - replyInfo["by"] = ( - re.search("(?<=\*{3} Replied by: )(.*)(?= at:)", delimiterLine)).group() - except: - errorMessage = "*** Replied by: [username] at: [date and time] ***\n" - 
return self.__errorParsing(delimiterLine, lineNum, errorMessage) - - # Parses for the date and time of the reply, which is located between the " at: " and "***\n" substrings - try: - dateTimeString = ( - re.search("(?<= at: )(.*)(?= \*\*\*\n)", delimiterLine)).group() - except: - errorMessage = "*** Replied by: [username] at: [date and time] ***\n" - return self.__errorParsing(delimiterLine, lineNum, errorMessage) - - # Formats date to UTC - replyInfo["datetime"] = self.__get_formatted_date(dateTimeString) - - replyInfo["content"] = self.__getFormattedSectionContent(content) - - return replyInfo - - def __statusParsing(self, content: list, lineNum: int) -> dict: - """Returns a dictionary with status information - - Example: - *** Status updated by: campb303 at: 6/23/2020 13:26:55 ***\n - Dont Delete\n - - Args: - content (list): The content of a status update - lineNum (int): The line number of a status update in an item - - Returns: - dict: a dictionary with these keys, - "type": "status", - "by": initiator of the status update, - "datetime": datetime of the status update, - "content": content of the status update - """ - statusInfo = {} - - statusInfo["type"] = "status" - - delimiterLine = content[0] - - for count, line in enumerate(content): - if line == "===============================================\n": - errorMessage = "Reply-from-user ending delimter encountered without Reply-from-user starting delimter" - return self.__errorParsing(line, lineNum + count + 1, errorMessage) - - # Parses for the author of the status change, which is located between the "*** Status updated by: " and " at:" substrings - try: - statusInfo["by"] = ( - re.search("(?<=\*{3} Status updated by: )(.*)(?= at:)", delimiterLine)).group() - except: - errorMessage = "*** Status updated by: [username] at: [date and time] ***\n" - - return self.__errorParsing(delimiterLine, lineNum, errorMessage) - - # Parses for the date and time of the status change, which is located between the " at: " 
and "***\n" substrings - try: - dateTimeString = re.search( - "(?<= at: )(.*)(?= \*\*\*\n)", delimiterLine).group() - except: - errorMessage = "*** Status updated by: [username] at: [date and time] ***\n" - - return self.__errorParsing(delimiterLine, lineNum, errorMessage) - - # Formats the date to UTC - statusInfo["datetime"] = self.__get_formatted_date(dateTimeString) - - # Remove the delimiter String and unecessary newlines - statusInfo["content"] = self.__getFormattedSectionContent(content) - - return statusInfo - - def __userReplyParsing(self, replyContent: list, lineNumber: int) -> dict: - """Returns a dictionary with user reply information - - Example: - === Additional information supplied by user ===\n - \n - Subject: Re: Beepboop\n - From: Justin Campbell \n - Date: Tue, 23 Jun 2020 13:30:45 -0400\n - X-ECN-Queue-Original-Path: /home/pier/e/queue/Attachments/inbox/2020-06-23/212-original.txt\n - X-ECN-Queue-Original-URL: https://engineering.purdue.edu/webqueue/Attachments/inbox/2020-06-23/212-original.txt\n - \n - Huzzah!\n - \n - ===============================================\n - \n - Args: - replyContent (list): The entire section of a reply-from-user - lineNumber (int): The line number of the begining of a reply-from-user section within and item - - Returns: - dict: a dictionary with these keys, - "type": "reply_from_user", - "from_name": name of the user that sent the reply, - "from_email": email of the user that sent the reply, - "subject": subject of the reply, - "datetime": the datetime of the reply, - "cc": [ - {"name": name of the carbon copied recipient, - "email": email of the carbon copied recipient - }, - ] - "content": content of the reply - "headers": [ - {"type": headerType, - "content": content - }, - ] - """ - replyFromInfo = {} - - replyFromInfo["type"] = "reply_from_user" - - replyFromHeaders = [] - newLineCounter = 0 - endingDelimiterCount = 0 - - # Delimiter information line numbers to remove from reply from user - linesToRemove = [] 
- - # Parses the section content looking for any line that starts with a metadata, also tracks the line - # number with the enumerate function - for lineNum, line in enumerate(replyContent): - - if endingDelimiterCount == 0 and lineNum == len(replyContent) - 1: - errorMessage = "Did not encounter a reply-from-user ending delimiter" - return self.__errorParsing(line, lineNumber + lineNum + 1, errorMessage) - - if newLineCounter == 1 and line != "\n": - - try: - # Append header information for each headr line - headerType, content = line.split(": ", 1) - replyFromHeaders.append( - {"type": headerType, - "content": content - } - ) - except: - lenReplyFromHeaders = len(replyFromHeaders) - if lenReplyFromHeaders == 0: - errorMessage = ("Expected reply-from-user header information:\n" + - "=== Additional information supplied by user ===\n" + - "\n" + - "[Header Type]: [Header Value]\n" + - "\n" - ) - return self.__errorParsing(line, lineNumber + lineNum + 1, errorMessage) - - else: - replyFromHeaders[lenReplyFromHeaders - - 1]["content"] = replyFromHeaders[lenReplyFromHeaders - 1]["content"] + " " + line - - linesToRemove.append(lineNum) - # Checks for a newline and breaks for loop on second occurance of a newline - if line == "\n": - newLineCounter = newLineCounter + 1 - - if newLineCounter == 2 and "datetime" not in replyFromInfo.keys(): - errorMessage = "Expected \"Date: [datetime]\" in the header info" - return self.__errorParsing(line, lineNumber + lineNum + 1, errorMessage) - - elif line == "===============================================\n": - endingDelimiterCount = endingDelimiterCount + 1 - - elif line.startswith("From: ") and newLineCounter == 1: - # Returns a list of one tuples with a name stored in the first index of the tuple and an email stored in the second index of the tuple - emailList = email.utils.getaddresses([line]) - replyFromInfo["from_name"] = emailList[0][0] - replyFromInfo["from_email"] = emailList[0][1] - - elif line.startswith("Subject: ") and 
newLineCounter == 1: - # Matches everything after "Subject: " - try: - subjectStr = ( - re.search("(?<=Subject: )(.*)", line)).group() - except: - errorMessage = "Expeted syntax of \"Subject: [subject]\"" - return self.__errorParsing(line, lineNumber + lineNum + 1, errorMessage) - - # Formatts the date to UTC - replyFromInfo["subject"] = subjectStr - - elif line.startswith("Date: ") and newLineCounter == 1: - # Matches everything after "Date: " - try: - dateStr = (re.search("(?<=Date: )(.*)", line)).group() - except: - errorMessage = "\"Date: [datetime]\"" - return self.__errorParsing(line, lineNumber + lineNum + 1, errorMessage) - - # Formatts the date to UTC - replyFromInfo["datetime"] = self.__get_formatted_date(dateStr) - - elif line.startswith("Cc: ") and newLineCounter == 1: - - replyFromInfo["cc"] = [] - - # Returns a list of tuples with email information - recipientsList = email.utils.getaddresses([line]) - - # Parses through the cc tuple list - for cc in recipientsList: - # Stores the cc information in a dictionary and appends it to the ccRecipientsList - replyFromInfo["cc"].append( - {"name": cc[0], - "email": cc[1]} - ) - - # Deletes reduntant lines from the message content in reverse order - for lineNum in sorted(linesToRemove, reverse=True): - replyContent.pop(lineNum) - - # Strips any unnecessary newlines or any delimiters frm the message content - replyFromInfo["content"] = self.__getFormattedSectionContent( - replyContent) - - replyFromInfo["headers"] = replyFromHeaders - - return replyFromInfo - - def __getFormattedSectionContent(self, sectionContent: list) -> list: - """Returns a list with message content that is stripped of unnecessary newlines and begining delimiters - - Example: - *** Edited by: mph at: 02/21/20 10:27:16 ***\n - \n - Still need to rename machines - but the networking issue now seems to \n - be resolved via another ticket.\n - \n - \n - \n - \n - \n - - Args: - sectionContent (list): The section content of a parsed section - - 
Returns: - list: the section content of a parsed section without any delimiters and unnecessary newlines - """ - # Continually removes the first line of sectionContent if it is a newline or delimiter in each iteration - while len(sectionContent) > 1: - if (sectionContent[0] == "\n" or - sectionContent[0].startswith("*** Edited by: ") or - sectionContent[0].startswith("*** Replied by: ") or - sectionContent[0].startswith("*** Status updated by: ") or - sectionContent[0] == "=== Additional information supplied by user ===\n" or - sectionContent[0] == "===============================================\n" - ): - sectionContent.pop(0) - else: - # Breaks the loop if the first line isn't a newline or delimiter - break - - # Continually removes the last line of sectionContent if it is a newline or delimiter in each iteration - while len(sectionContent) > 1: - # Initializes the Length of sectionContent each iteration of the loop - sectionContentLength = len(sectionContent) - - if (sectionContent[sectionContentLength - 1] == "\n" or - sectionContent[sectionContentLength - - 1] == "===============================================\n" - ): - sectionContent.pop(sectionContentLength - 1) - else: - # Breaks the loop if the last line isn't a newline or delimiter - break - - return sectionContent - - def __errorParsing(self, line: str, lineNum: int, expectedSyntax: str) -> dict: - """Returns a dictionary with error parse information when a line is malformed - - Example: - "*** Status updated by: ewhile at: 5/7/2020 10:59:11 *** sharing between\n" - - Args: - line (str): line of that threw error - lineNum (int): line number in the item that threw error - expectedSyntax (str): a message stating the syntax the line should follow - - Returns: - dict: a dictionary with these keys, - "type": "parse_error", - "datetime": time the error was encountered, - "file_path": path of the item with erroneos line, - "expected": expectedSyntax, - "got": line, - "line_num": lineNum - """ - errorDictionary 
= {} - - # Type - errorDictionary["type"] = "parse_error" - - # Dateime of the parse error - errorDictionary["datetime"] = self.__get_formatted_date( - str(datetime.datetime.now())) - - # Item filepath - errorDictionary["file_path"] = str(self.path) - - # Expected value - errorDictionary["expected"] = expectedSyntax - - # line that threw error - errorDictionary["got"] = line - - # line number that threw error - errorDictionary["line_num"] = lineNum - - # returns the error dictionary - return errorDictionary + raw_cc = self.__get_most_recent_header_by_type("CC") + raw_to = self.__get_most_recent_header_by_type("To") + + initial_message["datetime"] = self.date_received + initial_message["from_name"] = self.user_name + initial_message["from_email"] = self.user_email + initial_message["to"] = [ + { "name": user_name, "email": user_email } + for user_name, user_email in email.utils.getaddresses([raw_to]) + ] + initial_message["cc"] = [ + { "name": user_name, "email": user_email } + for user_name, user_email in email.utils.getaddresses([raw_cc]) + ] + initial_message["subject"] = self.subject - def __getSortedSections(self, sectionsList: list) -> list: - """Sorts the sections chronologically by datetime + return initial_message - Example: - [example] need to do + def __get_sorted_sections(self, sectionsList: list) -> list: + """Returns list of sections sorted chronologically. Args: - sections (list): the list of sections to be sorted + sectionsList (list): List of sections to be sorted. Returns: - list: a list of sections sorted by datetime + list: List of sections sorted chronologically. 
""" sectionsLength = len(sectionsList) sortedSections = [] @@ -1070,9 +301,6 @@ def __getSortedSections(self, sectionsList: list) -> list: if iteration == 0: oldestSection = currentSection - - #datetime.datetime.strptime(date_time_str, '%Y-%m-%d %H:%M:%S.%f') - elif parse(currentSection["datetime"]) < parse(oldestSection["datetime"]): oldestSection = currentSection @@ -1082,6 +310,30 @@ def __getSortedSections(self, sectionsList: list) -> list: return sortedSections + def __get_assignments(self) -> list: + """Returns a list of dictionaries containing assignments + + Example: + + + Returns: + list: list of dictionaries which represent assignment sections + """ + assignment_list = [] + + for index, header in enumerate(self.headers): + if header["type"] == "Assigned-To": + assignment = { + "type": "assignment", + "to": self.headers[index]["content"], + "datetime": self.headers[index + 1]["content"], + "by": self.headers[index + 2]["content"] + + } + assignment_list.append(assignment) + + return assignment_list + def __check_is_locked(self) -> Union[str, bool]: """Returns a string info about the lock if true and a bool False if false @@ -1173,25 +425,6 @@ def __get_user_alias(self) -> str: return email_user if email_domain.endswith("purdue.edu") else "" - def __get_formatted_date(self, date: str) -> str: - """Returns the date/time formatted as RFC 8601 YYYY-MM-DDTHH:MM:SS+00:00. - Returns empty string if the string argument passed to the function is not a datetime. - See: https://en.wikipedia.org/wiki/ISO_8601 - - Returns: - str: Properly formatted date/time recieved or empty string. - """ - try: - # This date is never meant to be used. The default attribute is just to set timezone. 
def _line_number_at(original_string: str, text: str, search_from: int = 0) -> int:
    """Returns the 1-based line number where text first occurs at or after search_from.

    Falls back to the line containing search_from when text is not found.
    """
    index = original_string.find(text, search_from)
    if index == -1:
        index = search_from
    return original_string.count("\n", 0, index) + 1


def _record_parse_error(expected: str, got: str, line_number: int, message: str) -> None:
    """Appends a parse_error entry to the global section list, then raises ParseError."""
    parsed_item.append({
        "type": "parse_error",
        "datetime": format_date_string(str(datetime.datetime.now())),
        "expected": expected,
        "got": got,
        "line_num": line_number
    })
    raise ParseError(line_number, message)


def parse_section_by_type(section_type):
    """Returns a function to parse a section based on the section type.

    Args:
        section_type (str): The type of section to parse.
            Can be "reply_to_user" | "edit" | "reply_from_user" | "status" | "directory_information" | "initial_message"

    Returns:
        function: A PyParsing parse action taking (original_string, match_start_index, tokens).
    """

    def parse_section(original_string: str, match_start_index: int, tokens: pp.ParseResults) -> None:
        """Parses section and adds to global section list.

        Args:
            original_string (string): The original string passed to PyParsing
            match_start_index (int): The character index where the match starts.
            tokens (pyparsing.ParseResults): The PyParsing results.

        Raises:
            ParseError: If unexpected formatting is encountered: a malformed
                reply-from-user header, a missing Date header, or a date that
                cannot be formatted. A parse_error entry is appended to the
                global section list before the error is raised.
        """
        tokens_dictionary = tokens.asDict()
        tokens_dictionary["type"] = section_type

        # Remove the empty key. pop() is used because deleting a key while
        # iterating over the dictionary raises RuntimeError.
        tokens_dictionary.pop("", None)

        # Parse reply-from-user headers
        if section_type == "reply_from_user":
            raw_headers = tokens_dictionary["headers"]
            headers = email.message_from_string(raw_headers)

            # With the default policy the email parser does not raise on a
            # malformed header block; it records the problem in `defects`.
            has_separator_defect = any(
                isinstance(defect, email.errors.MissingHeaderBodySeparatorDefect)
                for defect in headers.defects
            )
            if has_separator_defect:
                # Report the first line that is neither "key: value" nor a
                # continuation line; fall back to the whole header block.
                offending_line = next(
                    (
                        line for line in raw_headers.splitlines(keepends=True)
                        if ":" not in line and not line.startswith(" ")
                    ),
                    raw_headers
                )
                _record_parse_error(
                    "Header information with a key/value pair seperated by a colon or a newline to seperate the header from the content",
                    offending_line,
                    _line_number_at(original_string, offending_line, match_start_index),
                    f"{offending_line} is a malfomred header or the start of message content without a newline"
                )

            if "Date" not in headers.keys():
                content_start = tokens_dictionary["content"][0].strip().split("\n", 1)[0]
                # Search from the start of this match so an identical line
                # earlier in the item is not reported by mistake.
                _record_parse_error(
                    "A Date header in the reply from user section",
                    content_start,
                    _line_number_at(original_string, content_start, match_start_index),
                    "Expected a 'Date' header in the reply from user section"
                )

            headers_list = [
                {"type": key, "content": headers[key]}
                for key in headers.keys()
            ]

            for header in headers_list:
                if header["type"] == "Date":
                    tokens_dictionary["datetime"] = header["content"]
                elif header["type"] == "Subject":
                    tokens_dictionary["subject"] = header["content"]
                elif header["type"] == "From":
                    user_name, user_email = email.utils.parseaddr(header["content"])
                    tokens_dictionary["from_name"] = user_name
                    tokens_dictionary["from_email"] = user_email
                elif header["type"].lower() == "cc":
                    tokens_dictionary["cc"] = [
                        {"name": user_name, "email": user_email}
                        for user_name, user_email in email.utils.getaddresses([header["content"]])
                    ]

            tokens_dictionary["headers"] = headers_list

        # Format date header
        if "datetime" in tokens_dictionary:
            raw_date = tokens_dictionary["datetime"]
            try:
                formatted_date = format_date_string(raw_date)
            except (ValueError, OverflowError):
                formatted_date = ""

            # format_date_string reports an unparseable date by returning an
            # empty string, so an empty result is treated as a failure too.
            if not formatted_date:
                _record_parse_error(
                    "Expected a date and/or time",
                    raw_date,
                    _line_number_at(original_string, raw_date, match_start_index),
                    "Could not format date header"
                )

            tokens_dictionary["datetime"] = formatted_date

        # Convert content string to list of lines
        if "content" in tokens_dictionary:
            stripped_content = tokens_dictionary["content"][0].strip()
            tokens_dictionary["content"] = stripped_content.splitlines(keepends=True)

        parsed_item.append(tokens_dictionary)

    return parse_section
def error_handler(original_string, match_start_index, tokens):
    """Records a parse error for text no section rule matched, then raises.

    Args:
        original_string (string): The original string passed to PyParsing
        match_start_index (int): The character index where the match starts.
        tokens (pyparsing.ParseResults): The PyParsing results.

    Raises:
        ParseError: Always. A parse_error entry is appended to the global
            section list first.
    """
    unmatched_text = tokens[0][0]
    error_time = format_date_string(str(datetime.datetime.now()))

    if unmatched_text != reply_from_user_start_delimiter:
        # The line is not the start of any known section.
        expected = f"Action delimiter starting with '{action_start_delimiter}' and ending with '{action_end_delimiter}' or {reply_from_user_start_delimiter}"
        got = unmatched_text
        line_number = original_string.count("\n", 0, match_start_index) + 1
        message = "No action start delimiter found"
    else:
        end_delimiter_index = original_string.find(reply_from_user_end_delimiter, match_start_index)
        if end_delimiter_index != -1:
            # The reply is closed but its headers are not followed by a blank line.
            expected = "\n\n"
            got = reply_from_user_end_delimiter
            line_number = original_string.count("\n", 0, end_delimiter_index) + 1
            message = "No newline found after header information"
        else:
            # The reply is never closed; report the last line of the item.
            expected = reply_from_user_end_delimiter
            got = "End of file"
            line_number = original_string.count("\n") + 1
            message = "No reply from user end delimiter found"

    parsed_item.append({
        "type": "parse_error",
        "datetime": error_time,
        "expected": expected,
        "got": got,
        "line_num": line_number
    })
    raise ParseError(line_number, message)
+ pp.SkipTo( + pp.Literal(reply_from_user_start_delimiter) + | pp.Literal(action_start_delimiter) + ) | pp.SkipTo(pp.StringEnd(), include=True) +).leaveWhitespace().setResultsName("content").setParseAction(parse_section_by_type("initial_message")) + +reply_from_user_rule = ( + (reply_from_user_start_delimiter + pp.OneOrMore(pp.LineEnd())).suppress() + + pp.SkipTo("\n\n").setResultsName("headers") + + (pp.Group(pp.SkipTo(reply_from_user_end_delimiter + pp.LineEnd()).setParseAction(check_for_nested_action)).setResultsName("content")) + + (pp.Literal(reply_from_user_end_delimiter) + pp.LineEnd()).suppress() + + pp.ZeroOrMore(pp.LineEnd()).suppress() +).leaveWhitespace().setParseAction(parse_section_by_type("reply_from_user")) + +reply_to_user_rule = ( + pp.LineStart() + + pp.Literal(reply_to_user_start_delimiter).suppress() + + pp.Word(pp.alphanums).setResultsName("by")+ + pp.Literal(" at: ").suppress() + + pp.Word(pp.nums + "/-: ").setResultsName("datetime") + + (pp.Literal(action_end_delimiter) + pp.LineEnd()).suppress() + + pp.Group( + pp.SkipTo( + (pp.LineStart() + pp.Literal(reply_from_user_start_delimiter)) + | (pp.LineStart() + pp.Literal(action_start_delimiter)) + ) | pp.SkipTo(pp.StringEnd(), include=True) + ).setResultsName("content") +).leaveWhitespace().setParseAction(parse_section_by_type("reply_to_user")) + +edit_rule = ( + pp.LineStart() + + pp.Literal(edit_start_delimiter).suppress() + + pp.Word(pp.alphanums).setResultsName("by") + + pp.Literal(" at: ").suppress() + + pp.Word(pp.nums + "/-: ").setResultsName("datetime") + + (pp.Literal(action_end_delimiter) + pp.LineEnd()).suppress() + + pp.Group( + pp.SkipTo( + (pp.LineStart() + pp.Literal(reply_from_user_start_delimiter)) + | (pp.LineStart() + pp.Literal(action_start_delimiter)) + ) | pp.SkipTo(pp.StringEnd(), include=True) + ).setResultsName("content") +).leaveWhitespace().setParseAction(parse_section_by_type("edit")) + +status_rule = ( + pp.LineStart() + + 
def parse_item(item_body: str, raise_on_error: bool = False) -> list:
    """Accepts string of an Item body and returns JSON serializable list of dictionary with formatted action types.

    Args:
        item_body (str): The string of the item to be parsed.
        raise_on_error (bool): If true, a ParseError is raised when parsing error encountered. Otherwise, a parse error dictionary is added to the return value before being returned. Defaults to False.

    Returns:
        list: List of actions as ordered in the item. Does not include initial message metadata or assignments.
            A new list is returned on every call.

    Raises:
        ParseError: If raise_on_error is True, raises ParseError when parsing error occurs. Otherwise adds parse_error action to return value.
    """
    # The parse actions append to the module-level list. Empty it in place so
    # sections from a previously parsed item do not leak into this result.
    parsed_item.clear()

    try:
        item_rule.parseString(item_body)
    except ParseError:
        if raise_on_error:
            raise

    # Return a copy so the next call cannot change a list the caller holds.
    return list(parsed_item)