log = logging.getLogger(__name__)
# helper for fetching a clone entry for a given volume
-def get_next_clone_entry(volume_client, volname, running_jobs):
+def get_next_clone_entry(fs_client, volspec, volname, running_jobs):
log.debug("fetching clone entry for volume '{0}'".format(volname))
try:
- with open_volume_lockless(volume_client, volname) as fs_handle:
+ with open_volume_lockless(fs_client, volname) as fs_handle:
try:
- with open_clone_index(fs_handle, volume_client.volspec) as clone_index:
+ with open_clone_index(fs_handle, volspec) as clone_index:
job = clone_index.get_oldest_clone_entry(running_jobs)
return 0, job
except IndexException as ve:
return ve.errno, None
@contextmanager
-def open_at_volume(volume_client, volname, groupname, subvolname, op_type):
- with open_volume(volume_client, volname) as fs_handle:
- with open_group(fs_handle, volume_client.volspec, groupname) as group:
- with open_subvol(volume_client.mgr, fs_handle, volume_client.volspec, group, subvolname, op_type) as subvolume:
+def open_at_volume(fs_client, volspec, volname, groupname, subvolname, op_type):
+ with open_volume(fs_client, volname) as fs_handle:
+ with open_group(fs_handle, volspec, groupname) as group:
+ with open_subvol(fs_client.mgr, fs_handle, volspec, group, subvolname, op_type) as subvolume:
yield subvolume
@contextmanager
-def open_at_group(volume_client, fs_handle, groupname, subvolname, op_type):
- with open_group(fs_handle, volume_client.volspec, groupname) as group:
- with open_subvol(volume_client.mgr, fs_handle, volume_client.volspec, group, subvolname, op_type) as subvolume:
+def open_at_group(fs_client, fs_handle, volspec, groupname, subvolname, op_type):
+ with open_group(fs_handle, volspec, groupname) as group:
+ with open_subvol(fs_client.mgr, fs_handle, volspec, group, subvolname, op_type) as subvolume:
yield subvolume
@contextmanager
-def open_at_group_unique(volume_client, fs_handle, s_groupname, s_subvolname, c_subvolume, c_groupname, c_subvolname, op_type):
+def open_at_group_unique(fs_client, fs_handle, volspec, s_groupname, s_subvolname, c_subvolume, c_groupname, c_subvolname, op_type):
# if a snapshot of a retained subvolume is being cloned to recreate the same subvolume, return
# the clone subvolume as the source subvolume
if s_groupname == c_groupname and s_subvolname == c_subvolname:
yield c_subvolume
else:
- with open_at_group(volume_client, fs_handle, s_groupname, s_subvolname, op_type) as s_subvolume:
+ with open_at_group(fs_client, fs_handle, volspec, s_groupname, s_subvolname, op_type) as s_subvolume:
yield s_subvolume
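Note on the equality check above: it covers the retained-snapshots case, where a snapshot of a removed subvolume is cloned back under the subvolume's original name. Reopening the source by name would open the very subvolume the clone already holds, so the clone handle itself is yielded, matching the "avoid metadata overwrites" comment in open_clone_subvolume_pair() below. With hypothetical names:

    # s_groupname == c_groupname == None, s_subvolname == c_subvolname == "sv0"
    # => yield c_subvolume instead of opening "sv0" a second time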
@contextmanager
-def open_clone_subvolume_pair(volume_client, fs_handle, volname, groupname, subvolname):
- with open_at_group(volume_client, fs_handle, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as clone_subvolume:
+def open_clone_subvolume_pair(fs_client, fs_handle, volspec, volname, groupname, subvolname):
+ with open_at_group(fs_client, fs_handle, volspec, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as clone_subvolume:
s_volname, s_groupname, s_subvolname, s_snapname = get_clone_source(clone_subvolume)
if groupname == s_groupname and subvolname == s_subvolname:
# use the same subvolume to avoid metadata overwrites
yield (clone_subvolume, clone_subvolume, s_snapname)
else:
- with open_at_group(volume_client, fs_handle, s_groupname, s_subvolname, SubvolumeOpType.CLONE_SOURCE) as source_subvolume:
+ with open_at_group(fs_client, fs_handle, volspec, s_groupname, s_subvolname, SubvolumeOpType.CLONE_SOURCE) as source_subvolume:
yield (clone_subvolume, source_subvolume, s_snapname)
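For the handlers further down, the yielded triple is (clone subvolume, source subvolume, source snapshot name); do_clone() and the handle_clone_* helpers index it positionally:

    # clone_volumes == (clone_subvolume, source_subvolume, s_snapname)
    # clone_volumes[0] -> clone target, clone_volumes[1] -> snapshot source, clone_volumes[2] -> snapshot name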
-def get_clone_state(volume_client, volname, groupname, subvolname):
- with open_at_volume(volume_client, volname, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as subvolume:
+def get_clone_state(fs_client, volspec, volname, groupname, subvolname):
+ with open_at_volume(fs_client, volspec, volname, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as subvolume:
return subvolume.state
-def set_clone_state(volume_client, volname, groupname, subvolname, state):
- with open_at_volume(volume_client, volname, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as subvolume:
+def set_clone_state(fs_client, volspec, volname, groupname, subvolname, state):
+ with open_at_volume(fs_client, volspec, volname, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as subvolume:
subvolume.state = (state, True)
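The tuple assigned to subvolume.state is assumed here to follow the state property's (next state, flush) convention, i.e. the True asks for the updated state to be persisted to the subvolume metadata immediately rather than on a later flush.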
def get_clone_source(clone_subvolume):
SubvolumeActions.ACTION_FAILED)
return next_state
-def handle_clone_pending(volume_client, volname, index, groupname, subvolname, should_cancel):
+def handle_clone_pending(fs_client, volspec, volname, index, groupname, subvolname, should_cancel):
try:
if should_cancel():
next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
if should_cancel():
raise VolumeException(-errno.EINTR, "clone operation interrupted")
-def do_clone(volume_client, volname, groupname, subvolname, should_cancel):
- with open_volume_lockless(volume_client, volname) as fs_handle:
- with open_clone_subvolume_pair(volume_client, fs_handle, volname, groupname, subvolname) as clone_volumes:
+def do_clone(fs_client, volspec, volname, groupname, subvolname, should_cancel):
+ with open_volume_lockless(fs_client, volname) as fs_handle:
+ with open_clone_subvolume_pair(fs_client, fs_handle, volspec, volname, groupname, subvolname) as clone_volumes:
src_path = clone_volumes[1].snapshot_data_path(clone_volumes[2])
dst_path = clone_volumes[0].path
bulk_copy(fs_handle, src_path, dst_path, should_cancel)
-def handle_clone_in_progress(volume_client, volname, index, groupname, subvolname, should_cancel):
+def handle_clone_in_progress(fs_client, volspec, volname, index, groupname, subvolname, should_cancel):
try:
- do_clone(volume_client, volname, groupname, subvolname, should_cancel)
+ do_clone(fs_client, volspec, volname, groupname, subvolname, should_cancel)
next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
SubvolumeStates.STATE_INPROGRESS,
SubvolumeActions.ACTION_SUCCESS)
raise VolumeException(oe.errno, oe.error_str)
return (next_state, False)
-def handle_clone_failed(volume_client, volname, index, groupname, subvolname, should_cancel):
+def handle_clone_failed(fs_client, volspec, volname, index, groupname, subvolname, should_cancel):
try:
- with open_volume(volume_client, volname) as fs_handle:
+ with open_volume(fs_client, volname) as fs_handle:
# detach source but leave the clone section intact for later inspection
- with open_clone_subvolume_pair(volume_client, fs_handle, volname, groupname, subvolname) as clone_volumes:
+ with open_clone_subvolume_pair(fs_client, fs_handle, volspec, volname, groupname, subvolname) as clone_volumes:
clone_volumes[1].detach_snapshot(clone_volumes[2], index)
except (MetadataMgrException, VolumeException) as e:
log.error("failed to detach clone from snapshot: {0}".format(e))
return (None, True)
-def handle_clone_complete(volume_client, volname, index, groupname, subvolname, should_cancel):
+def handle_clone_complete(fs_client, volspec, volname, index, groupname, subvolname, should_cancel):
try:
- with open_volume(volume_client, volname) as fs_handle:
- with open_clone_subvolume_pair(volume_client, fs_handle, volname, groupname, subvolname) as clone_volumes:
+ with open_volume(fs_client, volname) as fs_handle:
+ with open_clone_subvolume_pair(fs_client, fs_handle, volspec, volname, groupname, subvolname) as clone_volumes:
clone_volumes[1].detach_snapshot(clone_volumes[2], index)
clone_volumes[0].remove_clone_source(flush=True)
except (MetadataMgrException, VolumeException) as e:
log.error("failed to detach clone from snapshot: {0}".format(e))
return (None, True)
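Unlike the failed path above, which only detaches the snapshot and deliberately leaves the clone metadata section in place for inspection, the complete path also drops the clone-source section (remove_clone_source(flush=True)), so a finished clone no longer references its source snapshot.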
-def start_clone_sm(volume_client, volname, index, groupname, subvolname, state_table, should_cancel, snapshot_clone_delay):
+def start_clone_sm(fs_client, volspec, volname, index, groupname, subvolname, state_table, should_cancel, snapshot_clone_delay):
finished = False
current_state = None
try:
- current_state = get_clone_state(volume_client, volname, groupname, subvolname)
+ current_state = get_clone_state(fs_client, volspec, volname, groupname, subvolname)
log.debug("cloning ({0}, {1}, {2}) -- starting state \"{3}\"".format(volname, groupname, subvolname, current_state))
if current_state == SubvolumeStates.STATE_PENDING:
time.sleep(snapshot_clone_delay)
handler = state_table.get(current_state, None)
if not handler:
raise VolumeException(-errno.EINVAL, "invalid clone state: \"{0}\"".format(current_state))
- (next_state, finished) = handler(volume_client, volname, index, groupname, subvolname, should_cancel)
+ (next_state, finished) = handler(fs_client, volspec, volname, index, groupname, subvolname, should_cancel)
if next_state:
log.debug("({0}, {1}, {2}) transition state [\"{3}\" => \"{4}\"]".format(volname, groupname, subvolname,\
current_state, next_state))
- set_clone_state(volume_client, volname, groupname, subvolname, next_state)
+ set_clone_state(fs_client, volspec, volname, groupname, subvolname, next_state)
current_state = next_state
except VolumeException as ve:
log.error("clone failed for ({0}, {1}, {2}) (current_state: {3}, reason: {4})".format(volname, groupname,\
subvolname, current_state, ve))
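For context, the handle_clone_* functions above are the values of state_table, keyed by clone state. A minimal sketch of how such a table could be wired; only STATE_PENDING and STATE_INPROGRESS appear in this excerpt, the remaining member names are assumptions:

    state_table = {
        SubvolumeStates.STATE_PENDING:    handle_clone_pending,
        SubvolumeStates.STATE_INPROGRESS: handle_clone_in_progress,
        SubvolumeStates.STATE_FAILED:     handle_clone_failed,     # assumed member name
        SubvolumeStates.STATE_COMPLETE:   handle_clone_complete,   # assumed member name
    }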
-def clone(volume_client, volname, index, clone_path, state_table, should_cancel, snapshot_clone_delay):
+def clone(fs_client, volspec, volname, index, clone_path, state_table, should_cancel, snapshot_clone_delay):
log.info("cloning to subvolume path: {0}".format(clone_path))
- resolved = resolve(volume_client.volspec, clone_path)
+ resolved = resolve(volspec, clone_path)
groupname = resolved[0]
subvolname = resolved[1]
try:
log.info("starting clone: ({0}, {1}, {2})".format(volname, groupname, subvolname))
- start_clone_sm(volume_client, volname, index, groupname, subvolname, state_table, should_cancel, snapshot_clone_delay)
+ start_clone_sm(fs_client, volspec, volname, index, groupname, subvolname, state_table, should_cancel, snapshot_clone_delay)
log.info("finished clone: ({0}, {1}, {2})".format(volname, groupname, subvolname))
except VolumeException as ve:
log.error("clone failed for ({0}, {1}, {2}), reason: {3}".format(volname, groupname, subvolname, ve))
s_subvolname = status['source']['subvolume']
s_snapname = status['source']['snapshot']
- with open_at_group_unique(self.vc, fs_handle, s_groupname, s_subvolname, clone_subvolume, clone_groupname,
- clone_subvolname, SubvolumeOpType.CLONE_SOURCE) as s_subvolume:
+ with open_at_group_unique(self.fs_client, fs_handle, self.vc.volspec, s_groupname, s_subvolname, clone_subvolume,
+ clone_groupname, clone_subvolname, SubvolumeOpType.CLONE_SOURCE) as s_subvolume:
next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
clone_state,
SubvolumeActions.ACTION_CANCELLED)
track_idx = None
try:
- with open_volume(self.vc, volname) as fs_handle:
+ with open_volume(self.fs_client, volname) as fs_handle:
with open_group(fs_handle, self.vc.volspec, groupname) as group:
- with open_subvol(self.vc.mgr, fs_handle, self.vc.volspec, group, clonename, SubvolumeOpType.CLONE_CANCEL) as clone_subvolume:
+ with open_subvol(self.fs_client.mgr, fs_handle, self.vc.volspec, group, clonename, SubvolumeOpType.CLONE_CANCEL) as clone_subvolume:
status = clone_subvolume.status
clone_state = SubvolumeStates.from_value(status['state'])
if not self.is_clone_cancelable(clone_state):
# accessing the volume in exclusive mode here would lead to deadlock.
assert track_idx is not None
with self.lock:
- with open_volume_lockless(self.vc, volname) as fs_handle:
+ with open_volume_lockless(self.fs_client, volname) as fs_handle:
with open_group(fs_handle, self.vc.volspec, groupname) as group:
- with open_subvol(self.vc.mgr, fs_handle, self.vc.volspec, group, clonename, SubvolumeOpType.CLONE_CANCEL) as clone_subvolume:
+ with open_subvol(self.fs_client.mgr, fs_handle, self.vc.volspec, group, clonename, SubvolumeOpType.CLONE_CANCEL) as clone_subvolume:
if not self._cancel_job(volname, (track_idx, clone_subvolume.base_path)):
raise VolumeException(-errno.EINVAL, "cannot cancel -- clone finished (check clone status)")
except (IndexException, MetadataMgrException) as e:
raise VolumeException(-errno.EINVAL, "error canceling clone")
def get_next_job(self, volname, running_jobs):
- return get_next_clone_entry(self.vc, volname, running_jobs)
+ return get_next_clone_entry(self.fs_client, self.vc.volspec, volname, running_jobs)
def execute_job(self, volname, job, should_cancel):
- clone(self.vc, volname, job[0].decode('utf-8'), job[1].decode('utf-8'), self.state_table, should_cancel, self.snapshot_clone_delay)
+ clone(self.fs_client, self.vc.volspec, volname, job[0].decode('utf-8'), job[1].decode('utf-8'), self.state_table, should_cancel, self.snapshot_clone_delay)
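The job tuple here is whatever get_oldest_clone_entry() handed back through get_next_clone_entry(): presumably the tracking-index entry name and the clone path, both as raw bytes, hence the decode('utf-8') calls before they reach clone() as index and clone_path.

    # job == (tracking_index_bytes, clone_path_bytes)   (assumption, inferred from the call above)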
log = logging.getLogger(__name__)
# helper for fetching a trash entry for a given volume
-def get_trash_entry_for_volume(volume_client, volname, running_jobs):
+def get_trash_entry_for_volume(fs_client, volspec, volname, running_jobs):
log.debug("fetching trash entry for volume '{0}'".format(volname))
try:
- with open_volume_lockless(volume_client, volname) as fs_handle:
+ with open_volume_lockless(fs_client, volname) as fs_handle:
try:
- with open_trashcan(fs_handle, volume_client.volspec) as trashcan:
+ with open_trashcan(fs_handle, volspec) as trashcan:
path = trashcan.get_trash_entry(running_jobs)
return 0, path
except VolumeException as ve:
log.error("error fetching trash entry for volume '{0}' ({1})".format(volname, ve))
return ve.errno, None
-def subvolume_purge(volume_client, volname, trashcan, subvolume_trash_entry, should_cancel):
- groupname, subvolname = resolve_trash(volume_client.volspec, subvolume_trash_entry.decode('utf-8'))
+def subvolume_purge(fs_client, volspec, volname, trashcan, subvolume_trash_entry, should_cancel):
+ groupname, subvolname = resolve_trash(volspec, subvolume_trash_entry.decode('utf-8'))
log.debug("subvolume resolved to {0}/{1}".format(groupname, subvolname))
try:
- with open_volume(volume_client, volname) as fs_handle:
- with open_group(fs_handle, volume_client.volspec, groupname) as group:
- with open_subvol(volume_client.mgr, fs_handle, volume_client.volspec, group, subvolname, SubvolumeOpType.REMOVE) as subvolume:
+ with open_volume(fs_client, volname) as fs_handle:
+ with open_group(fs_handle, volspec, groupname) as group:
+ with open_subvol(fs_client.mgr, fs_handle, volspec, group, subvolname, SubvolumeOpType.REMOVE) as subvolume:
log.debug("subvolume.path={0}, purgeable={1}".format(subvolume.path, subvolume.purgeable))
if not subvolume.purgeable:
return
raise
# helper for starting a purge operation on a trash entry
-def purge_trash_entry_for_volume(volume_client, volname, purge_entry, should_cancel):
+def purge_trash_entry_for_volume(fs_client, volspec, volname, purge_entry, should_cancel):
log.debug("purging trash entry '{0}' for volume '{1}'".format(purge_entry, volname))
ret = 0
try:
- with open_volume_lockless(volume_client, volname) as fs_handle:
- with open_trashcan(fs_handle, volume_client.volspec) as trashcan:
+ with open_volume_lockless(fs_client, volname) as fs_handle:
+ with open_trashcan(fs_handle, volspec) as trashcan:
try:
pth = os.path.join(trashcan.path, purge_entry)
stx = fs_handle.statx(pth, cephfs.CEPH_STATX_MODE | cephfs.CEPH_STATX_SIZE,
return ve.errno
finally:
if delink:
- subvolume_purge(volume_client, volname, trashcan, tgt, should_cancel)
+ subvolume_purge(fs_client, volspec, volname, trashcan, tgt, should_cancel)
log.debug("purging trash link: {0}".format(purge_entry))
trashcan.delink(purge_entry)
else:
super(ThreadPoolPurgeQueueMixin, self).__init__(volume_client, "puregejob", tp_size)
def get_next_job(self, volname, running_jobs):
- return get_trash_entry_for_volume(self.vc, volname, running_jobs)
+ return get_trash_entry_for_volume(self.fs_client, self.vc.volspec, volname, running_jobs)
def execute_job(self, volname, job, should_cancel):
- purge_trash_entry_for_volume(self.vc, volname, job, should_cancel)
+ purge_trash_entry_for_volume(self.fs_client, self.vc.volspec, volname, job, should_cancel)
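Both mixins plug into the same asynchronous job machinery: get_next_job() returns an (errno, job) pair for a volume and execute_job() runs one job under a should_cancel callback. A minimal sketch of one worker step driving either queue (illustrative only; the actual AsyncJobs threading is not part of this diff):

    def run_one(queue, volname, running_jobs, should_cancel):
        # queue is e.g. a ThreadPoolPurgeQueueMixin instance (or its clone-queue counterpart)
        ret, job = queue.get_next_job(volname, running_jobs)  # (0, job) on success, (errno, None) on error
        if ret != 0 or job is None:
            return False
        queue.execute_job(volname, job, should_cancel)
        return True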