diff --git a/CHANGES b/CHANGES index 7261105..cd87e0d 100644 --- a/CHANGES +++ b/CHANGES @@ -8,9 +8,9 @@ Lib/ using errno Tests/ -* code-cleaning in slapd.py and t_cext.py -* set env var LDAPNOINIT=1 in t_cext.py to avoid interference - with locally installed .ldaprc or ldap.conf +* re-factored slapd.py, t_cext.py and t_search.py +* set env var LDAPNOINIT=1 in t_cext.py and t_search.py to avoid + interference with locally installed .ldaprc or ldap.conf ---------------------------------------------------------------- Released 2.4.35 2017-04-25 @@ -1388,4 +1388,4 @@ Released 2.0.0pre02 2002-02-01 ---------------------------------------------------------------- Released 1.10alpha3 2000-09-19 -$Id: CHANGES,v 1.425 2017/04/26 08:58:06 stroeder Exp $ +$Id: CHANGES,v 1.426 2017/04/26 10:46:31 stroeder Exp $ diff --git a/Tests/slapd.py b/Tests/slapd.py index b0b0696..0ec998e 100644 --- a/Tests/slapd.py +++ b/Tests/slapd.py @@ -12,7 +12,31 @@ import logging import base64 -_log = logging.getLogger("slapd") +# determine log level +try: + _LOG_LEVEL = os.environ['LOGLEVEL'] + try: + _LOG_LEVEL = int(_LOG_LEVEL) + except ValueError: + pass +except KeyError: + _LOG_LEVEL = logging.WARN + +# initialize the module logger +_LOGGER = logging.getLogger("python-ldap-slapd") +_LOGGER.setLevel(_LOG_LEVEL) + +# a template string for generating simple slapd.conf file +SLAPD_CONF_TEMPLATE = """ +include %(path_schema_core)s +loglevel %(loglevel)s +allow bind_v2 +database %(database)s +directory %(directory)s +suffix %(suffix)s +rootdn %(rootdn)s +rootpw %(rootpw)s +""" def quote(s): '''Quotes the '"' and '\' characters in a string and surrounds with "..."''' @@ -21,17 +45,16 @@ def quote(s): def mkdirs(path): """Creates the directory path unless it already exists""" if not os.access(os.path.join(path, os.path.curdir), os.F_OK): - _log.debug("creating temp directory %s", path) + _LOGGER.debug("creating temp directory %s", path) os.mkdir(path) - return path def delete_directory_content(path): for dirpath, dirnames, filenames in os.walk(path, topdown=False): for n in filenames: - _log.info("remove %s", os.path.join(dirpath, n)) + _LOGGER.info("remove %s", os.path.join(dirpath, n)) os.remove(os.path.join(dirpath, n)) for n in dirnames: - _log.info("rmdir %s", os.path.join(dirpath, n)) + _LOGGER.info("rmdir %s", os.path.join(dirpath, n)) os.rmdir(os.path.join(dirpath, n)) LOCALHOST = '127.0.0.1' @@ -41,10 +64,10 @@ def find_available_tcp_port(host=LOCALHOST): s.bind((host, 0)) port = s.getsockname()[1] s.close() - _log.info("Found available port %d", port) + _LOGGER.info("Found available port %d", port) return port -class Slapd: +class SlapdObject: """ Controller class for a slapd instance, OpenLDAP's server. @@ -56,44 +79,33 @@ class Slapd: server is shut down. """ - suffix = "dc=slapd-test,dc=python-ldap,dc=org" - root_cn = "Manager" - root_dn = "cn=%s,%s" % (root_cn, suffix) - root_password = "password" + database = 'ldif' + suffix = 'dc=slapd-test,dc=python-ldap,dc=org' + root_cn = 'Manager' + root_dn = 'cn=%s,%s' % (root_cn, suffix) + root_pw = 'password' + slapd_loglevel = 'stats stats2' - _log = logging.getLogger("Slapd") + _log = _LOGGER # Use /var/tmp to placate apparmour on Ubuntu: - PATH_TMPDIR = "/var/tmp/python-ldap-test" - PATH_SBINDIR = "/usr/sbin" - PATH_BINDIR = "/usr/bin" - PATH_SCHEMA_CORE = "/etc/openldap/schema/core.schema" - PATH_LDAPADD = os.path.join(PATH_BINDIR, "ldapadd") - PATH_LDAPSEARCH = os.path.join(PATH_BINDIR, "ldapsearch") - PATH_SLAPD = os.path.join(PATH_SBINDIR, "slapd") - PATH_SLAPTEST = os.path.join(PATH_SBINDIR, "slaptest") - - def check_paths(cls): - """ - Checks that the configured executable paths look valid. - If they don't, then logs warning messages (not errors). - """ - for name, path in ( - ("slapd", cls.PATH_SLAPD), - ("ldapadd", cls.PATH_LDAPADD), - ("ldapsearch", cls.PATH_LDAPSEARCH), - ): - cls._log.debug("checking %s executable at %s", name, path) - if not os.access(path, os.X_OK): - cls._log.warn("cannot find %s executable at %s", name, path) - check_paths = classmethod(check_paths) + PATH_TMPDIR = '/var/tmp' + PATH_SBINDIR = '/usr/sbin' + PATH_BINDIR = '/usr/bin' + PATH_SCHEMA_CORE = '/etc/openldap/schema/core.schema' + PATH_LDAPADD = os.path.join(PATH_BINDIR, 'ldapadd') + PATH_LDAPSEARCH = os.path.join(PATH_BINDIR, 'ldapsearch') + PATH_SLAPD = os.path.join(PATH_SBINDIR, 'slapd') + PATH_SLAPTEST = os.path.join(PATH_SBINDIR, 'slaptest') def __init__(self): - self._config = [] self._proc = None self._port = 0 - self._tmpdir = self.PATH_TMPDIR - self._slapd_debug_level = 0 + self._tmpdir = os.path.join(os.environ.get('TMP', self.PATH_TMPDIR), 'python-ldap-test') + self._slapd_conf = os.path.join(self._tmpdir, "slapd.conf") + self._db_directory = os.path.join(self._tmpdir, "openldap-data") + self._config = self._generate_slapd_conf() + self._log.setLevel(_LOG_LEVEL) # getters def get_url(self): @@ -104,43 +116,35 @@ def get_address(self): self._port = find_available_tcp_port(LOCALHOST) return (LOCALHOST, self._port) - def get_tmpdir(self): - return os.environ.get('TMP', self._tmpdir) - def __del__(self): self.stop() - def configure(self, cfg): + def _generate_slapd_conf(self): """ - Appends slapd.conf configuration lines to cfg. - Also re-initializes any backing storage. - Feel free to subclass and override this method. + returns a string with a complete slapd.conf """ - - # Global - cfg.append("include " + quote(self.PATH_SCHEMA_CORE)) - cfg.append("allow bind_v2") - - # Database - ldif_dir = mkdirs(os.path.join(self.get_tmpdir(), "ldif-data")) - delete_directory_content(ldif_dir) # clear it out - cfg.append("database ldif") - cfg.append("directory " + quote(ldif_dir)) - - cfg.append("suffix " + quote(self.suffix)) - cfg.append("rootdn " + quote(self.root_dn)) - cfg.append("rootpw " + quote(self.root_password)) + # reinitialize database + delete_directory_content(self._db_directory) + # generate slapd.conf lines + config_dict = { + 'path_schema_core': quote(self.PATH_SCHEMA_CORE), + 'loglevel': self.slapd_loglevel, + 'database': quote(self.database), + 'directory': quote(self._db_directory), + 'suffix': quote(self.suffix), + 'rootdn': quote(self.root_dn), + 'rootpw': quote(self.root_pw), + } + return SLAPD_CONF_TEMPLATE % config_dict def _write_config(self): """Writes the slapd.conf file out, and returns the path to it.""" - path = os.path.join(self._tmpdir, "slapd.conf") - _ = mkdirs(self._tmpdir) - if os.access(path, os.F_OK): - self._log.debug("deleting existing %s", path) - os.remove(path) - self._log.debug("writing config to %s", path) - file(path, "w").writelines([line + "\n" for line in self._config]) - return path + mkdirs(self._tmpdir) + mkdirs(self._db_directory) + self._log.debug("writing config to %s", self._config) + config_file = file(self._slapd_conf, "wb") + config_file.write(self._config) + config_file.close() def start(self): """ @@ -150,7 +154,6 @@ def start(self): ok = False config_path = None try: - self.configure(self._config) self._test_configuration() self._start_slapd() self._wait_for_slapd() @@ -167,15 +170,14 @@ def start(self): def _start_slapd(self): # Spawns/forks the slapd process - config_path = self._write_config() + self._write_config() self._log.info("starting slapd") self._proc = subprocess.Popen([ self.PATH_SLAPD, - "-f", config_path, + "-f", self._slapd_conf, "-h", self.get_url(), - "-d", str(self._slapd_debug_level), + "-d", "0", ]) - self._proc_config = config_path def _wait_for_slapd(self): # Waits until the LDAP server socket is open, or slapd crashed @@ -224,27 +226,31 @@ def _stopped(self): self._log.info("slapd terminated") self._proc = None try: - os.remove(self._proc_config) + os.remove(self._slapd_conf) except os.error: - self._log.debug("could not remove %s", self._proc_config) + self._log.debug("could not remove %s", self._slapd_conf) def _test_configuration(self): - config_path = self._write_config() + self._write_config() + self._log.debug("testing configuration") + popen_list = [ + self.PATH_SLAPTEST, + "-f", self._slapd_conf, +# "-u", os.environ['USER'], + ] + if self._log.isEnabledFor(logging.DEBUG): + popen_list.append('-v') + popen_list.extend(['-d', 'config']) + else: + popen_list.append('-Q') try: - self._log.debug("testing configuration") - verboseflag = "-Q" - if self._log.isEnabledFor(logging.DEBUG): - verboseflag = "-v" - p = subprocess.Popen([ - self.PATH_SLAPTEST, - verboseflag, - "-f", config_path - ]) + p = subprocess.Popen(popen_list) if p.wait() != 0: raise RuntimeError("configuration test failed") self._log.debug("configuration seems ok") finally: - os.remove(config_path) + pass + #os.remove(self._slapd_conf) def ldapadd(self, ldif, extra_args=None): """Runs ldapadd on this slapd instance, passing it the ldif content""" @@ -255,7 +261,7 @@ def ldapadd(self, ldif, extra_args=None): self.PATH_LDAPADD, "-x", "-D", self.root_dn, - "-w", self.root_password, + "-w", self.root_pw, "-H", self.get_url() ] + extra_args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, @@ -278,7 +284,7 @@ def ldapsearch( self.PATH_LDAPSEARCH, "-x", "-D", self.root_dn, - "-w", self.root_password, + "-w", self.root_pw, "-H", self.get_url(), "-b", base, "-s", scope, @@ -346,13 +352,11 @@ def started(self): suffix_dc = self.suffix.split(',')[0][3:] assert self.root_dn.startswith("cn=") assert self.root_dn.endswith("," + self.suffix) - self._log.debug( "adding %s and %s", self.suffix, self.root_dn, ) - self.ldapadd( "\n".join([ 'dn: '+self.suffix, @@ -367,21 +371,3 @@ def started(self): '' ]) ) - -def run(): - logging.basicConfig(level=logging.DEBUG) - slapd = Slapd() - _log.debug("Starting slapd...") - slapd.start() - _log.debug("Contents of LDAP server follow:\n") - for dn, attrs in slapd.ldapsearch(): - _log.debug("dn: " + dn) - for name, val in attrs: - _log.debug("%r: %r", name, val) - _log.debug('slapd URL: %r', slapd.get_url()) - slapd.wait() - -Slapd.check_paths() - -if __name__ == '__main__' and sys.argv == ['run']: - run() diff --git a/Tests/t_cext.py b/Tests/t_cext.py index 19a1255..0311605 100644 --- a/Tests/t_cext.py +++ b/Tests/t_cext.py @@ -5,6 +5,7 @@ import os import unittest from Tests import slapd +from Tests.slapd import SlapdObject # Switch off processing .ldaprc or ldap.conf before importing _ldap os.environ['LDAPNOINIT'] = '1' @@ -15,7 +16,7 @@ def get_reusable_server(): global reusable_server if reusable_server is None: - reusable_server = slapd.Slapd() + reusable_server = SlapdObject() return reusable_server class TestLdapCExtension(unittest.TestCase): @@ -34,7 +35,7 @@ def _init_server(self, reuse_existing=True): if reuse_existing: server = get_reusable_server() else: - server = slapd.Slapd() # private server + server = SlapdObject() # private server server.start() # no effect if already started self.server = server return server @@ -48,7 +49,7 @@ def _init(self, reuse_existing=True, bind=True): if bind: # Perform a simple bind l.set_option(_ldap.OPT_PROTOCOL_VERSION, _ldap.VERSION3) - m = l.simple_bind(server.root_dn, server.root_password) + m = l.simple_bind(server.root_dn, server.root_pw) result, pmsg, msgid, ctrls = l.result4(m, _ldap.MSG_ONE, self.timeout) self.assertTrue(result, _ldap.RES_BIND) return l diff --git a/Tests/t_search.py b/Tests/t_search.py index e938cab..f051fe2 100644 --- a/Tests/t_search.py +++ b/Tests/t_search.py @@ -1,6 +1,11 @@ -import ldap, unittest -import slapd +import os +import unittest +from Tests.slapd import SlapdObject +# Switch off processing .ldaprc or ldap.conf before importing _ldap +os.environ['LDAPNOINIT'] = '1' + +import ldap from ldap.ldapobject import LDAPObject server = None @@ -10,9 +15,9 @@ class TestSearch(unittest.TestCase): def setUp(self): global server if server is None: - server = slapd.Slapd() + server = SlapdObject() server.start() - base = server.get_dn_suffix() + base = server.suffix # insert some Foo* objects via ldapadd server.ldapadd("\n".join([ @@ -41,13 +46,13 @@ def setUp(self): l = LDAPObject(server.get_url()) l.protocol_version = 3 l.set_option(ldap.OPT_REFERRALS,0) - l.simple_bind_s(server.get_root_dn(), - server.get_root_password()) + l.simple_bind_s(server.root_dn, + server.root_pw) self.ldap = l self.server = server def test_search_subtree(self): - base = self.server.get_dn_suffix() + base = self.server.suffix l = self.ldap result = l.search_s(base, ldap.SCOPE_SUBTREE, '(cn=Foo*)', ['*']) @@ -65,7 +70,7 @@ def test_search_subtree(self): ) def test_search_onelevel(self): - base = self.server.get_dn_suffix() + base = self.server.suffix l = self.ldap result = l.search_s(base, ldap.SCOPE_ONELEVEL, '(cn=Foo*)', ['*']) @@ -81,7 +86,7 @@ def test_search_onelevel(self): ) def test_search_oneattr(self): - base = self.server.get_dn_suffix() + base = self.server.suffix l = self.ldap result = l.search_s(base, ldap.SCOPE_SUBTREE, '(cn=Foo4)', ['cn'])