refactor: Raise exception

instead of returning None if lookup fails
This commit is contained in:
Philipp Hahn 2022-03-15 10:50:26 +01:00
parent 1344a12648
commit 4d040d5b98
2 changed files with 19 additions and 14 deletions

View File

@ -75,10 +75,12 @@ class LdapConfigurator(ConflictChecker):
self.create_machine_secret_file(password)
def modify_old_entry_or_add_machine_to_ldap(self, password: str, dc_ip: str, admin_username: str, admin_pw: str, ldap_base: str, admin_dn: str) -> None:
if get_machines_ldap_dn(dc_ip, admin_username, admin_pw, admin_dn):
self.modify_machine_in_ldap(password, dc_ip, admin_username, admin_pw, admin_dn)
else:
try:
get_machines_ldap_dn(dc_ip, admin_username, admin_pw, admin_dn)
except LookupError:
self.add_machine_to_ldap(password, dc_ip, admin_username, admin_pw, ldap_base, admin_dn)
else:
self.modify_machine_in_ldap(password, dc_ip, admin_username, admin_pw, admin_dn)
def modify_machine_in_ldap(self, password: str, dc_ip: str, admin_username: str, admin_pw: str, admin_dn: str) -> None:
userinfo_logger.info('Updating old LDAP entry for this machine on the UCS DC')

View File

@ -68,23 +68,26 @@ def is_samba_dc(admin_username: str, admin_pw: str, dc_ip: str, admin_dn: str) -
return False
def get_machines_ldap_dn(dc_ip: str, admin_username: str, admin_pw: str, admin_dn: str) -> Optional[str]:
def get_machines_ldap_dn(dc_ip: str, admin_username: str, admin_pw: str, admin_dn: str) -> str:
for udm_type in ['computers/ubuntu', 'computers/linux', 'computers/ucc']:
machines_ldap_dn = get_machines_ldap_dn_given_the_udm_type(udm_type, dc_ip, admin_username, admin_pw, admin_dn)
if machines_ldap_dn:
return machines_ldap_dn
return None
try:
return get_machines_ldap_dn_given_the_udm_type(udm_type, dc_ip, admin_username, admin_pw, admin_dn)
except LookupError:
pass
raise LookupError(dc_ip)
def get_machines_udm_type(dc_ip: str, admin_username: str, admin_pw: str, admin_dn: str) -> Optional[str]:
def get_machines_udm_type(dc_ip: str, admin_username: str, admin_pw: str, admin_dn: str) -> str:
for udm_type in ['computers/ubuntu', 'computers/linux', 'computers/ucc']:
machines_ldap_dn = get_machines_ldap_dn_given_the_udm_type(udm_type, dc_ip, admin_username, admin_pw, admin_dn)
if machines_ldap_dn:
try:
get_machines_ldap_dn_given_the_udm_type(udm_type, dc_ip, admin_username, admin_pw, admin_dn)
return udm_type
return None
except LookupError:
pass
raise LookupError(dc_ip)
def get_machines_ldap_dn_given_the_udm_type(udm_type: str, dc_ip: str, admin_username: str, admin_pw: str, admin_dn: str) -> Optional[str]:
def get_machines_ldap_dn_given_the_udm_type(udm_type: str, dc_ip: str, admin_username: str, admin_pw: str, admin_dn: str) -> str:
hostname = gethostname()
udm_command = ['/usr/sbin/udm', udm_type, 'list', '--binddn', admin_dn, '--bindpwdfile', '/dev/shm/%sdomain-join' % (admin_username,), '--filter', 'name=%s' % (hostname,)]
escaped_udm_command = ' '.join([pipes.quote(x) for x in udm_command])
@ -97,4 +100,4 @@ def get_machines_ldap_dn_given_the_udm_type(udm_type: str, dc_ip: str, admin_use
if b"dn:" == line[0:3].lower():
machines_ldap_dn = line[3:].strip()
return machines_ldap_dn.decode()
return None
raise LookupError(hostname)