]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
qa: decouple session map test from simple msgr 27415/head
authorPatrick Donnelly <pdonnell@redhat.com>
Fri, 5 Apr 2019 23:06:22 +0000 (16:06 -0700)
committerPatrick Donnelly <pdonnell@redhat.com>
Mon, 8 Apr 2019 18:50:17 +0000 (11:50 -0700)
Instead of looking at the number of threads (used by the simple messenger) to
judge the coming and going of connections, use the (async) messenger perf
counters.

Plus some other minor improvements.

Signed-off-by: Patrick Donnelly <pdonnell@redhat.com>
qa/tasks/cephfs/test_sessionmap.py

index 29dd0ff298b69704935429723d8b9590262b8dd7..e7de6ef0576962ab598d6254cc717935a797b387 100644 (file)
@@ -22,47 +22,35 @@ class TestSessionMap(CephFSTestCase):
         self.mount_a.umount_wait()
         self.mount_b.umount_wait()
 
-        mds_id = self.fs.get_lone_mds_id()
-        self.fs.mon_manager.raw_cluster_cmd("tell", "mds.{0}".format(mds_id), "session", "ls")
+        status = self.fs.status()
+        self.fs.rank_tell(["session", "ls"], status=status)
 
-        ls_data = self.fs.mds_asok(['session', 'ls'])
+        ls_data = self.fs.rank_asok(['session', 'ls'], status=status)
         self.assertEqual(len(ls_data), 0)
 
-    def _get_thread_count(self, mds_id):
-        remote = self.fs.mds_daemons[mds_id].remote
-
-        ps_txt = remote.run(
-            args=["ps", "-ww", "axo", "nlwp,cmd"],
-            stdout=StringIO()
-        ).stdout.getvalue().strip()
-        lines = ps_txt.split("\n")[1:]
-
-        for line in lines:
-            if "ceph-mds" in line and not "daemon-helper" in line:
-                if line.find("-i {0}".format(mds_id)) != -1:
-                    log.info("Found ps line for daemon: {0}".format(line))
-                    return int(line.split()[0])
-
-        raise RuntimeError("No process found in ps output for MDS {0}: {1}".format(
-            mds_id, ps_txt
-        ))
+    def _get_connection_count(self, status=None):
+        perf = self.fs.rank_asok(["perf", "dump"], status=status)
+        conn = 0
+        for module, dump in perf.iteritems():
+            if "AsyncMessenger::Worker" in module:
+                conn += dump['msgr_active_connections']
+        return conn
 
     def test_tell_conn_close(self):
         """
         That when a `tell` command is sent using the python CLI,
-        the thread count goes back to where it started (i.e. we aren't
+        the conn count goes back to where it started (i.e. we aren't
         leaving connections open)
         """
         self.mount_a.umount_wait()
         self.mount_b.umount_wait()
 
-        mds_id = self.fs.get_lone_mds_id()
+        status = self.fs.status()
+        s = self._get_connection_count(status=status)
+        self.fs.rank_tell(["session", "ls"], status=status)
+        e = self._get_connection_count(status=status)
 
-        initial_thread_count = self._get_thread_count(mds_id)
-        self.fs.mon_manager.raw_cluster_cmd("tell", "mds.{0}".format(mds_id), "session", "ls")
-        final_thread_count = self._get_thread_count(mds_id)
-
-        self.assertEqual(initial_thread_count, final_thread_count)
+        self.assertEqual(s, e)
 
     def test_mount_conn_close(self):
         """
@@ -72,16 +60,15 @@ class TestSessionMap(CephFSTestCase):
         self.mount_a.umount_wait()
         self.mount_b.umount_wait()
 
-        mds_id = self.fs.get_lone_mds_id()
-
-        initial_thread_count = self._get_thread_count(mds_id)
+        status = self.fs.status()
+        s = self._get_connection_count(status=status)
         self.mount_a.mount()
         self.mount_a.wait_until_mounted()
-        self.assertGreater(self._get_thread_count(mds_id), initial_thread_count)
+        self.assertGreater(self._get_connection_count(status=status), s)
         self.mount_a.umount_wait()
-        final_thread_count = self._get_thread_count(mds_id)
+        e = self._get_connection_count(status=status)
 
-        self.assertEqual(initial_thread_count, final_thread_count)
+        self.assertEqual(s, e)
 
     def test_version_splitting(self):
         """
@@ -102,11 +89,7 @@ class TestSessionMap(CephFSTestCase):
         self.fs.set_max_mds(2)
         self.fs.wait_for_daemons()
 
-        active_mds_names = self.fs.get_active_names()
-        rank_0_id = active_mds_names[0]
-        rank_1_id = active_mds_names[1]
-        log.info("Ranks 0 and 1 are {0} and {1}".format(
-            rank_0_id, rank_1_id))
+        status = self.fs.status()
 
         # Bring the clients back
         self.mount_a.mount()
@@ -115,10 +98,10 @@ class TestSessionMap(CephFSTestCase):
         self.mount_b.create_files()
 
         # See that they've got sessions
-        self.assert_session_count(2, mds_id=rank_0_id)
+        self.assert_session_count(2, mds_id=self.fs.get_rank(status=status)['name'])
 
         # See that we persist their sessions
-        self.fs.mds_asok(["flush", "journal"], rank_0_id)
+        self.fs.rank_asok(["flush", "journal"], rank=0, status=status)
         table_json = json.loads(self.fs.table_tool(["0", "show", "session"]))
         log.info("SessionMap: {0}".format(json.dumps(table_json, indent=2)))
         self.assertEqual(table_json['0']['result'], 0)
@@ -130,14 +113,14 @@ class TestSessionMap(CephFSTestCase):
         self.mount_b.run_shell(["ls", "-l", "bravo/file"])
 
         def get_omap_wrs():
-            return self.fs.mds_asok(['perf', 'dump', 'objecter'], rank_1_id)['objecter']['omap_wr']
+            return self.fs.rank_asok(['perf', 'dump', 'objecter'], rank=1, status=status)['objecter']['omap_wr']
 
         # Flush so that there are no dirty sessions on rank 1
-        self.fs.mds_asok(["flush", "journal"], rank_1_id)
+        self.fs.rank_asok(["flush", "journal"], rank=1, status=status)
 
         # Export so that we get a force_open to rank 1 for the two sessions from rank 0
         initial_omap_wrs = get_omap_wrs()
-        self.fs.mds_asok(['export', 'dir', '/bravo', '1'], rank_0_id)
+        self.fs.rank_asok(['export', 'dir', '/bravo', '1'], rank=0, status=status)
 
         # This is the critical (if rather subtle) check: that in the process of doing an export dir,
         # we hit force_open_sessions, and as a result we end up writing out the sessionmap.  There
@@ -156,10 +139,10 @@ class TestSessionMap(CephFSTestCase):
         self.mount_b.umount_wait()
 
         # In-memory sessionmap check
-        self.assert_session_count(0, mds_id=rank_0_id)
+        self.assert_session_count(0, mds_id=self.fs.get_rank(status=status)['name'])
 
         # On-disk sessionmap check
-        self.fs.mds_asok(["flush", "journal"], rank_0_id)
+        self.fs.rank_asok(["flush", "journal"], rank=0, status=status)
         table_json = json.loads(self.fs.table_tool(["0", "show", "session"]))
         log.info("SessionMap: {0}".format(json.dumps(table_json, indent=2)))
         self.assertEqual(table_json['0']['result'], 0)