"""
dirfrag_obj_name = "{0:x}.00000000".format(dir_ino)
try:
- ret = self.rados(["getomapval", dirfrag_obj_name, obj_name+"_head", out])
+ self.rados(["getomapval", dirfrag_obj_name, obj_name+"_head", out])
except CommandFailedError as e:
log.error(e.__str__())
raise ObjectNotFound(dir_ino)
import json
import time
import logging
-import re
import six
from textwrap import dedent
run_cmd.extend(fuse_cmd)
def list_connections():
- from teuthology.misc import get_system_type
-
conn_dir = "/sys/fs/fuse/connections"
self.client_remote.run(args=['sudo', 'modprobe', 'fuse'],
import json
import logging
-import time
from textwrap import dedent
from teuthology.orchestra.run import CommandFailedError
-from teuthology import misc
-
-from teuthology.orchestra import remote as orchestra_remote
from teuthology.orchestra import run
from teuthology.contextutil import MaxWhileTries
from tasks.cephfs.mount import CephFSMount
from teuthology.orchestra import run
from teuthology.orchestra.run import CommandFailedError, ConnectionLostError
from tasks.cephfs.filesystem import Filesystem
-import platform
log = logging.getLogger(__name__)
@netns_name.setter
def netns_name(self, name):
- if not isinstance(path, str):
- raise RuntimeError('path should be of str type.')
self._netns_name = name
def is_mounted(self):
n = "test_add_data_pool_ec"
self._setup_ec_pools(n, metadata=False)
- p = self.fs.add_data_pool(n+"-data", create=False)
+ self.fs.add_data_pool(n+"-data", create=False)
def test_new_default_ec(self):
"""
"""
from io import StringIO
from os import path
-from os import getcwd as os_getcwd
import crypt
import logging
from tempfile import mkstemp as tempfile_mkstemp
-from tempfile import mkdtemp as tempfile_mkdtemp
import math
from six import ensure_str
from sys import version_info as sys_version_info
from time import sleep
from tasks.cephfs.cephfs_test_case import CephFSTestCase
from teuthology.misc import sudo_write_file
-from teuthology.misc import sh as misc_sh
from teuthology.orchestra.run import CommandFailedError
log = logging.getLogger(__name__)
except AssertionError as e:
log.debug("%s", e)
return False
- status = self.wait_until_true(takeover, 30)
+ self.wait_until_true(takeover, 30)
def test_join_fs_runtime(self):
"""
That a standby with mds_join_fs set to another fs is still used if necessary.
"""
status, target = self._verify_init()
- active = self.fs.get_active_names(status=status)[0]
standbys = [info['name'] for info in status.get_standbys()]
for mds in standbys:
self.config_set('mds.'+mds, 'mds_join_fs', 'cephfs2')
import logging
import os
from textwrap import dedent
-import time
try:
from typing import Optional
except:
from tasks.cephfs.cephfs_test_case import CephFSTestCase
-import random
-import os
class TestMetaInjection(CephFSTestCase):
def test_meta_injection(self):
import time
import logging
from tasks.cephfs.cephfs_test_case import CephFSTestCase
-from teuthology.exceptions import CommandFailedError
-from tasks.cephfs.cephfs_test_case import CephFSTestCase, for_teuthology
log = logging.getLogger(__name__)
mds0_openfiles.1 to hold the extra keys.
"""
- stat_out = self.fs.rados(["stat", "mds0_openfiles.1"])
+ self.fs.rados(["stat", "mds0_openfiles.1"])
# Now close the file
self.mount_a.kill_background(p)
-import sys
import logging
import signal
from textwrap import dedent
subvolpath = self._get_subvolume_path(self.volname, subvolume, group_name=subvolume_group)
reg_file = "regfile.0"
- reg_path = os.path.join(subvolpath, reg_file)
dir_path = os.path.join(subvolpath, "dir.0")
sym_path1 = os.path.join(subvolpath, "sym.0")
# this symlink's ownership would be changed
sym_path2 = os.path.join(dir_path, "sym.0")
- #self.mount_a.write_n_mb(reg_path, TestVolumes.DEFAULT_FILE_SIZE)
self.mount_a.run_shell(["sudo", "mkdir", dir_path], omit_sudo=False)
self.mount_a.run_shell(["sudo", "ln", "-s", "./{}".format(reg_file), sym_path1], omit_sudo=False)
self.mount_a.run_shell(["sudo", "ln", "-s", "./{}".format(reg_file), sym_path2], omit_sudo=False)