g_conf->rgw_enable_gc_threads,
g_conf->rgw_enable_lc_threads,
g_conf->rgw_enable_quota_threads,
- g_conf->rgw_run_sync_thread);
+ g_conf->rgw_run_sync_thread,
+ g_conf->rgw_dynamic_resharding);
if (!store) {
mutex.Lock();
if (raw_storage_op) {
store = RGWStoreManager::get_raw_storage(g_ceph_context);
} else {
- store = RGWStoreManager::get_storage(g_ceph_context, false, false, false, false);
+ store = RGWStoreManager::get_storage(g_ceph_context, false, false, false, false, false);
}
if (!store) {
cerr << "couldn't init storage provider" << std::endl;
RGWRados *store = RGWStoreManager::get_storage(g_ceph_context,
g_conf->rgw_enable_gc_threads, g_conf->rgw_enable_lc_threads, g_conf->rgw_enable_quota_threads,
- g_conf->rgw_run_sync_thread);
+ g_conf->rgw_run_sync_thread, g_conf->rgw_dynamic_resharding);
if (!store) {
mutex.Lock();
init_timer.cancel_all_events();
common_init_finish(g_ceph_context);
- store = RGWStoreManager::get_storage(g_ceph_context, false, false, false, false);
+ store = RGWStoreManager::get_storage(g_ceph_context, false, false, false, false, false);
if (!store) {
std::cerr << "couldn't init storage provider" << std::endl;
return EIO;
reshard_wait->stop();
reshard_wait.reset();
}
+
+ if (run_reshard_thread) {
+ reshard->stop_processor();
+ }
delete reshard;
delete index_completion_manager;
}
lc = new RGWLC();
lc->initialize(cct, this);
-
+
if (use_lc_thread)
lc->start_processor();
-
+
quota_handler = RGWQuotaHandler::generate_handler(this, quota_threads);
bucket_index_max_shards = (cct->_conf->rgw_override_bucket_index_max_shards ? cct->_conf->rgw_override_bucket_index_max_shards :
reshard_wait = std::make_shared<RGWReshardWait>(this);
reshard = new RGWReshard(this);
+ if (run_reshard_thread) {
+ reshard->start_processor();
+ }
+
index_completion_manager = new RGWIndexCompletionManager(this);
ret = index_completion_manager->start();
return ++max_bucket_id;
}
-RGWRados *RGWStoreManager::init_storage_provider(CephContext *cct, bool use_gc_thread, bool use_lc_thread, bool quota_threads, bool run_sync_thread)
+RGWRados *RGWStoreManager::init_storage_provider(CephContext *cct, bool use_gc_thread, bool use_lc_thread, bool quota_threads, bool run_sync_thread, bool run_reshard_thread)
{
int use_cache = cct->_conf->rgw_cache_enabled;
RGWRados *store = NULL;
store = new RGWCache<RGWRados>;
}
- if (store->initialize(cct, use_gc_thread, use_lc_thread, quota_threads, run_sync_thread) < 0) {
+ if (store->initialize(cct, use_gc_thread, use_lc_thread, quota_threads, run_sync_thread, run_reshard_thread) < 0) {
delete store;
return NULL;
}
bool use_lc_thread;
bool quota_threads;
bool run_sync_thread;
+ bool run_reshard_thread;
RGWAsyncRadosProcessor* async_rados;
RGWQuotaHandler *quota_handler;
Finisher *finisher;
-
+
RGWCoroutinesManagerRegistry *cr_registry;
RGWSyncModulesManager *sync_modules_manager{nullptr};
public:
RGWRados() : lock("rados_timer_lock"), watchers_lock("watchers_lock"), timer(NULL),
gc(NULL), lc(NULL), obj_expirer(NULL), use_gc_thread(false), use_lc_thread(false), quota_threads(false),
- run_sync_thread(false), async_rados(nullptr), meta_notifier(NULL),
+ run_sync_thread(false), run_reshard_thread(false), async_rados(nullptr), meta_notifier(NULL),
data_notifier(NULL), meta_sync_processor_thread(NULL),
meta_sync_thread_lock("meta_sync_thread_lock"), data_sync_thread_lock("data_sync_thread_lock"),
num_watchers(0), watchers(NULL),
CephContext *ctx() { return cct; }
/** do all necessary setup of the storage device */
- int initialize(CephContext *_cct, bool _use_gc_thread, bool _use_lc_thread, bool _quota_threads, bool _run_sync_thread) {
+ int initialize(CephContext *_cct, bool _use_gc_thread, bool _use_lc_thread, bool _quota_threads, bool _run_sync_thread, bool _run_reshard_thread) {
set_context(_cct);
use_gc_thread = _use_gc_thread;
use_lc_thread = _use_lc_thread;
quota_threads = _quota_threads;
run_sync_thread = _run_sync_thread;
+ run_reshard_thread = _run_reshard_thread;
return initialize();
}
/** Initialize the RADOS instance and prepare to do other ops */
class RGWStoreManager {
public:
RGWStoreManager() {}
- static RGWRados *get_storage(CephContext *cct, bool use_gc_thread, bool use_lc_thread, bool quota_threads, bool run_sync_thread) {
- RGWRados *store = init_storage_provider(cct, use_gc_thread, use_lc_thread, quota_threads, run_sync_thread);
+ static RGWRados *get_storage(CephContext *cct, bool use_gc_thread, bool use_lc_thread, bool quota_threads, bool run_sync_thread, bool run_reshard_thread) {
+ RGWRados *store = init_storage_provider(cct, use_gc_thread, use_lc_thread, quota_threads, run_sync_thread,
+ run_reshard_thread);
return store;
}
static RGWRados *get_raw_storage(CephContext *cct) {
RGWRados *store = init_raw_storage_provider(cct);
return store;
}
- static RGWRados *init_storage_provider(CephContext *cct, bool use_gc_thread, bool use_lc_thread, bool quota_threads, bool run_sync_thread);
+ static RGWRados *init_storage_provider(CephContext *cct, bool use_gc_thread, bool use_lc_thread, bool quota_threads, bool run_sync_thread, bool run_reshard_thread);
static RGWRados *init_raw_storage_provider(CephContext *cct);
static void close_storage(RGWRados *store);
cct->_conf->rgw_enable_gc_threads,
cct->_conf->rgw_enable_lc_threads,
cct->_conf->rgw_enable_quota_threads,
- cct->_conf->rgw_run_sync_thread);
+ cct->_conf->rgw_run_sync_thread,
+ cct->_conf->rgw_dynamic_resharding);
ldout(cct, 1) << "Creating new store" << dendl;
auto cct = global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT,
CODE_ENVIRONMENT_UTILITY, 0);
common_init_finish(g_ceph_context);
- store = RGWStoreManager::get_storage(g_ceph_context, false, false, false, false);
+ store = RGWStoreManager::get_storage(g_ceph_context, false, false, false, false, false);
g_test = new admin_log::test_helper();
finisher = new Finisher(g_ceph_context);
#ifdef GTEST