From 43d3dc2fd228bcec160f75c81f319af7c4bfcc83 Mon Sep 17 00:00:00 2001 From: stroeder Date: Wed, 26 Apr 2017 08:58:06 +0000 Subject: [PATCH] code-cleaning in slapd.py and t_cext.py and set env var LDAPNOINIT=1 --- CHANGES | 7 +- Tests/slapd.py | 216 +++++++++++++++++++------------------ Tests/t_cext.py | 281 ++++++++++++++++++++++++++++-------------------- 3 files changed, 280 insertions(+), 224 deletions(-) diff --git a/CHANGES b/CHANGES index 98173d9..7261105 100644 --- a/CHANGES +++ b/CHANGES @@ -7,6 +7,11 @@ Lib/ * gracefully handle KeyError in LDAPObject._ldap_call() when using errno +Tests/ +* code-cleaning in slapd.py and t_cext.py +* set env var LDAPNOINIT=1 in t_cext.py to avoid interference + with locally installed .ldaprc or ldap.conf + ---------------------------------------------------------------- Released 2.4.35 2017-04-25 @@ -1383,4 +1388,4 @@ Released 2.0.0pre02 2002-02-01 ---------------------------------------------------------------- Released 1.10alpha3 2000-09-19 -$Id: CHANGES,v 1.424 2017/04/25 17:50:33 stroeder Exp $ +$Id: CHANGES,v 1.425 2017/04/26 08:58:06 stroeder Exp $ diff --git a/Tests/slapd.py b/Tests/slapd.py index 0c5d925..b0b0696 100644 --- a/Tests/slapd.py +++ b/Tests/slapd.py @@ -4,13 +4,19 @@ and talking to it with ldapsearch/ldapadd. """ -import sys, os, socket, time, subprocess, logging +import sys +import os +import socket +import time +import subprocess +import logging +import base64 _log = logging.getLogger("slapd") def quote(s): '''Quotes the '"' and '\' characters in a string and surrounds with "..."''' - return '"' + s.replace('\\','\\\\').replace('"','\\"') + '"' + return '"%s"' % s.replace('\\', '\\\\').replace('"', '\\"') def mkdirs(path): """Creates the directory path unless it already exists""" @@ -20,7 +26,7 @@ def mkdirs(path): return path def delete_directory_content(path): - for dirpath,dirnames,filenames in os.walk(path, topdown=False): + for dirpath, dirnames, filenames in os.walk(path, topdown=False): for n in filenames: _log.info("remove %s", os.path.join(dirpath, n)) os.remove(os.path.join(dirpath, n)) @@ -50,6 +56,11 @@ class Slapd: server is shut down. """ + suffix = "dc=slapd-test,dc=python-ldap,dc=org" + root_cn = "Manager" + root_dn = "cn=%s,%s" % (root_cn, suffix) + root_password = "password" + _log = logging.getLogger("Slapd") # Use /var/tmp to placate apparmour on Ubuntu: @@ -62,18 +73,16 @@ class Slapd: PATH_SLAPD = os.path.join(PATH_SBINDIR, "slapd") PATH_SLAPTEST = os.path.join(PATH_SBINDIR, "slaptest") - # TODO add paths for other OSs - def check_paths(cls): """ Checks that the configured executable paths look valid. If they don't, then logs warning messages (not errors). """ - for name,path in ( + for name, path in ( ("slapd", cls.PATH_SLAPD), ("ldapadd", cls.PATH_LDAPADD), ("ldapsearch", cls.PATH_LDAPSEARCH), - ): + ): cls._log.debug("checking %s executable at %s", name, path) if not os.access(path, os.X_OK): cls._log.warn("cannot find %s executable at %s", name, path) @@ -84,43 +93,19 @@ def __init__(self): self._proc = None self._port = 0 self._tmpdir = self.PATH_TMPDIR - self._dn_suffix = "dc=python-ldap,dc=org" - self._root_cn = "Manager" - self._root_password = "password" self._slapd_debug_level = 0 - # Setters - def set_port(self, port): - self._port = port - def set_dn_suffix(self, dn): - self._dn_suffix = dn - def set_root_cn(self, cn): - self._root_cn = cn - def set_root_password(self, pw): - self._root_password = pw - def set_tmpdir(self, path): - self._tmpdir = path - def set_slapd_debug_level(self, level): - self._slapd_debug_level = level - def set_debug(self): - self._log.setLevel(logging.DEBUG) - self.set_slapd_debug_level('Any') - # getters def get_url(self): return "ldap://%s:%d/" % self.get_address() + def get_address(self): if self._port == 0: self._port = find_available_tcp_port(LOCALHOST) return (LOCALHOST, self._port) - def get_dn_suffix(self): - return self._dn_suffix - def get_root_dn(self): - return "cn=" + self._root_cn + "," + self.get_dn_suffix() - def get_root_password(self): - return self._root_password + def get_tmpdir(self): - return os.environ.get('TMP',self._tmpdir) + return os.environ.get('TMP', self._tmpdir) def __del__(self): self.stop() @@ -142,14 +127,14 @@ def configure(self, cfg): cfg.append("database ldif") cfg.append("directory " + quote(ldif_dir)) - cfg.append("suffix " + quote(self.get_dn_suffix())) - cfg.append("rootdn " + quote(self.get_root_dn())) - cfg.append("rootpw " + quote(self.get_root_password())) + cfg.append("suffix " + quote(self.suffix)) + cfg.append("rootdn " + quote(self.root_dn)) + cfg.append("rootpw " + quote(self.root_password)) def _write_config(self): """Writes the slapd.conf file out, and returns the path to it.""" path = os.path.join(self._tmpdir, "slapd.conf") - ldif_dir = mkdirs(self._tmpdir) + _ = mkdirs(self._tmpdir) if os.access(path, os.F_OK): self._log.debug("deleting existing %s", path) os.remove(path) @@ -184,11 +169,12 @@ def _start_slapd(self): # Spawns/forks the slapd process config_path = self._write_config() self._log.info("starting slapd") - self._proc = subprocess.Popen([self.PATH_SLAPD, - "-f", config_path, - "-h", self.get_url(), - "-d", str(self._slapd_debug_level), - ]) + self._proc = subprocess.Popen([ + self.PATH_SLAPD, + "-f", config_path, + "-h", self.get_url(), + "-d", str(self._slapd_debug_level), + ]) self._proc_config = config_path def _wait_for_slapd(self): @@ -199,12 +185,12 @@ def _wait_for_slapd(self): self._stopped() raise RuntimeError("slapd exited before opening port") try: - self._log.debug("Connecting to %s", repr(self.get_address())) - s.connect(self.get_address()) - s.close() - return + self._log.debug("Connecting to %s", repr(self.get_address())) + s.connect(self.get_address()) + s.close() + return except socket.error: - time.sleep(1) + time.sleep(1) def stop(self): """Stops the slapd server, and waits for it to terminate""" @@ -213,11 +199,9 @@ def stop(self): if hasattr(self._proc, 'terminate'): self._proc.terminate() else: - import posix, signal - posix.kill(self._proc.pid, signal.SIGHUP) - #time.sleep(1) - #posix.kill(self._proc.pid, signal.SIGTERM) - #posix.kill(self._proc.pid, signal.SIGKILL) + import posix + import signal + posix.kill(self._proc.pid, signal.SIGTERM) self.wait() def restart(self): @@ -262,33 +246,46 @@ def _test_configuration(self): finally: os.remove(config_path) - def ldapadd(self, ldif, extra_args=[]): + def ldapadd(self, ldif, extra_args=None): """Runs ldapadd on this slapd instance, passing it the ldif content""" + extra_args = extra_args or [] self._log.debug("adding %s", repr(ldif)) - p = subprocess.Popen([self.PATH_LDAPADD, + p = subprocess.Popen( + [ + self.PATH_LDAPADD, "-x", - "-D", self.get_root_dn(), - "-w", self.get_root_password(), - "-H", self.get_url()] + extra_args, - stdin = subprocess.PIPE, stdout=subprocess.PIPE) + "-D", self.root_dn, + "-w", self.root_password, + "-H", self.get_url() + ] + extra_args, + stdin=subprocess.PIPE, stdout=subprocess.PIPE, + ) p.communicate(ldif) if p.wait() != 0: raise RuntimeError("ldapadd process failed") - def ldapsearch(self, base=None, filter='(objectClass=*)', attrs=[], - scope='sub', extra_args=[]): - if base is None: base = self.get_dn_suffix() - self._log.debug("ldapsearch filter=%s", repr(filter)) - p = subprocess.Popen([self.PATH_LDAPSEARCH, + def ldapsearch( + self, base=None, filterstr='(objectClass=*)', attrs=None, + scope='sub', extra_args=None + ): + attrs = attrs or [] + extra_args = extra_args or [] + if base is None: + base = self.suffix + self._log.debug("ldapsearch filterstr=%s", repr(filterstr)) + p = subprocess.Popen( + [ + self.PATH_LDAPSEARCH, "-x", - "-D", self.get_root_dn(), - "-w", self.get_root_password(), + "-D", self.root_dn, + "-w", self.root_password, "-H", self.get_url(), "-b", base, "-s", scope, "-LL", - ] + extra_args + [ filter ] + attrs, - stdout = subprocess.PIPE) + ]+extra_args+[filterstr]+attrs, + stdout=subprocess.PIPE, + ) output = p.communicate()[0] if p.wait() != 0: raise RuntimeError("ldapadd process failed") @@ -307,14 +304,17 @@ def ldapsearch(self, base=None, filter='(objectClass=*)', attrs=[], lines = [l for l in lines if not l.startswith("#")] # Remove leading version and blank line(s) - if lines and lines[0] == '': del lines[0] + if lines and lines[0] == '': + del lines[0] if not lines or lines[0] != 'version: 1': raise RuntimeError("expected 'version: 1', got " + repr(lines[:1])) del lines[0] - if lines and lines[0] == '': del lines[0] + if lines and lines[0] == '': + del lines[0] # ensure the ldif ends with a blank line (unless it is just blank) - if lines and lines[-1] != '': lines.append('') + if lines and lines[-1] != '': + lines.append('') objects = [] obj = [] @@ -325,14 +325,14 @@ def ldapsearch(self, base=None, filter='(objectClass=*)', attrs=[], objects.append((obj[0][1], obj[1:])) obj = [] else: - attr,value = line.split(':',2) + attr, value = line.split(':', 2) if value.startswith(': '): value = base64.decodestring(value[2:]) elif value.startswith(' '): value = value[1:] else: raise RuntimeError("bad line: " + repr(line)) - obj.append((attr,value)) + obj.append((attr, value)) assert obj == [] return objects @@ -342,42 +342,46 @@ def started(self): By default, this method adds the two initial objects, the domain object and the root user object. """ - assert self.get_dn_suffix().startswith("dc=") - suffix_dc = self.get_dn_suffix().split(',')[0][3:] - assert self.get_root_dn().startswith("cn=") - assert self.get_root_dn().endswith("," + self.get_dn_suffix()) - root_cn = self.get_root_dn().split(',')[0][3:] - - self._log.debug("adding %s and %s", - self.get_dn_suffix(), - self.get_root_dn()) - - self.ldapadd("\n".join([ - 'dn: ' + self.get_dn_suffix(), - 'objectClass: dcObject', - 'objectClass: organization', - 'dc: ' + suffix_dc, - 'o: ' + suffix_dc, - '', - 'dn: ' + self.get_root_dn(), - 'objectClass: organizationalRole', - 'cn: ' + root_cn, - '' - ])) - -Slapd.check_paths() + assert self.suffix.startswith("dc=") + suffix_dc = self.suffix.split(',')[0][3:] + assert self.root_dn.startswith("cn=") + assert self.root_dn.endswith("," + self.suffix) + + self._log.debug( + "adding %s and %s", + self.suffix, + self.root_dn, + ) + + self.ldapadd( + "\n".join([ + 'dn: '+self.suffix, + 'objectClass: dcObject', + 'objectClass: organization', + 'dc: '+suffix_dc, + 'o: '+suffix_dc, + '', + 'dn: '+self.root_dn, + 'objectClass: organizationalRole', + 'cn: '+self.root_cn, + '' + ]) + ) -if __name__ == '__main__' and sys.argv == ['run']: +def run(): logging.basicConfig(level=logging.DEBUG) slapd = Slapd() - print("Starting slapd...") + _log.debug("Starting slapd...") slapd.start() - print("Contents of LDAP server follow:\n") - for dn,attrs in slapd.ldapsearch(): - print("dn: " + dn) - for name,val in attrs: - print(name + ": " + val) - print("") - print(slapd.get_url()) + _log.debug("Contents of LDAP server follow:\n") + for dn, attrs in slapd.ldapsearch(): + _log.debug("dn: " + dn) + for name, val in attrs: + _log.debug("%r: %r", name, val) + _log.debug('slapd URL: %r', slapd.get_url()) slapd.wait() +Slapd.check_paths() + +if __name__ == '__main__' and sys.argv == ['run']: + run() diff --git a/Tests/t_cext.py b/Tests/t_cext.py index 159d6fb..19a1255 100644 --- a/Tests/t_cext.py +++ b/Tests/t_cext.py @@ -1,10 +1,16 @@ +""" +Tests the LDAP C Extension module called _ldap +""" +import os import unittest -import _ldap -import logging - from Tests import slapd +# Switch off processing .ldaprc or ldap.conf before importing _ldap +os.environ['LDAPNOINIT'] = '1' + +import _ldap + reusable_server = None def get_reusable_server(): global reusable_server @@ -13,40 +19,43 @@ def get_reusable_server(): return reusable_server class TestLdapCExtension(unittest.TestCase): - """Tests the LDAP C Extension module, _ldap. - These tests apply only to the _ldap module and bypass the - LDAPObject wrapper completely.""" + """ + These tests apply only to the _ldap module and therefore bypass the + LDAPObject wrapper completely. + """ timeout = 3 def _init_server(self, reuse_existing=True): + """ + Sets self.server to a test LDAP server and self.server.suffix to its base + """ global reusable_server - """Sets self.server to a test LDAP server and self.base - to its base""" if reuse_existing: server = get_reusable_server() else: server = slapd.Slapd() # private server - #server.set_debug() # enables verbose messages server.start() # no effect if already started self.server = server - self.base = server.get_dn_suffix() return server def _init(self, reuse_existing=True, bind=True): - """Starts a server, and returns a LDAPObject bound to it""" + """ + Starts a server, and returns a LDAPObject bound to it + """ server = self._init_server(reuse_existing) l = _ldap.initialize(server.get_url()) if bind: # Perform a simple bind l.set_option(_ldap.OPT_PROTOCOL_VERSION, _ldap.VERSION3) - m = l.simple_bind(server.get_root_dn(), server.get_root_password()) - result,pmsg,msgid,ctrls = l.result4(m, _ldap.MSG_ONE, self.timeout) + m = l.simple_bind(server.root_dn, server.root_password) + result, pmsg, msgid, ctrls = l.result4(m, _ldap.MSG_ONE, self.timeout) self.assertTrue(result, _ldap.RES_BIND) return l def assertNotNone(self, expr, msg=None): self.failIf(expr is None, msg or repr(expr)) + def assertNone(self, expr, msg=None): self.failIf(expr is not None, msg or repr(expr)) @@ -146,7 +155,7 @@ def test_constants(self): self.assertNotNone(_ldap.AVA_NONPRINTABLE) # these two constants are pointless? XXX - self.assertEquals(_ldap.OPT_ON, 1) + self.assertEquals(_ldap.OPT_ON, 1) self.assertEquals(_ldap.OPT_OFF, 0) # these constants useless after ldap_url_parse() was dropped XXX @@ -159,7 +168,7 @@ def test_simple_bind(self): def test_simple_anonymous_bind(self): l = self._init(bind=False) m = l.simple_bind("", "") - result,pmsg,msgid,ctrls = l.result4(m, _ldap.MSG_ALL, self.timeout) + result, pmsg, msgid, ctrls = l.result4(m, _ldap.MSG_ALL, self.timeout) self.assertTrue(result, _ldap.RES_BIND) self.assertEquals(msgid, m) self.assertEquals(pmsg, []) @@ -167,7 +176,7 @@ def test_simple_anonymous_bind(self): # see if we can get the rootdse while we're here m = l.search_ext("", _ldap.SCOPE_BASE, '(objectClass=*)') - result,pmsg,msgid,ctrls = l.result4(m, _ldap.MSG_ALL, self.timeout) + result, pmsg, msgid, ctrls = l.result4(m, _ldap.MSG_ALL, self.timeout) self.assertEquals(result, _ldap.RES_SEARCH_RESULT) self.assertEquals(pmsg[0][0], "") # rootDSE has no dn self.assertEquals(msgid, m) @@ -185,22 +194,25 @@ def test_unbind(self): def test_search_ext_individual(self): l = self._init() - m = l.search_ext(self.base, _ldap.SCOPE_SUBTREE, - '(objectClass=dcObject)') - result,pmsg,msgid,ctrls = l.result4(m, _ldap.MSG_ONE, self.timeout) + m = l.search_ext( + self.server.suffix, + _ldap.SCOPE_SUBTREE, + '(objectClass=dcObject)' + ) + result, pmsg, msgid, ctrls = l.result4(m, _ldap.MSG_ONE, self.timeout) # Expect to get just one object self.assertEquals(result, _ldap.RES_SEARCH_ENTRY) self.assertEquals(len(pmsg), 1) self.assertEquals(len(pmsg[0]), 2) - self.assertEquals(pmsg[0][0], self.base) - self.assertEquals(pmsg[0][0], self.base) + self.assertEquals(pmsg[0][0], self.server.suffix) + self.assertEquals(pmsg[0][0], self.server.suffix) self.assertTrue('dcObject' in pmsg[0][1]['objectClass']) self.assertTrue('organization' in pmsg[0][1]['objectClass']) self.assertEquals(msgid, m) self.assertEquals(ctrls, []) - result,pmsg,msgid,ctrls = l.result4(m, _ldap.MSG_ONE, self.timeout) + result, pmsg, msgid, ctrls = l.result4(m, _ldap.MSG_ONE, self.timeout) self.assertEquals(result, _ldap.RES_SEARCH_RESULT) self.assertEquals(pmsg, []) self.assertEquals(msgid, m) @@ -209,7 +221,7 @@ def test_search_ext_individual(self): def test_abandon(self): l = self._init() - m = l.search_ext(self.base, _ldap.SCOPE_SUBTREE, '(objectClass=*)') + m = l.search_ext(self.server.suffix, _ldap.SCOPE_SUBTREE, '(objectClass=*)') ret = l.abandon_ext(m) self.assertNone(ret) @@ -224,8 +236,8 @@ def test_abandon(self): def test_search_ext_all(self): l = self._init() - m = l.search_ext(self.base, _ldap.SCOPE_SUBTREE, '(objectClass=*)') - result,pmsg,msgid,ctrls = l.result4(m, _ldap.MSG_ALL, self.timeout) + m = l.search_ext(self.server.suffix, _ldap.SCOPE_SUBTREE, '(objectClass=*)') + result, pmsg, msgid, ctrls = l.result4(m, _ldap.MSG_ALL, self.timeout) # Expect to get some objects self.assertEquals(result, _ldap.RES_SEARCH_RESULT) @@ -236,21 +248,24 @@ def test_search_ext_all(self): def test_add(self): l = self._init() - m = l.add_ext("cn=Foo," + self.base, [ - ('objectClass','organizationalRole'), - ('cn', 'Foo'), - ('description', 'testing'), - ]) + m = l.add_ext( + "cn=Foo," + self.server.suffix, + [ + ('objectClass', 'organizationalRole'), + ('cn', 'Foo'), + ('description', 'testing'), + ] + ) - result,pmsg,msgid,ctrls = l.result4(m, _ldap.MSG_ALL, self.timeout) + result, pmsg, msgid, ctrls = l.result4(m, _ldap.MSG_ALL, self.timeout) self.assertEquals(result, _ldap.RES_ADD) self.assertEquals(pmsg, []) self.assertEquals(msgid, m) self.assertEquals(ctrls, []) # search for it back - m = l.search_ext(self.base, _ldap.SCOPE_SUBTREE, '(cn=Foo)') - result,pmsg,msgid,ctrls = l.result4(m, _ldap.MSG_ALL, self.timeout) + m = l.search_ext(self.server.suffix, _ldap.SCOPE_SUBTREE, '(cn=Foo)') + result, pmsg, msgid, ctrls = l.result4(m, _ldap.MSG_ALL, self.timeout) # Expect to get the objects self.assertEquals(result, _ldap.RES_SEARCH_RESULT) @@ -258,23 +273,33 @@ def test_add(self): self.assertEquals(msgid, m) self.assertEquals(ctrls, []) - self.assertEquals(pmsg[0], ('cn=Foo,'+self.base, - { 'objectClass': ['organizationalRole'], - 'cn': ['Foo'], - 'description': ['testing'] })) + self.assertEquals( + pmsg[0], + ( + 'cn=Foo,'+self.server.suffix, + { + 'objectClass': ['organizationalRole'], + 'cn': ['Foo'], + 'description': ['testing'], + } + ) + ) def test_compare(self): l = self._init() # first, add an object with a field we can compare on - dn = "cn=CompareTest," + self.base - m = l.add_ext(dn, [ - ('objectClass','person'), - ('sn', 'CompareTest'), - ('cn', 'CompareTest'), - ('userPassword', 'the_password'), - ]) - result,pmsg,msgid,ctrls = l.result4(m, _ldap.MSG_ALL, self.timeout) + dn = "cn=CompareTest," + self.server.suffix + m = l.add_ext( + dn, + [ + ('objectClass', 'person'), + ('sn', 'CompareTest'), + ('cn', 'CompareTest'), + ('userPassword', 'the_password'), + ], + ) + result, pmsg, msgid, ctrls = l.result4(m, _ldap.MSG_ALL, self.timeout) self.assertEquals(result, _ldap.RES_ADD) # try a false compare @@ -311,7 +336,7 @@ def test_delete_no_such_object(self): # try deleting an object that doesn't exist not_found = False - m = l.delete_ext("cn=DoesNotExist,"+self.base) + m = l.delete_ext("cn=DoesNotExist,"+self.server.suffix) try: r = l.result4(m, _ldap.MSG_ALL, self.timeout) self.fail(r) @@ -322,16 +347,19 @@ def test_delete_no_such_object(self): def test_delete(self): l = self._init() # first, add an object we will delete - dn = "cn=Deleteme,"+self.base - m = l.add_ext(dn, [ - ('objectClass','organizationalRole'), - ('cn', 'Deleteme'), - ]) - result,pmsg,msgid,ctrls = l.result4(m, _ldap.MSG_ALL, self.timeout) + dn = "cn=Deleteme,"+self.server.suffix + m = l.add_ext( + dn, + [ + ('objectClass', 'organizationalRole'), + ('cn', 'Deleteme'), + ] + ) + result, pmsg, msgid, ctrls = l.result4(m, _ldap.MSG_ALL, self.timeout) self.assertEquals(result, _ldap.RES_ADD) m = l.delete_ext(dn) - result,pmsg,msgid,ctrls = l.result4(m, _ldap.MSG_ALL, self.timeout) + result, pmsg, msgid, ctrls = l.result4(m, _ldap.MSG_ALL, self.timeout) self.assertEquals(result, _ldap.RES_DELETE) self.assertEquals(msgid, m) self.assertEquals(pmsg, []) @@ -342,9 +370,12 @@ def test_modify_no_such_object(self): # try deleting an object that doesn't exist not_found = False - m = l.modify_ext("cn=DoesNotExist,"+self.base, [ + m = l.modify_ext( + "cn=DoesNotExist,"+self.server.suffix, + [ (_ldap.MOD_ADD, 'description', ['blah']), - ]) + ] + ) try: r = l.result4(m, _ldap.MSG_ALL, self.timeout) self.fail(r) @@ -354,13 +385,14 @@ def test_modify_no_such_object(self): def DISABLED_test_modify_no_such_object_empty_attrs(self): # XXX ldif-backend for slapd appears broken??? - l = self._init() - # try deleting an object that doesn't exist - m = l.modify_ext("cn=DoesNotExist,"+self.base, [ + m = l.modify_ext( + "cn=DoesNotExist,"+self.server.suffix, + [ (_ldap.MOD_ADD, 'description', []), - ]) + ] + ) self.assertTrue(isinstance(m, int)) r = l.result4(m, _ldap.MSG_ALL, self.timeout) # what should happen?? self.fail(r) @@ -368,35 +400,38 @@ def DISABLED_test_modify_no_such_object_empty_attrs(self): def test_modify(self): l = self._init() # first, add an object we will delete - dn = "cn=AddToMe,"+self.base - m = l.add_ext(dn, [ - ('objectClass','person'), - ('cn', 'AddToMe'), - ('sn', 'Modify'), - ('description', 'a description'), - ]) - result,pmsg,msgid,ctrls = l.result4(m, _ldap.MSG_ALL, self.timeout) + dn = "cn=AddToMe,"+self.server.suffix + m = l.add_ext( + dn, + [ + ('objectClass', 'person'), + ('cn', 'AddToMe'), + ('sn', 'Modify'), + ('description', 'a description'), + ] + ) + result, pmsg, msgid, ctrls = l.result4(m, _ldap.MSG_ALL, self.timeout) self.assertEquals(result, _ldap.RES_ADD) - m = l.modify_ext(dn, [ + m = l.modify_ext( + dn, + [ (_ldap.MOD_ADD, 'description', ['b desc', 'c desc']), - ]) - result,pmsg,msgid,ctrls = l.result4(m, _ldap.MSG_ALL, self.timeout) + ] + ) + result, pmsg, msgid, ctrls = l.result4(m, _ldap.MSG_ALL, self.timeout) self.assertEquals(result, _ldap.RES_MODIFY) self.assertEquals(pmsg, []) self.assertEquals(msgid, m) self.assertEquals(ctrls, []) - # search for it back - m = l.search_ext(self.base, _ldap.SCOPE_SUBTREE, '(cn=AddToMe)') - result,pmsg,msgid,ctrls = l.result4(m, _ldap.MSG_ALL, self.timeout) - + m = l.search_ext(self.server.suffix, _ldap.SCOPE_SUBTREE, '(cn=AddToMe)') + result, pmsg, msgid, ctrls = l.result4(m, _ldap.MSG_ALL, self.timeout) # Expect to get the objects self.assertEquals(result, _ldap.RES_SEARCH_RESULT) self.assertEquals(len(pmsg), 1) self.assertEquals(msgid, m) self.assertEquals(ctrls, []) - self.assertEquals(pmsg[0][0], dn) d = list(pmsg[0][1]['description']) d.sort() @@ -404,34 +439,37 @@ def test_modify(self): def test_rename(self): l = self._init() - dn = "cn=RenameMe,"+self.base - m = l.add_ext(dn, [ - ('objectClass','organizationalRole'), - ('cn', 'RenameMe'), - ]) - result,pmsg,msgid,ctrls = l.result4(m, _ldap.MSG_ALL, self.timeout) + dn = "cn=RenameMe,"+self.server.suffix + m = l.add_ext( + dn, + [ + ('objectClass', 'organizationalRole'), + ('cn', 'RenameMe'), + ] + ) + result, pmsg, msgid, ctrls = l.result4(m, _ldap.MSG_ALL, self.timeout) self.assertEquals(result, _ldap.RES_ADD) # do the rename with same parent m = l.rename(dn, "cn=IAmRenamed") - result,pmsg,msgid,ctrls = l.result4(m, _ldap.MSG_ALL, self.timeout) + result, pmsg, msgid, ctrls = l.result4(m, _ldap.MSG_ALL, self.timeout) self.assertEquals(result, _ldap.RES_MODRDN) self.assertEquals(msgid, m) self.assertEquals(pmsg, []) self.assertEquals(ctrls, []) # make sure the old one is gone - m = l.search_ext(self.base, _ldap.SCOPE_SUBTREE, '(cn=RenameMe)') - result,pmsg,msgid,ctrls = l.result4(m, _ldap.MSG_ALL, self.timeout) + m = l.search_ext(self.server.suffix, _ldap.SCOPE_SUBTREE, '(cn=RenameMe)') + result, pmsg, msgid, ctrls = l.result4(m, _ldap.MSG_ALL, self.timeout) self.assertEquals(result, _ldap.RES_SEARCH_RESULT) self.assertEquals(len(pmsg), 0) # expect no results self.assertEquals(msgid, m) self.assertEquals(ctrls, []) # check that the new one looks right - dn2 = "cn=IAmRenamed,"+self.base - m = l.search_ext(self.base, _ldap.SCOPE_SUBTREE, '(cn=IAmRenamed)') - result,pmsg,msgid,ctrls = l.result4(m, _ldap.MSG_ALL, self.timeout) + dn2 = "cn=IAmRenamed,"+self.server.suffix + m = l.search_ext(self.server.suffix, _ldap.SCOPE_SUBTREE, '(cn=IAmRenamed)') + result, pmsg, msgid, ctrls = l.result4(m, _ldap.MSG_ALL, self.timeout) self.assertEquals(result, _ldap.RES_SEARCH_RESULT) self.assertEquals(msgid, m) self.assertEquals(ctrls, []) @@ -440,23 +478,29 @@ def test_rename(self): self.assertEquals(pmsg[0][1]['cn'], ['IAmRenamed']) # create the container - containerDn = "ou=RenameContainer,"+self.base - m = l.add_ext(containerDn, [ - ('objectClass','organizationalUnit'), - ('ou', 'RenameContainer'), - ]) - result,pmsg,msgid,ctrls = l.result4(m, _ldap.MSG_ALL, self.timeout) + containerDn = "ou=RenameContainer,"+self.server.suffix + m = l.add_ext( + containerDn, + [ + ('objectClass', 'organizationalUnit'), + ('ou', 'RenameContainer'), + ] + ) + result, pmsg, msgid, ctrls = l.result4(m, _ldap.MSG_ALL, self.timeout) self.assertEquals(result, _ldap.RES_ADD) - # WORKAROUND bug in slapd. (Without an existing child, + # WORKAROUND bug in slapd. (Without an existing child, # renames into a container object do not work for the ldif backend, # the renamed object appears to be deleted, not moved.) # see http://www.openldap.org/its/index.cgi/Software%20Bugs?id=5408 - m = l.add_ext("cn=Bogus," + containerDn, [ - ('objectClass','organizationalRole'), - ('cn', 'Bogus'), - ]) - result,pmsg,msgid,ctrls = l.result4(m, _ldap.MSG_ALL, self.timeout) + m = l.add_ext( + "cn=Bogus," + containerDn, + [ + ('objectClass', 'organizationalRole'), + ('cn', 'Bogus'), + ] + ) + result, pmsg, msgid, ctrls = l.result4(m, _ldap.MSG_ALL, self.timeout) self.assertEquals(result, _ldap.RES_ADD) # now rename from dn2 to the conater @@ -464,26 +508,26 @@ def test_rename(self): # Now try renaming dn2 across container (simultaneous name change) m = l.rename(dn2, "cn=IAmRenamedAgain", containerDn) - result,pmsg,msgid,ctrls = l.result4(m, _ldap.MSG_ALL, self.timeout) + result, pmsg, msgid, ctrls = l.result4(m, _ldap.MSG_ALL, self.timeout) self.assertEquals(result, _ldap.RES_MODRDN) self.assertEquals(msgid, m) self.assertEquals(pmsg, []) self.assertEquals(ctrls, []) # make sure dn2 is gone - m = l.search_ext(self.base, _ldap.SCOPE_SUBTREE, '(cn=IAmRenamed)') - result,pmsg,msgid,ctrls = l.result4(m, _ldap.MSG_ALL, self.timeout) + m = l.search_ext(self.server.suffix, _ldap.SCOPE_SUBTREE, '(cn=IAmRenamed)') + result, pmsg, msgid, ctrls = l.result4(m, _ldap.MSG_ALL, self.timeout) self.assertEquals(result, _ldap.RES_SEARCH_RESULT) self.assertEquals(len(pmsg), 0) # expect no results self.assertEquals(msgid, m) self.assertEquals(ctrls, []) - m = l.search_ext(self.base, _ldap.SCOPE_SUBTREE, '(objectClass=*)') - result,pmsg,msgid,ctrls = l.result4(m, _ldap.MSG_ALL, self.timeout) + m = l.search_ext(self.server.suffix, _ldap.SCOPE_SUBTREE, '(objectClass=*)') + result, pmsg, msgid, ctrls = l.result4(m, _ldap.MSG_ALL, self.timeout) # make sure dn3 is there - m = l.search_ext(self.base, _ldap.SCOPE_SUBTREE, '(cn=IAmRenamedAgain)') - result,pmsg,msgid,ctrls = l.result4(m, _ldap.MSG_ALL, self.timeout) + m = l.search_ext(self.server.suffix, _ldap.SCOPE_SUBTREE, '(cn=IAmRenamedAgain)') + result, pmsg, msgid, ctrls = l.result4(m, _ldap.MSG_ALL, self.timeout) self.assertEquals(result, _ldap.RES_SEARCH_RESULT) self.assertEquals(msgid, m) self.assertEquals(ctrls, []) @@ -495,7 +539,7 @@ def test_rename(self): def test_whoami(self): l = self._init() r = l.whoami_s() - self.assertEquals("dn:" + self.server.get_root_dn(), r) + self.assertEquals("dn:" + self.server.root_dn, r) def test_whoami_unbound(self): l = self._init(bind=False) @@ -509,7 +553,7 @@ def test_whoami_anonymous(self): # Anonymous bind m = l.simple_bind("", "") - result,pmsg,msgid,ctrls = l.result4(m, _ldap.MSG_ALL, self.timeout) + result, pmsg, msgid, ctrls = l.result4(m, _ldap.MSG_ALL, self.timeout) self.assertTrue(result, _ldap.RES_BIND) r = l.whoami_s() @@ -519,14 +563,17 @@ def test_passwd(self): l = self._init() # first, create a user to change password on - dn = "cn=PasswordTest," + self.base - m = l.add_ext(dn, [ - ('objectClass','person'), - ('sn', 'PasswordTest'), - ('cn', 'PasswordTest'), - ('userPassword', 'initial'), - ]) - result,pmsg,msgid,ctrls = l.result4(m, _ldap.MSG_ALL, self.timeout) + dn = "cn=PasswordTest," + self.server.suffix + m = l.add_ext( + dn, + [ + ('objectClass', 'person'), + ('sn', 'PasswordTest'), + ('cn', 'PasswordTest'), + ('userPassword', 'initial'), + ] + ) + result, pmsg, msgid, ctrls = l.result4(m, _ldap.MSG_ALL, self.timeout) self.assertEquals(result, _ldap.RES_ADD) # try changing password with a wrong old-pw @@ -539,7 +586,7 @@ def test_passwd(self): # try changing password with a correct old-pw m = l.passwd(dn, "initial", "changed") - result,pmsg,msgid,ctrls = l.result4(m, _ldap.MSG_ALL, self.timeout) + result, pmsg, msgid, ctrls = l.result4(m, _ldap.MSG_ALL, self.timeout) self.assertEquals(msgid, m) self.assertEquals(pmsg, []) self.assertEquals(result, _ldap.RES_EXTENDED)