]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
pybind/mirroring: introduce peer_bootstrap {create|import} commands
authorVenky Shankar <vshankar@redhat.com>
Thu, 4 Mar 2021 05:07:43 +0000 (00:07 -0500)
committerVenky Shankar <vshankar@redhat.com>
Thu, 11 Mar 2021 08:41:52 +0000 (03:41 -0500)
Signed-off-by: Venky Shankar <vshankar@redhat.com>
src/pybind/mgr/mirroring/fs/snapshot_mirror.py
src/pybind/mgr/mirroring/fs/utils.py
src/pybind/mgr/mirroring/module.py

index 44aafb492376f3a4f1e8879032eff503c88df38c..2b1a633e40a072c10b9243b3d665b155c290464b 100644 (file)
@@ -1,3 +1,4 @@
+import base64
 import errno
 import json
 import logging
@@ -275,6 +276,8 @@ class FSPolicy:
             return 0, json.dumps(res, indent=4, sort_keys=True), ''
 
 class FSSnapshotMirror:
+    PEER_CONFIG_KEY_PREFIX = "cephfs/mirror/peer"
+
     def __init__(self, mgr):
         self.mgr = mgr
         self.rados = mgr.rados
@@ -291,6 +294,10 @@ class FSSnapshotMirror:
                 self.fs_map = self.mgr.get('fs_map')
                 self.refresh_pool_policy_locked()
 
+    @staticmethod
+    def make_spec(client_name, cluster_name):
+        return f'{client_name}@{cluster_name}'
+
     @staticmethod
     def split_spec(spec):
         try:
@@ -314,6 +321,35 @@ class FSSnapshotMirror:
                 return fs['id']
         return None
 
+    @staticmethod
+    def peer_config_key(filesystem, peer_uuid):
+        return f'{FSSnapshotMirror.PEER_CONFIG_KEY_PREFIX}/{filesystem}/{peer_uuid}'
+
+    def config_set(self, key, val=None):
+        """set or remove a key from mon config store"""
+        if val:
+            cmd = {'prefix': 'config-key set',
+                   'key': key, 'val': val}
+        else:
+            cmd = {'prefix': 'config-key rm',
+                   'key': key}
+        r, outs, err = self.mgr.mon_command(cmd)
+        if r < 0:
+            log.error(f'mon command to set/remove config-key {key} failed: {err}')
+            raise Exception(-errno.EINVAL)
+
+    def config_get(self, key):
+        """fetch a config key value from mon config store"""
+        cmd = {'prefix': 'config-key get', 'key': key}
+        r, outs, err = self.mgr.mon_command(cmd)
+        if r < 0 and not r == -errno.ENOENT:
+            log.error(f'mon command to get config-key {key} failed: {err}')
+            raise Exception(-errno.EINVAL)
+        val = {}
+        if r == 0:
+            val = json.loads(outs)
+        return val
+
     def filesystem_exist(self, filesystem):
         for fs in self.fs_map['filesystems']:
             if fs['mdsmap']['fs_name'] == filesystem:
@@ -330,6 +366,15 @@ class FSSnapshotMirror:
                 return fs['mirror_info']['peers']
         return None
 
+    def peer_exists(self, filesystem, remote_cluster_spec, remote_fs_name):
+        peers = self.get_filesystem_peers(filesystem)
+        for _, rem in peers.items():
+            remote = rem['remote']
+            spec = FSSnapshotMirror.make_spec(remote['client_name'], remote['cluster_name'])
+            if spec == remote_cluster_spec and remote['fs_name'] == remote_fs_name:
+                return True
+        return False
+
     def get_mirror_info(self, remote_fs):
         try:
             val = remote_fs.getxattr('/', 'ceph.mirror.info')
@@ -371,30 +416,36 @@ class FSSnapshotMirror:
 
     def purge_mirror_info(self, local_fs_name, peer_uuid):
         log.debug(f'local fs={local_fs_name} peer_uuid={peer_uuid}')
+        # resolve the peer to its spec
         rem = self.resolve_peer(local_fs_name, peer_uuid)
+        if not rem:
+            return
         log.debug(f'peer_uuid={peer_uuid} resolved to {rem}')
-        if rem:
-            client_name = rem['client_name']
-            cluster_name = rem['cluster_name']
-            client_name, cluster_name = FSSnapshotMirror.split_spec(f'{client_name}@{cluster_name}')
-            remote_cluster, remote_fs = connect_to_filesystem(client_name,
-                                                              cluster_name,
-                                                              rem['fs_name'], 'remote')
-            try:
-                remote_fs.removexattr('/', 'ceph.mirror.info')
-            except cephfs.Error as e:
-                if not e.errno == errno.ENOENT:
-                    log.error('error removing mirror info')
-                    raise Exception(-e.errno)
-            finally:
-                disconnect_from_filesystem(cluster_name, rem['fs_name'], remote_cluster, remote_fs)
+        _, client_name = rem['client_name'].split('.')
 
-    def verify_and_set_mirror_info(self, local_fs_name, remote_cluster_spec, remote_fs_name):
+        # fetch auth details from config store
+        remote_conf = self.config_get(FSSnapshotMirror.peer_config_key(local_fs_name, peer_uuid))
+        remote_cluster, remote_fs = connect_to_filesystem(client_name,
+                                                          rem['cluster_name'],
+                                                          rem['fs_name'], 'remote', conf_dct=remote_conf)
+        try:
+            remote_fs.removexattr('/', 'ceph.mirror.info')
+        except cephfs.Error as e:
+            if not e.errno == errno.ENOENT:
+                log.error('error removing mirror info')
+                raise Exception(-e.errno)
+        finally:
+            disconnect_from_filesystem(rem['cluster_name'], rem['fs_name'], remote_cluster, remote_fs)
+
+    def verify_and_set_mirror_info(self, local_fs_name, remote_cluster_spec, remote_fs_name, remote_conf={}):
         log.debug(f'local fs={local_fs_name} remote={remote_cluster_spec}/{remote_fs_name}')
 
         client_name, cluster_name = FSSnapshotMirror.split_spec(remote_cluster_spec)
-        remote_cluster, remote_fs = connect_to_filesystem(client_name, cluster_name,
-                                                          remote_fs_name, 'remote')
+        remote_cluster, remote_fs = connect_to_filesystem(client_name, cluster_name, remote_fs_name,
+                                                          'remote', conf_dct=remote_conf)
+        if 'fsid' in remote_conf:
+            if not remote_cluster.get_fsid() == remote_conf['fsid']:
+                raise MirrorException(-errno.EINVAL, 'FSID mismatch between bootstrap token and remote cluster')
 
         local_fsid = FSSnapshotMirror.get_filesystem_id(local_fs_name, self.fs_map)
         if local_fsid is None:
@@ -485,7 +536,7 @@ class FSSnapshotMirror:
         except Exception as e:
             return e.args[0], '', 'failed to disable mirroring'
 
-    def peer_add(self, filesystem, remote_cluster_spec, remote_fs_name):
+    def peer_add(self, filesystem, remote_cluster_spec, remote_fs_name, remote_conf):
         try:
             if remote_fs_name == None:
                 remote_fs_name = filesystem
@@ -493,14 +544,29 @@ class FSSnapshotMirror:
                 fspolicy = self.pool_policy.get(filesystem, None)
                 if not fspolicy:
                     raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
-                self.verify_and_set_mirror_info(filesystem, remote_cluster_spec, remote_fs_name)
+                ### peer updates for key, site-name are not yet supported
+                if self.peer_exists(filesystem, remote_cluster_spec, remote_fs_name):
+                    return 0, json.dumps({}), ''
+                # _own_ the peer
+                self.verify_and_set_mirror_info(filesystem, remote_cluster_spec, remote_fs_name, remote_conf)
+                # unique peer uuid
+                peer_uuid = str(uuid.uuid4())
+                config_key = FSSnapshotMirror.peer_config_key(filesystem, peer_uuid)
+                if remote_conf.get('mon_host') and remote_conf.get('key'):
+                    self.config_set(config_key, json.dumps(remote_conf))
                 cmd = {'prefix': 'fs mirror peer_add',
                        'fs_name': filesystem,
+                       'uuid': peer_uuid,
                        'remote_cluster_spec': remote_cluster_spec,
                        'remote_fs_name': remote_fs_name}
                 r, outs, err = self.mgr.mon_command(cmd)
                 if r < 0:
                     log.error(f'mon command to add peer failed: {err}')
+                    try:
+                        log.debug(f'cleaning up config-key for {peer_uuid}')
+                        self.config_set(config_key)
+                    except:
+                        pass
                     raise Exception(-errno.EINVAL)
                 return 0, json.dumps({}), ''
         except MirrorException as me:
@@ -530,12 +596,60 @@ class FSSnapshotMirror:
                 if r < 0:
                     log.error(f'mon command to remove peer failed: {err}')
                     raise Exception(-errno.EINVAL)
+                self.config_set(FSSnapshotMirror.peer_config_key(filesystem, peer_uuid))
                 return 0, json.dumps({}), ''
         except MirrorException as me:
             return me.args[0], '', me.args[1]
         except Exception as e:
             return e.args[0], '', 'failed to remove peer'
 
+    def peer_bootstrap_create(self, fs_name, client_name, site_name):
+        """create a bootstrap token for this peer filesystem"""
+        try:
+            with self.lock:
+                cmd = {'prefix': 'fs authorize',
+                       'filesystem': fs_name,
+                       'entity': client_name,
+                       'caps': ['/', 'rwps']}
+                r, outs, err = self.mgr.mon_command(cmd)
+                if r < 0:
+                    log.error(f'mon command to create peer user failed: {err}')
+                    raise Exception(-errno.EINVAL)
+                cmd = {'prefix': 'auth get',
+                       'entity': client_name,
+                       'format': 'json'}
+                r, outs, err = self.mgr.mon_command(cmd)
+                if r < 0:
+                    log.error(f'mon command to fetch keyring failed: {err}')
+                    raise Exception(-errno.EINVAL)
+                outs = json.loads(outs)
+                outs0 = outs[0]
+                token_dct = {'fsid': self.mgr.rados.get_fsid(),
+                             'filesystem': fs_name,
+                             'user': outs0['entity'],
+                             'site_name': site_name,
+                             'key': outs0['key'],
+                             'mon_host': self.mgr.rados.conf_get('mon_host')}
+                token_str = json.dumps(token_dct).encode('utf-8')
+                encoded_token = base64.b64encode(token_str)
+                return 0, json.dumps({'token': encoded_token.decode('utf-8')}), ''
+        except MirrorException as me:
+            return me.args[0], '', me.args[1]
+        except Exception as e:
+            return e.args[0], '', 'failed to bootstrap peer'
+
+    def peer_bootstrap_import(self, filesystem, token):
+        try:
+            token_str = base64.b64decode(token)
+            token_dct = json.loads(token_str.decode('utf-8'))
+        except:
+            return -errno.EINVAL, '', 'failed to parse token'
+        client_name = token_dct.pop('user')
+        cluster_name = token_dct.pop('site_name')
+        remote_fs_name = token_dct.pop('filesystem')
+        remote_cluster_spec = f'{client_name}@{cluster_name}'
+        return self.peer_add(filesystem, remote_cluster_spec, remote_fs_name, token_dct)
+
     @staticmethod
     def norm_path(dir_path):
         if not os.path.isabs(dir_path):
index 625e155d651a708cf671155a338d36e2f15cc10c..b9fa59051eb918a3cc3e9ad668b3ffb722051414 100644 (file)
@@ -15,11 +15,17 @@ DIRECTORY_MAP_PREFIX = "dir_map_"
 
 log = logging.getLogger(__name__)
 
-def connect_to_cluster(client_name, cluster_name, desc=''):
+def connect_to_cluster(client_name, cluster_name, conf_dct, desc=''):
     try:
         log.debug(f'connecting to {desc} cluster: {client_name}/{cluster_name}')
-        r_rados = rados.Rados(rados_id=client_name, clustername=cluster_name)
-        r_rados.conf_read_file()
+        mon_host = conf_dct.get('mon_host', '')
+        cephx_key = conf_dct.get('key', '')
+        if mon_host and cephx_key:
+            r_rados = rados.Rados(rados_id=client_name, conf={'mon_host': mon_host,
+                                                              'key': cephx_key})
+        else:
+            r_rados = rados.Rados(rados_id=client_name, clustername=cluster_name)
+            r_rados.conf_read_file()
         r_rados.connect()
         log.debug(f'connected to {desc} cluster')
         return r_rados
@@ -38,9 +44,9 @@ def disconnect_from_cluster(cluster_name, cluster):
     except Exception as e:
         log.error(f'error disconnecting: {e}')
 
-def connect_to_filesystem(client_name, cluster_name, fs_name, desc):
+def connect_to_filesystem(client_name, cluster_name, fs_name, desc, conf_dct={}):
     try:
-        cluster = connect_to_cluster(client_name, cluster_name, desc)
+        cluster = connect_to_cluster(client_name, cluster_name, conf_dct, desc)
         log.debug(f'connecting to {desc} filesystem: {fs_name}')
         fs = cephfs.LibCephFS(rados_inst=cluster)
         log.debug('CephFS initializing...')
index 0ff3c084b9de506b9312a879f4a88fedab6ac61a..8086324f73ed6c3ca80f7efd2248ce144d9554a6 100644 (file)
@@ -30,10 +30,16 @@ class Module(MgrModule):
     def snapshot_mirorr_peer_add(self,
                                  fs_name: str,
                                  remote_cluster_spec: str,
-                                 remote_fs_name: Optional[str] = None):
+                                 remote_fs_name: Optional[str] = None,
+                                 remote_mon_host: Optional[str] = None,
+                                 cephx_key: Optional[str] = None):
         """Add a remote filesystem peer"""
+        conf = {}
+        if remote_mon_host and cephx_key:
+            conf['mon_host'] = remote_mon_host
+            conf['key'] = cephx_key
         return self.fs_snapshot_mirror.peer_add(fs_name, remote_cluster_spec,
-                                                remote_fs_name)
+                                                remote_fs_name, remote_conf=conf)
 
     @CLIWriteCommand('fs snapshot mirror peer_remove')
     def snapshot_mirror_peer_remove(self,
@@ -42,6 +48,21 @@ class Module(MgrModule):
         """Remove a filesystem peer"""
         return self.fs_snapshot_mirror.peer_remove(fs_name, peer_uuid)
 
+    @CLIWriteCommand('fs snapshot mirror peer_bootstrap create')
+    def snapshot_mirror_peer_bootstrap_create(self,
+                                              fs_name: str,
+                                              client_name: str,
+                                              site_name: str):
+        """Bootstrap a filesystem peer"""
+        return self.fs_snapshot_mirror.peer_bootstrap_create(fs_name, client_name, site_name)
+
+    @CLIWriteCommand('fs snapshot mirror peer_bootstrap import')
+    def snapshot_mirror_peer_bootstrap_import(self,
+                                              fs_name: str,
+                                              token: str):
+        """Import a bootstrap token"""
+        return self.fs_snapshot_mirror.peer_bootstrap_import(fs_name, token)
+
     @CLIWriteCommand('fs snapshot mirror add')
     def snapshot_mirror_add_dir(self,
                                 fs_name: str,