return pending_clones_info
- def remove_snapshot(self, snapname, force=False):
+ def remove_snapshot(self, snapname, force=False, uuid=None):
if self.has_pending_clones(snapname):
raise VolumeException(-errno.EAGAIN, "snapshot '{0}' has pending clones".format(snapname))
- snappath = self.snapshot_path(snapname)
+ snappath = self.snapshot_path(snapname, uuid=uuid)
try:
self.metadata_mgr.remove_section(self.get_snap_section_name(snapname))
self.metadata_mgr.flush()
return {'type': self.subvol_type.value, 'features': self.features, 'state': SubvolumeStates.STATE_RETAINED.value}
- def remove_snapshot(self, snapname, force=False):
- super(SubvolumeV2, self).remove_snapshot(snapname, force)
+ def create_snapshot(self, snap_name):
+ super(SubvolumeV2, self).create_snapshot(snap_name)
+
+ def remove_snapshot(self, snapname, force=False, uuid=None):
+ super(SubvolumeV2, self).remove_snapshot(snapname, force=force, uuid=uuid)
if self.purgeable:
self.trash_base_dir()
# tickle the volume purge job to purge this entry, using ESTALE
# following are methods that help or do snapshot creation
+ def snapshot_path(self, snap_name, uuid=None):
+ '''
+ Path to a specific snapshot named 'snap_name'.
+ '''
+ if uuid:
+ return join(self.roots_dir, uuid,
+ self.vol_spec.snapshot_prefix.encode('utf-8'),
+ snap_name.encode('utf-8'))
+ else:
+ return join(self.snapshot_base_path(), snap_name.encode('utf-8'))
+
def snapshot_base_path(self):
return self.snap_dir
+ def get_incar_uuid_for_snap(self, snap_name):
+ '''
+ Return incarnation's UUID in which the snapshot name is present.
+ When multiple incarnations for a subvolume exists, check if a snap
+ exists in one of the incarnations.
+ '''
+ # list of all incarnations/UUID dirs of this subvolume.
+ incars = listdir(self.fs, self.roots_dir)
+
+ for incar_uuid in incars:
+ # construct path to ".snap" directory for given UUID.
+ snap_dir = join(self.roots_dir, incar_uuid,
+ self.vol_spec.snapshot_dir_prefix.encode('utf-8'))
+ all_snap_names = listdir(self.fs, snap_dir)
+ # encode since listdir() call above returns list of bytes and list
+ # of str
+ if snap_name.encode('utf-8') in all_snap_names:
+ return incar_uuid
+
+ return None
+
+ def create_snapshot(self, snap_name):
+ if self.get_incar_uuid_for_snap(snap_name) != None:
+ raise VolumeException(errno.EEXIST,
+ f'subvolume \'{snap_name}\' already exists')
+
+ super(SubvolumeV3, self).create_snapshot(snap_name)
+
+ def remove_snapshot(self, snap_name, force):
+ uuid = self.get_incar_uuid_for_snap(snap_name)
+ if uuid == None:
+ raise VolumeException(errno.ENOENT,
+ f'subvolume \'{snap_name}\' does not exists')
+
+ super(SubvolumeV3, self).remove_snapshot(snap_name, force=force,
+ uuid=uuid)
+
def remove_but_retain_snaps(self):
assert self.state != SubvolumeStates.STATE_RETAINED
except MetadataMgrException as e:
log.error(f"failed to write config: {e}")
raise VolumeException(e.args[0], e.args[1])
-
# in subvol v3, self.mnt_dir (AKA data dir) is renamed to ".unlinked" if
# subvol is deleted but snapshots are retained.
def trash_incarnation_dir(self):
SubvolumeStates.STATE_RETAINED.value)
self.metadata_mgr.flush()
+
# Following methods help clone operation -