From 2ce408cf8b7d5b084556de0d36bf5666c1fda515 Mon Sep 17 00:00:00 2001 From: stroeder Date: Wed, 26 Apr 2017 14:54:14 +0000 Subject: [PATCH] re-factored slapd.py, t_cext.py and t_search.py: new class SlapdTestCase used --- CHANGES | 3 +- Tests/slapd.py | 50 +++++++++-- Tests/t_cext.py | 212 ++++++++++++++++++++++------------------------ Tests/t_search.py | 109 +++++++++++------------- 4 files changed, 193 insertions(+), 181 deletions(-) diff --git a/CHANGES b/CHANGES index e574f9e..4174f6a 100644 --- a/CHANGES +++ b/CHANGES @@ -15,6 +15,7 @@ Tests/ which requires fairly recent OpenLDAP builds * env vars can be set for slapd.py to tweak path names of executables, temporary and schema data to be used +* new class SlapdTestCase ---------------------------------------------------------------- Released 2.4.35 2017-04-25 @@ -1392,4 +1393,4 @@ Released 2.0.0pre02 2002-02-01 ---------------------------------------------------------------- Released 1.10alpha3 2000-09-19 -$Id: CHANGES,v 1.428 2017/04/26 13:12:52 stroeder Exp $ +$Id: CHANGES,v 1.429 2017/04/26 14:54:14 stroeder Exp $ diff --git a/Tests/slapd.py b/Tests/slapd.py index 9cba85e..98ec6d4 100644 --- a/Tests/slapd.py +++ b/Tests/slapd.py @@ -10,6 +10,7 @@ import subprocess import logging import base64 +import unittest # determine log level try: @@ -106,14 +107,6 @@ def __init__(self): self._tmpdir = os.path.join(self.TMPDIR, 'python-ldap-test') self._slapd_conf = os.path.join(self._tmpdir, "slapd.conf") self._db_directory = os.path.join(self._tmpdir, "openldap-data") - # init directory structure - delete_directory_content(self._tmpdir) - mkdirs(self._tmpdir) - mkdirs(self._db_directory) - - def __del__(self): - self.stop() - delete_directory_content(self._tmpdir) def _gen_config(self): """ @@ -144,6 +137,10 @@ def start(self): if self._proc is None: ok = False config_path = None + # init directory structure + delete_directory_content(self._tmpdir) + mkdirs(self._tmpdir) + mkdirs(self._db_directory) try: self._write_config() self._test_configuration() @@ -378,3 +375,40 @@ def started(self): '' ]) ) + + +class SlapdTestCase(unittest.TestCase): + """ + test class which also clones or initializes a running slapd + """ + + server = None + + def _open_ldap_conn(self, who=None, cred=None): + """ + return a LDAPObject instance after simple bind + """ + import ldap + ldap_conn = self.ldap_object_class(self.server.ldap_uri) + ldap_conn.protocol_version = 3 + ldap_conn.set_option(ldap.OPT_REFERRALS,0) + ldap_conn.simple_bind_s(who or self.server.root_dn, cred or self.server.root_pw) + return ldap_conn + + @classmethod + def setUpClass(cls): + if cls.server is None: + cls.server = SlapdObject() + cls.server.start() + cls.server = cls.server + + @classmethod + def tearDownClass(cls): + try: + cls.server.stop() + except Exception, err: + pass + try: + delete_directory_content(cls.server._tmpdir) + except Exception, err: + pass diff --git a/Tests/t_cext.py b/Tests/t_cext.py index 1c55b4c..f9b28d8 100644 --- a/Tests/t_cext.py +++ b/Tests/t_cext.py @@ -4,54 +4,35 @@ import os import unittest -from Tests import slapd -from Tests.slapd import SlapdObject +from Tests.slapd import SlapdTestCase # Switch off processing .ldaprc or ldap.conf before importing _ldap os.environ['LDAPNOINIT'] = '1' +# import the plain C wrapper module import _ldap -reusable_server = None -def get_reusable_server(): - global reusable_server - if reusable_server is None: - reusable_server = SlapdObject() - return reusable_server -class TestLdapCExtension(unittest.TestCase): +class TestLdapCExtension(SlapdTestCase): """ These tests apply only to the _ldap module and therefore bypass the LDAPObject wrapper completely. """ - timeout = 3 + timeout = 5 - def _init_server(self, reuse_existing=True): - """ - Sets self.server to a test LDAP server and self.server.suffix to its base - """ - global reusable_server - if reuse_existing: - server = get_reusable_server() - else: - server = SlapdObject() # private server - server.start() # no effect if already started - self.server = server - return server - - def _init(self, reuse_existing=True, bind=True): + def _open_conn(self, bind=True): """ Starts a server, and returns a LDAPObject bound to it """ - server = self._init_server(reuse_existing) - l = _ldap.initialize(server.ldap_uri) + l = _ldap.initialize(self.server.ldap_uri) if bind: # Perform a simple bind l.set_option(_ldap.OPT_PROTOCOL_VERSION, _ldap.VERSION3) - m = l.simple_bind(server.root_dn, server.root_pw) + m = l.simple_bind(self.server.root_dn, self.server.root_pw) result, pmsg, msgid, ctrls = l.result4(m, _ldap.MSG_ONE, self.timeout) - self.assertTrue(result, _ldap.RES_BIND) + self.assertEqual(result, _ldap.RES_BIND) + self.assertEqual(type(msgid), type(0)) return l def assertNotNone(self, expr, msg=None): @@ -63,6 +44,9 @@ def assertNone(self, expr, msg=None): # Test for the existence of a whole bunch of constants # that the C module is supposed to export def test_constants(self): + """ + Test whether all libldap-derived constants are correct + """ self.assertEquals(_ldap.PORT, 389) self.assertEquals(_ldap.VERSION1, 1) self.assertEquals(_ldap.VERSION2, 2) @@ -164,11 +148,12 @@ def test_constants(self): self.assertNotNone(_ldap.URL_ERR_MEM) def test_simple_bind(self): - l = self._init() + l = self._open_conn() def test_simple_anonymous_bind(self): - l = self._init(bind=False) + l = self._open_conn(bind=False) m = l.simple_bind("", "") + self.assertEqual(type(m), type(0)) result, pmsg, msgid, ctrls = l.result4(m, _ldap.MSG_ALL, self.timeout) self.assertTrue(result, _ldap.RES_BIND) self.assertEquals(msgid, m) @@ -184,24 +169,25 @@ def test_simple_anonymous_bind(self): self.assertTrue(pmsg[0][1].has_key('objectClass')) def test_unbind(self): - l = self._init() + l = self._open_conn() m = l.unbind_ext() self.assertNone(m) - # Second attempt to unbind should yield an exception - try: l.unbind_ext() - except _ldap.error: pass + try: + l.unbind_ext() + except _ldap.error: + pass def test_search_ext_individual(self): - l = self._init() - + l = self._open_conn() + # send search request m = l.search_ext( self.server.suffix, _ldap.SCOPE_SUBTREE, '(objectClass=dcObject)' ) + self.assertEqual(type(m), type(0)) result, pmsg, msgid, ctrls = l.result4(m, _ldap.MSG_ONE, self.timeout) - # Expect to get just one object self.assertEquals(result, _ldap.RES_SEARCH_ENTRY) self.assertEquals(len(pmsg), 1) @@ -220,26 +206,23 @@ def test_search_ext_individual(self): self.assertEquals(ctrls, []) def test_abandon(self): - l = self._init() - + l = self._open_conn() m = l.search_ext(self.server.suffix, _ldap.SCOPE_SUBTREE, '(objectClass=*)') - ret = l.abandon_ext(m) self.assertNone(ret) - - got_timeout = False try: r = l.result4(m, _ldap.MSG_ALL, 0.3) # (timeout /could/ be longer) except _ldap.TIMEOUT, e: - got_timeout = True - self.assertTrue(got_timeout) + pass + else: + self.fail("expected TIMEOUT, got %r" % r) def test_search_ext_all(self): - l = self._init() - + l = self._open_conn() + # send search request m = l.search_ext(self.server.suffix, _ldap.SCOPE_SUBTREE, '(objectClass=*)') + self.assertEqual(type(m), type(0)) result, pmsg, msgid, ctrls = l.result4(m, _ldap.MSG_ALL, self.timeout) - # Expect to get some objects self.assertEquals(result, _ldap.RES_SEARCH_RESULT) self.assertTrue(len(pmsg) >= 2) @@ -247,8 +230,10 @@ def test_search_ext_all(self): self.assertEquals(ctrls, []) def test_add(self): - l = self._init() - + """ + test add operation + """ + l = self._open_conn() m = l.add_ext( "cn=Foo," + self.server.suffix, [ @@ -257,23 +242,20 @@ def test_add(self): ('description', 'testing'), ] ) - + self.assertEqual(type(m), type(0)) result, pmsg, msgid, ctrls = l.result4(m, _ldap.MSG_ALL, self.timeout) self.assertEquals(result, _ldap.RES_ADD) self.assertEquals(pmsg, []) self.assertEquals(msgid, m) self.assertEquals(ctrls, []) - # search for it back m = l.search_ext(self.server.suffix, _ldap.SCOPE_SUBTREE, '(cn=Foo)') result, pmsg, msgid, ctrls = l.result4(m, _ldap.MSG_ALL, self.timeout) - # Expect to get the objects self.assertEquals(result, _ldap.RES_SEARCH_RESULT) self.assertEquals(len(pmsg), 1) self.assertEquals(msgid, m) self.assertEquals(ctrls, []) - self.assertEquals( pmsg[0], ( @@ -287,8 +269,10 @@ def test_add(self): ) def test_compare(self): - l = self._init() - + """ + test compare operation + """ + l = self._open_conn() # first, add an object with a field we can compare on dn = "cn=CompareTest," + self.server.suffix m = l.add_ext( @@ -300,53 +284,49 @@ def test_compare(self): ('userPassword', 'the_password'), ], ) + self.assertEqual(type(m), type(0)) result, pmsg, msgid, ctrls = l.result4(m, _ldap.MSG_ALL, self.timeout) self.assertEquals(result, _ldap.RES_ADD) - # try a false compare m = l.compare_ext(dn, "userPassword", "bad_string") - compared_false = False try: r = l.result4(m, _ldap.MSG_ALL, self.timeout) - self.fail(repr(r)) except _ldap.COMPARE_FALSE: - compared_false = True - self.assertTrue(compared_false) - + pass + else: + self.fail("expected COMPARE_FALSE, got %r" % r) # try a true compare m = l.compare_ext(dn, "userPassword", "the_password") - compared_true = False try: r = l.result4(m, _ldap.MSG_ALL, self.timeout) - self.fail(repr(r)) except _ldap.COMPARE_TRUE: - compared_true = True - self.assertTrue(compared_true) - + pass + else: + self.fail("expected COMPARE_TRUE, got %r" % r) + # try a compare on bad attribute m = l.compare_ext(dn, "badAttribute", "ignoreme") - raised_error = False try: r = l.result4(m, _ldap.MSG_ALL, self.timeout) - self.fail(repr(r)) except _ldap.error: - raised_error = True - self.assertTrue(raised_error) + pass + else: + self.fail("expected LDAPError, got %r" % r) def test_delete_no_such_object(self): - l = self._init() - - # try deleting an object that doesn't exist - not_found = False + """ + try deleting an object that doesn't exist + """ + l = self._open_conn() m = l.delete_ext("cn=DoesNotExist,"+self.server.suffix) try: r = l.result4(m, _ldap.MSG_ALL, self.timeout) - self.fail(r) except _ldap.NO_SUCH_OBJECT: - not_found = True - self.assertTrue(not_found) + pass + else: + self.fail("expected NO_SUCH_OBJECT, got %r" % r) def test_delete(self): - l = self._init() + l = self._open_conn() # first, add an object we will delete dn = "cn=Deleteme,"+self.server.suffix m = l.add_ext( @@ -356,6 +336,7 @@ def test_delete(self): ('cn', 'Deleteme'), ] ) + self.assertEqual(type(m), type(0)) result, pmsg, msgid, ctrls = l.result4(m, _ldap.MSG_ALL, self.timeout) self.assertEquals(result, _ldap.RES_ADD) @@ -367,10 +348,9 @@ def test_delete(self): self.assertEquals(ctrls, []) def test_modify_no_such_object(self): - l = self._init() + l = self._open_conn() # try deleting an object that doesn't exist - not_found = False m = l.modify_ext( "cn=DoesNotExist,"+self.server.suffix, [ @@ -379,27 +359,35 @@ def test_modify_no_such_object(self): ) try: r = l.result4(m, _ldap.MSG_ALL, self.timeout) - self.fail(r) except _ldap.NO_SUCH_OBJECT: - not_found = True - self.assertTrue(not_found) + pass + else: + self.fail("expected NO_SUCH_OBJECT, got %r" % r) - def DISABLED_test_modify_no_such_object_empty_attrs(self): - # XXX ldif-backend for slapd appears broken??? - l = self._init() - # try deleting an object that doesn't exist + def test_modify_no_such_object_empty_attrs(self): + """ + try deleting an object that doesn't exist + """ + l = self._open_conn() m = l.modify_ext( "cn=DoesNotExist,"+self.server.suffix, [ - (_ldap.MOD_ADD, 'description', []), + (_ldap.MOD_ADD, 'description', ['dummy']), ] ) self.assertTrue(isinstance(m, int)) - r = l.result4(m, _ldap.MSG_ALL, self.timeout) # what should happen?? - self.fail(r) + try: + r = l.result4(m, _ldap.MSG_ALL, self.timeout) + except _ldap.NO_SUCH_OBJECT: + pass + else: + self.fail("expected NO_SUCH_OBJECT, got %r" % r) def test_modify(self): - l = self._init() + """ + test modify operation + """ + l = self._open_conn() # first, add an object we will delete dn = "cn=AddToMe,"+self.server.suffix m = l.add_ext( @@ -411,6 +399,7 @@ def test_modify(self): ('description', 'a description'), ] ) + self.assertEqual(type(m), type(0)) result, pmsg, msgid, ctrls = l.result4(m, _ldap.MSG_ALL, self.timeout) self.assertEquals(result, _ldap.RES_ADD) @@ -439,7 +428,7 @@ def test_modify(self): self.assertEquals(d, ['a description', 'b desc', 'c desc']) def test_rename(self): - l = self._init() + l = self._open_conn() dn = "cn=RenameMe,"+self.server.suffix m = l.add_ext( dn, @@ -448,6 +437,7 @@ def test_rename(self): ('cn', 'RenameMe'), ] ) + self.assertEqual(type(m), type(0)) result, pmsg, msgid, ctrls = l.result4(m, _ldap.MSG_ALL, self.timeout) self.assertEquals(result, _ldap.RES_ADD) @@ -538,31 +528,29 @@ def test_rename(self): def test_whoami(self): - l = self._init() + l = self._open_conn() r = l.whoami_s() self.assertEquals("dn:" + self.server.root_dn, r) def test_whoami_unbound(self): - l = self._init(bind=False) + l = self._open_conn(bind=False) l.set_option(_ldap.OPT_PROTOCOL_VERSION, _ldap.VERSION3) r = l.whoami_s() self.assertEquals("", r) def test_whoami_anonymous(self): - l = self._init(bind=False) + l = self._open_conn(bind=False) l.set_option(_ldap.OPT_PROTOCOL_VERSION, _ldap.VERSION3) - # Anonymous bind m = l.simple_bind("", "") result, pmsg, msgid, ctrls = l.result4(m, _ldap.MSG_ALL, self.timeout) self.assertTrue(result, _ldap.RES_BIND) - + # check with Who Am I? extended operation r = l.whoami_s() self.assertEquals("", r) def test_passwd(self): - l = self._init() - + l = self._open_conn() # first, create a user to change password on dn = "cn=PasswordTest," + self.server.suffix m = l.add_ext( @@ -574,17 +562,18 @@ def test_passwd(self): ('userPassword', 'initial'), ] ) + self.assertEqual(type(m), type(0)) result, pmsg, msgid, ctrls = l.result4(m, _ldap.MSG_ALL, self.timeout) self.assertEquals(result, _ldap.RES_ADD) - # try changing password with a wrong old-pw m = l.passwd(dn, "bogus", "ignored") + self.assertEqual(type(m), type(0)) try: r = l.result4(m, _ldap.MSG_ALL, self.timeout) - self.fail("expected UNWILLING_TO_PERFORM") except _ldap.UNWILLING_TO_PERFORM: pass - + else: + self.fail("expected UNWILLING_TO_PERFORM, got %r" % r) # try changing password with a correct old-pw m = l.passwd(dn, "initial", "changed") result, pmsg, msgid, ctrls = l.result4(m, _ldap.MSG_ALL, self.timeout) @@ -599,8 +588,10 @@ def test_options(self): try: _ldap.set_option(_ldap.OPT_PROTOCOL_VERSION, "3") - self.fail("expected string value to raise a type error") - except TypeError: pass + except TypeError: + pass + else: + self.fail("expected string value to raise a TypeError") _ldap.set_option(_ldap.OPT_PROTOCOL_VERSION, _ldap.VERSION2) v = _ldap.get_option(_ldap.OPT_PROTOCOL_VERSION) @@ -611,7 +602,7 @@ def test_options(self): finally: _ldap.set_option(_ldap.OPT_PROTOCOL_VERSION, oldval) - l = self._init() + l = self._open_conn() # Try changing some basic options and checking that they took effect @@ -626,9 +617,10 @@ def test_options(self): # Try setting options that will yield a known error. try: _ldap.get_option(_ldap.OPT_MATCHED_DN) - self.fail("expected ValueError") except ValueError: pass + else: + self.fail("expected ValueError") def _require_attr(self, obj, attrname): """Returns true if the attribute exists on the object. @@ -643,24 +635,22 @@ def _require_attr(self, obj, attrname): return False def test_sasl(self): - l = self._init() + l = self._open_conn() if not self._require_attr(l, 'sasl_interactive_bind_s'): # HAVE_SASL return # TODO def test_tls(self): - l = self._init() + l = self._open_conn() if not self._require_attr(l, 'start_tls_s'): # HAVE_TLS return # TODO def test_cancel(self): - l = self._init() + l = self._open_conn() if not self._require_attr(l, 'cancel'): # FEATURE_CANCEL return - def test_str2dn(self): - pass if __name__ == '__main__': unittest.main() diff --git a/Tests/t_search.py b/Tests/t_search.py index 2c5fa7b..a2a6a99 100644 --- a/Tests/t_search.py +++ b/Tests/t_search.py @@ -1,6 +1,6 @@ import os import unittest -from Tests.slapd import SlapdObject +from Tests.slapd import SlapdTestCase # Switch off processing .ldaprc or ldap.conf before importing _ldap os.environ['LDAPNOINIT'] = '1' @@ -8,93 +8,80 @@ import ldap from ldap.ldapobject import LDAPObject -server = None +LDIF_TEMPLATE = """dn: cn=Foo1,%(suffix)s +objectClass: organizationalRole +cn: Foo1 -class TestSearch(unittest.TestCase): +dn: cn=Foo2,%(suffix)s +objectClass: organizationalRole +cn: Foo2 + +dn: cn=Foo3,%(suffix)s +objectClass: organizationalRole +cn: Foo3 + +dn: ou=Container,%(suffix)s +objectClass: organizationalUnit +ou: Container + +dn: cn=Foo4,ou=Container,%(suffix)s +objectClass: organizationalRole +cn: Foo4 + +""" + + +class TestSearch(SlapdTestCase): + + ldap_object_class = LDAPObject + + @classmethod + def setUpClass(cls): + SlapdTestCase.setUpClass() + # insert some Foo* objects via ldapadd + cls.server.ldapadd(LDIF_TEMPLATE % {'suffix':cls.server.suffix}) def setUp(self): - global server - if server is None: - server = SlapdObject() - server.start() - base = server.suffix - - # insert some Foo* objects via ldapadd - server.ldapadd("\n".join([ - "dn: cn=Foo1,"+base, - "objectClass: organizationalRole", - "cn: Foo1", - "", - "dn: cn=Foo2,"+base, - "objectClass: organizationalRole", - "cn: Foo2", - "", - "dn: cn=Foo3,"+base, - "objectClass: organizationalRole", - "cn: Foo3", - "", - "dn: ou=Container,"+base, - "objectClass: organizationalUnit", - "ou: Container", - "", - "dn: cn=Foo4,ou=Container,"+base, - "objectClass: organizationalRole", - "cn: Foo4", - "", - ])+"\n") - - l = LDAPObject(server.ldap_uri) - l.protocol_version = 3 - l.set_option(ldap.OPT_REFERRALS,0) - l.simple_bind_s(server.root_dn, - server.root_pw) - self.ldap = l - self.server = server + try: + self._ldap_conn + except AttributeError: + # open local LDAP connection + self._ldap_conn = self._open_ldap_conn() def test_search_subtree(self): - base = self.server.suffix - l = self.ldap - - result = l.search_s(base, ldap.SCOPE_SUBTREE, '(cn=Foo*)', ['*']) + result = self._ldap_conn.search_s(self.server.suffix, ldap.SCOPE_SUBTREE, '(cn=Foo*)', ['*']) result.sort() self.assertEquals(result, - [('cn=Foo1,'+base, + [('cn=Foo1,'+self.server.suffix, {'cn': ['Foo1'], 'objectClass': ['organizationalRole']}), - ('cn=Foo2,'+base, + ('cn=Foo2,'+self.server.suffix, {'cn': ['Foo2'], 'objectClass': ['organizationalRole']}), - ('cn=Foo3,'+base, + ('cn=Foo3,'+self.server.suffix, {'cn': ['Foo3'], 'objectClass': ['organizationalRole']}), - ('cn=Foo4,ou=Container,'+base, + ('cn=Foo4,ou=Container,'+self.server.suffix, {'cn': ['Foo4'], 'objectClass': ['organizationalRole']}), ] ) def test_search_onelevel(self): - base = self.server.suffix - l = self.ldap - - result = l.search_s(base, ldap.SCOPE_ONELEVEL, '(cn=Foo*)', ['*']) + result = self._ldap_conn.search_s(self.server.suffix, ldap.SCOPE_ONELEVEL, '(cn=Foo*)', ['*']) result.sort() self.assertEquals(result, - [('cn=Foo1,'+base, + [('cn=Foo1,'+self.server.suffix, {'cn': ['Foo1'], 'objectClass': ['organizationalRole']}), - ('cn=Foo2,'+base, + ('cn=Foo2,'+self.server.suffix, {'cn': ['Foo2'], 'objectClass': ['organizationalRole']}), - ('cn=Foo3,'+base, + ('cn=Foo3,'+self.server.suffix, {'cn': ['Foo3'], 'objectClass': ['organizationalRole']}), ] ) def test_search_oneattr(self): - base = self.server.suffix - l = self.ldap - - result = l.search_s(base, ldap.SCOPE_SUBTREE, '(cn=Foo4)', ['cn']) + result = self._ldap_conn.search_s(self.server.suffix, ldap.SCOPE_SUBTREE, '(cn=Foo4)', ['cn']) result.sort() self.assertEquals(result, - [('cn=Foo4,ou=Container,'+base, {'cn': ['Foo4']})] + [('cn=Foo4,ou=Container,'+self.server.suffix, {'cn': ['Foo4']})] ) - if __name__ == '__main__': unittest.main()