diff --git a/CHANGES b/CHANGES index 884c1d0..df35179 100644 --- a/CHANGES +++ b/CHANGES @@ -36,6 +36,7 @@ Modules/ * Fix memory leak in whoami * Fix internal error handling of LDAPControl_to_List() * Fix two memory leaks and release GIL in encode_assertion_control +* Allow set_option() to set timeouts to infinity and, thanks to Michael Ströder: * removed unused code schema.c * moved code from version.c to ldapmodule.c diff --git a/Doc/reference/ldap.rst b/Doc/reference/ldap.rst index c28cdec..1f7ae5b 100644 --- a/Doc/reference/ldap.rst +++ b/Doc/reference/ldap.rst @@ -156,6 +156,9 @@ following option identifiers are defined as constants: .. py:data:: OPT_NETWORK_TIMEOUT + .. versionchanged:: 3.0 + A timeout of ``-1`` resets timeout to infinity. + .. py:data:: OPT_PROTOCOL_VERSION Sets the LDAP protocol version used for a connection. This is mapped to @@ -180,6 +183,9 @@ following option identifiers are defined as constants: .. py:data:: OPT_TIMEOUT + .. versionchanged:: 3.0 + A timeout of ``-1`` resets timeout to infinity. + .. py:data:: OPT_URI .. _ldap-sasl-options: diff --git a/Modules/options.c b/Modules/options.c index ac1eab6..647f859 100644 --- a/Modules/options.c +++ b/Modules/options.c @@ -140,10 +140,20 @@ LDAP_set_option(LDAPObject *self, int option, PyObject *value) if (!PyArg_Parse(value, "d:set_option", &doubleval)) return 0; if (doubleval >= 0) { - set_timeval_from_double( &tv, doubleval ); + set_timeval_from_double( &tv, doubleval ); + ptr = &tv; + } else if (doubleval == -1) { + /* -1 is infinity timeout */ + tv.tv_sec = -1; + tv.tv_usec = 0; ptr = &tv; } else { - ptr = NULL; + PyErr_Format( + PyExc_ValueError, + "timeout must be >= 0 or -1 for infinity, got %d", + option + ); + return 0; } break; case LDAP_OPT_SERVER_CONTROLS: @@ -180,10 +190,9 @@ LDAP_get_option(LDAPObject *self, int option) struct timeval *tv; LDAPAPIInfo apiinfo; LDAPControl **lcs; - LDAPControl *lc; char *strval; - PyObject *extensions, *v, *tup; - Py_ssize_t i, num_extensions, num_controls; + PyObject *extensions, *v; + Py_ssize_t i, num_extensions; LDAP *ld; ld = self ? self->ldap : NULL; @@ -352,27 +361,8 @@ LDAP_get_option(LDAPObject *self, int option) if (res != LDAP_OPT_SUCCESS) return option_error(res, "ldap_get_option"); - if (lcs == NULL) - return PyList_New(0); - - /* Get the number of controls */ - num_controls = 0; - while (lcs[num_controls]) - num_controls++; - - /* We'll build a list of controls, with each control a tuple */ - v = PyList_New(num_controls); - for (i = 0; i < num_controls; i++) { - lc = lcs[i]; - tup = Py_BuildValue("(sbs)", - lc->ldctl_oid, - lc->ldctl_iscritical, - lc->ldctl_value.bv_val); - PyList_SET_ITEM(v, i, tup); - } - + v = LDAPControls_to_List(lcs); ldap_controls_free(lcs); - return v; default: diff --git a/Tests/__init__.py b/Tests/__init__.py index 8ceb63b..d654573 100644 --- a/Tests/__init__.py +++ b/Tests/__init__.py @@ -22,3 +22,4 @@ from . import t_ldap_schema_subentry from . import t_untested_mods from . import t_ldap_controls_libldap +from . import t_ldap_options diff --git a/Tests/t_ldap_options.py b/Tests/t_ldap_options.py new file mode 100644 index 0000000..798ae46 --- /dev/null +++ b/Tests/t_ldap_options.py @@ -0,0 +1,106 @@ +import os +import unittest + +# Switch off processing .ldaprc or ldap.conf before importing _ldap +os.environ['LDAPNOINIT'] = '1' + +import ldap +from ldap.controls import RequestControlTuples +from ldap.controls.pagedresults import SimplePagedResultsControl +from ldap.controls.openldap import SearchNoOpControl +from slapdtest import requires_tls + + +SENTINEL = object() + +TEST_CTRL = RequestControlTuples([ + # with BER data + SimplePagedResultsControl(criticality=0, size=5, cookie=b'cookie'), + # value-less + SearchNoOpControl(criticality=1), +]) +TEST_CTRL_EXPECTED = [ + TEST_CTRL[0], + # get_option returns empty bytes + (TEST_CTRL[1][0], TEST_CTRL[1][1], b''), +] + + +class TestGlobalOptions(unittest.TestCase): + def _check_option(self, option, value, expected=SENTINEL, + nonevalue=None): + old = ldap.get_option(option) + try: + ldap.set_option(option, value) + new = ldap.get_option(option) + if expected is SENTINEL: + self.assertEqual(new, value) + else: + self.assertEqual(new, expected) + finally: + ldap.set_option(option, old if old is not None else nonevalue) + self.assertEqual(ldap.get_option(option), old) + + def test_invalid(self): + with self.assertRaises(ValueError): + ldap.get_option(-1) + with self.assertRaises(ValueError): + ldap.set_option(-1, '') + + def test_timeout(self): + self._check_option(ldap.OPT_TIMEOUT, 0, nonevalue=-1) + self._check_option(ldap.OPT_TIMEOUT, 10.5, nonevalue=-1) + with self.assertRaises(ValueError): + self._check_option(ldap.OPT_TIMEOUT, -5, nonevalue=-1) + with self.assertRaises(TypeError): + ldap.set_option(ldap.OPT_TIMEOUT, object) + + def test_network_timeout(self): + self._check_option(ldap.OPT_NETWORK_TIMEOUT, 0, nonevalue=-1) + self._check_option(ldap.OPT_NETWORK_TIMEOUT, 10.5, nonevalue=-1) + with self.assertRaises(ValueError): + self._check_option(ldap.OPT_NETWORK_TIMEOUT, -5, nonevalue=-1) + + def _test_controls(self, option): + self._check_option(option, []) + self._check_option(option, TEST_CTRL, TEST_CTRL_EXPECTED) + self._check_option(option, tuple(TEST_CTRL), TEST_CTRL_EXPECTED) + with self.assertRaises(TypeError): + ldap.set_option(option, object) + + with self.assertRaises(TypeError): + # must contain a tuple + ldap.set_option(option, [list(TEST_CTRL[0])]) + with self.assertRaises(TypeError): + # data must be bytes or None + ldap.set_option( + option, + [TEST_CTRL[0][0], TEST_CTRL[0][1], u'data'] + ) + + def test_client_controls(self): + self._test_controls(ldap.OPT_CLIENT_CONTROLS) + + def test_server_controls(self): + self._test_controls(ldap.OPT_SERVER_CONTROLS) + + def test_uri(self): + self._check_option(ldap.OPT_URI, "ldapi:///path/to/socket") + with self.assertRaises(TypeError): + ldap.set_option(ldap.OPT_URI, object) + + @requires_tls() + def test_cafile(self): + # None or a distribution or OS-specific path + ldap.get_option(ldap.OPT_X_TLS_CACERTFILE) + + def test_readonly(self): + value = ldap.get_option(ldap.OPT_API_INFO) + self.assertIsInstance(value, dict) + with self.assertRaises(ValueError) as e: + ldap.set_option(ldap.OPT_API_INFO, value) + self.assertIn('read-only', str(e.exception)) + + +if __name__ == '__main__': + unittest.main()