Skip to content

Commit

Permalink
slapdtest: Set and check command paths in __init__
Browse files Browse the repository at this point in the history
Make SlapdObject.PATH_* instance attributes, rather than class
ones. Set them in __init__.
Move checking them from start() to __init__(), so the check becomes
simply error handling.

Put a straight-up copy of Python's shutil.which() in
ldap.compat -- it is a temporary backport, not a modified fork.
  • Loading branch information
Petr Viktorin committed Dec 19, 2017
1 parent f01b66e commit 02915b3
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 66 deletions.
70 changes: 70 additions & 0 deletions Lib/ldap/compat.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Compatibility wrappers for Py2/Py3."""

import sys
import os

if sys.version_info[0] < 3:
from UserDict import UserDict, IterableUserDict
Expand Down Expand Up @@ -41,3 +42,72 @@ def reraise(exc_type, exc_value, exc_traceback):
"""
# In Python 3, all exception info is contained in one object.
raise exc_value

try:
from shutil import which
except ImportError:
# shutil.which() from Python 3.6
# "Copyright (c) 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010,
# 2011, 2012, 2013, 2014, 2015, 2016, 2017 Python Software Foundation;
# All Rights Reserved"
def which(cmd, mode=os.F_OK | os.X_OK, path=None):
"""Given a command, mode, and a PATH string, return the path which
conforms to the given mode on the PATH, or None if there is no such
file.
`mode` defaults to os.F_OK | os.X_OK. `path` defaults to the result
of os.environ.get("PATH"), or can be overridden with a custom search
path.
"""
# Check that a given file can be accessed with the correct mode.
# Additionally check that `file` is not a directory, as on Windows
# directories pass the os.access check.
def _access_check(fn, mode):
return (os.path.exists(fn) and os.access(fn, mode)
and not os.path.isdir(fn))

# If we're given a path with a directory part, look it up directly rather
# than referring to PATH directories. This includes checking relative to the
# current directory, e.g. ./script
if os.path.dirname(cmd):
if _access_check(cmd, mode):
return cmd
return None

if path is None:
path = os.environ.get("PATH", os.defpath)
if not path:
return None
path = path.split(os.pathsep)

if sys.platform == "win32":
# The current directory takes precedence on Windows.
if not os.curdir in path:
path.insert(0, os.curdir)

# PATHEXT is necessary to check on Windows.
pathext = os.environ.get("PATHEXT", "").split(os.pathsep)
# See if the given file matches any of the expected path extensions.
# This will allow us to short circuit when given "python.exe".
# If it does match, only test that one, otherwise we have to try
# others.
if any(cmd.lower().endswith(ext.lower()) for ext in pathext):
files = [cmd]
else:
files = [cmd + ext for ext in pathext]
else:
# On other platforms you don't have things like PATHEXT to tell you
# what file suffixes are executable, so just pass on cmd as-is.
files = [cmd]

seen = set()
for dir in path:
normdir = os.path.normcase(dir)
if not normdir in seen:
seen.add(normdir)
for thefile in files:
name = os.path.join(dir, thefile)
if _access_check(name, mode):
return name
return None
109 changes: 43 additions & 66 deletions Lib/slapdtest/_slapdtest.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
os.environ['LDAPNOINIT'] = '1'

import ldap
from ldap.compat import quote_plus
from ldap.compat import quote_plus, which

HERE = os.path.abspath(os.path.dirname(__file__))

Expand Down Expand Up @@ -109,46 +109,14 @@ def requires_ldapi():
else:
return identity


def _which(cmd):
"""Specialized which command based on shutil.which() from Python 3.6.
* simplified
* always adds /sbin directories to path
"""

def _access_check(fn):
return (os.path.exists(fn) and os.access(fn, os.F_OK | os.X_OK)
and not os.path.isdir(fn))

# Path with directory part skips PATH lookup.
if os.path.dirname(cmd):
if _access_check(cmd):
return cmd
return None

path = os.environ.get("PATH", os.defpath).split(os.pathsep)

if sys.platform == 'win32':
if os.curdir not in path:
path.insert(0, os.curdir)
# include path extension (.exe)
pathext = os.environ.get("PATHEXT", "").split(os.pathsep)
files = [cmd + ext for ext in pathext]
else:
# always include sbin for slapd binary
for sbin in ['/usr/local/sbin', '/sbin', '/usr/sbin']:
if sbin not in path:
path.append(sbin)
files = [cmd]

for directory in path:
for name in files:
name = os.path.join(directory, name)
if _access_check(name):
return name
return None

def _add_sbin(path):
"""Add /sbin and related directories to a command search path"""
directories = path.split(os.pathsep)
if sys.platform != 'win32':
for sbin in '/usr/local/sbin', '/sbin', '/usr/sbin':
if sbin not in directories:
directories.append(sbin)
return os.pathsep.join(directories)

def combined_logger(
log_name,
Expand Down Expand Up @@ -221,14 +189,9 @@ class SlapdObject(object):
SCHEMADIR = "/etc/ldap/schema"
else:
SCHEMADIR = None
# _check_requirements turns paths into absolute paths
PATH_LDAPADD = 'ldapadd'
PATH_LDAPDELETE = 'ldapdelete'
PATH_LDAPMODIFY = 'ldapmodify'
PATH_LDAPWHOAMI = 'ldapwhoami'
# The following two binaries are usually in /usr/sbin.
PATH_SLAPD = os.environ.get('SLAPD', 'slapd')
PATH_SLAPTEST = 'slaptest'

BIN_PATH = os.environ.get('BIN', os.environ.get('PATH', os.defpath))
SBIN_PATH = os.environ.get('SBIN', _add_sbin(BIN_PATH))

# time in secs to wait before trying to access slapd via LDAP (again)
_start_sleep = 1.5
Expand Down Expand Up @@ -256,29 +219,44 @@ def __init__(self):
self.default_ldap_uri = self.ldap_uri
# Use simple bind via LDAP uri
self.cli_sasl_external = False

self._find_commands()

if self.SCHEMADIR is None:
raise ValueError('SCHEMADIR is None, ldap schemas are missing.')

# TLS certs
self.cafile = os.path.join(HERE, 'certs/ca.pem')
self.servercert = os.path.join(HERE, 'certs/server.pem')
self.serverkey = os.path.join(HERE, 'certs/server.key')
self.clientcert = os.path.join(HERE, 'certs/client.pem')
self.clientkey = os.path.join(HERE, 'certs/client.key')

def _check_requirements(self):
names = [
"PATH_LDAPADD", "PATH_LDAPMODIFY", "PATH_LDAPDELETE",
"PATH_LDAPWHOAMI", "PATH_SLAPD", "PATH_SLAPTEST",
]
for name in names:
value = getattr(self, name)
binary = _which(value)
if binary is None:
raise ValueError(
"Command '{}' not found in PATH".format(value)
)
else:
setattr(self, name, binary)
if self.SCHEMADIR is None:
raise ValueError('SCHEMADIR is None, ldap schemas are missing.')
def _find_commands(self):
self.PATH_LDAPADD = self._find_command('ldapadd')
self.PATH_LDAPDELETE = self._find_command('ldapdelete')
self.PATH_LDAPMODIFY = self._find_command('ldapmodify')
self.PATH_LDAPWHOAMI = self._find_command('ldapwhoami')

self.PATH_SLAPD = os.environ.get('SLAPD', None)
if not self.PATH_SLAPD:
self.PATH_SLAPD = self._find_command('slapd', in_sbin=True)
self.PATH_SLAPTEST = self._find_command('slaptest', in_sbin=True)

def _find_command(self, cmd, in_sbin=False):
if in_sbin:
path = self.SBIN_PATH
var_name = 'SBIN'
else:
path = self.BIN_PATH
var_name = 'BIN'
command = which(cmd, path=path)
if command is None:
raise ValueError(
"Command '{}' not found. Set the {} environment variable to "
"override slapdtest's search path.".format(value, var_name)
)
return command

def setup_rundir(self):
"""
Expand Down Expand Up @@ -439,7 +417,6 @@ def start(self):
"""

if self._proc is None:
self._check_requirements()
# prepare directory structure
atexit.register(self.stop)
self._cleanup_rundir()
Expand Down

0 comments on commit 02915b3

Please sign in to comment.