From: Yehuda Sadeh Date: Tue, 3 Jul 2018 21:43:44 +0000 (-0700) Subject: rgw: pubsub: multiple fixes X-Git-Tag: v14.1.0~616^2~49 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=95d94cd59fba85bd42289a94523fc88172890ffe;p=ceph.git rgw: pubsub: multiple fixes Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/rgw_sync_module_pubsub.cc b/src/rgw/rgw_sync_module_pubsub.cc index 8ed726159663..71c0ac2a90ba 100644 --- a/src/rgw/rgw_sync_module_pubsub.cc +++ b/src/rgw/rgw_sync_module_pubsub.cc @@ -54,7 +54,7 @@ config: { "tenant": , # default: "uid": , # default: "pubsub" - "data_bucket_prefix": # default: "pubsub" + "data_bucket_prefix": # default: "pubsub-" "data_oid_prefix": # # non-dynamic config @@ -224,10 +224,9 @@ struct PSConfig { void init(CephContext *cct, const JSONFormattable& config) { string uid = config["uid"]("pubsub"); user = rgw_user(config["tenant"], uid); - data_bucket_prefix = config["data_bucket_prefix"]("pubsub"); + data_bucket_prefix = config["data_bucket_prefix"]("pubsub-"); data_oid_prefix = config["data_oid_prefix"]; - /* FIXME: this will be dynamically configured */ for (auto& c : config["notifications"].array()) { PSNotificationConfig nc; nc.id = ++max_id; @@ -454,7 +453,6 @@ class PSSubscription; using PSSubscriptionRef = std::shared_ptr; class PSSubscription { - PSSubscriptionRef self; RGWDataSyncEnv *sync_env; PSEnvRef env; PSSubConfigRef sub_conf; @@ -597,31 +595,22 @@ public: } } - static PSSubscriptionRef& get_shared(RGWDataSyncEnv *_sync_env, + template + static PSSubscriptionRef get_shared(RGWDataSyncEnv *_sync_env, PSEnvRef _env, - PSSubConfigRef& _sub_conf) { - auto sub = new PSSubscription(_sync_env, _env, _sub_conf); - sub->self = std::shared_ptr(sub); - sub->init_cr = new InitCR(_sync_env, sub->self); + C& _sub_conf) { + auto sub = std::make_shared(_sync_env, _env, _sub_conf); + sub->init_cr = new InitCR(_sync_env, sub); sub->init_cr->get(); - return sub->self; - } - static PSSubscriptionRef& get_shared(RGWDataSyncEnv *_sync_env, - PSEnvRef _env, - rgw_pubsub_user_sub_config& _sub_conf) { - auto sub = new PSSubscription(_sync_env, _env, _sub_conf); - sub->self = std::shared_ptr(sub); - sub->init_cr = new InitCR(_sync_env, sub->self); - sub->init_cr->get(); - return sub->self; + return sub; } int call_init_cr(RGWCoroutine *caller) { return init_cr->execute(caller); } - RGWCoroutine *store_event_cr(EventRef& event) { - return new StoreEventCR(sync_env, self, event); + static RGWCoroutine *store_event_cr(RGWDataSyncEnv *sync_env, PSSubscriptionRef& sub, EventRef& event) { + return new StoreEventCR(sync_env, sub, event); } friend class InitCR; @@ -630,8 +619,6 @@ public: class PSManager { - PSManagerRef self; - RGWDataSyncEnv *sync_env; PSEnvRef env; @@ -663,7 +650,6 @@ class PSManager conf(mgr->env->conf) { } ~GetSubCR() { - mgr->remove_get_sub(sub_name); } int operate() override { @@ -671,6 +657,7 @@ class PSManager if (owner.empty()) { if (!conf->find_sub(sub_name, &sub_conf)) { ldout(sync_env->cct, 0) << "ERROR: could not find subscription config: name=" << sub_name << dendl; + mgr->remove_get_sub(owner, sub_name); return set_cr_error(-ENOENT); } @@ -687,6 +674,7 @@ class PSManager &user_sub_conf, empty_on_enoent)); } if (retcode < 0) { + mgr->remove_get_sub(owner, sub_name); return set_cr_error(retcode); } @@ -696,12 +684,14 @@ class PSManager yield (*ref)->call_init_cr(this); if (retcode < 0) { ldout(sync_env->cct, 0) << "ERROR: failed to init subscription" << dendl; + mgr->remove_get_sub(owner, sub_name); return set_cr_error(retcode); } if (owner.empty()) { mgr->subs[sub_name] = *ref; } + mgr->remove_get_sub(owner, sub_name); return set_cr_done(); } @@ -716,14 +706,27 @@ class PSManager } }; + string sub_id(const rgw_user& owner, const string& sub_name) { + string owner_prefix; + if (!owner.empty()) { + owner_prefix = owner.to_str() + "/"; + } + + return owner_prefix + sub_name; + } + map get_subs; - void remove_get_sub(const string& name) { - get_subs.erase(name); + GetSubCR *& get_get_subs(const rgw_user& owner, const string& name) { + return get_subs[sub_id(owner, name)]; } - bool find_sub_instance(const string& sub_name, PSSubscriptionRef *sub) { - auto iter = subs.find(sub_name); + void remove_get_sub(const rgw_user& owner, const string& name) { + get_subs.erase(sub_id(owner, name)); + } + + bool find_sub_instance(const rgw_user& owner, const string& sub_name, PSSubscriptionRef *sub) { + auto iter = subs.find(sub_id(owner, sub_name)); if (iter != subs.end()) { *sub = iter->second; return true; @@ -736,28 +739,20 @@ class PSManager env(_env) {} public: - static PSManagerRef& get_shared(RGWDataSyncEnv *_sync_env, + static PSManagerRef get_shared(RGWDataSyncEnv *_sync_env, PSEnvRef _env) { - auto mgr = new PSManager(_sync_env, _env); - mgr->self = std::shared_ptr(mgr); - return mgr->self; + return std::shared_ptr(new PSManager(_sync_env, _env)); } - int call_get_subscription_cr(RGWCoroutine *caller, const rgw_user& owner, const string& sub_name, PSSubscriptionRef *ref) { - string owner_prefix; - if (!owner.empty()) { - owner_prefix = owner.to_str() + "/"; - } - - string sub_instance = owner_prefix + sub_name; - if (find_sub_instance(sub_instance, ref)) { + static int call_get_subscription_cr(RGWDataSyncEnv *sync_env, PSManagerRef& mgr, RGWCoroutine *caller, const rgw_user& owner, const string& sub_name, PSSubscriptionRef *ref) { + if (mgr->find_sub_instance(owner, sub_name, ref)) { /* found it! nothing to execute */ ldout(sync_env->cct, 20) << __func__ << "(): found sub instance" << dendl; } - auto& gs = get_subs[sub_instance]; + auto& gs = mgr->get_get_subs(owner, sub_name); if (!gs) { ldout(sync_env->cct, 20) << __func__ << "(): first get subs" << dendl; - gs = new GetSubCR(sync_env, self, owner, sub_name, ref); + gs = new GetSubCR(sync_env, mgr, owner, sub_name, ref); } ldout(sync_env->cct, 20) << __func__ << "(): executing get subs" << dendl; return gs->execute(caller, ref); @@ -919,13 +914,13 @@ public: /* * once for the global subscriptions, once for the user specific subscriptions */ - yield env->manager->call_get_subscription_cr(this, *oiter, *siter, &sub); + yield PSManager::call_get_subscription_cr(sync_env, env->manager, this, *oiter, *siter, &sub); if (retcode < 0) { ldout(sync_env->cct, 10) << "ERROR: failed to find subscription config for subscription=" << *siter << " ret=" << retcode << dendl; continue; } - yield call(sub->store_event_cr(event)); + yield call(PSSubscription::store_event_cr(sync_env, sub, event)); if (retcode < 0) { ldout(sync_env->cct, 10) << "ERROR: failed to store event for subscription=" << *siter << " ret=" << retcode << dendl; continue; @@ -1122,8 +1117,8 @@ public: env->init_instance(sync_env->store->get_realm(), instance_id, mgr); } - RGWCoroutine *init_sync(RGWDataSyncEnv *sync_env) override { - ldout(sync_env->cct, 5) << conf->id << ": init" << dendl; + RGWCoroutine *start_sync(RGWDataSyncEnv *sync_env) override { + ldout(sync_env->cct, 5) << conf->id << ": start" << dendl; return new RGWPSInitEnvCBCR(sync_env, env); } RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {