diff --git a/utils/venv-manager.py b/utils/venv-manager.py deleted file mode 100644 index 95e6eee..0000000 --- a/utils/venv-manager.py +++ /dev/null @@ -1,519 +0,0 @@ -"""Allows for creating, deleting and removing Python virtual environments in webqueue2 - -Examples: - Create a virtual environment: - $ venv-manager.py create - - Delete a virtual environment: - $ venv-manager.py delete - - Reset a virtual environment: - $ venv-manager.py reset - -Exit Codes: - 0 = Success - 5 = Failed to make directory VENV_DIR. It already exists. - 10 = Failed to make VENV_DIR. Incorrect permissions. - 15 = Failed to create virtual environment. See LOG FILE. - 20 = Failed read requirements file VENV_REQUIREMENTS_FILE. - 25 = Failed to install requirements. See LOG FILE. - 30 = VENV_INTERPRETER does not exist. - 35 = Failed to get pyldap release info from GitHub. - 40 = Failed to download pyldap source. See LOG FILE. - 45 = Failed to extract pyldap source. See LOG FILE. - 50 = Failed to read pyldap build config file. - 55 = Failed to write pyldap build config file. - 60 = Failed to build pyldap VERSION. See LOG FILE. - 65 = Failed to install pyldap VERSION. See LOG FILE. - 70 = Failed to delete VENV_DIR -""" - -from pathlib import Path -import os, logging, argparse, subprocess, urllib.request, json, configparser -from urllib.error import HTTPError, URLError -from typing import Union - - -################################################################################ -# Configuration -################################################################################ - -# Set virtual environment path -WEBQUEUE2_DIR = Path(os.path.abspath(__file__)).parent.parent -API_DIR = Path(WEBQUEUE2_DIR, "api") -VENV_NAME = "venv" -VENV_DIR = Path(API_DIR, VENV_NAME) - - -# Set virtual evironment resource paths -VENV_INTERPRETER = Path(VENV_DIR, "bin", "python3") -VENV_REQUIREMENTS_FILE = Path(API_DIR, "requirements.txt") - - -# Set minimum pip major version -TARGET_PIP_VERSION = 19 - - -# Configure the logger -logger_name = "venv-manager" -logger = logging.getLogger(logger_name) -logger.setLevel(logging.DEBUG) - -# See Formatting Details: https://docs.python.org/3/library/logging.html#logrecord-attributes -# Example: Jan 28 2021 12:19:28 venv-manager : [INFO] Message -log_message_format = "%(asctime)s %(name)s : [%(levelname)s] %(message)s" -# See Time Formatting Details: https://docs.python.org/3.6/library/time.html#time.strftime -# Example: Jan 28 2021 12:19:28 -log_time_format = "%b %d %Y %H:%M:%S" -log_formatter = logging.Formatter(log_message_format, log_time_format) - -# Configure output to stdout -stream_handler = logging.StreamHandler() -stream_handler.setFormatter(log_formatter) -stream_handler.setLevel(logging.INFO) -logger.addHandler(stream_handler) - -# Configure out to logfile -log_file_path = Path(WEBQUEUE2_DIR, "utils", f'{logger_name}.log') -file_handler = logging.FileHandler(log_file_path) -file_handler.setFormatter(log_formatter) -logger.addHandler(file_handler) - - -################################################################################ -# Functions -################################################################################ - - -def get_args() -> argparse.Namespace: - """Parses arguments and returns argparses's generated namespace. - - Returns: - argparse.Namespace: Argparses's generated namespace. - """ - parser = argparse.ArgumentParser() - parser.add_argument( - "action", - help="Action to perform.", - choices=("create", "delete", "reset") - ) - parser.add_argument( - "--debug", - help="Print debug logs", - action="store_true", - ) - return parser.parse_args() - - -def run_logged_subprocess(command: Union[str, list], timeout: int = 60, shell: bool = True) -> tuple: - """Executes a shell command using subprocess with logging. - - stderr is redirected to stdout and stdout is pipped to logger. - If the subprocess raises an exception, the exception is logged as critical. - - Example: - Running a successful command: - run_logged_subprocess(command=["git", "commit", "-m", "'Commit message.'"]) - Returns: (0, "") - - Running an unsuccessful shell command with a 20 second timeout: - run_logged_subprocess(command="cd test/", timeout=20, shell=True) - Returns: (1, "cd: test: No such file or directory\n") - - Args: - command (Union): The command to run. If shell=False, pass a list with the first item being the command and the subsequent items being arguments. If shell=True, pass a string as you would type it into a shell. - timeout (int): The number of seconds to wait for a program before killing it. Defaults to 60. - - Returns: - tuple: With the first value being the return code and second being the combined stdout+stderr - """ - logger.debug(f"Entering subprocess for '{command}'") - with subprocess.Popen(command,\ - stderr=subprocess.STDOUT, stdout=subprocess.PIPE, shell=shell, universal_newlines=True)\ - as logged_shell_process: - - subprocess_log_prefix = f"(PID: {logged_shell_process.pid})" - - try: - # Convert combined stdout and stderr stream to list of strings - process_output_stream, _ = logged_shell_process.communicate(timeout=timeout) - process_output_lines = process_output_stream.split("\n") - # Remove last entry in process_output_lines because it is always empty - process_output_lines.pop(-1) - - for line in process_output_lines: - logger.debug(f"{subprocess_log_prefix}: {line}") - except Exception as exception: - logger.critical(str(exception)) - else: - if logged_shell_process.returncode != 0: - logger.debug(f"Something went wrong. '{command}' exited with return code {logged_shell_process.returncode}") - elif logged_shell_process.returncode == 0: - logger.debug(f"Subprocess for '{command}' completed successfuly") - finally: - logger.debug(f"Exiting subprocess for '{command}'") - return (logged_shell_process.returncode, process_output_stream) - - -def install_custom_pyldap(venv_interpreter: str) -> int: - """Builds python-ldap without SASL support from GitHub source. - - The version of python-ldap to be used is determined in the following order: - - Version from requirements.txt (can be beta) - - Latest non-beta version from GitHub - - Args: - venv_interpreter (str): The absolute path to the python interpreter executable for the virtual environment. - - Returns: - int: Exit code. - """ - logger.info("Starting pyldap build process") - - # Check for valid venv interpreter - logger.debug(f"Checking for valid venv interpreter at {venv_interpreter}") - if not os.path.exists(Path(venv_interpreter)): - logger.error(f"venv interpreter does not exist. Exiting") - exit(30) - logger.debug(f"venv interpreter is valid") - - # Get list of release tags for pyldap from GitHub API - logger.debug(f"Getting pyldap tags from GitHub") - pyldap_github_tags_url = "https://api.github.com/repos/python-ldap/python-ldap/tags" - try: - with urllib.request.urlopen(pyldap_github_tags_url) as request: - pyldap_tags = json.loads(request.read().decode("utf-8")) - except HTTPError as e: - logger.error(f"Failed to connect to {pyldap_github_tags_url}. Got response {e.code} {e.msg}. Exiting") - exit(35) - except URLError as e: - logger.error(f"Could not connect to {pyldap_github_tags_url}. {e.reason}. Exiting") - exit(35) - logger.debug(f"Got {len(pyldap_tags)} pyldap tags from GitHub") - - # Build dictionary of available pyldap releases and their source code archive urls - # Example: - # { "name": "python-ldap-3.3.1", "zipball_url": "http://github.com/download" } becomes - # { "3.3.1": "http://github.com/download" } - logger.debug("Building list of pyldap versions.") - pyldap_versions = {} - for tag in pyldap_tags: - tag_version = tag["name"].split("-")[-1] - zipball_url = f"https://github.com/python-ldap/python-ldap/archive/python-ldap-{tag_version}.zip" - pyldap_versions[tag_version] = zipball_url - logger.debug(f"Built list of {len(pyldap_versions)} pyldap versions.") - - # Check requirements file for pyldap version - pyldap_version_from_requirements = "" - logger.debug(f"Checking for pyldap version in requirements file {VENV_REQUIREMENTS_FILE}") - try: - with open(VENV_REQUIREMENTS_FILE) as requirements_file: - for line in requirements_file: - if line.startswith("pyldap"): - pyldap_version_from_requirements = line.split(" ")[-1].strip("\n") - logger.debug(f"Found pyldap version {pyldap_version_from_requirements} in requirements file") - break - except Exception as e: - logger.warning(f"Could not read requirements file {VENV_REQUIREMENTS_FILE}. {e.strerror}. Defaulting to latest non-beta release on GitHub.") - pass - - # Set pyldap version to value from requirements file if valid and available - if pyldap_version_from_requirements and pyldap_version_from_requirements in pyldap_versions.keys(): - logger.debug(f"pyldap version {pyldap_version_from_requirements} is available from GitHub") - pyldap_version = pyldap_version_from_requirements - logger.info(f"Set pyldap version to {pyldap_version} (from requirements file)") - # Set to latest non-beta version - else: - logger.warning(f"pyldap version not found in requirements file. Defaulting to latest non-beta release on GitHub") - for version in pyldap_versions.keys(): - is_beta_version = "b" in version - if (not is_beta_version): - pyldap_version = version - break - logger.info(f"Set pyldap version to {pyldap_version} (from GitHub releases)") - - # Download pyldap soure code - logger.info(f"Downloading pyldap {pyldap_version} source from {pyldap_versions[pyldap_version]}") - - tmp_dir = "/tmp" - download_file_name = f"python-ldap-{pyldap_version}.zip" - download_file_path = Path(tmp_dir, download_file_name) - - download_pyldap_returncode, _ = run_logged_subprocess(f"wget -q -O {download_file_path} {pyldap_versions[pyldap_version]}") - if download_pyldap_returncode == 0: - logger.debug(f"Downloaded pyldap {pyldap_version} source to {download_file_path}") - else: - logger.error(f"Failed to download pyldap source. See {log_file_path}. Exiting") - exit(40) - - # Extract source code - - # The archive from GitHub has a root folder formatted 'user-repo-version'. - # Because the pyldap source is user 'python-ldap' and repo 'python-ldap' - # the build folder MUST be the following format: - BUILD_DIR_NAME = f"python-ldap-python-ldap-{pyldap_version}" - BUILD_DIR_PATH = Path(tmp_dir, BUILD_DIR_NAME) - - logger.info(f"Extracing pyldap {pyldap_version} source to {BUILD_DIR_PATH}") - extract_source_returncode, _ = run_logged_subprocess(f"unzip -q -o -d {tmp_dir} {download_file_path}") - if extract_source_returncode == 0: - logger.debug(f"Extracted pyldap source to {BUILD_DIR_PATH}") - else: - logger.error(f"Failed to extract pyldap source. See {log_file_path}. Exiting") - exit(45) - - # Start the build process - logger.info(f"Building pyldap {pyldap_version}") - - # Read the pyldap build config file - pyldap_config_file_name = "setup.cfg" - pyldap_config_file_path = Path(BUILD_DIR_PATH, pyldap_config_file_name) - pyldap_version_from_needs_updated = True - - logger.debug(f"Reading pyldap build config file {pyldap_config_file_path}") - pyldap_config = configparser.ConfigParser() - try: - with open(pyldap_config_file_path) as pyldap_config_file: - pyldap_config.read_file(pyldap_config_file) - logger.debug("Read pyldap build config file") - except Exception as e: - logger.error(f"Failed to read pyldap build config file {pyldap_config_file_path}. {e}. Exiting") - exit(50) - - # Check for SASL requirement in pyldap build config file - logger.debug("Checking for '_ldap' section") - if not pyldap_config.has_section("_ldap"): - logger.warning("Failed to find '_ldap' section in pyldap build config file. pyldap may fail to build") - pyldap_version_from_needs_updated = False - pass - else: - logger.debug("'_ldap' section found") - - logger.debug("Checking for 'defines' option") - if not pyldap_config.has_option("_ldap", "defines"): - logging.warning("Failed to find 'defines' option in pyldap build config file. pyldap may fail to build") - pyldap_version_from_needs_updated = False - else: - logger.debug("'defines' option found") - - # Remove SASL requirement if present - if pyldap_version_from_needs_updated: - logger.debug("Removing SASL requirement") - - defines_options = pyldap_config['_ldap']['defines'].split(' ') - build_config_updated = False - try: - defines_options.remove('HAVE_SASL') - pyldap_config['_ldap']['defines'] = " ".join(defines_options) - logger.debug("SASL requirement removed") - build_config_updated = True - except ValueError as e: - logger.warning("SASL requirement not found in pyldap build config file. Build config file will not be modified") - pass - - # Write new build config - logger.debug("Writing new pyldap build config") - if build_config_updated: - try: - with open(pyldap_config_file_path, 'w') as pyldap_config_file: - pyldap_config.write(pyldap_config_file) - logger.debug("Wrote new pyldap build config") - except Exception as e: - logger.error(f"Failed to write pyldap build config file {pyldap_config_file_path}. {e}. Exiting") - exit(55) - - # Build pyldap - logger.debug(f"Building pyldap {pyldap_version}") - build_pyldap_returncode, _ = run_logged_subprocess(f"cd {BUILD_DIR_PATH} && {VENV_DIR}/bin/python3 setup.py build") - if build_pyldap_returncode == 0: - logger.debug(f"Built pyldap {pyldap_version}") - else: - logger.error(f"Failed to build pyldap {pyldap_version}. See {log_file_path}. Exiting") - exit(60) - - # Install pyldap - logger.debug(f"Installing pyldap {pyldap_version} in virtual environment {VENV_NAME} at {VENV_DIR}") - install_pyldap_returncode, _ = run_logged_subprocess(f"cd {BUILD_DIR_PATH} && {VENV_DIR}/bin/python3 setup.py install") - if install_pyldap_returncode == 0: - logger.debug(f"Installed pyldap {pyldap_version}") - else: - logger.error(f"Failed to install pyldap {pyldap_version}. See {log_file_path}. Exiting") - exit(65) - - logger.info(f"Finshed installing pyldap {pyldap_version}") - return 0 - - -def is_valid_requirement(line: str) -> bool: - """Determines if line is a valid requirement - - Args: - line (str): Line to check. - - Returns: - bool: True if line is valid requirement. False if line is not valid requirement. - """ - # Line is blank - if line == "\n" or line == "": - return False - - # Line is comment - if line.startswith("#"): - return False - - # Line is for pyldap - if line.startswith("pyldap"): - return False - - return True - - -def create_environment() -> int: - """Creates a virtual environment for webqueue2 - - Returns: - int: Exit code - """ - - logger.info(f"Creating virtual environment {VENV_NAME} at {VENV_DIR}") - - # Check for an existing virtual environment - logger.debug(f"Creating virtual environment directory at {VENV_DIR}") - try: - os.mkdir(VENV_DIR) - except FileExistsError: - logger.error(f"Failed to make directory {VENV_DIR}. It already exists. Exiting") - exit(5) - except PermissionError: - logger.error(f"Failed to make directory {VENV_DIR}. Incorrect permissions. Exiting") - exit(10) - logger.debug(f"Created virtual environment directory") - - - # Create virtual environment - logger.debug(f"Creating virtual environment {VENV_NAME} at {VENV_DIR}") - create_env_returncode, _ = run_logged_subprocess(f"cd {API_DIR} && python3 -m venv {VENV_NAME}", shell=True) - if create_env_returncode == 0: - logger.info(f"Successfully created virtual environment {VENV_NAME} created at {VENV_DIR}") - else: - logger.error(f"Failed to create virtual environment. See {log_file_path}. Exiting") - exit(15) - - # Check pip version - logger.debug("Checking pip version") - check_pip_returncode, check_pip_output = run_logged_subprocess(f"{VENV_DIR}/bin/pip --version") - if check_pip_returncode == 0: - pip_version_full = check_pip_output.split()[1] - logger.debug(f"pip version is {pip_version_full}") - else: - logger.warning("Failed to check pip version. Virtual environment dependencies may not install") - - # Update pip if needed - pip_version_major = pip_version_full.split(".")[0] - if int(pip_version_major) < 19: - logger.debug(f"pip verion is {pip_version_major}.x (pip >= {TARGET_PIP_VERSION}.x needed.) Upgrading pip") - update_pip_returncode, update_pip_output = run_logged_subprocess(f"{VENV_DIR}/bin/pip install --upgrade pip") - if update_pip_returncode == 0: - pip_install_message = update_pip_output.split("\n")[-2] - logger.debug(pip_install_message) - else: - logger.warning("Failed to update pip. Virtual environment dependencies may not install") - - # Install requirements - logger.info("Installing virtual environment requirements") - - # Install python-ldap - install_custom_pyldap(VENV_INTERPRETER) - - # Install the rest of the requirements from the requirements file - logger.debug(f"Checking for venv requirements file {VENV_REQUIREMENTS_FILE}") - if not os.path.exists(VENV_REQUIREMENTS_FILE): - logger.warning(f"Could not find requirements file {VENV_REQUIREMENTS_FILE}. No requirements will be installed") - return 0 - logger.debug("Found requirements file") - - # Get raw requirements from requirements file - logger.debug(f"Reading raw requirements from requirements file {VENV_REQUIREMENTS_FILE}") - try: - with open(VENV_REQUIREMENTS_FILE) as requirements_file: - raw_requirements = requirements_file.readlines() - except Exception as e: - logger.warning(f"Failed read requirements file {VENV_REQUIREMENTS_FILE}. {e}. Exiting") - exit(20) - logger.debug("Read raw requirements from requirements file") - - # Filter and clean requirements - logger.debug("Validating requirements") - valid_requirements = [] - for requirement in raw_requirements: - if is_valid_requirement(requirement): - valid_requirements.append(requirement.strip()) - logger.debug(f"Validated {len(valid_requirements)} requirements") - - # Generate requirements string - logger.debug("Generating requirements string") - requirements_string = "" - for requirement in valid_requirements: - requirements_string += f"'{requirement}' " - logger.debug(f"Generated requirements string {requirements_string}") - - # Install requirements - logger.debug("Installing requirements") - install_requirements_returncode, _ = run_logged_subprocess(f"{VENV_DIR}/bin/pip install {requirements_string}") - if install_requirements_returncode == 0: - logger.info("Successfully installed requirements") - else: - logger.error(f"Failed to install requirements. See {log_file_path}. Exiting") - exit(25) - - logger.info("Finished creating virtual environment") - return 0 - - -def delete_environment() -> int: - """Deletes a virtual environment for webqueue2 - - Returns: - int: Exit code - """ - logger.info(f"Deleting virtual environment {VENV_NAME} at {VENV_DIR}") - - delete_venv_returncode, _ = run_logged_subprocess(f"rm -rf {VENV_DIR}") - if delete_venv_returncode == 0: - logger.info(f"Successfully deleted virtual environment {VENV_NAME} at {VENV_DIR}") - return 0 - else: - logger.error(f"Failed to delete virtual environment {VENV_NAME} at {VENV_DIR}. Exiting") - exit(70) - - -def reset_environment() -> int: - """Resets a virtual environment for webqueue2 - - Returns: - int: Exit code - """ - logger.info(f"Resetting virtual environment {VENV_NAME} at {VENV_DIR}") - delete_returncode = delete_environment() - create_returncode = create_environment() - logger.info(f"Successfully reset virtual environment {VENV_NAME} at {VENV_DIR}. Exiting") - return 0 - - -if __name__ == "__main__": - logger.info(f"Starting venv-manager. Log file available at {log_file_path}") - - args = get_args() - action = args.action - - if args.debug: - stream_handler.setLevel(logging.DEBUG) - - if action == "create": - exit(create_environment()) - elif action == "delete": - exit(delete_environment()) - elif action == "reset": - exit(reset_environment()) - else: - logger.critical(f'Invalid argument {action}')