diff --git a/Tests/slapd.py b/Tests/slapd.py index 8e2bd65..caafac7 100644 --- a/Tests/slapd.py +++ b/Tests/slapd.py @@ -1,4 +1,3 @@ - """ Utilities for starting up a test slapd server and talking to it with ldapsearch/ldapadd. @@ -9,7 +8,6 @@ import time import subprocess import logging -import base64 import unittest # determine log level @@ -29,46 +27,50 @@ # a template string for generating simple slapd.conf file SLAPD_CONF_TEMPLATE = """ moduleload back_%(database)s -include %(schema_include)s +include "%(schema_include)s" loglevel %(loglevel)s allow bind_v2 database %(database)s -directory %(directory)s -suffix %(suffix)s -rootdn %(rootdn)s -rootpw %(rootpw)s +directory "%(directory)s" +suffix "%(suffix)s" +rootdn "%(rootdn)s" +rootpw "%(rootpw)s" """ -def quote(s): - '''Quotes the '"' and '\' characters in a string and surrounds with "..."''' - return '"%s"' % s.replace('\\', '\\\\').replace('"', '\\"') +LOCALHOST = '127.0.0.1' def mkdirs(path): - """Creates the directory path unless it already exists""" + """ + Creates the directory path unless it already exists + """ if not os.access(os.path.join(path, os.path.curdir), os.F_OK): _LOGGER.debug("creating temp directory %s", path) os.mkdir(path) def delete_directory_content(path): + """ + Recursively delete content of directory + """ for dirpath, dirnames, filenames in os.walk(path, topdown=False): - for n in filenames: - _LOGGER.info("remove %s", os.path.join(dirpath, n)) - os.remove(os.path.join(dirpath, n)) - for n in dirnames: - _LOGGER.info("rmdir %s", os.path.join(dirpath, n)) - os.rmdir(os.path.join(dirpath, n)) - -LOCALHOST = '127.0.0.1' + for filename in filenames: + _LOGGER.info("remove %s", os.path.join(dirpath, filename)) + os.remove(os.path.join(dirpath, filename)) + for dirname in dirnames: + _LOGGER.info("rmdir %s", os.path.join(dirpath, dirname)) + os.rmdir(os.path.join(dirpath, dirname)) def find_available_tcp_port(host=LOCALHOST): - s = socket.socket() - s.bind((host, 0)) - port = s.getsockname()[1] - s.close() + """ + find an available port for TCP connection + """ + sock = socket.socket() + sock.bind((host, 0)) + port = sock.getsockname()[1] + sock.close() _LOGGER.info("Found available port %d", port) return port -class SlapdObject: +class SlapdObject(object): """ Controller class for a slapd instance, OpenLDAP's server. @@ -94,7 +96,6 @@ class SlapdObject: INIT_SCHEMA_FILE = os.environ.get('SCHEMA_FILE', 'core.schema') INIT_SCHEMA_PATH = os.environ.get('SCHEMA_PATH', os.path.join(SCHEMADIR, INIT_SCHEMA_FILE)) PATH_LDAPADD = os.path.join(BINDIR, 'ldapadd') - PATH_LDAPSEARCH = os.path.join(BINDIR, 'ldapsearch') PATH_LDAPWHOAMI = os.path.join(BINDIR, 'ldapwhoami') PATH_SLAPD = os.path.join(SBINDIR, 'slapd') PATH_SLAPTEST = os.path.join(SBINDIR, 'slaptest') @@ -104,22 +105,22 @@ def __init__(self): self._port = find_available_tcp_port(LOCALHOST) self.ldap_uri = "ldap://%s:%d/" % (LOCALHOST, self._port) self._log = _LOGGER - self._tmpdir = os.path.join(self.TMPDIR, 'python-ldap-test') - self._slapd_conf = os.path.join(self._tmpdir, "slapd.conf") - self._db_directory = os.path.join(self._tmpdir, "openldap-data") + self.testrundir = os.path.join(self.TMPDIR, 'python-ldap-test') + self._slapd_conf = os.path.join(self.testrundir, "slapd.conf") + self._db_directory = os.path.join(self.testrundir, "openldap-data") def _gen_config(self): """ generates a slapd.conf and returns it as one string """ config_dict = { - 'schema_include': quote(self.INIT_SCHEMA_PATH), + 'schema_include': self.INIT_SCHEMA_PATH, 'loglevel': self.slapd_loglevel, 'database': self.database, - 'directory': quote(self._db_directory), - 'suffix': quote(self.suffix), - 'rootdn': quote(self.root_dn), - 'rootpw': quote(self.root_pw), + 'directory': self._db_directory, + 'suffix': self.suffix, + 'rootdn': self.root_dn, + 'rootpw': self.root_pw, } return self.slapd_conf_template % config_dict @@ -135,25 +136,27 @@ def start(self): Starts the slapd server process running, and waits for it to come up. """ if self._proc is None: - ok = False + start_ok = False config_path = None # init directory structure - delete_directory_content(self._tmpdir) - mkdirs(self._tmpdir) + delete_directory_content(self.testrundir) + mkdirs(self.testrundir) mkdirs(self._db_directory) try: self._write_config() self._test_configuration() self._start_slapd() self._wait_for_slapd() - ok = True + start_ok = True self._log.debug("slapd ready at %s", self.ldap_uri) self.started() finally: - if not ok: + if not start_ok: if config_path: - try: os.remove(config_path) - except os.error: pass + try: + os.remove(config_path) + except os.error: + pass if self._proc: self.stop() @@ -170,7 +173,6 @@ def _start_slapd(self): def _wait_for_slapd(self): # Waits until the LDAP server socket is open, or slapd crashed - s = socket.socket() while 1: if self._proc.poll() is not None: self._stopped() @@ -232,8 +234,8 @@ def _test_configuration(self): else: popen_list.append('-Q') try: - p = subprocess.Popen(popen_list) - if p.wait() != 0: + proc = subprocess.Popen(popen_list) + if proc.wait() != 0: raise RuntimeError("configuration test failed") self._log.debug("configuration seems ok") finally: @@ -243,7 +245,7 @@ def ldapwhoami(self, extra_args=None): """Runs ldapwhoami on this slapd instance""" extra_args = extra_args or [] self._log.debug("whoami") - p = subprocess.Popen( + proc = subprocess.Popen( [ self.PATH_LDAPWHOAMI, "-x", @@ -253,14 +255,14 @@ def ldapwhoami(self, extra_args=None): ] + extra_args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, ) - if p.wait() != 0: + if proc.wait() != 0: raise RuntimeError("ldapwhoami process failed") def ldapadd(self, ldif, extra_args=None): """Runs ldapadd on this slapd instance, passing it the ldif content""" extra_args = extra_args or [] self._log.debug("adding %s", repr(ldif)) - p = subprocess.Popen( + proc = subprocess.Popen( [ self.PATH_LDAPADD, "-x", @@ -270,82 +272,10 @@ def ldapadd(self, ldif, extra_args=None): ] + extra_args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, ) - p.communicate(ldif) - if p.wait() != 0: + proc.communicate(ldif) + if proc.wait() != 0: raise RuntimeError("ldapadd process failed") - def ldapsearch( - self, base=None, filterstr='(objectClass=*)', attrs=None, - scope='sub', extra_args=None - ): - attrs = attrs or [] - extra_args = extra_args or [] - if base is None: - base = self.suffix - self._log.debug("ldapsearch filterstr=%s", repr(filterstr)) - p = subprocess.Popen( - [ - self.PATH_LDAPSEARCH, - "-x", - "-D", self.root_dn, - "-w", self.root_pw, - "-H", self.ldap_uri, - "-b", base, - "-s", scope, - "-LL", - ]+extra_args+[filterstr]+attrs, - stdout=subprocess.PIPE, - ) - output = p.communicate()[0] - if p.wait() != 0: - raise RuntimeError("ldapadd process failed") - - # RFC 2849: LDIF format - # unfold - lines = [] - for l in output.split('\n'): - if l.startswith(' '): - lines[-1] = lines[-1] + l[1:] - elif l == '' and lines and lines[-1] == '': - pass # ignore multiple blank lines - else: - lines.append(l) - # Remove comments - lines = [l for l in lines if not l.startswith("#")] - - # Remove leading version and blank line(s) - if lines and lines[0] == '': - del lines[0] - if not lines or lines[0] != 'version: 1': - raise RuntimeError("expected 'version: 1', got " + repr(lines[:1])) - del lines[0] - if lines and lines[0] == '': - del lines[0] - - # ensure the ldif ends with a blank line (unless it is just blank) - if lines and lines[-1] != '': - lines.append('') - - objects = [] - obj = [] - for line in lines: - if line == '': # end of an object - if obj[0][0] != 'dn': - raise RuntimeError("first line not dn", repr(obj)) - objects.append((obj[0][1], obj[1:])) - obj = [] - else: - attr, value = line.split(':', 2) - if value.startswith(': '): - value = base64.decodestring(value[2:]) - elif value.startswith(' '): - value = value[1:] - else: - raise RuntimeError("bad line: " + repr(line)) - obj.append((attr, value)) - assert obj == [] - return objects - def started(self): """ This method is called when the LDAP server has started up and is empty. @@ -392,7 +322,7 @@ def _open_ldap_conn(self, who=None, cred=None): import ldap ldap_conn = self.ldap_object_class(self.server.ldap_uri) ldap_conn.protocol_version = 3 - ldap_conn.set_option(ldap.OPT_REFERRALS,0) + ldap_conn.set_option(ldap.OPT_REFERRALS, 0) ldap_conn.simple_bind_s(who or self.server.root_dn, cred or self.server.root_pw) return ldap_conn @@ -400,16 +330,16 @@ def _open_ldap_conn(self, who=None, cred=None): def setUpClass(cls): if cls.server is None: cls.server = cls.server_class() - cls.server.start() + cls.server.start() cls.server = cls.server @classmethod def tearDownClass(cls): try: cls.server.stop() - except Exception, err: + except AttributeError: pass try: - delete_directory_content(cls.server._tmpdir) - except Exception, err: + delete_directory_content(cls.server.testrundir) + except AttributeError: pass