Skip to content

Commit

Permalink
Merge pull request #141 – Make testing on non-Linux platforms easier
Browse files Browse the repository at this point in the history
  • Loading branch information
Petr Viktorin authored and GitHub committed Dec 20, 2017
2 parents 9fb9338 + 02915b3 commit 8f5184c
Show file tree
Hide file tree
Showing 7 changed files with 169 additions and 42 deletions.
70 changes: 70 additions & 0 deletions Lib/ldap/compat.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Compatibility wrappers for Py2/Py3."""

import sys
import os

if sys.version_info[0] < 3:
from UserDict import UserDict, IterableUserDict
Expand Down Expand Up @@ -41,3 +42,72 @@ def reraise(exc_type, exc_value, exc_traceback):
"""
# In Python 3, all exception info is contained in one object.
raise exc_value

try:
from shutil import which
except ImportError:
# shutil.which() from Python 3.6
# "Copyright (c) 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010,
# 2011, 2012, 2013, 2014, 2015, 2016, 2017 Python Software Foundation;
# All Rights Reserved"
def which(cmd, mode=os.F_OK | os.X_OK, path=None):
"""Given a command, mode, and a PATH string, return the path which
conforms to the given mode on the PATH, or None if there is no such
file.
`mode` defaults to os.F_OK | os.X_OK. `path` defaults to the result
of os.environ.get("PATH"), or can be overridden with a custom search
path.
"""
# Check that a given file can be accessed with the correct mode.
# Additionally check that `file` is not a directory, as on Windows
# directories pass the os.access check.
def _access_check(fn, mode):
return (os.path.exists(fn) and os.access(fn, mode)
and not os.path.isdir(fn))

# If we're given a path with a directory part, look it up directly rather
# than referring to PATH directories. This includes checking relative to the
# current directory, e.g. ./script
if os.path.dirname(cmd):
if _access_check(cmd, mode):
return cmd
return None

if path is None:
path = os.environ.get("PATH", os.defpath)
if not path:
return None
path = path.split(os.pathsep)

if sys.platform == "win32":
# The current directory takes precedence on Windows.
if not os.curdir in path:
path.insert(0, os.curdir)

# PATHEXT is necessary to check on Windows.
pathext = os.environ.get("PATHEXT", "").split(os.pathsep)
# See if the given file matches any of the expected path extensions.
# This will allow us to short circuit when given "python.exe".
# If it does match, only test that one, otherwise we have to try
# others.
if any(cmd.lower().endswith(ext.lower()) for ext in pathext):
files = [cmd]
else:
files = [cmd + ext for ext in pathext]
else:
# On other platforms you don't have things like PATHEXT to tell you
# what file suffixes are executable, so just pass on cmd as-is.
files = [cmd]

seen = set()
for dir in path:
normdir = os.path.normcase(dir)
if not normdir in seen:
seen.add(normdir)
for thefile in files:
name = os.path.join(dir, thefile)
if _access_check(name, mode):
return name
return None
3 changes: 2 additions & 1 deletion Lib/slapdtest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@
__version__ = '3.0.0b2'

from slapdtest._slapdtest import SlapdObject, SlapdTestCase, SysLogHandler
from slapdtest._slapdtest import skip_unless_ci, requires_sasl, requires_tls
from slapdtest._slapdtest import requires_ldapi, requires_sasl, requires_tls
from slapdtest._slapdtest import skip_unless_ci
111 changes: 82 additions & 29 deletions Lib/slapdtest/_slapdtest.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import os
import socket
import sys
import time
import subprocess
import logging
Expand All @@ -20,7 +21,7 @@
os.environ['LDAPNOINIT'] = '1'

import ldap
from ldap.compat import quote_plus
from ldap.compat import quote_plus, which

HERE = os.path.abspath(os.path.dirname(__file__))

Expand Down Expand Up @@ -56,6 +57,12 @@

LOCALHOST = '127.0.0.1'

CI_DISABLED = set(os.environ.get('CI_DISABLED', '').split(':'))
if 'LDAPI' in CI_DISABLED:
HAVE_LDAPI = False
else:
HAVE_LDAPI = hasattr(socket, 'AF_UNIX')


def identity(test_item):
"""Identity decorator
Expand All @@ -69,7 +76,7 @@ def skip_unless_ci(reason, feature=None):
"""
if not os.environ.get('CI', False):
return unittest.skip(reason)
elif feature in os.environ.get('CI_DISABLED', '').split(':'):
elif feature in CI_DISABLED:
return unittest.skip(reason)
else:
# Don't skip on Travis
Expand All @@ -95,6 +102,22 @@ def requires_sasl():
return identity


def requires_ldapi():
if not HAVE_LDAPI:
return skip_unless_ci(
"test needs ldapi support (AF_UNIX)", feature='LDAPI')
else:
return identity

def _add_sbin(path):
"""Add /sbin and related directories to a command search path"""
directories = path.split(os.pathsep)
if sys.platform != 'win32':
for sbin in '/usr/local/sbin', '/sbin', '/usr/sbin':
if sbin not in directories:
directories.append(sbin)
return os.pathsep.join(directories)

def combined_logger(
log_name,
log_level=logging.WARN,
Expand Down Expand Up @@ -149,8 +172,6 @@ class SlapdObject(object):
root_dn = 'cn=%s,%s' % (root_cn, suffix)
root_pw = 'password'
slapd_loglevel = 'stats stats2'
# use SASL/EXTERNAL via LDAPI when invoking OpenLDAP CLI tools
cli_sasl_external = True
local_host = '127.0.0.1'
testrunsubdirs = (
'schema',
Expand All @@ -160,8 +181,6 @@ class SlapdObject(object):
)

TMPDIR = os.environ.get('TMP', os.getcwd())
SBINDIR = os.environ.get('SBIN', '/usr/sbin')
BINDIR = os.environ.get('BIN', '/usr/bin')
if 'SCHEMA' in os.environ:
SCHEMADIR = os.environ['SCHEMA']
elif os.path.isdir("/etc/openldap/schema"):
Expand All @@ -170,12 +189,9 @@ class SlapdObject(object):
SCHEMADIR = "/etc/ldap/schema"
else:
SCHEMADIR = None
PATH_LDAPADD = os.path.join(BINDIR, 'ldapadd')
PATH_LDAPDELETE = os.path.join(BINDIR, 'ldapdelete')
PATH_LDAPMODIFY = os.path.join(BINDIR, 'ldapmodify')
PATH_LDAPWHOAMI = os.path.join(BINDIR, 'ldapwhoami')
PATH_SLAPD = os.environ.get('SLAPD', os.path.join(SBINDIR, 'slapd'))
PATH_SLAPTEST = os.path.join(SBINDIR, 'slaptest')

BIN_PATH = os.environ.get('BIN', os.environ.get('PATH', os.defpath))
SBIN_PATH = os.environ.get('SBIN', _add_sbin(BIN_PATH))

# time in secs to wait before trying to access slapd via LDAP (again)
_start_sleep = 1.5
Expand All @@ -192,25 +208,55 @@ def __init__(self):
self._slapd_conf = os.path.join(self.testrundir, 'slapd.conf')
self._db_directory = os.path.join(self.testrundir, "openldap-data")
self.ldap_uri = "ldap://%s:%d/" % (LOCALHOST, self._port)
ldapi_path = os.path.join(self.testrundir, 'ldapi')
self.ldapi_uri = "ldapi://%s" % quote_plus(ldapi_path)
if HAVE_LDAPI:
ldapi_path = os.path.join(self.testrundir, 'ldapi')
self.ldapi_uri = "ldapi://%s" % quote_plus(ldapi_path)
self.default_ldap_uri = self.ldapi_uri
# use SASL/EXTERNAL via LDAPI when invoking OpenLDAP CLI tools
self.cli_sasl_external = True
else:
self.ldapi_uri = None
self.default_ldap_uri = self.ldap_uri
# Use simple bind via LDAP uri
self.cli_sasl_external = False

self._find_commands()

if self.SCHEMADIR is None:
raise ValueError('SCHEMADIR is None, ldap schemas are missing.')

# TLS certs
self.cafile = os.path.join(HERE, 'certs/ca.pem')
self.servercert = os.path.join(HERE, 'certs/server.pem')
self.serverkey = os.path.join(HERE, 'certs/server.key')
self.clientcert = os.path.join(HERE, 'certs/client.pem')
self.clientkey = os.path.join(HERE, 'certs/client.key')

def _check_requirements(self):
binaries = [
self.PATH_LDAPADD, self.PATH_LDAPMODIFY, self.PATH_LDAPWHOAMI,
self.PATH_SLAPD, self.PATH_SLAPTEST
]
for binary in binaries:
if not os.path.isfile(binary):
raise ValueError('Binary {} is missing.'.format(binary))
if self.SCHEMADIR is None:
raise ValueError('SCHEMADIR is None, ldap schemas are missing.')
def _find_commands(self):
self.PATH_LDAPADD = self._find_command('ldapadd')
self.PATH_LDAPDELETE = self._find_command('ldapdelete')
self.PATH_LDAPMODIFY = self._find_command('ldapmodify')
self.PATH_LDAPWHOAMI = self._find_command('ldapwhoami')

self.PATH_SLAPD = os.environ.get('SLAPD', None)
if not self.PATH_SLAPD:
self.PATH_SLAPD = self._find_command('slapd', in_sbin=True)
self.PATH_SLAPTEST = self._find_command('slaptest', in_sbin=True)

def _find_command(self, cmd, in_sbin=False):
if in_sbin:
path = self.SBIN_PATH
var_name = 'SBIN'
else:
path = self.BIN_PATH
var_name = 'BIN'
command = which(cmd, path=path)
if command is None:
raise ValueError(
"Command '{}' not found. Set the {} environment variable to "
"override slapdtest's search path.".format(value, var_name)
)
return command

def setup_rundir(self):
"""
Expand Down Expand Up @@ -331,11 +377,14 @@ def _start_slapd(self):
"""
Spawns/forks the slapd process
"""
urls = [self.ldap_uri]
if self.ldapi_uri:
urls.append(self.ldapi_uri)
slapd_args = [
self.PATH_SLAPD,
'-f', self._slapd_conf,
'-F', self.testrundir,
'-h', '%s' % ' '.join((self.ldap_uri, self.ldapi_uri)),
'-h', ' '.join(urls),
]
if self._log.isEnabledFor(logging.DEBUG):
slapd_args.extend(['-d', '-1'])
Expand All @@ -346,26 +395,28 @@ def _start_slapd(self):
# Waits until the LDAP server socket is open, or slapd crashed
# no cover to avoid spurious coverage changes, see
# https://github.com/python-ldap/python-ldap/issues/127
while 1: # pragma: no cover
for _ in range(10): # pragma: no cover
if self._proc.poll() is not None:
self._stopped()
raise RuntimeError("slapd exited before opening port")
time.sleep(self._start_sleep)
try:
self._log.debug("slapd connection check to %s", self.ldapi_uri)
self._log.debug(
"slapd connection check to %s", self.default_ldap_uri
)
self.ldapwhoami()
except RuntimeError:
pass
else:
return
raise RuntimeError("slapd did not start properly")

def start(self):
"""
Starts the slapd server process running, and waits for it to come up.
"""

if self._proc is None:
self._check_requirements()
# prepare directory structure
atexit.register(self.stop)
self._cleanup_rundir()
Expand Down Expand Up @@ -435,9 +486,11 @@ def _cli_auth_args(self):
# no cover to avoid spurious coverage changes
def _cli_popen(self, ldapcommand, extra_args=None, ldap_uri=None,
stdin_data=None): # pragma: no cover
if ldap_uri is None:
ldap_uri = self.default_ldap_uri
args = [
ldapcommand,
'-H', ldap_uri or self.ldapi_uri,
'-H', ldap_uri,
] + self._cli_auth_args() + (extra_args or [])
self._log.debug('Run command: %r', ' '.join(args))
proc = subprocess.Popen(
Expand Down
6 changes: 3 additions & 3 deletions Tests/t_ldap_sasl.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
See https://www.python-ldap.org/ for details.
"""
import os
import pwd
import socket
import unittest

Expand All @@ -14,7 +13,8 @@

from ldap.ldapobject import SimpleLDAPObject
import ldap.sasl
from slapdtest import SlapdTestCase, requires_sasl, requires_tls
from slapdtest import SlapdTestCase
from slapdtest import requires_ldapi, requires_sasl, requires_tls


LDIF = """
Expand Down Expand Up @@ -60,7 +60,7 @@ def setUpClass(cls):
)
cls.server.ldapadd(ldif)

@unittest.skipUnless(hasattr(socket, 'AF_UNIX'), "needs Unix socket")
@requires_ldapi()
def test_external_ldapi(self):
# EXTERNAL authentication with LDAPI (AF_UNIX)
ldap_conn = self.ldap_object_class(self.server.ldapi_uri)
Expand Down
3 changes: 2 additions & 1 deletion Tests/t_ldap_schema_subentry.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from ldap.ldapobject import SimpleLDAPObject
import ldap.schema
from ldap.schema.models import ObjectClass
from slapdtest import SlapdTestCase
from slapdtest import SlapdTestCase, requires_ldapi

HERE = os.path.abspath(os.path.dirname(__file__))

Expand Down Expand Up @@ -88,6 +88,7 @@ def test_urlfetch_ldap(self):
dn, schema = ldap.schema.urlfetch(self.server.ldap_uri)
self.assertSlapdSchema(dn, schema)

@requires_ldapi()
def test_urlfetch_ldapi(self):
dn, schema = ldap.schema.urlfetch(self.server.ldapi_uri)
self.assertSlapdSchema(dn, schema)
Expand Down
Loading

0 comments on commit 8f5184c

Please sign in to comment.