$ ceph fs subvolume snapshot clone <vol_name> <subvol_name> <snap_name> <target_subvol_name> --pool_layout <pool_layout>
+Configure the maximum number of concurrent clone operations. The default is 4::
+
+ $ ceph config set mgr mgr/volumes/max_concurrent_clones <value>
+
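+The current value can be read back with::
+
+ $ ceph config get mgr mgr/volumes/max_concurrent_clones
+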
To check the status of a clone operation use::

 $ ceph fs clone status <vol_name> <clone_name> [--group_name <group_name>]
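
For example, a clone that is still running reports its state and source
snapshot; with illustrative names, the output resembles::

 $ ceph fs clone status cephfs clone1
 {
   "status": {
     "state": "in-progress",
     "source": {
       "volume": "cephfs",
       "subvolume": "subvol1",
       "snapshot": "snap1"
     }
   }
 }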
# verify trash dir is clean
self._wait_for_trash_empty()
+ def test_subvolume_snapshot_reconf_max_concurrent_clones(self):
+ """
+ Validate 'max_concurrent_clones' config option
+ """
+
+ # get the default number of cloner threads
+ default_max_concurrent_clones = int(self.config_get('mgr', 'mgr/volumes/max_concurrent_clones'))
+ self.assertEqual(default_max_concurrent_clones, 4)
+
+ # Increase number of cloner threads
+ self.config_set('mgr', 'mgr/volumes/max_concurrent_clones', 6)
+ max_concurrent_clones = int(self.config_get('mgr', 'mgr/volumes/max_concurrent_clones'))
+ self.assertEqual(max_concurrent_clones, 6)
+
+ # Decrease number of cloner threads
+ self.config_set('mgr', 'mgr/volumes/max_concurrent_clones', 2)
+ max_concurrent_clones = int(self.config_get('mgr', 'mgr/volumes/max_concurrent_clones'))
+ self.assertEqual(max_concurrent_clones, 2)
+
def test_subvolume_snapshot_clone_pool_layout(self):
subvolume = self._generate_random_subvolume_name()
snapshot = self._generate_random_snapshot_name()
}
super(Cloner, self).__init__(volume_client, "cloner", tp_size)
+ def reconfigure_max_concurrent_clones(self, tp_size):
+ super(Cloner, self).reconfigure_max_concurrent_clones("cloner", tp_size)
+
def is_clone_cancelable(self, clone_state):
return not (SubvolumeOpSm.is_complete_state(clone_state) or SubvolumeOpSm.is_failed_state(clone_state))
thread_id = threading.currentThread()
assert isinstance(thread_id, JobThread)
thread_name = thread_id.getName()
+ log.debug("thread [{0}] starting".format(thread_name))
while retries < JobThread.MAX_RETRIES_ON_EXCEPTION:
vol_job = None
# fetch next job to execute
with self.async_job.lock:
while True:
+ if self.should_reconfigure_num_threads():
+ log.info("thread [{0}] terminating due to reconfigure".format(thread_name))
+ self.async_job.threads.remove(self)
+ return
vol_job = self.async_job.get_job()
if vol_job:
break
self.async_job.cv.wait(timeout=1)  # wait on the cv, releasing the lock, rather than sleeping with it held
log.error("thread [{0}] reached exception limit, bailing out...".format(thread_name))
self.vc.cluster_log("thread {0} bailing out due to exception".format(thread_name))
+ with self.async_job.lock:
+ self.async_job.threads.remove(self)
+
+ def should_reconfigure_num_threads(self):
+ # true when max_concurrent_clones was lowered below the live thread count
+ return len(self.async_job.threads) > self.async_job.nr_concurrent_jobs
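
The shrink path is thus cooperative: a worker is never interrupted mid-job; it
re-checks the configured limit each time it looks for work and removes itself
if the pool is oversized. A minimal standalone sketch of the same pattern (a
hypothetical ResizablePool with assumed names, not the mgr code)::

    import threading

    class ResizablePool(object):
        """Worker pool that grows eagerly and shrinks cooperatively (sketch)."""
        def __init__(self, size):
            self.lock = threading.Lock()
            self.cv = threading.Condition(self.lock)
            self.limit = size
            self.jobs = []
            self.threads = []
            self._spawn(size)

        def _spawn(self, count):
            for _ in range(count):
                t = threading.Thread(target=self._run)
                self.threads.append(t)
                t.start()

        def _run(self):
            me = threading.current_thread()
            while True:
                with self.lock:
                    # surplus threads exit before picking up new work
                    if len(self.threads) > self.limit:
                        self.threads.remove(me)
                        return
                    if not self.jobs:
                        self.cv.wait(timeout=1)  # releases the lock while waiting
                        continue
                    job = self.jobs.pop(0)
                job()  # run the job outside the lock

        def submit(self, job):
            with self.lock:
                self.jobs.append(job)
                self.cv.notify()

        def resize(self, size):
            with self.lock:
                self.limit = size
                if len(self.threads) < size:
                    self._spawn(size - len(self.threads))
                else:
                    self.cv.notify_all()  # wake idle threads so surplus ones exit

Resizing to a larger value spawns threads immediately; resizing smaller only
wakes the idle ones, and a busy worker finishes its current job before exiting.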
def cancel_job(self):
self.cancel_event.set()
# cv for job cancelation
self.waiting = False
self.cancel_cv = threading.Condition(self.lock)
+ self.nr_concurrent_jobs = nr_concurrent_jobs
self.threads = []
for i in range(nr_concurrent_jobs):
self.threads.append(JobThread(self, volume_client, name="{0}.{1}".format(name_pfx, i)))
self.threads[-1].start()
+ def reconfigure_max_concurrent_clones(self, name_pfx, nr_concurrent_jobs):
+ """
+ reconfigure number of cloner threads
+ """
+ with self.lock:
+ self.nr_concurrent_jobs = nr_concurrent_jobs
+ # Decrease in concurrency: wake idle threads so the surplus ones can terminate.
+ if len(self.threads) > nr_concurrent_jobs:
+ self.cv.notifyAll()
+ # Increase in concurrency
+ if len(self.threads) < nr_concurrent_jobs:
+ for i in range(len(self.threads), nr_concurrent_jobs):
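+ # embed a timestamp in the name so new names stay unique across reconfigurations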
+ self.threads.append(JobThread(self, self.vc, name="{0}.{1}.{2}".format(name_pfx, time.time(), i)))
+ self.threads[-1].start()
+
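
Note the asymmetry between the two branches above: an increase spawns new
threads immediately, while a decrease is cooperative. notifyAll() only wakes
threads idling on the cv; a surplus thread that is mid-clone exits the next
time it asks for a job, so in-flight clones always run to completion.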
def get_job(self):
log.debug("processing {0} volume entries".format(len(self.q)))
nr_vols = len(self.q)
super().__init__(mgr)
# volume specification
self.volspec = VolSpec(mgr.rados.conf_get('client_snapdir'))
- # TODO: make thread pool size configurable
- self.cloner = Cloner(self, 4)
+ self.cloner = Cloner(self, self.mgr.max_concurrent_clones)
self.purge_queue = ThreadPoolPurgeQueueMixin(self, 4)
# on startup, queue purge job for available volumes to kickstart
# purge for leftover subvolume entries in trash. note that, if the
import json
import logging
import traceback
+import threading
from mgr_module import MgrModule
import orchestrator
# volume in the lifetime of this module instance.
]
+ MODULE_OPTIONS = [
+ {
+ 'name': 'max_concurrent_clones',
+ 'type': 'int',
+ 'default': 4,
+ 'desc': 'Number of asynchronous cloner threads',
+ }
+ ]
+
def __init__(self, *args, **kwargs):
+ self.inited = False
+ # for mypy
+ self.max_concurrent_clones = None
+ self.lock = threading.Lock()
super(Module, self).__init__(*args, **kwargs)
- self.vc = VolumeClient(self)
- self.fs_export = FSExport(self)
- self.nfs = NFSCluster(self)
+ # Initialize config option members
+ self.config_notify()
+ with self.lock:
+ self.vc = VolumeClient(self)
+ self.fs_export = FSExport(self)
+ self.nfs = NFSCluster(self)
+ self.inited = True
def __del__(self):
self.vc.shutdown()
def shutdown(self):
self.vc.shutdown()
+ def config_notify(self):
+ """
+ This method is called whenever one of our config options is changed.
+ """
+ with self.lock:
+ for opt in self.MODULE_OPTIONS:
+ setattr(self,
+ opt['name'], # type: ignore
+ self.get_module_option(opt['name'])) # type: ignore
+ self.log.debug(' mgr option %s = %s',
+ opt['name'], getattr(self, opt['name'])) # type: ignore
+ if self.inited:
+ if opt['name'] == "max_concurrent_clones":
+ self.vc.cloner.reconfigure_max_concurrent_clones(self.max_concurrent_clones)
+
def handle_command(self, inbuf, cmd):
handler_name = "_cmd_" + cmd['prefix'].replace(" ", "_")
try: