From 8d2c726e3c1953b1b21dd365d3963078f551747b Mon Sep 17 00:00:00 2001 From: Venky Shankar Date: Thu, 4 Mar 2021 00:07:43 -0500 Subject: [PATCH] pybind/mirroring: introduce peer_bootstrap {create|import} commands Signed-off-by: Venky Shankar --- .../mgr/mirroring/fs/snapshot_mirror.py | 154 +++++++++++++++--- src/pybind/mgr/mirroring/fs/utils.py | 16 +- src/pybind/mgr/mirroring/module.py | 25 ++- 3 files changed, 168 insertions(+), 27 deletions(-) diff --git a/src/pybind/mgr/mirroring/fs/snapshot_mirror.py b/src/pybind/mgr/mirroring/fs/snapshot_mirror.py index 44aafb49237..2b1a633e40a 100644 --- a/src/pybind/mgr/mirroring/fs/snapshot_mirror.py +++ b/src/pybind/mgr/mirroring/fs/snapshot_mirror.py @@ -1,3 +1,4 @@ +import base64 import errno import json import logging @@ -275,6 +276,8 @@ class FSPolicy: return 0, json.dumps(res, indent=4, sort_keys=True), '' class FSSnapshotMirror: + PEER_CONFIG_KEY_PREFIX = "cephfs/mirror/peer" + def __init__(self, mgr): self.mgr = mgr self.rados = mgr.rados @@ -291,6 +294,10 @@ class FSSnapshotMirror: self.fs_map = self.mgr.get('fs_map') self.refresh_pool_policy_locked() + @staticmethod + def make_spec(client_name, cluster_name): + return f'{client_name}@{cluster_name}' + @staticmethod def split_spec(spec): try: @@ -314,6 +321,35 @@ class FSSnapshotMirror: return fs['id'] return None + @staticmethod + def peer_config_key(filesystem, peer_uuid): + return f'{FSSnapshotMirror.PEER_CONFIG_KEY_PREFIX}/{filesystem}/{peer_uuid}' + + def config_set(self, key, val=None): + """set or remove a key from mon config store""" + if val: + cmd = {'prefix': 'config-key set', + 'key': key, 'val': val} + else: + cmd = {'prefix': 'config-key rm', + 'key': key} + r, outs, err = self.mgr.mon_command(cmd) + if r < 0: + log.error(f'mon command to set/remove config-key {key} failed: {err}') + raise Exception(-errno.EINVAL) + + def config_get(self, key): + """fetch a config key value from mon config store""" + cmd = {'prefix': 'config-key get', 'key': key} + r, outs, err = self.mgr.mon_command(cmd) + if r < 0 and not r == -errno.ENOENT: + log.error(f'mon command to get config-key {key} failed: {err}') + raise Exception(-errno.EINVAL) + val = {} + if r == 0: + val = json.loads(outs) + return val + def filesystem_exist(self, filesystem): for fs in self.fs_map['filesystems']: if fs['mdsmap']['fs_name'] == filesystem: @@ -330,6 +366,15 @@ class FSSnapshotMirror: return fs['mirror_info']['peers'] return None + def peer_exists(self, filesystem, remote_cluster_spec, remote_fs_name): + peers = self.get_filesystem_peers(filesystem) + for _, rem in peers.items(): + remote = rem['remote'] + spec = FSSnapshotMirror.make_spec(remote['client_name'], remote['cluster_name']) + if spec == remote_cluster_spec and remote['fs_name'] == remote_fs_name: + return True + return False + def get_mirror_info(self, remote_fs): try: val = remote_fs.getxattr('/', 'ceph.mirror.info') @@ -371,30 +416,36 @@ class FSSnapshotMirror: def purge_mirror_info(self, local_fs_name, peer_uuid): log.debug(f'local fs={local_fs_name} peer_uuid={peer_uuid}') + # resolve the peer to its spec rem = self.resolve_peer(local_fs_name, peer_uuid) + if not rem: + return log.debug(f'peer_uuid={peer_uuid} resolved to {rem}') - if rem: - client_name = rem['client_name'] - cluster_name = rem['cluster_name'] - client_name, cluster_name = FSSnapshotMirror.split_spec(f'{client_name}@{cluster_name}') - remote_cluster, remote_fs = connect_to_filesystem(client_name, - cluster_name, - rem['fs_name'], 'remote') - try: - remote_fs.removexattr('/', 'ceph.mirror.info') - except cephfs.Error as e: - if not e.errno == errno.ENOENT: - log.error('error removing mirror info') - raise Exception(-e.errno) - finally: - disconnect_from_filesystem(cluster_name, rem['fs_name'], remote_cluster, remote_fs) + _, client_name = rem['client_name'].split('.') - def verify_and_set_mirror_info(self, local_fs_name, remote_cluster_spec, remote_fs_name): + # fetch auth details from config store + remote_conf = self.config_get(FSSnapshotMirror.peer_config_key(local_fs_name, peer_uuid)) + remote_cluster, remote_fs = connect_to_filesystem(client_name, + rem['cluster_name'], + rem['fs_name'], 'remote', conf_dct=remote_conf) + try: + remote_fs.removexattr('/', 'ceph.mirror.info') + except cephfs.Error as e: + if not e.errno == errno.ENOENT: + log.error('error removing mirror info') + raise Exception(-e.errno) + finally: + disconnect_from_filesystem(rem['cluster_name'], rem['fs_name'], remote_cluster, remote_fs) + + def verify_and_set_mirror_info(self, local_fs_name, remote_cluster_spec, remote_fs_name, remote_conf={}): log.debug(f'local fs={local_fs_name} remote={remote_cluster_spec}/{remote_fs_name}') client_name, cluster_name = FSSnapshotMirror.split_spec(remote_cluster_spec) - remote_cluster, remote_fs = connect_to_filesystem(client_name, cluster_name, - remote_fs_name, 'remote') + remote_cluster, remote_fs = connect_to_filesystem(client_name, cluster_name, remote_fs_name, + 'remote', conf_dct=remote_conf) + if 'fsid' in remote_conf: + if not remote_cluster.get_fsid() == remote_conf['fsid']: + raise MirrorException(-errno.EINVAL, 'FSID mismatch between bootstrap token and remote cluster') local_fsid = FSSnapshotMirror.get_filesystem_id(local_fs_name, self.fs_map) if local_fsid is None: @@ -485,7 +536,7 @@ class FSSnapshotMirror: except Exception as e: return e.args[0], '', 'failed to disable mirroring' - def peer_add(self, filesystem, remote_cluster_spec, remote_fs_name): + def peer_add(self, filesystem, remote_cluster_spec, remote_fs_name, remote_conf): try: if remote_fs_name == None: remote_fs_name = filesystem @@ -493,14 +544,29 @@ class FSSnapshotMirror: fspolicy = self.pool_policy.get(filesystem, None) if not fspolicy: raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored') - self.verify_and_set_mirror_info(filesystem, remote_cluster_spec, remote_fs_name) + ### peer updates for key, site-name are not yet supported + if self.peer_exists(filesystem, remote_cluster_spec, remote_fs_name): + return 0, json.dumps({}), '' + # _own_ the peer + self.verify_and_set_mirror_info(filesystem, remote_cluster_spec, remote_fs_name, remote_conf) + # unique peer uuid + peer_uuid = str(uuid.uuid4()) + config_key = FSSnapshotMirror.peer_config_key(filesystem, peer_uuid) + if remote_conf.get('mon_host') and remote_conf.get('key'): + self.config_set(config_key, json.dumps(remote_conf)) cmd = {'prefix': 'fs mirror peer_add', 'fs_name': filesystem, + 'uuid': peer_uuid, 'remote_cluster_spec': remote_cluster_spec, 'remote_fs_name': remote_fs_name} r, outs, err = self.mgr.mon_command(cmd) if r < 0: log.error(f'mon command to add peer failed: {err}') + try: + log.debug(f'cleaning up config-key for {peer_uuid}') + self.config_set(config_key) + except: + pass raise Exception(-errno.EINVAL) return 0, json.dumps({}), '' except MirrorException as me: @@ -530,12 +596,60 @@ class FSSnapshotMirror: if r < 0: log.error(f'mon command to remove peer failed: {err}') raise Exception(-errno.EINVAL) + self.config_set(FSSnapshotMirror.peer_config_key(filesystem, peer_uuid)) return 0, json.dumps({}), '' except MirrorException as me: return me.args[0], '', me.args[1] except Exception as e: return e.args[0], '', 'failed to remove peer' + def peer_bootstrap_create(self, fs_name, client_name, site_name): + """create a bootstrap token for this peer filesystem""" + try: + with self.lock: + cmd = {'prefix': 'fs authorize', + 'filesystem': fs_name, + 'entity': client_name, + 'caps': ['/', 'rwps']} + r, outs, err = self.mgr.mon_command(cmd) + if r < 0: + log.error(f'mon command to create peer user failed: {err}') + raise Exception(-errno.EINVAL) + cmd = {'prefix': 'auth get', + 'entity': client_name, + 'format': 'json'} + r, outs, err = self.mgr.mon_command(cmd) + if r < 0: + log.error(f'mon command to fetch keyring failed: {err}') + raise Exception(-errno.EINVAL) + outs = json.loads(outs) + outs0 = outs[0] + token_dct = {'fsid': self.mgr.rados.get_fsid(), + 'filesystem': fs_name, + 'user': outs0['entity'], + 'site_name': site_name, + 'key': outs0['key'], + 'mon_host': self.mgr.rados.conf_get('mon_host')} + token_str = json.dumps(token_dct).encode('utf-8') + encoded_token = base64.b64encode(token_str) + return 0, json.dumps({'token': encoded_token.decode('utf-8')}), '' + except MirrorException as me: + return me.args[0], '', me.args[1] + except Exception as e: + return e.args[0], '', 'failed to bootstrap peer' + + def peer_bootstrap_import(self, filesystem, token): + try: + token_str = base64.b64decode(token) + token_dct = json.loads(token_str.decode('utf-8')) + except: + return -errno.EINVAL, '', 'failed to parse token' + client_name = token_dct.pop('user') + cluster_name = token_dct.pop('site_name') + remote_fs_name = token_dct.pop('filesystem') + remote_cluster_spec = f'{client_name}@{cluster_name}' + return self.peer_add(filesystem, remote_cluster_spec, remote_fs_name, token_dct) + @staticmethod def norm_path(dir_path): if not os.path.isabs(dir_path): diff --git a/src/pybind/mgr/mirroring/fs/utils.py b/src/pybind/mgr/mirroring/fs/utils.py index 625e155d651..b9fa59051eb 100644 --- a/src/pybind/mgr/mirroring/fs/utils.py +++ b/src/pybind/mgr/mirroring/fs/utils.py @@ -15,11 +15,17 @@ DIRECTORY_MAP_PREFIX = "dir_map_" log = logging.getLogger(__name__) -def connect_to_cluster(client_name, cluster_name, desc=''): +def connect_to_cluster(client_name, cluster_name, conf_dct, desc=''): try: log.debug(f'connecting to {desc} cluster: {client_name}/{cluster_name}') - r_rados = rados.Rados(rados_id=client_name, clustername=cluster_name) - r_rados.conf_read_file() + mon_host = conf_dct.get('mon_host', '') + cephx_key = conf_dct.get('key', '') + if mon_host and cephx_key: + r_rados = rados.Rados(rados_id=client_name, conf={'mon_host': mon_host, + 'key': cephx_key}) + else: + r_rados = rados.Rados(rados_id=client_name, clustername=cluster_name) + r_rados.conf_read_file() r_rados.connect() log.debug(f'connected to {desc} cluster') return r_rados @@ -38,9 +44,9 @@ def disconnect_from_cluster(cluster_name, cluster): except Exception as e: log.error(f'error disconnecting: {e}') -def connect_to_filesystem(client_name, cluster_name, fs_name, desc): +def connect_to_filesystem(client_name, cluster_name, fs_name, desc, conf_dct={}): try: - cluster = connect_to_cluster(client_name, cluster_name, desc) + cluster = connect_to_cluster(client_name, cluster_name, conf_dct, desc) log.debug(f'connecting to {desc} filesystem: {fs_name}') fs = cephfs.LibCephFS(rados_inst=cluster) log.debug('CephFS initializing...') diff --git a/src/pybind/mgr/mirroring/module.py b/src/pybind/mgr/mirroring/module.py index 0ff3c084b9d..8086324f73e 100644 --- a/src/pybind/mgr/mirroring/module.py +++ b/src/pybind/mgr/mirroring/module.py @@ -30,10 +30,16 @@ class Module(MgrModule): def snapshot_mirorr_peer_add(self, fs_name: str, remote_cluster_spec: str, - remote_fs_name: Optional[str] = None): + remote_fs_name: Optional[str] = None, + remote_mon_host: Optional[str] = None, + cephx_key: Optional[str] = None): """Add a remote filesystem peer""" + conf = {} + if remote_mon_host and cephx_key: + conf['mon_host'] = remote_mon_host + conf['key'] = cephx_key return self.fs_snapshot_mirror.peer_add(fs_name, remote_cluster_spec, - remote_fs_name) + remote_fs_name, remote_conf=conf) @CLIWriteCommand('fs snapshot mirror peer_remove') def snapshot_mirror_peer_remove(self, @@ -42,6 +48,21 @@ class Module(MgrModule): """Remove a filesystem peer""" return self.fs_snapshot_mirror.peer_remove(fs_name, peer_uuid) + @CLIWriteCommand('fs snapshot mirror peer_bootstrap create') + def snapshot_mirror_peer_bootstrap_create(self, + fs_name: str, + client_name: str, + site_name: str): + """Bootstrap a filesystem peer""" + return self.fs_snapshot_mirror.peer_bootstrap_create(fs_name, client_name, site_name) + + @CLIWriteCommand('fs snapshot mirror peer_bootstrap import') + def snapshot_mirror_peer_bootstrap_import(self, + fs_name: str, + token: str): + """Import a bootstrap token""" + return self.fs_snapshot_mirror.peer_bootstrap_import(fs_name, token) + @CLIWriteCommand('fs snapshot mirror add') def snapshot_mirror_add_dir(self, fs_name: str, -- 2.39.5