                    break
        return done, metrics
+    def _setup_fs(self, fs_name):
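+        """Create a filesystem, wait for its MDS daemons to come up, and
+        reconfigure client auth caps for its data pool."""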
+        fs_a = self.mds_cluster.newfs(name=fs_name)
+
+        self.mds_cluster.mds_restart()
+
+        # Wait for filesystem to go healthy
+        fs_a.wait_for_daemons()
+
+        # Reconfigure client auth caps
+        for mount in self.mounts:
+            self.mds_cluster.mon_manager.raw_cluster_cmd_result(
+                'auth', 'caps', f"client.{mount.client_id}",
+                'mds', 'allow',
+                'mon', 'allow r',
+                'osd', f'allow rw pool={fs_a.get_data_pool_name()}')
+
+        return fs_a
+
    # basic check to verify if we get back metrics from each active mds rank
    def test_metrics_from_rank(self):
                raise
        else:
            raise RuntimeError("expected the 'fs perf stat' command to fail for invalid client_ip")
+
+    def test_client_metrics_and_metadata(self):
+        self.mount_a.umount_wait()
+        self.mount_b.umount_wait()
+
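+        # allow multiple filesystems so the test can create two of them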
+        self.mds_cluster.mon_manager.raw_cluster_cmd("fs", "flag", "set",
+            "enable_multiple", "true",
+            "--yes-i-really-mean-it")
+
+        # create the first filesystem
+        fs_a = self._setup_fs(fs_name="fs1")
+
+        # Mount a client on fs_a
+        self.mount_a.mount_wait(cephfs_name=fs_a.name)
+        self.mount_a.write_n_mb("pad.bin", 1)
+        self.mount_a.write_n_mb("test.bin", 2)
+        self.mount_a.path_to_ino("test.bin")
+        self.mount_a.create_files()
+
+        # create the second filesystem
+        fs_b = self._setup_fs(fs_name="fs2")
+
+        # Mount a client on fs_b
+        self.mount_b.mount_wait(cephfs_name=fs_b.name)
+        self.mount_b.write_n_mb("test.bin", 1)
+        self.mount_b.path_to_ino("test.bin")
+        self.mount_b.create_files()
+
+        # validate the metrics and client metadata
+        valid, metrics = self._get_metrics(
+            self.verify_mds_metrics(client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
+        log.debug(f"metrics={metrics}")
+        self.assertTrue(valid)
+
+        client_metadata = metrics['client_metadata']
+
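+        # each client must report a hostname and a set of valid metrics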
+        for client_id in client_metadata:
+            if not client_metadata[client_id]['hostname']:
+                raise RuntimeError(f"hostname not found for client {client_id}")
+            if not client_metadata[client_id]['valid_metrics']:
+                raise RuntimeError(f"valid_metrics not found for client {client_id}")
+
self.log.debug("client_metadata={0}, to_purge={1}".format(
self.client_metadata['metadata'], self.client_metadata['to_purge']))
-    def update_client_meta(self, rank_set):
+    def update_client_meta(self):
        new_updates = {}
        pending_updates = [v[0] for v in self.client_metadata['in_progress'].values()]
        with self.meta_lock:
            fsmap = self.module.get('fs_map')
            for fs in fsmap['filesystems']:
-                mdsmap = fs['mdsmap']
-                for rank in rank_set:
-                    gid = mdsmap['up']["mds_{0}".format(rank)]
-                    if gid in pending_updates:
-                        continue
-                    tag = str(uuid.uuid4())
-                    result = CommandResult(tag)
-                    new_updates[tag] = (gid, result)
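+                # refresh client metadata from rank 0 of each filesystem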
+                mdsmap = fs['mdsmap']
+                gid = mdsmap['up']["mds_0"]
+                if gid in pending_updates:
+                    continue
+                tag = str(uuid.uuid4())
+                result = CommandResult(tag)
+                new_updates[tag] = (gid, result)
            self.client_metadata['in_progress'].update(new_updates)
-        self.log.debug("updating client metadata from {0}".format(new_updates))
+        self.log.debug(f"updating client metadata from {new_updates}")
        cmd_dict = {'prefix': 'client ls'}
        for tag,val in new_updates.items():
        # iterate over metrics list and update our copy (note that we have
        # already culled the differences).
-        meta_refresh_ranks = set()
        for counter in incoming_metrics:
            mds_rank = int(counter['k'][0][0])
            client_id, client_ip = extract_client_id_and_ip(counter['k'][1][0])
            if client_id is not None or not client_ip: # client_id _could_ be 0
                with self.meta_lock:
-                    if not client_id in self.client_metadata['metadata']:
-                        meta_refresh_ranks.add(mds_rank)
                    self.set_client_metadata(client_id, "IP", client_ip)
            else:
                self.log.warn("client metadata for client_id={0} might be unavailable".format(client_id))
            del raw_client_counters[:]
            raw_client_counters.extend(counter['c'])
        # send an asynchronous client metadata refresh
-        self.update_client_meta(meta_refresh_ranks)
+        self.update_client_meta()
    def get_raw_perf_counters_global(self, query):
        raw_perf_counters = query.setdefault(QUERY_RAW_COUNTERS_GLOBAL, {})