From 02915b3f5bfe0f93d92cdeb26778d8c3efa56d25 Mon Sep 17 00:00:00 2001 From: Petr Viktorin Date: Tue, 19 Dec 2017 14:26:39 +0100 Subject: [PATCH] slapdtest: Set and check command paths in __init__ Make SlapdObject.PATH_* instance attributes, rather than class ones. Set them in __init__. Move checking them from start() to __init__(), so the check becomes simply error handling. Put a straight-up copy of Python's shutil.which() in ldap.compat -- it is a temporary backport, not a modified fork. --- Lib/ldap/compat.py | 70 +++++++++++++++++++++++ Lib/slapdtest/_slapdtest.py | 109 ++++++++++++++---------------------- 2 files changed, 113 insertions(+), 66 deletions(-) diff --git a/Lib/ldap/compat.py b/Lib/ldap/compat.py index de0e110..cbfeef5 100644 --- a/Lib/ldap/compat.py +++ b/Lib/ldap/compat.py @@ -1,6 +1,7 @@ """Compatibility wrappers for Py2/Py3.""" import sys +import os if sys.version_info[0] < 3: from UserDict import UserDict, IterableUserDict @@ -41,3 +42,72 @@ def reraise(exc_type, exc_value, exc_traceback): """ # In Python 3, all exception info is contained in one object. raise exc_value + +try: + from shutil import which +except ImportError: + # shutil.which() from Python 3.6 + # "Copyright (c) 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010, + # 2011, 2012, 2013, 2014, 2015, 2016, 2017 Python Software Foundation; + # All Rights Reserved" + def which(cmd, mode=os.F_OK | os.X_OK, path=None): + """Given a command, mode, and a PATH string, return the path which + conforms to the given mode on the PATH, or None if there is no such + file. + + `mode` defaults to os.F_OK | os.X_OK. `path` defaults to the result + of os.environ.get("PATH"), or can be overridden with a custom search + path. + + """ + # Check that a given file can be accessed with the correct mode. + # Additionally check that `file` is not a directory, as on Windows + # directories pass the os.access check. + def _access_check(fn, mode): + return (os.path.exists(fn) and os.access(fn, mode) + and not os.path.isdir(fn)) + + # If we're given a path with a directory part, look it up directly rather + # than referring to PATH directories. This includes checking relative to the + # current directory, e.g. ./script + if os.path.dirname(cmd): + if _access_check(cmd, mode): + return cmd + return None + + if path is None: + path = os.environ.get("PATH", os.defpath) + if not path: + return None + path = path.split(os.pathsep) + + if sys.platform == "win32": + # The current directory takes precedence on Windows. + if not os.curdir in path: + path.insert(0, os.curdir) + + # PATHEXT is necessary to check on Windows. + pathext = os.environ.get("PATHEXT", "").split(os.pathsep) + # See if the given file matches any of the expected path extensions. + # This will allow us to short circuit when given "python.exe". + # If it does match, only test that one, otherwise we have to try + # others. + if any(cmd.lower().endswith(ext.lower()) for ext in pathext): + files = [cmd] + else: + files = [cmd + ext for ext in pathext] + else: + # On other platforms you don't have things like PATHEXT to tell you + # what file suffixes are executable, so just pass on cmd as-is. + files = [cmd] + + seen = set() + for dir in path: + normdir = os.path.normcase(dir) + if not normdir in seen: + seen.add(normdir) + for thefile in files: + name = os.path.join(dir, thefile) + if _access_check(name, mode): + return name + return None diff --git a/Lib/slapdtest/_slapdtest.py b/Lib/slapdtest/_slapdtest.py index ebc5441..484eb54 100644 --- a/Lib/slapdtest/_slapdtest.py +++ b/Lib/slapdtest/_slapdtest.py @@ -21,7 +21,7 @@ os.environ['LDAPNOINIT'] = '1' import ldap -from ldap.compat import quote_plus +from ldap.compat import quote_plus, which HERE = os.path.abspath(os.path.dirname(__file__)) @@ -109,46 +109,14 @@ def requires_ldapi(): else: return identity - -def _which(cmd): - """Specialized which command based on shutil.which() from Python 3.6. - - * simplified - * always adds /sbin directories to path - """ - - def _access_check(fn): - return (os.path.exists(fn) and os.access(fn, os.F_OK | os.X_OK) - and not os.path.isdir(fn)) - - # Path with directory part skips PATH lookup. - if os.path.dirname(cmd): - if _access_check(cmd): - return cmd - return None - - path = os.environ.get("PATH", os.defpath).split(os.pathsep) - - if sys.platform == 'win32': - if os.curdir not in path: - path.insert(0, os.curdir) - # include path extension (.exe) - pathext = os.environ.get("PATHEXT", "").split(os.pathsep) - files = [cmd + ext for ext in pathext] - else: - # always include sbin for slapd binary - for sbin in ['/usr/local/sbin', '/sbin', '/usr/sbin']: - if sbin not in path: - path.append(sbin) - files = [cmd] - - for directory in path: - for name in files: - name = os.path.join(directory, name) - if _access_check(name): - return name - return None - +def _add_sbin(path): + """Add /sbin and related directories to a command search path""" + directories = path.split(os.pathsep) + if sys.platform != 'win32': + for sbin in '/usr/local/sbin', '/sbin', '/usr/sbin': + if sbin not in directories: + directories.append(sbin) + return os.pathsep.join(directories) def combined_logger( log_name, @@ -221,14 +189,9 @@ class SlapdObject(object): SCHEMADIR = "/etc/ldap/schema" else: SCHEMADIR = None - # _check_requirements turns paths into absolute paths - PATH_LDAPADD = 'ldapadd' - PATH_LDAPDELETE = 'ldapdelete' - PATH_LDAPMODIFY = 'ldapmodify' - PATH_LDAPWHOAMI = 'ldapwhoami' - # The following two binaries are usually in /usr/sbin. - PATH_SLAPD = os.environ.get('SLAPD', 'slapd') - PATH_SLAPTEST = 'slaptest' + + BIN_PATH = os.environ.get('BIN', os.environ.get('PATH', os.defpath)) + SBIN_PATH = os.environ.get('SBIN', _add_sbin(BIN_PATH)) # time in secs to wait before trying to access slapd via LDAP (again) _start_sleep = 1.5 @@ -256,6 +219,12 @@ def __init__(self): self.default_ldap_uri = self.ldap_uri # Use simple bind via LDAP uri self.cli_sasl_external = False + + self._find_commands() + + if self.SCHEMADIR is None: + raise ValueError('SCHEMADIR is None, ldap schemas are missing.') + # TLS certs self.cafile = os.path.join(HERE, 'certs/ca.pem') self.servercert = os.path.join(HERE, 'certs/server.pem') @@ -263,22 +232,31 @@ def __init__(self): self.clientcert = os.path.join(HERE, 'certs/client.pem') self.clientkey = os.path.join(HERE, 'certs/client.key') - def _check_requirements(self): - names = [ - "PATH_LDAPADD", "PATH_LDAPMODIFY", "PATH_LDAPDELETE", - "PATH_LDAPWHOAMI", "PATH_SLAPD", "PATH_SLAPTEST", - ] - for name in names: - value = getattr(self, name) - binary = _which(value) - if binary is None: - raise ValueError( - "Command '{}' not found in PATH".format(value) - ) - else: - setattr(self, name, binary) - if self.SCHEMADIR is None: - raise ValueError('SCHEMADIR is None, ldap schemas are missing.') + def _find_commands(self): + self.PATH_LDAPADD = self._find_command('ldapadd') + self.PATH_LDAPDELETE = self._find_command('ldapdelete') + self.PATH_LDAPMODIFY = self._find_command('ldapmodify') + self.PATH_LDAPWHOAMI = self._find_command('ldapwhoami') + + self.PATH_SLAPD = os.environ.get('SLAPD', None) + if not self.PATH_SLAPD: + self.PATH_SLAPD = self._find_command('slapd', in_sbin=True) + self.PATH_SLAPTEST = self._find_command('slaptest', in_sbin=True) + + def _find_command(self, cmd, in_sbin=False): + if in_sbin: + path = self.SBIN_PATH + var_name = 'SBIN' + else: + path = self.BIN_PATH + var_name = 'BIN' + command = which(cmd, path=path) + if command is None: + raise ValueError( + "Command '{}' not found. Set the {} environment variable to " + "override slapdtest's search path.".format(value, var_name) + ) + return command def setup_rundir(self): """ @@ -439,7 +417,6 @@ def start(self): """ if self._proc is None: - self._check_requirements() # prepare directory structure atexit.register(self.stop) self._cleanup_rundir()