+import base64
import errno
import json
import logging
return 0, json.dumps(res, indent=4, sort_keys=True), ''
class FSSnapshotMirror:
+ PEER_CONFIG_KEY_PREFIX = "cephfs/mirror/peer"
+
def __init__(self, mgr):
self.mgr = mgr
self.rados = mgr.rados
self.fs_map = self.mgr.get('fs_map')
self.refresh_pool_policy_locked()
+ @staticmethod
+ def make_spec(client_name, cluster_name):
+ return f'{client_name}@{cluster_name}'
+
@staticmethod
def split_spec(spec):
try:
return fs['id']
return None
+ @staticmethod
+ def peer_config_key(filesystem, peer_uuid):
+ return f'{FSSnapshotMirror.PEER_CONFIG_KEY_PREFIX}/{filesystem}/{peer_uuid}'
+
+ def config_set(self, key, val=None):
+ """set or remove a key from mon config store"""
+ if val:
+ cmd = {'prefix': 'config-key set',
+ 'key': key, 'val': val}
+ else:
+ cmd = {'prefix': 'config-key rm',
+ 'key': key}
+ r, outs, err = self.mgr.mon_command(cmd)
+ if r < 0:
+ log.error(f'mon command to set/remove config-key {key} failed: {err}')
+ raise Exception(-errno.EINVAL)
+
+ def config_get(self, key):
+ """fetch a config key value from mon config store"""
+ cmd = {'prefix': 'config-key get', 'key': key}
+ r, outs, err = self.mgr.mon_command(cmd)
+ if r < 0 and not r == -errno.ENOENT:
+ log.error(f'mon command to get config-key {key} failed: {err}')
+ raise Exception(-errno.EINVAL)
+ val = {}
+ if r == 0:
+ val = json.loads(outs)
+ return val
+
def filesystem_exist(self, filesystem):
for fs in self.fs_map['filesystems']:
if fs['mdsmap']['fs_name'] == filesystem:
return fs['mirror_info']['peers']
return None
+ def peer_exists(self, filesystem, remote_cluster_spec, remote_fs_name):
+ peers = self.get_filesystem_peers(filesystem)
+ for _, rem in peers.items():
+ remote = rem['remote']
+ spec = FSSnapshotMirror.make_spec(remote['client_name'], remote['cluster_name'])
+ if spec == remote_cluster_spec and remote['fs_name'] == remote_fs_name:
+ return True
+ return False
+
def get_mirror_info(self, remote_fs):
try:
val = remote_fs.getxattr('/', 'ceph.mirror.info')
def purge_mirror_info(self, local_fs_name, peer_uuid):
log.debug(f'local fs={local_fs_name} peer_uuid={peer_uuid}')
+ # resolve the peer to its spec
rem = self.resolve_peer(local_fs_name, peer_uuid)
+ if not rem:
+ return
log.debug(f'peer_uuid={peer_uuid} resolved to {rem}')
- if rem:
- client_name = rem['client_name']
- cluster_name = rem['cluster_name']
- client_name, cluster_name = FSSnapshotMirror.split_spec(f'{client_name}@{cluster_name}')
- remote_cluster, remote_fs = connect_to_filesystem(client_name,
- cluster_name,
- rem['fs_name'], 'remote')
- try:
- remote_fs.removexattr('/', 'ceph.mirror.info')
- except cephfs.Error as e:
- if not e.errno == errno.ENOENT:
- log.error('error removing mirror info')
- raise Exception(-e.errno)
- finally:
- disconnect_from_filesystem(cluster_name, rem['fs_name'], remote_cluster, remote_fs)
+ _, client_name = rem['client_name'].split('.')
- def verify_and_set_mirror_info(self, local_fs_name, remote_cluster_spec, remote_fs_name):
+ # fetch auth details from config store
+ remote_conf = self.config_get(FSSnapshotMirror.peer_config_key(local_fs_name, peer_uuid))
+ remote_cluster, remote_fs = connect_to_filesystem(client_name,
+ rem['cluster_name'],
+ rem['fs_name'], 'remote', conf_dct=remote_conf)
+ try:
+ remote_fs.removexattr('/', 'ceph.mirror.info')
+ except cephfs.Error as e:
+ if not e.errno == errno.ENOENT:
+ log.error('error removing mirror info')
+ raise Exception(-e.errno)
+ finally:
+ disconnect_from_filesystem(rem['cluster_name'], rem['fs_name'], remote_cluster, remote_fs)
+
+ def verify_and_set_mirror_info(self, local_fs_name, remote_cluster_spec, remote_fs_name, remote_conf={}):
log.debug(f'local fs={local_fs_name} remote={remote_cluster_spec}/{remote_fs_name}')
client_name, cluster_name = FSSnapshotMirror.split_spec(remote_cluster_spec)
- remote_cluster, remote_fs = connect_to_filesystem(client_name, cluster_name,
- remote_fs_name, 'remote')
+ remote_cluster, remote_fs = connect_to_filesystem(client_name, cluster_name, remote_fs_name,
+ 'remote', conf_dct=remote_conf)
+ if 'fsid' in remote_conf:
+ if not remote_cluster.get_fsid() == remote_conf['fsid']:
+ raise MirrorException(-errno.EINVAL, 'FSID mismatch between bootstrap token and remote cluster')
local_fsid = FSSnapshotMirror.get_filesystem_id(local_fs_name, self.fs_map)
if local_fsid is None:
except Exception as e:
return e.args[0], '', 'failed to disable mirroring'
- def peer_add(self, filesystem, remote_cluster_spec, remote_fs_name):
+ def peer_add(self, filesystem, remote_cluster_spec, remote_fs_name, remote_conf):
try:
if remote_fs_name == None:
remote_fs_name = filesystem
fspolicy = self.pool_policy.get(filesystem, None)
if not fspolicy:
raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
- self.verify_and_set_mirror_info(filesystem, remote_cluster_spec, remote_fs_name)
+ ### peer updates for key, site-name are not yet supported
+ if self.peer_exists(filesystem, remote_cluster_spec, remote_fs_name):
+ return 0, json.dumps({}), ''
+ # _own_ the peer
+ self.verify_and_set_mirror_info(filesystem, remote_cluster_spec, remote_fs_name, remote_conf)
+ # unique peer uuid
+ peer_uuid = str(uuid.uuid4())
+ config_key = FSSnapshotMirror.peer_config_key(filesystem, peer_uuid)
+ if remote_conf.get('mon_host') and remote_conf.get('key'):
+ self.config_set(config_key, json.dumps(remote_conf))
cmd = {'prefix': 'fs mirror peer_add',
'fs_name': filesystem,
+ 'uuid': peer_uuid,
'remote_cluster_spec': remote_cluster_spec,
'remote_fs_name': remote_fs_name}
r, outs, err = self.mgr.mon_command(cmd)
if r < 0:
log.error(f'mon command to add peer failed: {err}')
+ try:
+ log.debug(f'cleaning up config-key for {peer_uuid}')
+ self.config_set(config_key)
+ except:
+ pass
raise Exception(-errno.EINVAL)
return 0, json.dumps({}), ''
except MirrorException as me:
if r < 0:
log.error(f'mon command to remove peer failed: {err}')
raise Exception(-errno.EINVAL)
+ self.config_set(FSSnapshotMirror.peer_config_key(filesystem, peer_uuid))
return 0, json.dumps({}), ''
except MirrorException as me:
return me.args[0], '', me.args[1]
except Exception as e:
return e.args[0], '', 'failed to remove peer'
+ def peer_bootstrap_create(self, fs_name, client_name, site_name):
+ """create a bootstrap token for this peer filesystem"""
+ try:
+ with self.lock:
+ cmd = {'prefix': 'fs authorize',
+ 'filesystem': fs_name,
+ 'entity': client_name,
+ 'caps': ['/', 'rwps']}
+ r, outs, err = self.mgr.mon_command(cmd)
+ if r < 0:
+ log.error(f'mon command to create peer user failed: {err}')
+ raise Exception(-errno.EINVAL)
+ cmd = {'prefix': 'auth get',
+ 'entity': client_name,
+ 'format': 'json'}
+ r, outs, err = self.mgr.mon_command(cmd)
+ if r < 0:
+ log.error(f'mon command to fetch keyring failed: {err}')
+ raise Exception(-errno.EINVAL)
+ outs = json.loads(outs)
+ outs0 = outs[0]
+ token_dct = {'fsid': self.mgr.rados.get_fsid(),
+ 'filesystem': fs_name,
+ 'user': outs0['entity'],
+ 'site_name': site_name,
+ 'key': outs0['key'],
+ 'mon_host': self.mgr.rados.conf_get('mon_host')}
+ token_str = json.dumps(token_dct).encode('utf-8')
+ encoded_token = base64.b64encode(token_str)
+ return 0, json.dumps({'token': encoded_token.decode('utf-8')}), ''
+ except MirrorException as me:
+ return me.args[0], '', me.args[1]
+ except Exception as e:
+ return e.args[0], '', 'failed to bootstrap peer'
+
+ def peer_bootstrap_import(self, filesystem, token):
+ try:
+ token_str = base64.b64decode(token)
+ token_dct = json.loads(token_str.decode('utf-8'))
+ except:
+ return -errno.EINVAL, '', 'failed to parse token'
+ client_name = token_dct.pop('user')
+ cluster_name = token_dct.pop('site_name')
+ remote_fs_name = token_dct.pop('filesystem')
+ remote_cluster_spec = f'{client_name}@{cluster_name}'
+ return self.peer_add(filesystem, remote_cluster_spec, remote_fs_name, token_dct)
+
@staticmethod
def norm_path(dir_path):
if not os.path.isabs(dir_path):