doc: Add PEP-484 type hints

This commit is contained in:
Philipp Hahn 2022-03-09 09:40:28 +01:00
parent cc570eed1d
commit 4a56539630
15 changed files with 167 additions and 138 deletions

View File

@ -36,22 +36,25 @@ import os
import subprocess
import sys
from getpass import getpass
from typing import Dict, Optional
from univention_domain_join.utils.distributions import get_distribution
from univention_domain_join.utils.domain import get_master_ip_through_dns, get_ucs_domainname
from univention_domain_join.utils.general import execute_as_root
OUTPUT_SINK = open(os.devnull, 'w')
userinfo_logger: Optional[logging.Logger] = None
debugging_logger: Optional[logging.Logger] = None
def check_if_run_as_root():
def check_if_run_as_root() -> None:
if os.getuid() != 0:
print('This tool must be executed as root.')
exit(1)
@execute_as_root
def set_up_logging():
def set_up_logging() -> None:
global userinfo_logger
global debugging_logger
@ -78,7 +81,7 @@ def set_up_logging():
debugging_logger.addHandler(logfile_handler)
def get_joiner_for_this_distribution(dc_ip, admin_username, admin_pw, skip_login_manager, force_ucs_dns):
def get_joiner_for_this_distribution(dc_ip: str, admin_username: str, admin_pw: str, skip_login_manager: bool, force_ucs_dns: bool) -> object:
distribution = get_distribution()
try:
distribution_join_module = importlib.import_module('univention_domain_join.distributions.%s' % (distribution.lower(),))
@ -94,17 +97,17 @@ def get_joiner_for_this_distribution(dc_ip, admin_username, admin_pw, skip_login
exit(1)
def get_admin_username():
def get_admin_username() -> str:
return input('Please enter the user name of a domain administrator: ')
def get_admin_password(admin_username):
def get_admin_password(admin_username: str) -> str:
# TODO: Don't ask for the password if ssh works passwordless already.
return getpass(prompt='Please enter the password for %s: ' % (admin_username,))
@execute_as_root
def check_if_ssh_works_with_given_account(dc_ip, admin_username, admin_pw):
def check_if_ssh_works_with_given_account(dc_ip: str, admin_username: str, admin_pw: str) -> None:
ssh_process = subprocess.Popen(
['sshpass', '-d0', 'ssh', '-o', 'StrictHostKeyChecking=no', '%s@%s' % (admin_username, dc_ip), 'echo foo'],
stdin=subprocess.PIPE, stdout=OUTPUT_SINK, stderr=OUTPUT_SINK
@ -116,7 +119,7 @@ def check_if_ssh_works_with_given_account(dc_ip, admin_username, admin_pw):
@execute_as_root
def get_ucr_variables_from_dc(dc_ip, admin_username, admin_pw):
def get_ucr_variables_from_dc(dc_ip: str, admin_username: str, admin_pw: str) -> Dict[str, str]:
ssh_process = subprocess.Popen(
['sshpass', '-d0', 'ssh', '-o', 'StrictHostKeyChecking=no', '%s@%s' % (admin_username, dc_ip), '/usr/sbin/ucr shell | grep -v ^hostname='],
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE

View File

@ -34,19 +34,21 @@ import logging
import os
import subprocess
import sys
from typing import Dict, Optional, Tuple
from PyQt5.QtCore import QRegExp, QThread, pyqtSignal
from PyQt5.QtGui import QFontMetrics, QIcon, QPixmap, QRegExpValidator
from PyQt5.QtWidgets import QAction, QApplication, QCheckBox, QFrame, QHBoxLayout, QLabel, QLineEdit, QMainWindow, QMenuBar, QMessageBox, QPushButton, QVBoxLayout, QWidget
from PyQt5.QtWidgets import QAction, QApplication, QBoxLayout, QCheckBox, QFrame, QHBoxLayout, QLabel, QLineEdit, QMainWindow, QMenuBar, QMessageBox, QPushButton, QVBoxLayout, QWidget
from univention_domain_join.utils.distributions import get_distribution
from univention_domain_join.utils.domain import get_master_ip_through_dns, get_ucs_domainname
from univention_domain_join.utils.general import execute_as_root
OUTPUT_SINK = open(os.devnull, 'w')
userinfo_logger: Optional[logging.Logger] = None
def check_if_run_as_root():
def check_if_run_as_root() -> None:
if os.getuid() != 0:
app = QApplication(sys.argv)
form = NotRootDialog()
@ -55,7 +57,7 @@ def check_if_run_as_root():
@execute_as_root
def set_up_logging():
def set_up_logging() -> None:
global userinfo_logger
verbose_formatter = logging.Formatter('%(asctime)s %(name)s %(levelname)s %(message)s')
@ -90,7 +92,7 @@ class DistributionException(Exception):
class NotRootDialog(QMessageBox):
def __init__(self):
def __init__(self) -> None:
super(self.__class__, self).__init__()
self.setWindowTitle('Univention Domain Join')
scriptDir = os.path.dirname(os.path.realpath(__file__))
@ -100,7 +102,7 @@ class NotRootDialog(QMessageBox):
class DomainJoinGui(QMainWindow):
def __init__(self):
def __init__(self) -> None:
super(self.__class__, self).__init__()
self.regex_ipv4 = r'(25[0-5]|2[0-4][0-9]|1?[0-9]{1,2})(\.(25[0-5]|2[0-4][0-9]|1?[0-9]{1,2})){3}'
@ -119,7 +121,7 @@ class DomainJoinGui(QMainWindow):
self.setTabOrder(self.domainname_or_ip_input, self. admin_username_input)
self.setTabOrder(self.admin_username_input, self.admin_password_input)
def build_main_window(self):
def build_main_window(self) -> None:
main_layout = QVBoxLayout()
self.add_menu_bar()
@ -134,7 +136,7 @@ class DomainJoinGui(QMainWindow):
central_widget.setLayout(main_layout)
self.setCentralWidget(central_widget)
def add_menu_bar(self):
def add_menu_bar(self) -> None:
menu_bar = QMenuBar(self)
help_menu = menu_bar.addMenu('Help')
@ -144,7 +146,7 @@ class DomainJoinGui(QMainWindow):
self.setMenuBar(menu_bar)
def about(self):
def about(self) -> None:
self.about_dialog = QMessageBox.about(
self, 'About',
'<h1>Univention Domain Join</h1>'
@ -155,7 +157,7 @@ class DomainJoinGui(QMainWindow):
'<p>Copyright: <a href="https://www.univention.com">Univention GmbH</a></p>'
)
def add_general_description_group(self, layout):
def add_general_description_group(self, layout: QBoxLayout) -> None:
description_group = QWidget()
description_group_layout = QHBoxLayout()
self.add_general_description(description_group_layout)
@ -164,7 +166,7 @@ class DomainJoinGui(QMainWindow):
description_group.setLayout(description_group_layout)
layout.addWidget(description_group)
def add_general_description(self, layout):
def add_general_description(self, layout: QBoxLayout) -> None:
short_description = QLabel(
'<h3>Univention Domain Join Assistant</h3>'
'<p>Use this tool to configure this computer to be part of your UCS domain.</p>'
@ -174,18 +176,18 @@ class DomainJoinGui(QMainWindow):
short_description.setWordWrap(True)
layout.addWidget(short_description, stretch=10)
def add_domain_join_icon(self, layout):
def add_domain_join_icon(self, layout: QBoxLayout) -> None:
icon = QLabel()
scriptDir = os.path.dirname(os.path.realpath(__file__))
icon.setPixmap(QPixmap(scriptDir + os.path.sep + 'domain.png'))
layout.addWidget(icon)
def add_hline(self, layout):
def add_hline(self, layout: QBoxLayout) -> None:
frame = QFrame()
frame.setFrameShape(QFrame.HLine)
layout.addWidget(frame)
def add_inputs_group(self, layout):
def add_inputs_group(self, layout: QBoxLayout) -> None:
inputs_box = QWidget()
inputs_box_layout = QVBoxLayout()
self.add_inputs_description(inputs_box_layout)
@ -196,7 +198,7 @@ class DomainJoinGui(QMainWindow):
inputs_box.setLayout(inputs_box_layout)
layout.addWidget(inputs_box)
def add_inputs_description(self, layout):
def add_inputs_description(self, layout: QBoxLayout) -> None:
inputs_description = QLabel(
'To perform the domain join you need to provide the domain name or '
'the IP address of an UCS DC and the credentials of a domain administrator.'
@ -204,7 +206,7 @@ class DomainJoinGui(QMainWindow):
inputs_description.setWordWrap(True)
layout.addWidget(inputs_description)
def add_domainname_or_ip_input(self, layout):
def add_domainname_or_ip_input(self, layout: QBoxLayout) -> None:
short_description = QLabel('Domain name or IP address:')
short_description.setWordWrap(True)
layout.addWidget(short_description)
@ -219,14 +221,14 @@ class DomainJoinGui(QMainWindow):
layout.addWidget(self.domainname_or_ip_input)
self.try_filling_in_domainname()
def try_filling_in_domainname(self):
def try_filling_in_domainname(self) -> None:
self.domainname_thread = DomainnameDetectionThread()
self.domainname_or_ip_input.setPlaceholderText('Detecting domain name...')
self.domainname_thread.domain['QString'].connect(self.domainname_detection_successful)
self.domainname_thread.finished.connect(self.domainname_detection_finished)
self.domainname_thread.start()
def domainname_detection_successful(self, domainname):
def domainname_detection_successful(self, domainname: str) -> None:
domainname_qregex = QRegExp(self.regex_domainname)
# self.domainname_or_ip_input.text() is used to make sure the user
# didn't fill in the field already.
@ -234,10 +236,10 @@ class DomainJoinGui(QMainWindow):
self.domainname_or_ip_input.setText(domainname)
self.admin_password_input.setFocus()
def domainname_detection_finished(self):
def domainname_detection_finished(self) -> None:
self.domainname_or_ip_input.setPlaceholderText('e.g. mydomain.com or 10.0.0.4')
def add_username_input(self, layout):
def add_username_input(self, layout: QBoxLayout) -> None:
short_description = QLabel('Domain administrator\'s username:')
short_description.setWordWrap(True)
layout.addWidget(short_description)
@ -250,14 +252,14 @@ class DomainJoinGui(QMainWindow):
self.admin_username_input.setValidator(username_validator)
layout.addWidget(self.admin_username_input)
def add_force_ucs_dns(self, layout):
def add_force_ucs_dns(self, layout: QBoxLayout) -> None:
short_description = QLabel('Change the system\'s DNS settings and set the UCS DC as DNS nameserver (default is to use the standard network settings, but make sure the your system can resolve the hostname of the UCS DC and the UCS master system)')
short_description.setWordWrap(True)
layout.addWidget(short_description)
self.force_ucs_dns_input = QCheckBox('Set UCS DC as DNS server')
layout.addWidget(self.force_ucs_dns_input)
def add_password_input(self, layout):
def add_password_input(self, layout: QBoxLayout) -> None:
short_description = QLabel('Domain administrator\'s password:')
short_description.setWordWrap(True)
layout.addWidget(short_description)
@ -271,7 +273,7 @@ class DomainJoinGui(QMainWindow):
self.admin_password_input.setValidator(password_validator)
layout.addWidget(self.admin_password_input)
def add_buttons(self, layout):
def add_buttons(self, layout: QBoxLayout) -> None:
button_widget = QWidget()
button_layout = QHBoxLayout()
button_layout.addStretch()
@ -285,7 +287,7 @@ class DomainJoinGui(QMainWindow):
button_widget.setLayout(button_layout)
layout.addWidget(button_widget)
def join_domain_if_inputs_are_ok(self):
def join_domain_if_inputs_are_ok(self) -> None:
if (
self.domainname_or_ip_input.hasAcceptableInput() and
self.admin_username_input.hasAcceptableInput() and
@ -303,7 +305,7 @@ class DomainJoinGui(QMainWindow):
self.missing_inputs_dialog = MissingInputsDialog()
self.missing_inputs_dialog.exec_()
def get_domainname_or_dc_ip(self):
def get_domainname_or_dc_ip(self) -> Tuple[Optional[str], Optional[str]]:
input_text = self.domainname_or_ip_input.text()
domainname_qregex = QRegExp(self.regex_domainname)
ip_qregex = QRegExp('%s|%s' % (self.regex_ipv4, self.regex_ipv6))
@ -314,7 +316,7 @@ class DomainJoinGui(QMainWindow):
else:
return None, None
def join_domain(self, dc_ip, admin_username, admin_pw, force_ucs_dns):
def join_domain(self, dc_ip: str, admin_username: str, admin_pw: str, force_ucs_dns: bool) -> None:
self.join_thread = JoinThread(dc_ip, admin_username, admin_pw, force_ucs_dns)
self.join_thread.join_started.connect(self.join_started)
self.join_thread.join_successful.connect(self.join_successful)
@ -323,33 +325,33 @@ class DomainJoinGui(QMainWindow):
self.join_thread.dist_failed.connect(self.dist_failed)
self.join_thread.start()
def join_started(self):
def join_started(self) -> None:
self.join_button.setText('Joining...')
self.join_button.setEnabled(False)
self.cancel_button.setEnabled(False)
def join_successful(self):
def join_successful(self) -> None:
self.join_button.setText('Join')
self.cancel_button.setText('Close')
self.cancel_button.setEnabled(True)
self.successful_join_dialog = SuccessfulJoinDialog()
self.successful_join_dialog.exec_()
def join_failed(self, err):
def join_failed(self, err: str) -> None:
self.join_button.setText('Join')
self.join_button.setEnabled(True)
self.cancel_button.setEnabled(True)
self.successful_join_dialog = FailedJoinDialog(err)
self.successful_join_dialog.exec_()
def ssh_failed(self):
def ssh_failed(self) -> None:
self.join_button.setText('Join')
self.join_button.setEnabled(True)
self.cancel_button.setEnabled(True)
self.successful_join_dialog = FailedSSHDialog()
self.successful_join_dialog.exec_()
def dist_failed(self):
def dist_failed(self) -> None:
self.join_button.setText('Join')
self.join_button.setEnabled(False)
self.cancel_button.setEnabled(True)
@ -358,7 +360,7 @@ class DomainJoinGui(QMainWindow):
class SuccessfulJoinDialog(QMessageBox):
def __init__(self):
def __init__(self) -> None:
super(self.__class__, self).__init__()
self.setWindowTitle('Successful Join')
scriptDir = os.path.dirname(os.path.realpath(__file__))
@ -367,7 +369,7 @@ class SuccessfulJoinDialog(QMessageBox):
class FailedJoinDialog(QMessageBox):
def __init__(self, err):
def __init__(self, err: str) -> None:
super(self.__class__, self).__init__()
self.setWindowTitle('Failed Join')
scriptDir = os.path.dirname(os.path.realpath(__file__))
@ -379,7 +381,7 @@ class FailedJoinDialog(QMessageBox):
class FailedSSHDialog(QMessageBox):
def __init__(self):
def __init__(self) -> None:
super(self.__class__, self).__init__()
self.setWindowTitle('SSH Connection Failed')
scriptDir = os.path.dirname(os.path.realpath(__file__))
@ -390,7 +392,7 @@ class FailedSSHDialog(QMessageBox):
class FailedDistDialog(QMessageBox):
def __init__(self):
def __init__(self) -> None:
super(self.__class__, self).__init__()
self.setWindowTitle('Distribution Check Failed')
scriptDir = os.path.dirname(os.path.realpath(__file__))
@ -401,7 +403,7 @@ class FailedDistDialog(QMessageBox):
class MissingInputsDialog(QMessageBox):
def __init__(self):
def __init__(self) -> None:
super(self.__class__, self).__init__()
self.setWindowTitle('Missing Inputs')
scriptDir = os.path.dirname(os.path.realpath(__file__))
@ -413,7 +415,7 @@ class MissingInputsDialog(QMessageBox):
class DnsNotWorkingDialog(QMessageBox):
def __init__(self):
def __init__(self) -> None:
super(self.__class__, self).__init__()
self.setWindowTitle('DNS Not Working')
scriptDir = os.path.dirname(os.path.realpath(__file__))
@ -428,7 +430,7 @@ class DnsNotWorkingDialog(QMessageBox):
class DomainnameDetectionThread(QThread):
domain = pyqtSignal('QString')
def run(self):
def run(self) -> None:
domainname = get_ucs_domainname()
if domainname:
self.domain.emit(domainname)
@ -441,14 +443,14 @@ class JoinThread(QThread):
join_failed = pyqtSignal('QString')
join_successful = pyqtSignal()
def __init__(self, dc_ip, admin_username, admin_pw, force_ucs_dns):
def __init__(self, dc_ip: str, admin_username: str, admin_pw: str, force_ucs_dns: bool) -> None:
super(self.__class__, self).__init__()
self.dc_ip = dc_ip
self.admin_username = admin_username
self.admin_pw = admin_pw
self.force_ucs_dns = force_ucs_dns
def run(self):
def run(self) -> None:
self.join_started.emit()
try:
try:
@ -468,7 +470,7 @@ class JoinThread(QThread):
return
self.join_successful.emit()
def get_joiner_for_this_distribution(self):
def get_joiner_for_this_distribution(self) -> object:
distribution = get_distribution()
try:
distribution_join_module = importlib.import_module('univention_domain_join.distributions.%s' % (distribution.lower(),))
@ -482,7 +484,7 @@ class JoinThread(QThread):
raise DistributionException()
@execute_as_root
def check_if_ssh_works_with_given_account(self):
def check_if_ssh_works_with_given_account(self) -> bool:
ssh_process = subprocess.Popen(
['sshpass', '-d0', 'ssh', '-o', 'StrictHostKeyChecking=no', '%s@%s' % (self.admin_username, self.dc_ip), 'echo foo'],
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE
@ -497,7 +499,7 @@ class JoinThread(QThread):
return True
@execute_as_root
def get_ucr_variables_from_dc(self):
def get_ucr_variables_from_dc(self) -> Optional[Dict[str, str]]:
ssh_process = subprocess.Popen(
['sshpass', '-d0', 'ssh', '-o', 'StrictHostKeyChecking=no', '%s@%s' % (self.admin_username, self.dc_ip), '/usr/sbin/ucr shell | grep -v ^hostname='],
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE

View File

@ -53,3 +53,20 @@ line_length = 220
multi_line_output = 3
include_trailing_comma = 1
src_paths = univention_domain_join
[mypy]
warn_unused_configs = True
disallow_untyped_defs = True
disallow_incomplete_defs = True
no_implicit_optional = True
exclude = (?x)(build|dist|venv|setup\.py)
files = .
[mypy-ldap.*]
ignore_missing_imports = True
[mypy-IPy]
ignore_missing_imports = True
[mypy-netifaces]
ignore_missing_imports = True

View File

@ -32,6 +32,7 @@
import logging
import os
import time
from typing import Dict
from univention_domain_join.join_steps.dns_configurator import DnsConfigurator
from univention_domain_join.join_steps.kerberos_configurator import KerberosConfigurator
@ -54,7 +55,7 @@ class DcResolveException(Exception):
class Joiner(object):
def __init__(self, ucr_variables, admin_username, admin_pw, dc_ip, skip_login_manager, force_ucs_dns):
def __init__(self, ucr_variables: Dict[str, str], admin_username: str, admin_pw: str, dc_ip: str, skip_login_manager: bool, force_ucs_dns: bool) -> None:
self.admin_username = admin_username
self.admin_pw = admin_pw
self.dc_ip = dc_ip
@ -71,7 +72,7 @@ class Joiner(object):
self.ldap_server_name = ucr_variables['ldap_server_name']
self.kerberos_realm = ucr_variables['kerberos_realm']
def check_if_join_is_possible_without_problems(self):
def check_if_join_is_possible_without_problems(self) -> None:
if not self.skip_login_manager and LoginManagerConfigurator().configuration_conflicts():
userinfo_logger.critical(
'Joining the UCS domain is not safely possible.\n'
@ -79,7 +80,7 @@ class Joiner(object):
)
raise DomainJoinException()
def create_backup_of_config_files(self):
def create_backup_of_config_files(self) -> None:
backup_dir = self.create_backup_dir()
if self.force_ucs_dns:
DnsConfigurator(self.nameservers, self.domain).backup(backup_dir)
@ -92,12 +93,12 @@ class Joiner(object):
userinfo_logger.info('Created a backup of all configuration files, that will be modified at \'%s\'.' % backup_dir)
@execute_as_root
def create_backup_dir(self):
def create_backup_dir(self) -> str:
backup_dir = os.path.join('/var/univention-backup', time.strftime("%Y%m%d%H%M%S_domain-join", time.gmtime()))
os.makedirs(backup_dir)
return backup_dir
def join_domain(self):
def join_domain(self) -> None:
try:
if self.force_ucs_dns:
userinfo_logger.info('changing network/dns configuration as requested.')

View File

@ -34,6 +34,7 @@ import logging
import os
import subprocess
from shutil import copyfile
from typing import List
import dns.resolver
@ -47,7 +48,7 @@ class DnsConfigurationException(Exception):
class DnsConfigurator(object):
def __init__(self, nameservers, domain):
def __init__(self, nameservers: List[str], domain: str) -> None:
self.nameservers = nameservers
self.domain = domain
@ -69,11 +70,11 @@ class DnsConfigurator(object):
else:
self.working_configurator = DnsConfiguratorTrusty()
def backup(self, backup_dir):
def backup(self, backup_dir: str) -> None:
self.working_configurator.backup(backup_dir)
@execute_as_root
def configure_dns(self):
def configure_dns(self) -> None:
self.working_configurator.configure_dns(self.nameservers, self.domain)
if self.domain.endswith('.local'):
subprocess.check_call([
@ -83,7 +84,7 @@ class DnsConfigurator(object):
], close_fds=True)
self.check_if_dns_works()
def check_if_dns_works(self):
def check_if_dns_works(self) -> None:
resolver = dns.resolver.Resolver()
try:
resolver.query('_domaincontroller_master._tcp.%s.' % (self.domain,), 'SRV')
@ -96,20 +97,20 @@ class DnsConfigurator(object):
class DnsConfiguratorTrusty(object):
def __init__(self):
def __init__(self) -> None:
self.sub_configurators = (DnsConfiguratorDHClient(), DnsConfiguratorOldNetworkManager(), DnsConfiguratorResolvconf())
def backup(self, backup_dir):
def backup(self, backup_dir: str) -> None:
for configurator in self.sub_configurators:
configurator.backup(backup_dir)
def configure_dns(self, nameservers, domain):
def configure_dns(self, nameservers: List[str], domain: str) -> None:
for configurator in self.sub_configurators:
configurator.configure_dns(nameservers, domain)
class DnsConfiguratorSystemd(object):
def works_on_this_system(self):
def works_on_this_system(self) -> bool:
ssh_process = subprocess.Popen(
['service', 'systemd-resolved', 'status'],
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE
@ -118,7 +119,7 @@ class DnsConfiguratorSystemd(object):
return ssh_process.returncode == 0
@execute_as_root
def backup(self, backup_dir):
def backup(self, backup_dir: str) -> None:
if os.path.isfile('/etc/systemd/resolved.conf'):
userinfo_logger.warn('Warning: /etc/systemd/resolved.conf already exists.')
os.makedirs(os.path.join(backup_dir, 'etc/systemd'))
@ -128,7 +129,7 @@ class DnsConfiguratorSystemd(object):
)
@execute_as_root
def configure_dns(self, nameservers, domain):
def configure_dns(self, nameservers: List[str], domain: str) -> None:
userinfo_logger.info('Writing /etc/systemd/resolved.conf')
with open('/etc/systemd/resolved.conf', 'w') as conf_file:
conf_file.write('[Resolve]\n')
@ -140,7 +141,7 @@ class DnsConfiguratorSystemd(object):
class DnsConfiguratorNetworkManager(object):
def works_on_this_system(self):
def works_on_this_system(self) -> bool:
# could also check lsb_release -sr here instead
p = subprocess.Popen(
['nmcli', '-v'],
@ -157,12 +158,12 @@ class DnsConfiguratorNetworkManager(object):
return p.returncode == 0
@execute_as_root
def backup(self, backup_dir):
def backup(self, backup_dir: str) -> None:
# TODO: where does nmcli store the DNS settings?
return
@execute_as_root
def configure_dns(self, nameservers, domain):
def configure_dns(self, nameservers: List[str], domain: str) -> None:
p = subprocess.Popen(
['nmcli', '-t', '-f', 'NAME,UUID,DEVICE', 'connection', 'show', '--active'],
stdout=subprocess.PIPE, stderr=subprocess.PIPE
@ -193,7 +194,7 @@ class DnsConfiguratorNetworkManager(object):
class DnsConfiguratorOldNetworkManager(object):
@execute_as_root
def backup(self, backup_dir):
def backup(self, backup_dir: str) -> None:
p = subprocess.Popen(
['nmcli', '-t', '-f', 'NAME,UUID', 'connection', 'list'],
stdout=subprocess.PIPE, stderr=subprocess.PIPE
@ -215,7 +216,7 @@ class DnsConfiguratorOldNetworkManager(object):
os.chmod(fn_backup, 0o600)
@execute_as_root
def configure_dns(self, nameservers, domain):
def configure_dns(self, nameservers: List[str], domain: str) -> None:
ns_string = ';'.join(filter(lambda x: x, nameservers)) + ';'
p = subprocess.Popen(
['nmcli', '-t', '-f', 'NAME,UUID', 'connection', 'list'],
@ -240,7 +241,7 @@ class DnsConfiguratorOldNetworkManager(object):
class DnsConfiguratorDHClient(object):
@execute_as_root
def backup(self, backup_dir):
def backup(self, backup_dir: str) -> None:
if os.path.isfile('/etc/dhcp/dhclient.conf'):
os.makedirs(os.path.join(backup_dir, 'etc/dhcp'))
copyfile(
@ -249,7 +250,7 @@ class DnsConfiguratorDHClient(object):
)
@execute_as_root
def configure_dns(self, nameservers, domain):
def configure_dns(self, nameservers: List[str], domain: str) -> None:
ns_string = " ".join(filter(lambda x: x, nameservers))
p = subprocess.Popen([
'grep', '-q', '^prepend domain-name-servers %s' % ns_string,
@ -266,7 +267,7 @@ class DnsConfiguratorDHClient(object):
class DnsConfiguratorResolvconf(object):
@execute_as_root
def backup(self, backup_dir):
def backup(self, backup_dir: str) -> None:
if os.path.isfile('/etc/resolvconf/resolv.conf.d/base'):
userinfo_logger.warn('Warning: /etc/resolvconf/resolv.conf.d/base already exists.')
os.makedirs(os.path.join(backup_dir, 'etc/resolvconf/resolv.conf.d'))
@ -276,7 +277,7 @@ class DnsConfiguratorResolvconf(object):
)
@execute_as_root
def configure_dns(self, nameservers, domain):
def configure_dns(self, nameservers: List[str], domain: str) -> None:
userinfo_logger.info('Writing /etc/resolvconf/resolv.conf.d/base')
with open('/etc/resolvconf/resolv.conf.d/base', 'w') as conf_file:
for nameserver in nameservers:

View File

@ -42,7 +42,7 @@ userinfo_logger = logging.getLogger('userinfo')
class ConflictChecker(object):
def config_file_exists(self):
def config_file_exists(self) -> bool:
if os.path.isfile('/etc/krb5.conf'):
userinfo_logger.warn('Warning: /etc/krb5.conf already exists.')
return True
@ -51,7 +51,7 @@ class ConflictChecker(object):
class KerberosConfigurator(ConflictChecker):
@execute_as_root
def backup(self, backup_dir):
def backup(self, backup_dir: str) -> None:
if self.config_file_exists():
if not os.path.exists(os.path.join(backup_dir, 'etc')):
os.makedirs(os.path.join(backup_dir, 'etc'))
@ -60,12 +60,12 @@ class KerberosConfigurator(ConflictChecker):
os.path.join(backup_dir, 'etc/krb5.conf')
)
def configure_kerberos(self, kerberos_realm, ldap_master, ldap_server_name, is_samba_dc, dc_ip):
def configure_kerberos(self, kerberos_realm: str, ldap_master: str, ldap_server_name: str, is_samba_dc: bool, dc_ip: str) -> None:
self.write_config_file(kerberos_realm, ldap_master, ldap_server_name, is_samba_dc)
self.synchronize_time_with_master(dc_ip)
@execute_as_root
def write_config_file(self, kerberos_realm, ldap_master, ldap_server_name, is_samba_dc):
def write_config_file(self, kerberos_realm: str, ldap_master: str, ldap_server_name: str, is_samba_dc: bool) -> None:
userinfo_logger.info('Writing /etc/krb5.conf ')
if is_samba_dc:
kpasswd_name = ldap_server_name
@ -98,7 +98,7 @@ class KerberosConfigurator(ConflictChecker):
conf_file.write(config)
@execute_as_root
def synchronize_time_with_master(self, dc_ip):
def synchronize_time_with_master(self, dc_ip: str) -> None:
userinfo_logger.info('Synchronizing time with the DC')
subprocess.check_call(
['ntpdate', '-bu', dc_ip],

View File

@ -50,7 +50,7 @@ class LdapConfigutationException(Exception):
class ConflictChecker(object):
def ldap_conf_exists(self):
def ldap_conf_exists(self) -> bool:
if os.path.isfile('/etc/ldap/ldap.conf'):
userinfo_logger.warn('Warning: /etc/ldap/ldap.conf already exists.')
return True
@ -59,7 +59,7 @@ class ConflictChecker(object):
class LdapConfigurator(ConflictChecker):
@execute_as_root
def backup(self, backup_dir):
def backup(self, backup_dir: str) -> None:
if self.ldap_conf_exists():
os.makedirs(os.path.join(backup_dir, 'etc/ldap'))
copyfile(
@ -67,21 +67,21 @@ class LdapConfigurator(ConflictChecker):
os.path.join(backup_dir, 'etc/ldap/ldap.conf')
)
def configure_ldap(self, dc_ip, ldap_server_name, admin_username, admin_pw, ldap_base, admin_dn):
def configure_ldap(self, dc_ip: str, ldap_server_name: str, admin_username: str, admin_pw: str, ldap_base: str, admin_dn: str) -> None:
RootCertificateProvider().provide_ucs_root_certififcate(dc_ip)
password = self.random_password()
self.modify_old_entry_or_add_machine_to_ldap(password, dc_ip, admin_username, admin_pw, ldap_base, admin_dn)
self.create_ldap_conf_file(ldap_server_name, ldap_base)
self.create_machine_secret_file(password)
def modify_old_entry_or_add_machine_to_ldap(self, password, dc_ip, admin_username, admin_pw, ldap_base, admin_dn):
def modify_old_entry_or_add_machine_to_ldap(self, password: str, dc_ip: str, admin_username: str, admin_pw: str, ldap_base: str, admin_dn: str) -> None:
if get_machines_ldap_dn(dc_ip, admin_username, admin_pw, admin_dn):
self.modify_machine_in_ldap(password, dc_ip, admin_username, admin_pw, admin_dn)
else:
self.add_machine_to_ldap(password, dc_ip, admin_username, admin_pw, ldap_base, admin_dn)
@execute_as_root
def modify_machine_in_ldap(self, password, dc_ip, admin_username, admin_pw, admin_dn):
def modify_machine_in_ldap(self, password: str, dc_ip: str, admin_username: str, admin_pw: str, admin_dn: str) -> None:
userinfo_logger.info('Updating old LDAP entry for this machine on the UCS DC')
release_id = subprocess.check_output(['lsb_release', '-is']).strip().decode()
@ -109,7 +109,7 @@ class LdapConfigurator(ConflictChecker):
raise LdapConfigutationException()
@execute_as_root
def add_machine_to_ldap(self, password, dc_ip, admin_username, admin_pw, ldap_base, admin_dn):
def add_machine_to_ldap(self, password: str, dc_ip: str, admin_username: str, admin_pw: str, ldap_base: str, admin_dn: str) -> None:
userinfo_logger.info('Adding LDAP entry for this machine on the UCS DC')
hostname = subprocess.check_output(['hostname', '-s']).strip().decode()
release_id = subprocess.check_output(['lsb_release', '-is']).strip().decode()
@ -137,7 +137,7 @@ class LdapConfigurator(ConflictChecker):
raise LdapConfigutationException()
@execute_as_root
def get_admin_dn(self, dc_ip, admin_username, admin_pw, ldap_base):
def get_admin_dn(self, dc_ip: str, admin_username: str, admin_pw: str, ldap_base: str) -> str:
userinfo_logger.info('Getting the DN of the Administrator ')
ldap_command = 'ldapsearch -QLLL -Y GSSAPI uid=%s dn' % (pipes.quote(admin_username),)
ssh_process = subprocess.Popen(
@ -152,7 +152,7 @@ class LdapConfigurator(ConflictChecker):
return admin_dn
@execute_as_root
def create_ldap_conf_file(self, ldap_server_name, ldap_base):
def create_ldap_conf_file(self, ldap_server_name: str, ldap_base: str) -> None:
userinfo_logger.info('Writing /etc/ldap/ldap.conf ')
ldap_conf = \
"TLS_CACERT /etc/univention/ssl/ucsCA/CAcert.pem\n" \
@ -163,13 +163,13 @@ class LdapConfigurator(ConflictChecker):
conf_file.write(ldap_conf)
@execute_as_root
def create_machine_secret_file(self, password):
def create_machine_secret_file(self, password: str) -> None:
userinfo_logger.info('Writing /etc/machine.secret ')
with open('/etc/machine.secret', 'w') as secret_file:
secret_file.write(password)
os.chmod('/etc/machine.secret', stat.S_IREAD)
def random_password(self, length=20):
def random_password(self, length: int = 20) -> str:
chars = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!"#$%&\'()*+,-./:;<=>?@[]^_`{|}~'
password = ''
for _ in range(length):

View File

@ -42,7 +42,7 @@ userinfo_logger = logging.getLogger('userinfo')
class ConflictChecker(object):
def configuration_conflicts(self):
def configuration_conflicts(self) -> bool:
login_manager = self.determin_used_login_manager()
if login_manager in ['lightdm', 'gdm3', 'sddm']:
return False
@ -55,7 +55,7 @@ class ConflictChecker(object):
userinfo_logger.error(' This error can be avoided by using the --skip-login-manager parameter.')
return True
def determin_used_login_manager(self):
def determin_used_login_manager(self) -> str:
with open('/etc/X11/default-display-manager', 'r') as login_manager_file:
login_manager = os.path.basename(login_manager_file.read()).strip()
@ -67,19 +67,19 @@ class ConflictChecker(object):
return login_manager
def lightdm_config_file_exists(self):
def lightdm_config_file_exists(self) -> bool:
if os.path.isfile('/etc/lightdm/lightdm.conf.d/99-show-manual-userlogin.conf'):
userinfo_logger.warn('Warning: /etc/lightdm/lightdm.conf.d/99-show-manual-userlogin.conf already exists.')
return True
return False
def kde_greeter_is_installed(self):
def kde_greeter_is_installed(self) -> bool:
return 0 == subprocess.call(
['dpkg', '-s', 'lightdm-kde-greeter'],
stdout=OUTPUT_SINK, stderr=OUTPUT_SINK
)
def theme_with_accountsservice_is_ok(self):
def theme_with_accountsservice_is_ok(self) -> bool:
if os.path.isfile('/etc/lightdm/lightdm-kde-greeter.conf'):
with open('/etc/lightdm/lightdm-kde-greeter.conf', 'r') as greeter_config_file:
for line in greeter_config_file:
@ -93,7 +93,7 @@ class ConflictChecker(object):
class LoginManagerConfigurator(ConflictChecker):
@execute_as_root
def backup(self, backup_dir):
def backup(self, backup_dir: str) -> None:
if self.lightdm_config_file_exists():
os.makedirs(os.path.join(backup_dir, 'etc/lightdm/lightdm.conf.d'))
copyfile(
@ -101,13 +101,13 @@ class LoginManagerConfigurator(ConflictChecker):
os.path.join(backup_dir, 'etc/lightdm/lightdm.conf.d/99-show-manual-userlogin.conf')
)
def enable_login_with_foreign_usernames(self):
def enable_login_with_foreign_usernames(self) -> None:
login_manager = self.determin_used_login_manager()
if login_manager == 'lightdm':
self.enable_login_with_foreign_usernames_for_lightdm()
@execute_as_root
def enable_login_with_foreign_usernames_for_lightdm(self):
def enable_login_with_foreign_usernames_for_lightdm(self) -> None:
userinfo_logger.info('Writing /etc/lightdm/lightdm.conf.d/99-show-manual-userlogin.conf ')
lightdm_config = \

View File

@ -42,13 +42,13 @@ userinfo_logger = logging.getLogger('userinfo')
class ConflictChecker(object):
def home_dir_conf_file_exists(self):
def home_dir_conf_file_exists(self) -> bool:
if os.path.isfile('/usr/share/pam-configs/ucs_mkhomedir'):
userinfo_logger.warn('Warning: /usr/share/pam-configs/ucs_mkhomedir already exists.')
return True
return False
def group_conf_file_exists(self):
def group_conf_file_exists(self) -> bool:
if os.path.isfile('/usr/share/pam-configs/local_groups'):
userinfo_logger.warn('Warning: /usr/share/pam-configs/local_groups already exists.')
return True
@ -58,7 +58,7 @@ class ConflictChecker(object):
class PamConfigurator(ConflictChecker):
@execute_as_root
def backup(self, backup_dir):
def backup(self, backup_dir: str) -> None:
copy_home_dir_conf = self.home_dir_conf_file_exists()
copy_group_conf = self.group_conf_file_exists()
if copy_home_dir_conf or copy_group_conf:
@ -79,13 +79,13 @@ class PamConfigurator(ConflictChecker):
os.path.join(backup_dir, 'etc/security/group.conf')
)
def setup_pam(self):
def setup_pam(self) -> None:
self.configure_home_dir_creation()
self.add_users_to_requiered_system_groups()
self.update_pam()
@execute_as_root
def configure_home_dir_creation(self):
def configure_home_dir_creation(self) -> None:
userinfo_logger.info('Writing /usr/share/pam-configs/ucs_mkhomedir ')
home_dir_conf = \
@ -98,12 +98,12 @@ class PamConfigurator(ConflictChecker):
with open('/usr/share/pam-configs/ucs_mkhomedir', 'w') as conf_file:
conf_file.write(home_dir_conf)
def add_users_to_requiered_system_groups(self):
def add_users_to_requiered_system_groups(self) -> None:
self.add_groups_to_group_conf()
self.write_pam_group_conf()
@execute_as_root
def add_groups_to_group_conf(self):
def add_groups_to_group_conf(self) -> None:
if self.group_conf_already_ok():
return
@ -115,7 +115,7 @@ class PamConfigurator(ConflictChecker):
'*;*;*;Al0000-2400;audio,cdrom,dialout,floppy,plugdev,adm\n'
)
def group_conf_already_ok(self):
def group_conf_already_ok(self) -> bool:
with open('/etc/security/group.conf', 'r') as groups_file:
for line in groups_file:
if '*;*;*;Al0000-2400;audio,cdrom,dialout,floppy,plugdev,adm\n' in line:
@ -123,7 +123,7 @@ class PamConfigurator(ConflictChecker):
return False
@execute_as_root
def write_pam_group_conf(self):
def write_pam_group_conf(self) -> None:
userinfo_logger.info('Adding groups to /usr/share/pam-configs/local_groups ')
group_conf = \
@ -138,7 +138,7 @@ class PamConfigurator(ConflictChecker):
conf_file.write(group_conf)
@execute_as_root
def update_pam(self):
def update_pam(self) -> None:
userinfo_logger.info('Updating PAM')
env = os.environ.copy()

View File

@ -41,17 +41,17 @@ userinfo_logger = logging.getLogger('userinfo')
class RootCertificateProvider(object):
def provide_ucs_root_certififcate(self, dc_ip):
def provide_ucs_root_certififcate(self, dc_ip: str) -> None:
if not self.ucs_root_certificate_available_locally():
self.download_ucs_root_certificate(dc_ip)
self.add_certificate_to_certificate_store()
def ucs_root_certificate_available_locally(self):
def ucs_root_certificate_available_locally(self) -> bool:
return os.path.isfile('/etc/univention/ssl/ucsCA/CAcert.pem') and \
os.path.isfile('/usr/local/share/ca-certificates/UCSdomain.crt')
@execute_as_root
def download_ucs_root_certificate(self, dc_ip):
def download_ucs_root_certificate(self, dc_ip: str) -> None:
userinfo_logger.info('Downloading the UCS root certificate to /etc/univention/ssl/ucsCA/CAcert.pem')
if not os.path.exists('/etc/univention/ssl/ucsCA'):
@ -67,7 +67,7 @@ class RootCertificateProvider(object):
)
@execute_as_root
def add_certificate_to_certificate_store(self):
def add_certificate_to_certificate_store(self) -> None:
userinfo_logger.info('Adding the UCS root certificate to the certificate store')
os.symlink('/etc/univention/ssl/ucsCA/CAcert.pem', '/usr/local/share/ca-certificates/UCSdomain.crt')

View File

@ -45,7 +45,7 @@ userinfo_logger = logging.getLogger('userinfo')
class ConflictChecker(object):
def sssd_conf_file_exists(self):
def sssd_conf_file_exists(self) -> bool:
if os.path.isfile('/etc/sssd/sssd.conf'):
userinfo_logger.warn('Warning: /etc/sssd/sssd.conf already exists.')
return True
@ -55,7 +55,7 @@ class ConflictChecker(object):
class SssdConfigurator(ConflictChecker):
@execute_as_root
def backup(self, backup_dir):
def backup(self, backup_dir: str) -> None:
if self.sssd_conf_file_exists():
os.makedirs(os.path.join(backup_dir, 'etc/sssd'))
copyfile(
@ -64,7 +64,7 @@ class SssdConfigurator(ConflictChecker):
)
@execute_as_root
def setup_sssd(self, dc_ip, ldap_master, ldap_server_name, admin_username, admin_pw, ldap_base, kerberos_realm, admin_dn, is_samba_dc):
def setup_sssd(self, dc_ip: str, ldap_master: str, ldap_server_name: str, admin_username: str, admin_pw: str, ldap_base: str, kerberos_realm: str, admin_dn: str, is_samba_dc: bool) -> None:
self.ldap_password = open('/etc/machine.secret').read().strip()
RootCertificateProvider().provide_ucs_root_certififcate(dc_ip)
self.write_sssd_conf(dc_ip, ldap_master, ldap_server_name, admin_username, admin_pw, ldap_base, kerberos_realm, admin_dn, is_samba_dc)
@ -72,7 +72,7 @@ class SssdConfigurator(ConflictChecker):
self.restart_sssd()
@execute_as_root
def write_sssd_conf(self, dc_ip, ldap_master, ldap_server_name, admin_username, admin_pw, ldap_base, kerberos_realm, admin_dn, is_samba_dc):
def write_sssd_conf(self, dc_ip: str, ldap_master: str, ldap_server_name: str, admin_username: str, admin_pw: str, ldap_base: str, kerberos_realm: str, admin_dn: str, is_samba_dc: bool) -> None:
userinfo_logger.info('Writing /etc/sssd/sssd.conf ')
if is_samba_dc:
kpasswd_server = ldap_server_name
@ -120,7 +120,7 @@ class SssdConfigurator(ConflictChecker):
os.chmod('/etc/sssd/sssd.conf', stat.S_IREAD | stat.S_IWRITE)
@execute_as_root
def configure_sssd(self):
def configure_sssd(self) -> None:
userinfo_logger.info('Configuring auth config profile for sssd')
subprocess.check_call(
@ -129,7 +129,7 @@ class SssdConfigurator(ConflictChecker):
)
@execute_as_root
def restart_sssd(self):
def restart_sssd(self) -> None:
userinfo_logger.info('Restarting sssd')
subprocess.check_call(

View File

@ -32,5 +32,5 @@
import subprocess
def get_distribution():
def get_distribution() -> str:
return subprocess.check_output(['lsb_release', '-is']).strip().decode()

View File

@ -32,6 +32,7 @@
import os
import socket
import subprocess
from typing import List, Optional, Set
import dns.resolver
import IPy
@ -40,7 +41,7 @@ import netifaces
OUTPUT_SINK = open(os.devnull, 'w')
def get_master_ip_through_dns(domain):
def get_master_ip_through_dns(domain: str) -> Optional[str]:
resolver = dns.resolver.Resolver()
try:
response = resolver.query('_domaincontroller_master._tcp.%s.' % (domain,), 'SRV')
@ -50,7 +51,7 @@ def get_master_ip_through_dns(domain):
return socket.gethostbyname(master_fqdn)
def get_ucs_domainname():
def get_ucs_domainname() -> Optional[str]:
domainname = get_ucs_domainname_via_local_configuration()
if not domainname:
domainname = get_ucs_domainname_via_reverse_dns()
@ -59,7 +60,7 @@ def get_ucs_domainname():
return domainname
def get_ucs_domainname_via_local_configuration():
def get_ucs_domainname_via_local_configuration() -> Optional[str]:
try:
domainname = socket.getfqdn().split('.', 1)[1]
except Exception:
@ -67,7 +68,7 @@ def get_ucs_domainname_via_local_configuration():
return domainname
def get_ucs_domainname_via_reverse_dns():
def get_ucs_domainname_via_reverse_dns() -> Optional[str]:
ip_addresses = get_all_ip_addresses()
possible_ucs_domainnames = set()
for ip_address in ip_addresses:
@ -79,7 +80,7 @@ def get_ucs_domainname_via_reverse_dns():
return None
def get_ucs_domainname_of_dns_server():
def get_ucs_domainname_of_dns_server() -> Optional[str]:
nameservers = get_nameservers()
possible_ucs_domainnames = set()
for nameserver in nameservers:
@ -91,7 +92,7 @@ def get_ucs_domainname_of_dns_server():
return None
def get_nameservers():
def get_nameservers() -> Set[str]:
output = subprocess.check_output(['systemd-resolve', '--status'])
nameservers = set()
@ -108,7 +109,7 @@ def get_nameservers():
return nameservers
def is_only_ip(line):
def is_only_ip(line: str) -> bool:
try:
IPy.IP(line.strip())
return True
@ -116,7 +117,7 @@ def is_only_ip(line):
return False
def get_all_ip_addresses():
def get_all_ip_addresses() -> List[str]:
ip_addresses = []
for interface in netifaces.interfaces():
# Skip the loopback device.
@ -127,7 +128,7 @@ def get_all_ip_addresses():
return ip_addresses
def get_ipv4_addresses(interface):
def get_ipv4_addresses(interface: str) -> List[str]:
short_addresses = []
if netifaces.AF_INET in netifaces.ifaddresses(interface):
ipv4_addresses = netifaces.ifaddresses(interface)[netifaces.AF_INET]
@ -136,7 +137,7 @@ def get_ipv4_addresses(interface):
return short_addresses
def get_ipv6_addresses(interface):
def get_ipv6_addresses(interface: str) -> List[str]:
short_addresses = []
if netifaces.AF_INET6 in netifaces.ifaddresses(interface):
ipv6_addresses = netifaces.ifaddresses(interface)[netifaces.AF_INET6]
@ -147,7 +148,7 @@ def get_ipv6_addresses(interface):
return short_addresses
def get_ucs_domainname_from_fqdn(fqdn):
def get_ucs_domainname_from_fqdn(fqdn: str) -> Optional[str]:
try:
domainname = fqdn.split('.', 1)[1]
# Check if the _domaincontroller_master._tcp record exists, to ensure

View File

@ -31,10 +31,13 @@
import os
import socket
from typing import Any, Callable, TypeVar, cast
F = TypeVar('F', bound=Callable[..., Any])
def execute_as_root(func):
def root_wrapper(*args, **kwargs):
def execute_as_root(func: F) -> F:
def root_wrapper(*args: Any, **kwargs: Any) -> Any:
old_euid = os.geteuid()
os.seteuid(0)
try:
@ -42,10 +45,10 @@ def execute_as_root(func):
finally:
os.seteuid(old_euid)
return return_value
return root_wrapper
return cast(F, root_wrapper)
def name_is_resolvable(name):
def name_is_resolvable(name: str) -> bool:
try:
return bool(socket.getaddrinfo(name, 22, socket.AF_UNSPEC, socket.SOCK_STREAM, socket.IPPROTO_TCP))
except Exception:

View File

@ -31,6 +31,7 @@
import pipes
import subprocess
from typing import Optional
from ldap.filter import filter_format
@ -38,7 +39,7 @@ from univention_domain_join.utils.general import execute_as_root
@execute_as_root
def authenticate_admin(dc_ip, admin_username, admin_pw):
def authenticate_admin(dc_ip: str, admin_username: str, admin_pw: str) -> None:
ldap_command = ' echo {1} > /dev/shm/{0}domain-join; chmod 600 /dev/shm/{0}domain-join; kinit --password-file=/dev/shm/{0}domain-join {0}'.format(pipes.quote(admin_username), pipes.quote(admin_pw))
ssh_process = subprocess.Popen(
['sshpass', '-d0', 'ssh', '-o', 'StrictHostKeyChecking=no', '%s@%s' % (admin_username, dc_ip), ldap_command],
@ -48,7 +49,7 @@ def authenticate_admin(dc_ip, admin_username, admin_pw):
@execute_as_root
def cleanup_authentication(dc_ip, admin_username, admin_pw):
def cleanup_authentication(dc_ip: str, admin_username: str, admin_pw: str) -> None:
ldap_command = 'rm -f /dev/shm/{0}domain-join; kdestroy'.format(pipes.quote(admin_username))
ssh_process = subprocess.Popen(
['sshpass', '-d0', 'ssh', '-o', 'StrictHostKeyChecking=no', '%s@%s' % (admin_username, dc_ip), ldap_command],
@ -58,7 +59,7 @@ def cleanup_authentication(dc_ip, admin_username, admin_pw):
@execute_as_root
def is_samba_dc(admin_username, admin_pw, dc_ip, admin_dn):
def is_samba_dc(admin_username: str, admin_pw: str, dc_ip: str, admin_dn: str) -> bool:
ldap_command = ['ldapsearch', '-QLLL', filter_format('aRecord=%s', [dc_ip]), 'univentionService']
escaped_ldap_command = ' '.join([pipes.quote(x) for x in ldap_command])
ssh_process = subprocess.Popen(
@ -72,7 +73,7 @@ def is_samba_dc(admin_username, admin_pw, dc_ip, admin_dn):
return False
def get_machines_ldap_dn(dc_ip, admin_username, admin_pw, admin_dn):
def get_machines_ldap_dn(dc_ip: str, admin_username: str, admin_pw: str, admin_dn: str) -> Optional[str]:
for udm_type in ['computers/ubuntu', 'computers/linux', 'computers/ucc']:
machines_ldap_dn = get_machines_ldap_dn_given_the_udm_type(udm_type, dc_ip, admin_username, admin_pw, admin_dn)
if machines_ldap_dn:
@ -80,7 +81,7 @@ def get_machines_ldap_dn(dc_ip, admin_username, admin_pw, admin_dn):
return None
def get_machines_udm_type(dc_ip, admin_username, admin_pw, admin_dn):
def get_machines_udm_type(dc_ip: str, admin_username: str, admin_pw: str, admin_dn: str) -> Optional[str]:
for udm_type in ['computers/ubuntu', 'computers/linux', 'computers/ucc']:
machines_ldap_dn = get_machines_ldap_dn_given_the_udm_type(udm_type, dc_ip, admin_username, admin_pw, admin_dn)
if machines_ldap_dn:
@ -89,7 +90,7 @@ def get_machines_udm_type(dc_ip, admin_username, admin_pw, admin_dn):
@execute_as_root
def get_machines_ldap_dn_given_the_udm_type(udm_type, dc_ip, admin_username, admin_pw, admin_dn):
def get_machines_ldap_dn_given_the_udm_type(udm_type: str, dc_ip: str, admin_username: str, admin_pw: str, admin_dn: str) -> Optional[str]:
hostname = subprocess.check_output(['hostname', '-s']).strip().decode()
udm_command = ['/usr/sbin/udm', udm_type, 'list', '--binddn', admin_dn, '--bindpwdfile', '/dev/shm/%sdomain-join' % (admin_username,), '--filter', 'name=%s' % (hostname,)]
escaped_udm_command = ' '.join([pipes.quote(x) for x in udm_command])