groupname = None if parts[2] == Group.NO_GROUP_NAME else parts[2]
subvolname = parts[3]
return (groupname, subvolname)
+
+def resolve_trash(vol_spec, path):
+ parts = splitall(path)
+ if len(parts) != 6 or os.path.join(parts[0], parts[1]) != vol_spec.subvolume_prefix or \
+ parts[4] != '.trash':
+ return None
+ groupname = None if parts[2] == Group.NO_GROUP_NAME else parts[2]
+ subvolname = parts[3]
+ return (groupname, subvolname)
with self.fs.opendir(self.path) as d:
entry = self.fs.readdir(d)
while entry:
- if entry.d_name not in exclude_list and entry.is_dir():
+ if entry.d_name not in exclude_list:
return entry.d_name
entry = self.fs.readdir(d)
return None
"""
return self._get_single_dir_entry(exclude_list)
- def purge(self, trash_entry, should_cancel):
+ def purge(self, trashpath, should_cancel):
"""
purge a trash entry.
if not should_cancel():
self.fs.rmdir(root_path)
- trashpath = os.path.join(self.path, trash_entry)
# catch any unlink errors
try:
rmtree(trashpath)
except cephfs.Error as e:
raise VolumeException(-e.args[0], e.args[1])
+ def link(self, path, bname):
+ pth = os.path.join(self.path, bname)
+ try:
+ self.fs.symlink(path, pth)
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
+
+ def delink(self, bname):
+ pth = os.path.join(self.path, bname)
+ try:
+ self.fs.unlink(pth)
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
+
def create_trashcan(fs, vol_spec):
"""
create a trash can.
trashcan.dump(path)
log.info("subvolume path '{0}' moved to trashcan".format(path))
+ def _link_dir(self, path, bname):
+ create_trashcan(self.fs, self.vol_spec)
+ with open_trashcan(self.fs, self.vol_spec) as trashcan:
+ trashcan.link(path, bname)
+ log.info("subvolume path '{0}' linked in trashcan bname {1}".format(path, bname))
+
def trash_base_dir(self):
if self.legacy_mode:
self.fs.unlink(self.legacy_config_path)
from .subvolume_v1 import SubvolumeV1
from ..template import SubvolumeTemplate
from ...exception import OpSmException, VolumeException, MetadataMgrException
+from ...fs_util import listdir
from ..template import SubvolumeOpType
log = logging.getLogger(__name__)
raise VolumeException(me.errno, "internal error while processing subvolume '{0}'".format(self.subvolname))
return False
+ @property
+ def is_in_use(self):
+ return not self.path == b''
+
+ @property
+ def has_pending_purges(self):
+ try:
+ return not listdir(self.fs, self.trash_dir) == []
+ except VolumeException as ve:
+ if ve.errno == -errno.ENOENT:
+ return False
+ raise
+
+ @property
+ def trash_dir(self):
+ return os.path.join(self.base_path, b".trash")
+
+ def create_trashcan(self):
+ """per subvolume trash directory"""
+ try:
+ self.fs.stat(self.trash_dir)
+ except cephfs.Error as e:
+ if e.args[0] == errno.ENOENT:
+ try:
+ log.debug("creating trash can: {0}".format(self.trash_dir))
+ self.fs.mkdir(self.trash_dir, 0o700)
+ except cephfs.Error as ce:
+ raise VolumeException(-ce.args[0], ce.args[1])
+ else:
+ raise VolumeException(-e.args[0], e.args[1])
+
@staticmethod
def is_valid_uuid(uuid_str):
try:
self.uid = int(st.st_uid)
self.gid = int(st.st_gid)
self.mode = int(st.st_mode & ~stat.S_IFMT(st.st_mode))
+ self.create_trashcan()
except MetadataMgrException as me:
if me.errno == -errno.ENOENT:
raise VolumeException(-errno.ENOENT, "subvolume '{0}' does not exist".format(self.subvolname))
raise VolumeException(-e.args[0], e.args[1])
def trash_incarnation_dir(self):
- self._trash_dir(self.path)
+ """rename subvolume (uuid component) to trash"""
+ try:
+ bname = os.path.basename(self.path)
+ tpath = os.path.join(self.trash_dir, bname)
+ log.debug("trash: {0} -> {1}".format(self.path, tpath))
+ self.fs.rename(self.path, tpath)
+ self._link_dir(tpath, bname)
+ except cephfs.Error as e:
+ raise VolumeException(-e.args[0], e.args[1])
def remove(self, retainsnaps=False):
if self.list_snapshots():
if not retainsnaps:
raise VolumeException(-errno.ENOTEMPTY, "subvolume '{0}' has snapshots".format(self.subvolname))
- if self.state != SubvolumeStates.STATE_RETAINED:
- self.trash_incarnation_dir()
- self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_PATH, "")
- self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_STATE, SubvolumeStates.STATE_RETAINED.value)
- self.metadata_mgr.flush()
- else:
- self.trash_base_dir()
+ if self.state != SubvolumeStates.STATE_RETAINED:
+ self.trash_incarnation_dir()
+ self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_PATH, "")
+ self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_STATE, SubvolumeStates.STATE_RETAINED.value)
+ self.metadata_mgr.flush()
def info(self):
if self.state != SubvolumeStates.STATE_RETAINED:
def remove_snapshot(self, snapname):
super(SubvolumeV2, self).remove_snapshot(snapname)
if self.state == SubvolumeStates.STATE_RETAINED and not self.list_snapshots():
- self.trash_base_dir()
+ # fake a trash entry for purge threads to find a job
+ bname = str(uuid.uuid4()).encode('utf-8')
+ self._link_dir(os.path.join(self.trash_dir, bname), bname)
# tickle the volume purge job to purge this entry, using ESTALE
raise VolumeException(-errno.ESTALE, "subvolume '{0}' has been removed as the last retained snapshot is removed".format(self.subvolname))
import errno
import logging
+import os
+import stat
+
+import cephfs
from .async_job import AsyncJobs
from .exception import VolumeException
+from .operations.resolver import resolve_trash
+from .operations.template import SubvolumeOpType
+from .operations.group import open_group
+from .operations.subvolume import open_subvol
from .operations.volume import open_volume, open_volume_lockless
from .operations.trash import open_trashcan
log.error("error fetching trash entry for volume '{0}' ({1})".format(volname, ve))
return ve.errno, None
+def subvolume_purge(volume_client, volname, trashcan, subvolume_trash_entry, should_cancel):
+ groupname, subvolname = resolve_trash(volume_client.volspec, subvolume_trash_entry.decode('utf-8'))
+ log.debug("subvolume resolved to {0}/{1}".format(groupname, subvolname))
+
+ try:
+ with open_volume(volume_client, volname) as fs_handle:
+ with open_group(fs_handle, volume_client.volspec, groupname) as group:
+ with open_subvol(fs_handle, volume_client.volspec, group, subvolname, SubvolumeOpType.REMOVE) as subvolume:
+ log.debug("subvolume.path={0}".format(subvolume.path))
+ log.debug("subvolume.is_in_use={0}".format(subvolume.is_in_use))
+ log.debug("subvolume.has_pending_purges={0}".format(subvolume.has_pending_purges))
+ log.debug("subvolume.list_snapshots={0}".format(subvolume.list_snapshots()))
+ if subvolume.is_in_use or subvolume.has_pending_purges or subvolume.list_snapshots():
+ log.debug("not purging subvolume -- bailing out.")
+ return
+ # this is fine under the global lock -- there are just a handful
+ # of entries in the subvolume to purge. moreover, the purge needs
+ # to be guarded since a create request might sneak in.
+ trashcan.purge(subvolume.base_path, should_cancel)
+ except VolumeException as ve:
+ if not ve.errno == -errno.ENOENT:
+ raise
+
# helper for starting a purge operation on a trash entry
-def purge_trash_entry_for_volume(volume_client, volname, purge_dir, should_cancel):
- log.debug("purging trash entry '{0}' for volume '{1}'".format(purge_dir, volname))
+def purge_trash_entry_for_volume(volume_client, volname, purge_entry, should_cancel):
+ log.debug("purging trash entry '{0}' for volume '{1}'".format(purge_entry, volname))
ret = 0
try:
with open_volume_lockless(volume_client, volname) as fs_handle:
with open_trashcan(fs_handle, volume_client.volspec) as trashcan:
- trashcan.purge(purge_dir, should_cancel)
+ try:
+ pth = os.path.join(trashcan.path, purge_entry)
+ stx = fs_handle.statx(pth, cephfs.CEPH_STATX_MODE | cephfs.CEPH_STATX_SIZE,
+ cephfs.AT_SYMLINK_NOFOLLOW)
+ if stat.S_ISLNK(stx['mode']):
+ tgt = fs_handle.readlink(pth, 4096)
+ tgt = tgt[:stx['size']]
+ log.debug("entry points to subvolume trash: {0}".format(tgt))
+ delink = True
+ try:
+ log.debug("purging subvolume trash: {0}".format(tgt))
+ trashcan.purge(tgt, should_cancel)
+ except VolumeException as ve:
+ if not ve.errno == -errno.ENOENT:
+ delink = False
+ return ve.errno
+ finally:
+ if delink:
+ subvolume_purge(volume_client, volname, trashcan, tgt, should_cancel)
+ log.debug("purging trash link: {0}".format(purge_entry))
+ trashcan.delink(purge_entry)
+ else:
+ log.debug("entry points to trash: {0}".format(pth))
+ trashcan.purge(pth)
+ except cephfs.Error as e:
+ log.warn("failed to remove trash entry: {0}".format(e))
except VolumeException as ve:
ret = ve.errno
return ret