refactor: Extract ssh code handling

Pass password via environment variable instead of STDIN.

Move quoting there.

Really disable using `$HOME/.ssh/known_hosts` file.

Disable local SSH configuration.
This commit is contained in:
Philipp Hahn 2022-10-11 13:02:18 +02:00
parent 13792516de
commit ab02efbec8
5 changed files with 93 additions and 87 deletions

View File

@ -42,7 +42,7 @@ from typing import Dict
from univention_domain_join.distributions import AbstractJoiner
from univention_domain_join.utils.distributions import get_distribution
from univention_domain_join.utils.domain import get_master_ip_through_dns, get_ucs_domainname
from univention_domain_join.utils.general import execute_as_root
from univention_domain_join.utils.general import execute_as_root, ssh
def check_if_run_as_root() -> None:
@ -101,29 +101,28 @@ def get_admin_password(admin_username: str) -> str:
def check_if_ssh_works_with_given_account(dc_ip: str, admin_username: str, admin_pw: str) -> None:
ssh_process = subprocess.Popen(
['sshpass', '-d0', 'ssh', '-o', 'StrictHostKeyChecking=no', '%s@%s' % (admin_username, dc_ip), 'echo foo'],
stdin=subprocess.PIPE, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
)
ssh_process.communicate(admin_pw.encode())
cmd = "true"
ssh_process = ssh(admin_username, admin_pw, dc_ip, cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
ssh_process.communicate()
if ssh_process.returncode != 0:
getLogger("userinfo").critical('It\'s not possible to connect to the UCS DC via ssh, with the given credentials.')
exit(1)
def get_ucr_variables_from_dc(dc_ip: str, admin_username: str, admin_pw: str) -> Dict[str, str]:
ssh_process = subprocess.Popen(
['sshpass', '-d0', 'ssh', '-o', 'StrictHostKeyChecking=no', '%s@%s' % (admin_username, dc_ip), '/usr/sbin/ucr shell | grep -v ^hostname='],
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE
cmd = "/usr/sbin/ucr shell"
ssh_process = ssh(admin_username, admin_pw, dc_ip, cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
assert ssh_process.stdout
ucr_variables = dict(
line.decode("utf-8", "replace").strip().split("=", 1)
for line in ssh_process.stdout
)
stdout, stderr = ssh_process.communicate(admin_pw.encode())
if ssh_process.returncode != 0:
if ssh_process.wait() != 0:
getLogger("userinfo").critical('Fetching the UCR variables from the UCS DC failed.')
exit(1)
ucr_variables = {}
for raw_ucr_variable in stdout.splitlines():
key, value = raw_ucr_variable.decode('utf-8', 'replace').strip().split('=', 1)
ucr_variables[key] = value
ucr_variables.pop("hostname", None)
return ucr_variables

View File

@ -44,7 +44,7 @@ from PyQt5.QtWidgets import QAction, QApplication, QBoxLayout, QCheckBox, QFrame
from univention_domain_join.distributions import AbstractJoiner
from univention_domain_join.utils.distributions import get_distribution
from univention_domain_join.utils.domain import get_master_ip_through_dns, get_ucs_domainname
from univention_domain_join.utils.general import execute_as_root
from univention_domain_join.utils.general import execute_as_root, ssh
LOG = '/var/log/univention/domain-join-gui.log'
@ -476,11 +476,9 @@ class JoinThread(QThread):
raise DistributionException()
def check_if_ssh_works_with_given_account(self) -> bool:
ssh_process = subprocess.Popen(
['sshpass', '-d0', 'ssh', '-o', 'StrictHostKeyChecking=no', '%s@%s' % (self.admin_username, self.dc_ip), 'echo foo'],
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
stdout, stderr = ssh_process.communicate(self.admin_pw.encode())
cmd = "true"
ssh_process = ssh(self.admin_username, self.admin_pw, self.dc_ip, cmd, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
_, stderr = ssh_process.communicate()
if ssh_process.returncode != 0:
if stderr.decode().strip().endswith(': No route to host'):
raise SshException('IP not reachable via SSH.')
@ -490,18 +488,19 @@ class JoinThread(QThread):
return True
def get_ucr_variables_from_dc(self) -> Optional[Dict[str, str]]:
ssh_process = subprocess.Popen(
['sshpass', '-d0', 'ssh', '-o', 'StrictHostKeyChecking=no', '%s@%s' % (self.admin_username, self.dc_ip), '/usr/sbin/ucr shell | grep -v ^hostname='],
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE
cmd = "/usr/sbin/ucr shell"
ssh_process = ssh(self.admin_username, self.admin_pw, self.dc_ip, cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
assert ssh_process.stdout
ucr_variables = dict(
line.decode("utf-8", "replace").strip().split("=", 1)
for line in ssh_process.stdout
)
stdout, stderr = ssh_process.communicate(self.admin_pw.encode())
if ssh_process.returncode != 0:
if ssh_process.wait() != 0:
getLogger("userinfo").critical('Fetching the UCR variables from the UCS DC failed.')
return None
ucr_variables = {}
for raw_ucr_variable in stdout.decode('utf-8', 'replace').splitlines():
key, value = raw_ucr_variable.strip().split('=', 1)
ucr_variables[key] = value
ucr_variables.pop("hostname", None)
return ucr_variables

View File

@ -31,14 +31,13 @@
import logging
import os
import pipes
import stat
import subprocess
from shutil import copyfile
from univention_domain_join.join_steps.root_certificate_provider import RootCertificateProvider
from univention_domain_join.utils.general import execute_as_root
from univention_domain_join.utils.ldap import get_machines_ldap_dn, get_machines_udm_type
from univention_domain_join.utils.general import execute_as_root, ssh
from univention_domain_join.utils.ldap import PW, get_machines_ldap_dn, get_machines_udm_type
userinfo_logger = logging.getLogger('userinfo')
@ -90,19 +89,15 @@ class LdapConfigurator(ConflictChecker):
'/usr/sbin/udm',
get_machines_udm_type(dc_ip, admin_username, admin_pw, admin_dn),
'modify',
'--binddn', '%s' % (admin_dn,),
'--bindpwdfile', '/dev/shm/%sdomain-join' % (admin_username,),
'--binddn', admin_dn,
'--bindpwdfile', PW(admin_username),
'--dn', get_machines_ldap_dn(dc_ip, admin_username, admin_pw, admin_dn),
'--set', 'password=%s' % (password,),
'--set', 'operatingSystem=%s' % (release_id,),
'--set', 'operatingSystemVersion=%s' % (release,)
]
escaped_udm_command = ' '.join([pipes.quote(x) for x in udm_command])
ssh_process = subprocess.Popen(
['sshpass', '-d0', 'ssh', '-o', 'StrictHostKeyChecking=no', '%s@%s' % (admin_username, dc_ip), escaped_udm_command],
stdin=subprocess.PIPE, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
)
ssh_process.communicate(admin_pw.encode())
ssh_process = ssh(admin_username, admin_pw, dc_ip, udm_command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
ssh_process.communicate()
if ssh_process.returncode != 0:
userinfo_logger.critical('Updating the old LDAP entry for this computer failed.')
raise LdapConfigutationException()
@ -115,20 +110,16 @@ class LdapConfigurator(ConflictChecker):
# TODO: Also add MAC address. Which NIC's address should be used?
udm_command = [
'/usr/sbin/udm', 'computers/ubuntu', 'create',
'--binddn', '%s' % (admin_dn,),
'--bindpwdfile', '/dev/shm/%sdomain-join' % (admin_username,),
'--binddn', admin_dn,
'--bindpwdfile', PW(admin_username),
'--position', 'cn=computers,%s' % (ldap_base,),
'--set', 'name=%s' % (hostname,),
'--set', 'password=%s' % (password,),
'--set', 'operatingSystem=%s' % (release_id,),
'--set', 'operatingSystemVersion=%s' % (release,)
]
escaped_udm_command = ' '.join([pipes.quote(x) for x in udm_command])
ssh_process = subprocess.Popen(
['sshpass', '-d0', 'ssh', '-o', 'StrictHostKeyChecking=no', '%s@%s' % (admin_username, dc_ip), escaped_udm_command],
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
)
stdout, _ = ssh_process.communicate(admin_pw.encode())
ssh_process = ssh(admin_username, admin_pw, dc_ip, udm_command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
stdout, _ = ssh_process.communicate()
if ssh_process.returncode != 0 or stdout.decode().startswith('E: '):
userinfo_logger.critical('Adding an LDAP object for this computer didn\'t work.')
userinfo_logger.critical(stdout.decode())
@ -136,12 +127,9 @@ class LdapConfigurator(ConflictChecker):
def get_admin_dn(self, dc_ip: str, admin_username: str, admin_pw: str, ldap_base: str) -> str:
userinfo_logger.info('Getting the DN of the Administrator ')
ldap_command = 'ldapsearch -QLLL -Y GSSAPI uid=%s dn' % (pipes.quote(admin_username),)
ssh_process = subprocess.Popen(
['sshpass', '-d0', 'ssh', '-o', 'StrictHostKeyChecking=no', '%s@%s' % (admin_username, dc_ip), ldap_command],
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
)
stdout, _ = ssh_process.communicate(admin_pw.encode())
ldap_command = ['ldapsearch', '-QLLLY', 'GSSAPI', 'uid=%s' % (admin_username,), 'dn']
ssh_process = ssh(admin_username, admin_pw, dc_ip, ldap_command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
stdout, _ = ssh_process.communicate()
if ssh_process.returncode != 0:
userinfo_logger.critical('get admin DN failed with: {}'.format(stdout.decode()))
raise LdapConfigutationException('get admin DN failed with: {}'.format(stdout.decode()))

View File

@ -31,8 +31,10 @@
import os
import socket
import subprocess
from functools import wraps
from typing import Any, Callable, TypeVar, cast
from pipes import quote
from typing import Any, Callable, List, TypeVar, Union, cast
F = TypeVar('F', bound=Callable[..., Any])
@ -55,3 +57,21 @@ def name_is_resolvable(name: str) -> bool:
return bool(socket.getaddrinfo(name, 22, socket.AF_UNSPEC, socket.SOCK_STREAM, socket.IPPROTO_TCP))
except Exception:
return False
def ssh(username: str, password: str, hostname: str, command: Union[str, List[str]], **kwargs: Any) -> subprocess.Popen:
cmd = [
"sshpass",
"-e", # Password is passed as env-var "SSHPASS"
"ssh",
"-F", "none",
# "-o", "BatchMode=yes", # conflicts with `sshpass`
"-o", "StrictHostKeyChecking=no",
"-o", "UserKnownHostsFile=/dev/null",
"-l", username,
hostname,
command if isinstance(command, str) else " ".join(quote(arg) for arg in command),
]
env = dict(kwargs.pop("env", os.environ), SSHPASS=password)
proc = subprocess.Popen(cmd, env=env, **kwargs)
return proc

View File

@ -29,39 +29,40 @@
# /usr/share/common-licenses/AGPL-3; if not, see
# <http://www.gnu.org/licenses/>.
import pipes
import subprocess
from logging import getLogger
from socket import gethostname
from ldap.filter import filter_format
from univention_domain_join.utils.general import ssh
PW = "/dev/shm/{}domain-join".format
log = getLogger('debugging')
def authenticate_admin(dc_ip: str, admin_username: str, admin_pw: str) -> None:
ldap_command = ' echo {1} > /dev/shm/{0}domain-join; chmod 600 /dev/shm/{0}domain-join; kinit --password-file=/dev/shm/{0}domain-join {0}'.format(pipes.quote(admin_username), pipes.quote(admin_pw))
ssh_process = subprocess.Popen(
['sshpass', '-d0', 'ssh', '-o', 'StrictHostKeyChecking=no', '%s@%s' % (admin_username, dc_ip), ldap_command],
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
stdout, stderr = ssh_process.communicate(admin_pw.encode())
cmd = ['sh', '-e', '-u', '-c', 'cat >"$0"; chmod 600 "$0"; kinit --password-file="$0"', PW(admin_username)]
ssh_process = ssh(admin_username, admin_pw, dc_ip, cmd, stdin=subprocess.PIPE, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
_, stderr = ssh_process.communicate(admin_pw.encode())
if ssh_process.returncode or stderr:
log.debug("%r returned %d: %s", cmd, ssh_process.returncode, stderr.decode())
def cleanup_authentication(dc_ip: str, admin_username: str, admin_pw: str) -> None:
ldap_command = 'rm -f /dev/shm/{0}domain-join; kdestroy'.format(pipes.quote(admin_username))
ssh_process = subprocess.Popen(
['sshpass', '-d0', 'ssh', '-o', 'StrictHostKeyChecking=no', '%s@%s' % (admin_username, dc_ip), ldap_command],
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
stdout, stderr = ssh_process.communicate(admin_pw.encode())
cmd = ['rm', '-f', PW(admin_username)]
ssh_process = ssh(admin_username, admin_pw, dc_ip, cmd, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
_, stderr = ssh_process.communicate()
if ssh_process.returncode or stderr:
log.debug("%r returned %d: %s", cmd, ssh_process.returncode, stderr.decode())
def is_samba_dc(admin_username: str, admin_pw: str, dc_ip: str, admin_dn: str) -> bool:
ldap_command = ['ldapsearch', '-QLLL', filter_format('aRecord=%s', [dc_ip]), 'univentionService']
escaped_ldap_command = ' '.join([pipes.quote(x) for x in ldap_command])
ssh_process = subprocess.Popen(
['sshpass', '-d0', 'ssh', '-o', 'StrictHostKeyChecking=no', '%s@%s' % (admin_username, dc_ip), escaped_ldap_command],
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
stdout, stderr = ssh_process.communicate(admin_pw.encode())
cmd = ['ldapsearch', '-QLLL', filter_format('aRecord=%s', [dc_ip]), 'univentionService']
ssh_process = ssh(admin_username, admin_pw, dc_ip, cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = ssh_process.communicate()
if ssh_process.returncode or stderr:
log.debug("%r returned %d: %s", cmd, ssh_process.returncode, stderr.decode())
for line in stdout.decode().splitlines():
if line.endswith('Samba 4'):
return True
@ -89,15 +90,14 @@ def get_machines_udm_type(dc_ip: str, admin_username: str, admin_pw: str, admin_
def get_machines_ldap_dn_given_the_udm_type(udm_type: str, dc_ip: str, admin_username: str, admin_pw: str, admin_dn: str) -> str:
hostname = gethostname()
udm_command = ['/usr/sbin/udm', udm_type, 'list', '--binddn', admin_dn, '--bindpwdfile', '/dev/shm/%sdomain-join' % (admin_username,), '--filter', 'name=%s' % (hostname,)]
escaped_udm_command = ' '.join([pipes.quote(x) for x in udm_command])
ssh_process = subprocess.Popen(
['sshpass', '-d0', 'ssh', '-o', 'StrictHostKeyChecking=no', '%s@%s' % (admin_username, dc_ip), escaped_udm_command],
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
stdout, stderr = ssh_process.communicate(admin_pw.encode())
for line in stdout.splitlines():
if b"dn:" == line[0:3].lower():
machines_ldap_dn = line[3:].strip()
return machines_ldap_dn.decode()
cmd = ['/usr/sbin/udm', udm_type, 'list', '--binddn', admin_dn, '--bindpwdfile', PW(admin_username), '--filter', 'name=%s' % (hostname,)]
ssh_process = ssh(admin_username, admin_pw, dc_ip, cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
assert ssh_process.stdout
for line in ssh_process.stdout:
key, _, val = line.decode().partition(': ')
if key == "DN":
return val.strip()
_, stderr = ssh_process.communicate()
if ssh_process.returncode or stderr:
log.debug("%r returned %d: %s", cmd, ssh_process.returncode, stderr.decode())
raise LookupError(hostname)