CLIENTS_REQUIRED = 1
def run_cephfs_shell_cmd(self, cmd, mount_x=None, shell_conf_path=None,
- opts=None, stdout=None, stderr=None, stdin=None):
+ opts=None, stdout=None, stderr=None, stdin=None,
+ check_status=True):
stdout = stdout or StringIO()
stderr = stderr or StringIO()
if mount_x is None:
log.info("Running command: {}".format(" ".join(args)))
return mount_x.client_remote.run(args=args, stdout=stdout,
- stderr=stderr, stdin=stdin)
+ stderr=stderr, stdin=stdin,
+ check_status=check_status)
+
+ def negtest_cephfs_shell_cmd(self, **kwargs):
+ """
+ This method verifies that cephfs shell command fails with expected
+ return value and/or error message.
+
+ kwargs is expected to hold the arguments same as
+ run_cephfs_shell_cmd() with the following exceptions -
+ * It should not contain check_status (since commands are expected
+ to fail, check_status is hardcoded to False).
+ * It is optional to set expected error message and return value to
+ dict members 'errmsg' and 'retval' respectively.
+
+ This method servers as shorthand for codeblocks like -
+
+ try:
+ proc = self.run_cephfs_shell_cmd(args=['some', 'cmd'],
+ check_status=False,
+ stdout=stdout)
+ except CommandFailedError as e:
+ self.assertNotIn('some error message',
+ proc.stderr.getvalue.lower())
+
+
+ try:
+ proc = self.run_cephfs_shell_cmd(args=['some', 'cmd'],
+ check_status=False,
+ stdout=stdout)
+ except CommandFailedError as e:
+ self.assertNotEqual(1, proc.returncode)
+ """
+ retval = kwargs.pop('retval', None)
+ errmsg = kwargs.pop('errmsg', None)
+ kwargs['check_status'] = False
+
+ proc = self.run_cephfs_shell_cmd(**kwargs)
+ if retval:
+ self.assertEqual(proc.returncode, retval)
+ else:
+ self.assertNotEqual(proc.returncode, 0)
+ if errmsg:
+ self.assertIn(errmsg, proc.stderr.getvalue().lower())
+
+ return proc
def get_cephfs_shell_cmd_output(self, cmd, mount_x=None,
shell_conf_path=None, opts=None,
- stdout=None, stdin=None):
+ stdout=None, stdin=None,check_status=True):
return ensure_str(self.run_cephfs_shell_cmd(
cmd=cmd, mount_x=mount_x, shell_conf_path=shell_conf_path,
- opts=opts, stdout=stdout, stdin=stdin).stdout.getvalue().strip())
+ opts=opts, stdout=stdout, stdin=stdin,
+ check_status=check_status).stdout.getvalue().strip())
def get_cephfs_shell_cmd_error(self, cmd, mount_x=None,
shell_conf_path=None, opts=None,
- stderr=None, stdin=None):
+ stderr=None, stdin=None, check_status=True):
return ensure_str(self.run_cephfs_shell_cmd(
cmd=cmd, mount_x=mount_x, shell_conf_path=shell_conf_path,
- opts=opts, stderr=stderr, stdin=stdin).stderr.getvalue().strip())
+ opts=opts, stderr=stderr, stdin=stdin,
+ check_status=check_status).stderr.getvalue().strip())
def run_cephfs_shell_script(self, script, mount_x=None,
shell_conf_path=None, opts=None, stdout=None,
- stderr=None, stdin=None):
+ stderr=None, stdin=None, check_status=True):
stdout = stdout or StringIO()
stderr = stderr or StringIO()
if mount_x is None:
args[1:1] = ["-c", shell_conf_path]
log.info('Running script \"' + scriptpath + '\"')
return mount_x.client_remote.run(args=args, stdout=stdout,
- stderr=stderr, stdin=stdin)
+ stderr=stderr, stdin=stdin,
+ check_status=True)
def get_cephfs_shell_script_output(self, script, mount_x=None,
shell_conf_path=None, opts=None,
- stdout=None, stdin=None):
+ stdout=None, stdin=None,
+ check_status=True):
return ensure_str(self.run_cephfs_shell_script(
script=script, mount_x=mount_x, shell_conf_path=shell_conf_path,
- opts=opts, stdout=stdout, stdin=stdin).stdout.getvalue().strip())
+ opts=opts, stdout=stdout, stdin=stdin,
+ check_status=check_status).stdout.getvalue().strip())
class TestMkdir(TestCephFSShell):
"""
Test that mkdir fails with octal mode greater than 0777
"""
- o = self.get_cephfs_shell_cmd_output("mkdir -m 07000 d2")
- log.info("cephfs-shell output:\n{}".format(o))
-
- # mkdir d2 should fail
+ self.negtest_cephfs_shell_cmd(cmd="mkdir -m 07000 d2")
try:
- o = self.mount_a.stat('d2')
- log.info("mount_a output:\n{}".format(o))
- except:
+ self.mount_a.stat('d2')
+ except CommandFailedError:
pass
def test_mkdir_with_negative_octal_mode(self):
"""
Test that mkdir fails with negative octal mode
"""
- o = self.get_cephfs_shell_cmd_output("mkdir -m -0755 d3")
- log.info("cephfs-shell output:\n{}".format(o))
-
- # mkdir d3 should fail
+ self.negtest_cephfs_shell_cmd(cmd="mkdir -m -0755 d3")
try:
- o = self.mount_a.stat('d3')
- log.info("mount_a output:\n{}".format(o))
+ self.mount_a.stat('d3')
except:
pass
"""
Test that mkdir failes with bad non-octal mode
"""
- o = self.get_cephfs_shell_cmd_output("mkdir -m ugx=0755 d5")
- log.info("cephfs-shell output:\n{}".format(o))
-
- # mkdir d5 should fail
+ self.negtest_cephfs_shell_cmd(cmd="mkdir -m ugx=0755 d5")
try:
- o = self.mount_a.stat('d5')
- log.info("mount_a output:\n{}".format(o))
+ self.mount_a.stat('d5')
except:
pass
"""
Test that mkdir fails without path option for creating path
"""
- o = self.get_cephfs_shell_cmd_output("mkdir d5/d6/d7")
- log.info("cephfs-shell output:\n{}".format(o))
-
- # mkdir d5/d6/d7 should fail
+ self.negtest_cephfs_shell_cmd(cmd="mkdir d5/d6/d7")
try:
- o = self.mount_a.stat('d5/d6/d7')
- log.info("mount_a output:\n{}".format(o))
+ self.mount_a.stat('d5/d6/d7')
except:
pass
"""
Test that rmdir does not delete a non existing directory
"""
- rmdir_output = self.get_cephfs_shell_cmd_error("rmdir test_dir")
- log.info("rmdir error output:\n{}".format(rmdir_output))
+ self.negtest_cephfs_shell_cmd(cmd="rmdir test_dir")
self.dir_does_not_exists()
def test_rmdir_dir_with_file(self):
Test that rmdir does not delete a file
"""
self.run_cephfs_shell_cmd("put - dumpfile", stdin="Valid File")
- self.run_cephfs_shell_cmd("rmdir dumpfile")
+ self.negtest_cephfs_shell_cmd(cmd="rmdir dumpfile")
self.mount_a.stat("dumpfile")
def test_rmdir_p(self):
"""
Test that rmdir -p does not delete an invalid directory
"""
- rmdir_output = self.get_cephfs_shell_cmd_error("rmdir -p test_dir")
- log.info("rmdir error output:\n{}".format(rmdir_output))
+ self.negtest_cephfs_shell_cmd(cmd="rmdir -p test_dir")
self.dir_does_not_exists()
def test_rmdir_p_dir_with_file(self):
self.assertIn('st_mode', o)
# create the same snapshot again - must fail with an error message
- o = self.get_cephfs_shell_cmd_error("snap create snap1 /data_dir")
- log.info("cephfs-shell output:\n{}".format(o))
- self.assertIn("snapshot 'snap1' already exists", o)
+ self.negtest_cephfs_shell_cmd(cmd="snap create snap1 /data_dir",
+ errmsg="snapshot 'snap1' already exists")
o = self.mount_a.stat(sdn)
log.info("mount_a output:\n{}".format(o))
self.assertIn('st_mode', o)
self.assertNotIn('st_mode', o)
# delete the same snapshot again - must fail with an error message
- o = self.get_cephfs_shell_cmd_error("snap delete snap1 /data_dir")
- self.assertIn("'snap1': no such snapshot", o)
+ self.negtest_cephfs_shell_cmd(cmd="snap delete snap1 /data_dir",
+ errmsg="'snap1': no such snapshot")
try:
o = self.mount_a.stat(sdn)
except:
def test_df_for_invalid_directory(self):
dir_abspath = path.join(self.mount_a.mountpoint, 'non-existent-dir')
- proc = self.run_cephfs_shell_cmd('df ' + dir_abspath)
- assert proc.stderr.getvalue().find('error in stat') != -1
+ self.negtest_cephfs_shell_cmd(cmd='df ' + dir_abspath,
+ errmsg='error in stat')
def test_df_for_valid_file(self):
s = 'df test' * 14145016
mount_output = self.get_cephfs_shell_cmd_output('mkdir ' + self.dir_name)
log.info("cephfs-shell mount output:\n{}".format(mount_output))
- def set_and_get_quota_vals(self, input_val):
- quota_output = self.run_cephfs_shell_cmd('quota set --max_bytes '
- + input_val[0] + ' --max_files '
- + input_val[1] + ' '
- + self.dir_name)
- log.info("cephfs-shell quota set output:\n{}".format(quota_output))
+ def set_and_get_quota_vals(self, input_val, check_status=True):
+ self.run_cephfs_shell_cmd(['quota', 'set', '--max_bytes',
+ input_val[0], '--max_files', input_val[1],
+ self.dir_name], check_status=check_status)
- quota_output = self.get_cephfs_shell_cmd_output('quota get '+ self.dir_name)
- log.info("cephfs-shell quota get output:\n{}".format(quota_output))
+ quota_output = self.get_cephfs_shell_cmd_output(['quota', 'get', self.dir_name],
+ check_status=check_status)
quota_output = quota_output.split()
return quota_output[1], quota_output[3]
def test_set_invalid_dir(self):
set_values = ('5', '5')
try:
- self.assertTupleEqual(self.set_and_get_quota_vals(set_values), set_values)
+ self.assertTupleEqual(self.set_and_get_quota_vals(
+ set_values, False), set_values)
raise Exception("Something went wrong!! Values set for non existing directory")
except IndexError:
# Test should pass as values cannot be set for non existing directory
self.create_dir()
set_values = ('-6', '-5')
try:
- self.assertTupleEqual(self.set_and_get_quota_vals(set_values), set_values)
+ self.assertTupleEqual(self.set_and_get_quota_vals(set_values,
+ False), set_values)
raise Exception("Something went wrong!! Invalid values set")
except IndexError:
# Test should pass as invalid values cannot be set
def create_dir(self):
self.run_cephfs_shell_cmd('mkdir ' + self.dir_name)
- def set_get_list_xattr_vals(self, input_val):
- setxattr_output = self.get_cephfs_shell_cmd_output('setxattr '
- + self.dir_name
- + ' '
- + input_val[0]
- + ' ' + input_val[1])
+ def set_get_list_xattr_vals(self, input_val, negtest=False):
+ setxattr_output = self.get_cephfs_shell_cmd_output(
+ ['setxattr', self.dir_name, input_val[0], input_val[1]])
log.info("cephfs-shell setxattr output:\n{}".format(setxattr_output))
- getxattr_output = self.get_cephfs_shell_cmd_output('getxattr '
- + self.dir_name
- + ' ' + input_val[0])
+ getxattr_output = self.get_cephfs_shell_cmd_output(
+ ['getxattr', self.dir_name, input_val[0]])
log.info("cephfs-shell getxattr output:\n{}".format(getxattr_output))
- listxattr_output = self.get_cephfs_shell_cmd_output('listxattr '+ self.dir_name)
+ listxattr_output = self.get_cephfs_shell_cmd_output(
+ ['listxattr', self.dir_name])
log.info("cephfs-shell listxattr output:\n{}".format(listxattr_output))
return listxattr_output, getxattr_output
self.assertTupleEqual(self.set_get_list_xattr_vals(set_values), set_values)
def test_non_existing_dir(self):
- set_values = ('user.key', '9')
- self.assertTupleEqual(self.set_get_list_xattr_vals(set_values), (u'', u''))
+ input_val = ('user.key', '9')
+ self.negtest_cephfs_shell_cmd(cmd=['setxattr', self.dir_name, input_val[0],
+ input_val[1]])
+ self.negtest_cephfs_shell_cmd(cmd=['getxattr', self.dir_name, input_val[0]])
+ self.negtest_cephfs_shell_cmd(cmd=['listxattr', self.dir_name])
# def test_ls(self):
# """
from traceback import print_exc as traceback_print_exc
def fake_run_cephfs_shell_cmd(cmd, mount_x=None, opts=None,
- config_path=None, stdin=None):
+ config_path=None, stdin=None,
+ check_status=True):
if not mount_x:
mount_x = self.mount_a
if not config_path:
log.info("Running command: {}".format(" ".join(args)))
return mount_x.client_remote.run(args=args, stdout=StringIO(),
- stderr=StringIO(), stdin=stdin)
+ stderr=StringIO(), stdin=stdin,
+ check_status=check_status)
def fake_get_cephfs_shell_cmd_output(cmd, mount_x=None, opts=None,
- stdin=None, config_path=None):
- return fake_run_cephfs_shell_cmd(cmd=cmd, mount_x=mount_x,
- opts=opts, stdin=stdin,
- config_path=config_path).\
- stdout.getvalue().strip()
+ stdin=None, config_path=None,
+ check_status=True):
+ return ensure_str(fake_run_cephfs_shell_cmd(
+ cmd=cmd, mount_x=mount_x, opts=opts, stdin=stdin,
+ config_path=config_path, check_status=check_status).\
+ stdout.getvalue().strip())
def fake_get_cephfs_shell_cmd_error(cmd, mount_x=None, opts=None,
- stdin=None, config_path=None):
- return fake_run_cephfs_shell_cmd(cmd=cmd, mount_x=mount_x,
- opts=opts, stdin=stdin,
- config_path=config_path).\
- stderr.getvalue().strip()
+ stdin=None, config_path=None,
+ check_status=True):
+ return ensure_str(fake_run_cephfs_shell_cmd(
+ cmd=cmd, mount_x=mount_x, opts=opts, stdin=stdin,
+ config_path=config_path, check_status=check_status).\
+ stderr.getvalue().strip())
def fetch_classes_and_run_tests():
unreqd_classes = ['CephFSTestCase', 'TestCephFSShell', 'TestMisc',