"expected_output -\n{}\ndu_output -\n{}\n".format(
expected_output, du_output)
+ # NOTE: tests using these are pretty slow since to this methods sleeps for 15
+ # seconds.
+ def _setup_files(self, return_path_to_files=False, path_prefix='./'):
+ dirname = 'dir1'
+ regfilename = 'regfile'
+ hlinkname = 'hlink'
+ slinkname = 'slink1'
+ slink2name = 'slink2'
+
+ dir_abspath = path.join(self.mount_a.mountpoint, dirname)
+ regfile_abspath = path.join(self.mount_a.mountpoint, regfilename)
+ hlink_abspath = path.join(self.mount_a.mountpoint, hlinkname)
+ slink_abspath = path.join(self.mount_a.mountpoint, slinkname)
+ slink2_abspath = path.join(self.mount_a.mountpoint, slink2name)
+
+ self.mount_a.run_shell('mkdir ' + dir_abspath)
+ self.mount_a.run_shell('touch ' + regfile_abspath)
+ self.mount_a.run_shell(['ln', regfile_abspath, hlink_abspath])
+ self.mount_a.run_shell(['ln', '-s', regfile_abspath, slink_abspath])
+ self.mount_a.run_shell(['ln', '-s', dir_abspath, slink2_abspath])
+
+ dir2_name = 'dir2'
+ dir21_name = 'dir21'
+ regfile121_name = 'regfile121'
+ dir2_abspath = path.join(self.mount_a.mountpoint, dir2_name)
+ dir21_abspath = path.join(dir2_abspath, dir21_name)
+ regfile121_abspath = path.join(dir21_abspath, regfile121_name)
+ self.mount_a.run_shell('mkdir -p ' + dir21_abspath)
+ self.mount_a.run_shell('touch ' + regfile121_abspath)
+
+ sudo_write_file(self.mount_a.client_remote, regfile_abspath,
+ 'somedata')
+ sudo_write_file(self.mount_a.client_remote, regfile121_abspath,
+ 'somemoredata')
+
+ # TODO: is there a way to trigger/force update ceph.dir.rbytes?
+ # wait so that attr ceph.dir.rbytes gets a chance to be updated.
+ sleep(20)
+
+ expected_patterns = []
+ path_to_files = []
+
+ def append_expected_output_pattern(f):
+ if f == '/':
+ expected_patterns.append(r'{}{}{}'.format(size, " +", '.' + f))
+ else:
+ expected_patterns.append(r'{}{}{}'.format(size, " +",
+ path_prefix + path.relpath(f, self.mount_a.mountpoint)))
+
+ for f in [dir_abspath, regfile_abspath, regfile121_abspath,
+ hlink_abspath, slink_abspath, slink2_abspath]:
+ size = humansize(self.mount_a.stat(f, follow_symlinks=
+ False)['st_size'])
+ append_expected_output_pattern(f)
+
+ # get size for directories containig regfiles within
+ for f in [dir2_abspath, dir21_abspath]:
+ size = humansize(self.mount_a.stat(regfile121_abspath,
+ follow_symlinks=False)['st_size'])
+ append_expected_output_pattern(f)
+
+ # get size for CephFS root
+ size = 0
+ for f in [regfile_abspath, regfile121_abspath, slink_abspath,
+ slink2_abspath]:
+ size += self.mount_a.stat(f, follow_symlinks=False)['st_size']
+ size = humansize(size)
+ append_expected_output_pattern('/')
+
+ if return_path_to_files:
+ for p in [dir_abspath, regfile_abspath, dir2_abspath,
+ dir21_abspath, regfile121_abspath, hlink_abspath,
+ slink_abspath, slink2_abspath]:
+ path_to_files.append(path.relpath(p, self.mount_a.mountpoint))
+
+ return (expected_patterns, path_to_files)
+ else:
+ return expected_patterns
+
+ def test_du_works_recursively_with_no_path_in_args(self):
+ expected_patterns_in_output = self._setup_files()
+ du_output = self.get_cephfs_shell_cmd_output('du -r')
+
+ for expected_output in expected_patterns_in_output:
+ if sys_version_info.major >= 3:
+ self.assertRegex(expected_output, du_output)
+ elif sys_version_info.major < 3:
+ assert re_search(expected_output, du_output) != None, "\n" + \
+ "expected_output -\n{}\ndu_output -\n{}\n".format(
+ expected_output, du_output)
+
+
# def test_ls(self):
# """
# Test that ls passes