diff --git a/src/webqueue2api/api/resources/item.py b/src/webqueue2api/api/resources/item.py index 85fc9e0..c1380f1 100644 --- a/src/webqueue2api/api/resources/item.py +++ b/src/webqueue2api/api/resources/item.py @@ -39,6 +39,6 @@ def get(self, queue: str, number: int) -> tuple: tuple: Item as JSON and HTTP response code. """ try: - return (_Item(queue, number).toJson(), 200) + return (_Item(queue, number).to_json(), 200) except ItemDoesNotExistError: return ({"message": f"Item {queue}{number} not found."}, 404) \ No newline at end of file diff --git a/src/webqueue2api/parser/item.py b/src/webqueue2api/parser/item.py index ed9b387..592fe6f 100644 --- a/src/webqueue2api/parser/item.py +++ b/src/webqueue2api/parser/item.py @@ -57,24 +57,24 @@ def __init__(self, queue: str, number: int) -> None: if not self.path.exists(): raise ItemDoesNotExistError(str(self.path)) - self.last_updated = self.__getLastUpdated() - self.__raw_tem = self.__getRawItem() - self.headers = self.__parseHeaders() + self.last_updated = self.__get_time_last_updated() + self.__raw_tem = self.__get_raw_item() + self.headers = self.__parse_eaders() self.content = self.__parseSections() - self.is_locked = self.__isLocked() - self.user_email = self.__parseFromData(data="userEmail") - self.user_name = self.__parseFromData(data="userName") - self.user_alias = self.__getUserAlias() - self.assigned_to = self.__getMostRecentHeaderByType("Assigned-To") - self.subject = self.__getMostRecentHeaderByType("Subject") - self.status = self.__getMostRecentHeaderByType("Status") - self.priority = self.__getMostRecentHeaderByType("Priority") - self.department = self.__getMostRecentHeaderByType("Department") - self.building = self.__getMostRecentHeaderByType("Building") - self.date_received = self.__getFormattedDate(self.__getMostRecentHeaderByType("Date")) + self.is_locked = self.__check_is_locked() + self.user_email = self.__parse_from_data(data="user_email") + self.user_name = self.__parse_from_data(data="user_name") + self.user_alias = self.__get_user_alias() + self.assigned_to = self.__get_most_recent_header_by_type("Assigned-To") + self.subject = self.__get_most_recent_header_by_type("Subject") + self.status = self.__get_most_recent_header_by_type("Status") + self.priority = self.__get_most_recent_header_by_type("Priority") + self.department = self.__get_most_recent_header_by_type("Department") + self.building = self.__get_most_recent_header_by_type("Building") + self.date_received = self.__get_formatted_date(self.__get_most_recent_header_by_type("Date")) # TODO: Autopopulate jsonData w/ __dir__() command. Exclude `^_` and `jsonData`. - self.jsonData = { + self.json_data = { "queue": self.queue, "number": self.number, "lastUpdated": self.last_updated, @@ -93,7 +93,7 @@ def __init__(self, queue: str, number: int) -> None: "dateReceived": self.date_received } - def __getLastUpdated(self) -> str: + def __get_time_last_updated(self) -> str: """Returns last modified time of item reported by the filesystem in mm-dd-yy hh:mm am/pm format. Example: @@ -102,13 +102,11 @@ def __getLastUpdated(self) -> str: Returns: str: last modified time of item reported by the filesystem in mm-dd-yy hh:mm am/pm format. """ - # TODO: Simplify this code block by allowing __getFormattedDate to accept milliseconds since the epoch. - unixTime = os.path.getmtime(self.path) - formattedTime = time.strftime( - '%m-%d-%y %I:%M %p', time.localtime(unixTime)) - return self.__getFormattedDate(formattedTime) + unix_time = os.path.getmtime(self.path) + formatted_time = time.strftime('%m-%d-%y %I:%M %p', time.localtime(unix_time)) + return self.__get_formatted_date(formatted_time) - def __getRawItem(self) -> list: + def __get_raw_item(self) -> list: """Returns a list of all lines in the item file Returns: @@ -117,7 +115,7 @@ def __getRawItem(self) -> list: with open(self.path, errors="replace") as file: return file.readlines() - def __getHeaderBoundary(self) -> int: + def __get_header_boundary(self) -> int: """Returns the 0 based line number where the Item headers stop. Example: The header end would be on line 13 @@ -128,11 +126,11 @@ def __getHeaderBoundary(self) -> int: Returns: int: line number where the Item headers end """ - for lineNumber, line in enumerate(self.__raw_tem): + for line_number, line in enumerate(self.__raw_tem): if line == "\n": - return lineNumber + return line_number - def __parseHeaders(self) -> list: + def __parse_eaders(self) -> list: """Returns a list containing dictionaries of header type and data. Removes queue prefixes and whitespace. @@ -147,29 +145,27 @@ def __parseHeaders(self) -> list: Returns: list: Header dicts """ - headerString = "" + header_string = "" # Remove '[queue] ' prefixes: # Example: # [ce] QTime-Updated-By: campb303 becomes # QTime-Updated-By: campb303 - queuePrefixPattern = re.compile(r"\[.*?\] {1}") - for lineNumber in range(self.__getHeaderBoundary()): - line = self.__raw_tem[lineNumber] - lineHasQueuePrefix = queuePrefixPattern.match(line) + queue_prefix_pattern = re.compile(r"\[.*?\] {1}") + for line_number in range(self.__get_header_boundary()): + line = self.__raw_tem[line_number] + line_has_queue_prefix = queue_prefix_pattern.match(line) - if lineHasQueuePrefix: - queuePrefix = line[lineHasQueuePrefix.regs[0] - [0]: lineHasQueuePrefix.regs[0][1]] - line = line.replace(queuePrefix, "") + if line_has_queue_prefix: + queue_prefix = line[line_has_queue_prefix.regs[0][0]: line_has_queue_prefix.regs[0][1]] + line = line.replace(queue_prefix, "") - headerString += line + header_string += line - # message = email.message_from_string(headerString + "".join(self.__getContent())) - message = email.message_from_string(headerString) + message = email.message_from_string(header_string) headers = [] - dateHeaders=[ + date_headers=[ "QStatus-Updated-Time", "Status-Updated-Time", "Edited-Time", @@ -184,17 +180,15 @@ def __parseHeaders(self) -> list: ] for key in message.keys(): - headers.append({"type": key, "content": self.__getFormattedDate(message[key]) if key in dateHeaders else message[key]}) + headers.append({"type": key, "content": self.__get_formatted_date(message[key]) if key in date_headers else message[key]}) return headers - # TODO: Implement attachment parsing - def __parseSections(self) -> list: # List of all item events sections = [] - contentStart = self.__getHeaderBoundary() + 1 + contentStart = self.__get_header_boundary() + 1 contentEnd = len(self.__raw_tem) - 1 # List of assignments for the item @@ -507,7 +501,7 @@ def __assignmentParsing(self, contentStart: int) -> list: dateFromLine = ( re.search("(?<=Assigned-To-Updated-Time: )(.*)", line)).group() - assignedDateTime = self.__getFormattedDate(dateFromLine) + assignedDateTime = self.__get_formatted_date(dateFromLine) # Gets who assigned the Item elif line.startswith("Assigned-To-Updated-By: "): @@ -552,24 +546,24 @@ def __initialMessageParsing(self, content: list) -> dict: initialMessageDictionary["type"] = "initial_message" # Gets the initial message date from the header - rawMessageDateStr = self.__getMostRecentHeaderByType("Date") + rawMessageDateStr = self.__get_most_recent_header_by_type("Date") # Sets datetime in the intialMessage dictionary to UTC formatted date - initialMessageDictionary["datetime"] = self.__getFormattedDate( + initialMessageDictionary["datetime"] = self.__get_formatted_date( rawMessageDateStr) - initialMessageDictionary["from_name"] = self.__parseFromData( - data="userName") + initialMessageDictionary["from_name"] = self.__parse_from_data( + data="user_name") - initialMessageDictionary["from_email"] = self.__parseFromData( - data="userEmail") + initialMessageDictionary["from_email"] = self.__parse_from_data( + data="user_email") # Stores list of dictionaries for the recipients of the initial message initialMessageDictionary["to"] = [] # Parses the header looking for recipients of the initial message and stores it in a list of tuples rawMessageRecipientsList = email.utils.getaddresses( - [self.__getMostRecentHeaderByType("To")]) + [self.__get_most_recent_header_by_type("To")]) # Parses the CC list and stores the cc recipient information in a list of dictionaries for recipients in rawMessageRecipientsList: @@ -584,7 +578,7 @@ def __initialMessageParsing(self, content: list) -> dict: # Parses the header looking for CC recipients of the initial message and stores it in a list of tuples rawMessageCCList = email.utils.getaddresses( - [self.__getMostRecentHeaderByType("CC")]) + [self.__get_most_recent_header_by_type("CC")]) # Parses the CC list and stores the cc recipient information in a list of dictionaries for ccRecipients in rawMessageCCList: @@ -594,7 +588,7 @@ def __initialMessageParsing(self, content: list) -> dict: "email": ccRecipients[1]} ) - initialMessageDictionary["subject"] = self.__getMostRecentHeaderByType( + initialMessageDictionary["subject"] = self.__get_most_recent_header_by_type( "Subject") # Removes unecessary newlines from the begining and the end of the initial message @@ -655,7 +649,7 @@ def __editParsing(self, content: list, lineNum: int) -> dict: return self.__errorParsing(delimiterLine, lineNum, errorMessage) # Attempts to format the date and time into utc format - editInfo["datetime"] = self.__getFormattedDate(dateTimeString) + editInfo["datetime"] = self.__get_formatted_date(dateTimeString) # Remove the delimiter String and unecessary newlines editInfo["content"] = self.__getFormattedSectionContent(content) @@ -713,7 +707,7 @@ def __replyToParsing(self, content: list, lineNum: int) -> dict: return self.__errorParsing(delimiterLine, lineNum, errorMessage) # Formats date to UTC - replyInfo["datetime"] = self.__getFormattedDate(dateTimeString) + replyInfo["datetime"] = self.__get_formatted_date(dateTimeString) replyInfo["content"] = self.__getFormattedSectionContent(content) @@ -767,7 +761,7 @@ def __statusParsing(self, content: list, lineNum: int) -> dict: return self.__errorParsing(delimiterLine, lineNum, errorMessage) # Formats the date to UTC - statusInfo["datetime"] = self.__getFormattedDate(dateTimeString) + statusInfo["datetime"] = self.__get_formatted_date(dateTimeString) # Remove the delimiter String and unecessary newlines statusInfo["content"] = self.__getFormattedSectionContent(content) @@ -896,7 +890,7 @@ def __userReplyParsing(self, replyContent: list, lineNumber: int) -> dict: return self.__errorParsing(line, lineNumber + lineNum + 1, errorMessage) # Formatts the date to UTC - replyFromInfo["datetime"] = self.__getFormattedDate(dateStr) + replyFromInfo["datetime"] = self.__get_formatted_date(dateStr) elif line.startswith("Cc: ") and newLineCounter == 1: @@ -1001,7 +995,7 @@ def __errorParsing(self, line: str, lineNum: int, expectedSyntax: str) -> dict: errorDictionary["type"] = "parse_error" # Dateime of the parse error - errorDictionary["datetime"] = self.__getFormattedDate( + errorDictionary["datetime"] = self.__get_formatted_date( str(datetime.datetime.now())) # Item filepath @@ -1058,7 +1052,7 @@ def __getSortedSections(self, sectionsList: list) -> list: return sortedSections - def __isLocked(self) -> Union[str, bool]: + def __check_is_locked(self) -> Union[str, bool]: """Returns a string info about the lock if true and a bool False if false Example: A file is locked @@ -1070,93 +1064,88 @@ def __isLocked(self) -> Union[str, bool]: Returns: Union[str, bool]: String with info about lock if true, bool False if false """ - lockFile = str(self.path) + ".lck" - if os.path.exists(lockFile): - with open(lockFile) as file: - lockInfo = file.readline().split(" ") - lockedBy = lockInfo[4] - lockedUsing = lockInfo[1] - return "{queue} {number} is locked by {lockedBy} using {lockedUsing}".format(queue=self.queue, number=self.number, lockedBy=lockedBy, lockedUsing=lockedUsing) + lock_file = str(self.path) + ".lck" + if os.path.exists(lock_file): + with open(lock_file) as file: + lock_info = file.readline().split(" ") + locked_by = lock_info[4] + locked_using = lock_info[1] + return f"{self.queue} {self.number} is locked by {locked_by} using {locked_using}" else: return False - def __getMostRecentHeaderByType(self, headerType: str) -> str: + def __get_most_recent_header_by_type(self, header_type: str) -> str: """Return the data of most recent header of the given type. If no header of that type exists, return an empty string. Example: Requesting a Status header that does exist - __getMostRecentHeaderByType("Status") + __get_most_recent_header_by_type("Status") becomes "Waiting for Reply" Example: Requesting a Status header that doesn't exist - __getMostRecentHeaderByType("Status") + __get_most_recent_header_by_type("Status") becomes "" Args: - headerType (str): Type of header to return. + header_type (str): Type of header to return. Returns: str: data of most recent header of the given type or empty string. """ for header in self.headers: - if header["type"] == headerType: + if header["type"] == header_type: return header["content"] return "" - def __parseFromData(self, data: str) -> str: + def __parse_from_data(self, data: str) -> str: """Parse From header and return requested data. Returns empty string if requested data is unavailable. Examples: From data is "From: Campbell, Justin " - __parseFromData(data="userName") returns "Campbell, Justin" - __parseFromData(data="userEmail") returns "campb303@purdue.edu" + __parse_from_data(data="userName") returns "Campbell, Justin" + __parse_from_data(data="userEmail") returns "campb303@purdue.edu" Args: - data (str): The data desired; can be "userName" or "userEmail". + data (str): The data desired; can be "user_name" or "user_email". Returns: str: userName, userEmail or empty string. """ - fromHeader = self.__getMostRecentHeaderByType("From") - userName, userEmail = email.utils.parseaddr(fromHeader) + from_header = self.__get_most_recent_header_by_type("From") + user_name, user_email = email.utils.parseaddr(from_header) - if data == "userName": - return userName - elif data == "userEmail": - return userEmail + if data == "user_name": + return user_name + elif data == "user_email": + return user_email else: raise ValueError( - "data='" + str(data) + "' is not a valid option. data must be \"userName\" or \"userEmail\".") + "data='" + str(data) + "' is not a valid option. data must be \"user_name\" or \"user_email\".") - def __getUserAlias(self) -> str: + def __get_user_alias(self) -> str: """Returns user's Career Account alias if present. If Career Account alias isn't present, returns empty string. Example: Email from campb303@purdue.edu - userAlias = "campb303" + user_alias = "campb303" Example: Email from spam@spammer.net - userAlias = "" + user_alias = "" Returns: str: User's Career Account alias if present or empty string """ - - try: - emailUser, emailDomain = self.user_email.split("@") - - # Returns an error parse if the self.useremail doesn't contain exactally one "@" symbol + email_user, email_domain = self.user_email.split("@") except ValueError: - # Parses through the self.headers list to find the "From" header and its line number - for lineNum, header in enumerate(self.headers): + for line_num, header in enumerate(self.headers): if header["type"] == "From": - headerString = header["type"] + ": " + header["content"] - return self.__errorParsing(headerString, lineNum + 1, "Expected valid email Address") + header_string = header["type"] + ": " + header["content"] + return self.__errorParsing(header_string, line_num + 1, "Expected valid email Address") - return emailUser if emailDomain.endswith("purdue.edu") else "" + return email_user if email_domain.endswith("purdue.edu") else "" - def __getFormattedDate(self, date: str) -> str: + def __get_formatted_date(self, date: str) -> str: """Returns the date/time formatted as RFC 8601 YYYY-MM-DDTHH:MM:SS+00:00. Returns empty string if the string argument passed to the function is not a datetime. See: https://en.wikipedia.org/wiki/ISO_8601 @@ -1166,22 +1155,22 @@ def __getFormattedDate(self, date: str) -> str: """ try: # This date is never meant to be used. The default attribute is just to set timezone. - parsedDate = parse(date, default=datetime.datetime( + parsed_date = parse(date, default=datetime.datetime( 1970, 1, 1, tzinfo=tz.gettz('EDT'))) except: return "" - parsedDateString = parsedDate.strftime("%Y-%m-%dT%H:%M:%S%z") + parsed_date_string = parsed_date.strftime("%Y-%m-%dT%H:%M:%S%z") - return parsedDateString + return parsed_date_string - def toJson(self) -> dict: + def to_json(self) -> dict: """Returns a JSON safe representation of the item. Returns: dict: JSON safe representation of the item. """ - return self.jsonData + return self.json_data def __repr__(self) -> str: return self.queue + str(self.number) \ No newline at end of file