return "Object not found: '{0}'".format(self._object_name)
-class Filesystem(object):
+class MDSCluster(object):
"""
- This object is for driving a CephFS filesystem.
+ Collective operations on all the MDS daemons in the Ceph cluster. These
+ daemons may be in use by various Filesystems.
- Limitations:
- * Assume a single filesystem+cluster
- * Assume a single MDS
+ For the benefit of pre-multi-filesystem tests, this class is also
+ a parent of Filesystem. The correct way to use MDSCluster going forward is
+ as a separate instance outside of your (multiple) Filesystem instances.
"""
- def __init__(self, ctx, admin_remote=None):
- self._ctx = ctx
+ @property
+ def admin_remote(self):
+ first_mon = misc.get_first_mon(self._ctx, None)
+ (result,) = self._ctx.cluster.only(first_mon).remotes.iterkeys()
+ return result
+
+ def __init__(self, ctx):
self.mds_ids = list(misc.all_roles_of_type(ctx.cluster, 'mds'))
+ self._ctx = ctx
+
if len(self.mds_ids) == 0:
raise RuntimeError("This task requires at least one MDS")
- first_mon = misc.get_first_mon(ctx, None)
- if admin_remote is None:
- (self.admin_remote,) = ctx.cluster.only(first_mon).remotes.iterkeys()
- else:
- self.admin_remote = admin_remote
self.mon_manager = ceph_manager.CephManager(self.admin_remote, ctx=ctx, logger=log.getChild('ceph_manager'))
if hasattr(self._ctx, "daemons"):
# Presence of 'daemons' attribute implies ceph task rather than ceph_deploy task
self.mds_daemons = dict([(mds_id, self._ctx.daemons.get_daemon('mds', mds_id)) for mds_id in self.mds_ids])
+ def _one_or_all(self, mds_id, cb, in_parallel=True):
+ """
+ Call a callback for a single named MDS, or for all.
+
+        Note that the parallelism here isn't for performance: it's to avoid being
+        overly kind to the cluster by pausing an ssh-latency of time between
+        operations, and to avoid being overly kind by executing them in a predictable
+        order. However, some actions don't cope with being done in parallel, so it is
+        optional (`in_parallel`).
+
+ :param mds_id: MDS daemon name, or None
+ :param cb: Callback taking single argument of MDS daemon name
+ :param in_parallel: whether to invoke callbacks concurrently (else one after the other)
+ """
+ if mds_id is None:
+ if in_parallel:
+ with parallel() as p:
+ for mds_id in self.mds_ids:
+ p.spawn(cb, mds_id)
+ else:
+ for mds_id in self.mds_ids:
+ cb(mds_id)
+ else:
+ cb(mds_id)
+
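+    # Usage sketch (illustrative only; `ctx` is the teuthology run context this
+    # module already assumes, and 'a' is a hypothetical MDS id):
+    #
+    #     mds_cluster = MDSCluster(ctx)
+    #     mds_cluster._one_or_all('a', cb)                      # cb('a') only
+    #     mds_cluster._one_or_all(None, cb)                     # all MDSs, in parallel
+    #     mds_cluster._one_or_all(None, cb, in_parallel=False)  # all MDSs, serially
+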
+ def mds_stop(self, mds_id=None):
+ """
+        Stop the MDS daemon process(es). If it held a rank, that rank
+ will eventually go laggy.
+ """
+ self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].stop())
+
+ def mds_fail(self, mds_id=None):
+ """
+ Inform MDSMonitor of the death of the daemon process(es). If it held
+ a rank, that rank will be relinquished.
+ """
+ self._one_or_all(mds_id, lambda id_: self.mon_manager.raw_cluster_cmd("mds", "fail", id_))
+
+ def mds_restart(self, mds_id=None):
+ self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].restart())
+
+ def mds_fail_restart(self, mds_id=None):
+ """
+        Variation on restart that additionally marks the MDSs as failed, so that
+        doing this operation followed by waiting for healthy daemon states
+        guarantees that the daemons have gone down and come up, rather than the
+        wait potentially seeing the healthy states that existed before the restart.
+ """
+ def _fail_restart(id_):
+ self.mds_daemons[id_].stop()
+ self.mon_manager.raw_cluster_cmd("mds", "fail", id_)
+ self.mds_daemons[id_].restart()
+
+ self._one_or_all(mds_id, _fail_restart)
+
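+    # How these differ, as a hedged sketch ('a' is a hypothetical MDS id):
+    #
+    #     mds_cluster.mds_stop('a')          # kill the process; its rank goes laggy
+    #     mds_cluster.mds_fail('a')          # mon relinquishes the rank at once
+    #     mds_cluster.mds_fail_restart('a')  # stop + fail + restart, so a later
+    #                                        # wait-for-healthy proves a real bounce
+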
+ def get_filesystem(self, name):
+ return Filesystem(self._ctx, name)
+
+ def get_fs_map(self):
+ fs_map = json.loads(self.mon_manager.raw_cluster_cmd("fs", "dump", "--format=json-pretty"))
+ return fs_map
+
+ def delete_all_filesystems(self):
+ """
+ Remove all filesystems that exist, and any pools in use by them.
+ """
+ fs_ls = json.loads(self.mon_manager.raw_cluster_cmd("fs", "ls", "--format=json-pretty"))
+ for fs in fs_ls:
+ self.mon_manager.raw_cluster_cmd("fs", "set", fs['name'], "cluster_down", "true")
+ mds_map = json.loads(
+ self.mon_manager.raw_cluster_cmd(
+ "fs", "get", fs['name'], "--format=json-pretty"))['mdsmap']
+
+ for gid in mds_map['up'].values():
+                self.mon_manager.raw_cluster_cmd('mds', 'fail', str(gid))
+
+ self.mon_manager.raw_cluster_cmd('fs', 'rm', fs['name'], '--yes-i-really-mean-it')
+ self.mon_manager.raw_cluster_cmd('osd', 'pool', 'delete',
+ fs['metadata_pool'],
+ fs['metadata_pool'],
+ '--yes-i-really-really-mean-it')
+ for data_pool in fs['data_pools']:
+ self.mon_manager.raw_cluster_cmd('osd', 'pool', 'delete',
+ data_pool, data_pool,
+ '--yes-i-really-really-mean-it')
+
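+    # Illustrative teardown-and-recreate between tests (a sketch; the
+    # filesystem name is an example):
+    #
+    #     mds_cluster.delete_all_filesystems()   # down, fail, rm, then drop pools
+    #     fs = mds_cluster.get_filesystem("cephfs")
+    #     fs.create()
+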
+ def get_standby_daemons(self):
+ return set([s['name'] for s in self.get_fs_map()['standbys']])
+
+ def get_mds_hostnames(self):
+ result = set()
+ for mds_id in self.mds_ids:
+ mds_remote = self.mon_manager.find_remote('mds', mds_id)
+ result.add(mds_remote.hostname)
+
+ return list(result)
+
+ def get_config(self, key, service_type=None):
+ """
+ Get config from mon by default, or a specific service if caller asks for it
+ """
+ if service_type is None:
+ service_type = 'mon'
+
+ service_id = sorted(misc.all_roles_of_type(self._ctx.cluster, service_type))[0]
+ return self.json_asok(['config', 'get', key], service_type, service_id)[key]
+
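+    # e.g. (sketch; the keys shown must exist in the daemon's config schema):
+    #
+    #     mds_cluster.get_config("mds_beacon_grace", service_type="mds")
+    #     mds_cluster.get_config("mon_osd_full_ratio")   # queried from a mon
+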
+ def set_ceph_conf(self, subsys, key, value):
+ if subsys not in self._ctx.ceph.conf:
+ self._ctx.ceph.conf[subsys] = {}
+ self._ctx.ceph.conf[subsys][key] = value
+        # XXX because we don't have the ceph task's config object, if they
+        # used a different config path this won't work.
+        write_conf(self._ctx)
+
+ def clear_ceph_conf(self, subsys, key):
+ del self._ctx.ceph.conf[subsys][key]
+ write_conf(self._ctx)
+
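+    # These rewrite ceph.conf on every node; running daemons typically need a
+    # restart to pick the change up, e.g. (sketch):
+    #
+    #     mds_cluster.set_ceph_conf('mds', 'mds log max segments', '2')
+    #     mds_cluster.mds_fail_restart()
+    #     ...
+    #     mds_cluster.clear_ceph_conf('mds', 'mds log max segments')
+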
+ def json_asok(self, command, service_type, service_id):
+ proc = self.mon_manager.admin_socket(service_type, service_id, command)
+ response_data = proc.stdout.getvalue()
+ log.info("_json_asok output: {0}".format(response_data))
+ if response_data.strip():
+ return json.loads(response_data)
+ else:
+ return None
+
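+    # Thin wrapper over the daemon's admin socket; returns parsed JSON, or
+    # None when the command produced no output. Hypothetical call:
+    #
+    #     mds_cluster.json_asok(['status'], 'mds', 'a')
+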
+ def set_clients_block(self, blocked, mds_id=None):
+ """
+ Block (using iptables) client communications to this MDS. Be careful: if
+ other services are running on this MDS, or other MDSs try to talk to this
+        MDS, their communications may also be blocked as collateral damage.
+
+        :param mds_id: Optional ID of the MDS to block; defaults to all
+ :return:
+ """
+ da_flag = "-A" if blocked else "-D"
+
+ def set_block(_mds_id):
+ remote = self.mon_manager.find_remote('mds', _mds_id)
+
+ addr = self.get_mds_addr(_mds_id)
+ ip_str, port_str, inst_str = re.match("(.+):(.+)/(.+)", addr).groups()
+
+ remote.run(
+ args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m",
+ "comment", "--comment", "teuthology"])
+ remote.run(
+ args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
+ "comment", "--comment", "teuthology"])
+
+ self._one_or_all(mds_id, set_block, in_parallel=False)
+
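+    # Sketch of a partition test (assumes iptables is available on the MDS
+    # hosts; the rules are tagged "teuthology" so clear_firewall() can flush
+    # them):
+    #
+    #     mds_cluster.set_clients_block(True)   # REJECT traffic on each MDS port
+    #     try:
+    #         pass                              # e.g. watch client sessions go stale
+    #     finally:
+    #         mds_cluster.clear_firewall()
+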
+ def clear_firewall(self):
+ clear_firewall(self._ctx)
+
+ def get_mds_addr(self, mds_id):
+ """
+        Return the instance addr as a string, like "10.214.133.138:6807/10825"
+ """
+ fs_map = self.get_fs_map()
+        infos = fs_map['standbys']
+ for fs in fs_map['filesystems']:
+ infos += fs['mdsmap']['info'].values()
+ for mds_info in infos:
+ if mds_info['name'] == mds_id:
+ return mds_info['addr']
+
+ log.warn(json.dumps(infos, indent=2)) # dump for debugging
+ raise RuntimeError("MDS id '{0}' not found in map".format(mds_id))
+
+
+class Filesystem(MDSCluster):
+ """
+ This object is for driving a CephFS filesystem. The MDS daemons driven by
+ MDSCluster may be shared with other Filesystems.
+ """
+ def __init__(self, ctx, name=None):
+ super(Filesystem, self).__init__(ctx)
+
+ if name is None:
+ name = "cephfs"
+
+ self.name = name
+ self.metadata_pool_name = "{0}_metadata".format(name)
+ self.data_pool_name = "{0}_data".format(name)
+
client_list = list(misc.all_roles_of_type(self._ctx.cluster, 'client'))
self.client_id = client_list[0]
self.client_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(self.client_id)]))[0][1]
return pg_warn_min_per_osd * osd_count
def create(self):
+ log.info("Creating filesystem '{0}'".format(self.name))
+
pgs_per_fs_pool = self.get_pgs_per_fs_pool()
- self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create', 'metadata', pgs_per_fs_pool.__str__())
- self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create', 'data', pgs_per_fs_pool.__str__())
- self.mon_manager.raw_cluster_cmd('fs', 'new', 'default', 'metadata', 'data')
+        self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
+                                         self.metadata_pool_name, str(pgs_per_fs_pool))
+        self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
+                                         self.data_pool_name, str(pgs_per_fs_pool))
+ self.mon_manager.raw_cluster_cmd('fs', 'new',
+ self.name, self.metadata_pool_name, self.data_pool_name)
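+
+    # Sketch: Filesystem(ctx, name="backup").create() would create pools
+    # "backup_metadata" and "backup_data", then run `fs new backup
+    # backup_metadata backup_data` (pool names per __init__ above).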
def exists(self):
"""
- Whether a filesystem is enabled at all
+ Whether a filesystem exists in the mon's filesystem list
"""
fs_list = json.loads(self.mon_manager.raw_cluster_cmd('fs', 'ls', '--format=json-pretty'))
- return len(fs_list) > 0
-
- def delete_all(self):
- """
- Remove all filesystems that exist, and any pools in use by them.
- """
- fs_list = json.loads(self.mon_manager.raw_cluster_cmd('fs', 'ls', '--format=json-pretty'))
- for fs_data in fs_list:
- self.mon_manager.raw_cluster_cmd('fs', 'rm', fs_data['name'], '--yes-i-really-mean-it')
- self.mon_manager.raw_cluster_cmd('osd', 'pool', 'delete',
- fs_data['metadata_pool'], fs_data['metadata_pool'],
- '--yes-i-really-really-mean-it')
- for data_pool in fs_data['data_pools']:
- self.mon_manager.raw_cluster_cmd('osd', 'pool', 'delete',
- data_pool, data_pool,
- '--yes-i-really-really-mean-it')
+ return self.name in [fs['name'] for fs in fs_list]
def legacy_configured(self):
"""
def _df(self):
return json.loads(self.mon_manager.raw_cluster_cmd("df", "--format=json-pretty"))
- def _fs_ls(self):
- fs_list = json.loads(self.mon_manager.raw_cluster_cmd("fs", "ls", "--format=json-pretty"))
- assert len(fs_list) == 1 # we don't handle multiple filesystems yet
- return fs_list[0]
+ def get_mds_map(self):
+ fs = json.loads(self.mon_manager.raw_cluster_cmd("fs", "get", self.name, "--format=json-pretty"))
+ return fs['mdsmap']
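+
+    # In the returned map (per the `fs get` JSON of this vintage), 'info' maps
+    # gid strings to per-daemon state, and 'up' maps rank names like "mds_0"
+    # to gids, as delete_all_filesystems() relies on above.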
def get_data_pool_name(self):
- """
- Return the name of the data pool if there is only one, else raise exception -- call
- this in tests where there will only be one data pool.
- """
- names = self.get_data_pool_names()
- if len(names) > 1:
- raise RuntimeError("Multiple data pools found")
- else:
- return names[0]
+ return self.data_pool_name
def get_data_pool_names(self):
- return self._fs_ls()['data_pools']
+ return self.get_mds_map()['data_pools']
def get_metadata_pool_name(self):
- return self._fs_ls()['metadata_pool']
+ return self.metadata_pool_name
+
+ def get_namespace_id(self):
+ fs = json.loads(self.mon_manager.raw_cluster_cmd("fs", "get", self.name, "--format=json-pretty"))
+ return fs['id']
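+
+    # Note: the "namespace" id is the filesystem's unique id in the FSMap
+    # (later called the fscid); useful for telling apart daemons or objects
+    # belonging to different filesystems sharing one cluster.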
def get_pool_df(self, pool_name):
"""
def get_usage(self):
return self._df()['stats']['total_used_bytes']
- def get_mds_hostnames(self):
- result = set()
- for mds_id in self.mds_ids:
- mds_remote = self.mon_manager.find_remote('mds', mds_id)
- result.add(mds_remote.hostname)
-
- return list(result)
-
- def get_config(self, key, service_type=None):
- """
- Get config from mon by default, or a specific service if caller asks for it
- """
- if service_type is None:
- service_type = 'mon'
-
- service_id = sorted(misc.all_roles_of_type(self._ctx.cluster, service_type))[0]
- return self.json_asok(['config', 'get', key], service_type, service_id)[key]
-
- def set_ceph_conf(self, subsys, key, value):
- if subsys not in self._ctx.ceph.conf:
- self._ctx.ceph.conf[subsys] = {}
- self._ctx.ceph.conf[subsys][key] = value
- write_conf(self._ctx) # XXX because we don't have the ceph task's config object, if they
- # used a different config path this won't work.
-
- def clear_ceph_conf(self, subsys, key):
- del self._ctx.ceph.conf[subsys][key]
- write_conf(self._ctx)
-
def are_daemons_healthy(self):
"""
Return true if all daemons are in one of active, standby, standby-replay, and
"""
active_count = 0
- status = self.mon_manager.get_mds_status_all()
+ status = self.get_mds_map()
for mds_id, mds_status in status['info'].items():
if mds_status['state'] not in ["up:active", "up:standby", "up:standby-replay"]:
log.warning("Unhealthy mds state {0}:{1}".format(mds_id, mds_status['state']))
:param state:
:return:
"""
- status = self.mon_manager.get_mds_status_all()
+ status = self.get_mds_map()
result = []
for mds_status in sorted(status['info'].values(), lambda a, b: cmp(a['rank'], b['rank'])):
if mds_status['state'] == state:
as well as active, but does not include standby or
standby-replay.
"""
- status = self.mon_manager.get_mds_status_all()
+ status = self.get_mds_map()
result = []
for mds_status in sorted(status['info'].values(), lambda a, b: cmp(a['rank'], b['rank'])):
if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
else:
return self.mds_ids[0]
- def _one_or_all(self, mds_id, cb, in_parallel=True):
- """
- Call a callback for a single named MDS, or for all.
-
- Note that the parallelism here isn't for performance, it's to avoid being overly kind
- to the cluster by waiting a graceful ssh-latency of time between doing things, and to
- avoid being overly kind by executing them in a particular order. However, some actions
- don't cope with being done in parallel, so it's optional (`in_parallel`)
-
- :param mds_id: MDS daemon name, or None
- :param cb: Callback taking single argument of MDS daemon name
- :param in_parallel: whether to invoke callbacks concurrently (else one after the other)
- """
- if mds_id is None:
- if in_parallel:
- with parallel() as p:
- for mds_id in self.mds_ids:
- p.spawn(cb, mds_id)
- else:
- for mds_id in self.mds_ids:
- cb(mds_id)
- else:
- cb(mds_id)
-
- def mds_stop(self, mds_id=None):
- """
- Stop the MDS daemon process(se). If it held a rank, that rank
- will eventually go laggy.
- """
- self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].stop())
-
- def mds_fail(self, mds_id=None):
- """
- Inform MDSMonitor of the death of the daemon process(es). If it held
- a rank, that rank will be relinquished.
- """
- self._one_or_all(mds_id, lambda id_: self.mon_manager.raw_cluster_cmd("mds", "fail", id_))
-
- def mds_restart(self, mds_id=None):
- self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].restart())
-
- def mds_fail_restart(self, mds_id=None):
- """
- Variation on restart that includes marking MDSs as failed, so that doing this
- operation followed by waiting for healthy daemon states guarantees that they
- have gone down and come up, rather than potentially seeing the healthy states
- that existed before the restart.
- """
- def _fail_restart(id_):
- self.mds_daemons[id_].stop()
- self.mon_manager.raw_cluster_cmd("mds", "fail", id_)
- self.mds_daemons[id_].restart()
-
- self._one_or_all(mds_id, _fail_restart)
-
def reset(self):
log.info("Creating new filesystem")
"""
temp_bin_path = '/tmp/out.bin'
- # FIXME get the metadata pool name from mdsmap instead of hardcoding
self.client_remote.run(args=[
- 'sudo', os.path.join(self._prefix, 'rados'), '-p', 'metadata', 'get', object_id, temp_bin_path
+ 'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'get', object_id, temp_bin_path
])
stdout = StringIO()
return version
- def json_asok(self, command, service_type, service_id):
- proc = self.mon_manager.admin_socket(service_type, service_id, command)
- response_data = proc.stdout.getvalue()
- log.info("_json_asok output: {0}".format(response_data))
- if response_data.strip():
- return json.loads(response_data)
- else:
- return None
-
def mds_asok(self, command, mds_id=None):
if mds_id is None:
mds_id = self.get_lone_mds_id()
return self.json_asok(command, 'mds', mds_id)
- def get_mds_map(self):
- """
- Return the MDS map, as a JSON-esque dict from 'mds dump'
- """
- return json.loads(self.mon_manager.raw_cluster_cmd('mds', 'dump', '--format=json-pretty'))
-
- def get_mds_addr(self, mds_id):
- """
- Return the instance addr as a string, like "10.214.133.138:6807\/10825"
- """
- mds_map = self.get_mds_map()
- for gid_string, mds_info in mds_map['info'].items():
- # For some reason
- if mds_info['name'] == mds_id:
- return mds_info['addr']
-
- log.warn(json.dumps(mds_map, indent=2)) # dump map for debugging
- raise RuntimeError("MDS id '{0}' not found in MDS map".format(mds_id))
-
- def set_clients_block(self, blocked, mds_id=None):
- """
- Block (using iptables) client communications to this MDS. Be careful: if
- other services are running on this MDS, or other MDSs try to talk to this
- MDS, their communications may also be blocked as collatoral damage.
-
- :param mds_id: Optional ID of MDS to block, default to all
- :return:
- """
- da_flag = "-A" if blocked else "-D"
-
- def set_block(_mds_id):
- remote = self.mon_manager.find_remote('mds', _mds_id)
-
- addr = self.get_mds_addr(_mds_id)
- ip_str, port_str, inst_str = re.match("(.+):(.+)/(.+)", addr).groups()
-
- remote.run(
- args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m",
- "comment", "--comment", "teuthology"])
- remote.run(
- args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
- "comment", "--comment", "teuthology"])
-
- self._one_or_all(mds_id, set_block, in_parallel=False)
-
- def clear_firewall(self):
- clear_firewall(self._ctx)
-
def is_full(self):
flags = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['flags']
return 'full' in flags
log.info("Looked up MDS state for {0}: {1}".format(mds_id, current_state))
else:
# In general, look for a single MDS
- mds_status = self.mon_manager.get_mds_status_all()
+ mds_status = self.get_mds_map()
states = [m['state'] for m in mds_status['info'].values()]
if [s for s in states if s == goal_state] == [goal_state]:
current_state = goal_state
elif reject is not None and current_state == reject:
raise RuntimeError("MDS in reject state {0}".format(current_state))
elif timeout is not None and elapsed > timeout:
- log.error("MDS status at timeout: {0}".format(self.mon_manager.get_mds_status_all()))
+ log.error("MDS status at timeout: {0}".format(self.get_mds_map()))
raise RuntimeError(
"Reached timeout after {0} seconds waiting for state {1}, while in state {2}".format(
elapsed, goal_state, current_state