From 8cebffa1d8ad4df6fdae4e10e782aad0753545ce Mon Sep 17 00:00:00 2001 From: "J. Eric Ivancich" Date: Thu, 27 Sep 2018 13:31:57 -0400 Subject: [PATCH] rgw: renew resharding locks to prevent expiration Fix lock expiration problem with resharding. The resharding process will renew its bucket lock (and logshard lock if necessary) when half the remaining time is left on the lock. If the lock is expired and cannot renew the process fails and errors out appropriately. Signed-off-by: J. Eric Ivancich --- src/cls/lock/cls_lock_client.h | 2 + src/cls/lock/cls_lock_ops.h | 3 + src/cls/lock/cls_lock_types.h | 3 + src/rgw/rgw_admin.cc | 10 +- src/rgw/rgw_reshard.cc | 216 +++++++++++++++++++-------------- src/rgw/rgw_reshard.h | 35 ++++-- 6 files changed, 162 insertions(+), 107 deletions(-) diff --git a/src/cls/lock/cls_lock_client.h b/src/cls/lock/cls_lock_client.h index ae1a2381174..002864edecc 100644 --- a/src/cls/lock/cls_lock_client.h +++ b/src/cls/lock/cls_lock_client.h @@ -4,6 +4,8 @@ #ifndef CEPH_CLS_LOCK_CLIENT_H #define CEPH_CLS_LOCK_CLIENT_H +#include + #include "cls/lock/cls_lock_types.h" namespace librados { diff --git a/src/cls/lock/cls_lock_ops.h b/src/cls/lock/cls_lock_ops.h index 0138f6e0c2b..5d22452b3b2 100644 --- a/src/cls/lock/cls_lock_ops.h +++ b/src/cls/lock/cls_lock_ops.h @@ -1,3 +1,6 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + #ifndef CEPH_CLS_LOCK_OPS_H #define CEPH_CLS_LOCK_OPS_H diff --git a/src/cls/lock/cls_lock_types.h b/src/cls/lock/cls_lock_types.h index b77bb06ac1d..e8ec4518fa9 100644 --- a/src/cls/lock/cls_lock_types.h +++ b/src/cls/lock/cls_lock_types.h @@ -1,3 +1,6 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + #ifndef CEPH_CLS_LOCK_TYPES_H #define CEPH_CLS_LOCK_TYPES_H diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index 047a506248c..6da4d464b56 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -6017,7 +6017,7 @@ next: return ret; } - RGWBucketReshard br(store, bucket_info, attrs); + RGWBucketReshard br(store, bucket_info, attrs, nullptr /* no callback */); #define DEFAULT_RESHARD_MAX_ENTRIES 1000 if (max_entries < 1) { @@ -6103,7 +6103,6 @@ next: return 0; } - if (opt_cmd == OPT_RESHARD_STATUS) { if (bucket_name.empty()) { cerr << "ERROR: bucket not specified" << std::endl; @@ -6119,11 +6118,12 @@ next: return -ret; } - RGWBucketReshard br(store, bucket_info, attrs); + RGWBucketReshard br(store, bucket_info, attrs, nullptr /* no callback */); list status; int r = br.get_status(&status); if (r < 0) { - cerr << "ERROR: could not get resharding status for bucket " << bucket_name << std::endl; + cerr << "ERROR: could not get resharding status for bucket " << + bucket_name << std::endl; return -r; } @@ -6156,7 +6156,7 @@ next: return -ret; } - RGWBucketReshard br(store, bucket_info, attrs); + RGWBucketReshard br(store, bucket_info, attrs, nullptr /* no callback */); int ret = br.cancel(); if (ret < 0) { if (ret == -EBUSY) { diff --git a/src/rgw/rgw_reshard.cc b/src/rgw/rgw_reshard.cc index 474ea9baf83..3b563cd5138 100644 --- a/src/rgw/rgw_reshard.cc +++ b/src/rgw/rgw_reshard.cc @@ -65,8 +65,10 @@ class BucketReshardShard { public: BucketReshardShard(RGWRados *_store, const RGWBucketInfo& _bucket_info, int _num_shard, - deque& _completions) : store(_store), bucket_info(_bucket_info), bs(store), - aio_completions(_completions) { + deque& _completions) : + store(_store), bucket_info(_bucket_info), bs(store), + aio_completions(_completions) + { num_shard = (bucket_info.num_shards > 0 ? _num_shard : -1); bs.init(bucket_info.bucket, num_shard); } @@ -91,8 +93,10 @@ public: return ret; } } + return 0; } + int flush() { if (entries.size() == 0) { return 0; @@ -156,14 +160,20 @@ public: } } + /* + * did_flush is set if not nullptr and a flush occurred; otherwise not altered + */ int add_entry(int shard_index, rgw_cls_bi_entry& entry, bool account, uint8_t category, const rgw_bucket_category_stats& entry_stats) { - int ret = target_shards[shard_index]->add_entry(entry, account, category, entry_stats); + int ret = target_shards[shard_index]->add_entry(entry, account, category, + entry_stats); if (ret < 0) { - derr << "ERROR: target_shards.add_entry(" << entry.idx << ") returned error: " << cpp_strerror(-ret) << dendl; + derr << "ERROR: target_shards.add_entry(" << entry.idx << + ") returned error: " << cpp_strerror(-ret) << dendl; return ret; } + return 0; } @@ -189,13 +199,21 @@ public: } }; -RGWBucketReshard::RGWBucketReshard(RGWRados *_store, const RGWBucketInfo& _bucket_info, const map& _bucket_attrs) : - store(_store), bucket_info(_bucket_info), bucket_attrs(_bucket_attrs), - reshard_lock(reshard_lock_name), locked_bucket(false) { +RGWBucketReshard::RGWBucketReshard(RGWRados *_store, + const RGWBucketInfo& _bucket_info, + const map& _bucket_attrs, + RenewLocksCallback _renew_locks_callback) : + store(_store), bucket_info(_bucket_info), bucket_attrs(_bucket_attrs), + reshard_lock(reshard_lock_name), + renew_locks_callback(_renew_locks_callback) +{ const rgw_bucket& b = bucket_info.bucket; - reshard_oid = b.tenant + (b.tenant.empty() ? "" : ":") + b.name + ":" + b.bucket_id; + reshard_oid = b.get_key(':'); + + const int lock_dur_secs = + store->ctx()->_conf->rgw_reshard_bucket_lock_duration; + lock_duration = std::chrono::seconds(lock_dur_secs); - utime_t lock_duration(store->ctx()->_conf->rgw_reshard_bucket_lock_duration, 0); #define COOKIE_LEN 16 char cookie_buf[COOKIE_LEN + 1]; gen_rand_alphanumeric(store->ctx(), cookie_buf, sizeof(cookie_buf) - 1); @@ -205,23 +223,17 @@ RGWBucketReshard::RGWBucketReshard(RGWRados *_store, const RGWBucketInfo& _bucke reshard_lock.set_duration(lock_duration); } -RGWBucketReshard::RGWBucketReshard(RGWRados *_store, const RGWBucketInfo& _bucket_info, const map& _bucket_attrs, rados::cls::lock::Lock& _reshard_lock, const utime_t& _lock_start_time) : - store(_store), bucket_info(_bucket_info), bucket_attrs(_bucket_attrs), - reshard_lock(_reshard_lock), lock_start_time(_lock_start_time), locked_bucket(true) -{ - const rgw_bucket& b = bucket_info.bucket; - reshard_oid = b.tenant + (b.tenant.empty() ? "" : ":") + b.name + ":" + b.bucket_id; -} - int RGWBucketReshard::lock_bucket() { + reshard_lock.set_must_renew(false); int ret = reshard_lock.lock_exclusive(&store->reshard_pool_ctx, reshard_oid); if (ret < 0) { ldout(store->ctx(), 0) << "RGWReshard::add failed to acquire lock on " << reshard_oid << " ret=" << ret << dendl; return ret; } - lock_start_time = ceph_clock_now(); - locked_bucket = true; + lock_start_time = Clock::now(); + lock_renew_thresh = lock_start_time + lock_duration / 2; + return 0; } @@ -231,27 +243,31 @@ void RGWBucketReshard::unlock_bucket() if (ret < 0) { ldout(store->ctx(), 0) << "WARNING: RGWReshard::add failed to drop lock on " << reshard_oid << " ret=" << ret << dendl; } - locked_bucket = false; } - -int RGWBucketReshard::renew_lock_bucket() +int RGWBucketReshard::renew_lock_bucket(const Clock::time_point& now) { - if (!locked_bucket) { - return 0; - } - - utime_t now = ceph_clock_now(); - /* do you need to renew lock? */ - if (now > lock_start_time + store->ctx()->_conf->rgw_reshard_bucket_lock_duration/ 2) { - reshard_lock.set_must_renew(true); - int ret = reshard_lock.lock_exclusive(&store->reshard_pool_ctx, reshard_oid); - if (ret == -EBUSY) { /* already locked by another processor */ - ldout(store->ctx(), 5) << __func__ << "(): failed to acquire lock on " << reshard_oid << dendl; + // assume outer locks have timespans at least the size of ours, so + // can call inside conditional + if (renew_locks_callback) { + int ret = renew_locks_callback(now); + if (ret < 0) { return ret; } - lock_start_time = now; } + + reshard_lock.set_must_renew(true); + int ret = reshard_lock.lock_exclusive(&store->reshard_pool_ctx, reshard_oid); + if (ret < 0) { /* expired or already locked by another processor */ + ldout(store->ctx(), 5) << __func__ << "(): failed to renew lock on " << + reshard_oid << " with " << cpp_strerror(-ret) << dendl; + return ret; + } + lock_start_time = now; + lock_renew_thresh = lock_start_time + lock_duration / 2; + ldout(store->ctx(), 20) << __func__ << "(): successfully renewed lock on " << + reshard_oid << dendl; + return 0; } @@ -357,12 +373,17 @@ class BucketInfoReshardUpdate } public: - BucketInfoReshardUpdate(RGWRados *_store, RGWBucketInfo& _bucket_info, - map& _bucket_attrs, const string& new_bucket_id) : store(_store), - bucket_info(_bucket_info), - bucket_attrs(_bucket_attrs) { + BucketInfoReshardUpdate(RGWRados *_store, + RGWBucketInfo& _bucket_info, + map& _bucket_attrs, + const string& new_bucket_id) : + store(_store), + bucket_info(_bucket_info), + bucket_attrs(_bucket_attrs) + { bucket_info.new_bucket_instance_id = new_bucket_id; } + ~BucketInfoReshardUpdate() { if (in_progress) { bucket_info.new_bucket_instance_id.clear(); @@ -389,13 +410,12 @@ public: } }; -int RGWBucketReshard::do_reshard( - int num_shards, - RGWBucketInfo& new_bucket_info, - int max_entries, - bool verbose, - ostream *out, - Formatter *formatter) +int RGWBucketReshard::do_reshard(int num_shards, + RGWBucketInfo& new_bucket_info, + int max_entries, + bool verbose, + ostream *out, + Formatter *formatter) { rgw_bucket& bucket = bucket_info.bucket; @@ -452,7 +472,7 @@ int RGWBucketReshard::do_reshard( ret = store->bi_list(bucket, i, string(), marker, max_entries, &entries, &is_truncated); if (ret < 0 && ret != -ENOENT) { derr << "ERROR: bi_list(): " << cpp_strerror(-ret) << dendl; - return -ret; + return ret; } list::iterator iter; @@ -484,30 +504,30 @@ int RGWBucketReshard::do_reshard( int shard_index = (target_shard_id > 0 ? target_shard_id : 0); - ret = target_shards_mgr.add_entry(shard_index, entry, account, category, stats); + ret = target_shards_mgr.add_entry(shard_index, entry, account, + category, stats); if (ret < 0) { return ret; } + + Clock::time_point now = Clock::now(); + if (now >= lock_renew_thresh) { + ret = renew_lock_bucket(now); + if (ret < 0) { + lderr(store->ctx()) << "Error renewing bucket lock: " << ret << dendl; + return ret; + } + } + if (verbose) { formatter->close_section(); if (out) { formatter->flush(*out); - formatter->flush(*out); } } else if (out && !(total_entries % 1000)) { (*out) << " " << total_entries; } - } - ret = renew_lock_bucket(); - if (ret < 0) { - lderr(store->ctx()) << "Error renewing bucket lock: " << ret << dendl; - return -ret; - } - ret = renew_lock_bucket(); - if (ret < 0) { - lderr(store->ctx()) << "Error renewing bucket lock: " << ret << dendl; - return -ret; - } + } // entries loop } } @@ -523,13 +543,13 @@ int RGWBucketReshard::do_reshard( ret = target_shards_mgr.finish(); if (ret < 0) { lderr(store->ctx()) << "ERROR: failed to reshard" << dendl; - return EIO; + return -EIO; } ret = rgw_link_bucket(store, new_bucket_info.owner, new_bucket_info.bucket, bucket_info.creation_time); if (ret < 0) { lderr(store->ctx()) << "failed to link new bucket instance (bucket_id=" << new_bucket_info.bucket.bucket_id << ": " << cpp_strerror(-ret) << ")" << dendl; - return -ret; + return ret; } ret = bucket_info_updater.complete(); @@ -537,8 +557,9 @@ int RGWBucketReshard::do_reshard( ldout(store->ctx(), 0) << __func__ << ": failed to update bucket info ret=" << ret << dendl; /* don't error out, reshard process succeeded */ } + return 0; -} +} // RGWBucketReshard::do_reshard int RGWBucketReshard::get_status(list *status) { @@ -567,8 +588,8 @@ int RGWBucketReshard::get_status(list *status) int RGWBucketReshard::execute(int num_shards, int max_op_entries, - bool verbose, ostream *out, Formatter *formatter, RGWReshard* reshard_log) - + bool verbose, ostream *out, Formatter *formatter, + RGWReshard* reshard_log) { int ret = lock_bucket(); if (ret < 0) { @@ -619,8 +640,9 @@ int RGWBucketReshard::execute(int num_shards, int max_op_entries, RGWReshard::RGWReshard(RGWRados* _store, bool _verbose, ostream *_out, - Formatter *_formatter) : store(_store), instance_lock(bucket_instance_lock_name), - verbose(_verbose), out(_out), formatter(_formatter) + Formatter *_formatter) : + store(_store), instance_lock(bucket_instance_lock_name), + verbose(_verbose), out(_out), formatter(_formatter) { num_logshards = store->ctx()->_conf->rgw_reshard_num_logs; } @@ -814,18 +836,16 @@ int RGWReshard::process_single_logshard(int logshard_num) bool truncated = true; CephContext *cct = store->ctx(); - int max_entries = 1000; + constexpr uint32_t max_entries = 1000; int max_secs = store->ctx()->_conf->rgw_reshard_bucket_lock_duration; + std::chrono::seconds lock_duration(max_secs); rados::cls::lock::Lock l(reshard_lock_name); - - utime_t time(max_secs, 0); - l.set_duration(time); + l.set_duration(lock_duration); char cookie_buf[COOKIE_LEN + 1]; gen_rand_alphanumeric(store->ctx(), cookie_buf, sizeof(cookie_buf) - 1); cookie_buf[COOKIE_LEN] = '\0'; - l.set_cookie(cookie_buf); string logshard_oid; @@ -836,8 +856,24 @@ int RGWReshard::process_single_logshard(int logshard_num) ldout(store->ctx(), 5) << __func__ << "(): failed to acquire lock on " << logshard_oid << dendl; return ret; } - - utime_t lock_start_time = ceph_clock_now(); + Clock::time_point lock_start_time = Clock::now(); + const char* const log_func_name = __func__; + + auto renew_locks_callback = + [&l, &lock_start_time, &logshard_oid, &log_func_name, this](const Clock::time_point& now) -> int { + l.set_must_renew(true); + int ret = l.lock_exclusive(&this->store->reshard_pool_ctx, logshard_oid); + if (ret < 0) { /* expired or already locked by another processor */ + ldout(this->store->ctx(), 5) << log_func_name << + "[lambda](): failed to renew lock on " << logshard_oid << + " with " << cpp_strerror(-ret) << dendl; + return ret; + } + lock_start_time = now; + ldout(this->store->ctx(), 20) << log_func_name << + "[lambda](): successfully renewed lock on " << logshard_oid << dendl; + return 0; + }; do { std::list entries; @@ -847,28 +883,30 @@ int RGWReshard::process_single_logshard(int logshard_num) continue; } - for(auto& entry: entries) { + for(auto& entry: entries) { // logshard entries if(entry.new_instance_id.empty()) { - ldout(store->ctx(), 20) << __func__ << " resharding " << entry.bucket_name << dendl; + ldout(store->ctx(), 20) << __func__ << " resharding " << + entry.bucket_name << dendl; RGWObjectCtx obj_ctx(store); rgw_bucket bucket; RGWBucketInfo bucket_info; map attrs; - ret = store->get_bucket_info(obj_ctx, entry.tenant, entry.bucket_name, bucket_info, nullptr, - &attrs); + ret = store->get_bucket_info(obj_ctx, entry.tenant, entry.bucket_name, + bucket_info, nullptr, &attrs); if (ret < 0) { ldout(cct, 0) << __func__ << ": Error in get_bucket_info: " << cpp_strerror(-ret) << dendl; return -ret; } - RGWBucketReshard br(store, bucket_info, attrs, l, lock_start_time); + RGWBucketReshard br(store, bucket_info, attrs, renew_locks_callback); Formatter* formatter = new JSONFormatter(false); auto formatter_ptr = std::unique_ptr(formatter); - ret = br.execute(entry.new_num_shards, max_entries, true,nullptr, formatter, this); + ret = br.execute(entry.new_num_shards, max_entries, true, nullptr, + formatter, this); if (ret < 0) { ldout (store->ctx(), 0) << __func__ << "ERROR in reshard_bucket " << entry.bucket_name << ":" << cpp_strerror(-ret)<< dendl; @@ -883,24 +921,16 @@ int RGWReshard::process_single_logshard(int logshard_num) << cpp_strerror(-ret) << dendl; return ret; } - ret = br.renew_lock_bucket(); + } + + Clock::time_point now = Clock::now(); + if (now >= lock_start_time + lock_duration / 2) { + ret = renew_locks_callback(now); if (ret < 0) { - ldout(cct, 0)<< __func__ << ":Error renewing bucket " << entry.bucket_name << " lock: " - << cpp_strerror(-ret) << dendl; return ret; } } - utime_t now = ceph_clock_now(); - - if (now > lock_start_time + max_secs / 2) { /* do you need to renew lock? */ - l.set_must_renew(true); - ret = l.lock_exclusive(&store->reshard_pool_ctx, logshard_oid); - if (ret == -EBUSY) { /* already locked by another processor */ - ldout(store->ctx(), 5) << __func__ << "(): failed to acquire lock on " << logshard_oid << dendl; - return ret; - } - lock_start_time = now; - } + entry.get_key(&marker); } } while (truncated); diff --git a/src/rgw/rgw_reshard.h b/src/rgw/rgw_reshard.h index eeaf901f7a2..ffff5f29a2a 100644 --- a/src/rgw/rgw_reshard.h +++ b/src/rgw/rgw_reshard.h @@ -5,30 +5,45 @@ #define RGW_RESHARD_H #include +#include + #include "include/rados/librados.hpp" +#include "common/ceph_time.h" #include "cls/rgw/cls_rgw_types.h" #include "cls/lock/cls_lock_client.h" #include "rgw_bucket.h" + class CephContext; class RGWRados; - class RGWBucketReshard { +public: + friend class RGWReshard; + using Clock = ceph::coarse_mono_clock; + + // returns 0 for success or a negative error code + using RenewLocksCallback = std::function; + +private: + RGWRados *store; RGWBucketInfo bucket_info; std::map bucket_attrs; string reshard_oid; rados::cls::lock::Lock reshard_lock; - utime_t lock_start_time; - bool locked_bucket; + Clock::time_point lock_start_time; + std::chrono::seconds lock_duration; + Clock::time_point lock_renew_thresh; + + RenewLocksCallback renew_locks_callback; int lock_bucket(); void unlock_bucket(); - int renew_lock_bucket(); + int renew_lock_bucket(const Clock::time_point&); int set_resharding_status(const string& new_instance_id, int32_t num_shards, cls_rgw_reshard_status status); int clear_resharding(); @@ -40,22 +55,24 @@ class RGWBucketReshard { ostream *os, Formatter *formatter); public: - RGWBucketReshard(RGWRados *_store, const RGWBucketInfo& _bucket_info, - const std::map& _bucket_attrs); + // pass nullptr for the final parameter if no callback is used RGWBucketReshard(RGWRados *_store, const RGWBucketInfo& _bucket_info, const std::map& _bucket_attrs, - rados::cls::lock::Lock& reshard_lock, const utime_t& lock_start_time); - + RenewLocksCallback _renew_locks_callback); int execute(int num_shards, int max_op_entries, bool verbose = false, ostream *out = nullptr, Formatter *formatter = nullptr, RGWReshard *reshard_log = nullptr); int get_status(std::list *status); int cancel(); -}; +}; // RGWBucketReshard class RGWReshard { +public: + using Clock = ceph::coarse_mono_clock; + +private: RGWRados *store; string lock_name; rados::cls::lock::Lock instance_lock; -- 2.39.5