The number of cloner threads is hardcoded to 4 and cannot be
configured. This patch makes the number of cloner threads
configurable via the mgr config option "max_concurrent_clones".
On an increase in the number of cloner threads, only the
difference between the existing thread count and the new
configuration is spawned; running cloner threads are not
cancelled.
On a decrease in the number of cloner threads, the cases are as
follows (a condensed sketch follows the list).
1. If all cloner threads are waiting for a job:
In this case, all threads are notified and the required number
of threads are terminated.
2. If all cloner threads are processing a job:
In this case, the condition is validated for each thread after
the current job is finished, and the thread is terminated if the
condition for the required number of cloner threads is not satisfied.
3. If some cloner threads are processing and others are waiting:
The threads which are waiting are notified to re-validate the
required number of threads. If terminating those does not satisfy the
required count, the remaining threads are terminated upon
completion of their current job.
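A condensed, self-contained sketch of the decrease path (names are
illustrative; the real logic lives in JobThread.run() and
AsyncJobs.reconfigure_max_concurrent_clones() in the diff below):

import threading
import time

class AsyncJobs(object):
    def __init__(self, nr_concurrent_jobs):
        self.lock = threading.Lock()
        self.cv = threading.Condition(self.lock)
        self.nr_concurrent_jobs = nr_concurrent_jobs
        self.threads = []
        for i in range(nr_concurrent_jobs):
            self.threads.append(JobThread(self, "sketch.{0}".format(i)))
            self.threads[-1].start()

    def reconfigure(self, nr_concurrent_jobs):
        with self.lock:
            self.nr_concurrent_jobs = nr_concurrent_jobs
            # wake waiting threads so they re-check the thread count
            self.cv.notifyAll()

class JobThread(threading.Thread):
    def __init__(self, async_job, name):
        super(JobThread, self).__init__(name=name)
        self.async_job = async_job
        self.daemon = True

    def run(self):
        while True:
            with self.async_job.lock:
                # surplus threads retire; busy threads hit this same
                # check once their current job completes
                if len(self.async_job.threads) > self.async_job.nr_concurrent_jobs:
                    self.async_job.threads.remove(self)
                    return
                # idle: wait for a job (job handling elided)
                self.async_job.cv.wait(timeout=1)

jobs = AsyncJobs(4)
jobs.reconfigure(2)
time.sleep(0.5)
with jobs.lock:
    print(len(jobs.threads))  # expect 2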
Fixes: https://tracker.ceph.com/issues/46892
Signed-off-by: Kotresh HR <khiremat@redhat.com>
(cherry picked from commit 83c4442c765bb64f9d4fbd3edcb4967dfa21cafe)
Conflicts:
src/pybind/mgr/volumes/fs/volume.py
- keep connection_pool because nautilus does not have the CephfsClient
src/pybind/mgr/volumes/module.py
- Remove NFS export related code, as nautilus does not contain the NFSExport and FSExport classes
qa/tasks/cephfs/test_volumes.py
- nautilus expects a particular 'mgr.id', hence 'mgr.x' is used instead of 'mgr'
$ ceph fs subvolume snapshot clone <vol_name> <subvol_name> <snap_name> <target_subvol_name> --pool_layout <pool_layout>
+Configure the maximum number of concurrent clones. The default is 4::
+
+ $ ceph config set mgr mgr/volumes/max_concurrent_clones <value>
+
To check the status of a clone operation use::
$ ceph fs clone status <vol_name> <clone_name> [--group_name <group_name>]
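Since the option is a regular mgr config key, the current value can also
be read back the same way (example session; the "4" shown assumes the
default has not been changed):

$ ceph config get mgr mgr/volumes/max_concurrent_clones
4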
# verify trash dir is clean
self._wait_for_trash_empty()
+ def test_subvolume_snapshot_reconf_max_concurrent_clones(self):
+ """
+ Validate 'max_concurrent_clones' config option
+ """
+
+ # get the default number of cloner threads
+ default_max_concurrent_clones = int(self.config_get('mgr.x', 'mgr/volumes/max_concurrent_clones'))
+ self.assertEqual(default_max_concurrent_clones, 4)
+
+ # Increase number of cloner threads
+ self.config_set('mgr', 'mgr/volumes/max_concurrent_clones', 6)
+ max_concurrent_clones = int(self.config_get('mgr.x', 'mgr/volumes/max_concurrent_clones'))
+ self.assertEqual(max_concurrent_clones, 6)
+
+ # Decrease number of cloner threads
+ self.config_set('mgr', 'mgr/volumes/max_concurrent_clones', 2)
+ max_concurrent_clones = int(self.config_get('mgr.x', 'mgr/volumes/max_concurrent_clones'))
+ self.assertEqual(max_concurrent_clones, 2)
+
def test_subvolume_snapshot_clone_pool_layout(self):
subvolume = self._generate_random_subvolume_name()
snapshot = self._generate_random_snapshot_name()
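As an aside, a single test like the one added above is typically run
against a vstart cluster via vstart_runner; a hypothetical invocation
from the build directory (exact paths depend on the checkout):

$ python ../qa/tasks/vstart_runner.py tasks.cephfs.test_volumes.TestVolumes.test_subvolume_snapshot_reconf_max_concurrent_clones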
}
super(Cloner, self).__init__(volume_client, "cloner", tp_size)
+ def reconfigure_max_concurrent_clones(self, tp_size):
+ super(Cloner, self).reconfigure_max_concurrent_clones("cloner", tp_size)
+
def is_clone_cancelable(self, clone_state):
return not (SubvolumeOpSm.is_complete_state(clone_state) or SubvolumeOpSm.is_failed_state(clone_state))
thread_id = threading.currentThread()
assert isinstance(thread_id, JobThread)
thread_name = thread_id.getName()
+ log.debug("thread [{0}] starting".format(thread_name))
while retries < JobThread.MAX_RETRIES_ON_EXCEPTION:
vol_job = None
# fetch next job to execute
with self.async_job.lock:
while True:
+ if self.should_reconfigure_num_threads():
+ log.info("thread [{0}] terminating due to reconfigure".format(thread_name))
+ self.async_job.threads.remove(self)
+ return
vol_job = self.async_job.get_job()
if vol_job:
break
time.sleep(1)
log.error("thread [{0}] reached exception limit, bailing out...".format(thread_name))
self.vc.cluster_log("thread {0} bailing out due to exception".format(thread_name))
+ with self.async_job.lock:
+ self.async_job.threads.remove(self)
+
+ def should_reconfigure_num_threads(self):
+        # max_concurrent_clones was decreased: there are more threads than configured, so this thread should terminate
+ return len(self.async_job.threads) > self.async_job.nr_concurrent_jobs
def cancel_job(self):
self.cancel_event.set()
# cv for job cancelation
self.waiting = False
self.cancel_cv = threading.Condition(self.lock)
+ self.nr_concurrent_jobs = nr_concurrent_jobs
self.threads = []
for i in range(nr_concurrent_jobs):
self.threads.append(JobThread(self, volume_client, name="{0}.{1}".format(name_pfx, i)))
self.threads[-1].start()
+ def reconfigure_max_concurrent_clones(self, name_pfx, nr_concurrent_jobs):
+ """
+ reconfigure number of cloner threads
+ """
+ with self.lock:
+ self.nr_concurrent_jobs = nr_concurrent_jobs
+ # Decrease in concurrency. Notify threads which are waiting for a job to terminate.
+ if len(self.threads) > nr_concurrent_jobs:
+ self.cv.notifyAll()
+ # Increase in concurrency
+ if len(self.threads) < nr_concurrent_jobs:
+ for i in range(len(self.threads), nr_concurrent_jobs):
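+                # embed a timestamp in the name: a plain "<pfx>.<i>" could collide with a surplus thread that has not retired yet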
+ self.threads.append(JobThread(self, self.vc, name="{0}.{1}.{2}".format(name_pfx, time.time(), i)))
+ self.threads[-1].start()
+
def get_job(self):
log.debug("processing {0} volume entries".format(len(self.q)))
nr_vols = len(self.q)
# volume specification
self.volspec = VolSpec(mgr.rados.conf_get('client_snapdir'))
self.connection_pool = ConnectionPool(self.mgr)
- # TODO: make thread pool size configurable
- self.cloner = Cloner(self, 4)
+ self.cloner = Cloner(self, self.mgr.max_concurrent_clones)
self.purge_queue = ThreadPoolPurgeQueueMixin(self, 4)
# on startup, queue purge job for available volumes to kickstart
# purge for leftover subvolume entries in trash. note that, if the
import json
import logging
import traceback
+import threading
from mgr_module import MgrModule
import orchestrator
# volume in the lifetime of this module instance.
]
+ MODULE_OPTIONS = [
+ {
+ 'name': 'max_concurrent_clones',
+ 'type': 'int',
+ 'default': 4,
+ 'desc': 'Number of asynchronous cloner threads',
+ }
+ ]
+
def __init__(self, *args, **kwargs):
+ self.inited = False
+ # for mypy
+ self.max_concurrent_clones = None
+ self.lock = threading.Lock()
super(Module, self).__init__(*args, **kwargs)
- self.vc = VolumeClient(self)
+ # Initialize config option members
+ self.config_notify()
+ with self.lock:
+ self.vc = VolumeClient(self)
+ self.inited = True
def __del__(self):
self.vc.shutdown()
def shutdown(self):
self.vc.shutdown()
+ def config_notify(self):
+ """
+ This method is called whenever one of our config options is changed.
+ """
+ with self.lock:
+ for opt in self.MODULE_OPTIONS:
+ setattr(self,
+ opt['name'], # type: ignore
+ self.get_module_option(opt['name'])) # type: ignore
+ self.log.debug(' mgr option %s = %s',
+ opt['name'], getattr(self, opt['name'])) # type: ignore
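+            # config_notify() also runs from __init__() before VolumeClient exists; self.inited gates the cloner reconfigure until then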
+ if self.inited:
+ if opt['name'] == "max_concurrent_clones":
+ self.vc.cloner.reconfigure_max_concurrent_clones(self.max_concurrent_clones)
+
def handle_command(self, inbuf, cmd):
handler_name = "_cmd_" + cmd['prefix'].replace(" ", "_")
try: