self._delete_test_volume()
super(TestVolumes, self).tearDown()
+ def test_connection_expiration(self):
+ # unmount any cephfs mounts
+ self.mount_a.umount_wait()
+ sessions = self._session_list()
+ self.assertLessEqual(len(sessions), 1) # maybe mgr is already mounted
+
+ # Get the mgr to definitely mount cephfs
+ subvolume = self._generate_random_subvolume_name()
+ self._fs_cmd("subvolume", "create", self.volname, subvolume)
+ sessions = self._session_list()
+ self.assertEqual(len(sessions), 1)
+
+ # Now wait for the mgr to expire the connection:
+ self.wait_until_evicted(sessions[0]['id'], timeout=90)
+
def test_volume_rm(self):
try:
self._fs_cmd("volume", "rm", self.volname)