from collections import defaultdict
from configparser import ConfigParser
from functools import wraps
-from tempfile import TemporaryDirectory
+from tempfile import TemporaryDirectory, NamedTemporaryFile
from threading import Event
import string
import datetime
import os
import random
-import tempfile
import multiprocessing.pool
import subprocess
from prettytable import PrettyTable
from . import remotes
from . import utils
+from . import ssh
from .migrations import Migrations
from .services.cephadmservice import MonService, MgrService, MdsService, RgwService, \
RbdMirrorService, CrashService, CephadmService, CephfsMirrorService
self.run = True
self.event = Event()
+ self.ssh = ssh.SSHManager(self)
+
if self.get_store('pause'):
self.paused = True
else:
self.default_registry = ''
self.autotune_memory_target_ratio = 0.0
self.autotune_interval = 0
-
- self._cons: Dict[str, Tuple[remoto.backends.BaseConnection,
- remoto.backends.LegacyModuleExecute]] = {}
+ self.ssh_user: Optional[str] = None
+ self._ssh_options: Optional[str] = None
+ self.tkey = NamedTemporaryFile()
+ self.ssh_config_fname: Optional[str] = None
+ self.ssh_config: Optional[str] = None
+ self._temp_files: List = []
+ self.ssh_key: Optional[str] = None
+ self.ssh_pub: Optional[str] = None
self.notify('mon_map', None)
self.config_notify()
self._worker_pool = multiprocessing.pool.ThreadPool(10)
- self._reconfig_ssh()
+ self.ssh._reconfig_ssh()
CephadmOrchestrator.instance = self
continue
return name
- def _reconfig_ssh(self) -> None:
- temp_files = [] # type: list
- ssh_options = [] # type: List[str]
-
- # ssh_config
- ssh_config_fname = self.ssh_config_file
- ssh_config = self.get_store("ssh_config")
- if ssh_config is not None or ssh_config_fname is None:
- if not ssh_config:
- ssh_config = DEFAULT_SSH_CONFIG
- f = tempfile.NamedTemporaryFile(prefix='cephadm-conf-')
- os.fchmod(f.fileno(), 0o600)
- f.write(ssh_config.encode('utf-8'))
- f.flush() # make visible to other processes
- temp_files += [f]
- ssh_config_fname = f.name
- if ssh_config_fname:
- self.validate_ssh_config_fname(ssh_config_fname)
- ssh_options += ['-F', ssh_config_fname]
- self.ssh_config = ssh_config
-
- # identity
- ssh_key = self.get_store("ssh_identity_key")
- ssh_pub = self.get_store("ssh_identity_pub")
- self.ssh_pub = ssh_pub
- self.ssh_key = ssh_key
- if ssh_key and ssh_pub:
- tkey = tempfile.NamedTemporaryFile(prefix='cephadm-identity-')
- tkey.write(ssh_key.encode('utf-8'))
- os.fchmod(tkey.fileno(), 0o600)
- tkey.flush() # make visible to other processes
- tpub = open(tkey.name + '.pub', 'w')
- os.fchmod(tpub.fileno(), 0o600)
- tpub.write(ssh_pub)
- tpub.flush() # make visible to other processes
- temp_files += [tkey, tpub]
- ssh_options += ['-i', tkey.name]
-
- self._temp_files = temp_files
- if ssh_options:
- self._ssh_options = ' '.join(ssh_options) # type: Optional[str]
- else:
- self._ssh_options = None
-
- if self.mode == 'root':
- self.ssh_user = self.get_store('ssh_user', default='root')
- elif self.mode == 'cephadm-package':
- self.ssh_user = 'cephadm'
-
- self._reset_cons()
-
def validate_ssh_config_content(self, ssh_config: Optional[str]) -> None:
if ssh_config is None or len(ssh_config.strip()) == 0:
raise OrchestratorValidationError('ssh_config cannot be empty')
raise OrchestratorValidationError("ssh_config \"{}\" does not exist".format(
ssh_config_fname))
- def _reset_con(self, host: str) -> None:
- conn, r = self._cons.get(host, (None, None))
- if conn:
- self.log.debug('_reset_con close %s' % host)
- conn.exit()
- del self._cons[host]
-
- def _reset_cons(self) -> None:
- for host, conn_and_r in self._cons.items():
- self.log.debug('_reset_cons close %s' % host)
- conn, r = conn_and_r
- conn.exit()
- self._cons = {}
-
def offline_hosts_remove(self, host: str) -> None:
if host in self.offline_hosts:
self.offline_hosts.remove(host)
def _validate_and_set_ssh_val(self, what: str, new: Optional[str], old: Optional[str]) -> None:
self.set_store(what, new)
- self._reconfig_ssh()
+ self.ssh._reconfig_ssh()
if self.cache.get_hosts():
# Can't check anything without hosts
host = self.cache.get_hosts()[0]
if r is not None:
# connection failed reset user
self.set_store(what, old)
- self._reconfig_ssh()
+ self.ssh._reconfig_ssh()
raise OrchestratorError('ssh connection %s@%s failed' % (self.ssh_user, host))
self.log.info(f'Set ssh {what}')
self.set_store("ssh_config", None)
self.ssh_config_tmp = None
self.log.info('Cleared ssh_config')
- self._reconfig_ssh()
+ self.ssh._reconfig_ssh()
return 0, "", ""
@orchestrator._cli_read_command('cephadm get-ssh-config')
tmp_dir.cleanup()
self.set_store('ssh_identity_key', secret)
self.set_store('ssh_identity_pub', pub)
- self._reconfig_ssh()
+ self.ssh._reconfig_ssh()
return 0, '', ''
@orchestrator._cli_write_command(
"""Clear cluster SSH key"""
self.set_store('ssh_identity_key', None)
self.set_store('ssh_identity_pub', None)
- self._reconfig_ssh()
+ self.ssh._reconfig_ssh()
self.log.info('Cleared cluster SSH key')
return 0, '', ''
self.inventory.rm_host(host)
self.cache.rm_host(host)
- self._reset_con(host)
+ self.ssh._reset_con(host)
self.event.set() # refresh stray health check
self.log.info('Removed host %s' % host)
return "Removed host '{}'".format(host)
def update_host_addr(self, host: str, addr: str) -> str:
self._check_valid_addr(host, addr)
self.inventory.set_addr(host, addr)
- self._reset_con(host)
+ self.ssh._reset_con(host)
self.event.set() # refresh stray health check
self.log.info('Set host %s addr to %s' % (host, addr))
return "Updated host '{}' addr to '{}'".format(host, addr)
import logging
import os
+from tempfile import NamedTemporaryFile
from contextlib import contextmanager
from io import StringIO
from shlex import quote
asyncssh_logger = logging.getLogger('asyncssh')
asyncssh_logger.propagate = False
+DEFAULT_SSH_CONFIG = """
+Host *
+ User root
+ StrictHostKeyChecking no
+ UserKnownHostsFile /dev/null
+ ConnectTimeout=30
+"""
+
class SSHManager:
def __init__(self, mgr: "CephadmOrchestrator"):
logger.exception(msg)
raise OrchestratorError(msg)
+ def _reset_con(self, host: str) -> None:
+ conn = self.cons.get(host)
+ if conn:
+ logger.debug(f'_reset_con close {host}')
+ conn.close()
+ del self.cons[host]
+
+ def _reset_cons(self) -> None:
+ for host, conn in self.cons.items():
+ logger.debug(f'_reset_cons close {host}')
+ conn.close()
+ self.cons = {}
+
+ def _reconfig_ssh(self) -> None:
+ temp_files = [] # type: list
+ ssh_options = [] # type: List[str]
+
+ # ssh_config
+ self.mgr.ssh_config_fname = self.mgr.ssh_config_file
+ ssh_config = self.mgr.get_store("ssh_config")
+ if ssh_config is not None or self.mgr.ssh_config_fname is None:
+ if not ssh_config:
+ ssh_config = DEFAULT_SSH_CONFIG
+ f = NamedTemporaryFile(prefix='cephadm-conf-')
+ os.fchmod(f.fileno(), 0o600)
+ f.write(ssh_config.encode('utf-8'))
+ f.flush() # make visible to other processes
+ temp_files += [f]
+ self.mgr.ssh_config_fname = f.name
+ if self.mgr.ssh_config_fname:
+ self.mgr.validate_ssh_config_fname(self.mgr.ssh_config_fname)
+ ssh_options += ['-F', self.mgr.ssh_config_fname]
+ self.mgr.ssh_config = ssh_config
+
+ # identity
+ ssh_key = self.mgr.get_store("ssh_identity_key")
+ ssh_pub = self.mgr.get_store("ssh_identity_pub")
+ self.mgr.ssh_pub = ssh_pub
+ self.mgr.ssh_key = ssh_key
+ if ssh_key and ssh_pub:
+ self.mgr.tkey = NamedTemporaryFile(prefix='cephadm-identity-')
+ self.mgr.tkey.write(ssh_key.encode('utf-8'))
+ os.fchmod(self.mgr.tkey.fileno(), 0o600)
+ self.mgr.tkey.flush() # make visible to other processes
+ tpub = open(self.mgr.tkey.name + '.pub', 'w')
+ os.fchmod(tpub.fileno(), 0o600)
+ tpub.write(ssh_pub)
+ tpub.flush() # make visible to other processes
+ temp_files += [self.mgr.tkey, tpub]
+ ssh_options += ['-i', self.mgr.tkey.name]
+
+ self.mgr._temp_files = temp_files
+ if ssh_options:
+ self.mgr._ssh_options = ' '.join(ssh_options)
+ else:
+ self.mgr._ssh_options = None
+
+ if self.mgr.mode == 'root':
+ self.mgr.ssh_user = self.mgr.get_store('ssh_user', default='root')
+ elif self.mgr.mode == 'cephadm-package':
+ self.mgr.ssh_user = 'cephadm'
+
+ self._reset_cons()