-
+from StringIO import StringIO
import json
import logging
from tasks.cephfs.cephfs_test_case import CephFSTestCase
ls_data = self.fs.mds_asok(['session', 'ls'])
self.assertEqual(len(ls_data), 0)
+ def _get_thread_count(self, mds_id):
+ remote = self.fs.mds_daemons[mds_id].remote
+
+ ps_txt = remote.run(
+ args=["ps", "axo", "cmd,nlwp"],
+ stdout=StringIO()
+ ).stdout.getvalue().strip()
+ lines = ps_txt.split("\n")[1:]
+
+ for line in lines:
+ if line.find("ceph-mds") != -1:
+ if line.find("-i {0}".format(mds_id)) != -1:
+ log.info("Found ps line for daemon: {0}".format(line))
+ return int(line.split()[-1])
+
+ raise RuntimeError("No process found in ps output for MDS {0}: {1}".format(
+ mds_id, ps_txt
+ ))
+
+ def test_tell_conn_close(self):
+ """
+ That when a `tell` command is sent using the python CLI,
+ the thread count goes back to where it started (i.e. we aren't
+ leaving connections open)
+ """
+ self.mount_a.umount_wait()
+ self.mount_b.umount_wait()
+
+ mds_id = self.fs.get_lone_mds_id()
+
+ initial_thread_count = self._get_thread_count(mds_id)
+ self.fs.mon_manager.raw_cluster_cmd("tell", "mds.{0}".format(mds_id), "session", "ls")
+ final_thread_count = self._get_thread_count(mds_id)
+
+ self.assertEqual(initial_thread_count, final_thread_count)
+
+ def test_mount_conn_close(self):
+ """
+ That when a client unmounts, the thread count on the MDS goes back
+ to what it was before the client mounted
+ """
+ self.mount_a.umount_wait()
+ self.mount_b.umount_wait()
+
+ mds_id = self.fs.get_lone_mds_id()
+
+ initial_thread_count = self._get_thread_count(mds_id)
+ self.mount_a.mount()
+ self.assertGreater(self._get_thread_count(mds_id), initial_thread_count)
+ self.mount_a.umount_wait()
+ final_thread_count = self._get_thread_count(mds_id)
+
+ self.assertEqual(initial_thread_count, final_thread_count)
+
def test_version_splitting(self):
"""
That when many sessions are updated, they are correctly