style: tab 2 spaces

find -name venv -prune -o -name \*.py -exec autopep8 --in-place {} +
This commit is contained in:
Philipp Hahn 2023-08-18 10:20:43 +02:00
parent 75cf638498
commit 7e334ffbd5
16 changed files with 1362 additions and 1362 deletions

View File

@ -19,151 +19,151 @@ from univention_domain_join.utils.general import execute_as_root, ssh
def check_if_run_as_root() -> None:
if os.getuid() != 0:
print('This tool must be executed as root.')
exit(1)
if os.getuid() != 0:
print('This tool must be executed as root.')
exit(1)
@execute_as_root
def set_up_logging(logfile: str) -> None:
verbose_formatter = logging.Formatter('%(asctime)s %(name)s %(levelname)s %(message)s')
plain_formatter = logging.Formatter('%(message)s')
verbose_formatter = logging.Formatter('%(asctime)s %(name)s %(levelname)s %(message)s')
plain_formatter = logging.Formatter('%(message)s')
os.makedirs(os.path.dirname(logfile), exist_ok=True)
logfile_handler = logging.FileHandler(logfile)
logfile_handler.setLevel(logging.DEBUG)
logfile_handler.setFormatter(verbose_formatter)
os.makedirs(os.path.dirname(logfile), exist_ok=True)
logfile_handler = logging.FileHandler(logfile)
logfile_handler.setLevel(logging.DEBUG)
logfile_handler.setFormatter(verbose_formatter)
stdout_handler = logging.StreamHandler(sys.stdout)
stdout_handler.setLevel(logging.DEBUG)
stdout_handler.setFormatter(plain_formatter)
stdout_handler = logging.StreamHandler(sys.stdout)
stdout_handler.setLevel(logging.DEBUG)
stdout_handler.setFormatter(plain_formatter)
userinfo_logger = logging.getLogger('userinfo')
userinfo_logger.setLevel(logging.DEBUG)
userinfo_logger.addHandler(logfile_handler)
userinfo_logger.addHandler(stdout_handler)
userinfo_logger = logging.getLogger('userinfo')
userinfo_logger.setLevel(logging.DEBUG)
userinfo_logger.addHandler(logfile_handler)
userinfo_logger.addHandler(stdout_handler)
debugging_logger = logging.getLogger('debugging')
debugging_logger.setLevel(logging.DEBUG)
debugging_logger.addHandler(logfile_handler)
debugging_logger = logging.getLogger('debugging')
debugging_logger.setLevel(logging.DEBUG)
debugging_logger.addHandler(logfile_handler)
def get_joiner_for_this_distribution(dc_ip: str, admin_username: str, admin_pw: str, skip_login_manager: bool, force_ucs_dns: bool) -> AbstractJoiner:
distribution = get_distribution()
try:
distribution_join_module = importlib.import_module('univention_domain_join.distributions.%s' % (distribution.lower(),))
if not admin_username:
admin_username = get_admin_username()
if not admin_pw:
admin_pw = get_admin_password(admin_username)
check_if_ssh_works_with_given_account(dc_ip, admin_username, admin_pw)
ucr_variables = get_ucr_variables_from_dc(dc_ip, admin_username, admin_pw)
return distribution_join_module.Joiner(ucr_variables, admin_username, admin_pw, dc_ip, skip_login_manager, force_ucs_dns)
except ImportError:
getLogger("userinfo").critical('The used distribution "%s" is not supported.' % (distribution,))
exit(1)
distribution = get_distribution()
try:
distribution_join_module = importlib.import_module('univention_domain_join.distributions.%s' % (distribution.lower(),))
if not admin_username:
admin_username = get_admin_username()
if not admin_pw:
admin_pw = get_admin_password(admin_username)
check_if_ssh_works_with_given_account(dc_ip, admin_username, admin_pw)
ucr_variables = get_ucr_variables_from_dc(dc_ip, admin_username, admin_pw)
return distribution_join_module.Joiner(ucr_variables, admin_username, admin_pw, dc_ip, skip_login_manager, force_ucs_dns)
except ImportError:
getLogger("userinfo").critical('The used distribution "%s" is not supported.' % (distribution,))
exit(1)
def get_admin_username() -> str:
return input('Please enter the user name of a domain administrator: ')
return input('Please enter the user name of a domain administrator: ')
def get_admin_password(admin_username: str) -> str:
# TODO: Don't ask for the password if ssh works passwordless already.
return getpass(prompt='Please enter the password for %s: ' % (admin_username,))
# TODO: Don't ask for the password if ssh works passwordless already.
return getpass(prompt='Please enter the password for %s: ' % (admin_username,))
def check_if_ssh_works_with_given_account(dc_ip: str, admin_username: str, admin_pw: str) -> None:
cmd = "true"
ssh_process = ssh(admin_username, admin_pw, dc_ip, cmd, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
_, stderr = ssh_process.communicate()
logging.getLogger('debugging').debug("%r returned %d: %s", cmd, ssh_process.returncode, stderr.decode())
if ssh_process.returncode != 0:
getLogger("userinfo").critical('It\'s not possible to connect to the UCS DC via ssh, with the given credentials.')
exit(1)
cmd = "true"
ssh_process = ssh(admin_username, admin_pw, dc_ip, cmd, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
_, stderr = ssh_process.communicate()
logging.getLogger('debugging').debug("%r returned %d: %s", cmd, ssh_process.returncode, stderr.decode())
if ssh_process.returncode != 0:
getLogger("userinfo").critical('It\'s not possible to connect to the UCS DC via ssh, with the given credentials.')
exit(1)
def get_ucr_variables_from_dc(dc_ip: str, admin_username: str, admin_pw: str) -> Dict[str, str]:
cmd = "/usr/sbin/ucr shell"
ssh_process = ssh(admin_username, admin_pw, dc_ip, cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
assert ssh_process.stdout
ucr_variables = dict(
line.decode("utf-8", "replace").strip().split("=", 1)
for line in ssh_process.stdout
)
cmd = "/usr/sbin/ucr shell"
ssh_process = ssh(admin_username, admin_pw, dc_ip, cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
assert ssh_process.stdout
ucr_variables = dict(
line.decode("utf-8", "replace").strip().split("=", 1)
for line in ssh_process.stdout
)
_, stderr = ssh_process.communicate()
logging.getLogger('debugging').debug("%r returned %d: %s", cmd, ssh_process.returncode, stderr.decode())
if ssh_process.wait() != 0:
getLogger("userinfo").critical('Fetching the UCR variables from the UCS DC failed.')
exit(1)
_, stderr = ssh_process.communicate()
logging.getLogger('debugging').debug("%r returned %d: %s", cmd, ssh_process.returncode, stderr.decode())
if ssh_process.wait() != 0:
getLogger("userinfo").critical('Fetching the UCR variables from the UCS DC failed.')
exit(1)
ucr_variables.pop("hostname", None)
return ucr_variables
ucr_variables.pop("hostname", None)
return ucr_variables
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description='Tool for joining a client computer into an UCS domain.'
)
parser.add_argument('--username', help='User name of a domain administrator')
parser.add_argument('--password', help='Password for the domain administrator')
parser.add_argument('--password-file', help='Path to a file, containing the password for the domain administrator', metavar="FILE")
parser.add_argument('--skip-login-manager', action='store_true', help='Do not configure the login manager')
parser.add_argument('--domain', help='Domain name. Can be left out if the domain is configured for this system')
parser.add_argument('--dc-ip', help='IP address of the UCS domain controller to join to. Can be used if --domain does not work. If unsure, use the IP of the UCS Master', metavar="IP")
parser.add_argument('--force-ucs-dns', action='store_true', help='Change the system\'s DNS settings and set the UCS DC as DNS nameserver (default is to use the standard network settings, but make sure the your system can resolve the hostname of the UCS DC and the UCS master system)')
parser.add_argument("--logfile", "-L", help="Path to log file %(default)s", metavar="FILE", default="/var/log/univention/domain-join-cli.log")
args = parser.parse_args()
return args
parser = argparse.ArgumentParser(
description='Tool for joining a client computer into an UCS domain.'
)
parser.add_argument('--username', help='User name of a domain administrator')
parser.add_argument('--password', help='Password for the domain administrator')
parser.add_argument('--password-file', help='Path to a file, containing the password for the domain administrator', metavar="FILE")
parser.add_argument('--skip-login-manager', action='store_true', help='Do not configure the login manager')
parser.add_argument('--domain', help='Domain name. Can be left out if the domain is configured for this system')
parser.add_argument('--dc-ip', help='IP address of the UCS domain controller to join to. Can be used if --domain does not work. If unsure, use the IP of the UCS Master', metavar="IP")
parser.add_argument('--force-ucs-dns', action='store_true', help='Change the system\'s DNS settings and set the UCS DC as DNS nameserver (default is to use the standard network settings, but make sure the your system can resolve the hostname of the UCS DC and the UCS master system)')
parser.add_argument("--logfile", "-L", help="Path to log file %(default)s", metavar="FILE", default="/var/log/univention/domain-join-cli.log")
args = parser.parse_args()
return args
if __name__ == '__main__':
args = parse_args()
args = parse_args()
check_if_run_as_root()
ruid = int(os.environ.get('SUDO_UID', 0))
if ruid:
os.setresuid(ruid, ruid, 0)
check_if_run_as_root()
ruid = int(os.environ.get('SUDO_UID', 0))
if ruid:
os.setresuid(ruid, ruid, 0)
set_up_logging(args.logfile)
set_up_logging(args.logfile)
try:
if not args.dc_ip:
if not args.domain:
args.domain = get_ucs_domainname()
getLogger("userinfo").info('Automatically detected the domain %r.' % (args.domain))
if not args.domain:
getLogger("userinfo").critical(
'Unable to determine the UCS domain automatically. Please provide '
'it using the --domain parameter or use the tool with --dc-ip.'
)
exit(1)
args.dc_ip = get_master_ip_through_dns(args.domain)
if not args.dc_ip:
getLogger("userinfo").critical(
'No DNS record for the DC master could be found. Please make sure that '
'the DC master is the DNS server for this computer or use this tool with --dc-ip.'
)
exit(1)
try:
if not args.dc_ip:
if not args.domain:
args.domain = get_ucs_domainname()
getLogger("userinfo").info('Automatically detected the domain %r.' % (args.domain))
if not args.domain:
getLogger("userinfo").critical(
'Unable to determine the UCS domain automatically. Please provide '
'it using the --domain parameter or use the tool with --dc-ip.'
)
exit(1)
args.dc_ip = get_master_ip_through_dns(args.domain)
if not args.dc_ip:
getLogger("userinfo").critical(
'No DNS record for the DC master could be found. Please make sure that '
'the DC master is the DNS server for this computer or use this tool with --dc-ip.'
)
exit(1)
if args.password:
password = args.password
elif args.password_file:
try:
with open(args.password_file) as password_file:
password = password_file.read().strip()
except IOError:
getLogger("userinfo").error('Error: The password file could not be read.')
password = None
else:
password = None
if args.password:
password = args.password
elif args.password_file:
try:
with open(args.password_file) as password_file:
password = password_file.read().strip()
except IOError:
getLogger("userinfo").error('Error: The password file could not be read.')
password = None
else:
password = None
distribution_joiner = get_joiner_for_this_distribution(args.dc_ip, args.username, password, args.skip_login_manager, args.force_ucs_dns)
distribution_joiner.check_if_join_is_possible_without_problems()
distribution_joiner.create_backup_of_config_files()
distribution_joiner.join_domain()
except Exception as e:
getLogger("userinfo").critical('An error occurred: %s. Please check %s for more information.' % (e, args.logfile))
getLogger("debugging").critical(e, exc_info=True)
exit(1)
distribution_joiner = get_joiner_for_this_distribution(args.dc_ip, args.username, password, args.skip_login_manager, args.force_ucs_dns)
distribution_joiner.check_if_join_is_possible_without_problems()
distribution_joiner.create_backup_of_config_files()
distribution_joiner.join_domain()
except Exception as e:
getLogger("userinfo").critical('An error occurred: %s. Please check %s for more information.' % (e, args.logfile))
getLogger("debugging").critical(e, exc_info=True)
exit(1)

View File

@ -23,472 +23,472 @@ LOG = '/var/log/univention/domain-join-gui.log'
def check_if_run_as_root() -> None:
if os.getuid() != 0:
app = QApplication(sys.argv)
form = NotRootDialog()
form.show()
sys.exit(app.exec_())
if os.getuid() != 0:
app = QApplication(sys.argv)
form = NotRootDialog()
form.show()
sys.exit(app.exec_())
@execute_as_root
def set_up_logging(logfile: str) -> None:
verbose_formatter = logging.Formatter('%(asctime)s %(name)s %(levelname)s %(message)s')
plain_formatter = logging.Formatter('%(message)s')
verbose_formatter = logging.Formatter('%(asctime)s %(name)s %(levelname)s %(message)s')
plain_formatter = logging.Formatter('%(message)s')
os.makedirs(os.path.dirname(logfile), exist_ok=True)
logfile_handler = logging.FileHandler(logfile)
logfile_handler.setLevel(logging.DEBUG)
logfile_handler.setFormatter(verbose_formatter)
os.makedirs(os.path.dirname(logfile), exist_ok=True)
logfile_handler = logging.FileHandler(logfile)
logfile_handler.setLevel(logging.DEBUG)
logfile_handler.setFormatter(verbose_formatter)
stdout_handler = logging.StreamHandler(sys.stdout)
stdout_handler.setLevel(logging.DEBUG)
stdout_handler.setFormatter(plain_formatter)
stdout_handler = logging.StreamHandler(sys.stdout)
stdout_handler.setLevel(logging.DEBUG)
stdout_handler.setFormatter(plain_formatter)
userinfo_logger = logging.getLogger('userinfo')
userinfo_logger.setLevel(logging.DEBUG)
userinfo_logger.addHandler(logfile_handler)
userinfo_logger.addHandler(stdout_handler)
userinfo_logger = logging.getLogger('userinfo')
userinfo_logger.setLevel(logging.DEBUG)
userinfo_logger.addHandler(logfile_handler)
userinfo_logger.addHandler(stdout_handler)
class DomainJoinException(Exception):
pass
pass
class SshException(Exception):
pass
pass
class DistributionException(Exception):
pass
pass
class NotRootDialog(QMessageBox):
def __init__(self) -> None:
super().__init__()
self.setWindowTitle('Univention Domain Join')
scriptDir = os.path.dirname(os.path.realpath(__file__))
self.setWindowIcon(QIcon(scriptDir + os.path.sep + 'univention_icon.svg'))
self.setText('This tool must be executed as root.')
def __init__(self) -> None:
super().__init__()
self.setWindowTitle('Univention Domain Join')
scriptDir = os.path.dirname(os.path.realpath(__file__))
self.setWindowIcon(QIcon(scriptDir + os.path.sep + 'univention_icon.svg'))
self.setText('This tool must be executed as root.')
class DomainJoinGui(QMainWindow):
def __init__(self) -> None:
super().__init__()
def __init__(self) -> None:
super().__init__()
self.regex_ipv4 = r'(25[0-5]|2[0-4][0-9]|1?[0-9]{1,2})(\.(25[0-5]|2[0-4][0-9]|1?[0-9]{1,2})){3}'
self.regex_ipv6 = r'(([0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,7}:|([0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,5}(:[0-9a-fA-F]{1,4}){1,2}|([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}|([0-9a-fA-F]{1,4}:){1,3}(:[0-9a-fA-F]{1,4}){1,4}|([0-9a-fA-F]{1,4}:){1,2}(:[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:((:[0-9a-fA-F]{1,4}){1,6})|:((:[0-9a-fA-F]{1,4}){1,7}|:)|fe80:(:[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])|([0-9a-fA-F]{1,4}:){1,4}:((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9]))'
self.regex_domainname = r'(([a-zA-Z]{1})|([a-zA-Z]{1}[a-zA-Z]{1})|([a-zA-Z]{1}[0-9]{1})|([0-9]{1}[a-zA-Z]{1})|([a-zA-Z0-9][a-zA-Z0-9-_]{1,61}[a-zA-Z0-9]))\.([a-zA-Z]{2,8}|[a-zA-Z0-9-]{2,30}\.[a-zA-Z]{2,3})'
self.regex_ipv4 = r'(25[0-5]|2[0-4][0-9]|1?[0-9]{1,2})(\.(25[0-5]|2[0-4][0-9]|1?[0-9]{1,2})){3}'
self.regex_ipv6 = r'(([0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,7}:|([0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,5}(:[0-9a-fA-F]{1,4}){1,2}|([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}|([0-9a-fA-F]{1,4}:){1,3}(:[0-9a-fA-F]{1,4}){1,4}|([0-9a-fA-F]{1,4}:){1,2}(:[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:((:[0-9a-fA-F]{1,4}){1,6})|:((:[0-9a-fA-F]{1,4}){1,7}|:)|fe80:(:[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])|([0-9a-fA-F]{1,4}:){1,4}:((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9]))'
self.regex_domainname = r'(([a-zA-Z]{1})|([a-zA-Z]{1}[a-zA-Z]{1})|([a-zA-Z]{1}[0-9]{1})|([0-9]{1}[a-zA-Z]{1})|([a-zA-Z0-9][a-zA-Z0-9-_]{1,61}[a-zA-Z0-9]))\.([a-zA-Z]{2,8}|[a-zA-Z0-9-]{2,30}\.[a-zA-Z]{2,3})'
self.setWindowTitle("Univention Domain Join")
scriptDir = os.path.dirname(os.path.realpath(__file__))
self.setWindowIcon(QIcon(scriptDir + os.path.sep + 'univention_icon.svg'))
self.setWindowTitle("Univention Domain Join")
scriptDir = os.path.dirname(os.path.realpath(__file__))
self.setWindowIcon(QIcon(scriptDir + os.path.sep + 'univention_icon.svg'))
self.build_main_window()
self.build_main_window()
# Make the join_button the focused element on startup.
self.setTabOrder(self.join_button, self.cancel_button)
self.setTabOrder(self.cancel_button, self.domainname_or_ip_input)
self.setTabOrder(self.domainname_or_ip_input, self. admin_username_input)
self.setTabOrder(self.admin_username_input, self.admin_password_input)
# Make the join_button the focused element on startup.
self.setTabOrder(self.join_button, self.cancel_button)
self.setTabOrder(self.cancel_button, self.domainname_or_ip_input)
self.setTabOrder(self.domainname_or_ip_input, self. admin_username_input)
self.setTabOrder(self.admin_username_input, self.admin_password_input)
def build_main_window(self) -> None:
main_layout = QVBoxLayout()
def build_main_window(self) -> None:
main_layout = QVBoxLayout()
self.add_menu_bar()
self.add_general_description_group(main_layout)
self.add_hline(main_layout)
self.add_inputs_group(main_layout)
main_layout.addStretch(1)
self.add_buttons(main_layout)
self.admin_password_input.returnPressed.connect(self.join_button.click)
self.add_menu_bar()
self.add_general_description_group(main_layout)
self.add_hline(main_layout)
self.add_inputs_group(main_layout)
main_layout.addStretch(1)
self.add_buttons(main_layout)
self.admin_password_input.returnPressed.connect(self.join_button.click)
central_widget = QWidget()
central_widget.setLayout(main_layout)
self.setCentralWidget(central_widget)
central_widget = QWidget()
central_widget.setLayout(main_layout)
self.setCentralWidget(central_widget)
def add_menu_bar(self) -> None:
menu_bar = QMenuBar(self)
def add_menu_bar(self) -> None:
menu_bar = QMenuBar(self)
help_menu = menu_bar.addMenu('Help')
about_action = QAction('About', self)
about_action.triggered.connect(self.about)
help_menu.addAction(about_action)
help_menu = menu_bar.addMenu('Help')
about_action = QAction('About', self)
about_action.triggered.connect(self.about)
help_menu.addAction(about_action)
self.setMenuBar(menu_bar)
self.setMenuBar(menu_bar)
def about(self) -> None:
self.about_dialog = QMessageBox.about(
self, 'About',
'<h1>Univention Domain Join</h1>'
'<p>Univention Domain Join is a tool, which helps you integrate an '
'Ubuntu computer into an Univention Corporate Server domain.</p>'
'If you need help visit the <a href="https://help.univention.com/">Univention '
'forum</a> or <a href="https://www.univention.com/contact/">contact us</a>.'
'<p>Copyright: <a href="https://www.univention.com">Univention GmbH</a></p>'
)
def about(self) -> None:
self.about_dialog = QMessageBox.about(
self, 'About',
'<h1>Univention Domain Join</h1>'
'<p>Univention Domain Join is a tool, which helps you integrate an '
'Ubuntu computer into an Univention Corporate Server domain.</p>'
'If you need help visit the <a href="https://help.univention.com/">Univention '
'forum</a> or <a href="https://www.univention.com/contact/">contact us</a>.'
'<p>Copyright: <a href="https://www.univention.com">Univention GmbH</a></p>'
)
def add_general_description_group(self, layout: QBoxLayout) -> None:
description_group = QWidget()
description_group_layout = QHBoxLayout()
self.add_general_description(description_group_layout)
description_group_layout.addStretch(1)
self.add_domain_join_icon(description_group_layout)
description_group.setLayout(description_group_layout)
layout.addWidget(description_group)
def add_general_description_group(self, layout: QBoxLayout) -> None:
description_group = QWidget()
description_group_layout = QHBoxLayout()
self.add_general_description(description_group_layout)
description_group_layout.addStretch(1)
self.add_domain_join_icon(description_group_layout)
description_group.setLayout(description_group_layout)
layout.addWidget(description_group)
def add_general_description(self, layout: QBoxLayout) -> None:
short_description = QLabel(
'<h3>Univention Domain Join Assistant</h3>'
'<p>Use this tool to configure this computer to be part of your UCS domain.</p>'
)
font_metrics = QFontMetrics(short_description.font())
short_description.setMinimumWidth(40 * font_metrics.width('a'))
short_description.setWordWrap(True)
layout.addWidget(short_description, stretch=10)
def add_general_description(self, layout: QBoxLayout) -> None:
short_description = QLabel(
'<h3>Univention Domain Join Assistant</h3>'
'<p>Use this tool to configure this computer to be part of your UCS domain.</p>'
)
font_metrics = QFontMetrics(short_description.font())
short_description.setMinimumWidth(40 * font_metrics.width('a'))
short_description.setWordWrap(True)
layout.addWidget(short_description, stretch=10)
def add_domain_join_icon(self, layout: QBoxLayout) -> None:
icon = QLabel()
scriptDir = os.path.dirname(os.path.realpath(__file__))
icon.setPixmap(QPixmap(scriptDir + os.path.sep + 'domain.png'))
layout.addWidget(icon)
def add_domain_join_icon(self, layout: QBoxLayout) -> None:
icon = QLabel()
scriptDir = os.path.dirname(os.path.realpath(__file__))
icon.setPixmap(QPixmap(scriptDir + os.path.sep + 'domain.png'))
layout.addWidget(icon)
def add_hline(self, layout: QBoxLayout) -> None:
frame = QFrame()
frame.setFrameShape(QFrame.HLine)
layout.addWidget(frame)
def add_hline(self, layout: QBoxLayout) -> None:
frame = QFrame()
frame.setFrameShape(QFrame.HLine)
layout.addWidget(frame)
def add_inputs_group(self, layout: QBoxLayout) -> None:
inputs_box = QWidget()
inputs_box_layout = QVBoxLayout()
self.add_inputs_description(inputs_box_layout)
self.add_domainname_or_ip_input(inputs_box_layout)
self.add_username_input(inputs_box_layout)
self.add_password_input(inputs_box_layout)
self.add_force_ucs_dns(inputs_box_layout)
inputs_box.setLayout(inputs_box_layout)
layout.addWidget(inputs_box)
def add_inputs_group(self, layout: QBoxLayout) -> None:
inputs_box = QWidget()
inputs_box_layout = QVBoxLayout()
self.add_inputs_description(inputs_box_layout)
self.add_domainname_or_ip_input(inputs_box_layout)
self.add_username_input(inputs_box_layout)
self.add_password_input(inputs_box_layout)
self.add_force_ucs_dns(inputs_box_layout)
inputs_box.setLayout(inputs_box_layout)
layout.addWidget(inputs_box)
def add_inputs_description(self, layout: QBoxLayout) -> None:
inputs_description = QLabel(
'To perform the domain join you need to provide the domain name or '
'the IP address of an UCS DC and the credentials of a domain administrator.'
)
inputs_description.setWordWrap(True)
layout.addWidget(inputs_description)
def add_inputs_description(self, layout: QBoxLayout) -> None:
inputs_description = QLabel(
'To perform the domain join you need to provide the domain name or '
'the IP address of an UCS DC and the credentials of a domain administrator.'
)
inputs_description.setWordWrap(True)
layout.addWidget(inputs_description)
def add_domainname_or_ip_input(self, layout: QBoxLayout) -> None:
short_description = QLabel('Domain name or IP address:')
short_description.setWordWrap(True)
layout.addWidget(short_description)
self.domainname_or_ip_input = QLineEdit()
font_metrics = QFontMetrics(self.domainname_or_ip_input.font())
self.domainname_or_ip_input.setFixedWidth(32 * font_metrics.width('a'))
self.domainname_or_ip_input.setPlaceholderText('e.g. mydomain.com or 10.0.0.4')
domainname_or_ip_validator = QRegExpValidator(QRegExp(
r'%s|%s|%s' % (self.regex_ipv4, self.regex_ipv6, self.regex_domainname)
), self)
self.domainname_or_ip_input.setValidator(domainname_or_ip_validator)
layout.addWidget(self.domainname_or_ip_input)
self.try_filling_in_domainname()
def add_domainname_or_ip_input(self, layout: QBoxLayout) -> None:
short_description = QLabel('Domain name or IP address:')
short_description.setWordWrap(True)
layout.addWidget(short_description)
self.domainname_or_ip_input = QLineEdit()
font_metrics = QFontMetrics(self.domainname_or_ip_input.font())
self.domainname_or_ip_input.setFixedWidth(32 * font_metrics.width('a'))
self.domainname_or_ip_input.setPlaceholderText('e.g. mydomain.com or 10.0.0.4')
domainname_or_ip_validator = QRegExpValidator(QRegExp(
r'%s|%s|%s' % (self.regex_ipv4, self.regex_ipv6, self.regex_domainname)
), self)
self.domainname_or_ip_input.setValidator(domainname_or_ip_validator)
layout.addWidget(self.domainname_or_ip_input)
self.try_filling_in_domainname()
def try_filling_in_domainname(self) -> None:
self.domainname_thread = DomainnameDetectionThread()
self.domainname_or_ip_input.setPlaceholderText('Detecting domain name...')
self.domainname_thread.domain['QString'].connect(self.domainname_detection_successful)
self.domainname_thread.finished.connect(self.domainname_detection_finished)
self.domainname_thread.start()
def try_filling_in_domainname(self) -> None:
self.domainname_thread = DomainnameDetectionThread()
self.domainname_or_ip_input.setPlaceholderText('Detecting domain name...')
self.domainname_thread.domain['QString'].connect(self.domainname_detection_successful)
self.domainname_thread.finished.connect(self.domainname_detection_finished)
self.domainname_thread.start()
def domainname_detection_successful(self, domainname: str) -> None:
domainname_qregex = QRegExp(self.regex_domainname)
# self.domainname_or_ip_input.text() is used to make sure the user
# didn't fill in the field already.
if not self.domainname_or_ip_input.text() and domainname_qregex.exactMatch(domainname):
self.domainname_or_ip_input.setText(domainname)
self.admin_password_input.setFocus()
def domainname_detection_successful(self, domainname: str) -> None:
domainname_qregex = QRegExp(self.regex_domainname)
# self.domainname_or_ip_input.text() is used to make sure the user
# didn't fill in the field already.
if not self.domainname_or_ip_input.text() and domainname_qregex.exactMatch(domainname):
self.domainname_or_ip_input.setText(domainname)
self.admin_password_input.setFocus()
def domainname_detection_finished(self) -> None:
self.domainname_or_ip_input.setPlaceholderText('e.g. mydomain.com or 10.0.0.4')
def domainname_detection_finished(self) -> None:
self.domainname_or_ip_input.setPlaceholderText('e.g. mydomain.com or 10.0.0.4')
def add_username_input(self, layout: QBoxLayout) -> None:
short_description = QLabel('Domain administrator\'s username:')
short_description.setWordWrap(True)
layout.addWidget(short_description)
self.admin_username_input = QLineEdit()
font_metrics = QFontMetrics(self.domainname_or_ip_input.font())
self.admin_username_input.setFixedWidth(32 * font_metrics.width('a'))
self.admin_username_input.setPlaceholderText('Username')
self.admin_username_input.setText('Administrator')
username_validator = QRegExpValidator(QRegExp(r'\w+'), self)
self.admin_username_input.setValidator(username_validator)
layout.addWidget(self.admin_username_input)
def add_username_input(self, layout: QBoxLayout) -> None:
short_description = QLabel('Domain administrator\'s username:')
short_description.setWordWrap(True)
layout.addWidget(short_description)
self.admin_username_input = QLineEdit()
font_metrics = QFontMetrics(self.domainname_or_ip_input.font())
self.admin_username_input.setFixedWidth(32 * font_metrics.width('a'))
self.admin_username_input.setPlaceholderText('Username')
self.admin_username_input.setText('Administrator')
username_validator = QRegExpValidator(QRegExp(r'\w+'), self)
self.admin_username_input.setValidator(username_validator)
layout.addWidget(self.admin_username_input)
def add_force_ucs_dns(self, layout: QBoxLayout) -> None:
short_description = QLabel('Change the system\'s DNS settings and set the UCS DC as DNS nameserver (default is to use the standard network settings, but make sure the your system can resolve the hostname of the UCS DC and the UCS master system)')
short_description.setWordWrap(True)
layout.addWidget(short_description)
self.force_ucs_dns_input = QCheckBox('Set UCS DC as DNS server')
layout.addWidget(self.force_ucs_dns_input)
def add_force_ucs_dns(self, layout: QBoxLayout) -> None:
short_description = QLabel('Change the system\'s DNS settings and set the UCS DC as DNS nameserver (default is to use the standard network settings, but make sure the your system can resolve the hostname of the UCS DC and the UCS master system)')
short_description.setWordWrap(True)
layout.addWidget(short_description)
self.force_ucs_dns_input = QCheckBox('Set UCS DC as DNS server')
layout.addWidget(self.force_ucs_dns_input)
def add_password_input(self, layout: QBoxLayout) -> None:
short_description = QLabel('Domain administrator\'s password:')
short_description.setWordWrap(True)
layout.addWidget(short_description)
def add_password_input(self, layout: QBoxLayout) -> None:
short_description = QLabel('Domain administrator\'s password:')
short_description.setWordWrap(True)
layout.addWidget(short_description)
self.admin_password_input = QLineEdit()
font_metrics = QFontMetrics(self.domainname_or_ip_input.font())
self.admin_password_input.setFixedWidth(32 * font_metrics.width('a'))
self.admin_password_input.setPlaceholderText('Password')
self.admin_password_input.setEchoMode(QLineEdit.Password)
password_validator = QRegExpValidator(QRegExp(r'.+'), self)
self.admin_password_input.setValidator(password_validator)
layout.addWidget(self.admin_password_input)
self.admin_password_input = QLineEdit()
font_metrics = QFontMetrics(self.domainname_or_ip_input.font())
self.admin_password_input.setFixedWidth(32 * font_metrics.width('a'))
self.admin_password_input.setPlaceholderText('Password')
self.admin_password_input.setEchoMode(QLineEdit.Password)
password_validator = QRegExpValidator(QRegExp(r'.+'), self)
self.admin_password_input.setValidator(password_validator)
layout.addWidget(self.admin_password_input)
def add_buttons(self, layout: QBoxLayout) -> None:
button_widget = QWidget()
button_layout = QHBoxLayout()
button_layout.addStretch()
self.join_button = QPushButton('Join')
self.join_button.clicked.connect(self.join_domain_if_inputs_are_ok)
self.join_button.setAutoDefault(True)
button_layout.addWidget(self.join_button)
self.cancel_button = QPushButton('Cancel')
self.cancel_button.clicked.connect(exit)
button_layout.addWidget(self.cancel_button)
button_widget.setLayout(button_layout)
layout.addWidget(button_widget)
def add_buttons(self, layout: QBoxLayout) -> None:
button_widget = QWidget()
button_layout = QHBoxLayout()
button_layout.addStretch()
self.join_button = QPushButton('Join')
self.join_button.clicked.connect(self.join_domain_if_inputs_are_ok)
self.join_button.setAutoDefault(True)
button_layout.addWidget(self.join_button)
self.cancel_button = QPushButton('Cancel')
self.cancel_button.clicked.connect(exit)
button_layout.addWidget(self.cancel_button)
button_widget.setLayout(button_layout)
layout.addWidget(button_widget)
def join_domain_if_inputs_are_ok(self) -> None:
if (
self.domainname_or_ip_input.hasAcceptableInput() and
self.admin_username_input.hasAcceptableInput() and
self.admin_password_input.hasAcceptableInput()
):
dc_ip, domain = self.get_domainname_or_dc_ip()
if not dc_ip and domain:
dc_ip = get_master_ip_through_dns(domain)
if not dc_ip:
DnsNotWorkingDialog().exec_()
return
self.join_domain(dc_ip, str(self.admin_username_input.text()), str(self.admin_password_input.text()), self.force_ucs_dns_input.isChecked())
else:
MissingInputsDialog().exec_()
def join_domain_if_inputs_are_ok(self) -> None:
if (
self.domainname_or_ip_input.hasAcceptableInput() and
self.admin_username_input.hasAcceptableInput() and
self.admin_password_input.hasAcceptableInput()
):
dc_ip, domain = self.get_domainname_or_dc_ip()
if not dc_ip and domain:
dc_ip = get_master_ip_through_dns(domain)
if not dc_ip:
DnsNotWorkingDialog().exec_()
return
self.join_domain(dc_ip, str(self.admin_username_input.text()), str(self.admin_password_input.text()), self.force_ucs_dns_input.isChecked())
else:
MissingInputsDialog().exec_()
def get_domainname_or_dc_ip(self) -> Tuple[Optional[str], Optional[str]]:
input_text = self.domainname_or_ip_input.text()
domainname_qregex = QRegExp(self.regex_domainname)
ip_qregex = QRegExp('%s|%s' % (self.regex_ipv4, self.regex_ipv6))
if domainname_qregex.exactMatch(input_text):
return None, input_text
elif ip_qregex.exactMatch(input_text):
return input_text, None
else:
return None, None
def get_domainname_or_dc_ip(self) -> Tuple[Optional[str], Optional[str]]:
input_text = self.domainname_or_ip_input.text()
domainname_qregex = QRegExp(self.regex_domainname)
ip_qregex = QRegExp('%s|%s' % (self.regex_ipv4, self.regex_ipv6))
if domainname_qregex.exactMatch(input_text):
return None, input_text
elif ip_qregex.exactMatch(input_text):
return input_text, None
else:
return None, None
def join_domain(self, dc_ip: str, admin_username: str, admin_pw: str, force_ucs_dns: bool) -> None:
self.join_thread = JoinThread(dc_ip, admin_username, admin_pw, force_ucs_dns)
self.join_thread.join_started.connect(self.join_started)
self.join_thread.join_successful.connect(self.join_successful)
self.join_thread.join_failed['QString'].connect(self.join_failed)
self.join_thread.ssh_failed.connect(self.ssh_failed)
self.join_thread.dist_failed.connect(self.dist_failed)
self.join_thread.start()
def join_domain(self, dc_ip: str, admin_username: str, admin_pw: str, force_ucs_dns: bool) -> None:
self.join_thread = JoinThread(dc_ip, admin_username, admin_pw, force_ucs_dns)
self.join_thread.join_started.connect(self.join_started)
self.join_thread.join_successful.connect(self.join_successful)
self.join_thread.join_failed['QString'].connect(self.join_failed)
self.join_thread.ssh_failed.connect(self.ssh_failed)
self.join_thread.dist_failed.connect(self.dist_failed)
self.join_thread.start()
def join_started(self) -> None:
self.join_button.setText('Joining...')
self.join_button.setEnabled(False)
self.cancel_button.setEnabled(False)
def join_started(self) -> None:
self.join_button.setText('Joining...')
self.join_button.setEnabled(False)
self.cancel_button.setEnabled(False)
def join_successful(self) -> None:
self.join_button.setText('Join')
self.cancel_button.setText('Close')
self.cancel_button.setEnabled(True)
SuccessfulJoinDialog().exec_()
def join_successful(self) -> None:
self.join_button.setText('Join')
self.cancel_button.setText('Close')
self.cancel_button.setEnabled(True)
SuccessfulJoinDialog().exec_()
def join_failed(self, err: str) -> None:
self.join_button.setText('Join')
self.join_button.setEnabled(True)
self.cancel_button.setEnabled(True)
FailedJoinDialog(err).exec_()
def join_failed(self, err: str) -> None:
self.join_button.setText('Join')
self.join_button.setEnabled(True)
self.cancel_button.setEnabled(True)
FailedJoinDialog(err).exec_()
def ssh_failed(self) -> None:
self.join_button.setText('Join')
self.join_button.setEnabled(True)
self.cancel_button.setEnabled(True)
FailedSSHDialog().exec_()
def ssh_failed(self) -> None:
self.join_button.setText('Join')
self.join_button.setEnabled(True)
self.cancel_button.setEnabled(True)
FailedSSHDialog().exec_()
def dist_failed(self) -> None:
self.join_button.setText('Join')
self.join_button.setEnabled(False)
self.cancel_button.setEnabled(True)
FailedDistDialog().exec_()
def dist_failed(self) -> None:
self.join_button.setText('Join')
self.join_button.setEnabled(False)
self.cancel_button.setEnabled(True)
FailedDistDialog().exec_()
class SuccessfulJoinDialog(QMessageBox):
def __init__(self) -> None:
super().__init__()
self.setWindowTitle('Successful Join')
scriptDir = os.path.dirname(os.path.realpath(__file__))
self.setWindowIcon(QIcon(scriptDir + os.path.sep + 'univention_icon.svg'))
self.setText('The domain join was successful. Please reboot the system.')
def __init__(self) -> None:
super().__init__()
self.setWindowTitle('Successful Join')
scriptDir = os.path.dirname(os.path.realpath(__file__))
self.setWindowIcon(QIcon(scriptDir + os.path.sep + 'univention_icon.svg'))
self.setText('The domain join was successful. Please reboot the system.')
class FailedJoinDialog(QMessageBox):
def __init__(self, err: str) -> None:
super().__init__()
self.setWindowTitle('Failed Join')
scriptDir = os.path.dirname(os.path.realpath(__file__))
self.setWindowIcon(QIcon(scriptDir + os.path.sep + 'univention_icon.svg'))
self.setStyleSheet("QMessageBox { messagebox-text-interaction-flags: 5; }")
self.setText(
'The domain join failed: {} For further information look at {}'.format(err, LOG)
)
def __init__(self, err: str) -> None:
super().__init__()
self.setWindowTitle('Failed Join')
scriptDir = os.path.dirname(os.path.realpath(__file__))
self.setWindowIcon(QIcon(scriptDir + os.path.sep + 'univention_icon.svg'))
self.setStyleSheet("QMessageBox { messagebox-text-interaction-flags: 5; }")
self.setText(
'The domain join failed: {} For further information look at {}'.format(err, LOG)
)
class FailedSSHDialog(QMessageBox):
def __init__(self) -> None:
super().__init__()
self.setWindowTitle('SSH Connection Failed')
scriptDir = os.path.dirname(os.path.realpath(__file__))
self.setWindowIcon(QIcon(scriptDir + os.path.sep + 'univention_icon.svg'))
self.setText(
'The SSH connection failed. Please check the address/username/password.'
)
def __init__(self) -> None:
super().__init__()
self.setWindowTitle('SSH Connection Failed')
scriptDir = os.path.dirname(os.path.realpath(__file__))
self.setWindowIcon(QIcon(scriptDir + os.path.sep + 'univention_icon.svg'))
self.setText(
'The SSH connection failed. Please check the address/username/password.'
)
class FailedDistDialog(QMessageBox):
def __init__(self) -> None:
super().__init__()
self.setWindowTitle('Distribution Check Failed')
scriptDir = os.path.dirname(os.path.realpath(__file__))
self.setWindowIcon(QIcon(scriptDir + os.path.sep + 'univention_icon.svg'))
self.setText(
'The used distribution {} is not supported.'.format(get_distribution())
)
def __init__(self) -> None:
super().__init__()
self.setWindowTitle('Distribution Check Failed')
scriptDir = os.path.dirname(os.path.realpath(__file__))
self.setWindowIcon(QIcon(scriptDir + os.path.sep + 'univention_icon.svg'))
self.setText(
'The used distribution {} is not supported.'.format(get_distribution())
)
class MissingInputsDialog(QMessageBox):
def __init__(self) -> None:
super().__init__()
self.setWindowTitle('Missing Inputs')
scriptDir = os.path.dirname(os.path.realpath(__file__))
self.setWindowIcon(QIcon(scriptDir + os.path.sep + 'univention_icon.svg'))
self.setText(
'At least one input field is not filled in, or filled with an invalid input.'
' Please fill all three input fields according to their description.'
)
def __init__(self) -> None:
super().__init__()
self.setWindowTitle('Missing Inputs')
scriptDir = os.path.dirname(os.path.realpath(__file__))
self.setWindowIcon(QIcon(scriptDir + os.path.sep + 'univention_icon.svg'))
self.setText(
'At least one input field is not filled in, or filled with an invalid input.'
' Please fill all three input fields according to their description.'
)
class DnsNotWorkingDialog(QMessageBox):
def __init__(self) -> None:
super().__init__()
self.setWindowTitle('DNS Not Working')
scriptDir = os.path.dirname(os.path.realpath(__file__))
self.setWindowIcon(QIcon(scriptDir + os.path.sep + 'univention_icon.svg'))
self.setText(
'No DNS record for the DC master could be found. Please make sure '
'that the DC master is the DNS server for this computer or use this'
' tool with the IP address of the DC master.'
)
def __init__(self) -> None:
super().__init__()
self.setWindowTitle('DNS Not Working')
scriptDir = os.path.dirname(os.path.realpath(__file__))
self.setWindowIcon(QIcon(scriptDir + os.path.sep + 'univention_icon.svg'))
self.setText(
'No DNS record for the DC master could be found. Please make sure '
'that the DC master is the DNS server for this computer or use this'
' tool with the IP address of the DC master.'
)
class DomainnameDetectionThread(QThread):
domain = pyqtSignal('QString')
domain = pyqtSignal('QString')
def run(self) -> None:
domainname = get_ucs_domainname()
if domainname:
self.domain.emit(domainname)
def run(self) -> None:
domainname = get_ucs_domainname()
if domainname:
self.domain.emit(domainname)
class JoinThread(QThread):
join_started = pyqtSignal()
ssh_failed = pyqtSignal()
dist_failed = pyqtSignal()
join_failed = pyqtSignal('QString')
join_successful = pyqtSignal()
join_started = pyqtSignal()
ssh_failed = pyqtSignal()
dist_failed = pyqtSignal()
join_failed = pyqtSignal('QString')
join_successful = pyqtSignal()
def __init__(self, dc_ip: str, admin_username: str, admin_pw: str, force_ucs_dns: bool) -> None:
super().__init__()
self.dc_ip = dc_ip
self.admin_username = admin_username
self.admin_pw = admin_pw
self.force_ucs_dns = force_ucs_dns
def __init__(self, dc_ip: str, admin_username: str, admin_pw: str, force_ucs_dns: bool) -> None:
super().__init__()
self.dc_ip = dc_ip
self.admin_username = admin_username
self.admin_pw = admin_pw
self.force_ucs_dns = force_ucs_dns
def run(self) -> None:
self.join_started.emit()
try:
try:
distribution_joiner = self.get_joiner_for_this_distribution()
except SshException:
self.ssh_failed.emit()
return
except DistributionException:
self.dist_failed.emit()
return
distribution_joiner.check_if_join_is_possible_without_problems()
distribution_joiner.create_backup_of_config_files()
distribution_joiner.join_domain()
except Exception as e:
getLogger("userinfo").critical(e, exc_info=True)
self.join_failed.emit(str(e))
return
self.join_successful.emit()
def run(self) -> None:
self.join_started.emit()
try:
try:
distribution_joiner = self.get_joiner_for_this_distribution()
except SshException:
self.ssh_failed.emit()
return
except DistributionException:
self.dist_failed.emit()
return
distribution_joiner.check_if_join_is_possible_without_problems()
distribution_joiner.create_backup_of_config_files()
distribution_joiner.join_domain()
except Exception as e:
getLogger("userinfo").critical(e, exc_info=True)
self.join_failed.emit(str(e))
return
self.join_successful.emit()
def get_joiner_for_this_distribution(self) -> AbstractJoiner:
distribution = get_distribution()
try:
distribution_join_module = importlib.import_module('univention_domain_join.distributions.%s' % (distribution.lower(),))
self.check_if_ssh_works_with_given_account()
ucr_variables = self.get_ucr_variables_from_dc()
if not ucr_variables:
raise DomainJoinException()
return distribution_join_module.Joiner(ucr_variables, self.admin_username, self.admin_pw, self.dc_ip, False, self.force_ucs_dns)
except ImportError:
getLogger("userinfo").critical('The used distribution "%s" is not supported.' % (distribution,))
raise DistributionException()
def get_joiner_for_this_distribution(self) -> AbstractJoiner:
distribution = get_distribution()
try:
distribution_join_module = importlib.import_module('univention_domain_join.distributions.%s' % (distribution.lower(),))
self.check_if_ssh_works_with_given_account()
ucr_variables = self.get_ucr_variables_from_dc()
if not ucr_variables:
raise DomainJoinException()
return distribution_join_module.Joiner(ucr_variables, self.admin_username, self.admin_pw, self.dc_ip, False, self.force_ucs_dns)
except ImportError:
getLogger("userinfo").critical('The used distribution "%s" is not supported.' % (distribution,))
raise DistributionException()
def check_if_ssh_works_with_given_account(self) -> bool:
cmd = "true"
ssh_process = ssh(self.admin_username, self.admin_pw, self.dc_ip, cmd, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
_, stderr = ssh_process.communicate()
logging.getLogger('debugging').debug("%r returned %d: %s", cmd, ssh_process.returncode, stderr.decode())
if ssh_process.returncode != 0:
if stderr.decode().strip().endswith(': No route to host'):
raise SshException('IP not reachable via SSH.')
else:
raise SshException('It\'s not possible to connect to the UCS DC via ssh, with the given credentials.')
return False
return True
def check_if_ssh_works_with_given_account(self) -> bool:
cmd = "true"
ssh_process = ssh(self.admin_username, self.admin_pw, self.dc_ip, cmd, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
_, stderr = ssh_process.communicate()
logging.getLogger('debugging').debug("%r returned %d: %s", cmd, ssh_process.returncode, stderr.decode())
if ssh_process.returncode != 0:
if stderr.decode().strip().endswith(': No route to host'):
raise SshException('IP not reachable via SSH.')
else:
raise SshException('It\'s not possible to connect to the UCS DC via ssh, with the given credentials.')
return False
return True
def get_ucr_variables_from_dc(self) -> Optional[Dict[str, str]]:
cmd = "/usr/sbin/ucr shell"
ssh_process = ssh(self.admin_username, self.admin_pw, self.dc_ip, cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
assert ssh_process.stdout
ucr_variables = dict(
line.decode("utf-8", "replace").strip().split("=", 1)
for line in ssh_process.stdout
)
def get_ucr_variables_from_dc(self) -> Optional[Dict[str, str]]:
cmd = "/usr/sbin/ucr shell"
ssh_process = ssh(self.admin_username, self.admin_pw, self.dc_ip, cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
assert ssh_process.stdout
ucr_variables = dict(
line.decode("utf-8", "replace").strip().split("=", 1)
for line in ssh_process.stdout
)
_, stderr = ssh_process.communicate()
logging.getLogger('debugging').debug("%r returned %d: %s", cmd, ssh_process.returncode, stderr.decode())
if ssh_process.wait() != 0:
getLogger("userinfo").critical('Fetching the UCR variables from the UCS DC failed.')
return None
_, stderr = ssh_process.communicate()
logging.getLogger('debugging').debug("%r returned %d: %s", cmd, ssh_process.returncode, stderr.decode())
if ssh_process.wait() != 0:
getLogger("userinfo").critical('Fetching the UCR variables from the UCS DC failed.')
return None
ucr_variables.pop("hostname", None)
return ucr_variables
ucr_variables.pop("hostname", None)
return ucr_variables
if __name__ == '__main__':
check_if_run_as_root()
ruid = int(os.environ.get('PKEXEC_UID', 0)) or int(os.environ.get('SUDO_UID', 0))
if ruid:
os.setresuid(ruid, ruid, 0)
check_if_run_as_root()
ruid = int(os.environ.get('PKEXEC_UID', 0)) or int(os.environ.get('SUDO_UID', 0))
if ruid:
os.setresuid(ruid, ruid, 0)
set_up_logging(LOG)
app = QApplication.setSetuidAllowed(True)
app = QApplication(sys.argv)
form = DomainJoinGui()
form.show()
sys.exit(app.exec_())
set_up_logging(LOG)
app = QApplication.setSetuidAllowed(True)
app = QApplication(sys.argv)
form = DomainJoinGui()
form.show()
sys.exit(app.exec_())

View File

@ -10,29 +10,29 @@ long_description = file:README.md
long_description_content_type = "text/markdown"
license = GNU Affero General Public License v3
classifiers =
Development Status :: 5 - Production/Stable
Environment :: Console
Environment :: X11 Applications :: Qt
Intended Audience :: System Administrators
Programming Language :: Python :: 3.7
Programming Language :: Python :: 3.8
Programming Language :: Python :: 3.9
Programming Language :: Python :: 3.10
Programming Language :: Python :: 3.11
License :: OSI Approved :: GNU Affero General Public License v3
Natural Language :: English
Operating System :: POSIX :: Linux
Topic :: System :: Systems Administration :: Authentication/Directory :: LDAP
Development Status :: 5 - Production/Stable
Environment :: Console
Environment :: X11 Applications :: Qt
Intended Audience :: System Administrators
Programming Language :: Python :: 3.7
Programming Language :: Python :: 3.8
Programming Language :: Python :: 3.9
Programming Language :: Python :: 3.10
Programming Language :: Python :: 3.11
License :: OSI Approved :: GNU Affero General Public License v3
Natural Language :: English
Operating System :: POSIX :: Linux
Topic :: System :: Systems Administration :: Authentication/Directory :: LDAP
[options]
zip_safe = False
python_requires = >=3.7
packages = find:
install_requires =
dnspython
IPy
netifaces
python-ldap
dnspython
IPy
netifaces
python-ldap
[options.extras_require]
gui = PyQt5
@ -41,18 +41,18 @@ dev = PyQt5-stubs; flake8; isort; mypy
[flake8]
max-line-length = 220
ignore =
# W191 indentation contains tabs
W191,
# E501 line too long
E501,
# W504 line break after binary operator
W504,
# W191 indentation contains tabs
W191,
# E501 line too long
E501,
# W504 line break after binary operator
W504,
exclude =
.git,
__pycache__,
build,
dist,
venv,
.git,
__pycache__,
build,
dist,
venv,
[isort]
line_length = 220

View File

@ -6,14 +6,14 @@ from typing import Dict
class AbstractJoiner(object):
def __init__(self, ucr_variables: Dict[str, str], admin_username: str, admin_pw: str, dc_ip: str, skip_login_manager: bool, force_ucs_dns: bool) -> None:
raise NotImplementedError()
def __init__(self, ucr_variables: Dict[str, str], admin_username: str, admin_pw: str, dc_ip: str, skip_login_manager: bool, force_ucs_dns: bool) -> None:
raise NotImplementedError()
def check_if_join_is_possible_without_problems(self) -> None:
raise NotImplementedError()
def check_if_join_is_possible_without_problems(self) -> None:
raise NotImplementedError()
def create_backup_of_config_files(self) -> None:
raise NotImplementedError()
def create_backup_of_config_files(self) -> None:
raise NotImplementedError()
def join_domain(self) -> None:
raise NotImplementedError()
def join_domain(self) -> None:
raise NotImplementedError()

View File

@ -21,78 +21,78 @@ userinfo_logger = logging.getLogger('userinfo')
class DomainJoinException(Exception):
pass
pass
class DcResolveException(Exception):
pass
pass
class Joiner(AbstractJoiner):
def __init__(self, ucr_variables: Dict[str, str], admin_username: str, admin_pw: str, dc_ip: str, skip_login_manager: bool, force_ucs_dns: bool) -> None:
self.admin_username = admin_username
self.admin_pw = admin_pw
self.dc_ip = dc_ip
self.skip_login_manager = skip_login_manager
self.force_ucs_dns = force_ucs_dns
self.domain = ucr_variables['domainname']
self.nameservers = [
ucr_variables['nameserver1'] if ucr_variables['nameserver1'] != "''" else '',
ucr_variables['nameserver2'] if ucr_variables['nameserver2'] != "''" else '',
ucr_variables['nameserver3'] if ucr_variables['nameserver3'] != "''" else ''
]
self.ldap_master = ucr_variables['ldap_master']
self.ldap_base = ucr_variables['ldap_base']
self.ldap_server_name = ucr_variables['ldap_server_name']
self.kerberos_realm = ucr_variables['kerberos_realm']
def __init__(self, ucr_variables: Dict[str, str], admin_username: str, admin_pw: str, dc_ip: str, skip_login_manager: bool, force_ucs_dns: bool) -> None:
self.admin_username = admin_username
self.admin_pw = admin_pw
self.dc_ip = dc_ip
self.skip_login_manager = skip_login_manager
self.force_ucs_dns = force_ucs_dns
self.domain = ucr_variables['domainname']
self.nameservers = [
ucr_variables['nameserver1'] if ucr_variables['nameserver1'] != "''" else '',
ucr_variables['nameserver2'] if ucr_variables['nameserver2'] != "''" else '',
ucr_variables['nameserver3'] if ucr_variables['nameserver3'] != "''" else ''
]
self.ldap_master = ucr_variables['ldap_master']
self.ldap_base = ucr_variables['ldap_base']
self.ldap_server_name = ucr_variables['ldap_server_name']
self.kerberos_realm = ucr_variables['kerberos_realm']
def check_if_join_is_possible_without_problems(self) -> None:
if not self.skip_login_manager and LoginManagerConfigurator().configuration_conflicts():
userinfo_logger.critical(
'Joining the UCS domain is not safely possible.\n'
'Please resolve all problems and run this tool again.'
)
raise DomainJoinException()
def check_if_join_is_possible_without_problems(self) -> None:
if not self.skip_login_manager and LoginManagerConfigurator().configuration_conflicts():
userinfo_logger.critical(
'Joining the UCS domain is not safely possible.\n'
'Please resolve all problems and run this tool again.'
)
raise DomainJoinException()
def create_backup_of_config_files(self) -> None:
backup_dir = self.create_backup_dir()
if self.force_ucs_dns:
DnsConfigurator(self.nameservers, self.domain).backup(backup_dir)
LdapConfigurator().backup(backup_dir)
SssdConfigurator().backup(backup_dir)
PamConfigurator().backup(backup_dir)
if not self.skip_login_manager:
LoginManagerConfigurator().backup(backup_dir)
KerberosConfigurator().backup(backup_dir)
userinfo_logger.info('Created a backup of all configuration files, that will be modified at \'%s\'.' % backup_dir)
def create_backup_of_config_files(self) -> None:
backup_dir = self.create_backup_dir()
if self.force_ucs_dns:
DnsConfigurator(self.nameservers, self.domain).backup(backup_dir)
LdapConfigurator().backup(backup_dir)
SssdConfigurator().backup(backup_dir)
PamConfigurator().backup(backup_dir)
if not self.skip_login_manager:
LoginManagerConfigurator().backup(backup_dir)
KerberosConfigurator().backup(backup_dir)
userinfo_logger.info('Created a backup of all configuration files, that will be modified at \'%s\'.' % backup_dir)
@execute_as_root
def create_backup_dir(self) -> str:
backup_dir = os.path.join('/var/univention-backup', time.strftime("%Y%m%d%H%M%S_domain-join", time.gmtime()))
os.makedirs(backup_dir)
return backup_dir
@execute_as_root
def create_backup_dir(self) -> str:
backup_dir = os.path.join('/var/univention-backup', time.strftime("%Y%m%d%H%M%S_domain-join", time.gmtime()))
os.makedirs(backup_dir)
return backup_dir
def join_domain(self) -> None:
try:
if self.force_ucs_dns:
userinfo_logger.info('changing network/dns configuration as requested.')
DnsConfigurator(self.nameservers, self.domain).configure_dns()
# check if we can resolve the ldap_server_name and ldap_master
if not name_is_resolvable(self.ldap_master):
raise DcResolveException('The UCS master name %s can not be resolved, please check your DNS settings' % self.ldap_master)
if not name_is_resolvable(self.ldap_server_name):
raise DcResolveException('The UCS DC name %s can not be resolved, please check your DNS settings' % self.ldap_server_name)
ldap.authenticate_admin(self.dc_ip, self.admin_username, self.admin_pw)
admin_dn = LdapConfigurator().get_admin_dn(self.dc_ip, self.admin_username, self.admin_pw, self.ldap_base)
is_samba_dc = ldap.is_samba_dc(self.admin_username, self.admin_pw, self.dc_ip, admin_dn)
LdapConfigurator().configure_ldap(self.dc_ip, self.ldap_server_name, self.admin_username, self.admin_pw, self.ldap_base, admin_dn)
SssdConfigurator().setup_sssd(self.dc_ip, self.ldap_master, self.ldap_server_name, self.admin_username, self.admin_pw, self.ldap_base, self.kerberos_realm, admin_dn, is_samba_dc)
PamConfigurator().setup_pam()
if not self.skip_login_manager:
LoginManagerConfigurator().enable_login_with_foreign_usernames()
KerberosConfigurator().configure_kerberos(self.kerberos_realm, self.ldap_master, self.ldap_server_name, is_samba_dc, self.dc_ip)
# TODO: Stop avahi service to prevent problems with sssd?
userinfo_logger.info('The domain join was successful.')
userinfo_logger.info('Please reboot the system.')
finally:
ldap.cleanup_authentication(self.dc_ip, self.admin_username, self.admin_pw)
def join_domain(self) -> None:
try:
if self.force_ucs_dns:
userinfo_logger.info('changing network/dns configuration as requested.')
DnsConfigurator(self.nameservers, self.domain).configure_dns()
# check if we can resolve the ldap_server_name and ldap_master
if not name_is_resolvable(self.ldap_master):
raise DcResolveException('The UCS master name %s can not be resolved, please check your DNS settings' % self.ldap_master)
if not name_is_resolvable(self.ldap_server_name):
raise DcResolveException('The UCS DC name %s can not be resolved, please check your DNS settings' % self.ldap_server_name)
ldap.authenticate_admin(self.dc_ip, self.admin_username, self.admin_pw)
admin_dn = LdapConfigurator().get_admin_dn(self.dc_ip, self.admin_username, self.admin_pw, self.ldap_base)
is_samba_dc = ldap.is_samba_dc(self.admin_username, self.admin_pw, self.dc_ip, admin_dn)
LdapConfigurator().configure_ldap(self.dc_ip, self.ldap_server_name, self.admin_username, self.admin_pw, self.ldap_base, admin_dn)
SssdConfigurator().setup_sssd(self.dc_ip, self.ldap_master, self.ldap_server_name, self.admin_username, self.admin_pw, self.ldap_base, self.kerberos_realm, admin_dn, is_samba_dc)
PamConfigurator().setup_pam()
if not self.skip_login_manager:
LoginManagerConfigurator().enable_login_with_foreign_usernames()
KerberosConfigurator().configure_kerberos(self.kerberos_realm, self.ldap_master, self.ldap_server_name, is_samba_dc, self.dc_ip)
# TODO: Stop avahi service to prevent problems with sssd?
userinfo_logger.info('The domain join was successful.')
userinfo_logger.info('Please reboot the system.')
finally:
ldap.cleanup_authentication(self.dc_ip, self.admin_username, self.admin_pw)

View File

@ -17,255 +17,255 @@ userinfo_logger = logging.getLogger('userinfo')
class DnsConfigurationException(Exception):
pass
pass
class DnsConfigurator(object):
def __init__(self, nameservers: List[str], domain: str) -> None:
self.nameservers = nameservers
self.domain = domain
def __init__(self, nameservers: List[str], domain: str) -> None:
self.nameservers = nameservers
self.domain = domain
if nameservers[0] == '':
userinfo_logger.critical(
'No name servers are configured in the UCR of the DC master.\n'
'Please repair it, before running this tool again.'
)
raise DnsConfigurationException()
if domain == '':
userinfo_logger.critical(
'No domain name is configured in the UCR of the DC master.\n'
'Please repair it, before running this tool again.'
)
raise DnsConfigurationException()
if nameservers[0] == '':
userinfo_logger.critical(
'No name servers are configured in the UCR of the DC master.\n'
'Please repair it, before running this tool again.'
)
raise DnsConfigurationException()
if domain == '':
userinfo_logger.critical(
'No domain name is configured in the UCR of the DC master.\n'
'Please repair it, before running this tool again.'
)
raise DnsConfigurationException()
self.working_configurator: BaseDnsConfigurator = DnsConfiguratorNetworkManager() if DnsConfiguratorNetworkManager().works_on_this_system() else DnsConfiguratorTrusty()
self.working_configurator: BaseDnsConfigurator = DnsConfiguratorNetworkManager() if DnsConfiguratorNetworkManager().works_on_this_system() else DnsConfiguratorTrusty()
def backup(self, backup_dir: str) -> None:
self.working_configurator.backup(backup_dir)
def backup(self, backup_dir: str) -> None:
self.working_configurator.backup(backup_dir)
@execute_as_root
def configure_dns(self) -> None:
self.working_configurator.configure_dns(self.nameservers, self.domain)
if self.domain.endswith('.local'):
subprocess.check_output([
'sed', '-i', '-E',
r's/^(hosts: +.*)( mdns4_minimal)(.*)\[NOTFOUND=return\](.*)( dns)(.*)/\1\5\2\3[NOTFOUND=return]\4\6/',
'/etc/nsswitch.conf'
], stderr=subprocess.STDOUT)
self.check_if_dns_works()
@execute_as_root
def configure_dns(self) -> None:
self.working_configurator.configure_dns(self.nameservers, self.domain)
if self.domain.endswith('.local'):
subprocess.check_output([
'sed', '-i', '-E',
r's/^(hosts: +.*)( mdns4_minimal)(.*)\[NOTFOUND=return\](.*)( dns)(.*)/\1\5\2\3[NOTFOUND=return]\4\6/',
'/etc/nsswitch.conf'
], stderr=subprocess.STDOUT)
self.check_if_dns_works()
def check_if_dns_works(self) -> None:
resolver = dns.resolver.Resolver()
try:
resolver.query('_domaincontroller_master._tcp.%s.' % (self.domain,), 'SRV')
except dns.resolver.NXDOMAIN:
userinfo_logger.critical(
'Setting up DNS did not work. Try removing any DNS settings in '
'the network-manager and give this tool the IP address of the DC master.'
)
raise DnsConfigurationException()
def check_if_dns_works(self) -> None:
resolver = dns.resolver.Resolver()
try:
resolver.query('_domaincontroller_master._tcp.%s.' % (self.domain,), 'SRV')
except dns.resolver.NXDOMAIN:
userinfo_logger.critical(
'Setting up DNS did not work. Try removing any DNS settings in '
'the network-manager and give this tool the IP address of the DC master.'
)
raise DnsConfigurationException()
class BaseDnsConfigurator(object):
def backup(self, backup_dir: str) -> None:
raise NotImplementedError()
def backup(self, backup_dir: str) -> None:
raise NotImplementedError()
def configure_dns(self, nameservers: List[str], domain: str) -> None:
raise NotImplementedError()
def configure_dns(self, nameservers: List[str], domain: str) -> None:
raise NotImplementedError()
class DnsConfiguratorTrusty(BaseDnsConfigurator):
def __init__(self) -> None:
self.sub_configurators: Tuple[BaseDnsConfigurator, ...] = (DnsConfiguratorDHClient(), DnsConfiguratorOldNetworkManager(), DnsConfiguratorResolvconf())
def __init__(self) -> None:
self.sub_configurators: Tuple[BaseDnsConfigurator, ...] = (DnsConfiguratorDHClient(), DnsConfiguratorOldNetworkManager(), DnsConfiguratorResolvconf())
def backup(self, backup_dir: str) -> None:
for configurator in self.sub_configurators:
configurator.backup(backup_dir)
def backup(self, backup_dir: str) -> None:
for configurator in self.sub_configurators:
configurator.backup(backup_dir)
def configure_dns(self, nameservers: List[str], domain: str) -> None:
for configurator in self.sub_configurators:
configurator.configure_dns(nameservers, domain)
def configure_dns(self, nameservers: List[str], domain: str) -> None:
for configurator in self.sub_configurators:
configurator.configure_dns(nameservers, domain)
class DnsConfiguratorSystemd(BaseDnsConfigurator):
def works_on_this_system(self) -> bool:
cmd = ['service', 'systemd-resolved', 'status']
proc = subprocess.Popen(
cmd,
stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE
)
_, stderr = proc.communicate()
logging.getLogger('debugging').debug("%r returned %d: %s", cmd, proc.returncode, stderr.decode())
return proc.returncode == 0
def works_on_this_system(self) -> bool:
cmd = ['service', 'systemd-resolved', 'status']
proc = subprocess.Popen(
cmd,
stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE
)
_, stderr = proc.communicate()
logging.getLogger('debugging').debug("%r returned %d: %s", cmd, proc.returncode, stderr.decode())
return proc.returncode == 0
@execute_as_root
def backup(self, backup_dir: str) -> None:
if os.path.isfile('/etc/systemd/resolved.conf'):
userinfo_logger.warn('Warning: /etc/systemd/resolved.conf already exists.')
os.makedirs(os.path.join(backup_dir, 'etc/systemd'), exist_ok=True)
copyfile(
'/etc/systemd/resolved.conf',
os.path.join(backup_dir, 'etc/systemd/resolved.conf')
)
@execute_as_root
def backup(self, backup_dir: str) -> None:
if os.path.isfile('/etc/systemd/resolved.conf'):
userinfo_logger.warn('Warning: /etc/systemd/resolved.conf already exists.')
os.makedirs(os.path.join(backup_dir, 'etc/systemd'), exist_ok=True)
copyfile(
'/etc/systemd/resolved.conf',
os.path.join(backup_dir, 'etc/systemd/resolved.conf')
)
@execute_as_root
def configure_dns(self, nameservers: List[str], domain: str) -> None:
userinfo_logger.info('Writing /etc/systemd/resolved.conf')
with open('/etc/systemd/resolved.conf', 'w') as conf_file:
conf_file.write('[Resolve]\n')
conf_file.write('DNS=%s\n' % (' '.join(nameservers),))
conf_file.write('Domains=%s\n' % (domain,))
@execute_as_root
def configure_dns(self, nameservers: List[str], domain: str) -> None:
userinfo_logger.info('Writing /etc/systemd/resolved.conf')
with open('/etc/systemd/resolved.conf', 'w') as conf_file:
conf_file.write('[Resolve]\n')
conf_file.write('DNS=%s\n' % (' '.join(nameservers),))
conf_file.write('Domains=%s\n' % (domain,))
userinfo_logger.info('Restarting systemd-resolved.')
subprocess.check_output(['systemctl', 'restart', 'systemd-resolved'], stderr=subprocess.STDOUT)
userinfo_logger.info('Restarting systemd-resolved.')
subprocess.check_output(['systemctl', 'restart', 'systemd-resolved'], stderr=subprocess.STDOUT)
class DnsConfiguratorNetworkManager(BaseDnsConfigurator):
def works_on_this_system(self) -> bool:
# could also check lsb_release -sr here instead
proc = subprocess.Popen(
['nmcli', '-v'],
stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
stdout, stderr = proc.communicate()
logging.getLogger('debugging').debug("%r returned %d: %s", proc.args, proc.returncode, stderr.decode())
if proc.returncode != 0:
return False
nmcli_version = stdout.split()[-1]
p = subprocess.Popen(
['dpkg', '--compare-versions', nmcli_version, 'gt', '1'],
)
return p.wait() == 0
def works_on_this_system(self) -> bool:
# could also check lsb_release -sr here instead
proc = subprocess.Popen(
['nmcli', '-v'],
stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
stdout, stderr = proc.communicate()
logging.getLogger('debugging').debug("%r returned %d: %s", proc.args, proc.returncode, stderr.decode())
if proc.returncode != 0:
return False
nmcli_version = stdout.split()[-1]
p = subprocess.Popen(
['dpkg', '--compare-versions', nmcli_version, 'gt', '1'],
)
return p.wait() == 0
@execute_as_root
def backup(self, backup_dir: str) -> None:
# TODO: where does nmcli store the DNS settings?
return
@execute_as_root
def backup(self, backup_dir: str) -> None:
# TODO: where does nmcli store the DNS settings?
return
@execute_as_root
def configure_dns(self, nameservers: List[str], domain: str) -> None:
p = subprocess.Popen(
['nmcli', '-t', '-f', 'NAME,UUID,DEVICE', 'connection', 'show', '--active'],
stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
stdout, stderr = p.communicate()
logging.getLogger('debugging').debug("%r returned %d: %s", p.args, p.returncode, stderr.decode())
if p.returncode != 0:
raise DnsConfigurationException()
for line in stdout.decode().splitlines():
conn_name, conn_uuid, conn_dev = line.split(':')
userinfo_logger.info('Configuring ipv4 DNS servers for %s.' % conn_dev)
subprocess.call([
'nmcli', 'connection', 'modify', conn_uuid,
'ipv4.dns', " ".join(filter(lambda x: x, nameservers)).encode(),
'ipv4.ignore-auto-dns', 'yes',
'ipv4.dns-search', domain.encode()
])
userinfo_logger.info('Applying new settings to %s.' % conn_dev)
subprocess.call(
['nmcli', 'connection', 'down', conn_uuid.encode()]
)
subprocess.call(
['nmcli', 'connection', 'up', conn_uuid.encode()]
)
@execute_as_root
def configure_dns(self, nameservers: List[str], domain: str) -> None:
p = subprocess.Popen(
['nmcli', '-t', '-f', 'NAME,UUID,DEVICE', 'connection', 'show', '--active'],
stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
stdout, stderr = p.communicate()
logging.getLogger('debugging').debug("%r returned %d: %s", p.args, p.returncode, stderr.decode())
if p.returncode != 0:
raise DnsConfigurationException()
for line in stdout.decode().splitlines():
conn_name, conn_uuid, conn_dev = line.split(':')
userinfo_logger.info('Configuring ipv4 DNS servers for %s.' % conn_dev)
subprocess.call([
'nmcli', 'connection', 'modify', conn_uuid,
'ipv4.dns', " ".join(filter(lambda x: x, nameservers)).encode(),
'ipv4.ignore-auto-dns', 'yes',
'ipv4.dns-search', domain.encode()
])
userinfo_logger.info('Applying new settings to %s.' % conn_dev)
subprocess.call(
['nmcli', 'connection', 'down', conn_uuid.encode()]
)
subprocess.call(
['nmcli', 'connection', 'up', conn_uuid.encode()]
)
class DnsConfiguratorOldNetworkManager(BaseDnsConfigurator):
@execute_as_root
def backup(self, backup_dir: str) -> None:
p = subprocess.Popen(
['nmcli', '-t', '-f', 'NAME,UUID', 'connection', 'list'],
stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
stdout, stderr = p.communicate()
logging.getLogger('debugging').debug("%r returned %d: %s", p.args, p.returncode, stderr.decode())
if p.returncode != 0:
raise DnsConfigurationException()
for line in stdout.splitlines():
conn_name, conn_uuid = line.decode().split(':')
fn = '/etc/NetworkManager/system-connections/%s' % conn_name
fn_backup = os.path.join(backup_dir, fn[1:])
if os.path.isfile(fn):
userinfo_logger.info('Backing up %s' % fn)
os.makedirs(os.path.join(backup_dir, 'etc/NetworkManager/system-connections'), exist_ok=True)
copyfile(
fn,
fn_backup
)
os.chmod(fn_backup, 0o600)
@execute_as_root
def backup(self, backup_dir: str) -> None:
p = subprocess.Popen(
['nmcli', '-t', '-f', 'NAME,UUID', 'connection', 'list'],
stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
stdout, stderr = p.communicate()
logging.getLogger('debugging').debug("%r returned %d: %s", p.args, p.returncode, stderr.decode())
if p.returncode != 0:
raise DnsConfigurationException()
for line in stdout.splitlines():
conn_name, conn_uuid = line.decode().split(':')
fn = '/etc/NetworkManager/system-connections/%s' % conn_name
fn_backup = os.path.join(backup_dir, fn[1:])
if os.path.isfile(fn):
userinfo_logger.info('Backing up %s' % fn)
os.makedirs(os.path.join(backup_dir, 'etc/NetworkManager/system-connections'), exist_ok=True)
copyfile(
fn,
fn_backup
)
os.chmod(fn_backup, 0o600)
@execute_as_root
def configure_dns(self, nameservers: List[str], domain: str) -> None:
ns_string = ';'.join(filter(lambda x: x, nameservers)) + ';'
p = subprocess.Popen(
['nmcli', '-t', '-f', 'NAME,UUID', 'connection', 'list'],
stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
stdout, stderr = p.communicate()
logging.getLogger('debugging').debug("%r returned %d: %s", p.args, p.returncode, stderr.decode())
if p.returncode != 0:
raise DnsConfigurationException()
for line in stdout.splitlines():
conn_name, conn_uuid = line.decode().split(':')
fn = '/etc/NetworkManager/system-connections/%s' % conn_name
if os.path.isfile(fn):
Config = configparser.ConfigParser()
Config.read(fn)
Config.set('ipv4', 'dns', ns_string)
Config.set('ipv4', 'dns-search', '')
Config.set('ipv4', 'ignore-auto-dns', 'true')
with open(fn, 'w') as f:
Config.write(f)
subprocess.check_output(['service', 'network-manager', 'restart'], stderr=subprocess.STDOUT)
@execute_as_root
def configure_dns(self, nameservers: List[str], domain: str) -> None:
ns_string = ';'.join(filter(lambda x: x, nameservers)) + ';'
p = subprocess.Popen(
['nmcli', '-t', '-f', 'NAME,UUID', 'connection', 'list'],
stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
stdout, stderr = p.communicate()
logging.getLogger('debugging').debug("%r returned %d: %s", p.args, p.returncode, stderr.decode())
if p.returncode != 0:
raise DnsConfigurationException()
for line in stdout.splitlines():
conn_name, conn_uuid = line.decode().split(':')
fn = '/etc/NetworkManager/system-connections/%s' % conn_name
if os.path.isfile(fn):
Config = configparser.ConfigParser()
Config.read(fn)
Config.set('ipv4', 'dns', ns_string)
Config.set('ipv4', 'dns-search', '')
Config.set('ipv4', 'ignore-auto-dns', 'true')
with open(fn, 'w') as f:
Config.write(f)
subprocess.check_output(['service', 'network-manager', 'restart'], stderr=subprocess.STDOUT)
class DnsConfiguratorDHClient(BaseDnsConfigurator):
@execute_as_root
def backup(self, backup_dir: str) -> None:
if os.path.isfile('/etc/dhcp/dhclient.conf'):
os.makedirs(os.path.join(backup_dir, 'etc/dhcp'), exist_ok=True)
copyfile(
'/etc/dhcp/dhclient.conf',
os.path.join(backup_dir, 'etc/dhcp/dhclient.conf')
)
@execute_as_root
def backup(self, backup_dir: str) -> None:
if os.path.isfile('/etc/dhcp/dhclient.conf'):
os.makedirs(os.path.join(backup_dir, 'etc/dhcp'), exist_ok=True)
copyfile(
'/etc/dhcp/dhclient.conf',
os.path.join(backup_dir, 'etc/dhcp/dhclient.conf')
)
@execute_as_root
def configure_dns(self, nameservers: List[str], domain: str) -> None:
ns_string = " ".join(filter(lambda x: x, nameservers))
p = subprocess.Popen([
'grep', '-q', '^prepend domain-name-servers %s' % ns_string,
'/etc/dhcp/dhclient.conf'
], stderr=subprocess.PIPE)
_, stderr = p.communicate()
logging.getLogger('debugging').debug("%r returned %d: %s", p.args, p.returncode, stderr.decode())
if p.returncode == 0:
userinfo_logger.info('"prepend domain-name-servers" already in /etc/dhcp/dhclient.conf')
return
userinfo_logger.info('Adjusting /etc/dhcp/dhclient.conf')
with open('/etc/dhcp/dhclient.conf', 'a') as conf_file:
conf_file.write('\nprepend domain-name-servers %s\n' % (ns_string,))
@execute_as_root
def configure_dns(self, nameservers: List[str], domain: str) -> None:
ns_string = " ".join(filter(lambda x: x, nameservers))
p = subprocess.Popen([
'grep', '-q', '^prepend domain-name-servers %s' % ns_string,
'/etc/dhcp/dhclient.conf'
], stderr=subprocess.PIPE)
_, stderr = p.communicate()
logging.getLogger('debugging').debug("%r returned %d: %s", p.args, p.returncode, stderr.decode())
if p.returncode == 0:
userinfo_logger.info('"prepend domain-name-servers" already in /etc/dhcp/dhclient.conf')
return
userinfo_logger.info('Adjusting /etc/dhcp/dhclient.conf')
with open('/etc/dhcp/dhclient.conf', 'a') as conf_file:
conf_file.write('\nprepend domain-name-servers %s\n' % (ns_string,))
class DnsConfiguratorResolvconf(BaseDnsConfigurator):
@execute_as_root
def backup(self, backup_dir: str) -> None:
if os.path.isfile('/etc/resolvconf/resolv.conf.d/base'):
userinfo_logger.warn('Warning: /etc/resolvconf/resolv.conf.d/base already exists.')
os.makedirs(os.path.join(backup_dir, 'etc/resolvconf/resolv.conf.d'), exist_ok=True)
copyfile(
'/etc/resolvconf/resolv.conf.d/base',
os.path.join(backup_dir, 'etc/resolvconf/resolv.conf.d/base')
)
@execute_as_root
def backup(self, backup_dir: str) -> None:
if os.path.isfile('/etc/resolvconf/resolv.conf.d/base'):
userinfo_logger.warn('Warning: /etc/resolvconf/resolv.conf.d/base already exists.')
os.makedirs(os.path.join(backup_dir, 'etc/resolvconf/resolv.conf.d'), exist_ok=True)
copyfile(
'/etc/resolvconf/resolv.conf.d/base',
os.path.join(backup_dir, 'etc/resolvconf/resolv.conf.d/base')
)
@execute_as_root
def configure_dns(self, nameservers: List[str], domain: str) -> None:
userinfo_logger.info('Writing /etc/resolvconf/resolv.conf.d/base')
with open('/etc/resolvconf/resolv.conf.d/base', 'w') as conf_file:
for nameserver in nameservers:
if nameserver != '':
conf_file.write('nameserver %s\n' % (nameserver,))
conf_file.write('domain %s' % (domain,))
@execute_as_root
def configure_dns(self, nameservers: List[str], domain: str) -> None:
userinfo_logger.info('Writing /etc/resolvconf/resolv.conf.d/base')
with open('/etc/resolvconf/resolv.conf.d/base', 'w') as conf_file:
for nameserver in nameservers:
if nameserver != '':
conf_file.write('nameserver %s\n' % (nameserver,))
conf_file.write('domain %s' % (domain,))
userinfo_logger.info('Applying new resolvconf settings.')
subprocess.check_output(['service', 'resolvconf', 'stop'], stderr=subprocess.STDOUT)
subprocess.check_output(['service', 'resolvconf', 'start'], stderr=subprocess.STDOUT)
userinfo_logger.info('Applying new resolvconf settings.')
subprocess.check_output(['service', 'resolvconf', 'stop'], stderr=subprocess.STDOUT)
subprocess.check_output(['service', 'resolvconf', 'start'], stderr=subprocess.STDOUT)

View File

@ -13,64 +13,64 @@ userinfo_logger = logging.getLogger('userinfo')
class ConflictChecker(object):
def config_file_exists(self) -> bool:
if os.path.isfile('/etc/krb5.conf'):
userinfo_logger.warn('Warning: /etc/krb5.conf already exists.')
return True
return False
def config_file_exists(self) -> bool:
if os.path.isfile('/etc/krb5.conf'):
userinfo_logger.warn('Warning: /etc/krb5.conf already exists.')
return True
return False
class KerberosConfigurator(ConflictChecker):
@execute_as_root
def backup(self, backup_dir: str) -> None:
if self.config_file_exists():
os.makedirs(os.path.join(backup_dir, 'etc'), exist_ok=True)
copyfile(
'/etc/krb5.conf',
os.path.join(backup_dir, 'etc/krb5.conf')
)
@execute_as_root
def backup(self, backup_dir: str) -> None:
if self.config_file_exists():
os.makedirs(os.path.join(backup_dir, 'etc'), exist_ok=True)
copyfile(
'/etc/krb5.conf',
os.path.join(backup_dir, 'etc/krb5.conf')
)
def configure_kerberos(self, kerberos_realm: str, ldap_master: str, ldap_server_name: str, is_samba_dc: bool, dc_ip: str) -> None:
self.write_config_file(kerberos_realm, ldap_master, ldap_server_name, is_samba_dc)
self.synchronize_time_with_master(dc_ip)
def configure_kerberos(self, kerberos_realm: str, ldap_master: str, ldap_server_name: str, is_samba_dc: bool, dc_ip: str) -> None:
self.write_config_file(kerberos_realm, ldap_master, ldap_server_name, is_samba_dc)
self.synchronize_time_with_master(dc_ip)
@execute_as_root
def write_config_file(self, kerberos_realm: str, ldap_master: str, ldap_server_name: str, is_samba_dc: bool) -> None:
userinfo_logger.info('Writing /etc/krb5.conf ')
if is_samba_dc:
kpasswd_name = ldap_server_name
else:
kpasswd_name = ldap_master
config = \
'[libdefaults]\n' \
' default_realm = %(kerberos_realm)s\n' \
' kdc_timesync = 1\n' \
' ccache_type = 4\n' \
' forwardable = true\n' \
' proxiable = true\n' \
' default_tkt_enctypes = arcfour-hmac-md5 des-cbc-md5 des3-hmac-sha1 des-cbc-crc des-cbc-md4 des3-cbc-sha1 aes128-cts-hmac-sha1-96 aes256-cts-hmac-sha1-96\n' \
' permitted_enctypes = des3-hmac-sha1 des-cbc-crc des-cbc-md4 des-cbc-md5 des3-cbc-sha1 arcfour-hmac-md5 aes128-cts-hmac-sha1-96 aes256-cts-hmac-sha1-96\n' \
' allow_weak_crypto=true\n' \
' rdns = false\n' \
'\n' \
'[realms]\n' \
'%(kerberos_realm)s = {\n' \
' kdc = %(ldap_server_name)s\n' \
' admin_server = %(ldap_server_name)s\n' \
' kpasswd_server = %(kpasswd_name)s\n' \
'}\n' \
% {
'kerberos_realm': kerberos_realm,
'ldap_server_name': ldap_server_name,
'kpasswd_name': kpasswd_name
}
with open('/etc/krb5.conf', 'w') as conf_file:
conf_file.write(config)
@execute_as_root
def write_config_file(self, kerberos_realm: str, ldap_master: str, ldap_server_name: str, is_samba_dc: bool) -> None:
userinfo_logger.info('Writing /etc/krb5.conf ')
if is_samba_dc:
kpasswd_name = ldap_server_name
else:
kpasswd_name = ldap_master
config = \
'[libdefaults]\n' \
' default_realm = %(kerberos_realm)s\n' \
' kdc_timesync = 1\n' \
' ccache_type = 4\n' \
' forwardable = true\n' \
' proxiable = true\n' \
' default_tkt_enctypes = arcfour-hmac-md5 des-cbc-md5 des3-hmac-sha1 des-cbc-crc des-cbc-md4 des3-cbc-sha1 aes128-cts-hmac-sha1-96 aes256-cts-hmac-sha1-96\n' \
' permitted_enctypes = des3-hmac-sha1 des-cbc-crc des-cbc-md4 des-cbc-md5 des3-cbc-sha1 arcfour-hmac-md5 aes128-cts-hmac-sha1-96 aes256-cts-hmac-sha1-96\n' \
' allow_weak_crypto=true\n' \
' rdns = false\n' \
'\n' \
'[realms]\n' \
'%(kerberos_realm)s = {\n' \
' kdc = %(ldap_server_name)s\n' \
' admin_server = %(ldap_server_name)s\n' \
' kpasswd_server = %(kpasswd_name)s\n' \
'}\n' \
% {
'kerberos_realm': kerberos_realm,
'ldap_server_name': ldap_server_name,
'kpasswd_name': kpasswd_name
}
with open('/etc/krb5.conf', 'w') as conf_file:
conf_file.write(config)
@execute_as_root
def synchronize_time_with_master(self, dc_ip: str) -> None:
userinfo_logger.info('Synchronizing time with the DC')
subprocess.check_output(
['ntpdate', '-b', '-u', '-t', '5', dc_ip],
stderr=subprocess.STDOUT
)
@execute_as_root
def synchronize_time_with_master(self, dc_ip: str) -> None:
userinfo_logger.info('Synchronizing time with the DC')
subprocess.check_output(
['ntpdate', '-b', '-u', '-t', '5', dc_ip],
stderr=subprocess.STDOUT
)

View File

@ -18,129 +18,129 @@ log = getLogger("debugging")
class LdapConfigutationException(Exception):
pass
pass
class ConflictChecker(object):
def ldap_conf_exists(self) -> bool:
if os.path.isfile('/etc/ldap/ldap.conf'):
userinfo_logger.warn('Warning: /etc/ldap/ldap.conf already exists.')
return True
return False
def ldap_conf_exists(self) -> bool:
if os.path.isfile('/etc/ldap/ldap.conf'):
userinfo_logger.warn('Warning: /etc/ldap/ldap.conf already exists.')
return True
return False
class LdapConfigurator(ConflictChecker):
@execute_as_root
def backup(self, backup_dir: str) -> None:
if self.ldap_conf_exists():
os.makedirs(os.path.join(backup_dir, 'etc/ldap'), exist_ok=True)
copyfile(
'/etc/ldap/ldap.conf',
os.path.join(backup_dir, 'etc/ldap/ldap.conf')
)
@execute_as_root
def backup(self, backup_dir: str) -> None:
if self.ldap_conf_exists():
os.makedirs(os.path.join(backup_dir, 'etc/ldap'), exist_ok=True)
copyfile(
'/etc/ldap/ldap.conf',
os.path.join(backup_dir, 'etc/ldap/ldap.conf')
)
def configure_ldap(self, dc_ip: str, ldap_server_name: str, admin_username: str, admin_pw: str, ldap_base: str, admin_dn: str) -> None:
RootCertificateProvider().provide_ucs_root_certififcate(dc_ip)
password = self.random_password()
self.modify_old_entry_or_add_machine_to_ldap(password, dc_ip, admin_username, admin_pw, ldap_base, admin_dn)
self.create_ldap_conf_file(ldap_server_name, ldap_base)
self.create_machine_secret_file(password)
def configure_ldap(self, dc_ip: str, ldap_server_name: str, admin_username: str, admin_pw: str, ldap_base: str, admin_dn: str) -> None:
RootCertificateProvider().provide_ucs_root_certififcate(dc_ip)
password = self.random_password()
self.modify_old_entry_or_add_machine_to_ldap(password, dc_ip, admin_username, admin_pw, ldap_base, admin_dn)
self.create_ldap_conf_file(ldap_server_name, ldap_base)
self.create_machine_secret_file(password)
def modify_old_entry_or_add_machine_to_ldap(self, password: str, dc_ip: str, admin_username: str, admin_pw: str, ldap_base: str, admin_dn: str) -> str:
try:
udm_type, dn = get_machines_udm(dc_ip, admin_username, admin_pw, admin_dn)
log.info("Found existing LDAP entry %r of type %r", dn, udm_type)
except LookupError:
dn = self.add_machine_to_ldap(password, dc_ip, admin_username, admin_pw, ldap_base, admin_dn)
log.info("Created LDAP entry %r", dn)
else:
self.modify_machine_in_ldap(password, dc_ip, admin_username, admin_pw, admin_dn, udm_type, dn)
log.info("Modified LDAP entry %r", dn)
return dn
def modify_old_entry_or_add_machine_to_ldap(self, password: str, dc_ip: str, admin_username: str, admin_pw: str, ldap_base: str, admin_dn: str) -> str:
try:
udm_type, dn = get_machines_udm(dc_ip, admin_username, admin_pw, admin_dn)
log.info("Found existing LDAP entry %r of type %r", dn, udm_type)
except LookupError:
dn = self.add_machine_to_ldap(password, dc_ip, admin_username, admin_pw, ldap_base, admin_dn)
log.info("Created LDAP entry %r", dn)
else:
self.modify_machine_in_ldap(password, dc_ip, admin_username, admin_pw, admin_dn, udm_type, dn)
log.info("Modified LDAP entry %r", dn)
return dn
def modify_machine_in_ldap(self, password: str, dc_ip: str, admin_username: str, admin_pw: str, admin_dn: str, udm_type: str, dn: str) -> None:
userinfo_logger.info('Updating old LDAP entry for this machine on the UCS DC')
def modify_machine_in_ldap(self, password: str, dc_ip: str, admin_username: str, admin_pw: str, admin_dn: str, udm_type: str, dn: str) -> None:
userinfo_logger.info('Updating old LDAP entry for this machine on the UCS DC')
release_id = get_distribution()
release = get_release()
release_id = get_distribution()
release = get_release()
cmd = [
'/usr/sbin/udm',
udm_type,
'modify',
'--binddn', admin_dn,
'--bindpwdfile', PW(admin_username),
'--dn', dn,
'--set', 'password=%s' % (password,),
'--set', 'operatingSystem=%s' % (release_id,),
'--set', 'operatingSystemVersion=%s' % (release,)
]
ssh_process = ssh(admin_username, admin_pw, dc_ip, cmd, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
_, stderr = ssh_process.communicate()
if ssh_process.returncode != 0:
userinfo_logger.critical('Updating the old LDAP entry for this computer failed.')
log.critical("%r returned %d: %s", cmd, ssh_process.returncode, stderr.decode())
raise LdapConfigutationException()
cmd = [
'/usr/sbin/udm',
udm_type,
'modify',
'--binddn', admin_dn,
'--bindpwdfile', PW(admin_username),
'--dn', dn,
'--set', 'password=%s' % (password,),
'--set', 'operatingSystem=%s' % (release_id,),
'--set', 'operatingSystemVersion=%s' % (release,)
]
ssh_process = ssh(admin_username, admin_pw, dc_ip, cmd, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
_, stderr = ssh_process.communicate()
if ssh_process.returncode != 0:
userinfo_logger.critical('Updating the old LDAP entry for this computer failed.')
log.critical("%r returned %d: %s", cmd, ssh_process.returncode, stderr.decode())
raise LdapConfigutationException()
def add_machine_to_ldap(self, password: str, dc_ip: str, admin_username: str, admin_pw: str, ldap_base: str, admin_dn: str) -> str:
userinfo_logger.info('Adding LDAP entry for this machine on the UCS DC')
hostname = subprocess.check_output(['hostname', '-s']).strip().decode()
release_id = get_distribution()
release = get_release()
# TODO: Also add MAC address. Which NIC's address should be used?
udm_command = [
'/usr/sbin/udm', 'computers/ubuntu', 'create',
'--binddn', admin_dn,
'--bindpwdfile', PW(admin_username),
'--position', 'cn=computers,%s' % (ldap_base,),
'--set', 'name=%s' % (hostname,),
'--set', 'password=%s' % (password,),
'--set', 'operatingSystem=%s' % (release_id,),
'--set', 'operatingSystemVersion=%s' % (release,)
]
ssh_process = ssh(admin_username, admin_pw, dc_ip, udm_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = ssh_process.communicate()
if ssh_process.returncode != 0 or stderr.decode().startswith('E: '):
userinfo_logger.critical('Adding an LDAP object for this computer didn\'t work.')
userinfo_logger.critical(stderr.decode())
raise LdapConfigutationException()
prefix, _, dn = stdout.decode().strip().partition(": ")
assert prefix == "Object created"
return dn
def add_machine_to_ldap(self, password: str, dc_ip: str, admin_username: str, admin_pw: str, ldap_base: str, admin_dn: str) -> str:
userinfo_logger.info('Adding LDAP entry for this machine on the UCS DC')
hostname = subprocess.check_output(['hostname', '-s']).strip().decode()
release_id = get_distribution()
release = get_release()
# TODO: Also add MAC address. Which NIC's address should be used?
udm_command = [
'/usr/sbin/udm', 'computers/ubuntu', 'create',
'--binddn', admin_dn,
'--bindpwdfile', PW(admin_username),
'--position', 'cn=computers,%s' % (ldap_base,),
'--set', 'name=%s' % (hostname,),
'--set', 'password=%s' % (password,),
'--set', 'operatingSystem=%s' % (release_id,),
'--set', 'operatingSystemVersion=%s' % (release,)
]
ssh_process = ssh(admin_username, admin_pw, dc_ip, udm_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = ssh_process.communicate()
if ssh_process.returncode != 0 or stderr.decode().startswith('E: '):
userinfo_logger.critical('Adding an LDAP object for this computer didn\'t work.')
userinfo_logger.critical(stderr.decode())
raise LdapConfigutationException()
prefix, _, dn = stdout.decode().strip().partition(": ")
assert prefix == "Object created"
return dn
def get_admin_dn(self, dc_ip: str, admin_username: str, admin_pw: str, ldap_base: str) -> str:
userinfo_logger.info('Getting the DN of the Administrator ')
ldap_command = ['ldapwhoami', '-QY', 'GSSAPI']
ssh_process = ssh(admin_username, admin_pw, dc_ip, ldap_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = ssh_process.communicate()
if ssh_process.returncode != 0:
userinfo_logger.critical('get admin DN failed with: {}'.format(stderr.decode()))
raise LdapConfigutationException('get admin DN failed with: {}'.format(stderr.decode()))
dn, _, admin_dn = stdout.decode().strip().partition(':')
assert dn == "dn", stdout
return admin_dn
def get_admin_dn(self, dc_ip: str, admin_username: str, admin_pw: str, ldap_base: str) -> str:
userinfo_logger.info('Getting the DN of the Administrator ')
ldap_command = ['ldapwhoami', '-QY', 'GSSAPI']
ssh_process = ssh(admin_username, admin_pw, dc_ip, ldap_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = ssh_process.communicate()
if ssh_process.returncode != 0:
userinfo_logger.critical('get admin DN failed with: {}'.format(stderr.decode()))
raise LdapConfigutationException('get admin DN failed with: {}'.format(stderr.decode()))
dn, _, admin_dn = stdout.decode().strip().partition(':')
assert dn == "dn", stdout
return admin_dn
@execute_as_root
def create_ldap_conf_file(self, ldap_server_name: str, ldap_base: str) -> None:
userinfo_logger.info('Writing /etc/ldap/ldap.conf ')
ldap_conf = \
"TLS_CACERT /etc/univention/ssl/ucsCA/CAcert.pem\n" \
"URI ldap://%s:7389\n" \
"BASE %s\n" % (ldap_server_name, ldap_base)
@execute_as_root
def create_ldap_conf_file(self, ldap_server_name: str, ldap_base: str) -> None:
userinfo_logger.info('Writing /etc/ldap/ldap.conf ')
ldap_conf = \
"TLS_CACERT /etc/univention/ssl/ucsCA/CAcert.pem\n" \
"URI ldap://%s:7389\n" \
"BASE %s\n" % (ldap_server_name, ldap_base)
with open('/etc/ldap/ldap.conf', 'w') as conf_file:
conf_file.write(ldap_conf)
with open('/etc/ldap/ldap.conf', 'w') as conf_file:
conf_file.write(ldap_conf)
@execute_as_root
def create_machine_secret_file(self, password: str) -> None:
userinfo_logger.info('Writing /etc/machine.secret ')
with open('/etc/machine.secret', 'w') as secret_file:
secret_file.write(password)
os.chmod('/etc/machine.secret', stat.S_IREAD)
@execute_as_root
def create_machine_secret_file(self, password: str) -> None:
userinfo_logger.info('Writing /etc/machine.secret ')
with open('/etc/machine.secret', 'w') as secret_file:
secret_file.write(password)
os.chmod('/etc/machine.secret', stat.S_IREAD)
def random_password(self, length: int = 20) -> str:
chars = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!"#$%&\'()*+,-./:;<=>?@[]^_`{|}~'
password = ''
for _ in range(length):
password += chars[ord(os.urandom(1)) % len(chars)]
return password
def random_password(self, length: int = 20) -> str:
chars = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!"#$%&\'()*+,-./:;<=>?@[]^_`{|}~'
password = ''
for _ in range(length):
password += chars[ord(os.urandom(1)) % len(chars)]
return password

View File

@ -13,79 +13,79 @@ userinfo_logger = logging.getLogger('userinfo')
class ConflictChecker(object):
def configuration_conflicts(self) -> bool:
login_manager = self.determin_used_login_manager()
if login_manager in ['lightdm', 'gdm3', 'sddm']:
return False
elif login_manager == 'lightdm_account_service':
userinfo_logger.error('Error: The login won\'t work with your system, because you are using an incompatible login theme.')
userinfo_logger.error(' Please go to "System Settings" -> "Login Screen (LightDM)" and set your login theme to "Classic".')
else:
userinfo_logger.error('Error: Can\'t enable login with the login manager of your system.')
userinfo_logger.error(' Please use LightDM, SDDM or GDM for full compatibility with UCS.')
userinfo_logger.error(' This error can be avoided by using the --skip-login-manager parameter.')
return True
def configuration_conflicts(self) -> bool:
login_manager = self.determin_used_login_manager()
if login_manager in ['lightdm', 'gdm3', 'sddm']:
return False
elif login_manager == 'lightdm_account_service':
userinfo_logger.error('Error: The login won\'t work with your system, because you are using an incompatible login theme.')
userinfo_logger.error(' Please go to "System Settings" -> "Login Screen (LightDM)" and set your login theme to "Classic".')
else:
userinfo_logger.error('Error: Can\'t enable login with the login manager of your system.')
userinfo_logger.error(' Please use LightDM, SDDM or GDM for full compatibility with UCS.')
userinfo_logger.error(' This error can be avoided by using the --skip-login-manager parameter.')
return True
def determin_used_login_manager(self) -> str:
with open('/etc/X11/default-display-manager', 'r') as login_manager_file:
login_manager = os.path.basename(login_manager_file.read()).strip()
def determin_used_login_manager(self) -> str:
with open('/etc/X11/default-display-manager', 'r') as login_manager_file:
login_manager = os.path.basename(login_manager_file.read()).strip()
# The lightdm config files will be ignored with a certain setup (e.g. in
# Kubuntu 14.04), so check for that.
if login_manager == 'lightdm' and self.kde_greeter_is_installed():
if not self.theme_with_accountsservice_is_ok():
return 'lightdm_account_service'
# The lightdm config files will be ignored with a certain setup (e.g. in
# Kubuntu 14.04), so check for that.
if login_manager == 'lightdm' and self.kde_greeter_is_installed():
if not self.theme_with_accountsservice_is_ok():
return 'lightdm_account_service'
return login_manager
return login_manager
def lightdm_config_file_exists(self) -> bool:
if os.path.isfile('/etc/lightdm/lightdm.conf.d/99-show-manual-userlogin.conf'):
userinfo_logger.warn('Warning: /etc/lightdm/lightdm.conf.d/99-show-manual-userlogin.conf already exists.')
return True
return False
def lightdm_config_file_exists(self) -> bool:
if os.path.isfile('/etc/lightdm/lightdm.conf.d/99-show-manual-userlogin.conf'):
userinfo_logger.warn('Warning: /etc/lightdm/lightdm.conf.d/99-show-manual-userlogin.conf already exists.')
return True
return False
def kde_greeter_is_installed(self) -> bool:
return 0 == subprocess.call(
['dpkg', '-s', 'lightdm-kde-greeter'],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
)
def kde_greeter_is_installed(self) -> bool:
return 0 == subprocess.call(
['dpkg', '-s', 'lightdm-kde-greeter'],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
)
def theme_with_accountsservice_is_ok(self) -> bool:
if os.path.isfile('/etc/lightdm/lightdm-kde-greeter.conf'):
with open('/etc/lightdm/lightdm-kde-greeter.conf', 'r') as greeter_config_file:
for line in greeter_config_file:
if 'theme-name' in line:
greeter = line.split('=', 1)[-1].strip()
if greeter == 'classic':
return True
return False
def theme_with_accountsservice_is_ok(self) -> bool:
if os.path.isfile('/etc/lightdm/lightdm-kde-greeter.conf'):
with open('/etc/lightdm/lightdm-kde-greeter.conf', 'r') as greeter_config_file:
for line in greeter_config_file:
if 'theme-name' in line:
greeter = line.split('=', 1)[-1].strip()
if greeter == 'classic':
return True
return False
class LoginManagerConfigurator(ConflictChecker):
@execute_as_root
def backup(self, backup_dir: str) -> None:
if self.lightdm_config_file_exists():
os.makedirs(os.path.join(backup_dir, 'etc/lightdm/lightdm.conf.d'), exist_ok=True)
copyfile(
'/etc/lightdm/lightdm.conf.d/99-show-manual-userlogin.conf',
os.path.join(backup_dir, 'etc/lightdm/lightdm.conf.d/99-show-manual-userlogin.conf')
)
@execute_as_root
def backup(self, backup_dir: str) -> None:
if self.lightdm_config_file_exists():
os.makedirs(os.path.join(backup_dir, 'etc/lightdm/lightdm.conf.d'), exist_ok=True)
copyfile(
'/etc/lightdm/lightdm.conf.d/99-show-manual-userlogin.conf',
os.path.join(backup_dir, 'etc/lightdm/lightdm.conf.d/99-show-manual-userlogin.conf')
)
def enable_login_with_foreign_usernames(self) -> None:
login_manager = self.determin_used_login_manager()
if login_manager == 'lightdm':
self.enable_login_with_foreign_usernames_for_lightdm()
def enable_login_with_foreign_usernames(self) -> None:
login_manager = self.determin_used_login_manager()
if login_manager == 'lightdm':
self.enable_login_with_foreign_usernames_for_lightdm()
@execute_as_root
def enable_login_with_foreign_usernames_for_lightdm(self) -> None:
userinfo_logger.info('Writing /etc/lightdm/lightdm.conf.d/99-show-manual-userlogin.conf ')
@execute_as_root
def enable_login_with_foreign_usernames_for_lightdm(self) -> None:
userinfo_logger.info('Writing /etc/lightdm/lightdm.conf.d/99-show-manual-userlogin.conf ')
lightdm_config = \
'[SeatDefaults]\n' \
'greeter-show-manual-login=true\n' \
'greeter-hide-users=true\n'
lightdm_config = \
'[SeatDefaults]\n' \
'greeter-show-manual-login=true\n' \
'greeter-hide-users=true\n'
os.makedirs('/etc/lightdm/lightdm.conf.d', exist_ok=True)
with open('/etc/lightdm/lightdm.conf.d/99-show-manual-userlogin.conf', 'w') as conf_file:
conf_file.write(lightdm_config)
os.makedirs('/etc/lightdm/lightdm.conf.d', exist_ok=True)
with open('/etc/lightdm/lightdm.conf.d/99-show-manual-userlogin.conf', 'w') as conf_file:
conf_file.write(lightdm_config)

View File

@ -13,108 +13,108 @@ userinfo_logger = logging.getLogger('userinfo')
class ConflictChecker(object):
def home_dir_conf_file_exists(self) -> bool:
if os.path.isfile('/usr/share/pam-configs/ucs_mkhomedir'):
userinfo_logger.warn('Warning: /usr/share/pam-configs/ucs_mkhomedir already exists.')
return True
return False
def home_dir_conf_file_exists(self) -> bool:
if os.path.isfile('/usr/share/pam-configs/ucs_mkhomedir'):
userinfo_logger.warn('Warning: /usr/share/pam-configs/ucs_mkhomedir already exists.')
return True
return False
def group_conf_file_exists(self) -> bool:
if os.path.isfile('/usr/share/pam-configs/local_groups'):
userinfo_logger.warn('Warning: /usr/share/pam-configs/local_groups already exists.')
return True
return False
def group_conf_file_exists(self) -> bool:
if os.path.isfile('/usr/share/pam-configs/local_groups'):
userinfo_logger.warn('Warning: /usr/share/pam-configs/local_groups already exists.')
return True
return False
class PamConfigurator(ConflictChecker):
@execute_as_root
def backup(self, backup_dir: str) -> None:
copy_home_dir_conf = self.home_dir_conf_file_exists()
copy_group_conf = self.group_conf_file_exists()
if copy_home_dir_conf or copy_group_conf:
os.makedirs(os.path.join(backup_dir, 'usr/share/pam-configs'), exist_ok=True)
if copy_home_dir_conf:
copyfile(
'/usr/share/pam-configs/ucs_mkhomedir',
os.path.join(backup_dir, 'usr/share/pam-configs/ucs_mkhomedir')
)
if copy_group_conf:
copyfile(
'/usr/share/pam-configs/local_groups',
os.path.join(backup_dir, 'usr/share/pam-configs/local_groups')
)
os.makedirs(os.path.join(backup_dir, 'etc/security'), exist_ok=True)
copyfile(
'/etc/security/group.conf',
os.path.join(backup_dir, 'etc/security/group.conf')
)
@execute_as_root
def backup(self, backup_dir: str) -> None:
copy_home_dir_conf = self.home_dir_conf_file_exists()
copy_group_conf = self.group_conf_file_exists()
if copy_home_dir_conf or copy_group_conf:
os.makedirs(os.path.join(backup_dir, 'usr/share/pam-configs'), exist_ok=True)
if copy_home_dir_conf:
copyfile(
'/usr/share/pam-configs/ucs_mkhomedir',
os.path.join(backup_dir, 'usr/share/pam-configs/ucs_mkhomedir')
)
if copy_group_conf:
copyfile(
'/usr/share/pam-configs/local_groups',
os.path.join(backup_dir, 'usr/share/pam-configs/local_groups')
)
os.makedirs(os.path.join(backup_dir, 'etc/security'), exist_ok=True)
copyfile(
'/etc/security/group.conf',
os.path.join(backup_dir, 'etc/security/group.conf')
)
def setup_pam(self) -> None:
self.configure_home_dir_creation()
self.add_users_to_requiered_system_groups()
self.update_pam()
def setup_pam(self) -> None:
self.configure_home_dir_creation()
self.add_users_to_requiered_system_groups()
self.update_pam()
@execute_as_root
def configure_home_dir_creation(self) -> None:
userinfo_logger.info('Writing /usr/share/pam-configs/ucs_mkhomedir ')
@execute_as_root
def configure_home_dir_creation(self) -> None:
userinfo_logger.info('Writing /usr/share/pam-configs/ucs_mkhomedir ')
home_dir_conf = \
'Name: activate mkhomedir\n' \
'Default: yes\n' \
'Priority: 900\n'\
'Session-Type: Additional\n' \
'Session:\n' \
' required pam_mkhomedir.so umask=0022 skel=/etc/skel\n'
with open('/usr/share/pam-configs/ucs_mkhomedir', 'w') as conf_file:
conf_file.write(home_dir_conf)
home_dir_conf = \
'Name: activate mkhomedir\n' \
'Default: yes\n' \
'Priority: 900\n'\
'Session-Type: Additional\n' \
'Session:\n' \
' required pam_mkhomedir.so umask=0022 skel=/etc/skel\n'
with open('/usr/share/pam-configs/ucs_mkhomedir', 'w') as conf_file:
conf_file.write(home_dir_conf)
def add_users_to_requiered_system_groups(self) -> None:
self.add_groups_to_group_conf()
self.write_pam_group_conf()
def add_users_to_requiered_system_groups(self) -> None:
self.add_groups_to_group_conf()
self.write_pam_group_conf()
@execute_as_root
def add_groups_to_group_conf(self) -> None:
if self.group_conf_already_ok():
return
@execute_as_root
def add_groups_to_group_conf(self) -> None:
if self.group_conf_already_ok():
return
userinfo_logger.info('Adding groups to /etc/security/group.conf ')
userinfo_logger.info('Adding groups to /etc/security/group.conf ')
# TODO: Would additional groups be appropriate here?
with open('/etc/security/group.conf', 'a') as groups_file:
groups_file.write(
'*;*;*;Al0000-2400;audio,cdrom,dialout,floppy,plugdev,adm\n'
)
# TODO: Would additional groups be appropriate here?
with open('/etc/security/group.conf', 'a') as groups_file:
groups_file.write(
'*;*;*;Al0000-2400;audio,cdrom,dialout,floppy,plugdev,adm\n'
)
def group_conf_already_ok(self) -> bool:
with open('/etc/security/group.conf', 'r') as groups_file:
for line in groups_file:
if '*;*;*;Al0000-2400;audio,cdrom,dialout,floppy,plugdev,adm\n' in line:
return True
return False
def group_conf_already_ok(self) -> bool:
with open('/etc/security/group.conf', 'r') as groups_file:
for line in groups_file:
if '*;*;*;Al0000-2400;audio,cdrom,dialout,floppy,plugdev,adm\n' in line:
return True
return False
@execute_as_root
def write_pam_group_conf(self) -> None:
userinfo_logger.info('Adding groups to /usr/share/pam-configs/local_groups ')
@execute_as_root
def write_pam_group_conf(self) -> None:
userinfo_logger.info('Adding groups to /usr/share/pam-configs/local_groups ')
group_conf = \
'Name: activate /etc/security/group.conf\n' \
'Default: yes\n' \
'Priority: 900\n' \
'Auth-Type: Primary\n' \
'Auth:\n' \
' required pam_group.so use_first_pass\n'
group_conf = \
'Name: activate /etc/security/group.conf\n' \
'Default: yes\n' \
'Priority: 900\n' \
'Auth-Type: Primary\n' \
'Auth:\n' \
'\t required pam_group.so use_first_pass\n'
with open('/usr/share/pam-configs/local_groups', 'w') as conf_file:
conf_file.write(group_conf)
with open('/usr/share/pam-configs/local_groups', 'w') as conf_file:
conf_file.write(group_conf)
@execute_as_root
def update_pam(self) -> None:
userinfo_logger.info('Updating PAM')
@execute_as_root
def update_pam(self) -> None:
userinfo_logger.info('Updating PAM')
env = os.environ.copy()
env['DEBIAN_FRONTEND'] = 'noninteractive'
subprocess.check_output(
['pam-auth-update', '--force'],
env=env, stderr=subprocess.STDOUT
)
env = os.environ.copy()
env['DEBIAN_FRONTEND'] = 'noninteractive'
subprocess.check_output(
['pam-auth-update', '--force'],
env=env, stderr=subprocess.STDOUT
)

View File

@ -12,36 +12,36 @@ userinfo_logger = logging.getLogger('userinfo')
class RootCertificateProvider(object):
def provide_ucs_root_certififcate(self, dc_ip: str) -> None:
if not self.ucs_root_certificate_available_locally():
self.download_ucs_root_certificate(dc_ip)
self.add_certificate_to_certificate_store()
def provide_ucs_root_certififcate(self, dc_ip: str) -> None:
if not self.ucs_root_certificate_available_locally():
self.download_ucs_root_certificate(dc_ip)
self.add_certificate_to_certificate_store()
def ucs_root_certificate_available_locally(self) -> bool:
return os.path.isfile('/etc/univention/ssl/ucsCA/CAcert.pem') and \
os.path.isfile('/usr/local/share/ca-certificates/UCSdomain.crt')
def ucs_root_certificate_available_locally(self) -> bool:
return os.path.isfile('/etc/univention/ssl/ucsCA/CAcert.pem') and \
os.path.isfile('/usr/local/share/ca-certificates/UCSdomain.crt')
@execute_as_root
def download_ucs_root_certificate(self, dc_ip: str) -> None:
userinfo_logger.info('Downloading the UCS root certificate to /etc/univention/ssl/ucsCA/CAcert.pem')
@execute_as_root
def download_ucs_root_certificate(self, dc_ip: str) -> None:
userinfo_logger.info('Downloading the UCS root certificate to /etc/univention/ssl/ucsCA/CAcert.pem')
os.makedirs('/etc/univention/ssl/ucsCA', exist_ok=True)
subprocess.check_output(
[
'wget',
'--no-check-certificate',
'-O', '/etc/univention/ssl/ucsCA/CAcert.pem',
'http://%s/ucs-root-ca.crt' % (dc_ip,)
],
stderr=subprocess.STDOUT
)
os.makedirs('/etc/univention/ssl/ucsCA', exist_ok=True)
subprocess.check_output(
[
'wget',
'--no-check-certificate',
'-O', '/etc/univention/ssl/ucsCA/CAcert.pem',
'http://%s/ucs-root-ca.crt' % (dc_ip,)
],
stderr=subprocess.STDOUT
)
@execute_as_root
def add_certificate_to_certificate_store(self) -> None:
userinfo_logger.info('Adding the UCS root certificate to the certificate store')
@execute_as_root
def add_certificate_to_certificate_store(self) -> None:
userinfo_logger.info('Adding the UCS root certificate to the certificate store')
os.symlink('/etc/univention/ssl/ucsCA/CAcert.pem', '/usr/local/share/ca-certificates/UCSdomain.crt')
subprocess.check_output(
['update-ca-certificates'],
stderr=subprocess.STDOUT
)
os.symlink('/etc/univention/ssl/ucsCA/CAcert.pem', '/usr/local/share/ca-certificates/UCSdomain.crt')
subprocess.check_output(
['update-ca-certificates'],
stderr=subprocess.STDOUT
)

View File

@ -16,94 +16,94 @@ userinfo_logger = logging.getLogger('userinfo')
class ConflictChecker(object):
def sssd_conf_file_exists(self) -> bool:
if os.path.isfile('/etc/sssd/sssd.conf'):
userinfo_logger.warn('Warning: /etc/sssd/sssd.conf already exists.')
return True
return False
def sssd_conf_file_exists(self) -> bool:
if os.path.isfile('/etc/sssd/sssd.conf'):
userinfo_logger.warn('Warning: /etc/sssd/sssd.conf already exists.')
return True
return False
class SssdConfigurator(ConflictChecker):
@execute_as_root
def backup(self, backup_dir: str) -> None:
if self.sssd_conf_file_exists():
os.makedirs(os.path.join(backup_dir, 'etc/sssd'), exist_ok=True)
copyfile(
'/etc/sssd/sssd.conf',
os.path.join(backup_dir, 'etc/sssd/sssd.conf')
)
@execute_as_root
def backup(self, backup_dir: str) -> None:
if self.sssd_conf_file_exists():
os.makedirs(os.path.join(backup_dir, 'etc/sssd'), exist_ok=True)
copyfile(
'/etc/sssd/sssd.conf',
os.path.join(backup_dir, 'etc/sssd/sssd.conf')
)
@execute_as_root
def setup_sssd(self, dc_ip: str, ldap_master: str, ldap_server_name: str, admin_username: str, admin_pw: str, ldap_base: str, kerberos_realm: str, admin_dn: str, is_samba_dc: bool) -> None:
self.ldap_password = open('/etc/machine.secret').read().strip()
RootCertificateProvider().provide_ucs_root_certififcate(dc_ip)
self.write_sssd_conf(dc_ip, ldap_master, ldap_server_name, admin_username, admin_pw, ldap_base, kerberos_realm, admin_dn, is_samba_dc)
self.configure_sssd()
self.restart_sssd()
@execute_as_root
def setup_sssd(self, dc_ip: str, ldap_master: str, ldap_server_name: str, admin_username: str, admin_pw: str, ldap_base: str, kerberos_realm: str, admin_dn: str, is_samba_dc: bool) -> None:
self.ldap_password = open('/etc/machine.secret').read().strip()
RootCertificateProvider().provide_ucs_root_certififcate(dc_ip)
self.write_sssd_conf(dc_ip, ldap_master, ldap_server_name, admin_username, admin_pw, ldap_base, kerberos_realm, admin_dn, is_samba_dc)
self.configure_sssd()
self.restart_sssd()
@execute_as_root
def write_sssd_conf(self, dc_ip: str, ldap_master: str, ldap_server_name: str, admin_username: str, admin_pw: str, ldap_base: str, kerberos_realm: str, admin_dn: str, is_samba_dc: bool) -> None:
userinfo_logger.info('Writing /etc/sssd/sssd.conf ')
if is_samba_dc:
kpasswd_server = ldap_server_name
else:
kpasswd_server = ldap_master
sssd_conf = \
'[sssd]\n' \
'config_file_version = 2\n' \
'reconnection_retries = 3\n' \
'sbus_timeout = 30\n' \
'services = nss, pam, sudo\n' \
'domains = %(kerberos_realm)s\n' \
'\n' \
'[nss]\n' \
'reconnection_retries = 3\n' \
'\n' \
'[pam]\n' \
'reconnection_retries = 3\n' \
'\n' \
'[domain/%(kerberos_realm)s]\n' \
'auth_provider = krb5\n' \
'krb5_realm = %(kerberos_realm)s\n' \
'krb5_server = %(ldap_server_name)s\n' \
'krb5_kpasswd = %(kpasswd_server)s\n' \
'id_provider = ldap\n' \
'ldap_uri = ldap://%(ldap_server_name)s:7389\n' \
'ldap_search_base = %(ldap_base)s\n' \
'ldap_tls_reqcert = never\n' \
'ldap_tls_cacert = /etc/univention/ssl/ucsCA/CAcert.pem\n' \
'cache_credentials = true\n' \
'enumerate = true\n' \
'ldap_default_bind_dn = %(machines_ldap_dn)s\n' \
'ldap_default_authtok_type = password\n' \
'ldap_default_authtok = %(ldap_password)s\n' \
% {
'kerberos_realm': kerberos_realm,
'kpasswd_server': kpasswd_server,
'ldap_base': ldap_base,
'ldap_server_name': ldap_server_name,
'ldap_password': self.ldap_password,
'machines_ldap_dn': get_machines_udm(dc_ip, admin_username, admin_pw, admin_dn)[1],
}
with open('/etc/sssd/sssd.conf', 'w') as conf_file:
conf_file.write(sssd_conf)
os.chmod('/etc/sssd/sssd.conf', stat.S_IREAD | stat.S_IWRITE)
@execute_as_root
def write_sssd_conf(self, dc_ip: str, ldap_master: str, ldap_server_name: str, admin_username: str, admin_pw: str, ldap_base: str, kerberos_realm: str, admin_dn: str, is_samba_dc: bool) -> None:
userinfo_logger.info('Writing /etc/sssd/sssd.conf ')
if is_samba_dc:
kpasswd_server = ldap_server_name
else:
kpasswd_server = ldap_master
sssd_conf = \
'[sssd]\n' \
'config_file_version = 2\n' \
'reconnection_retries = 3\n' \
'sbus_timeout = 30\n' \
'services = nss, pam, sudo\n' \
'domains = %(kerberos_realm)s\n' \
'\n' \
'[nss]\n' \
'reconnection_retries = 3\n' \
'\n' \
'[pam]\n' \
'reconnection_retries = 3\n' \
'\n' \
'[domain/%(kerberos_realm)s]\n' \
'auth_provider = krb5\n' \
'krb5_realm = %(kerberos_realm)s\n' \
'krb5_server = %(ldap_server_name)s\n' \
'krb5_kpasswd = %(kpasswd_server)s\n' \
'id_provider = ldap\n' \
'ldap_uri = ldap://%(ldap_server_name)s:7389\n' \
'ldap_search_base = %(ldap_base)s\n' \
'ldap_tls_reqcert = never\n' \
'ldap_tls_cacert = /etc/univention/ssl/ucsCA/CAcert.pem\n' \
'cache_credentials = true\n' \
'enumerate = true\n' \
'ldap_default_bind_dn = %(machines_ldap_dn)s\n' \
'ldap_default_authtok_type = password\n' \
'ldap_default_authtok = %(ldap_password)s\n' \
% {
'kerberos_realm': kerberos_realm,
'kpasswd_server': kpasswd_server,
'ldap_base': ldap_base,
'ldap_server_name': ldap_server_name,
'ldap_password': self.ldap_password,
'machines_ldap_dn': get_machines_udm(dc_ip, admin_username, admin_pw, admin_dn)[1],
}
with open('/etc/sssd/sssd.conf', 'w') as conf_file:
conf_file.write(sssd_conf)
os.chmod('/etc/sssd/sssd.conf', stat.S_IREAD | stat.S_IWRITE)
@execute_as_root
def configure_sssd(self) -> None:
userinfo_logger.info('Configuring auth config profile for sssd')
@execute_as_root
def configure_sssd(self) -> None:
userinfo_logger.info('Configuring auth config profile for sssd')
subprocess.check_output(
['pam-auth-update', '--enable', 'mkhomedir'],
stderr=subprocess.STDOUT
)
subprocess.check_output(
['pam-auth-update', '--enable', 'mkhomedir'],
stderr=subprocess.STDOUT
)
@execute_as_root
def restart_sssd(self) -> None:
userinfo_logger.info('Restarting sssd')
@execute_as_root
def restart_sssd(self) -> None:
userinfo_logger.info('Restarting sssd')
subprocess.check_output(
['service', 'sssd', 'restart'],
stderr=subprocess.STDOUT
)
subprocess.check_output(
['service', 'sssd', 'restart'],
stderr=subprocess.STDOUT
)

View File

@ -6,8 +6,8 @@ import subprocess
def get_distribution() -> str:
return subprocess.check_output(['lsb_release', '-is']).strip().decode()
return subprocess.check_output(['lsb_release', '-is']).strip().decode()
def get_release() -> str:
return subprocess.check_output(['lsb_release', '-rs']).strip().decode()
return subprocess.check_output(['lsb_release', '-rs']).strip().decode()

View File

@ -12,112 +12,112 @@ import netifaces
def get_master_ip_through_dns(domain: str) -> str:
resolver = dns.resolver.Resolver()
try:
response = resolver.query('_domaincontroller_master._tcp.%s.' % (domain,), 'SRV')
master_fqdn = response[0].target.canonicalize().split(1)[0].to_text()
except Exception:
return ""
return socket.gethostbyname(master_fqdn)
resolver = dns.resolver.Resolver()
try:
response = resolver.query('_domaincontroller_master._tcp.%s.' % (domain,), 'SRV')
master_fqdn = response[0].target.canonicalize().split(1)[0].to_text()
except Exception:
return ""
return socket.gethostbyname(master_fqdn)
def get_ucs_domainname() -> str:
domainname = get_ucs_domainname_via_local_configuration()
if not domainname:
domainname = get_ucs_domainname_via_reverse_dns()
if not domainname:
domainname = get_ucs_domainname_of_dns_server()
return domainname
domainname = get_ucs_domainname_via_local_configuration()
if not domainname:
domainname = get_ucs_domainname_via_reverse_dns()
if not domainname:
domainname = get_ucs_domainname_of_dns_server()
return domainname
def get_ucs_domainname_via_local_configuration() -> str:
try:
domainname = socket.getfqdn().split('.', 1)[1]
except Exception:
return ""
return domainname
try:
domainname = socket.getfqdn().split('.', 1)[1]
except Exception:
return ""
return domainname
def get_ucs_domainname_via_reverse_dns() -> str:
return ips2name(get_all_ip_addresses())
return ips2name(get_all_ip_addresses())
def get_ucs_domainname_of_dns_server() -> str:
return ips2name(get_nameservers())
return ips2name(get_nameservers())
def get_nameservers() -> Set[str]:
output = subprocess.check_output(['resolvectl'])
output = subprocess.check_output(['resolvectl'])
nameservers = set()
last_line_was_dns_servers_line = False
for line in output.decode().splitlines():
if last_line_was_dns_servers_line and is_only_ip(line):
nameservers.add(line.strip())
nameservers = set()
last_line_was_dns_servers_line = False
for line in output.decode().splitlines():
if last_line_was_dns_servers_line and is_only_ip(line):
nameservers.add(line.strip())
if 'DNS Servers:' in line:
last_line_was_dns_servers_line = True
nameservers.add(line.split('DNS Servers:')[1].strip())
else:
last_line_was_dns_servers_line = False
return nameservers
if 'DNS Servers:' in line:
last_line_was_dns_servers_line = True
nameservers.add(line.split('DNS Servers:')[1].strip())
else:
last_line_was_dns_servers_line = False
return nameservers
def is_only_ip(line: str) -> bool:
try:
IPy.IP(line.strip())
return True
except ValueError:
return False
try:
IPy.IP(line.strip())
return True
except ValueError:
return False
def get_all_ip_addresses() -> List[str]:
ip_addresses = []
for interface in netifaces.interfaces():
# Skip the loopback device.
if interface == 'lo':
continue
ip_addresses += get_ipv4_addresses(interface)
ip_addresses += get_ipv6_addresses(interface)
return ip_addresses
ip_addresses = []
for interface in netifaces.interfaces():
# Skip the loopback device.
if interface == 'lo':
continue
ip_addresses += get_ipv4_addresses(interface)
ip_addresses += get_ipv6_addresses(interface)
return ip_addresses
def get_ipv4_addresses(interface: str) -> List[str]:
short_addresses = []
if netifaces.AF_INET in netifaces.ifaddresses(interface):
ipv4_addresses = netifaces.ifaddresses(interface)[netifaces.AF_INET]
for ipv4_address in ipv4_addresses:
short_addresses.append(ipv4_address['addr'])
return short_addresses
short_addresses = []
if netifaces.AF_INET in netifaces.ifaddresses(interface):
ipv4_addresses = netifaces.ifaddresses(interface)[netifaces.AF_INET]
for ipv4_address in ipv4_addresses:
short_addresses.append(ipv4_address['addr'])
return short_addresses
def get_ipv6_addresses(interface: str) -> List[str]:
short_addresses = []
if netifaces.AF_INET6 in netifaces.ifaddresses(interface):
ipv6_addresses = netifaces.ifaddresses(interface)[netifaces.AF_INET6]
for ipv6_address in ipv6_addresses:
# Skip link-local addresses (see https://superuser.com/a/99753 ).
if '%' not in ipv6_address['addr']:
short_addresses.append(ipv6_address['addr'])
return short_addresses
short_addresses = []
if netifaces.AF_INET6 in netifaces.ifaddresses(interface):
ipv6_addresses = netifaces.ifaddresses(interface)[netifaces.AF_INET6]
for ipv6_address in ipv6_addresses:
# Skip link-local addresses (see https://superuser.com/a/99753 ).
if '%' not in ipv6_address['addr']:
short_addresses.append(ipv6_address['addr'])
return short_addresses
def ips2name(addrs: Iterable[str]) -> str:
for addr in addrs:
fqdn = socket.getfqdn(addr)
domainname = get_ucs_domainname_from_fqdn(fqdn)
if domainname:
return domainname
return ""
for addr in addrs:
fqdn = socket.getfqdn(addr)
domainname = get_ucs_domainname_from_fqdn(fqdn)
if domainname:
return domainname
return ""
def get_ucs_domainname_from_fqdn(fqdn: str) -> str:
try:
domainname = fqdn.split('.', 1)[1]
# Check if the _domaincontroller_master._tcp record exists, to ensure
# that this is an UCS domain.
resolver = dns.resolver.Resolver()
resolver.query('_domaincontroller_master._tcp.%s' % (domainname,), 'SRV')
except Exception:
return ""
return domainname
try:
domainname = fqdn.split('.', 1)[1]
# Check if the _domaincontroller_master._tcp record exists, to ensure
# that this is an UCS domain.
resolver = dns.resolver.Resolver()
resolver.query('_domaincontroller_master._tcp.%s' % (domainname,), 'SRV')
except Exception:
return ""
return domainname

View File

@ -13,38 +13,38 @@ F = TypeVar('F', bound=Callable[..., Any])
def execute_as_root(func: F) -> F:
@wraps(func)
def root_wrapper(*args: Any, **kwargs: Any) -> Any:
(ruid, euid, suid) = os.getresuid()
os.setresuid(0, 0, suid)
try:
return_value = func(*args, **kwargs)
finally:
os.setresuid(ruid, euid, suid)
return return_value
return cast(F, root_wrapper)
@wraps(func)
def root_wrapper(*args: Any, **kwargs: Any) -> Any:
(ruid, euid, suid) = os.getresuid()
os.setresuid(0, 0, suid)
try:
return_value = func(*args, **kwargs)
finally:
os.setresuid(ruid, euid, suid)
return return_value
return cast(F, root_wrapper)
def name_is_resolvable(name: str) -> bool:
try:
return bool(socket.getaddrinfo(name, 22, socket.AF_UNSPEC, socket.SOCK_STREAM, socket.IPPROTO_TCP))
except Exception:
return False
try:
return bool(socket.getaddrinfo(name, 22, socket.AF_UNSPEC, socket.SOCK_STREAM, socket.IPPROTO_TCP))
except Exception:
return False
def ssh(username: str, password: str, hostname: str, command: Union[str, List[str]], **kwargs: Any) -> subprocess.Popen:
cmd = [
"sshpass",
"-e", # Password is passed as env-var "SSHPASS"
"ssh",
"-F", "none",
# "-o", "BatchMode=yes", # conflicts with `sshpass`
"-o", "StrictHostKeyChecking=no",
"-o", "UserKnownHostsFile=/dev/null",
"-l", username,
hostname,
command if isinstance(command, str) else " ".join(quote(arg) for arg in command),
]
env = dict(kwargs.pop("env", os.environ), SSHPASS=password)
proc = subprocess.Popen(cmd, env=env, **kwargs)
return proc
cmd = [
"sshpass",
"-e", # Password is passed as env-var "SSHPASS"
"ssh",
"-F", "none",
# "-o", "BatchMode=yes", # conflicts with `sshpass`
"-o", "StrictHostKeyChecking=no",
"-o", "UserKnownHostsFile=/dev/null",
"-l", username,
hostname,
command if isinstance(command, str) else " ".join(quote(arg) for arg in command),
]
env = dict(kwargs.pop("env", os.environ), SSHPASS=password)
proc = subprocess.Popen(cmd, env=env, **kwargs)
return proc

View File

@ -16,50 +16,50 @@ log = getLogger('debugging')
def authenticate_admin(dc_ip: str, admin_username: str, admin_pw: str) -> None:
cmd = ['sh', '-e', '-u', '-c', 'cat >"$0"; chmod 600 "$0"; kinit --password-file="$0"', PW(admin_username)]
ssh_process = ssh(admin_username, admin_pw, dc_ip, cmd, stdin=subprocess.PIPE, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
_, stderr = ssh_process.communicate(admin_pw.encode())
if ssh_process.returncode or stderr:
log.debug("%r returned %d: %s", cmd, ssh_process.returncode, stderr.decode())
cmd = ['sh', '-e', '-u', '-c', 'cat >"$0"; chmod 600 "$0"; kinit --password-file="$0"', PW(admin_username)]
ssh_process = ssh(admin_username, admin_pw, dc_ip, cmd, stdin=subprocess.PIPE, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
_, stderr = ssh_process.communicate(admin_pw.encode())
if ssh_process.returncode or stderr:
log.debug("%r returned %d: %s", cmd, ssh_process.returncode, stderr.decode())
def cleanup_authentication(dc_ip: str, admin_username: str, admin_pw: str) -> None:
cmd = ['rm', '-f', PW(admin_username)]
ssh_process = ssh(admin_username, admin_pw, dc_ip, cmd, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
_, stderr = ssh_process.communicate()
if ssh_process.returncode or stderr:
log.debug("%r returned %d: %s", cmd, ssh_process.returncode, stderr.decode())
cmd = ['rm', '-f', PW(admin_username)]
ssh_process = ssh(admin_username, admin_pw, dc_ip, cmd, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
_, stderr = ssh_process.communicate()
if ssh_process.returncode or stderr:
log.debug("%r returned %d: %s", cmd, ssh_process.returncode, stderr.decode())
def is_samba_dc(admin_username: str, admin_pw: str, dc_ip: str, admin_dn: str) -> bool:
cmd = ['ldapsearch', '-QLLL', filter_format('(&(aRecord=%s)(univentionService=Samba 4))', [dc_ip]), '1.1']
ssh_process = ssh(admin_username, admin_pw, dc_ip, cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = ssh_process.communicate()
if ssh_process.returncode or stderr:
log.debug("%r returned %d: %s", cmd, ssh_process.returncode, stderr.decode())
return bool(stdout.lstrip())
cmd = ['ldapsearch', '-QLLL', filter_format('(&(aRecord=%s)(univentionService=Samba 4))', [dc_ip]), '1.1']
ssh_process = ssh(admin_username, admin_pw, dc_ip, cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = ssh_process.communicate()
if ssh_process.returncode or stderr:
log.debug("%r returned %d: %s", cmd, ssh_process.returncode, stderr.decode())
return bool(stdout.lstrip())
def get_machines_udm(dc_ip: str, admin_username: str, admin_pw: str, admin_dn: str) -> Tuple[str, str]:
for udm_type in ['computers/ubuntu', 'computers/linux', 'computers/ucc']:
try:
dn = get_machines_ldap_dn_given_the_udm_type(udm_type, dc_ip, admin_username, admin_pw, admin_dn)
return (udm_type, dn)
except LookupError:
pass
raise LookupError(dc_ip)
for udm_type in ['computers/ubuntu', 'computers/linux', 'computers/ucc']:
try:
dn = get_machines_ldap_dn_given_the_udm_type(udm_type, dc_ip, admin_username, admin_pw, admin_dn)
return (udm_type, dn)
except LookupError:
pass
raise LookupError(dc_ip)
def get_machines_ldap_dn_given_the_udm_type(udm_type: str, dc_ip: str, admin_username: str, admin_pw: str, admin_dn: str) -> str:
hostname = gethostname()
cmd = ['/usr/sbin/udm', udm_type, 'list', '--binddn', admin_dn, '--bindpwdfile', PW(admin_username), '--filter', 'name=%s' % (hostname,)]
ssh_process = ssh(admin_username, admin_pw, dc_ip, cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
assert ssh_process.stdout
for line in ssh_process.stdout:
key, _, val = line.decode().partition(': ')
if key == "DN":
return val.strip()
_, stderr = ssh_process.communicate()
if ssh_process.returncode or stderr:
log.debug("%r returned %d: %s", cmd, ssh_process.returncode, stderr.decode())
raise LookupError(hostname)
hostname = gethostname()
cmd = ['/usr/sbin/udm', udm_type, 'list', '--binddn', admin_dn, '--bindpwdfile', PW(admin_username), '--filter', 'name=%s' % (hostname,)]
ssh_process = ssh(admin_username, admin_pw, dc_ip, cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
assert ssh_process.stdout
for line in ssh_process.stdout:
key, _, val = line.decode().partition(': ')
if key == "DN":
return val.strip()
_, stderr = ssh_process.communicate()
if ssh_process.returncode or stderr:
log.debug("%r returned %d: %s", cmd, ssh_process.returncode, stderr.decode())
raise LookupError(hostname)