From: Yehuda Sadeh Date: Sun, 1 Jul 2018 23:46:13 +0000 (-0700) Subject: rgw: introduce cr singleton, use for subscriptions init X-Git-Tag: v14.1.0~616^2~56 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=8bf3648d256777e0e19af4a55164ad046535cb11;p=ceph-ci.git rgw: introduce cr singleton, use for subscriptions init Prevent concurrent initalizations of the same subscription Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/rgw_coroutine.cc b/src/rgw/rgw_coroutine.cc index d756cfd52a7..5910df6de7f 100644 --- a/src/rgw/rgw_coroutine.cc +++ b/src/rgw/rgw_coroutine.cc @@ -222,7 +222,7 @@ int RGWCoroutinesStack::operate(RGWCoroutinesEnv *_env) RGWCoroutine *op = *pos; op->stack = this; ldout(cct, 20) << *op << ": operate()" << dendl; - int r = op->operate(); + int r = op->operate_wrapper(); if (r < 0) { ldout(cct, 20) << *op << ": operate() returned r=" << r << dendl; } diff --git a/src/rgw/rgw_coroutine.h b/src/rgw/rgw_coroutine.h index 8c1e25dc58a..6055eb3b70f 100644 --- a/src/rgw/rgw_coroutine.h +++ b/src/rgw/rgw_coroutine.h @@ -258,6 +258,9 @@ protected: return status; } + virtual int operate_wrapper() { + return operate(); + } public: RGWCoroutine(CephContext *_cct) : status(_cct), _yield_ret(false), cct(_cct), stack(NULL), retcode(0), state(RGWCoroutine_Run) {} ~RGWCoroutine() override; diff --git a/src/rgw/rgw_sync_module_pubsub.cc b/src/rgw/rgw_sync_module_pubsub.cc index 0eec2d94729..23e5d14c8e8 100644 --- a/src/rgw/rgw_sync_module_pubsub.cc +++ b/src/rgw/rgw_sync_module_pubsub.cc @@ -340,6 +340,98 @@ public: }; +template +class RGWSingletonCR : public RGWCoroutine { + friend class WrapperCR; + + boost::asio::coroutine wrapper_state; + bool started{false}; + int operate_ret{0}; + + struct WaiterInfo { + RGWCoroutine *cr{nullptr}; + T *result; + }; + using WaiterInfoRef = std::shared_ptr; + + deque waiters; + + void add_waiter(RGWCoroutine *cr, T *result) { + auto waiter = std::make_shared(); + waiter->cr = cr; + waiter->result = result; + waiters.push_back(waiter); + }; + + bool get_next_waiter(WaiterInfoRef *waiter) { + if (waiters.empty()) { + waiter->reset(); + return false; + } + + *waiter = waiters.front(); + waiters.pop_front(); + return true; + } + + int operate_wrapper() override { + reenter(&wrapper_state) { + while (!is_done()) { + ldout(cct, 20) << __func__ << "(): operate_wrapper() -> operate()" << dendl; + operate_ret = operate(); + if (operate_ret < 0) { + ldout(cct, 20) << *this << ": operate() returned r=" << operate_ret << dendl; + } + if (!is_done()) { + yield; + } + } + + ldout(cct, 20) << __func__ << "(): RGWSingletonCR: operate_wrapper() done, need to wake up " << waiters.size() << " waiters" << dendl; + /* we're done, can't yield anymore */ + + WaiterInfoRef waiter; + while (get_next_waiter(&waiter)) { + ldout(cct, 20) << __func__ << "(): RGWSingletonCR: waking up waiter" << dendl; + waiter->cr->set_retcode(retcode); + waiter->cr->set_sleeping(false); + return_result(waiter->result); + put(); + } + + return retcode; + } + return 0; + } + + virtual void return_result(T *result) {} + +public: + RGWSingletonCR(CephContext *_cct) + : RGWCoroutine(_cct) {} + + int execute(RGWCoroutine *caller, T *result = nullptr) { + if (!started) { + ldout(cct, 20) << __func__ << "(): singleton not started, starting" << dendl; + started = true; + caller->call(this); + return 0; + } else if (!is_done()) { + ldout(cct, 20) << __func__ << "(): singleton not done yet, registering as waiter" << dendl; + get(); + add_waiter(caller, result); + caller->set_sleeping(true); + return 0; + } + + ldout(cct, 20) << __func__ << "(): singleton done, returning retcode=" << retcode << dendl; + caller->set_retcode(retcode); + return_result(result); + return retcode; + } +}; + + class PSSubscription; using PSSubscriptionRef = std::shared_ptr; @@ -353,9 +445,10 @@ class PSSubscription { RGWDataAccessRef data_access; RGWDataAccess::BucketRef bucket; -public: + class InitCR; + InitCR *init_cr{nullptr}; - class InitCR : public RGWCoroutine { + class InitCR : public RGWSingletonCR { RGWDataSyncEnv *sync_env; PSSubscriptionRef sub; rgw_get_bucket_info_params get_bucket_info; @@ -365,7 +458,7 @@ public: int i; public: InitCR(RGWDataSyncEnv *_sync_env, - PSSubscriptionRef& _sub) : RGWCoroutine(_sync_env->cct), + PSSubscriptionRef& _sub) : RGWSingletonCR(_sync_env->cct), sync_env(_sync_env), sub(_sub), conf(sub->env->conf), sub_conf(sub->sub_conf) { @@ -409,7 +502,6 @@ public: << get_bucket_info.tenant << " name=" << get_bucket_info.bucket_name << ": ret=" << retcode << dendl; return set_cr_error(retcode); } -ldout(sync_env->cct, 20) << "pubsub: bucket create: after user info: " << json_str("obj", *sub->env->data_user_info, true) << dendl; /* second iteration: we got -ENOENT and created a bucket */ } @@ -464,23 +556,32 @@ ldout(sync_env->cct, 20) << "pubsub: bucket create: after user info: " << json_s } }; +public: + PSSubscription(RGWDataSyncEnv *_sync_env, PSEnvRef _env, PSSubConfigRef& _sub_conf) : sync_env(_sync_env), env(_env), sub_conf(_sub_conf), data_access(std::make_shared(sync_env->store)) {} + virtual ~PSSubscription() { + if (init_cr) { + init_cr->put(); + } + } public: static PSSubscriptionRef& get_shared(RGWDataSyncEnv *_sync_env, PSEnvRef _env, PSSubConfigRef& _sub_conf) { auto sub = new PSSubscription(_sync_env, _env, _sub_conf); sub->self = std::shared_ptr(sub); + sub->init_cr = new InitCR(_sync_env, sub->self); + sub->init_cr->get(); return sub->self; } - RGWCoroutine *init_cr() { - return new InitCR(sync_env, self); + int call_init_cr(RGWCoroutine *caller) { + return init_cr->execute(caller); } RGWCoroutine *store_event_cr(EventRef& event) { @@ -500,7 +601,7 @@ class PSManager map subs; - class GetSubCR : public RGWCoroutine { + class GetSubCR : public RGWSingletonCR { RGWDataSyncEnv *sync_env; PSManagerRef mgr; const string& sub_name; @@ -513,13 +614,16 @@ class PSManager GetSubCR(RGWDataSyncEnv *_sync_env, PSManagerRef& _mgr, const string& _sub_name, - PSSubscriptionRef *_ref) : RGWCoroutine(_sync_env->cct), + PSSubscriptionRef *_ref) : RGWSingletonCR(_sync_env->cct), sync_env(_sync_env), mgr(_mgr), sub_name(_sub_name), ref(_ref), conf(mgr->env->conf) { } + ~GetSubCR() { + mgr->remove_get_sub(sub_name); + } int operate() override { reenter(this) { @@ -530,20 +634,33 @@ class PSManager *ref = PSSubscription::get_shared(sync_env, mgr->env, sub_conf); - yield call((*ref)->init_cr()); + yield (*ref)->call_init_cr(this); if (retcode < 0) { ldout(sync_env->cct, 0) << "ERROR: failed to init subscription" << dendl; - ref->reset(); return set_cr_error(retcode); } mgr->subs[sub_name] = *ref; + return set_cr_done(); } return 0; } + + void return_result(PSSubscriptionRef *result) override { + ldout(cct, 20) << __func__ << "(): returning result: retcode=" << retcode << " resultp=" << (void *)result << dendl; + if (retcode >= 0) { + *result = *ref; + } + } }; + map get_subs; + + void remove_get_sub(const string& name) { + get_subs.erase(name); + } + bool find_sub_instance(const string& sub_name, PSSubscriptionRef *sub) { auto iter = subs.find(sub_name); if (iter != subs.end()) { @@ -565,12 +682,19 @@ public: return mgr->self; } - RGWCoroutine *get_subscription_cr(const string& sub_name, PSSubscriptionRef *ref) { + int call_get_subscription_cr(RGWCoroutine *caller, const string& sub_name, PSSubscriptionRef *ref) { if (find_sub_instance(sub_name, ref)) { /* found it! nothing to execute */ - return nullptr; + ldout(sync_env->cct, 20) << __func__ << "(): found sub instance" << dendl; + return 0; + } + auto& gs = get_subs[sub_name]; + if (!gs) { + ldout(sync_env->cct, 20) << __func__ << "(): first get subs" << dendl; + gs = new GetSubCR(sync_env, self, sub_name, ref); } - return new GetSubCR(sync_env, self, sub_name, ref); + ldout(sync_env->cct, 20) << __func__ << "(): executing get subs" << dendl; + return gs->execute(caller, ref); } friend class GetSubCR; @@ -624,6 +748,7 @@ public: } }; + class RGWPSHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR { RGWDataSyncEnv *sync_env; PSEnvRef env; @@ -663,7 +788,7 @@ public: for (siter = (*titer)->subs.begin(); siter != (*titer)->subs.end(); ++siter) { ldout(sync_env->cct, 10) << ": subscription: " << *siter << dendl; - yield call(env->manager->get_subscription_cr(*siter, &sub)); + yield env->manager->call_get_subscription_cr(this, *siter, &sub); if (retcode < 0) { ldout(sync_env->cct, 10) << "ERROR: failed to find subscription config for subscription=" << *siter << " ret=" << retcode << dendl; continue;