From: Yehuda Sadeh Date: Tue, 3 Jul 2018 18:39:39 +0000 (-0700) Subject: rgw: pubsub: fetch user subs X-Git-Tag: v14.1.0~616^2~51 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=17d4e96a6fb4cc0fb5e5f34e7f1b8927c7476c8a;p=ceph.git rgw: pubsub: fetch user subs Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/rgw_sync_module_pubsub.cc b/src/rgw/rgw_sync_module_pubsub.cc index 6a19d5f7e1d7..8ed726159663 100644 --- a/src/rgw/rgw_sync_module_pubsub.cc +++ b/src/rgw/rgw_sync_module_pubsub.cc @@ -88,6 +88,14 @@ struct PSSubConfig { /* subscription config */ string data_bucket_name; string data_oid_prefix; + void from_user_conf(const rgw_pubsub_user_sub_config& uc) { + name = uc.name; + topic = uc.topic; + push_endpoint = uc.dest.push_endpoint; + data_bucket_name = uc.dest.bucket_name; + data_oid_prefix = uc.dest.oid_prefix; + } + void dump(Formatter *f) const { encode_json("name", name, f); encode_json("topic", topic, f); @@ -569,19 +577,26 @@ class PSSubscription { }; public: - PSSubscription(RGWDataSyncEnv *_sync_env, PSEnvRef _env, PSSubConfigRef& _sub_conf) : sync_env(_sync_env), env(_env), sub_conf(_sub_conf), data_access(std::make_shared(sync_env->store)) {} + + PSSubscription(RGWDataSyncEnv *_sync_env, + PSEnvRef _env, + rgw_pubsub_user_sub_config& user_sub_conf) : sync_env(_sync_env), + env(_env), + data_access(std::make_shared(sync_env->store)) { + sub_conf->from_user_conf(user_sub_conf); + } virtual ~PSSubscription() { if (init_cr) { init_cr->put(); } } -public: + static PSSubscriptionRef& get_shared(RGWDataSyncEnv *_sync_env, PSEnvRef _env, PSSubConfigRef& _sub_conf) { @@ -591,6 +606,15 @@ public: sub->init_cr->get(); return sub->self; } + static PSSubscriptionRef& get_shared(RGWDataSyncEnv *_sync_env, + PSEnvRef _env, + rgw_pubsub_user_sub_config& _sub_conf) { + auto sub = new PSSubscription(_sync_env, _env, _sub_conf); + sub->self = std::shared_ptr(sub); + sub->init_cr = new InitCR(_sync_env, sub->self); + sub->init_cr->get(); + return sub->self; + } int call_init_cr(RGWCoroutine *caller) { return init_cr->execute(caller); @@ -616,19 +640,24 @@ class PSManager class GetSubCR : public RGWSingletonCR { RGWDataSyncEnv *sync_env; PSManagerRef mgr; - const string& sub_name; + rgw_user owner; + string sub_name; + string sub_id; PSSubscriptionRef *ref; - PSConfigRef& conf; + PSConfigRef conf; PSSubConfigRef sub_conf; + rgw_pubsub_user_sub_config user_sub_conf; public: GetSubCR(RGWDataSyncEnv *_sync_env, PSManagerRef& _mgr, + const rgw_user& _owner, const string& _sub_name, PSSubscriptionRef *_ref) : RGWSingletonCR(_sync_env->cct), sync_env(_sync_env), mgr(_mgr), + owner(_owner), sub_name(_sub_name), ref(_ref), conf(mgr->env->conf) { @@ -639,12 +668,30 @@ class PSManager int operate() override { reenter(this) { - if (!conf->find_sub(sub_name, &sub_conf)) { - ldout(sync_env->cct, 0) << "ERROR: could not find subscription config: name=" << sub_name << dendl; - return set_cr_error(-ENOENT); - } + if (owner.empty()) { + if (!conf->find_sub(sub_name, &sub_conf)) { + ldout(sync_env->cct, 0) << "ERROR: could not find subscription config: name=" << sub_name << dendl; + return set_cr_error(-ENOENT); + } - *ref = PSSubscription::get_shared(sync_env, mgr->env, sub_conf); + *ref = PSSubscription::get_shared(sync_env, mgr->env, sub_conf); + } else { + using ReadInfoCR = RGWSimpleRadosReadCR; + yield { + RGWUserPubSub ups(sync_env->store, owner); + rgw_raw_obj obj; + ups.get_sub_meta_obj(sub_name, &obj); + bool empty_on_enoent = false; + call(new ReadInfoCR(sync_env->async_rados, sync_env->store, + obj, + &user_sub_conf, empty_on_enoent)); + } + if (retcode < 0) { + return set_cr_error(retcode); + } + + *ref = PSSubscription::get_shared(sync_env, mgr->env, user_sub_conf); + } yield (*ref)->call_init_cr(this); if (retcode < 0) { @@ -652,7 +699,9 @@ class PSManager return set_cr_error(retcode); } - mgr->subs[sub_name] = *ref; + if (owner.empty()) { + mgr->subs[sub_name] = *ref; + } return set_cr_done(); } @@ -694,16 +743,21 @@ public: return mgr->self; } - int call_get_subscription_cr(RGWCoroutine *caller, const string& sub_name, PSSubscriptionRef *ref) { - if (find_sub_instance(sub_name, ref)) { + int call_get_subscription_cr(RGWCoroutine *caller, const rgw_user& owner, const string& sub_name, PSSubscriptionRef *ref) { + string owner_prefix; + if (!owner.empty()) { + owner_prefix = owner.to_str() + "/"; + } + + string sub_instance = owner_prefix + sub_name; + if (find_sub_instance(sub_instance, ref)) { /* found it! nothing to execute */ ldout(sync_env->cct, 20) << __func__ << "(): found sub instance" << dendl; - return 0; } - auto& gs = get_subs[sub_name]; + auto& gs = get_subs[sub_instance]; if (!gs) { ldout(sync_env->cct, 20) << __func__ << "(): first get subs" << dendl; - gs = new GetSubCR(sync_env, self, sub_name, ref); + gs = new GetSubCR(sync_env, self, owner, sub_name, ref); } ldout(sync_env->cct, 20) << __func__ << "(): executing get subs" << dendl; return gs->execute(caller, ref); @@ -775,13 +829,14 @@ class RGWPSFindBucketTopicsCR : public RGWCoroutine { public: RGWPSFindBucketTopicsCR(RGWDataSyncEnv *_sync_env, PSEnvRef& _env, - const RGWBucketInfo& _bucket_info, + const rgw_user& _owner, + const rgw_bucket& _bucket, const rgw_obj_key& _key, TopicsRef *_topics) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), env(_env), - owner(_bucket_info.owner), - bucket(_bucket_info.bucket), + owner(_owner), + bucket(_bucket), key(_key), ups(_sync_env->store, owner), topics(_topics) { @@ -823,6 +878,10 @@ public: class RGWPSHandleObjEvent : public RGWCoroutine { RGWDataSyncEnv *sync_env; PSEnvRef env; + std::array owners; + rgw_user& owner; + rgw_user& no_owner; + std::array::iterator oiter; EventRef event; vector::iterator titer; @@ -832,12 +891,16 @@ class RGWPSHandleObjEvent : public RGWCoroutine { public: RGWPSHandleObjEvent(RGWDataSyncEnv *_sync_env, PSEnvRef _env, + const rgw_user& _owner, EventRef& _event, TopicsRef& _topics) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), env(_env), + owner(owners[0]), + no_owner(owners[1]), event(_event), topics(_topics) { + owner = _owner; } int operate() override { reenter(this) { @@ -852,16 +915,21 @@ public: for (siter = (*titer)->subs.begin(); siter != (*titer)->subs.end(); ++siter) { ldout(sync_env->cct, 10) << ": subscription: " << *siter << dendl; - yield env->manager->call_get_subscription_cr(this, *siter, &sub); - if (retcode < 0) { - ldout(sync_env->cct, 10) << "ERROR: failed to find subscription config for subscription=" << *siter << " ret=" << retcode << dendl; - continue; - } + for (oiter = owners.begin(); oiter != owners.end(); ++oiter) { + /* + * once for the global subscriptions, once for the user specific subscriptions + */ + yield env->manager->call_get_subscription_cr(this, *oiter, *siter, &sub); + if (retcode < 0) { + ldout(sync_env->cct, 10) << "ERROR: failed to find subscription config for subscription=" << *siter << " ret=" << retcode << dendl; + continue; + } - yield call(sub->store_event_cr(event)); - if (retcode < 0) { - ldout(sync_env->cct, 10) << "ERROR: failed to store event for subscription=" << *siter << " ret=" << retcode << dendl; - continue; + yield call(sub->store_event_cr(event)); + if (retcode < 0) { + ldout(sync_env->cct, 10) << "ERROR: failed to store event for subscription=" << *siter << " ret=" << retcode << dendl; + continue; + } } #warning push notification @@ -916,7 +984,7 @@ public: } } - yield call(new RGWPSHandleObjEvent(sync_env, env, event, topics)); + yield call(new RGWPSHandleObjEvent(sync_env, env, bucket_info.owner, event, topics)); if (retcode < 0) { return set_cr_error(retcode); } @@ -969,7 +1037,7 @@ public: int operate() override { reenter(this) { - yield call(new RGWPSFindBucketTopicsCR(sync_env, env, bucket_info, key, &topics)); + yield call(new RGWPSFindBucketTopicsCR(sync_env, env, bucket_info.owner, bucket_info.bucket, key, &topics)); if (retcode < 0) { ldout(sync_env->cct, 0) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl; return set_cr_error(retcode); @@ -991,7 +1059,8 @@ public: class RGWPSGenericObjEventCBCR : public RGWCoroutine { RGWDataSyncEnv *sync_env; PSEnvRef env; - RGWBucketInfo bucket_info; + rgw_user owner; + rgw_bucket bucket; rgw_obj_key key; ceph::real_time mtime; RGWPubSubEventType event_type; @@ -1004,29 +1073,31 @@ public: RGWPubSubEventType _event_type) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), env(_env), - bucket_info(_bucket_info), key(_key), + owner(_bucket_info.owner), + bucket(_bucket_info.bucket), + key(_key), mtime(_mtime), event_type(_event_type) {} int operate() override { reenter(this) { ldout(sync_env->cct, 10) << ": remove remote obj: z=" << sync_env->source_zone - << " b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << dendl; - yield call(new RGWPSFindBucketTopicsCR(sync_env, env, bucket_info, key, &topics)); + << " b=" << bucket << " k=" << key << " mtime=" << mtime << dendl; + yield call(new RGWPSFindBucketTopicsCR(sync_env, env, owner, bucket, key, &topics)); if (retcode < 0) { ldout(sync_env->cct, 0) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl; return set_cr_error(retcode); } if (topics->empty()) { - ldout(sync_env->cct, 20) << "no topics found for " << bucket_info.bucket << "/" << key << dendl; + ldout(sync_env->cct, 20) << "no topics found for " << bucket << "/" << key << dendl; return set_cr_done(); } make_event_ref(&event); event->event = event_type; - event->bucket = bucket_info.bucket; + event->bucket = bucket; event->key = key; event->mtime = mtime; event->timestamp = real_clock::now(); - yield call(new RGWPSHandleObjEvent(sync_env, env, event, topics)); + yield call(new RGWPSHandleObjEvent(sync_env, env, owner, event, topics)); if (retcode < 0) { return set_cr_error(retcode); }