Skip to content

Commit

Permalink
code-cleaning in slapd.py and t_cext.py and set env var LDAPNOINIT=1
Browse files Browse the repository at this point in the history
  • Loading branch information
stroeder committed Apr 26, 2017
1 parent dbb85b1 commit 43d3dc2
Show file tree
Hide file tree
Showing 3 changed files with 280 additions and 224 deletions.
7 changes: 6 additions & 1 deletion CHANGES
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ Lib/
* gracefully handle KeyError in LDAPObject._ldap_call() when
using errno

Tests/
* code-cleaning in slapd.py and t_cext.py
* set env var LDAPNOINIT=1 in t_cext.py to avoid interference
with locally installed .ldaprc or ldap.conf

----------------------------------------------------------------
Released 2.4.35 2017-04-25

Expand Down Expand Up @@ -1383,4 +1388,4 @@ Released 2.0.0pre02 2002-02-01
----------------------------------------------------------------
Released 1.10alpha3 2000-09-19

$Id: CHANGES,v 1.424 2017/04/25 17:50:33 stroeder Exp $
$Id: CHANGES,v 1.425 2017/04/26 08:58:06 stroeder Exp $
216 changes: 110 additions & 106 deletions Tests/slapd.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,19 @@
and talking to it with ldapsearch/ldapadd.
"""

import sys, os, socket, time, subprocess, logging
import sys
import os
import socket
import time
import subprocess
import logging
import base64

_log = logging.getLogger("slapd")

def quote(s):
'''Quotes the '"' and '\' characters in a string and surrounds with "..."'''
return '"' + s.replace('\\','\\\\').replace('"','\\"') + '"'
return '"%s"' % s.replace('\\', '\\\\').replace('"', '\\"')

def mkdirs(path):
"""Creates the directory path unless it already exists"""
Expand All @@ -20,7 +26,7 @@ def mkdirs(path):
return path

def delete_directory_content(path):
for dirpath,dirnames,filenames in os.walk(path, topdown=False):
for dirpath, dirnames, filenames in os.walk(path, topdown=False):
for n in filenames:
_log.info("remove %s", os.path.join(dirpath, n))
os.remove(os.path.join(dirpath, n))
Expand Down Expand Up @@ -50,6 +56,11 @@ class Slapd:
server is shut down.
"""

suffix = "dc=slapd-test,dc=python-ldap,dc=org"
root_cn = "Manager"
root_dn = "cn=%s,%s" % (root_cn, suffix)
root_password = "password"

_log = logging.getLogger("Slapd")

# Use /var/tmp to placate apparmour on Ubuntu:
Expand All @@ -62,18 +73,16 @@ class Slapd:
PATH_SLAPD = os.path.join(PATH_SBINDIR, "slapd")
PATH_SLAPTEST = os.path.join(PATH_SBINDIR, "slaptest")

# TODO add paths for other OSs

def check_paths(cls):
"""
Checks that the configured executable paths look valid.
If they don't, then logs warning messages (not errors).
"""
for name,path in (
for name, path in (
("slapd", cls.PATH_SLAPD),
("ldapadd", cls.PATH_LDAPADD),
("ldapsearch", cls.PATH_LDAPSEARCH),
):
):
cls._log.debug("checking %s executable at %s", name, path)
if not os.access(path, os.X_OK):
cls._log.warn("cannot find %s executable at %s", name, path)
Expand All @@ -84,43 +93,19 @@ def __init__(self):
self._proc = None
self._port = 0
self._tmpdir = self.PATH_TMPDIR
self._dn_suffix = "dc=python-ldap,dc=org"
self._root_cn = "Manager"
self._root_password = "password"
self._slapd_debug_level = 0

# Setters
def set_port(self, port):
self._port = port
def set_dn_suffix(self, dn):
self._dn_suffix = dn
def set_root_cn(self, cn):
self._root_cn = cn
def set_root_password(self, pw):
self._root_password = pw
def set_tmpdir(self, path):
self._tmpdir = path
def set_slapd_debug_level(self, level):
self._slapd_debug_level = level
def set_debug(self):
self._log.setLevel(logging.DEBUG)
self.set_slapd_debug_level('Any')

# getters
def get_url(self):
return "ldap://%s:%d/" % self.get_address()

def get_address(self):
if self._port == 0:
self._port = find_available_tcp_port(LOCALHOST)
return (LOCALHOST, self._port)
def get_dn_suffix(self):
return self._dn_suffix
def get_root_dn(self):
return "cn=" + self._root_cn + "," + self.get_dn_suffix()
def get_root_password(self):
return self._root_password

def get_tmpdir(self):
return os.environ.get('TMP',self._tmpdir)
return os.environ.get('TMP', self._tmpdir)

def __del__(self):
self.stop()
Expand All @@ -142,14 +127,14 @@ def configure(self, cfg):
cfg.append("database ldif")
cfg.append("directory " + quote(ldif_dir))

cfg.append("suffix " + quote(self.get_dn_suffix()))
cfg.append("rootdn " + quote(self.get_root_dn()))
cfg.append("rootpw " + quote(self.get_root_password()))
cfg.append("suffix " + quote(self.suffix))
cfg.append("rootdn " + quote(self.root_dn))
cfg.append("rootpw " + quote(self.root_password))

def _write_config(self):
"""Writes the slapd.conf file out, and returns the path to it."""
path = os.path.join(self._tmpdir, "slapd.conf")
ldif_dir = mkdirs(self._tmpdir)
_ = mkdirs(self._tmpdir)
if os.access(path, os.F_OK):
self._log.debug("deleting existing %s", path)
os.remove(path)
Expand Down Expand Up @@ -184,11 +169,12 @@ def _start_slapd(self):
# Spawns/forks the slapd process
config_path = self._write_config()
self._log.info("starting slapd")
self._proc = subprocess.Popen([self.PATH_SLAPD,
"-f", config_path,
"-h", self.get_url(),
"-d", str(self._slapd_debug_level),
])
self._proc = subprocess.Popen([
self.PATH_SLAPD,
"-f", config_path,
"-h", self.get_url(),
"-d", str(self._slapd_debug_level),
])
self._proc_config = config_path

def _wait_for_slapd(self):
Expand All @@ -199,12 +185,12 @@ def _wait_for_slapd(self):
self._stopped()
raise RuntimeError("slapd exited before opening port")
try:
self._log.debug("Connecting to %s", repr(self.get_address()))
s.connect(self.get_address())
s.close()
return
self._log.debug("Connecting to %s", repr(self.get_address()))
s.connect(self.get_address())
s.close()
return
except socket.error:
time.sleep(1)
time.sleep(1)

def stop(self):
"""Stops the slapd server, and waits for it to terminate"""
Expand All @@ -213,11 +199,9 @@ def stop(self):
if hasattr(self._proc, 'terminate'):
self._proc.terminate()
else:
import posix, signal
posix.kill(self._proc.pid, signal.SIGHUP)
#time.sleep(1)
#posix.kill(self._proc.pid, signal.SIGTERM)
#posix.kill(self._proc.pid, signal.SIGKILL)
import posix
import signal
posix.kill(self._proc.pid, signal.SIGTERM)
self.wait()

def restart(self):
Expand Down Expand Up @@ -262,33 +246,46 @@ def _test_configuration(self):
finally:
os.remove(config_path)

def ldapadd(self, ldif, extra_args=[]):
def ldapadd(self, ldif, extra_args=None):
"""Runs ldapadd on this slapd instance, passing it the ldif content"""
extra_args = extra_args or []
self._log.debug("adding %s", repr(ldif))
p = subprocess.Popen([self.PATH_LDAPADD,
p = subprocess.Popen(
[
self.PATH_LDAPADD,
"-x",
"-D", self.get_root_dn(),
"-w", self.get_root_password(),
"-H", self.get_url()] + extra_args,
stdin = subprocess.PIPE, stdout=subprocess.PIPE)
"-D", self.root_dn,
"-w", self.root_password,
"-H", self.get_url()
] + extra_args,
stdin=subprocess.PIPE, stdout=subprocess.PIPE,
)
p.communicate(ldif)
if p.wait() != 0:
raise RuntimeError("ldapadd process failed")

def ldapsearch(self, base=None, filter='(objectClass=*)', attrs=[],
scope='sub', extra_args=[]):
if base is None: base = self.get_dn_suffix()
self._log.debug("ldapsearch filter=%s", repr(filter))
p = subprocess.Popen([self.PATH_LDAPSEARCH,
def ldapsearch(
self, base=None, filterstr='(objectClass=*)', attrs=None,
scope='sub', extra_args=None
):
attrs = attrs or []
extra_args = extra_args or []
if base is None:
base = self.suffix
self._log.debug("ldapsearch filterstr=%s", repr(filterstr))
p = subprocess.Popen(
[
self.PATH_LDAPSEARCH,
"-x",
"-D", self.get_root_dn(),
"-w", self.get_root_password(),
"-D", self.root_dn,
"-w", self.root_password,
"-H", self.get_url(),
"-b", base,
"-s", scope,
"-LL",
] + extra_args + [ filter ] + attrs,
stdout = subprocess.PIPE)
]+extra_args+[filterstr]+attrs,
stdout=subprocess.PIPE,
)
output = p.communicate()[0]
if p.wait() != 0:
raise RuntimeError("ldapadd process failed")
Expand All @@ -307,14 +304,17 @@ def ldapsearch(self, base=None, filter='(objectClass=*)', attrs=[],
lines = [l for l in lines if not l.startswith("#")]

# Remove leading version and blank line(s)
if lines and lines[0] == '': del lines[0]
if lines and lines[0] == '':
del lines[0]
if not lines or lines[0] != 'version: 1':
raise RuntimeError("expected 'version: 1', got " + repr(lines[:1]))
del lines[0]
if lines and lines[0] == '': del lines[0]
if lines and lines[0] == '':
del lines[0]

# ensure the ldif ends with a blank line (unless it is just blank)
if lines and lines[-1] != '': lines.append('')
if lines and lines[-1] != '':
lines.append('')

objects = []
obj = []
Expand All @@ -325,14 +325,14 @@ def ldapsearch(self, base=None, filter='(objectClass=*)', attrs=[],
objects.append((obj[0][1], obj[1:]))
obj = []
else:
attr,value = line.split(':',2)
attr, value = line.split(':', 2)
if value.startswith(': '):
value = base64.decodestring(value[2:])
elif value.startswith(' '):
value = value[1:]
else:
raise RuntimeError("bad line: " + repr(line))
obj.append((attr,value))
obj.append((attr, value))
assert obj == []
return objects

Expand All @@ -342,42 +342,46 @@ def started(self):
By default, this method adds the two initial objects,
the domain object and the root user object.
"""
assert self.get_dn_suffix().startswith("dc=")
suffix_dc = self.get_dn_suffix().split(',')[0][3:]
assert self.get_root_dn().startswith("cn=")
assert self.get_root_dn().endswith("," + self.get_dn_suffix())
root_cn = self.get_root_dn().split(',')[0][3:]

self._log.debug("adding %s and %s",
self.get_dn_suffix(),
self.get_root_dn())

self.ldapadd("\n".join([
'dn: ' + self.get_dn_suffix(),
'objectClass: dcObject',
'objectClass: organization',
'dc: ' + suffix_dc,
'o: ' + suffix_dc,
'',
'dn: ' + self.get_root_dn(),
'objectClass: organizationalRole',
'cn: ' + root_cn,
''
]))

Slapd.check_paths()
assert self.suffix.startswith("dc=")
suffix_dc = self.suffix.split(',')[0][3:]
assert self.root_dn.startswith("cn=")
assert self.root_dn.endswith("," + self.suffix)

self._log.debug(
"adding %s and %s",
self.suffix,
self.root_dn,
)

self.ldapadd(
"\n".join([
'dn: '+self.suffix,
'objectClass: dcObject',
'objectClass: organization',
'dc: '+suffix_dc,
'o: '+suffix_dc,
'',
'dn: '+self.root_dn,
'objectClass: organizationalRole',
'cn: '+self.root_cn,
''
])
)

if __name__ == '__main__' and sys.argv == ['run']:
def run():
logging.basicConfig(level=logging.DEBUG)
slapd = Slapd()
print("Starting slapd...")
_log.debug("Starting slapd...")
slapd.start()
print("Contents of LDAP server follow:\n")
for dn,attrs in slapd.ldapsearch():
print("dn: " + dn)
for name,val in attrs:
print(name + ": " + val)
print("")
print(slapd.get_url())
_log.debug("Contents of LDAP server follow:\n")
for dn, attrs in slapd.ldapsearch():
_log.debug("dn: " + dn)
for name, val in attrs:
_log.debug("%r: %r", name, val)
_log.debug('slapd URL: %r', slapd.get_url())
slapd.wait()

Slapd.check_paths()

if __name__ == '__main__' and sys.argv == ['run']:
run()
Loading

0 comments on commit 43d3dc2

Please sign in to comment.