))
return r
+
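+    # Ordinals mirror the CEPH_MDS_STATE_* constants from src/include/ceph_fs.h.
+    # Rank-holding states follow the replay -> resolve -> reconnect -> rejoin ->
+    # clientreplay -> active progression, so a higher ordinal means the daemon
+    # is further along in becoming active.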
+ MDS_STATE_ORD = {
+ "down:dne": 0, # CEPH_MDS_STATE_DNE,
+ "down:stopped": -1, # CEPH_MDS_STATE_STOPPED,
+ "down:damaged": 15, # CEPH_MDS_STATE_DAMAGED,
+ "up:boot": -4, # CEPH_MDS_STATE_BOOT,
+ "up:standby": -5, # CEPH_MDS_STATE_STANDBY,
+ "up:standby-replay": -8, # CEPH_MDS_STATE_STANDBY_REPLAY,
+ "up:oneshot-replay": -9, # CEPH_MDS_STATE_REPLAYONCE,
+ "up:creating": -6, # CEPH_MDS_STATE_CREATING,
+ "up:starting": -7, # CEPH_MDS_STATE_STARTING,
+ "up:replay": 8, # CEPH_MDS_STATE_REPLAY,
+ "up:resolve": 9, # CEPH_MDS_STATE_RESOLVE,
+ "up:reconnect": 10, # CEPH_MDS_STATE_RECONNECT,
+ "up:rejoin": 11, # CEPH_MDS_STATE_REJOIN,
+ "up:clientreplay": 12, # CEPH_MDS_STATE_CLIENTREPLAY,
+ "up:active": 13, # CEPH_MDS_STATE_ACTIVE,
+ "up:stopping": 14, # CEPH_MDS_STATE_STOPPING,
+ }
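+    # Cap for the leader election in get_quiesce_leader_info: states that
+    # sort above up:active (up:stopping, down:damaged) never win a tie.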
+ MDS_STATE_ACTIVE_ORD = MDS_STATE_ORD["up:active"]
+
+    def get_quiesce_leader_info(self, fscid: int) -> Optional[dict]:
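+        """
+        Resolve the quiesce leader for the given fscid: the daemon holding
+        the lowest rank, preferring the one with the most advanced state
+        (up to up:active) when several daemons share that rank.
+        Returns None if the fscid is unknown or no rank is held.
+        """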
+ leader_info = None
+
+ for fs in self.get("fs_map")['filesystems']:
+ if fscid != fs["id"]:
+ continue
+
+ # quiesce leader is the lowest rank
+ # with the highest state
+ mdsmap = fs["mdsmap"]
+ for info in mdsmap['info'].values():
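+                # entries without a rank can't lead a quiesce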
+ if info['rank'] == -1:
+ continue
+                if leader_info is None or info['rank'] < leader_info['rank']:
+                    leader_info = info
+                elif info['rank'] == leader_info['rank']:
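+                    # the same rank can be reported by more than one daemon
+                    # (e.g. a standby-replay follower next to the rank
+                    # holder); prefer the more advanced state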
+                    # default low so an unknown state can never win the tie-break
+                    state_ord = self.MDS_STATE_ORD.get(info['state'], float('-inf'))
+                    leader_state_ord = self.MDS_STATE_ORD.get(leader_info['state'], float('-inf'))
+
+                    if leader_state_ord < state_ord <= self.MDS_STATE_ACTIVE_ORD:
+ leader_info = info
+ break
+
+ return leader_info
+
+    def tell_quiesce_leader(self, fscid: int, cmd_dict: dict, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
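+        """
+        Resolve the quiesce leader for `fscid` and forward `cmd_dict` to it
+        as a one-shot tell command.
+        """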
+        qleader: Optional[dict] = self.get_quiesce_leader_info(fscid)
+        if qleader is None:
+            self.log.warning("Couldn't resolve the quiesce leader for fscid %s" % fscid)
+            return (-errno.ENOENT, "", "Couldn't resolve the quiesce leader for fscid %s" % fscid)
+ self.log.debug("resolved quiesce leader for fscid {fscid} at daemon '{name}' gid {gid} rank {rank} ({state})".format(fscid=fscid, **qleader))
+        # forward the caller's input buffer along with the command
+        return self.tell_command('mds', str(qleader['gid']), cmd_dict, inbuf=inbuf, one_shot=True)
def send_command(
self,
import inspect
import functools
from typing import TYPE_CHECKING, Any, Callable, Optional
+from urllib.parse import urlsplit, urlunsplit
import cephfs
ret = self.volume_exception_to_retval(ve)
return ret
+ @volume_exception_to_retval
+ def quiesce(self, cmd):
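+        """
+        Resolve each `[<group>/]<subvol>` member to its real subvolume path
+        and forward the command to the filesystem's quiesce leader as a
+        `quiesce db` tell. With `leader`, only report the resolved leader.
+        """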
+ volname = cmd['vol_name']
+ default_group_name = cmd.get('group_name', None)
+ roots = []
+ fscid = None
+
+ with open_volume(self, volname) as fs_handle:
+ fscid = fs_handle.get_fscid()
+
+ if cmd.get('leader', False):
+ leader_info = self.mgr.get_quiesce_leader_info(fscid)
+ if leader_info is None:
+ return -errno.ENOENT, "", "Couldn't resolve the quiesce leader for volume %s (%s)" % (volname, fscid)
+ return (
+ 0,
+ "mds.%d" % leader_info['gid'],
+ "Resolved the quiesce leader for volume '{volname}' as daemon '{name}' ({gid}) {state} rank {rank}".format(volname=volname, **leader_info)
+ )
+
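+        # members are parsed as URLs so that any non-path parts survive;
+        # only the path component is rewritten to the subvolume's real path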
+ for member in cmd.get('members', []):
+ try:
+ member_parts = urlsplit(member)
+ except ValueError as ve:
+ return -errno.EINVAL, "", str(ve)
+ group_name = default_group_name
+
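+            # the member path is either `<subvol>` or `<group>/<subvol>`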
+ *maybe_group_name, subvol_name = member_parts.path.strip('/').split('/')
+ if len(maybe_group_name) > 1:
+                return -errno.EINVAL, "", "A member path may include at most one group: `[<group>/]<subvol>`"
+ elif len(maybe_group_name) == 1:
+ group_name = maybe_group_name[0]
+
+ with open_group(fs_handle, self.volspec, group_name) as group:
+ with open_subvol(self.mgr, fs_handle, self.volspec, group, subvol_name, SubvolumeOpType.GETPATH) as subvol:
+ member_parts = member_parts._replace(path=subvol.path.decode('utf-8'))
+ roots.append(urlunsplit(member_parts))
+
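+        # hand the resolved roots to the MDS-side `quiesce db` command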
+ cmd['roots'] = roots
+ cmd['prefix'] = 'quiesce db'
+
+ return self.mgr.tell_quiesce_leader(fscid, cmd)
+
def set_user_metadata(self, **kwargs):
ret = 0, "", ""
volname = kwargs['vol_name']
"and optionally, in a specific subvolume group",
'perm': 'rw'
},
+ {
+ 'cmd': 'fs quiesce '
+ 'name=vol_name,type=CephString '
+ 'name=members,type=CephString,n=N,req=false '
+ '-- '
+ 'name=set_id,type=CephString,req=false '
+ 'name=timeout,type=CephFloat,range=0,req=false '
+ 'name=expiration,type=CephFloat,range=0,req=false '
+ 'name=await_for,type=CephFloat,range=0,req=false '
+ 'name=await,type=CephBool,req=false '
+ 'name=if_version,type=CephInt,range=0,req=false '
+ 'name=include,type=CephBool,req=false '
+ 'name=exclude,type=CephBool,req=false '
+ 'name=reset,type=CephBool,req=false '
+ 'name=release,type=CephBool,req=false '
+ 'name=query,type=CephBool,req=false '
+ 'name=all,type=CephBool,req=false '
+ 'name=cancel,type=CephBool,req=false '
+ 'name=group_name,type=CephString,req=false '
+ 'name=leader,type=CephBool,req=false ',
+ 'desc': "Manage quiesce sets of subvolumes",
+ 'perm': 'rw'
+ },
{
'cmd': 'fs subvolumegroup pin'
' name=vol_name,type=CephString'
key_name=cmd['key_name'],
group_name=cmd.get('group_name', None),
force=cmd.get('force', False))
+
+ @mgr_cmd_wrap
+ def _cmd_fs_quiesce(self, inbuf, cmd):
+        return self.vc.quiesce(cmd)
+
@mgr_cmd_wrap
def _cmd_fs_subvolumegroup_pin(self, inbuf, cmd):