From 8e1bf1b9e8211b1183bf75f0d3a32d4eea6a52c3 Mon Sep 17 00:00:00 2001
From: Yehuda Sadeh
Date: Thu, 18 May 2017 16:21:19 -0700
Subject: [PATCH] rgw: multiple fixes and adjustments related to resharding
 scheduler

still wip, but:
- get rid of some unneeded rados locking
- rename shards to logshards where relevant to avoid confusion
- remove unused code

Signed-off-by: Yehuda Sadeh
---
 src/common/config_opts.h |   3 +-
 src/rgw/rgw_admin.cc     |  39 +++++----
 src/rgw/rgw_reshard.cc   | 182 +++++++++++++++++----------------------
 src/rgw/rgw_reshard.h    |  58 +++++++------
 4 files changed, 137 insertions(+), 145 deletions(-)

diff --git a/src/common/config_opts.h b/src/common/config_opts.h
index 0502a8b1f5696..bfbcece9c2cae 100644
--- a/src/common/config_opts.h
+++ b/src/common/config_opts.h
@@ -1733,7 +1733,8 @@ OPTION(debug_deliberately_leak_memory, OPT_BOOL, false)
 OPTION(rgw_swift_custom_header, OPT_STR, "") // option to enable swift custom headers
 
 /* resharding tunables */
-OPTION(rgw_reshard_max_jobs, OPT_INT, 1024)
+OPTION(rgw_reshard_num_logs, OPT_INT, 16)
 OPTION(rgw_reshard_bucket_lock_duration, OPT_INT, 120) // duration of lock on bucket obj during resharding
 OPTION(rgw_dynamic_resharding, OPT_BOOL, true)
 OPTION(rgw_max_objs_per_shard, OPT_INT, 100000)
+OPTION(rgw_reshard_thread_interval, OPT_U32, 60 * 10) // maximum time between rounds of reshard thread processing
diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc
index b0d3380d86dc3..0ee585cde5ee8 100644
--- a/src/rgw/rgw_admin.cc
+++ b/src/rgw/rgw_admin.cc
@@ -5626,31 +5626,40 @@ next:
   if (opt_cmd == OPT_RESHARD_LIST) {
     list<cls_rgw_reshard_entry> entries;
-    bool is_truncated = true;
-    string marker;
     int ret;
     int count = 0;
     if (max_entries < 0) {
       max_entries = 1000;
     }
 
+    int num_logshards = store->ctx()->_conf->rgw_reshard_num_logs;
+
     RGWReshard reshard(store);
 
     formatter->open_array_section("reshard");
-    do {
-      entries.clear();
-      ret = reshard.list(marker, max_entries, entries, is_truncated);
-      if (ret < 0) {
-        cerr << "Error listing resharding buckets: " << cpp_strerror(-ret) << std::endl;
-        return ret;
-      }
-      for (auto iter=entries.begin(); iter != entries.end(); ++iter) {
-        cls_rgw_reshard_entry& entry = *iter;
-        encode_json("entry", entry, formatter);
+    for (int i = 0; i < num_logshards; i++) {
+      bool is_truncated = true;
+      string marker;
+      do {
+        entries.clear();
+        ret = reshard.list(i, marker, max_entries, entries, &is_truncated);
+        if (ret < 0) {
+          cerr << "Error listing resharding buckets: " << cpp_strerror(-ret) << std::endl;
+          return ret;
+        }
+        for (auto iter=entries.begin(); iter != entries.end(); ++iter) {
+          cls_rgw_reshard_entry& entry = *iter;
+          encode_json("entry", entry, formatter);
+        }
+        count += entries.size();
+        formatter->flush(cout);
+#warning marker?
+      } while (is_truncated && count < max_entries);
+
+      if (count >= max_entries) {
+        break;
       }
-      count += entries.size();
-      formatter->flush(cout);
-    } while (is_truncated && count < max_entries);
+    }
 
     formatter->close_section();
     formatter->flush(cout);
diff --git a/src/rgw/rgw_reshard.cc b/src/rgw/rgw_reshard.cc
index 6376d3a3c7d84..7c9e414df10a2 100644
--- a/src/rgw/rgw_reshard.cc
+++ b/src/rgw/rgw_reshard.cc
@@ -12,7 +12,7 @@
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_rgw
 
-const string reshard_oid = "reshard";
+const string reshard_oid_prefix = "reshard";
 const string reshard_lock_name = "reshard_process";
 const string bucket_instance_lock_name = "bucket_instance_lock";
 
@@ -478,126 +478,101 @@ sleep(10);
 
 RGWReshard::RGWReshard(RGWRados* _store): store(_store), instance_lock(bucket_instance_lock_name)
 {
-  max_jobs = store->ctx()->_conf->rgw_reshard_max_jobs;
+  num_logshards = store->ctx()->_conf->rgw_reshard_num_logs;
+}
+
+string RGWReshard::get_logshard_key(const string& tenant, const string& bucket_name)
+{
+  return bucket_name + ":" + tenant; /* transposed on purpose */
+}
+
+#define MAX_RESHARD_LOGSHARDS_PRIME 7877
+
+void RGWReshard::get_bucket_logshard_oid(const string& tenant, const string& bucket_name, string *oid)
+{
+  string key = get_logshard_key(tenant, bucket_name);
+
+  uint32_t sid = ceph_str_hash_linux(key.c_str(), key.size());
+  uint32_t sid2 = sid ^ ((sid & 0xFF) << 24);
+  sid = sid2 % MAX_RESHARD_LOGSHARDS_PRIME % num_logshards;
+  int logshard = sid % num_logshards;
+
+  get_logshard_oid(logshard, oid);
 }
 
 int RGWReshard::add(cls_rgw_reshard_entry& entry)
 {
-  rados::cls::lock::Lock l(reshard_lock_name);
+  string logshard_oid;
 
-  int ret = l.lock_exclusive(&store->reshard_pool_ctx, reshard_oid);
-  if (ret == -EBUSY) {
-    ldout(store->ctx(), 0) << "RGWReshard::add failed to acquire lock on " << reshard_oid << dendl;
-    return 0;
-  }
-  if (ret < 0)
-    return ret;
+  get_bucket_logshard_oid(entry.tenant, entry.bucket_name, &logshard_oid);
 
   librados::ObjectWriteOperation op;
   cls_rgw_reshard_add(op, entry);
 
-  ret = store->reshard_pool_ctx.operate(reshard_oid, &op);
-
-  l.unlock(&store->reshard_pool_ctx, reshard_oid);
-  return ret;
+  int ret = store->reshard_pool_ctx.operate(logshard_oid, &op);
+  if (ret < 0) {
+    lderr(store->ctx()) << "ERROR: failed to add entry to reshard log, oid=" << logshard_oid << " tenant=" << entry.tenant << " bucket=" << entry.bucket_name << dendl;
+    return ret;
+  }
+  return 0;
 }
 
-int RGWReshard::list(string& marker, uint32_t max, std::list<cls_rgw_reshard_entry>& entries, bool& is_truncated)
+int RGWReshard::list(int logshard_num, string& marker, uint32_t max, std::list<cls_rgw_reshard_entry>& entries, bool *is_truncated)
 {
-  rados::cls::lock::Lock l(reshard_lock_name);
-
-  int ret = l.lock_shared(&store->reshard_pool_ctx, reshard_oid);
-  if (ret == -EBUSY) {
-    ldout(store->ctx(), 0) << "RGWReshard::list failed to acquire lock on " << reshard_oid << dendl;
-    return 0;
-  }
-  if (ret < 0)
-    return ret;
+  string logshard_oid;
 
-  ret = cls_rgw_reshard_list(store->reshard_pool_ctx, reshard_oid, marker, max, entries, &is_truncated);
+  get_logshard_oid(logshard_num, &logshard_oid);
 
-  l.unlock(&store->reshard_pool_ctx, reshard_oid);
-  return ret;
+  int ret = cls_rgw_reshard_list(store->reshard_pool_ctx, logshard_oid, marker, max, entries, is_truncated);
+  if (ret < 0) {
+    lderr(store->ctx()) << "ERROR: failed to list reshard log entries, oid=" << logshard_oid << dendl;
+    return ret;
+  }
+  return 0;
 }
 
 int RGWReshard::get(cls_rgw_reshard_entry& entry)
 {
-  rados::cls::lock::Lock l(reshard_lock_name);
+  string logshard_oid;
 
-  int ret = l.lock_shared(&store->reshard_pool_ctx, reshard_oid);
-  if (ret == -EBUSY) {
-    ldout(store->ctx(), 0) << "RGWReshardLog::get failed to acquire lock on " << reshard_oid << dendl;
-    return 0;
-  }
-  if (ret < 0)
-    return ret;
+  get_bucket_logshard_oid(entry.tenant, entry.bucket_name, &logshard_oid);
 
-  ret = cls_rgw_reshard_get(store->reshard_pool_ctx, reshard_oid, entry);
+  int ret = cls_rgw_reshard_get(store->reshard_pool_ctx, logshard_oid, entry);
+  if (ret < 0) {
+    lderr(store->ctx()) << "ERROR: failed to get entry from reshard log, oid=" << logshard_oid << " tenant=" << entry.tenant << " bucket=" << entry.bucket_name << dendl;
+    return ret;
+  }
 
-  l.unlock(&store->reshard_pool_ctx, reshard_oid);
-  return ret;
+  return 0;
 }
 
 int RGWReshard::remove(cls_rgw_reshard_entry& entry)
 {
-  rados::cls::lock::Lock l(reshard_lock_name);
+  string logshard_oid;
 
-  int ret = l.lock_exclusive(&store->reshard_pool_ctx, reshard_oid);
-  if (ret == -EBUSY) {
-    ldout(store->ctx(), 0) << "RGWReshardLog::remove failed to acquire lock on " << reshard_oid << dendl;
-    return 0;
-  }
-  if (ret < 0)
-    return ret;
+  get_bucket_logshard_oid(entry.tenant, entry.bucket_name, &logshard_oid);
 
   librados::ObjectWriteOperation op;
   cls_rgw_reshard_remove(op, entry);
 
-  ret = store->reshard_pool_ctx.operate(reshard_oid, &op);
+  int ret = store->reshard_pool_ctx.operate(logshard_oid, &op);
+  if (ret < 0) {
+    lderr(store->ctx()) << "ERROR: failed to remove entry from reshard log, oid=" << logshard_oid << " tenant=" << entry.tenant << " bucket=" << entry.bucket_name << dendl;
+    return ret;
+  }
 
-  l.unlock(&store->reshard_pool_ctx, reshard_oid);
   return ret;
 }
 
-std::string create_bucket_index_lock_name(const string& bucket_instance_id) {
-  return bucket_instance_lock_name + "." + bucket_instance_id;
-}
-
 int RGWReshard::clear_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry)
 {
-  rados::cls::lock::Lock l(create_bucket_index_lock_name(entry.old_instance_id));
-
-  int ret = l.lock_exclusive(&store->reshard_pool_ctx, bucket_instance_oid);
-  if (ret == -EBUSY) {
-    ldout(store->ctx(), 0) << "RGWReshardLog::add failed to acquire lock on " << reshard_oid << dendl;
-    return 0;
-  }
-  if (ret < 0)
+  int ret = cls_rgw_clear_bucket_resharding(store->reshard_pool_ctx, bucket_instance_oid);
+  if (ret < 0) {
+    lderr(store->ctx()) << "ERROR: failed to clear bucket resharding, bucket_instance_oid=" << bucket_instance_oid << dendl;
     return ret;
-
-  entry.new_instance_id.clear();
-
-  ret = cls_rgw_clear_bucket_resharding(store->reshard_pool_ctx, bucket_instance_oid);
-
-  l.unlock(&store->reshard_pool_ctx, bucket_instance_oid);
-  return ret;
-}
-
-int RGWReshard::lock_bucket_index_shared(const string& oid)
-{
-  int ret = instance_lock.lock_shared(&store->reshard_pool_ctx, oid);
-  if (ret == -EBUSY) {
-    ldout(store->ctx(), 0) << "RGWReshardLog::add failed to acquire lock on " << reshard_oid << dendl;
-    return 0;
   }
-  return ret;
-}
-
-int RGWReshard::unlock_bucket_index(const string& oid)
-{
-  instance_lock.unlock(&store->reshard_pool_ctx, oid);
   return 0;
 }
 
@@ -679,32 +654,34 @@ static int create_new_bucket_instance(RGWRados *store,
   return 0;
 }
 
-int RGWReshard::process_single_shard(const string& shard)
+int RGWReshard::process_single_logshard(int logshard_num)
 {
   string marker;
   bool truncated = false;
 
   CephContext *cct = store->ctx();
   int max_entries = 1000;
-  int max_secs = 10;
-
+  int max_secs = 60;
+
   rados::cls::lock::Lock l(reshard_lock_name);
 
   utime_t time(max_secs, 0);
   l.set_duration(time);
 
-  int ret = l.lock_exclusive(&store->reshard_pool_ctx, shard);
+  string logshard_oid;
+  get_logshard_oid(logshard_num, &logshard_oid);
+
+  int ret = l.lock_exclusive(&store->reshard_pool_ctx, logshard_oid);
   if (ret == -EBUSY) { /* already locked by another processor */
-    dout(5) << __func__ << "(): failed to acquire lock on " << shard << dendl;
+    ldout(store->ctx(), 5) << __func__ << "(): failed to acquire lock on " << logshard_oid << dendl;
     return ret;
   }
 
   do {
     std::list<cls_rgw_reshard_entry> entries;
-    ret = list(marker, max_entries, entries, truncated);
+    ret = list(logshard_num, marker, max_entries, entries, &truncated);
     if (ret < 0) {
-      ldout(cct, 10) << "cannot list all reshards: " << shard
-                     << dendl;
+      ldout(cct, 10) << "cannot list all reshards in logshard oid=" << logshard_oid << dendl;
      continue;
    }
 
@@ -758,33 +735,35 @@ int RGWReshard::process_single_shard(const string& shard)
         }
       }
     }
+#warning update marker?
+#warning do work here, renew lock
   } while (truncated);
 
-  l.unlock(&store->reshard_pool_ctx, shard);
+  l.unlock(&store->reshard_pool_ctx, logshard_oid);
 
   return 0;
 }
 
-void RGWReshard::get_shard(int shard_num, string& shard)
+void RGWReshard::get_logshard_oid(int shard_num, string *logshard)
 {
   char buf[32];
   snprintf(buf, sizeof(buf), "%010u", (unsigned)shard_num);
 
-  string objname("bucket_reshard.");
-  shard = objname + buf;
+  string objname(reshard_oid_prefix);
+  *logshard = objname + buf;
 }
 
-int RGWReshard::inspect_all_shards()
+int RGWReshard::inspect_all_logshards()
 {
   int ret = 0;
 
-  for (int i = 0; i < num_shards; i++) {
-    string shard;
-    store->objexp_get_shard(i, shard);
+  for (int i = 0; i < num_logshards; i++) {
+    string logshard;
+    get_logshard_oid(i, &logshard);
 
-    ldout(store->ctx(), 20) << "proceeding shard = " << shard << dendl;
+    ldout(store->ctx(), 20) << "proceeding logshard = " << logshard << dendl;
 
-    ret = process_single_shard(shard);
+    ret = process_single_logshard(i);
     if (ret <0) {
       return ret;
     }
@@ -820,7 +799,7 @@ void *RGWReshard::ReshardWorker::entry() {
   do {
     utime_t start = ceph_clock_now();
     ldout(cct, 2) << "object expiration: start" << dendl;
-    if (reshard->inspect_all_shards()) {
+    if (reshard->inspect_all_logshards()) {
       /* All shards have been processed properly. Next time we can start
        * from this moment. */
       last_run = start;
@@ -833,7 +812,7 @@ void *RGWReshard::ReshardWorker::entry() {
     utime_t end = ceph_clock_now();
     end -= start;
 
-    int secs = cct->_conf->rgw_objexp_gc_interval;
+    int secs = cct->_conf->rgw_reshard_thread_interval;
 
     if (secs <= end.sec())
      continue; // next round
@@ -854,6 +833,7 @@ void RGWReshard::ReshardWorker::stop()
   cond.Signal();
 }
 
+#if 0
 BucketIndexLockGuard::BucketIndexLockGuard(RGWRados* _store, const string& bucket_instance_id,
                                            const string& _oid, const librados::IoCtx& _io_ctx) :
   store(_store),
diff --git a/src/rgw/rgw_reshard.h b/src/rgw/rgw_reshard.h
index 1d4366203674c..d7365810c88b8 100644
--- a/src/rgw/rgw_reshard.h
+++ b/src/rgw/rgw_reshard.h
@@ -14,6 +14,7 @@
 class CephContext;
 class RGWRados;
 
+#if 0
 /* gets a locked lock , release it when exiting context */
 class BucketIndexLockGuard
 {
@@ -33,6 +34,7 @@ protected:
   int lock();
   int unlock();
 };
+#endif
 
 class RGWBucketReshard {
 
@@ -70,13 +72,10 @@ public:
 class RGWReshard {
   RGWRados *store;
   string lock_name;
-  int max_jobs;
   rados::cls::lock::Lock instance_lock;
-  int num_shards;
+  int num_logshards;
 
-  int lock_bucket_index_shared(const string& oid);
-  int unlock_bucket_index(const string& oid);
-  void get_shard(int shard_num, string& shard);
+  void get_logshard_oid(int shard_num, string *shard);
 protected:
   class ReshardWorker : public Thread {
     CephContext *cct;
@@ -99,29 +98,32 @@ protected:
   ReshardWorker *worker;
   std::atomic<bool> down_flag = { false };
 
-public:
-  RGWReshard(RGWRados* _store);
-  int add(cls_rgw_reshard_entry& entry);
-  int get(cls_rgw_reshard_entry& entry);
-  int remove(cls_rgw_reshard_entry& entry);
-  int list(string& marker, uint32_t max, list<cls_rgw_reshard_entry>& entries, bool& is_truncated);
-  int clear_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry);
-
-  int reshard_bucket(Formatter *formatter,
-                     int num_shards,
-                     rgw_bucket& bucket,
-                     RGWBucketInfo& bucket_info,
-                     RGWBucketInfo& new_bucket_info,
-                     int max_entries,
-                     RGWBucketAdminOpState& bucket_op,
-                     bool verbose = false);
-
-  /* reshard thread */
-  int process_single_shard(const std::string& shard);
-  int inspect_all_shards();
-  bool going_down();
-  void start_processor();
-  void stop_processor();
+  string get_logshard_key(const string& tenant, const string& bucket_name);
+  void get_bucket_logshard_oid(const string& tenant, const string& bucket_name, string *oid);
+
+public:
+  RGWReshard(RGWRados* _store);
+  int add(cls_rgw_reshard_entry& entry);
+  int get(cls_rgw_reshard_entry& entry);
+  int remove(cls_rgw_reshard_entry& entry);
+  int list(int logshard_num, string& marker, uint32_t max, std::list<cls_rgw_reshard_entry>& entries, bool *is_truncated);
+  int clear_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry);
+
+  int reshard_bucket(Formatter *formatter,
+                     int num_shards,
+                     rgw_bucket& bucket,
+                     RGWBucketInfo& bucket_info,
+                     RGWBucketInfo& new_bucket_info,
+                     int max_entries,
+                     RGWBucketAdminOpState& bucket_op,
+                     bool verbose = false);
+
+  /* reshard thread */
+  int process_single_logshard(int logshard_num);
+  int inspect_all_logshards();
+  bool going_down();
+  void start_processor();
+  void stop_processor();
 };
-- 
2.39.5
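
For reference, below is a minimal, self-contained sketch of the logshard mapping the patch introduces (RGWReshard::get_logshard_key() / get_bucket_logshard_oid() / get_logshard_oid()). It is illustrative only: the constants mirror the patch, but the hash here is a simple FNV-1a stand-in for Ceph's ceph_str_hash_linux(), so the shard numbers it prints will not match what RGW actually computes.

// Illustrative sketch (not part of the patch): how a reshard entry's bucket is
// mapped to one of the rgw_reshard_num_logs logshard objects.
#include <cstdint>
#include <cstdio>
#include <string>

static const std::string reshard_oid_prefix = "reshard";  // same prefix as the patch
static const int num_logshards = 16;                       // rgw_reshard_num_logs default
static const uint32_t max_logshards_prime = 7877;          // MAX_RESHARD_LOGSHARDS_PRIME

// Stand-in string hash (FNV-1a); the real code calls ceph_str_hash_linux().
static uint32_t str_hash(const std::string& s)
{
  uint32_t h = 2166136261u;
  for (unsigned char c : s) {
    h ^= c;
    h *= 16777619u;
  }
  return h;
}

// Key is "<bucket>:<tenant>", transposed on purpose, as in get_logshard_key().
static std::string logshard_key(const std::string& tenant, const std::string& bucket)
{
  return bucket + ":" + tenant;
}

// Mirrors get_bucket_logshard_oid() + get_logshard_oid(): hash the key, fold the
// low byte into the top bits, reduce by the prime and the shard count, then
// render the shard number with the same "%010u" format used for the oid suffix.
static std::string bucket_logshard_oid(const std::string& tenant, const std::string& bucket)
{
  uint32_t sid = str_hash(logshard_key(tenant, bucket));
  uint32_t sid2 = sid ^ ((sid & 0xFF) << 24);
  uint32_t logshard = sid2 % max_logshards_prime % num_logshards;
  char buf[32];
  snprintf(buf, sizeof(buf), "%010u", (unsigned)logshard);
  return reshard_oid_prefix + buf;
}

int main()
{
  // Every entry for a given bucket lands on the same logshard object, which is
  // why add()/get()/remove() no longer need the old global "reshard" object lock.
  printf("%s\n", bucket_logshard_oid("", "mybucket").c_str());
  printf("%s\n", bucket_logshard_oid("tenant1", "mybucket").c_str());
  return 0;
}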