self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
self.mount_a.run_shell(["rmdir", "d1"])
+ def test_directory_command_ls(self):
+ dir1 = 'dls1'
+ dir2 = 'dls2'
+ self.mount_a.run_shell(["mkdir", dir1])
+ self.mount_a.run_shell(["mkdir", dir2])
+ self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
+ try:
+ self.add_directory(self.primary_fs_name, self.primary_fs_id, f'/{dir1}')
+ self.add_directory(self.primary_fs_name, self.primary_fs_id, f'/{dir2}')
+ time.sleep(10)
+ dirs_list = json.loads(self.get_ceph_cmd_stdout("fs", "snapshot", "mirror", "ls", self.primary_fs_name))
+ # verify via asok
+ res = self.mirror_daemon_command(f'mirror status for fs: {self.primary_fs_name}',
+ 'fs', 'mirror', 'status', f'{self.primary_fs_name}@{self.primary_fs_id}')
+ dir_count = res['snap_dirs']['dir_count']
+ self.assertTrue(len(dirs_list) == dir_count and f'/{dir1}' in dirs_list and f'/{dir2}' in dirs_list)
+ except CommandFailedError:
+ raise RuntimeError('Error listing directories')
+ except AssertionError:
+ raise RuntimeError('Wrong number of directories listed')
+ finally:
+ self.remove_directory(self.primary_fs_name, self.primary_fs_id, f'/{dir1}')
+ self.remove_directory(self.primary_fs_name, self.primary_fs_id, f'/{dir2}')
+
+ self.disable_mirroring(self.primary_fs_name, self.primary_fs_id)
+ self.mount_a.run_shell(["rmdir", dir1])
+ self.mount_a.run_shell(["rmdir", dir2])
+
def test_add_relative_directory_path(self):
self.enable_mirroring(self.primary_fs_name, self.primary_fs_id)
try: