From e72b6510a402cbafa6c99475626f15d97fd00f86 Mon Sep 17 00:00:00 2001 From: Yuval Lifshitz Date: Wed, 23 Feb 2022 17:21:10 +0200 Subject: [PATCH] rgw: prevent spurious/lost notifications in the index completion thread this was happening when async completions happened during reshard. more information about testing: https://gist.github.com/yuvalif/d526c0a3a4c5b245b9e951a6c5a10517 we also add more logs to the completion manager. should allow finding unhandled completions due to reshards. Fixes: https://tracker.ceph.com/issues/54435 Signed-off-by: Yuval Lifshitz --- src/rgw/rgw_rados.cc | 234 ++++++++++++++++++++----------------------- 1 file changed, 110 insertions(+), 124 deletions(-) diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index f992532fe3874..099276d4bd189 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -804,121 +804,65 @@ struct complete_op_data { } }; -class RGWIndexCompletionThread : public RGWRadosThread, public DoutPrefixProvider { - RGWRados *store; - - uint64_t interval_msec() override { - return 0; - } - - list completions; - - ceph::mutex completions_lock = - ceph::make_mutex("RGWIndexCompletionThread::completions_lock"); -public: - RGWIndexCompletionThread(RGWRados *_store) - : RGWRadosThread(_store, "index-complete"), store(_store) {} - - int process(const DoutPrefixProvider *dpp) override; - - void add_completion(complete_op_data *completion) { - { - std::lock_guard l{completions_lock}; - completions.push_back(completion); - } - - signal(); - } - - CephContext *get_cct() const override { return store->ctx(); } - unsigned get_subsys() const { return dout_subsys; } - std::ostream& gen_prefix(std::ostream& out) const { return out << "rgw index completion thread: "; } -}; - -int RGWIndexCompletionThread::process(const DoutPrefixProvider *dpp) -{ - list comps; - - { - std::lock_guard l{completions_lock}; - completions.swap(comps); - } +class RGWIndexCompletionManager { + RGWRados* const store; + const int num_shards; + 
ceph::containers::tiny_vector locks; + std::vector> completions; + std::vector retry_completions; - for (auto c : comps) { - std::unique_ptr up{c}; + std::thread retry_thread; + std::condition_variable cond; + std::mutex retry_completions_lock; - if (going_down()) { - continue; - } - ldpp_dout(this, 20) << __func__ << "(): handling completion for key=" << c->key << dendl; + bool _stop{false}; - RGWRados::BucketShard bs(store); - RGWBucketInfo bucket_info; + std::atomic cur_shard {0}; - int r = bs.init(c->obj.bucket, c->obj, &bucket_info, this); - if (r < 0) { - ldpp_dout(this, 0) << "ERROR: " << __func__ << "(): failed to initialize BucketShard, obj=" << c->obj << " r=" << r << dendl; - /* not much to do */ - continue; + void process(); + + void add_completion(complete_op_data *completion); + + void stop() { + if (retry_thread.joinable()) { + _stop = true; + cond.notify_all(); + retry_thread.join(); } - r = store->guard_reshard(this, &bs, c->obj, bucket_info, - [&](RGWRados::BucketShard *bs) -> int { - librados::ObjectWriteOperation o; - cls_rgw_guard_bucket_resharding(o, -ERR_BUSY_RESHARDING); - cls_rgw_bucket_complete_op(o, c->op, c->tag, c->ver, c->key, c->dir_meta, &c->remove_objs, - c->log_op, c->bilog_op, &c->zones_trace); - return bs->bucket_obj.operate(this, &o, null_yield); - }); - if (r < 0) { - ldpp_dout(this, 0) << "ERROR: " << __func__ << "(): bucket index completion failed, obj=" << c->obj << " r=" << r << dendl; - /* ignoring error, can't do anything about it */ - continue; - } - r = store->svc.datalog_rados->add_entry(this, bucket_info, bs.shard_id); - if (r < 0) { - ldpp_dout(this, -1) << "ERROR: failed writing data log" << dendl; + for (int i = 0; i < num_shards; ++i) { + std::lock_guard l{locks[i]}; + for (auto c : completions[i]) { + c->stop(); + } } + completions.clear(); + } + + int next_shard() { + int result = cur_shard % num_shards; + cur_shard++; + return result; } - - return 0; -} - -class RGWIndexCompletionManager { - RGWRados 
*store{nullptr}; - ceph::containers::tiny_vector locks; - vector > completions; - - RGWIndexCompletionThread *completion_thread{nullptr}; - - int num_shards; - - std::atomic cur_shard {0}; - public: RGWIndexCompletionManager(RGWRados *_store) : store(_store), + num_shards(store->ctx()->_conf->rgw_thread_pool_size), locks{ceph::make_lock_container( - store->ctx()->_conf->rgw_thread_pool_size, + num_shards, [](const size_t i) { return ceph::make_mutex("RGWIndexCompletionManager::lock::" + std::to_string(i)); - })} - { - num_shards = store->ctx()->_conf->rgw_thread_pool_size; - completions.resize(num_shards); - } + })}, + completions(num_shards), + retry_thread(&RGWIndexCompletionManager::process, this) + {} + ~RGWIndexCompletionManager() { stop(); } - int next_shard() { - int result = cur_shard % num_shards; - cur_shard++; - return result; - } - void create_completion(const rgw_obj& obj, RGWModifyOp op, string& tag, rgw_bucket_entry_ver& ver, @@ -928,36 +872,17 @@ public: uint16_t bilog_op, rgw_zone_set *zones_trace, complete_op_data **result); - bool handle_completion(completion_t cb, complete_op_data *arg); - int start(const DoutPrefixProvider *dpp) { - completion_thread = new RGWIndexCompletionThread(store); - int ret = completion_thread->init(dpp); - if (ret < 0) { - return ret; - } - completion_thread->start(); - return 0; - } - void stop() { - if (completion_thread) { - completion_thread->stop(); - delete completion_thread; - } + bool handle_completion(completion_t cb, complete_op_data *arg); - for (int i = 0; i < num_shards; ++i) { - std::lock_guard l{locks[i]}; - for (auto c : completions[i]) { - c->stop(); - } - } - completions.clear(); + CephContext* ctx() { + return store->ctx(); } }; static void obj_complete_cb(completion_t cb, void *arg) { - complete_op_data *completion = (complete_op_data *)arg; + complete_op_data *completion = reinterpret_cast(arg); completion->lock.lock(); if (completion->stopped) { completion->lock.unlock(); /* can drop lock, no one 
else is referencing us */ @@ -971,6 +896,57 @@ static void obj_complete_cb(completion_t cb, void *arg) } } +void RGWIndexCompletionManager::process() +{ + DoutPrefix dpp(store->ctx(), dout_subsys, "rgw index completion thread: "); + while(!_stop) { + std::vector comps; + + { + std::unique_lock l{retry_completions_lock}; + cond.wait(l, [this](){return _stop || !retry_completions.empty();}); + if (_stop) { + return; + } + retry_completions.swap(comps); + } + + for (auto c : comps) { + std::unique_ptr up{c}; + + ldpp_dout(&dpp, 20) << __func__ << "(): handling completion for key=" << c->key << dendl; + + RGWRados::BucketShard bs(store); + RGWBucketInfo bucket_info; + + int r = bs.init(c->obj.bucket, c->obj, &bucket_info, &dpp); + if (r < 0) { + ldpp_dout(&dpp, 0) << "ERROR: " << __func__ << "(): failed to initialize BucketShard, obj=" << c->obj << " r=" << r << dendl; + /* not much to do */ + continue; + } + + r = store->guard_reshard(&dpp, &bs, c->obj, bucket_info, + [&](RGWRados::BucketShard *bs) -> int { + librados::ObjectWriteOperation o; + cls_rgw_guard_bucket_resharding(o, -ERR_BUSY_RESHARDING); + cls_rgw_bucket_complete_op(o, c->op, c->tag, c->ver, c->key, c->dir_meta, &c->remove_objs, + c->log_op, c->bilog_op, &c->zones_trace); + return bs->bucket_obj.operate(&dpp, &o, null_yield); + }); + if (r < 0) { + ldpp_dout(&dpp, 0) << "ERROR: " << __func__ << "(): bucket index completion failed, obj=" << c->obj << " r=" << r << dendl; + /* ignoring error, can't do anything about it */ + continue; + } + + r = store->svc.datalog_rados->add_entry(&dpp, bucket_info, bs.shard_id); + if (r < 0) { + ldpp_dout(&dpp, -1) << "ERROR: failed writing data log" << dendl; + } + } + } +} void RGWIndexCompletionManager::create_completion(const rgw_obj& obj, RGWModifyOp op, string& tag, @@ -1014,7 +990,16 @@ void RGWIndexCompletionManager::create_completion(const rgw_obj& obj, entry->rados_completion = librados::Rados::aio_create_completion(entry, obj_complete_cb); std::lock_guard 
l{locks[shard_id]}; - completions[shard_id].insert(entry); + const auto ok = completions[shard_id].insert(entry).second; + ceph_assert(ok); +} + +void RGWIndexCompletionManager::add_completion(complete_op_data *completion) { + { + std::lock_guard l{retry_completions_lock}; + retry_completions.push_back(completion); + } + cond.notify_all(); } bool RGWIndexCompletionManager::handle_completion(completion_t cb, complete_op_data *arg) @@ -1027,6 +1012,7 @@ bool RGWIndexCompletionManager::handle_completion(completion_t cb, complete_op_d auto iter = comps.find(arg); if (iter == comps.end()) { + ldout(arg->manager->ctx(), 0) << __func__ << "(): cannot find completion for obj=" << arg->key << dendl; return true; } @@ -1035,9 +1021,13 @@ bool RGWIndexCompletionManager::handle_completion(completion_t cb, complete_op_d int r = rados_aio_get_return_value(cb); if (r != -ERR_BUSY_RESHARDING) { + ldout(arg->manager->ctx(), 20) << __func__ << "(): completion " << + (r == 0 ? "ok" : "failed with " + to_string(r)) << + " for obj=" << arg->key << dendl; return true; } - completion_thread->add_completion(arg); + add_completion(arg); + ldout(arg->manager->ctx(), 20) << __func__ << "(): async completion added for obj=" << arg->key << dendl; return false; } @@ -1359,10 +1349,6 @@ int RGWRados::init_complete(const DoutPrefixProvider *dpp) } index_completion_manager = new RGWIndexCompletionManager(this); - ret = index_completion_manager->start(dpp); - if (ret < 0) { - return ret; - } ret = rgw::notify::init(cct, store, dpp); if (ret < 0 ) { ldpp_dout(dpp, 1) << "ERROR: failed to initialize notification manager" << dendl; -- 2.39.5