self.tasks_by_sequence[task.sequence] = task
self.tasks_by_id[task.task_id] = task
+ def task_refs_match(self, task_refs, refs):
+ if TASK_REF_IMAGE_ID not in refs and TASK_REF_IMAGE_ID in task_refs:
+ task_refs = task_refs.copy()
+ del task_refs[TASK_REF_IMAGE_ID]
+
+ self.log.debug("task_refs_match: ref1={}, ref2={}".format(task_refs, refs))
+ return task_refs == refs
+
def find_task(self, refs):
self.log.debug("find_task: refs={}".format(refs))
# search for dups and return the original
- for task_id, task in self.tasks_by_id.items():
- if task.refs == refs:
+ for task_id in reversed(sorted(self.tasks_by_id.keys())):
+ task = self.tasks_by_id[task_id]
+ if self.task_refs_match(task.refs, refs):
return task
# search for a completed task (message replay)
- for task in self.completed_tasks:
- if task.refs == refs:
+ for task in reversed(self.completed_tasks):
+ if self.task_refs_match(task.refs, refs):
return task
def add_task(self, ioctx, message, refs):
# keep a record of the last N tasks to help avoid command replay
# races
if not task.failed and not task.canceled:
+ self.log.debug("remove_task: moving to completed tasks")
self.completed_tasks.append(task)
self.completed_tasks = self.completed_tasks[-MAX_COMPLETED_TASKS:]
TASK_REF_POOL_NAME: image_spec[0],
TASK_REF_POOL_NAMESPACE: image_spec[1],
TASK_REF_IMAGE_NAME: image_spec[2]}
- task = self.find_task(refs)
- if task:
- return 0, task.to_json(), ''
with self.open_ioctx(image_spec) as ioctx:
- with rbd.Image(ioctx, image_spec[2]) as image:
- try:
- image.parent_id()
- except rbd.ImageNotFound:
- raise rbd.ImageNotFound("Image {} does not have a parent".format(
- self.format_image_spec(image_spec)), errno=errno.ENOENT)
+ try:
+ with rbd.Image(ioctx, image_spec[2]) as image:
+ refs[TASK_REF_IMAGE_ID] = image.id()
+
+ try:
+ parent_image_id = image.parent_id()
+ except rbd.ImageNotFound:
+ parent_image_id = None
+
+ except rbd.ImageNotFound:
+ pass
+
+ task = self.find_task(refs)
+ if task:
+ return 0, task.to_json(), ''
+
+ if TASK_REF_IMAGE_ID not in refs:
+ raise rbd.ImageNotFound("Image {} does not exist".format(
+ self.format_image_spec(image_spec)), errno=errno.ENOENT)
+ if not parent_image_id:
+ raise rbd.ImageNotFound("Image {} does not have a parent".format(
+ self.format_image_spec(image_spec)), errno=errno.ENOENT)
return 0, self.add_task(ioctx,
"Flattening image {}".format(
self.format_image_spec(image_spec)),
- {TASK_REF_ACTION: TASK_REF_ACTION_FLATTEN,
- TASK_REF_POOL_NAME: image_spec[0],
- TASK_REF_POOL_NAMESPACE: image_spec[1],
- TASK_REF_IMAGE_NAME: image_spec[2]}), ""
+ refs), ""
def queue_remove(self, image_spec):
image_spec = self.extract_image_spec(image_spec)
TASK_REF_POOL_NAME: image_spec[0],
TASK_REF_POOL_NAMESPACE: image_spec[1],
TASK_REF_IMAGE_NAME: image_spec[2]}
- task = self.find_task(refs)
- if task:
- return 0, task.to_json(), ''
with self.open_ioctx(image_spec) as ioctx:
- with rbd.Image(ioctx, image_spec[2]) as image:
- if list(image.list_snaps()):
- raise rbd.ImageBusy("Image {} has snapshots".format(
- self.format_image_spec(image_spec)), errno=errno.EBUSY)
+ try:
+ with rbd.Image(ioctx, image_spec[2]) as image:
+ refs[TASK_REF_IMAGE_ID] = image.id()
+ snaps = list(image.list_snaps())
+
+ except rbd.ImageNotFound:
+ pass
+
+ task = self.find_task(refs)
+ if task:
+ return 0, task.to_json(), ''
+
+ if TASK_REF_IMAGE_ID not in refs:
+ raise rbd.ImageNotFound("Image {} does not exist".format(
+ self.format_image_spec(image_spec)), errno=errno.ENOENT)
+ if snaps:
+ raise rbd.ImageBusy("Image {} has snapshots".format(
+ self.format_image_spec(image_spec)), errno=errno.EBUSY)
return 0, self.add_task(ioctx,
"Removing image {}".format(
self.format_image_spec(image_id_spec)),
refs), ''
- def validate_image_migrating(self, ioctx, image_spec):
- with rbd.Image(ioctx, image_spec[2]) as image:
- features = image.features()
- if features & rbd.RBD_FEATURE_MIGRATING == 0:
- raise rbd.InvalidArgument("Image {} is not migrating".format(
- self.format_image_spec(image_spec)), errno=errno.EINVAL)
+ def get_migration_status(self, ioctx, image_spec):
+ try:
+ return rbd.RBD().migration_status(ioctx, image_spec[2])
+ except (rbd.InvalidArgument, rbd.ImageNotFound):
+ return None
+
+ def validate_image_migrating(self, migration_status):
+ if not migration_status:
+ raise rbd.InvalidArgument("Image {} is not migrating".format(
+ self.format_image_spec(image_spec)), errno=errno.EINVAL)
def resolve_pool_name(self, pool_id):
osd_map = self.module.get('osd_map')
TASK_REF_POOL_NAME: image_spec[0],
TASK_REF_POOL_NAMESPACE: image_spec[1],
TASK_REF_IMAGE_NAME: image_spec[2]}
- task = self.find_task(refs)
- if task:
- return 0, task.to_json(), ''
with self.open_ioctx(image_spec) as ioctx:
- self.validate_image_migrating(ioctx, image_spec)
+ status = self.get_migration_status(ioctx, image_spec)
+ if status:
+ refs[TASK_REF_IMAGE_ID] = status['dest_image_id']
+
+ task = self.find_task(refs)
+ if task:
+ return 0, task.to_json(), ''
- status = rbd.RBD().migration_status(ioctx, image_spec[2])
+ self.validate_image_migrating(status)
if status['state'] not in [rbd.RBD_IMAGE_MIGRATION_STATE_PREPARED,
rbd.RBD_IMAGE_MIGRATION_STATE_EXECUTING]:
raise rbd.InvalidArgument("Image {} is not in ready state".format(
TASK_REF_POOL_NAME: image_spec[0],
TASK_REF_POOL_NAMESPACE: image_spec[1],
TASK_REF_IMAGE_NAME: image_spec[2]}
- task = self.find_task(refs)
- if task:
- return 0, task.to_json(), ''
with self.open_ioctx(image_spec) as ioctx:
- self.validate_image_migrating(ioctx, image_spec)
+ status = self.get_migration_status(ioctx, image_spec)
+ if status:
+ refs[TASK_REF_IMAGE_ID] = status['dest_image_id']
+
+ task = self.find_task(refs)
+ if task:
+ return 0, task.to_json(), ''
- status = rbd.RBD().migration_status(ioctx, image_spec[2])
+ self.validate_image_migrating(status)
if status['state'] != rbd.RBD_IMAGE_MIGRATION_STATE_EXECUTED:
raise rbd.InvalidArgument("Image {} has not completed migration".format(
self.format_image_spec(image_spec)), errno=errno.EINVAL)
TASK_REF_POOL_NAME: image_spec[0],
TASK_REF_POOL_NAMESPACE: image_spec[1],
TASK_REF_IMAGE_NAME: image_spec[2]}
- task = self.find_task(refs)
- if task:
- return 0, task.to_json(), ''
with self.open_ioctx(image_spec) as ioctx:
- self.validate_image_migrating(ioctx, image_spec)
+ status = self.get_migration_status(ioctx, image_spec)
+ if status:
+ refs[TASK_REF_IMAGE_ID] = status['dest_image_id']
+
+ task = self.find_task(refs)
+ if task:
+ return 0, task.to_json(), ''
+ self.validate_image_migrating(status)
return 0, self.add_task(ioctx,
"Aborting image migration for {}".format(
self.format_image_spec(image_spec)),