diff --git a/.gitignore b/.gitignore index b6e4761..dfd7713 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,6 @@ +# Ignore VS Code Config +.vscode/ + # Byte-compiled / optimized / DLL files __pycache__/ *.py[cod] @@ -126,4 +129,4 @@ venv.bak/ dmypy.json # Pyre type checker -.pyre/ +.pyre/ \ No newline at end of file diff --git a/api/ECNQueue.py b/ECNQueue.py similarity index 100% rename from api/ECNQueue.py rename to ECNQueue.py diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/api/api.py b/api.py similarity index 78% rename from api/api.py rename to api.py index 0cbd771..73128f6 100644 --- a/api/api.py +++ b/api.py @@ -5,8 +5,11 @@ jwt_required, get_jwt_identity, jwt_refresh_token_required, set_refresh_cookies, unset_refresh_cookies ) -from werkzeug.security import check_password_hash import os, dotenv +from easyad import EasyAD +from ldap.filter import escape_filter_chars +# pylint says this is an error but it works so ¯\_(ツ)_/¯ +from ldap import INVALID_CREDENTIALS as LDAP_INVALID_CREDENTIALS import ECNQueue # Load envrionment variables for ./.env @@ -47,6 +50,57 @@ +def user_is_valid(username: str, password: str) -> bool: + """Checks if user is valid and in webqueue2 login group. + + Args: + username (str): Career account username. + password (str): Career account passphrase. + + Returns: + bool: True if user is valid, otherwise False. + """ + + # Check for empty arguments + if (username == "" or password == ""): + return False + + # Initialize EasyAD + config = { + "AD_SERVER": "boilerad.purdue.edu", + "AD_DOMAIN": "boilerad.purdue.edu" + } + ad = EasyAD(config) + + # Prepare search critiera for Active Directory + credentials = { + "username": escape_filter_chars(username), + "password": password + } + attributes = [ 'cn', "memberOf" ] + filter_string = f'(&(objectClass=user)(|(sAMAccountName={username})))' + + # Do user search + try: + user = ad.search(credentials=credentials, attributes=attributes, filter_string=filter_string)[0] + except LDAP_INVALID_CREDENTIALS: + return False + + # Isolate group names + # Example: + # 'CN=00000227-ECNStuds,OU=BoilerADGroups,DC=BoilerAD,DC=Purdue,DC=edu' becomes + # `00000227-ECNStuds` + user_groups = [ group.split(',')[0].split('=')[1] for group in user["memberOf"] ] + + # Check group membership + webqueue_login_group = "00000227-ECN-webqueue" + if webqueue_login_group not in user_groups: + return False + + return True + + + class Login(Resource): def post(self) -> tuple: """Validates username/password and returns both access and refresh tokens. @@ -76,10 +130,8 @@ def post(self) -> tuple: if field not in data.keys(): return ({ "message": f"{field} missing from request body"}, 422) - if data["username"] != os.environ.get("SHARED_USERNAME"): - return ({ "message": "Username invalid"}, 401) - if not check_password_hash(os.environ.get("SHARED_PASSWORD_HASH"), data["password"]): - return ({ "message": "Password invalid"}, 401) + if not user_is_valid(data["username"], data["password"]): + return ({ "message": "Username or password is invalid"}, 401) access_token = create_access_token(data["username"]) refresh_token = create_refresh_token(data["username"]) diff --git a/api/ECNQueue_old.py b/api/ECNQueue_old.py deleted file mode 100755 index 8bef24b..0000000 --- a/api/ECNQueue_old.py +++ /dev/null @@ -1,473 +0,0 @@ -#!/usr/local/bin/python - -#------------------------------------------------------------# -# Summary: Generates pages viewed by end users -#------------------------------------------------------------# - - -#------------------------------------------------------------# -# Import Libraries -#------------------------------------------------------------# -import email -import base64 -import os -import sys -import time -from string import digits -from email.Utils import parseaddr - - -#------------------------------------------------------------# -# Configure Script Environment -#------------------------------------------------------------# -c_queue_command_dir = '/usr/local/etc/ecn/queue/' - -# Use Live Queue: -# c_queue_dir = '/home/pier/e/queue/Mail/' - -# Use Test Queue: -c_queue_dir = '/Users/justincampbell/GitHub/ecn-queue/webqueue/q-snapshot' - -c_queues_to_skip = ['archives', 'drafts', 'inbox'] - -""" -c_header_map = { - 'priority':'qpriority', - 'status':'qstatus', - 'building':'qbuilding', - 'status-updated-by':'qstatus-updated-by', - 'time':'qtime', - 'assigned-to':'qassigned-to' -} -""" - - -class QueueItem: - - def __init__(self, queue_name, number, archive=''): - " Initialize a new Queue Item " - self.queue_name = queue_name # name of the queue containing the item - self.number = int(number) # queue item number - self.attributes = {'number': int(number), 'queue_name': queue_name} # dictionary of headers, attributes, etc. - self.headers = '' # text version of e-mail headers - self.content = '' # text version of e-mail content - self.body = {} # text version of the body of the e-mail - self.attachments = {} # dictionary of attachments keyed on filename - self.message = None # python e-mail representation of the message - self.archive = archive - - def load(self, headers_only=0): - " Load the content of this queue item " - - # Get an open file ready to read the queue item - # file = qCmd('qshow', self.queue_name, self.number, file=1) - if self.archive: - queue_dir = '%sarchives/%s/%s/' % (c_queue_dir, - self.queue_name, self.archive) - else: - queue_dir = '%s%s/' % (c_queue_dir, self.queue_name) - - if os.access(queue_dir + str(self.number), os.R_OK): - file = open(queue_dir + str(self.number)) - else: - return False - - self.attributes['last_updated'] = os.path.getmtime( - queue_dir + str(self.number)) - - in_headers = True - self.headers = '' - self.content = '' - - sys.stdout.flush() - line = 1 - num_lines = 0 - while line: - if in_headers: - line = file.readline() - num_lines += 1 - - # Skip lines beginning with [ or ( in the headers - if len(line) and line[0] in '[(': - if line.find('[%s] ' % self.queue_name) == 0: - line = line.replace('[%s] ' % self.queue_name, '') - else: - continue - - # a newline designates the end of the headers - if len(line) and line[0] == '\n': - in_headers = False - if headers_only: break - - self.headers += line - - else: - self.content += file.read() - break - - file.close() - - self.content = self.stripAttachments(self.content) - - self.message = email.message_from_string(self.headers + self.content) - - # Populate the message attributes from the - # message headers - for key in self.message.keys(): - self[key.lower()] = self.message[key] - - self['from_username'] = parseaddr(self['from'])[1].split('@')[0] - self['subject_status'] = '
%(subject)s
%(qstatus)s
' % self - self['for_username'] = self['qassigned-to'].lower() - - if not headers_only: - # Populate the message attachments from the - # message objects - self.body = {} - - if self.message.is_multipart(): - for part in self.message.walk(): - if part.get_filename(): - self.attachments[part.get_filename()] = {'content-type':part.get_content_type(), 'file':part.get_payload(decode=True)} - - elif part.get_content_type()[:5] == 'text/': - content_type = part.get_content_type() - - if not self.body.has_key(content_type): - self.body[content_type] = '' - - self.body[content_type] += part.get_payload(decode=True) - - else: - # If the message is not multipart then - # this is the body of the document - self.body[self.message.get_content_type()] = self.message.get_payload(decode=True) - - return True - - - def locked(self): - if self.archive: - queue_dir = '%sarchives/%s/%s/' % (c_queue_dir, self.queue_name, self.archive) - else: - queue_dir = '%s%s/' % (c_queue_dir, self.queue_name) - - if os.access(queue_dir + str(self.number) + '.lck', os.R_OK): - file = open(queue_dir + str(self.number) + '.lck') - parts = file.read().replace('\n', '').split(' ') - lock_info = { - 'file':parts[0], - 'program':parts[1], - 'user':parts[-1] - } - return lock_info - return False - - - def stripAttachments(self, message): - if not message: return '' - - lines = message.split('\n') - - attachments = [] - attachment_boundary = '' - attachment_data = '' - message_data = '' - - for line_index, line in enumerate(lines): - if attachment_boundary: - if line.find(attachment_boundary) == 0: - attachments.append(attachment_data) - attachment_boundary = '' - attachment_data = '' - else: - attachment_data += line + '\n' - - elif line.lower().find('content-type:') == 0: - header, value = line.split(':', 1) - value = value.strip().lower() - if value.find('text') == 0 or \ - value.find('multipart') == 0 or \ - value.find('message') == 0: - message_data += line + '\n' - else: - for x in range(line_index-1, 0, -1): - if lines[x].find('--') == 0: - attachment_boundary = lines[x] - for y in range(x+1, line_index): - attachment_data += lines[y] + '\n' - break - attachment_data += line + '\n' - - else: - message_data += line + '\n' - - for attachment in attachments: - message = email.message_from_string(attachment) - self.message = message - self.attachments[str(message.get_filename())] = {'content-type':message.get_content_type(), 'file':message.get_payload(decode=True)} - - return message_data - - def loadHeaders(self): - """ Helper function which calls load with headers_only=1 """ - self.load(headers_only=1) - - def getHeaders(self): - " Return the HTML Headers for this queue item " - if not self.headers: self.loadHeaders() - return self.headers - - def getBody(self, content_types=['text/plain','text/html']): - """ - Return the body of the e-mail using the content_types - to specify a prefered content type. If no content type - matching the prefered type is found it returns the first - body element - """ - - if not self.content: self.load() - - body_text = '' - for content_type in content_types: - if content_type in self.body: - body_text += '\n' + self.body[content_type] - if self.message.epilogue: - body_text += '\n' + self.message.epilogue - if body_text: - return body_text - """ - for content_type in content_types: - if content_type in self.body: - return self.body[content_type] - """ - - if len(self.body.keys()): - return self.body[self.body.keys()[0]] - else: - return '' - - def getAttachments(self): - " Return a list of filenames for all attachments " - if not self.content: self.load() - return self.attachments.keys() - - def getAttachment(self, filename): - " Return the content of a specific attachment " - if not self.content: self.load() - return self.attachments[filename]['file'] - - def getAttachmentContentType(self, filename): - " Return the content-type of a specific attachment " - if not self.content: self.load() - return self.attachments[filename]['content-type'] - - def getNumber(self): - " Returns the number of the queue item " - return self.number - - def numAttachments(self): - " Returns the number of attachments " - if not self.content: self.load() - return len(self.getAttachments()) - - def lastUpdated(self): - return self.attributes['last_updated'] - - def __contains__(self, item): - return item in self.attributes - - def __getitem__(self, key): - if not self.headers: - try: - self.loadHeaders() - except: - raise Exception('Error In Headers', 'Queue Item #%s' % self.number) - - key = key.lower() - # key = c_header_map.get(key, key) - return self.attributes.get(key,'') - - def __setitem__(self, key, value): - self.attributes[key.lower()] = value - - def __str__(self): - return "%14s:%-4s %-10s %-40s" % (self.queue_name, self.number, self['from'], self['subject']) - -class Queue: - - - def __init__(self, queue_name, archive=''): - self.loaded = False - self.queue_name = queue_name - self.archive = archive - self.num_items = None - self.items = [] - self.filtered_items = [] - self.filters = {} - self.sort_on = '' - self.sort_direction = 'ascending' - - def loadItems(self): - self.loaded = True - self.items = [] - - if self.archive: - # Where self.archive = 'YM0502' - queue_dir = c_queue_dir + 'archives/' + self.queue_name + '/' + self.archive + '/' - else: - # lines = qCmd('qscan', self.queue_name).split('\n')[1:] - queue_dir = c_queue_dir + self.queue_name + '/' - - if not os.access(queue_dir, os.F_OK): - return - - for file in os.listdir(queue_dir): - valid = True - for letter in file: - if letter not in digits: - valid = False - break - - if valid and os.access(queue_dir + file, os.R_OK): - item_num = file - item = QueueItem(self.queue_name, file, self.archive) - self.items.append(item) - - self.num_items = len(self.items) - - def sort(self, sort_on, sort_direction='ascending'): - if not self.loaded: self.loadItems() - self.sort_on = sort_on - self.sort_direction = sort_direction - if self.sort_on == 'qpriority': - self.items.sort(lambda a,b:cmp(a[sort_on].upper(), b[sort_on].upper())) - elif self.sort_on == 'date': - self.items.sort(lambda a,b:cmp(time.mktime(email.utils.parsedate(a[sort_on])), time.mktime(email.utils.parsedate(b[sort_on])))) - else: - self.items.sort(lambda a,b:cmp(a[sort_on], b[sort_on])) - - if sort_direction == 'descending': - self.items.reverse() - - self.filtered_items = [] - - def setFilter(self, name, value): - self.filters[name.lower()] = value.lower() - self.filtered_items = [] - - def addItems(self, items): - self.items.extend(items) - self.num_items = len(self.items) - self.loaded = True - - def setItems(self, items): - self.items = items[:] - self.num_items = len(self.items) - self.loaded = True - - def getItems(self, exact_match=False): - if not self.loaded: self.loadItems() - - if not self.filters: - return self.items - - elif self.filtered_items: - return self.filtered_items - - for item in self.items: - matches = False - for filter in self.filters: - for word in self.filters[filter].split(' or '): - word = word.strip() - - if not word: - continue - - if word[0] == '!': - if item[filter].lower().find(word[1:]) < 0: - matches = True - - elif exact_match and item[filter].lower() == word: - matches = True - break - elif not exact_match and item[filter].lower().find(word) >= 0: - matches = True - break - - if matches: - self.filtered_items.append(item) - - return self.filtered_items - - def getName(self): - return self.queue_name - - def setNumItems(self, num): - self.num_items = num - - def getNumItems(self): - if self.num_items is None: - self.num_items = 0 - - if self.archive: - queue_dir = c_queue_dir + 'archives/' + self.queue_name + '/' + self.archive - else: - queue_dir = c_queue_dir + self.queue_name - - files = os.listdir(queue_dir) - for file in files: - valid = 1 - for c in str(file): - if c not in digits: - valid = 0 - break - if valid: - self.num_items += 1 - - return self.num_items - - def lastUpdated(self): - update_times = [] - for item in self.getItems(): - update_times.append(item['last_updated']) - update_times.sort() - update_times.reverse() - if len(update_times): - return update_times[0] - return -1 - - def __len__(self): - return self.getNumItems() - - def __str__(self): - return "%-20s %s" % (self.getName(), self.getNumItems()) - - def __add__(self, other): - new_queue = Queue(self.getName() + '+' + other.getName()) - new_queue.addItems(self.getItems()) - new_queue.addItems(other.getItems()) - return new_queue - - def __cmp__(self, other): - return cmp(self.getName(), other.getName()) - - def __getitem__(self, index): - return self.getItems()[index] - -def getQueues(): - queues = [] - - for file in os.listdir(c_queue_dir): - if os.access(c_queue_dir + file, os.R_OK) and os.path.isdir(c_queue_dir + file) and file not in c_queues_to_skip: - queue = Queue(file) - queues.append(queue) - - return queues - -if __name__ == '__main__': - - # Create a combined Queue - item = QueueItem('webmaster', 22) - body = item.getBody() - diff --git a/api/requirements.txt b/api/requirements.txt deleted file mode 100644 index 59e3e3c..0000000 --- a/api/requirements.txt +++ /dev/null @@ -1,19 +0,0 @@ -# See: https://pip.pypa.io/en/stable/reference/pip_install/#example-requirements-file - -# General Utilities -gunicorn -pipdeptree -pylint - -# API -python-dotenv -Flask-RESTful -python-dateutil -Flask-JWT-Extended -# Prevent upgrade to 2.x until Flask-JWT-Extended is updated to support it -PyJWT == 1.* - -# API Documentation -mkdocs -mkdocs-material -mkautodoc \ No newline at end of file diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..bc5fc39 --- /dev/null +++ b/setup.py @@ -0,0 +1,60 @@ +import setuptools, logging +from pathlib import Path + +# Configure the logger +logger_name = "webqueueapi_install_log" +logger = logging.getLogger(logger_name) +logger.setLevel(logging.DEBUG) + +# See Formatting Details: https://docs.python.org/3/library/logging.html#logrecord-attributes +# Example: Jan 28 2021 12:19:28 venv-manager : [INFO] Message +log_message_format = "%(asctime)s %(name)s : [%(levelname)s] %(message)s" +# See Time Formatting Details: https://docs.python.org/3.6/library/time.html#time.strftime +# Example: Jan 28 2021 12:19:28 +log_time_format = "%b %d %Y %H:%M:%S" +log_formatter = logging.Formatter(log_message_format, log_time_format) + +# Configure output to stdout +stream_handler = logging.StreamHandler() +stream_handler.setFormatter(log_formatter) +stream_handler.setLevel(logging.INFO) +logger.addHandler(stream_handler) + +# Configure out to logfile, located in '/tmp/webqueueapi install log.log' +log_file_path = Path(f"/tmp/{logger_name}.log") +file_handler = logging.FileHandler(log_file_path) +file_handler.setFormatter(log_formatter) +logger.addHandler(file_handler) + +logger.debug("Attempting to install webqueue2-api package") + +setuptools.setup( + name="webqueue2-api", + version="0.9.1", + description="A library for managing Purdue ECN's queue system.", + py_modules=['api', 'ECNQueue'], + python_requires='>=3.6', + install_requires = [ + # General Utilities + "pipdeptree", + "gunicorn", + "pylint", + + # API + "python-dotenv", + "Flask-RESTful", + "python-dateutil", + "Flask-JWT-Extended", + # Prevent upgrade to 2.x until Flask-JWT-Extended is updated to support it + "PyJWT == 1.*", + # Custom version of python-ldap without SASL requirements + "python-ldap @ git+https://github.itap.purdue.edu/ECN/python-ldap/@python-ldap-3.3.1", + + # API Documentation + "mkdocs", + "mkdocs-material", + "mkautodoc" + ] +) + +logger.info("webqueue2-api package installed sucessfully") \ No newline at end of file diff --git a/utils/venv-manager.py b/utils/venv-manager.py deleted file mode 100644 index 3bb15ce..0000000 --- a/utils/venv-manager.py +++ /dev/null @@ -1,255 +0,0 @@ -"""Allows for creating, deleting and removing Python virtual environments in webqueue2 - -Examples: - Create a virtual environment: - $ venv-manager.py create - - Delete a virtual environment: - $ venv-manager.py delete - - Reset a virtual environment: - $ venv-manager.py reset -""" - -from pathlib import Path -import os, logging, argparse, subprocess -from typing import Union - - -################################################################################ -# Configuration -################################################################################ - -# Set virtual environment path -WEBQUEUE2_DIR = Path(os.path.abspath(__file__)).parent.parent -API_DIR = Path(WEBQUEUE2_DIR, "api") -VENV_NAME = "venv" -VENV_DIR = Path(API_DIR, VENV_NAME) - - -# Set minimum pip major version -TARGET_PIP_VERSION = 19 - - -# Configure the logger -logger_name = "venv-manager" -logger = logging.getLogger(logger_name) -logger.setLevel(logging.DEBUG) - -# See: https://docs.python.org/3/library/logging.html#logrecord-attributes -log_message_format = "%(asctime)s %(name)s : [%(levelname)s] %(message)s" -# See: https://docs.python.org/3.6/library/time.html#time.strftime -log_time_format = "%b %d %Y %H:%M:%S" -log_formatter = logging.Formatter(log_message_format, log_time_format) - -stream_handler = logging.StreamHandler() -stream_handler.setFormatter(log_formatter) -stream_handler.setLevel(logging.INFO) - -log_file_path = Path(WEBQUEUE2_DIR, "utils", f'{logger_name}.log') -file_handler = logging.FileHandler(log_file_path) -file_handler.setFormatter(log_formatter) - -logger.addHandler(stream_handler) -logger.addHandler(file_handler) - - -################################################################################ -# Functions -################################################################################ - - -def get_args() -> argparse.Namespace: - """Parses arguments and returns argparses's generated namespace. - - Returns: - argparse.Namespace: Argparses's generated namespace. - """ - parser = argparse.ArgumentParser() - parser.add_argument( - "action", - help="Action to perform.", - choices=("create", "delete", "reset") - ) - parser.add_argument( - "--debug", - help="Print debug logs", - action="store_true", - ) - return parser.parse_args() - - -def run_logged_subprocess(command: Union[str, list], timeout: int = 60, shell: bool = True) -> tuple: - """Executes a shell command using subprocess with logging. - - stderr is redirected to stdout and stdout is pipped to logger. - If the subprocess raises an exception, the exception is logged as critical. - - Example: - Running a successful command: - run_logged_subprocess(command=["git", "commit", "-m", "'Commit message.'"]) - Returns: (0, "") - - Running an unsuccessful shell command with a 20 second timeout: - run_logged_subprocess(command="cd test/", timeout=20, shell=True) - Returns: (1, "cd: test: No such file or directory\n") - - Args: - command (Union): The command to run. If shell=False, pass a list with the first item being the command and the subsequent items being arguments. If shell=True, pass a string as you would type it into a shell. - timeout (int): The number of seconds to wait for a program before killing it. Defaults to 60. - - Returns: - tuple: With the first value being the return code and second being the combined stdout+stderr - """ - logger.debug(f"Entering subprocess for '{command}'") - with subprocess.Popen(command,\ - stderr=subprocess.STDOUT, stdout=subprocess.PIPE, shell=shell, universal_newlines=True)\ - as logged_shell_process: - - subprocess_log_prefix = f"(PID: {logged_shell_process.pid})" - - try: - # Convert combined stdout and stderr stream to list of strings - process_output_stream, _ = logged_shell_process.communicate(timeout=timeout) - process_output_lines = process_output_stream.split("\n") - # Remove last entry in process_output_lines because it is always empty - process_output_lines.pop(-1) - - for line in process_output_lines: - logger.debug(f"{subprocess_log_prefix}: {line}") - except Exception as exception: - logger.critical(str(exception)) - else: - if logged_shell_process.returncode != 0: - logger.debug(f"Something went wrong. '{command}' exited with return code {logged_shell_process.returncode}") - elif logged_shell_process.returncode == 0: - logger.debug(f"Subprocess for '{command}' completed successfuly") - finally: - logger.debug(f"Exiting subprocess for '{command}'") - return (logged_shell_process.returncode, process_output_stream) - - -def create_environment() -> int: - """Creates a virtual environment for webqueue2 - - Exit Codes: - 0 = Success - 5 = VENV_DIR already exists - 10 = Could not create VENV_DIR - 15 = Could not install requirements - - Returns: - int: Exit code - """ - - logger.info(f"Creating virtual environment {VENV_NAME} at {VENV_DIR}") - - # Check for an existing virtual environment - try: - os.mkdir(VENV_DIR) - except FileExistsError: - logger.warning(f"The directory {VENV_DIR} already exists. Exiting") - return 5 - - # Create virtual environmentc - create_env_returncode, _ = run_logged_subprocess(f"cd {API_DIR} && python3 -m venv {VENV_NAME}", shell=True) - if create_env_returncode == 0: - logger.info(f"Virtual environment {VENV_NAME} created at {VENV_DIR}") - else: - logger.critical(f"Could not create virtual environment {VENV_NAME} at {VENV_DIR}. Exiting") - return 10 - - # Check pip version - logger.debug("Checking pip version") - check_pip_returncode, check_pip_output = run_logged_subprocess(f"{VENV_DIR}/bin/pip --version") - - if check_pip_returncode != 0: - logger.warning("Could not check pip version. Virtual environment dependencies may not install") - - pip_version_full = check_pip_output.split()[1] - logger.debug(f"pip version is {pip_version_full}") - - pip_version_major = pip_version_full.split(".")[0] - if int(pip_version_major) < 19: - logger.info(f"pip verion is {pip_version_major}.x (pip >= {TARGET_PIP_VERSION}.x needed.) Upgrading pip") - update_pip_returncode, update_pip_output = run_logged_subprocess(f"{VENV_DIR}/bin/pip install --upgrade pip") - - if update_pip_returncode == 0: - logger.info(update_pip_output.split("\n")[-2]) - else: - logger.warning("Failed to update pip. Virtual environment dependencies may not install") - - # Install requirements - logger.info("Installing requirements") - install_requirements_returncode, _ = run_logged_subprocess(f"{VENV_DIR}/bin/pip install -r {API_DIR}/requirements.txt") - if install_requirements_returncode == 0: - logger.info("Successfully installed requirements") - return 0 - else: - logger.critical("Failed to install requirements. Exiting") - return 15 - - -def delete_environment() -> int: - """Deletes a virtual environment for webqueue2 - - Exit Codes: - 0 = Success - 5 = Could not delete VENV_DIR - - Returns: - int: Exit code - """ - logger.info(f"Deleting virtual environment {VENV_NAME} at {VENV_DIR}") - - delete_venv_returncode, _ = run_logged_subprocess(f"rm -rf {VENV_DIR}") - if delete_venv_returncode == 0: - logger.info(f"Successfully deleted virtual environment {VENV_NAME} at {VENV_DIR}") - return 0 - else: - logger.critical(f"Failed to delete virtual environment {VENV_NAME} at {VENV_DIR}. Exiting") - return 5 - -def reset_environment() -> int: - """Resets a virtual environment for webqueue2 - - Exit Codes: - 0 = Success - 5 = Could not delete VENV_DIR - 10 = Could not create VENV_DIR - - Returns: - int: Exit code - """ - logger.info(f"Resetting virtual environment {VENV_NAME} at {VENV_DIR}") - - delete_returncode = delete_environment() - if delete_returncode != 0: - logger.critical(f"Failed to reset virtual environment {VENV_NAME} at {VENV_DIR}. Exiting") - return 5 - - create_returncode = create_environment() - if create_returncode != 0: - logger.critical(f"Failed to reset virtual environment {VENV_NAME} at {VENV_DIR}. Exiting") - return 10 - - logger.info(f"Successfully reset virtual environment {VENV_NAME} at {VENV_DIR}. Exiting") - - -if __name__ == "__main__": - logger.info(f"Starting venv-manager. Log file available at {log_file_path}") - - args = get_args() - action = args.action - - if args.debug: - stream_handler.setLevel(logging.DEBUG) - - if action == "create": - exit(create_environment()) - elif action == "delete": - exit(delete_environment()) - elif action == "reset": - exit(reset_environment()) - else: - logger.critical(f'Invalid argument {action}') \ No newline at end of file