Skip to content

Commit

Permalink
Merge pull request #90 – Fix set_option() of timeout to handle -1 as …
Browse files Browse the repository at this point in the history
  • Loading branch information
Petr Viktorin authored and GitHub committed Dec 4, 2017
2 parents c5ad802 + 2aa8bca commit 5687863
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 25 deletions.
1 change: 1 addition & 0 deletions CHANGES
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ Modules/
* Fix memory leak in whoami
* Fix internal error handling of LDAPControl_to_List()
* Fix two memory leaks and release GIL in encode_assertion_control
* Allow set_option() to set timeouts to infinity
and, thanks to Michael Ströder:
* removed unused code schema.c
* moved code from version.c to ldapmodule.c
Expand Down
6 changes: 6 additions & 0 deletions Doc/reference/ldap.rst
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@ following option identifiers are defined as constants:
.. py:data:: OPT_NETWORK_TIMEOUT
.. versionchanged:: 3.0
A timeout of ``-1`` resets timeout to infinity.

.. py:data:: OPT_PROTOCOL_VERSION
Sets the LDAP protocol version used for a connection. This is mapped to
Expand All @@ -180,6 +183,9 @@ following option identifiers are defined as constants:
.. py:data:: OPT_TIMEOUT
.. versionchanged:: 3.0
A timeout of ``-1`` resets timeout to infinity.

.. py:data:: OPT_URI
.. _ldap-sasl-options:
Expand Down
40 changes: 15 additions & 25 deletions Modules/options.c
Original file line number Diff line number Diff line change
Expand Up @@ -140,10 +140,20 @@ LDAP_set_option(LDAPObject *self, int option, PyObject *value)
if (!PyArg_Parse(value, "d:set_option", &doubleval))
return 0;
if (doubleval >= 0) {
set_timeval_from_double( &tv, doubleval );
set_timeval_from_double( &tv, doubleval );
ptr = &tv;
} else if (doubleval == -1) {
/* -1 is infinity timeout */
tv.tv_sec = -1;
tv.tv_usec = 0;
ptr = &tv;
} else {
ptr = NULL;
PyErr_Format(
PyExc_ValueError,
"timeout must be >= 0 or -1 for infinity, got %d",
option
);
return 0;
}
break;
case LDAP_OPT_SERVER_CONTROLS:
Expand Down Expand Up @@ -180,10 +190,9 @@ LDAP_get_option(LDAPObject *self, int option)
struct timeval *tv;
LDAPAPIInfo apiinfo;
LDAPControl **lcs;
LDAPControl *lc;
char *strval;
PyObject *extensions, *v, *tup;
Py_ssize_t i, num_extensions, num_controls;
PyObject *extensions, *v;
Py_ssize_t i, num_extensions;
LDAP *ld;

ld = self ? self->ldap : NULL;
Expand Down Expand Up @@ -352,27 +361,8 @@ LDAP_get_option(LDAPObject *self, int option)
if (res != LDAP_OPT_SUCCESS)
return option_error(res, "ldap_get_option");

if (lcs == NULL)
return PyList_New(0);

/* Get the number of controls */
num_controls = 0;
while (lcs[num_controls])
num_controls++;

/* We'll build a list of controls, with each control a tuple */
v = PyList_New(num_controls);
for (i = 0; i < num_controls; i++) {
lc = lcs[i];
tup = Py_BuildValue("(sbs)",
lc->ldctl_oid,
lc->ldctl_iscritical,
lc->ldctl_value.bv_val);
PyList_SET_ITEM(v, i, tup);
}

v = LDAPControls_to_List(lcs);
ldap_controls_free(lcs);

return v;

default:
Expand Down
1 change: 1 addition & 0 deletions Tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@
from . import t_ldap_schema_subentry
from . import t_untested_mods
from . import t_ldap_controls_libldap
from . import t_ldap_options
106 changes: 106 additions & 0 deletions Tests/t_ldap_options.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import os
import unittest

# Switch off processing .ldaprc or ldap.conf before importing _ldap
os.environ['LDAPNOINIT'] = '1'

import ldap
from ldap.controls import RequestControlTuples
from ldap.controls.pagedresults import SimplePagedResultsControl
from ldap.controls.openldap import SearchNoOpControl
from slapdtest import requires_tls


SENTINEL = object()

TEST_CTRL = RequestControlTuples([
# with BER data
SimplePagedResultsControl(criticality=0, size=5, cookie=b'cookie'),
# value-less
SearchNoOpControl(criticality=1),
])
TEST_CTRL_EXPECTED = [
TEST_CTRL[0],
# get_option returns empty bytes
(TEST_CTRL[1][0], TEST_CTRL[1][1], b''),
]


class TestGlobalOptions(unittest.TestCase):
def _check_option(self, option, value, expected=SENTINEL,
nonevalue=None):
old = ldap.get_option(option)
try:
ldap.set_option(option, value)
new = ldap.get_option(option)
if expected is SENTINEL:
self.assertEqual(new, value)
else:
self.assertEqual(new, expected)
finally:
ldap.set_option(option, old if old is not None else nonevalue)
self.assertEqual(ldap.get_option(option), old)

def test_invalid(self):
with self.assertRaises(ValueError):
ldap.get_option(-1)
with self.assertRaises(ValueError):
ldap.set_option(-1, '')

def test_timeout(self):
self._check_option(ldap.OPT_TIMEOUT, 0, nonevalue=-1)
self._check_option(ldap.OPT_TIMEOUT, 10.5, nonevalue=-1)
with self.assertRaises(ValueError):
self._check_option(ldap.OPT_TIMEOUT, -5, nonevalue=-1)
with self.assertRaises(TypeError):
ldap.set_option(ldap.OPT_TIMEOUT, object)

def test_network_timeout(self):
self._check_option(ldap.OPT_NETWORK_TIMEOUT, 0, nonevalue=-1)
self._check_option(ldap.OPT_NETWORK_TIMEOUT, 10.5, nonevalue=-1)
with self.assertRaises(ValueError):
self._check_option(ldap.OPT_NETWORK_TIMEOUT, -5, nonevalue=-1)

def _test_controls(self, option):
self._check_option(option, [])
self._check_option(option, TEST_CTRL, TEST_CTRL_EXPECTED)
self._check_option(option, tuple(TEST_CTRL), TEST_CTRL_EXPECTED)
with self.assertRaises(TypeError):
ldap.set_option(option, object)

with self.assertRaises(TypeError):
# must contain a tuple
ldap.set_option(option, [list(TEST_CTRL[0])])
with self.assertRaises(TypeError):
# data must be bytes or None
ldap.set_option(
option,
[TEST_CTRL[0][0], TEST_CTRL[0][1], u'data']
)

def test_client_controls(self):
self._test_controls(ldap.OPT_CLIENT_CONTROLS)

def test_server_controls(self):
self._test_controls(ldap.OPT_SERVER_CONTROLS)

def test_uri(self):
self._check_option(ldap.OPT_URI, "ldapi:///path/to/socket")
with self.assertRaises(TypeError):
ldap.set_option(ldap.OPT_URI, object)

@requires_tls()
def test_cafile(self):
# None or a distribution or OS-specific path
ldap.get_option(ldap.OPT_X_TLS_CACERTFILE)

def test_readonly(self):
value = ldap.get_option(ldap.OPT_API_INFO)
self.assertIsInstance(value, dict)
with self.assertRaises(ValueError) as e:
ldap.set_option(ldap.OPT_API_INFO, value)
self.assertIn('read-only', str(e.exception))


if __name__ == '__main__':
unittest.main()

0 comments on commit 5687863

Please sign in to comment.