From cdeb604b10bbac24aa13fc6ada2bccdab1049e81 Mon Sep 17 00:00:00 2001 From: Noah Watkins Date: Tue, 19 Feb 2019 11:28:50 -0800 Subject: [PATCH] ssh/orch: add ssh orchestrator Signed-off-by: Noah Watkins --- doc/mgr/index.rst | 1 + doc/mgr/orchestrator_cli.rst | 8 +- doc/mgr/ssh.rst | 45 ++ src/pybind/mgr/ssh/.gitignore | 1 + src/pybind/mgr/ssh/README.md | 93 ++++ src/pybind/mgr/ssh/Vagrantfile | 39 ++ src/pybind/mgr/ssh/__init__.py | 1 + src/pybind/mgr/ssh/ceph.repo | 23 + src/pybind/mgr/ssh/module.py | 780 +++++++++++++++++++++++++++++++++ src/pybind/mgr/ssh/remotes.py | 81 ++++ 10 files changed, 1068 insertions(+), 4 deletions(-) create mode 100644 doc/mgr/ssh.rst create mode 100644 src/pybind/mgr/ssh/.gitignore create mode 100644 src/pybind/mgr/ssh/README.md create mode 100644 src/pybind/mgr/ssh/Vagrantfile create mode 100644 src/pybind/mgr/ssh/__init__.py create mode 100644 src/pybind/mgr/ssh/ceph.repo create mode 100644 src/pybind/mgr/ssh/module.py create mode 100644 src/pybind/mgr/ssh/remotes.py diff --git a/doc/mgr/index.rst b/doc/mgr/index.rst index 337127c05fc5d..3e913eff15253 100644 --- a/doc/mgr/index.rst +++ b/doc/mgr/index.rst @@ -45,3 +45,4 @@ sensible. DeepSea plugin Insights plugin Ansible plugin + SSH orchestrator diff --git a/doc/mgr/orchestrator_cli.rst b/doc/mgr/orchestrator_cli.rst index 8cc19c59923e2..f1c0090cb3c3e 100644 --- a/doc/mgr/orchestrator_cli.rst +++ b/doc/mgr/orchestrator_cli.rst @@ -247,13 +247,13 @@ This is an overview of the current implementation status of the orchestrators. host add ⚪ ⚪ ⚪ ✔️ host ls ⚪ ⚪ ⚪ ✔️ host rm ⚪ ⚪ ⚪ ✔️ - mgr update ⚪ ⚪ ⚪ ⚪ - mon update ⚪ ⚪ ⚪ ⚪ - osd create ✔️ ✔️ ⚪ ⚪ + mgr update ⚪ ⚪ ⚪ ✔️ + mon update ⚪ ⚪ ⚪ ✔️ + osd create ✔️ ✔️ ⚪ ✔️ osd device {ident,fault}-{on,off} ⚪ ⚪ ⚪ ⚪ osd rm ✔️ ⚪ ⚪ ⚪ device {ident,fault}-(on,off} ⚪ ⚪ ⚪ ⚪ - device ls ✔️ ✔️ ✔️ ⚪ + device ls ✔️ ✔️ ✔️ ✔️ service ls ⚪ ✔️ ✔️ ⚪ service status ⚪ ✔️ ✔️ ⚪ service-instance status ⚪ ⚪ ⚪ ⚪ diff --git a/doc/mgr/ssh.rst b/doc/mgr/ssh.rst new file mode 100644 index 0000000000000..1d1e96631bdf2 --- /dev/null +++ b/doc/mgr/ssh.rst @@ -0,0 +1,45 @@ +================ +SSH orchestrator +================ + +The SSH orchestrator is an orchestrator module that does not rely on a separate +system such as Rook or Ansible, but rather manages nodes in a cluster by +establishing an SSH connection and issuing explicit management commands. + +Orchestrator modules only provide services to other modules, which in turn +provide user interfaces. To try out the SSH module, you might like +to use the :ref:`Orchestrator CLI ` module. + +Requirements +------------ + +- The Python `remoto` library version 0.0.35 or newer + +Configuration +------------- + +The SSH orchestrator can be configured to use an SSH configuration file. This is +useful for specifying private keys and other SSH connection options. + +:: + + # ceph config set mgr mgr/ssh/ssh_config_file /path/to/config + +Alternatively, an SSH configuration file can be provided directly; unlike the method +above, this does not require a file system path that is accessible to the manager. + +:: + + # ceph ssh set-ssh-config -i /path/to/config + +To clear this value, use the command: + +:: + + # ceph ssh clear-ssh-config + +Development +----------- + +Instructions for setting up a development environment can be found in the Ceph +source tree at `src/pybind/mgr/ssh/README.md`.
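The `ssh_config` consumed by the commands above is a standard OpenSSH client
configuration file. As a rough illustration only (the host name, address, user, and
key path below are hypothetical; in the development setup described in
`src/pybind/mgr/ssh/README.md` the real entries come from `vagrant ssh-config`),
an entry might look like:

::

  Host mon0
    HostName 192.168.121.10
    User vagrant
    IdentityFile /path/to/private_key
    StrictHostKeyChecking no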
diff --git a/src/pybind/mgr/ssh/.gitignore b/src/pybind/mgr/ssh/.gitignore new file mode 100644 index 0000000000000..8000dd9db47c0 --- /dev/null +++ b/src/pybind/mgr/ssh/.gitignore @@ -0,0 +1 @@ +.vagrant diff --git a/src/pybind/mgr/ssh/README.md b/src/pybind/mgr/ssh/README.md new file mode 100644 index 0000000000000..10f268cd278b0 --- /dev/null +++ b/src/pybind/mgr/ssh/README.md @@ -0,0 +1,93 @@ +# dev environment setup + +1. start vms with _only_ the ceph packages installed + +In `src/pybind/mgr/ssh` run `vagrant up` to create a cluster with a monitor, +manager, and osd nodes. The osd node will have two small extra disks attached. + +2. generate an `ssh_config` file for the vm hosts + +Execute `vagrant ssh-config > /path/to/ssh_config` to generate an ssh +configuration file that contains hosts, usernames, and keys that will be used by +the bootstrap cluster / ssh orchestrator to establish ssh connections to the +vagrant vms. + +3. install ssh orchestrator dependencies + +The primary dependency is the `remoto` package, which contains a Python SSH client +for connecting to remote nodes and executing commands. + +Install with `dnf install python3-remoto`. The version must be >= 0.0.35. At the +time of writing, this version is still being packaged and is not yet available. To install +from source: + +``` +git clone https://github.com/ceph/remoto +cd remoto +python3 setup.py sdist +pip3 install --prefix=/usr dist/remoto-0.0.35.tar.gz +``` + +4. start the bootstrap cluster (in this case a `vstart.sh` cluster) + +Start with a network binding to which the vms can route traffic: + + `vstart.sh -n -i 192.168.121.1` + +The following is a manual method for finding this address. TODO: documenting an +automated/deterministic method would be very helpful. + +First, ensure that your firewall settings permit each VM to communicate with the +host. On Fedora, the `trusted` zone is sufficient and also allows traffic on the Ceph ports: `firewall-cmd +--set-default-zone trusted`. Then ssh into +one of the vm nodes and ping the default gateway, which happens to be set up as +the host machine. + +``` +[nwatkins@smash ssh]$ vagrant ssh mon0 -c "getent hosts gateway" +192.168.121.1 gateway +``` + +5. set up the ssh orchestrator backend + +Enable and configure the ssh orchestrator as the active backend: + +``` +ceph mgr module enable ssh +ceph orchestrator set backend ssh + +# optional: this document assumes the orchestrator CLI is enabled +ceph mgr module enable orchestrator_cli +``` + +Configure the ssh orchestrator by setting the `ssh_config` option to point at +the ssh configuration file generated above: + +``` +ceph config set mgr mgr/ssh/ssh_config_file /path/to/config +``` + +The setting can be confirmed by retrieving the manager configuration: + +``` +[nwatkins@smash build]$ ceph config get mgr. +WHO MASK LEVEL OPTION VALUE RO +mgr advanced mgr/orchestrator_cli/orchestrator ssh * +mgr advanced mgr/ssh/ssh_config_file /home/nwatkins/src/ceph/src/pybind/mgr/ssh/config * +``` + +An SSH config file can also be provided through standard input, which avoids the +need to have an accessible file path.
Use the following command: + + +``` +ceph ssh set-ssh-config -i +``` + +The next set of instructions we should move to the docs folder + +ceph orchestrator host add osd0 +ceph orchestrator host add mgr0 +ceph orchestrator host add mon0 +ceph orchestrator device ls +ceph orchestrator mgr update 3 mgr0 mgr1 diff --git a/src/pybind/mgr/ssh/Vagrantfile b/src/pybind/mgr/ssh/Vagrantfile new file mode 100644 index 0000000000000..0a2a63891ca13 --- /dev/null +++ b/src/pybind/mgr/ssh/Vagrantfile @@ -0,0 +1,39 @@ +# vi: set ft=ruby : + +NUM_DAEMONS = ENV["NUM_DAEMONS"] ? ENV["NUM_DAEMONS"].to_i : 1 + +Vagrant.configure("2") do |config| + config.vm.synced_folder ".", "/vagrant", disabled: true + config.vm.network "private_network", type: "dhcp" + config.vm.box = "centos/7" + + (0..NUM_DAEMONS - 1).each do |i| + config.vm.define "mon#{i}" do |mon| + mon.vm.hostname = "mon#{i}" + end + config.vm.define "mgr#{i}" do |mgr| + mgr.vm.hostname = "mgr#{i}" + end + config.vm.define "osd#{i}" do |osd| + osd.vm.hostname = "osd#{i}" + osd.vm.provider :libvirt do |libvirt| + libvirt.storage :file, :size => '5G' + libvirt.storage :file, :size => '5G' + end + end + end + + config.vm.provision "shell" do |s| + ssh_pub_key = File.readlines("#{Dir.home}/.ssh/id_rsa.pub").first.strip + s.inline = "echo #{ssh_pub_key} >> /home/vagrant/.ssh/authorized_keys" + end + + config.vm.provision "shell", inline: <<-SHELL + sudo yum install -y yum-utils + sudo yum install -y https://dl.fedoraproject.org/pub/epel/epel-release-latest-7.noarch.rpm + sudo rpm --import 'https://download.ceph.com/keys/release.asc' + curl -L https://shaman.ceph.com/api/repos/ceph/master/latest/centos/7/repo/ | sudo tee /etc/yum.repos.d/shaman.repo + sudo yum install -y ceph python36 + sudo ln -s /usr/bin/python36 /usr/bin/python3 + SHELL +end diff --git a/src/pybind/mgr/ssh/__init__.py b/src/pybind/mgr/ssh/__init__.py new file mode 100644 index 0000000000000..3f41e016e73f4 --- /dev/null +++ b/src/pybind/mgr/ssh/__init__.py @@ -0,0 +1 @@ +from .module import SSHOrchestrator diff --git a/src/pybind/mgr/ssh/ceph.repo b/src/pybind/mgr/ssh/ceph.repo new file mode 100644 index 0000000000000..6f710e7ce2b6f --- /dev/null +++ b/src/pybind/mgr/ssh/ceph.repo @@ -0,0 +1,23 @@ +[ceph] +name=Ceph packages for $basearch +baseurl=https://download.ceph.com/rpm-mimic/el7/$basearch +enabled=1 +priority=2 +gpgcheck=1 +gpgkey=https://download.ceph.com/keys/release.asc + +[ceph-noarch] +name=Ceph noarch packages +baseurl=https://download.ceph.com/rpm-mimic/el7/noarch +enabled=1 +priority=2 +gpgcheck=1 +gpgkey=https://download.ceph.com/keys/release.asc + +[ceph-source] +name=Ceph source packages +baseurl=https://download.ceph.com/rpm-mimic/el7/SRPMS +enabled=0 +priority=2 +gpgcheck=1 +gpgkey=https://download.ceph.com/keys/release.asc diff --git a/src/pybind/mgr/ssh/module.py b/src/pybind/mgr/ssh/module.py new file mode 100644 index 0000000000000..b89e14640542a --- /dev/null +++ b/src/pybind/mgr/ssh/module.py @@ -0,0 +1,780 @@ +import json +import errno +import six +import os +import datetime +import tempfile +import multiprocessing.pool + +from mgr_module import MgrModule +import orchestrator + +from . 
import remotes + +try: + import remoto + import remoto.process +except ImportError as e: + remoto = None + remoto_import_error = str(e) + +DATEFMT = '%Y-%m-%d %H:%M:%S.%f' + +# high-level TODO: +# - bring over some of the protections from ceph-deploy that guard against +# multiple bootstrapping / initialization + +class SSHReadCompletion(orchestrator.ReadCompletion): + def __init__(self, result): + if isinstance(result, multiprocessing.pool.AsyncResult): + self._result = [result] + else: + self._result = result + assert isinstance(self._result, list) + + @property + def result(self): + return list(map(lambda r: r.get(), self._result)) + + @property + def is_complete(self): + return all(map(lambda r: r.ready(), self._result)) + +class SSHReadCompletionReady(SSHReadCompletion): + def __init__(self, result): + self._result = result + + @property + def result(self): + return self._result + + @property + def is_complete(self): + return True + +class SSHWriteCompletion(orchestrator.WriteCompletion): + def __init__(self, result): + if isinstance(result, multiprocessing.pool.AsyncResult): + self._result = [result] + else: + self._result = result + assert isinstance(self._result, list) + + @property + def result(self): + return list(map(lambda r: r.get(), self._result)) + + @property + def is_persistent(self): + return all(map(lambda r: r.ready(), self._result)) + + @property + def is_effective(self): + return all(map(lambda r: r.ready(), self._result)) + + @property + def is_errored(self): + for r in self._result: + if not r.ready(): + return False + if not r.successful(): + return True + return False + +class SSHWriteCompletionReady(SSHWriteCompletion): + def __init__(self, result): + self._result = result + + @property + def result(self): + return self._result + + @property + def is_persistent(self): + return True + + @property + def is_effective(self): + return True + + @property + def is_errored(self): + return False + +class SSHConnection(object): + """ + Tie tempfile lifetime (e.g. ssh_config) to a remoto connection. + """ + def __init__(self): + self.conn = None + self.temp_file = None + + # proxy to the remoto connection + def __getattr__(self, name): + return getattr(self.conn, name) + +class SSHOrchestrator(MgrModule, orchestrator.Orchestrator): + + _STORE_HOST_PREFIX = "host" + _DEFAULT_INVENTORY_CACHE_TIMEOUT_MIN = 10 + + MODULE_OPTIONS = [ + {'name': 'ssh_config_file'}, + {'name': 'inventory_cache_timeout_min'}, + ] + + COMMANDS = [ + { + 'cmd': 'ssh set-ssh-config', + 'desc': 'Set the ssh_config file (use -i )', + 'perm': 'rw' + }, + { + 'cmd': 'ssh clear-ssh-config', + 'desc': 'Clear the ssh_config file', + 'perm': 'rw' + }, + ] + + def __init__(self, *args, **kwargs): + super(SSHOrchestrator, self).__init__(*args, **kwargs) + self._cluster_fsid = None + self._worker_pool = multiprocessing.pool.ThreadPool(1) + + def handle_command(self, inbuf, command): + if command["prefix"] == "ssh set-ssh-config": + return self._set_ssh_config(inbuf, command) + elif command["prefix"] == "ssh clear-ssh-config": + return self._clear_ssh_config(inbuf, command) + else: + raise NotImplementedError(command["prefix"]) + + @staticmethod + def can_run(): + if remoto is not None: + return True, "" + else: + return False, "loading remoto library: {}".format( + remoto_import_error) + + def available(self): + """ + The SSH orchestrator is available whenever the remoto library can be loaded.
+ """ + return self.can_run() + + def wait(self, completions): + self.log.info("wait: completions={}".format(completions)) + + complete = True + for c in completions: + if not isinstance(c, SSHReadCompletion) and \ + not isinstance(c, SSHWriteCompletion): + raise TypeError("unexpected completion: {}".format(c.__class__)) + + if c.is_complete: + continue + + complete = False + + return complete + + @staticmethod + def time_from_string(timestr): + # drop the 'Z' timezone indication, it's always UTC + timestr = timestr.rstrip('Z') + return datetime.datetime.strptime(timestr, DATEFMT) + + def _get_cluster_fsid(self): + """ + Fetch and cache the cluster fsid. + """ + if not self._cluster_fsid: + self._cluster_fsid = self.get("mon_map")["fsid"] + assert isinstance(self._cluster_fsid, six.string_types) + return self._cluster_fsid + + def _require_hosts(self, hosts): + """ + Raise an error if any of the given hosts are unregistered. + """ + if isinstance(hosts, six.string_types): + hosts = [hosts] + unregistered_hosts = [] + for host in hosts: + key = self._hostname_to_store_key(host) + if not self.get_store(key): + unregistered_hosts.append(host) + if unregistered_hosts: + raise RuntimeError("Host(s) {} not registered".format( + ", ".join(map(lambda h: "'{}'".format(h), + unregistered_hosts)))) + + def _set_ssh_config(self, inbuf, command): + """ + Set an ssh_config file provided from stdin + + TODO: + - validation + """ + if len(inbuf) == 0: + return errno.EINVAL, "", "empty ssh config provided" + self.set_store("ssh_config", inbuf) + return 0, "", "" + + def _clear_ssh_config(self, inbuf, command): + """ + Clear the ssh_config file provided from stdin + """ + self.set_store("ssh_config", None) + self.ssh_config_tmp = None + return 0, "", "" + + def _get_connection(self, host): + """ + Setup a connection for running commands on remote host. + """ + ssh_options = None + + conn = SSHConnection() + + ssh_config = self.get_store("ssh_config") + if ssh_config is not None: + conn.temp_file = tempfile.NamedTemporaryFile() + conn.temp_file.write(ssh_config.encode('utf-8')) + conn.temp_file.flush() # make visible to other processes + ssh_config_fname = conn.temp_file.name + else: + ssh_config_fname = self.get_localized_module_option("ssh_config_file") + + if ssh_config_fname: + if not os.path.isfile(ssh_config_fname): + raise Exception("ssh_config \"{}\" does not exist".format(ssh_config_fname)) + ssh_options = "-F {}".format(ssh_config_fname) + + self.log.info("opening connection to host '{}' with ssh " + "options '{}'".format(host, ssh_options)) + + conn.conn = remoto.Connection(host, + logger=self.log, + detect_sudo=True, + ssh_options=ssh_options) + + conn.conn.import_module(remotes) + + return conn + + def _executable_path(self, conn, executable): + """ + Remote validator that accepts a connection object to ensure that a certain + executable is available returning its full path if so. + + Otherwise an exception with thorough details will be raised, informing the + user that the executable was not found. + """ + executable_path = conn.remote_module.which(executable) + if not executable_path: + raise RuntimeError("Executable '{}' not found on host '{}'".format( + executable, conn.hostname)) + self.log.info("Found executable '{}' at path '{}'".format(executable, + executable_path)) + return executable_path + + def _build_ceph_conf(self): + """ + Build a minimal `ceph.conf` containing the current monitor hosts. + + Notes: + - ceph-volume complains if no section header (e.g. 
global) exists + - other ceph cli tools complained about no EOF newline + + TODO: + - messenger v2 syntax? + """ + mon_map = self.get("mon_map") + mon_addrs = map(lambda m: m["addr"], mon_map["mons"]) + mon_hosts = ", ".join(mon_addrs) + return "[global]\nmon host = {}\n".format(mon_hosts) + + def _ensure_ceph_conf(self, conn, network=False): + """ + Install ceph.conf on remote node if it doesn't exist. + """ + conf = self._build_ceph_conf() + if network: + conf += "public_network = {}\n".format(network) + conn.remote_module.write_conf("/etc/ceph/ceph.conf", conf) + + def _get_bootstrap_key(self, service_type): + """ + Fetch a bootstrap key for a service type. + + :param service_type: name (e.g. mds, osd, mon, ...) + """ + identity_dict = { + 'admin' : 'client.admin', + 'mds' : 'client.bootstrap-mds', + 'mgr' : 'client.bootstrap-mgr', + 'osd' : 'client.bootstrap-osd', + 'rgw' : 'client.bootstrap-rgw', + 'mon' : 'mon.' + } + + identity = identity_dict[service_type] + + ret, out, err = self.mon_command({ + "prefix": "auth get", + "entity": identity + }) + + if ret == -errno.ENOENT: + raise RuntimeError("Entity '{}' not found: '{}'".format(identity, err)) + elif ret != 0: + raise RuntimeError("Error retrieving key for '{}' ret {}: '{}'".format( + identity, ret, err)) + + return out + + def _bootstrap_mgr(self, conn): + """ + Bootstrap a manager. + + 1. install a copy of ceph.conf + 2. install the manager bootstrap key + + :param conn: remote host connection + """ + self._ensure_ceph_conf(conn) + keyring = self._get_bootstrap_key("mgr") + keyring_path = "/var/lib/ceph/bootstrap-mgr/ceph.keyring" + conn.remote_module.write_keyring(keyring_path, keyring) + return keyring_path + + def _bootstrap_osd(self, conn): + """ + Bootstrap an osd. + + 1. install a copy of ceph.conf + 2. install the osd bootstrap key + + :param conn: remote host connection + """ + self._ensure_ceph_conf(conn) + keyring = self._get_bootstrap_key("osd") + keyring_path = "/var/lib/ceph/bootstrap-osd/ceph.keyring" + conn.remote_module.write_keyring(keyring_path, keyring) + return keyring_path + + def _hostname_to_store_key(self, host): + return "{}.{}".format(self._STORE_HOST_PREFIX, host) + + def _get_hosts(self, wanted=None): + if wanted: + hosts_info = [] + for host in wanted: + key = self._hostname_to_store_key(host) + info = self.get_store(key) + if info: + hosts_info.append((key, info)) + else: + hosts_info = six.iteritems(self.get_store_prefix(self._STORE_HOST_PREFIX)) + + return list(map(lambda kv: (kv[0], json.loads(kv[1])), hosts_info)) + + def add_host(self, host): + """ + Add a host to be managed by the orchestrator. + + :param host: host name + :param labels: host labels + """ + def run(host): + key = self._hostname_to_store_key(host) + self.set_store(key, json.dumps({ + "host": host, + "inventory": None, + "last_inventory_refresh": None + })) + return "Added host '{}'".format(host) + + return SSHWriteCompletion( + self._worker_pool.apply_async(run, (host,))) + + def remove_host(self, host): + """ + Remove a host from orchestrator management. + + :param host: host name + """ + def run(host): + key = self._hostname_to_store_key(host) + self.set_store(key, None) + return "Removed host '{}'".format(host) + + return SSHWriteCompletion( + self._worker_pool.apply_async(run, (host,))) + + def get_hosts(self): + """ + Return a list of hosts managed by the orchestrator. + + Notes: + - skip async: manager reads from cache. 
+ + TODO: + - InventoryNode probably needs to be able to report labels + """ + nodes = [] + for key, host_info in self._get_hosts(): + node = orchestrator.InventoryNode(host_info["host"], []) + nodes.append(node) + return SSHReadCompletionReady(nodes) + + def _get_device_inventory(self, host): + """ + Query storage devices on a remote node. + + :return: list of InventoryDevice + """ + conn = self._get_connection(host) + + try: + ceph_volume_executable = self._executable_path(conn, 'ceph-volume') + command = [ + ceph_volume_executable, + "inventory", + "--format=json" + ] + + out, err, code = remoto.process.check(conn, command) + host_devices = json.loads(out[0]) + return host_devices + + except Exception as ex: + self.log.exception(ex) + raise + + finally: + conn.exit() + + def get_inventory(self, node_filter=None, refresh=False): + """ + Return the storage inventory of nodes matching the given filter. + + :param node_filter: node filter + + TODO: + - add filtering by label + """ + if node_filter: + hosts = node_filter.nodes + self._require_hosts(hosts) + hosts = self._get_hosts(hosts) + else: + # this implies the returned hosts are registered + hosts = self._get_hosts() + + def run(key, host_info): + updated = False + host = host_info["host"] + + if not host_info["inventory"]: + self.log.info("caching inventory for '{}'".format(host)) + host_info["inventory"] = self._get_device_inventory(host) + updated = True + else: + timeout_min = int(self.get_module_option( + "inventory_cache_timeout_min", + self._DEFAULT_INVENTORY_CACHE_TIMEOUT_MIN)) + + cutoff = datetime.datetime.utcnow() - datetime.timedelta( + minutes=timeout_min) + + last_update = self.time_from_string(host_info["last_inventory_refresh"]) + + if last_update < cutoff or refresh: + self.log.info("refresh stale inventory for '{}'".format(host)) + host_info["inventory"] = self._get_device_inventory(host) + updated = True + else: + self.log.info("reading cached inventory for '{}'".format(host)) + pass + + if updated: + now = datetime.datetime.utcnow() + now = now.strftime(DATEFMT) + host_info["last_inventory_refresh"] = now + self.set_store(key, json.dumps(host_info)) + + devices = list(map(lambda di: + orchestrator.InventoryDevice.from_ceph_volume_inventory(di), + host_info["inventory"])) + + return orchestrator.InventoryNode(host, devices) + + results = [] + for key, host_info in hosts: + result = self._worker_pool.apply_async(run, (key, host_info)) + results.append(result) + + return SSHReadCompletion(results) + + def _create_osd(self, host, drive_group): + conn = self._get_connection(host) + try: + devices = drive_group.data_devices.paths + self._bootstrap_osd(conn) + + for device in devices: + ceph_volume_executable = self._executable_path(conn, "ceph-volume") + command = [ + ceph_volume_executable, + "lvm", + "create", + "--cluster-fsid", self._get_cluster_fsid(), + "--{}".format(drive_group.objectstore), + "--data", device + ] + remoto.process.run(conn, command) + + return "Created osd on host '{}'".format(host) + + except: + raise + + finally: + conn.exit() + + def create_osds(self, drive_group, all_hosts=None): + """ + Create a new osd. + + The orchestrator CLI currently handles a narrow form of drive + specification defined by a single block device using bluestore. 
+ + :param drive_group: osd specification + + TODO: + - support full drive_group specification + - support batch creation + """ + assert len(drive_group.hosts(all_hosts)) == 1 + assert len(drive_group.data_devices.paths) > 0 + assert all(map(lambda p: isinstance(p, six.string_types), + drive_group.data_devices.paths)) + + host = drive_group.hosts(all_hosts)[0] + self._require_hosts(host) + + result = self._worker_pool.apply_async(self._create_osd, (host, + drive_group)) + + return SSHWriteCompletion(result) + + def _create_mon(self, host, network): + """ + Create a new monitor on the given host. + """ + self.log.info("create_mon({}:{}): starting".format(host, network)) + + conn = self._get_connection(host) + + try: + self._ensure_ceph_conf(conn, network) + + uid = conn.remote_module.path_getuid("/var/lib/ceph") + gid = conn.remote_module.path_getgid("/var/lib/ceph") + + # install client admin key on target mon host + admin_keyring = self._get_bootstrap_key("admin") + admin_keyring_path = '/etc/ceph/ceph.client.admin.keyring' + conn.remote_module.write_keyring(admin_keyring_path, admin_keyring, uid=uid, gid=gid) + + mon_path = "/var/lib/ceph/mon/ceph-{name}".format(name=host) + conn.remote_module.create_mon_path(mon_path, uid, gid) + + # bootstrap key + conn.remote_module.safe_makedirs("/var/lib/ceph/tmp") + monitor_keyring = self._get_bootstrap_key("mon") + mon_keyring_path = "/var/lib/ceph/tmp/ceph-{name}.mon.keyring".format(name=host) + conn.remote_module.write_file( + mon_keyring_path, + monitor_keyring, + 0o600, + None, + uid, + gid + ) + + # monitor map + monmap_path = "/var/lib/ceph/tmp/ceph.{name}.monmap".format(name=host) + remoto.process.run(conn, + ['ceph', 'mon', 'getmap', '-o', monmap_path], + ) + + user_args = [] + if uid != 0: + user_args = user_args + [ '--setuser', str(uid) ] + if gid != 0: + user_args = user_args + [ '--setgroup', str(gid) ] + + remoto.process.run(conn, + ['ceph-mon', '--mkfs', '-i', host, + '--monmap', monmap_path, '--keyring', mon_keyring_path + ] + user_args + ) + + remoto.process.run(conn, + ['systemctl', 'enable', 'ceph.target'], + timeout=7, + ) + + remoto.process.run(conn, + ['systemctl', 'enable', 'ceph-mon@{name}'.format(name=host)], + timeout=7, + ) + + remoto.process.run(conn, + ['systemctl', 'start', 'ceph-mon@{name}'.format(name=host)], + timeout=7, + ) + + return "Created mon on host '{}'".format(host) + + except Exception as e: + self.log.error("create_mon({}:{}): error: {}".format(host, network, e)) + raise + + finally: + self.log.info("create_mon({}:{}): finished".format(host, network)) + conn.exit() + + def update_mons(self, num, hosts): + """ + Adjust the number of cluster monitors. + """ + # current support limited to adding monitors. + mon_map = self.get("mon_map") + num_mons = len(mon_map["mons"]) + if num == num_mons: + return SSHWriteCompletionReady("The requested number of monitors exist.") + if num < num_mons: + raise NotImplementedError("Removing monitors is not supported.") + + # check that all the hostnames are registered + self._require_hosts(map(lambda h: h[0], hosts)) + + # current support requires a network to be specified + for host, network in hosts: + if not network: + raise RuntimeError("Host '{}' missing network " + "part".format(host)) + + # explicit placement: enough hosts provided?
+ num_new_mons = num - num_mons + if len(hosts) < num_new_mons: + raise RuntimeError("Error: {} hosts provided, expected {}".format( + len(hosts), num_new_mons)) + + self.log.info("creating {} monitors on hosts: '{}'".format( + num_new_mons, ",".join(map(lambda h: ":".join(h), hosts)))) + + # TODO: we may want to chain the creation of the monitors so they join + # the quorum one at a time. + results = [] + for host, network in hosts: + result = self._worker_pool.apply_async(self._create_mon, (host, + network)) + results.append(result) + + return SSHWriteCompletion(results) + + def _create_mgr(self, host): + """ + Create a new manager instance on a host. + """ + self.log.info("create_mgr({}): starting".format(host)) + + conn = self._get_connection(host) + + try: + bootstrap_keyring_path = self._bootstrap_mgr(conn) + + mgr_path = "/var/lib/ceph/mgr/ceph-{name}".format(name=host) + conn.remote_module.safe_makedirs(mgr_path) + keyring_path = os.path.join(mgr_path, "keyring") + + command = [ + 'ceph', + '--name', 'client.bootstrap-mgr', + '--keyring', bootstrap_keyring_path, + 'auth', 'get-or-create', 'mgr.{name}'.format(name=host), + 'mon', 'allow profile mgr', + 'osd', 'allow *', + 'mds', 'allow *', + '-o', + keyring_path + ] + + out, err, ret = remoto.process.check(conn, command) + if ret != 0: + raise RuntimeError("Failed to create keyring for mgr.{}: {}".format(host, err)) + + remoto.process.run(conn, + ['systemctl', 'enable', 'ceph-mgr@{name}'.format(name=host)], + timeout=7 + ) + + remoto.process.run(conn, + ['systemctl', 'start', 'ceph-mgr@{name}'.format(name=host)], + timeout=7 + ) + + remoto.process.run(conn, + ['systemctl', 'enable', 'ceph.target'], + timeout=7 + ) + + return "Created mgr on host '{}'".format(host) + + except Exception as e: + self.log.error("create_mgr({}): error: {}".format(host, e)) + raise + + finally: + self.log.info("create_mgr({}): finished".format(host)) + conn.exit() + + def update_mgrs(self, num, hosts): + """ + Adjust the number of cluster managers. + """ + # current support limited to adding managers. + mgr_map = self.get("mgr_map") + num_mgrs = 1 if mgr_map["active_name"] else 0 + num_mgrs += len(mgr_map["standbys"]) + if num == num_mgrs: + return SSHWriteCompletionReady("The requested number of managers exist.") + if num < num_mgrs: + raise NotImplementedError("Removing managers is not supported") + + # check that all the hosts are registered + hosts = list(set(hosts)) + self._require_hosts(hosts) + + # we assume explicit placement: the number of hosts provided must match + # the requested increase in the number of daemons.
+ num_new_mgrs = num - num_mgrs + if len(hosts) < num_new_mgrs: + raise RuntimeError("Error: {} hosts provided, expected {}".format( + len(hosts), num_new_mgrs)) + + self.log.info("creating {} managers on hosts: '{}'".format( + num_new_mgrs, ",".join(hosts))) + + results = [] + for i in range(num_new_mgrs): + result = self._worker_pool.apply_async(self._create_mgr, (hosts[i],)) + results.append(result) + + return SSHWriteCompletion(results) diff --git a/src/pybind/mgr/ssh/remotes.py b/src/pybind/mgr/ssh/remotes.py new file mode 100644 index 0000000000000..da057e83363b8 --- /dev/null +++ b/src/pybind/mgr/ssh/remotes.py @@ -0,0 +1,81 @@ +# ceph-deploy ftw: the helpers below are adapted from ceph-deploy and are +# executed on the remote host by the ssh orchestrator. +import os +import errno +import tempfile +import shutil + +def safe_makedirs(path, uid=-1, gid=-1): + """ create path recursively if it doesn't exist """ + try: + os.makedirs(path) + except OSError as e: + if e.errno == errno.EEXIST: + pass + else: + raise + else: + os.chown(path, uid, gid) + +def write_conf(path, conf): + if not os.path.exists(path): + dirpath = os.path.dirname(path) + if os.path.exists(dirpath): + with open(path, "w") as f: + f.write(conf) + os.chmod(path, 0o644) + else: + raise RuntimeError( + "{0} does not exist".format(dirpath)) + +def write_keyring(path, key, overwrite=False, uid=-1, gid=-1): + dirname = os.path.dirname(path) + if not os.path.exists(dirname): + safe_makedirs(dirname, uid, gid) + if not overwrite and os.path.exists(path): + return + with open(path, "wb") as f: + f.write(key.encode('utf-8')) + +def create_mon_path(path, uid=-1, gid=-1): + """create the mon path if it does not exist""" + if not os.path.exists(path): + os.makedirs(path) + os.chown(path, uid, gid) + +def write_file(path, content, mode=0o644, directory=None, uid=-1, gid=-1): + if directory: + if path.startswith("/"): + path = path[1:] + path = os.path.join(directory, path) + if os.path.exists(path): + # Delete file in case we are changing its mode + os.unlink(path) + with os.fdopen(os.open(path, os.O_WRONLY | os.O_CREAT, mode), 'wb') as f: + f.write(content.encode('utf-8')) + os.chown(path, uid, gid) + +def path_getuid(path): + return os.stat(path).st_uid + +def path_getgid(path): + return os.stat(path).st_gid + +def which(executable): + """find the location of an executable""" + locations = ( + '/usr/local/bin', + '/bin', + '/usr/bin', + '/usr/local/sbin', + '/usr/sbin', + '/sbin', + ) + + for location in locations: + executable_path = os.path.join(location, executable) + if os.path.exists(executable_path) and os.path.isfile(executable_path): + return executable_path + +# remoto (execnet) entry point: when this module runs on the remote host, +# evaluate each function call received over the channel and send the result back. +if __name__ == '__channelexec__': + for item in channel: + channel.send(eval(item)) -- 2.39.5