From: J. Eric Ivancich Date: Thu, 27 Sep 2018 17:31:57 +0000 (-0400) Subject: rgw: renew resharding locks to prevent expiration X-Git-Tag: v12.2.11~115^2~15 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=c90493fecfacadae6c5bdcba62adcd8e9277f002;p=ceph.git rgw: renew resharding locks to prevent expiration Fix lock expiration problem with resharding. The resharding process will renew its bucket lock (and logshard lock if necessary) when half the remaining time is left on the lock. If the lock is expired and cannot renew the process fails and errors out appropriately. Signed-off-by: J. Eric Ivancich (cherry picked from commit 8cebffa1d8ad4df6fdae4e10e782aad0753545ce) --- diff --git a/src/cls/lock/cls_lock_client.h b/src/cls/lock/cls_lock_client.h index cf3886eebf20..e2c21cc9e5b0 100644 --- a/src/cls/lock/cls_lock_client.h +++ b/src/cls/lock/cls_lock_client.h @@ -4,6 +4,8 @@ #ifndef CEPH_CLS_LOCK_CLIENT_H #define CEPH_CLS_LOCK_CLIENT_H +#include + #include "cls/lock/cls_lock_types.h" namespace librados { diff --git a/src/cls/lock/cls_lock_ops.h b/src/cls/lock/cls_lock_ops.h index dbdddfe21407..b9388e78877c 100644 --- a/src/cls/lock/cls_lock_ops.h +++ b/src/cls/lock/cls_lock_ops.h @@ -1,3 +1,6 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + #ifndef CEPH_CLS_LOCK_OPS_H #define CEPH_CLS_LOCK_OPS_H diff --git a/src/cls/lock/cls_lock_types.h b/src/cls/lock/cls_lock_types.h index d00d6a3398f5..cc2e5617a0a6 100644 --- a/src/cls/lock/cls_lock_types.h +++ b/src/cls/lock/cls_lock_types.h @@ -1,3 +1,6 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + #ifndef CEPH_CLS_LOCK_TYPES_H #define CEPH_CLS_LOCK_TYPES_H diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index 311596b8820a..73b998a8ac24 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -5909,7 +5909,7 @@ next: return ret; } - RGWBucketReshard br(store, bucket_info, attrs); + RGWBucketReshard br(store, bucket_info, attrs, nullptr /* no callback */); #define DEFAULT_RESHARD_MAX_ENTRIES 1000 if (max_entries < 1) { @@ -5995,7 +5995,6 @@ next: return 0; } - if (opt_cmd == OPT_RESHARD_STATUS) { if (bucket_name.empty()) { cerr << "ERROR: bucket not specified" << std::endl; @@ -6011,11 +6010,12 @@ next: return -ret; } - RGWBucketReshard br(store, bucket_info, attrs); + RGWBucketReshard br(store, bucket_info, attrs, nullptr /* no callback */); list status; int r = br.get_status(&status); if (r < 0) { - cerr << "ERROR: could not get resharding status for bucket " << bucket_name << std::endl; + cerr << "ERROR: could not get resharding status for bucket " << + bucket_name << std::endl; return -r; } @@ -6048,7 +6048,7 @@ next: return -ret; } - RGWBucketReshard br(store, bucket_info, attrs); + RGWBucketReshard br(store, bucket_info, attrs, nullptr /* no callback */); int ret = br.cancel(); if (ret < 0) { if (ret == -EBUSY) { diff --git a/src/rgw/rgw_reshard.cc b/src/rgw/rgw_reshard.cc index 1cb8d518ed74..7a34af8489cf 100644 --- a/src/rgw/rgw_reshard.cc +++ b/src/rgw/rgw_reshard.cc @@ -66,8 +66,10 @@ class BucketReshardShard { public: BucketReshardShard(RGWRados *_store, const RGWBucketInfo& _bucket_info, int _num_shard, - deque& _completions) : store(_store), bucket_info(_bucket_info), bs(store), - aio_completions(_completions) { + deque& _completions) : + store(_store), bucket_info(_bucket_info), bs(store), + aio_completions(_completions) + { num_shard = (bucket_info.num_shards > 0 ? _num_shard : -1); bs.init(bucket_info.bucket, num_shard); } @@ -92,8 +94,10 @@ public: return ret; } } + return 0; } + int flush() { if (entries.size() == 0) { return 0; @@ -157,14 +161,20 @@ public: } } + /* + * did_flush is set if not nullptr and a flush occurred; otherwise not altered + */ int add_entry(int shard_index, rgw_cls_bi_entry& entry, bool account, uint8_t category, const rgw_bucket_category_stats& entry_stats) { - int ret = target_shards[shard_index]->add_entry(entry, account, category, entry_stats); + int ret = target_shards[shard_index]->add_entry(entry, account, category, + entry_stats); if (ret < 0) { - derr << "ERROR: target_shards.add_entry(" << entry.idx << ") returned error: " << cpp_strerror(-ret) << dendl; + derr << "ERROR: target_shards.add_entry(" << entry.idx << + ") returned error: " << cpp_strerror(-ret) << dendl; return ret; } + return 0; } @@ -190,13 +200,21 @@ public: } }; -RGWBucketReshard::RGWBucketReshard(RGWRados *_store, const RGWBucketInfo& _bucket_info, const map& _bucket_attrs) : - store(_store), bucket_info(_bucket_info), bucket_attrs(_bucket_attrs), - reshard_lock(reshard_lock_name), locked_bucket(false) { +RGWBucketReshard::RGWBucketReshard(RGWRados *_store, + const RGWBucketInfo& _bucket_info, + const map& _bucket_attrs, + RenewLocksCallback _renew_locks_callback) : + store(_store), bucket_info(_bucket_info), bucket_attrs(_bucket_attrs), + reshard_lock(reshard_lock_name), + renew_locks_callback(_renew_locks_callback) +{ const rgw_bucket& b = bucket_info.bucket; - reshard_oid = b.tenant + (b.tenant.empty() ? "" : ":") + b.name + ":" + b.bucket_id; + reshard_oid = b.get_key(':'); + + const int lock_dur_secs = + store->ctx()->_conf->rgw_reshard_bucket_lock_duration; + lock_duration = std::chrono::seconds(lock_dur_secs); - utime_t lock_duration(store->ctx()->_conf->rgw_reshard_bucket_lock_duration, 0); #define COOKIE_LEN 16 char cookie_buf[COOKIE_LEN + 1]; gen_rand_alphanumeric(store->ctx(), cookie_buf, sizeof(cookie_buf) - 1); @@ -206,23 +224,17 @@ RGWBucketReshard::RGWBucketReshard(RGWRados *_store, const RGWBucketInfo& _bucke reshard_lock.set_duration(lock_duration); } -RGWBucketReshard::RGWBucketReshard(RGWRados *_store, const RGWBucketInfo& _bucket_info, const map& _bucket_attrs, rados::cls::lock::Lock& _reshard_lock, const utime_t& _lock_start_time) : - store(_store), bucket_info(_bucket_info), bucket_attrs(_bucket_attrs), - reshard_lock(_reshard_lock), lock_start_time(_lock_start_time), locked_bucket(true) -{ - const rgw_bucket& b = bucket_info.bucket; - reshard_oid = b.tenant + (b.tenant.empty() ? "" : ":") + b.name + ":" + b.bucket_id; -} - int RGWBucketReshard::lock_bucket() { + reshard_lock.set_must_renew(false); int ret = reshard_lock.lock_exclusive(&store->reshard_pool_ctx, reshard_oid); if (ret < 0) { ldout(store->ctx(), 0) << "RGWReshard::add failed to acquire lock on " << reshard_oid << " ret=" << ret << dendl; return ret; } - lock_start_time = ceph_clock_now(); - locked_bucket = true; + lock_start_time = Clock::now(); + lock_renew_thresh = lock_start_time + lock_duration / 2; + return 0; } @@ -232,27 +244,31 @@ void RGWBucketReshard::unlock_bucket() if (ret < 0) { ldout(store->ctx(), 0) << "WARNING: RGWReshard::add failed to drop lock on " << reshard_oid << " ret=" << ret << dendl; } - locked_bucket = false; } - -int RGWBucketReshard::renew_lock_bucket() +int RGWBucketReshard::renew_lock_bucket(const Clock::time_point& now) { - if (!locked_bucket) { - return 0; - } - - utime_t now = ceph_clock_now(); - /* do you need to renew lock? */ - if (now > lock_start_time + store->ctx()->_conf->rgw_reshard_bucket_lock_duration/ 2) { - reshard_lock.set_must_renew(true); - int ret = reshard_lock.lock_exclusive(&store->reshard_pool_ctx, reshard_oid); - if (ret == -EBUSY) { /* already locked by another processor */ - ldout(store->ctx(), 5) << __func__ << "(): failed to acquire lock on " << reshard_oid << dendl; + // assume outer locks have timespans at least the size of ours, so + // can call inside conditional + if (renew_locks_callback) { + int ret = renew_locks_callback(now); + if (ret < 0) { return ret; } - lock_start_time = now; } + + reshard_lock.set_must_renew(true); + int ret = reshard_lock.lock_exclusive(&store->reshard_pool_ctx, reshard_oid); + if (ret < 0) { /* expired or already locked by another processor */ + ldout(store->ctx(), 5) << __func__ << "(): failed to renew lock on " << + reshard_oid << " with " << cpp_strerror(-ret) << dendl; + return ret; + } + lock_start_time = now; + lock_renew_thresh = lock_start_time + lock_duration / 2; + ldout(store->ctx(), 20) << __func__ << "(): successfully renewed lock on " << + reshard_oid << dendl; + return 0; } @@ -358,12 +374,17 @@ class BucketInfoReshardUpdate } public: - BucketInfoReshardUpdate(RGWRados *_store, RGWBucketInfo& _bucket_info, - map& _bucket_attrs, const string& new_bucket_id) : store(_store), - bucket_info(_bucket_info), - bucket_attrs(_bucket_attrs) { + BucketInfoReshardUpdate(RGWRados *_store, + RGWBucketInfo& _bucket_info, + map& _bucket_attrs, + const string& new_bucket_id) : + store(_store), + bucket_info(_bucket_info), + bucket_attrs(_bucket_attrs) + { bucket_info.new_bucket_instance_id = new_bucket_id; } + ~BucketInfoReshardUpdate() { if (in_progress) { bucket_info.new_bucket_instance_id.clear(); @@ -390,13 +411,12 @@ public: } }; -int RGWBucketReshard::do_reshard( - int num_shards, - RGWBucketInfo& new_bucket_info, - int max_entries, - bool verbose, - ostream *out, - Formatter *formatter) +int RGWBucketReshard::do_reshard(int num_shards, + RGWBucketInfo& new_bucket_info, + int max_entries, + bool verbose, + ostream *out, + Formatter *formatter) { rgw_bucket& bucket = bucket_info.bucket; @@ -453,7 +473,7 @@ int RGWBucketReshard::do_reshard( ret = store->bi_list(bucket, i, string(), marker, max_entries, &entries, &is_truncated); if (ret < 0 && ret != -ENOENT) { derr << "ERROR: bi_list(): " << cpp_strerror(-ret) << dendl; - return -ret; + return ret; } list::iterator iter; @@ -485,30 +505,30 @@ int RGWBucketReshard::do_reshard( int shard_index = (target_shard_id > 0 ? target_shard_id : 0); - ret = target_shards_mgr.add_entry(shard_index, entry, account, category, stats); + ret = target_shards_mgr.add_entry(shard_index, entry, account, + category, stats); if (ret < 0) { return ret; } + + Clock::time_point now = Clock::now(); + if (now >= lock_renew_thresh) { + ret = renew_lock_bucket(now); + if (ret < 0) { + lderr(store->ctx()) << "Error renewing bucket lock: " << ret << dendl; + return ret; + } + } + if (verbose) { formatter->close_section(); if (out) { formatter->flush(*out); - formatter->flush(*out); } } else if (out && !(total_entries % 1000)) { (*out) << " " << total_entries; } - } - ret = renew_lock_bucket(); - if (ret < 0) { - lderr(store->ctx()) << "Error renewing bucket lock: " << ret << dendl; - return -ret; - } - ret = renew_lock_bucket(); - if (ret < 0) { - lderr(store->ctx()) << "Error renewing bucket lock: " << ret << dendl; - return -ret; - } + } // entries loop } } @@ -524,13 +544,13 @@ int RGWBucketReshard::do_reshard( ret = target_shards_mgr.finish(); if (ret < 0) { lderr(store->ctx()) << "ERROR: failed to reshard" << dendl; - return EIO; + return -EIO; } ret = rgw_link_bucket(store, new_bucket_info.owner, new_bucket_info.bucket, bucket_info.creation_time); if (ret < 0) { lderr(store->ctx()) << "failed to link new bucket instance (bucket_id=" << new_bucket_info.bucket.bucket_id << ": " << cpp_strerror(-ret) << ")" << dendl; - return -ret; + return ret; } ret = bucket_info_updater.complete(); @@ -538,8 +558,9 @@ int RGWBucketReshard::do_reshard( ldout(store->ctx(), 0) << __func__ << ": failed to update bucket info ret=" << ret << dendl; /* don't error out, reshard process succeeded */ } + return 0; -} +} // RGWBucketReshard::do_reshard int RGWBucketReshard::get_status(list *status) { @@ -568,8 +589,8 @@ int RGWBucketReshard::get_status(list *status) int RGWBucketReshard::execute(int num_shards, int max_op_entries, - bool verbose, ostream *out, Formatter *formatter, RGWReshard* reshard_log) - + bool verbose, ostream *out, Formatter *formatter, + RGWReshard* reshard_log) { int ret = lock_bucket(); if (ret < 0) { @@ -620,8 +641,9 @@ int RGWBucketReshard::execute(int num_shards, int max_op_entries, RGWReshard::RGWReshard(RGWRados* _store, bool _verbose, ostream *_out, - Formatter *_formatter) : store(_store), instance_lock(bucket_instance_lock_name), - verbose(_verbose), out(_out), formatter(_formatter) + Formatter *_formatter) : + store(_store), instance_lock(bucket_instance_lock_name), + verbose(_verbose), out(_out), formatter(_formatter) { num_logshards = store->ctx()->_conf->rgw_reshard_num_logs; } @@ -815,18 +837,16 @@ int RGWReshard::process_single_logshard(int logshard_num) bool truncated = true; CephContext *cct = store->ctx(); - int max_entries = 1000; + constexpr uint32_t max_entries = 1000; int max_secs = store->ctx()->_conf->rgw_reshard_bucket_lock_duration; + std::chrono::seconds lock_duration(max_secs); rados::cls::lock::Lock l(reshard_lock_name); - - utime_t time(max_secs, 0); - l.set_duration(time); + l.set_duration(lock_duration); char cookie_buf[COOKIE_LEN + 1]; gen_rand_alphanumeric(store->ctx(), cookie_buf, sizeof(cookie_buf) - 1); cookie_buf[COOKIE_LEN] = '\0'; - l.set_cookie(cookie_buf); string logshard_oid; @@ -837,8 +857,24 @@ int RGWReshard::process_single_logshard(int logshard_num) ldout(store->ctx(), 5) << __func__ << "(): failed to acquire lock on " << logshard_oid << dendl; return ret; } - - utime_t lock_start_time = ceph_clock_now(); + Clock::time_point lock_start_time = Clock::now(); + const char* const log_func_name = __func__; + + auto renew_locks_callback = + [&l, &lock_start_time, &logshard_oid, &log_func_name, this](const Clock::time_point& now) -> int { + l.set_must_renew(true); + int ret = l.lock_exclusive(&this->store->reshard_pool_ctx, logshard_oid); + if (ret < 0) { /* expired or already locked by another processor */ + ldout(this->store->ctx(), 5) << log_func_name << + "[lambda](): failed to renew lock on " << logshard_oid << + " with " << cpp_strerror(-ret) << dendl; + return ret; + } + lock_start_time = now; + ldout(this->store->ctx(), 20) << log_func_name << + "[lambda](): successfully renewed lock on " << logshard_oid << dendl; + return 0; + }; do { std::list entries; @@ -848,28 +884,30 @@ int RGWReshard::process_single_logshard(int logshard_num) continue; } - for(auto& entry: entries) { + for(auto& entry: entries) { // logshard entries if(entry.new_instance_id.empty()) { - ldout(store->ctx(), 20) << __func__ << " resharding " << entry.bucket_name << dendl; + ldout(store->ctx(), 20) << __func__ << " resharding " << + entry.bucket_name << dendl; RGWObjectCtx obj_ctx(store); rgw_bucket bucket; RGWBucketInfo bucket_info; map attrs; - ret = store->get_bucket_info(obj_ctx, entry.tenant, entry.bucket_name, bucket_info, nullptr, - &attrs); + ret = store->get_bucket_info(obj_ctx, entry.tenant, entry.bucket_name, + bucket_info, nullptr, &attrs); if (ret < 0) { ldout(cct, 0) << __func__ << ": Error in get_bucket_info: " << cpp_strerror(-ret) << dendl; return -ret; } - RGWBucketReshard br(store, bucket_info, attrs, l, lock_start_time); + RGWBucketReshard br(store, bucket_info, attrs, renew_locks_callback); Formatter* formatter = new JSONFormatter(false); auto formatter_ptr = std::unique_ptr(formatter); - ret = br.execute(entry.new_num_shards, max_entries, true,nullptr, formatter, this); + ret = br.execute(entry.new_num_shards, max_entries, true, nullptr, + formatter, this); if (ret < 0) { ldout (store->ctx(), 0) << __func__ << "ERROR in reshard_bucket " << entry.bucket_name << ":" << cpp_strerror(-ret)<< dendl; @@ -884,24 +922,16 @@ int RGWReshard::process_single_logshard(int logshard_num) << cpp_strerror(-ret) << dendl; return ret; } - ret = br.renew_lock_bucket(); + } + + Clock::time_point now = Clock::now(); + if (now >= lock_start_time + lock_duration / 2) { + ret = renew_locks_callback(now); if (ret < 0) { - ldout(cct, 0)<< __func__ << ":Error renewing bucket " << entry.bucket_name << " lock: " - << cpp_strerror(-ret) << dendl; return ret; } } - utime_t now = ceph_clock_now(); - - if (now > lock_start_time + max_secs / 2) { /* do you need to renew lock? */ - l.set_must_renew(true); - ret = l.lock_exclusive(&store->reshard_pool_ctx, logshard_oid); - if (ret == -EBUSY) { /* already locked by another processor */ - ldout(store->ctx(), 5) << __func__ << "(): failed to acquire lock on " << logshard_oid << dendl; - return ret; - } - lock_start_time = now; - } + entry.get_key(&marker); } } while (truncated); diff --git a/src/rgw/rgw_reshard.h b/src/rgw/rgw_reshard.h index f9d3bf0720c4..f8f77abbffdc 100644 --- a/src/rgw/rgw_reshard.h +++ b/src/rgw/rgw_reshard.h @@ -5,30 +5,45 @@ #define RGW_RESHARD_H #include +#include + #include "include/rados/librados.hpp" +#include "common/ceph_time.h" #include "cls/rgw/cls_rgw_types.h" #include "cls/lock/cls_lock_client.h" #include "rgw_bucket.h" + class CephContext; class RGWRados; - class RGWBucketReshard { +public: + friend class RGWReshard; + using Clock = ceph::coarse_mono_clock; + + // returns 0 for success or a negative error code + using RenewLocksCallback = std::function; + +private: + RGWRados *store; RGWBucketInfo bucket_info; std::map bucket_attrs; string reshard_oid; rados::cls::lock::Lock reshard_lock; - utime_t lock_start_time; - bool locked_bucket; + Clock::time_point lock_start_time; + std::chrono::seconds lock_duration; + Clock::time_point lock_renew_thresh; + + RenewLocksCallback renew_locks_callback; int lock_bucket(); void unlock_bucket(); - int renew_lock_bucket(); + int renew_lock_bucket(const Clock::time_point&); int set_resharding_status(const string& new_instance_id, int32_t num_shards, cls_rgw_reshard_status status); int clear_resharding(); @@ -40,22 +55,24 @@ class RGWBucketReshard { ostream *os, Formatter *formatter); public: - RGWBucketReshard(RGWRados *_store, const RGWBucketInfo& _bucket_info, - const std::map& _bucket_attrs); + // pass nullptr for the final parameter if no callback is used RGWBucketReshard(RGWRados *_store, const RGWBucketInfo& _bucket_info, const std::map& _bucket_attrs, - rados::cls::lock::Lock& reshard_lock, const utime_t& lock_start_time); - + RenewLocksCallback _renew_locks_callback); int execute(int num_shards, int max_op_entries, bool verbose = false, ostream *out = nullptr, Formatter *formatter = nullptr, RGWReshard *reshard_log = nullptr); int get_status(std::list *status); int cancel(); -}; +}; // RGWBucketReshard class RGWReshard { +public: + using Clock = ceph::coarse_mono_clock; + +private: RGWRados *store; string lock_name; rados::cls::lock::Lock instance_lock;