'pending' : handle_clone_pending,
'in-progress' : handle_clone_in_progress,
'complete' : handle_clone_complete,
- 'failed' : handle_clone_failed
+ 'failed' : handle_clone_failed,
+ 'canceled' : handle_clone_failed,
}
super(Cloner, self).__init__(volume_client, "cloner", tp_size)
def is_clone_cancelable(self, clone_state):
    """
    Return True if a clone in the given state may still be canceled.

    A clone is cancelable only while it is neither in a final nor in a
    failed state (per the clone operation state machine `OpSm`).
    """
    if OpSm.is_final_state(clone_state):
        return False
    return not OpSm.is_failed_state(clone_state)
+
def get_clone_tracking_index(self, fs_handle, clone_subvolume):
    """
    Look up the tracking-index entry for a clone subvolume.

    Opens the volume's clone index and searches it for an entry matching
    the clone subvolume's base path; returns whatever the index lookup
    yields (presumably None/empty when the clone is not tracked — the
    caller treats a falsy result as "not found").
    """
    with open_clone_index(fs_handle, self.vc.volspec) as clone_index:
        return clone_index.find_clone_entry_index(clone_subvolume.base_path)
+
def _cancel_pending_clone(self, fs_handle, clone_subvolume, status, track_idx):
    """
    Cancel a clone that has not yet started copying data.

    Transitions the clone's state via the state machine using -EINTR as
    the transition cause, then detaches the source snapshot from the
    clone's tracking-index entry.

    :param fs_handle: open filesystem handle
    :param clone_subvolume: the clone subvolume being canceled
    :param status: clone status dict (must contain 'state' and 'source')
    :param track_idx: tracking index entry (bytes) for this clone
    """
    clone_state = status['state']
    # internal invariant: callers verify cancelability before invoking us
    assert self.is_clone_cancelable(clone_state)

    source = status['source']
    snapname = source['snapshot']
    subvolname = source['subvolume']
    groupname = source.get('group', None)

    with open_group(fs_handle, self.vc.volspec, groupname) as source_group:
        with open_subvol(fs_handle, self.vc.volspec, source_group, subvolname) as source_subvolume:
            # persist the canceled state first, then drop the snapshot linkage
            clone_subvolume.state = (OpSm.get_next_state("clone", clone_state, -errno.EINTR), True)
            source_subvolume.detach_snapshot(snapname, track_idx.decode('utf-8'))
+
def cancel_job(self, volname, job):
    """
    Cancel a pending or in-progress clone operation.

    Overrides base class `cancel_job`; interprets @job as a
    (clone name, group name) tuple.

    :param volname: name of the volume hosting the clone
    :param job: (clone name, group name) tuple identifying the clone
    :raises VolumeException: if the clone already finished or the cancel
                             could not be carried out
    """
    clonename, groupname = job
    track_idx = None

    try:
        with open_volume(self.vc, volname) as fs_handle:
            with open_group(fs_handle, self.vc.volspec, groupname) as group:
                with open_subvol(fs_handle, self.vc.volspec, group, clonename,
                                 need_complete=False, expected_types=["clone"]) as clone_subvolume:
                    status = clone_subvolume.status
                    clone_state = status['state']
                    if not self.is_clone_cancelable(clone_state):
                        raise VolumeException(-errno.EINVAL, "cannot cancel -- clone finished (check clone status)")
                    track_idx = self.get_clone_tracking_index(fs_handle, clone_subvolume)
                    if not track_idx:
                        # fixed: log.warn is a deprecated alias of log.warning
                        log.warning("cannot lookup clone tracking index for {0}".format(clone_subvolume.base_path))
                        raise VolumeException(-errno.EINVAL, "error canceling clone")
                    if OpSm.is_init_state("clone", clone_state):
                        # clone has not started yet -- cancel right away.
                        self._cancel_pending_clone(fs_handle, clone_subvolume, status, track_idx)
                        return
        # cancelling an on-going clone would persist "canceled" state in subvolume metadata.
        # to persist the new state, async cloner accesses the volume in exclusive mode.
        # accessing the volume in exclusive mode here would lead to deadlock.
        assert track_idx is not None
        with self.lock:
            with open_volume_lockless(self.vc, volname) as fs_handle:
                with open_group(fs_handle, self.vc.volspec, groupname) as group:
                    with open_subvol(fs_handle, self.vc.volspec, group, clonename,
                                     need_complete=False, expected_types=["clone"]) as clone_subvolume:
                        if not self._cancel_job(volname, (track_idx, clone_subvolume.base_path)):
                            raise VolumeException(-errno.EINVAL, "cannot cancel -- clone finished (check clone status)")
    except (IndexException, MetadataMgrException) as e:
        log.error("error cancelling clone {0}: ({1})".format(job, e))
        raise VolumeException(-errno.EINVAL, "error canceling clone")
+
def get_next_job(self, volname, running_jobs):
    """
    Fetch the next clone entry to process for the given volume.

    Delegates to the module-level `get_next_clone_entry`, skipping jobs
    already present in `running_jobs`.
    """
    next_entry = get_next_clone_entry(self.vc, volname, running_jobs)
    return next_entry
'desc': "Get status on a cloned subvolume.",
'perm': 'r'
},
+ {
+ 'cmd': 'fs clone cancel '
+ 'name=vol_name,type=CephString '
+ 'name=clone_name,type=CephString '
+ 'name=group_name,type=CephString,req=false ',
+ 'desc': "Cancel a pending or ongoing clone operation.",
+ 'perm': 'r'
+ },
# volume ls [recursive]
# subvolume ls <volume>
def _cmd_fs_clone_status(self, inbuf, cmd):
return self.vc.clone_status(
vol_name=cmd['vol_name'], clone_name=cmd['clone_name'], group_name=cmd.get('group_name', None))
+
+ def _cmd_fs_clone_cancel(self, inbuf, cmd):
+ return self.vc.clone_cancel(
+ vol_name=cmd['vol_name'], clone_name=cmd['clone_name'], group_name=cmd.get('group_name', None))