TASK_REF_ACTION_MIGRATION_ABORT]
TASK_RETRY_INTERVAL = timedelta(seconds=30)
+MAX_COMPLETED_TASKS = 50
def extract_pool_key(pool_spec):
tasks_by_sequence = dict()
tasks_by_id = dict()
+ completed_tasks = []
+
sequence = 0
def __init__(self, module):
self.tasks_by_sequence[task.sequence] = task
self.tasks_by_id[task.task_id] = task
- def add_task(self, ioctx, message, refs):
- self.log.debug("add_task: message={}, refs={}".format(message, refs))
+ def find_task(self, refs):
+ self.log.debug("find_task: refs={}".format(refs))
# search for dups and return the original
for task_id, task in self.tasks_by_id.items():
if task.refs == refs:
- return json.dumps(task.to_dict())
+ return task
+
+ # search for a completed task (message replay)
+ for task in self.completed_tasks:
+ if task.refs == refs:
+ return task
+
+ def add_task(self, ioctx, message, refs):
+ self.log.debug("add_task: message={}, refs={}".format(message, refs))
# ensure unique uuid across all pools
while True:
try:
del self.tasks_by_id[task.task_id]
del self.tasks_by_sequence[task.sequence]
+
+ # keep a record of the last N tasks to help avoid command replay
+ # races
+ if not task.failed and not task.canceled:
+ self.completed_tasks.append(task)
+ self.completed_tasks = self.completed_tasks[-MAX_COMPLETED_TASKS:]
+
except KeyError:
pass
image_spec = self.extract_image_spec(image_spec)
self.log.info("queue_flatten: {}".format(image_spec))
+ refs = {TASK_REF_ACTION: TASK_REF_ACTION_FLATTEN,
+ TASK_REF_POOL_NAME: image_spec[0],
+ TASK_REF_POOL_NAMESPACE: image_spec[1],
+ TASK_REF_IMAGE_NAME: image_spec[2]}
+ task = self.find_task(refs)
+ if task:
+ return 0, task.to_json(), ''
+
with self.open_ioctx(image_spec) as ioctx:
with rbd.Image(ioctx, image_spec[2]) as image:
try:
image_spec = self.extract_image_spec(image_spec)
self.log.info("queue_remove: {}".format(image_spec))
+ refs = {TASK_REF_ACTION: TASK_REF_ACTION_REMOVE,
+ TASK_REF_POOL_NAME: image_spec[0],
+ TASK_REF_POOL_NAMESPACE: image_spec[1],
+ TASK_REF_IMAGE_NAME: image_spec[2]}
+ task = self.find_task(refs)
+ if task:
+ return 0, task.to_json(), ''
+
with self.open_ioctx(image_spec) as ioctx:
with rbd.Image(ioctx, image_spec[2]) as image:
if list(image.list_snaps()):
return 0, self.add_task(ioctx,
"Removing image {}".format(
self.format_image_spec(image_spec)),
- {TASK_REF_ACTION: TASK_REF_ACTION_REMOVE,
- TASK_REF_POOL_NAME: image_spec[0],
- TASK_REF_POOL_NAMESPACE: image_spec[1],
- TASK_REF_IMAGE_NAME: image_spec[2]}), ""
+ refs), ''
def queue_trash_remove(self, image_id_spec):
image_id_spec = self.extract_image_spec(image_id_spec)
self.log.info("queue_trash_remove: {}".format(image_id_spec))
+ refs = {TASK_REF_ACTION: TASK_REF_ACTION_TRASH_REMOVE,
+ TASK_REF_POOL_NAME: image_id_spec[0],
+ TASK_REF_POOL_NAMESPACE: image_id_spec[1],
+ TASK_REF_IMAGE_ID: image_id_spec[2]}
+ task = self.find_task(refs)
+ if task:
+ return 0, task.to_json(), ''
+
# verify that image exists in trash
with self.open_ioctx(image_id_spec) as ioctx:
rbd.RBD().trash_get(ioctx, image_id_spec[2])
return 0, self.add_task(ioctx,
"Removing image {} from trash".format(
self.format_image_spec(image_id_spec)),
- {TASK_REF_ACTION: TASK_REF_ACTION_TRASH_REMOVE,
- TASK_REF_POOL_NAME: image_id_spec[0],
- TASK_REF_POOL_NAMESPACE: image_id_spec[1],
- TASK_REF_IMAGE_ID: image_id_spec[2]}), ""
+ refs), ''
def validate_image_migrating(self, ioctx, image_spec):
with rbd.Image(ioctx, image_spec[2]) as image:
image_spec = self.extract_image_spec(image_spec)
self.log.info("queue_migration_execute: {}".format(image_spec))
+ refs = {TASK_REF_ACTION: TASK_REF_ACTION_MIGRATION_EXECUTE,
+ TASK_REF_POOL_NAME: image_spec[0],
+ TASK_REF_POOL_NAMESPACE: image_spec[1],
+ TASK_REF_IMAGE_NAME: image_spec[2]}
+ task = self.find_task(refs)
+ if task:
+ return 0, task.to_json(), ''
+
with self.open_ioctx(image_spec) as ioctx:
self.validate_image_migrating(ioctx, image_spec)
self.format_image_spec((dest_pool,
status['dest_pool_namespace'],
status['dest_image_name']))),
- {TASK_REF_ACTION: TASK_REF_ACTION_MIGRATION_EXECUTE,
- TASK_REF_POOL_NAME: image_spec[0],
- TASK_REF_POOL_NAMESPACE: image_spec[1],
- TASK_REF_IMAGE_NAME: image_spec[2]}), ""
+ refs), ''
def queue_migration_commit(self, image_spec):
image_spec = self.extract_image_spec(image_spec)
self.log.info("queue_migration_commit: {}".format(image_spec))
+ refs = {TASK_REF_ACTION: TASK_REF_ACTION_MIGRATION_COMMIT,
+ TASK_REF_POOL_NAME: image_spec[0],
+ TASK_REF_POOL_NAMESPACE: image_spec[1],
+ TASK_REF_IMAGE_NAME: image_spec[2]}
+ task = self.find_task(refs)
+ if task:
+ return 0, task.to_json(), ''
+
with self.open_ioctx(image_spec) as ioctx:
self.validate_image_migrating(ioctx, image_spec)
return 0, self.add_task(ioctx,
"Committing image migration for {}".format(
self.format_image_spec(image_spec)),
- {TASK_REF_ACTION: TASK_REF_ACTION_MIGRATION_COMMIT,
- TASK_REF_POOL_NAME: image_spec[0],
- TASK_REF_POOL_NAMESPACE: image_spec[1],
- TASK_REF_IMAGE_NAME: image_spec[2]}), ""
+ refs), ''
def queue_migration_abort(self, image_spec):
image_spec = self.extract_image_spec(image_spec)
self.log.info("queue_migration_abort: {}".format(image_spec))
+ refs = {TASK_REF_ACTION: TASK_REF_ACTION_MIGRATION_ABORT,
+ TASK_REF_POOL_NAME: image_spec[0],
+ TASK_REF_POOL_NAMESPACE: image_spec[1],
+ TASK_REF_IMAGE_NAME: image_spec[2]}
+ task = self.find_task(refs)
+ if task:
+ return 0, task.to_json(), ''
+
with self.open_ioctx(image_spec) as ioctx:
self.validate_image_migrating(ioctx, image_spec)
return 0, self.add_task(ioctx,
"Aborting image migration for {}".format(
self.format_image_spec(image_spec)),
- {TASK_REF_ACTION: TASK_REF_ACTION_MIGRATION_ABORT,
- TASK_REF_POOL_NAME: image_spec[0],
- TASK_REF_POOL_NAMESPACE: image_spec[1],
- TASK_REF_IMAGE_NAME: image_spec[2]}), ""
+ refs), ''
def task_cancel(self, task_id):
self.log.info("task_cancel: {}".format(task_id))