def run_shell(self, args, timeout=300, **kwargs):
omit_sudo = kwargs.pop('omit_sudo', False)
- sudo = kwargs.pop('sudo', False)
cwd = kwargs.pop('cwd', self.mountpoint)
stdout = kwargs.pop('stdout', StringIO())
stderr = kwargs.pop('stderr', StringIO())
- if sudo:
- if isinstance(args, list):
- args.insert(0, 'sudo')
- elif isinstance(args, str):
- args = 'sudo ' + args
-
return self.client_remote.run(args=args, cwd=cwd, timeout=timeout,
stdout=stdout, stderr=stderr,
omit_sudo=omit_sudo, **kwargs)
def run_shell_payload(self, payload, **kwargs):
- return self.run_shell(["bash", "-c", Raw(f"'{payload}'")], **kwargs)
+ kwargs['args'] = ["bash", "-c", Raw(f"'{payload}'")]
+ if kwargs.pop('sudo', False):
+ kwargs['args'].insert(0, 'sudo')
+ kwargs['omit_sudo'] = False
+ return self.run_shell(**kwargs)
def run_as_user(self, **kwargs):
"""
"""
Wrap ls: return a list of strings
"""
- cmd = ["ls"]
+ kwargs['args'] = ["ls"]
if path:
- cmd.append(path)
-
- ls_text = self.run_shell(cmd, **kwargs).stdout.getvalue().strip()
+ kwargs['args'].append(path)
+ if kwargs.pop('sudo', False):
+ kwargs['args'].insert(0, 'sudo')
+ kwargs['omit_sudo'] = False
+ ls_text = self.run_shell(**kwargs).stdout.getvalue().strip()
if ls_text:
return ls_text.split("\n")
:param val: xattr value
:return: None
"""
- self.run_shell(["setfattr", "-n", key, "-v", val, path], **kwargs)
+ kwargs['args'] = ["setfattr", "-n", key, "-v", val, path]
+ if kwargs.pop('sudo', False):
+ kwargs['args'].insert(0, 'sudo')
+ kwargs['omit_sudo'] = False
+ self.run_shell(**kwargs)
def getfattr(self, path, attr, **kwargs):
"""
:return: a string
"""
- p = self.run_shell(["getfattr", "--only-values", "-n", attr, path], wait=False, **kwargs)
+ kwargs['args'] = ["getfattr", "--only-values", "-n", attr, path]
+ if kwargs.pop('sudo', False):
+ kwargs['args'].insert(0, 'sudo')
+ kwargs['omit_sudo'] = False
+ kwargs['wait'] = False
+ p = self.run_shell(**kwargs)
try:
p.wait()
except CommandFailedError as e:
subvolpath = self._get_subvolume_path(self.volname, subvolume, group_name=subvolume_group)
# mode
- self.mount_a.run_shell(['chmod', mode, subvolpath], sudo=True)
+ self.mount_a.run_shell(['sudo', 'chmod', mode, subvolpath], omit_sudo=False)
# ownership
- self.mount_a.run_shell(['chown', uid, subvolpath], sudo=True)
- self.mount_a.run_shell(['chgrp', gid, subvolpath], sudo=True)
+ self.mount_a.run_shell(['sudo', 'chown', uid, subvolpath], omit_sudo=False)
+ self.mount_a.run_shell(['sudo', 'chgrp', gid, subvolpath], omit_sudo=False)
def _do_subvolume_io(self, subvolume, subvolume_group=None, create_dir=None,
number_of_files=DEFAULT_NUMBER_OF_FILES, file_size=DEFAULT_FILE_SIZE):
self.mount_a.run_shell(["ln", "-s", "./{}".format(reg_file), sym_path1])
self.mount_a.run_shell(["ln", "-s", "./{}".format(reg_file), sym_path2])
# flip ownership to nobody. assumption: nobody's id is 65534
- self.mount_a.run_shell(["chown", "-h", "65534:65534", sym_path2], sudo=True, omit_sudo=False)
+ self.mount_a.run_shell(["sudo", "chown", "-h", "65534:65534", sym_path2], omit_sudo=False)
def _wait_for_trash_empty(self, timeout=60):
# XXX: construct the trash dir path (note that there is no mgr
group = subvol_group if subvol_group is not None else '_nogroup'
metapath = os.path.join(".", "volumes", group, subvol_name, ".meta")
- out = self.mount_a.run_shell(['cat', metapath], sudo=True)
+ out = self.mount_a.run_shell(['sudo', 'cat', metapath], omit_sudo=False)
lines = out.stdout.getvalue().strip().split('\n')
sv_version = -1
for line in lines:
basepath = os.path.join("volumes", group, subvol_name)
uuid_str = str(uuid.uuid4())
createpath = os.path.join(basepath, uuid_str)
- self.mount_a.run_shell(['mkdir', '-p', createpath], sudo=True)
+ self.mount_a.run_shell(['sudo', 'mkdir', '-p', createpath], omit_sudo=False)
# create a v1 snapshot, to prevent auto upgrades
if has_snapshot:
snappath = os.path.join(createpath, ".snap", "fake")
- self.mount_a.run_shell(['mkdir', '-p', snappath], sudo=True)
+ self.mount_a.run_shell(['sudo', 'mkdir', '-p', snappath], omit_sudo=False)
# add required xattrs to subvolume
default_pool = self.mount_a.getfattr(".", "ceph.dir.layout.pool")
group = subvol_group if subvol_group is not None else '_nogroup'
trashpath = os.path.join("volumes", group, subvol_name, '.trash', trash_name)
if create:
- self.mount_a.run_shell(['mkdir', '-p', trashpath], sudo=True)
+ self.mount_a.run_shell(['sudo', 'mkdir', '-p', trashpath], omit_sudo=False)
else:
- self.mount_a.run_shell(['rmdir', trashpath], sudo=True)
+ self.mount_a.run_shell(['sudo', 'rmdir', trashpath], omit_sudo=False)
def _configure_guest_auth(self, guest_mount, authid, key):
"""
# emulate a old-fashioned subvolume -- in a custom group
createpath1 = os.path.join(".", "volumes", group, subvolume)
- self.mount_a.run_shell(['mkdir', '-p', createpath1], sudo=True)
+ self.mount_a.run_shell(['sudo', 'mkdir', '-p', createpath1], omit_sudo=False)
# this would auto-upgrade on access without anyone noticing
subvolpath1 = self._fs_cmd("subvolume", "getpath", self.volname, subvolume, "--group-name", group)
# emulate a old-fashioned subvolume in a custom group
createpath = os.path.join(".", "volumes", group, subvolume)
- self.mount_a.run_shell(['mkdir', '-p', createpath], sudo=True)
+ self.mount_a.run_shell(['sudo', 'mkdir', '-p', createpath], omit_sudo=False)
# add required xattrs to subvolume
default_pool = self.mount_a.getfattr(".", "ceph.dir.layout.pool")
# Induce partial auth update state by modifying the auth metadata file,
# and then run authorize again.
- guest_mount.run_shell(['sed', '-i', 's/false/true/g', 'volumes/{0}'.format(auth_metadata_filename)], sudo=True)
+ guest_mount.run_shell(['sudo', 'sed', '-i', 's/false/true/g', 'volumes/{0}'.format(auth_metadata_filename)], omit_sudo=False)
# Authorize 'guestclient_1' to access the subvolume.
self._fs_cmd("subvolume", "authorize", self.volname, subvolume, guestclient_1["auth_id"],
# Induce partial auth update state by modifying the auth metadata file,
# and then run de-authorize.
- guest_mount.run_shell(['sed', '-i', 's/false/true/g', 'volumes/{0}'.format(auth_metadata_filename)], sudo=True)
+ guest_mount.run_shell(['sudo', 'sed', '-i', 's/false/true/g', 'volumes/{0}'.format(auth_metadata_filename)], omit_sudo=False)
# Deauthorize 'guestclient_1' to access the subvolume2.
self._fs_cmd("subvolume", "deauthorize", self.volname, subvolume2, guestclient_1["auth_id"],
self.assertIn(auth_metadata_filename, guest_mount.ls("volumes"))
# Replace 'subvolumes' to 'volumes', old style auth-metadata file
- guest_mount.run_shell(['sed', '-i', 's/subvolumes/volumes/g', 'volumes/{0}'.format(auth_metadata_filename)], sudo=True)
+ guest_mount.run_shell(['sudo', 'sed', '-i', 's/subvolumes/volumes/g', 'volumes/{0}'.format(auth_metadata_filename)], omit_sudo=False)
# Authorize 'guestclient_1' to access the subvolume2. This should transparently update 'volumes' to 'subvolumes'
self._fs_cmd("subvolume", "authorize", self.volname, subvolume2, guestclient_1["auth_id"],
self.assertIn(auth_metadata_filename, guest_mount.ls("volumes"))
# Replace 'subvolumes' to 'volumes', old style auth-metadata file
- guest_mount.run_shell(['sed', '-i', 's/subvolumes/volumes/g', 'volumes/{0}'.format(auth_metadata_filename)], sudo=True)
+ guest_mount.run_shell(['sudo', 'sed', '-i', 's/subvolumes/volumes/g', 'volumes/{0}'.format(auth_metadata_filename)], omit_sudo=False)
# Deauthorize 'guestclient_1' to access the subvolume2. This should update 'volumes' to subvolumes'
self._fs_cmd("subvolume", "deauthorize", self.volname, subvolume2, auth_id, "--group_name", group)
# emulate a old-fashioned subvolume in a custom group
createpath = os.path.join(".", "volumes", group, subvolname)
- self.mount_a.run_shell(['mkdir', '-p', createpath], sudo=True)
+ self.mount_a.run_shell(['sudo', 'mkdir', '-p', createpath], omit_sudo=False)
# set metadata for subvolume.
key = "key"
# emulate a old-fashioned subvolume in a custom group
createpath = os.path.join(".", "volumes", group, subvolname)
- self.mount_a.run_shell(['mkdir', '-p', createpath], sudo=True)
+ self.mount_a.run_shell(['sudo', 'mkdir', '-p', createpath], omit_sudo=False)
# set metadata for subvolume.
input_metadata_dict = {f'key_{i}' : f'value_{i}' for i in range(3)}
# Create snapshot at ancestral level
ancestral_snappath1 = os.path.join(".", "volumes", group, ".snap", "ancestral_snap_1")
ancestral_snappath2 = os.path.join(".", "volumes", group, ".snap", "ancestral_snap_2")
- self.mount_a.run_shell(['mkdir', '-p', ancestral_snappath1, ancestral_snappath2], sudo=True)
+ self.mount_a.run_shell(['sudo', 'mkdir', '-p', ancestral_snappath1, ancestral_snappath2], omit_sudo=False)
subvolsnapshotls = json.loads(self._fs_cmd('subvolume', 'snapshot', 'ls', self.volname, subvolume, group))
self.assertEqual(len(subvolsnapshotls), snap_count)
# remove ancestral snapshots
- self.mount_a.run_shell(['rmdir', ancestral_snappath1, ancestral_snappath2], sudo=True)
+ self.mount_a.run_shell(['sudo', 'rmdir', ancestral_snappath1, ancestral_snappath2], omit_sudo=False)
# remove snapshot
for snapshot in snapshots:
# Create snapshot at ancestral level
ancestral_snap_name = "ancestral_snap_1"
ancestral_snappath1 = os.path.join(".", "volumes", group, ".snap", ancestral_snap_name)
- self.mount_a.run_shell(['mkdir', '-p', ancestral_snappath1], sudo=True)
+ self.mount_a.run_shell(['sudo', 'mkdir', '-p', ancestral_snappath1], omit_sudo=False)
# Validate existence of inherited snapshot
group_path = os.path.join(".", "volumes", group)
self.fail("expected snapshot info of inherited snapshot to fail")
# remove ancestral snapshots
- self.mount_a.run_shell(['rmdir', ancestral_snappath1], sudo=True)
+ self.mount_a.run_shell(['sudo', 'rmdir', ancestral_snappath1], omit_sudo=False)
# remove subvolume
self._fs_cmd("subvolume", "rm", self.volname, subvolume, "--group_name", group)
# Create snapshot at ancestral level
ancestral_snap_name = "ancestral_snap_1"
ancestral_snappath1 = os.path.join(".", "volumes", group, ".snap", ancestral_snap_name)
- self.mount_a.run_shell(['mkdir', '-p', ancestral_snappath1], sudo=True)
+ self.mount_a.run_shell(['sudo', 'mkdir', '-p', ancestral_snappath1], omit_sudo=False)
# Validate existence of inherited snap
group_path = os.path.join(".", "volumes", group)
self.fail("expected removing inheirted snapshot to fail")
# remove ancestral snapshots
- self.mount_a.run_shell(['rmdir', ancestral_snappath1], sudo=True)
+ self.mount_a.run_shell(['sudo', 'rmdir', ancestral_snappath1], omit_sudo=False)
# remove subvolume
self._fs_cmd("subvolume", "rm", self.volname, subvolume, group)
# Create subvolumegroup snapshot
group_snapshot_path = os.path.join(".", "volumes", group, ".snap", group_snapshot)
- self.mount_a.run_shell(['mkdir', '-p', group_snapshot_path], sudo=True)
+ self.mount_a.run_shell(['sudo', 'mkdir', '-p', group_snapshot_path], omit_sudo=False)
# Validate existence of subvolumegroup snapshot
self.mount_a.run_shell(['ls', group_snapshot_path])
self.fail("expected subvolume snapshot creation with same name as subvolumegroup snapshot to fail")
# remove subvolumegroup snapshot
- self.mount_a.run_shell(['rmdir', group_snapshot_path], sudo=True)
+ self.mount_a.run_shell(['sudo', 'rmdir', group_snapshot_path], omit_sudo=False)
# remove subvolume
self._fs_cmd("subvolume", "rm", self.volname, subvolume, group)
# remove snapshot from backend to force the clone failure.
snappath = os.path.join(".", "volumes", "_nogroup", subvolume, ".snap", snapshot)
- self.mount_a.run_shell(['rmdir', snappath], sudo=True)
+ self.mount_a.run_shell(['sudo', 'rmdir', snappath], omit_sudo=False)
# wait for clone1 to fail.
self._wait_for_clone_to_fail(clone1)
# emulate a old-fashioned subvolume
createpath = os.path.join(".", "volumes", "_nogroup", subvolume)
- self.mount_a.run_shell_payload(f"mkdir -p -m 777 {createpath}", sudo=True)
+ self.mount_a.run_shell_payload(f"sudo mkdir -p -m 777 {createpath}", omit_sudo=False)
# add required xattrs to subvolume
default_pool = self.mount_a.getfattr(".", "ceph.dir.layout.pool")
# emulate a old-fashioned subvolume -- one in the default group and
# the other in a custom group
createpath1 = os.path.join(".", "volumes", "_nogroup", subvolume1)
- self.mount_a.run_shell(['mkdir', '-p', createpath1], sudo=True)
+ self.mount_a.run_shell(['sudo', 'mkdir', '-p', createpath1], omit_sudo=False)
# create group
createpath2 = os.path.join(".", "volumes", group, subvolume2)
- self.mount_a.run_shell(['mkdir', '-p', createpath2], sudo=True)
+ self.mount_a.run_shell(['sudo', 'mkdir', '-p', createpath2], omit_sudo=False)
# this would auto-upgrade on access without anyone noticing
subvolpath1 = self._fs_cmd("subvolume", "getpath", self.volname, subvolume1)