diff --git a/Doc/reference/ldap.rst b/Doc/reference/ldap.rst index 0ce2e418..d9e0c5da 100644 --- a/Doc/reference/ldap.rst +++ b/Doc/reference/ldap.rst @@ -316,6 +316,7 @@ The module defines the following exceptions: is set to a truncated form of the name provided or alias dereferenced for the lowest entry (object or alias) that was matched. + The :py:const:`ctrls` field can be included to the dictionary, which is a list of response controls. .. py:exception:: ADMINLIMIT_EXCEEDED diff --git a/Doc/reference/slapdtest.rst b/Doc/reference/slapdtest.rst index bd54bb69..90c671ad 100644 --- a/Doc/reference/slapdtest.rst +++ b/Doc/reference/slapdtest.rst @@ -26,3 +26,4 @@ Classes .. autoclass:: slapdtest.SlapdTestCase :members: + diff --git a/Lib/slapdtest/_slapdtest.py b/Lib/slapdtest/_slapdtest.py index f1885caf..5b566c35 100644 --- a/Lib/slapdtest/_slapdtest.py +++ b/Lib/slapdtest/_slapdtest.py @@ -27,7 +27,9 @@ # a template string for generating simple slapd.conf file SLAPD_CONF_TEMPLATE = r""" + serverID %(serverid)s +%(moduleload_directives)s moduleload back_%(database)s %(include_directives)s loglevel %(loglevel)s @@ -43,6 +45,8 @@ rootdn "%(rootdn)s" rootpw "%(rootpw)s" +%(overlay_configurations)s + TLSCACertificateFile "%(cafile)s" TLSCertificateFile "%(servercert)s" TLSCertificateKeyFile "%(serverkey)s" @@ -187,6 +191,19 @@ class SlapdObject(object): 'core.schema', ) + #: List (or tuple) of OpenLDAP module names you want to activate. + #: Default is empty. + modules = () + + #: List (or tuple) of OpenLDAP overlay settings you want to include. + #: Default is empty. + #: Each element is a dict of the form of:: + #: + #: {"name": overlay_name, + #: "configuration": configuration_text} + #: + overlays = () + TMPDIR = os.environ.get('TMP', os.getcwd()) if 'SCHEMA' in os.environ: SCHEMADIR = os.environ['SCHEMA'] @@ -331,9 +348,22 @@ def gen_config(self): ) for schema_file in self.openldap_schema_files ) + + moduleload_directives = '\n'.join( + "moduleload {module}".format(module=module) + for module in self.modules + ) + + overlay_configurations = '\n'.join( + "overlay {name}\n{configuration}".format(**overlay) + for overlay in self.overlays + ) + config_dict = { 'serverid': hex(self.server_id), 'schema_prefix':self._schema_prefix, + 'moduleload_directives': moduleload_directives, + 'overlay_configurations': overlay_configurations, 'include_directives': include_directives, 'loglevel': self.slapd_loglevel, 'database': self.database, diff --git a/Modules/LDAPObject.c b/Modules/LDAPObject.c index bc26727e..0d980959 100644 --- a/Modules/LDAPObject.c +++ b/Modules/LDAPObject.c @@ -1162,6 +1162,20 @@ l_ldap_result4(LDAPObject *self, PyObject *args) LDAP_END_ALLOW_THREADS(self); } + if (!(pyctrls = LDAPControls_to_List(serverctrls))) { + int err = LDAP_NO_MEMORY; + + LDAP_BEGIN_ALLOW_THREADS(self); + ldap_set_option(self->ldap, LDAP_OPT_ERROR_NUMBER, &err); + LDAP_END_ALLOW_THREADS(self); + ldap_msgfree(msg); + Py_XDECREF(valuestr); + return LDAPerror(self->ldap, "LDAPControls_to_List"); + } + ldap_controls_free(serverctrls); + + /* Always call Py_XDECREF(pyctrls) before returning after here */ + if (result != LDAP_SUCCESS) { /* result error */ char *e, err[1024]; @@ -1173,21 +1187,11 @@ l_ldap_result4(LDAPObject *self, PyObject *args) e = "ldap_parse_result"; ldap_msgfree(msg); Py_XDECREF(valuestr); - return LDAPerror(self->ldap, e); + retval = LDAPraise_exception(self->ldap, e, pyctrls); + Py_XDECREF(pyctrls); + return retval; } - if (!(pyctrls = LDAPControls_to_List(serverctrls))) { - int err = LDAP_NO_MEMORY; - - LDAP_BEGIN_ALLOW_THREADS(self); - ldap_set_option(self->ldap, LDAP_OPT_ERROR_NUMBER, &err); - LDAP_END_ALLOW_THREADS(self); - ldap_msgfree(msg); - Py_XDECREF(valuestr); - return LDAPerror(self->ldap, "LDAPControls_to_List"); - } - ldap_controls_free(serverctrls); - pmsg = LDAPmessage_to_python(self->ldap, msg, add_ctrls, add_intermediates); diff --git a/Modules/constants.c b/Modules/constants.c index f8da3736..5f8fc321 100644 --- a/Modules/constants.c +++ b/Modules/constants.c @@ -48,7 +48,7 @@ LDAPerr(int errnum) /* Convert an LDAP error into an informative python exception */ PyObject * -LDAPerror(LDAP *l, char *msg) +LDAPraise_exception(LDAP *l, char *msg, PyObject *pyctrls) { if (l == NULL) { PyErr_SetFromErrno(LDAPexception_class); @@ -104,6 +104,10 @@ LDAPerror(LDAP *l, char *msg) ldap_memfree(matched); } + if (pyctrls != NULL) { + PyDict_SetItemString(info, "ctrls", pyctrls); + } + if (errnum == LDAP_REFERRAL) { str = PyUnicode_FromString(msg); if (str) @@ -125,6 +129,17 @@ LDAPerror(LDAP *l, char *msg) } } + +/* Convert an LDAP error into an informative python exception. + This is the convenient function for the case where the exception + doesn't have to include any response controls. */ +PyObject * +LDAPerror(LDAP *l, char *msg) +{ + return LDAPraise_exception(l, msg, NULL); +} + + /* initialise the module constants */ int diff --git a/Modules/constants.h b/Modules/constants.h index 8a390b5b..3b66c1d3 100644 --- a/Modules/constants.h +++ b/Modules/constants.h @@ -12,6 +12,7 @@ extern PyObject *LDAPconstant(int); extern PyObject *LDAPexception_class; extern PyObject *LDAPerror(LDAP *, char *msg); +extern PyObject *LDAPraise_exception(LDAP *, char *msg, PyObject *pyctrls); PyObject *LDAPerr(int errnum); #ifndef LDAP_CONTROL_PAGE_OID diff --git a/Tests/t_ldap_controls_ppolicy.py b/Tests/t_ldap_controls_ppolicy.py index 8644e563..d20ee500 100644 --- a/Tests/t_ldap_controls_ppolicy.py +++ b/Tests/t_ldap_controls_ppolicy.py @@ -9,6 +9,7 @@ PP_GRACEAUTH = b'0\x84\x00\x00\x00\t\xa0\x84\x00\x00\x00\x03\x81\x01\x02' PP_TIMEBEFORE = b'0\x84\x00\x00\x00\t\xa0\x84\x00\x00\x00\x03\x80\x012' +PP_ACCOUNT_LOCKOUT = b'0\x03\x81\x01\x01' class TestControlsPPolicy(unittest.TestCase): @@ -28,6 +29,11 @@ def test_ppolicy_timebefore(self): pp.decodeControlValue(PP_TIMEBEFORE) self.assertPPolicy(pp, timeBeforeExpiration=50) + def test_ppolicy_account_lockout(self): + pp = ppolicy.PasswordPolicyControl() + pp.decodeControlValue(PP_ACCOUNT_LOCKOUT) + self.assertPPolicy(pp, error=1) + if __name__ == '__main__': unittest.main() diff --git a/Tests/t_ldapobject.py b/Tests/t_ldapobject.py index 0619d514..90392258 100644 --- a/Tests/t_ldapobject.py +++ b/Tests/t_ldapobject.py @@ -28,9 +28,10 @@ os.environ['LDAPNOINIT'] = '1' import ldap +import ldap.controls +import ldap.controls.ppolicy from ldap.ldapobject import SimpleLDAPObject, ReconnectLDAPObject - -from slapdtest import SlapdTestCase +from slapdtest import SlapdTestCase, SlapdObject from slapdtest import requires_ldapi, requires_sasl, requires_tls @@ -75,6 +76,110 @@ """ +class PPolicyEnabledSlapdObject(SlapdObject): + """ + A subclass of :py:class:`SlapdObject` with password policy enabled. + Note that this class has no actual password policy configuration entries. + It is the job of the users of this class to define + the default password policies on their own. + The dn of the default is :attr:`.default_ppolicy_dn` of this class. + """ + + openldap_schema_files = ( + 'core.schema', 'ppolicy.schema' + ) + modules = ( + 'ppolicy', + ) + + default_ppolicy_dn = "cn=default-ppolicy,%(suffix)s" % { + 'suffix': SlapdObject.suffix + } + + overlays = ( + { + 'name': 'ppolicy', + 'configuration': "\n".join([ + 'ppolicy_default "{}"'.format(default_ppolicy_dn), + # let slapd tell the clients that they are locked out + 'ppolicy_use_lockout']) + }, + ) + + +class Test02_ResponseControl(SlapdTestCase): + """ + tests abount response controls sent by the server + """ + + ldap_object_class = SimpleLDAPObject + server_class = PPolicyEnabledSlapdObject + + @classmethod + def setUpClass(cls): + super(Test02_ResponseControl, cls).setUpClass() + # insert some Foo* objects via ldapadd + cls.server.ldapadd( + LDIF_TEMPLATE % { + 'suffix': cls.server.suffix, + 'rootdn': cls.server.root_dn, + 'rootcn': cls.server.root_cn, + 'rootpw': cls.server.root_pw, + 'dc': cls.server.suffix.split(',')[0][3:], + } + ) + + # Very strict pwdMaxFailure in order to easily test the cases where + # bind failure with response controls is needed + cls.server.ldapadd( + '''dn: {dn} +objectClass: organizationalRole +objectClass: pwdPolicy +cn: default-ppolicy +pwdAttribute: userPassword +pwdLockout: TRUE +pwdMaxFailure: 1 +pwdLockoutDuration: 60 +pwdFailureCountInterval: 3600'''.format(dn=cls.server.default_ppolicy_dn) + ) + + def test_response_controls_are_attached_to_exceptions(self): + base = self.server.suffix + cn = "test_response_controls_are_attached_to_exceptions" + user_dn = "cn={},{}".format(cn, base) + password = "user5_pw" + + self.server.ldapadd( + '''dn: {dn} +objectClass: applicationProcess +objectClass: simpleSecurityObject +cn: {cn} +userPassword: {password}'''.format(cn=cn, dn=user_dn, password=password) + ) + + ldap_conn = self.ldap_object_class(self.server.ldap_uri) + + # Firstly cause a bind failure to lock out the account + with self.assertRaises(ldap.INVALID_CREDENTIALS) as cm: + wrong_password = 'wrong' + password + ldap_conn.simple_bind_s(user_dn, wrong_password) + + empty_controls = cm.exception.args[0]['ctrls'] + self.assertEqual(len(empty_controls), 0) + + with self.assertRaises(ldap.INVALID_CREDENTIALS) as cm: + ldap_conn.simple_bind_s( + user_dn, password, + serverctrls=[ldap.controls.ppolicy.PasswordPolicyControl()]) + + controls = cm.exception.args[0]['ctrls'] + decoded_controls = ldap.controls.DecodeControlTuples(controls) + self.assertEqual(len(decoded_controls), 1) + pp = decoded_controls[0] + expected_error = ldap.controls.ppolicy.PasswordPolicyError('accountLocked') + self.assertEqual(pp.error, int(expected_error)) + + class Test00_SimpleLDAPObject(SlapdTestCase): """ test LDAP search operations