global cephfs
cephfs.shutdown()
-def purge_dir(path, is_snap = False):
- print(b"Purge " + path)
- d = cephfs.opendir(path)
- if (not path.endswith(b"/")):
- path = path + b"/"
- dent = cephfs.readdir(d)
- while dent:
- if (dent.d_name not in [b".", b".."]):
- print(path + dent.d_name)
- if dent.is_dir():
- if (not is_snap):
- try:
- snappath = path + dent.d_name + b"/.snap"
- cephfs.stat(snappath)
- purge_dir(snappath, True)
- except:
- pass
- purge_dir(path + dent.d_name, False)
- cephfs.rmdir(path + dent.d_name)
- else:
- print("rmsnap on {} snap {}".format(path, dent.d_name))
- cephfs.rmsnap(path, dent.d_name);
- else:
- cephfs.unlink(path + dent.d_name)
- dent = cephfs.readdir(d)
- cephfs.closedir(d)
-
@pytest.fixture
def testdir():
- purge_dir(b"/")
+ print(f'purging entire file hierarchy under "/"...')
+ cephfs.rmtree(b'/')
cephfs.chdir(b"/")
_, ret_buf = cephfs.listxattr("/")