From d5371502cd2da777a9c8f73a026f009f0d5209b4 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Sun, 1 Jul 2018 20:20:01 -0700 Subject: [PATCH] rgw: fetch topics via a cr, other changes this will allow fetching topics info from rados, so that we don't need to hold it in memory. Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_sync_module_pubsub.cc | 164 ++++++++++++++++++++++-------- 1 file changed, 120 insertions(+), 44 deletions(-) diff --git a/src/rgw/rgw_sync_module_pubsub.cc b/src/rgw/rgw_sync_module_pubsub.cc index fbd3eb38906..1768fb23c33 100644 --- a/src/rgw/rgw_sync_module_pubsub.cc +++ b/src/rgw/rgw_sync_module_pubsub.cc @@ -102,7 +102,6 @@ struct PSSubConfig { /* subscription config */ push_endpoint = config["push_endpoint"]; string default_bucket_name = data_bucket_prefix + name; data_bucket_name = config["data_bucket"](default_bucket_name.c_str()); -#warning use data_oid_prefix for oid generation data_oid_prefix = config["data_oid_prefix"]; } }; @@ -155,6 +154,8 @@ static string json_str(const char *name, const T& obj, bool pretty = false) return ss.str(); } +using TopicsRef = std::shared_ptr>; + struct PSConfig { string id{"pubsub"}; @@ -236,10 +237,10 @@ struct PSConfig { sync_instance = instance_id; } - void get_topics(CephContext *cct, const rgw_bucket& bucket, const rgw_obj_key& key, vector *result) { + void get_topics(CephContext *cct, const rgw_bucket& bucket, const rgw_obj_key& key, TopicsRef *result) { string path = bucket.name + "/" + key.name; - result->clear(); + (*result)->clear(); auto iter = notifications.upper_bound(path); if (iter == notifications.begin()) { @@ -268,7 +269,7 @@ struct PSConfig { } ldout(cct, 10) << ": found topic for path=" << bucket << "/" << key << ": id=" << target.id << " target_path=" << target.path << ", topic=" << target.topic << dendl; - result->push_back(&topic->second); + (*result)->push_back(&topic->second); } while (iter != notifications.begin()); } @@ -718,7 +719,7 @@ class RGWPSInitEnvCBCR : public 
RGWCoroutine { rgw_get_user_info_params get_user_info; public: RGWPSInitEnvCBCR(RGWDataSyncEnv *_sync_env, - PSEnvRef _env) : RGWCoroutine(_sync_env->cct), + PSEnvRef& _env) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), env(_env), conf(env->conf) {} int operate() override { @@ -752,33 +753,63 @@ public: } }; +class RGWPSFindBucketTopicsCR : public RGWCoroutine { + RGWDataSyncEnv *sync_env; + PSEnvRef env; + rgw_bucket bucket; + rgw_obj_key key; + + TopicsRef *topics; +public: + RGWPSFindBucketTopicsCR(RGWDataSyncEnv *_sync_env, + PSEnvRef& _env, + const rgw_bucket& _bucket, + const rgw_obj_key& _key, + TopicsRef *_topics) : RGWCoroutine(_sync_env->cct), + sync_env(_sync_env), + env(_env), + bucket(_bucket), + key(_key), + topics(_topics) { + *topics = std::make_shared >(); + } + int operate() override { + reenter(this) { +#warning this will need to change + env->conf->get_topics(sync_env->cct, bucket, key, topics); + return set_cr_done(); + } + return 0; + } +}; + class RGWPSHandleObjEvent : public RGWCoroutine { RGWDataSyncEnv *sync_env; PSEnvRef env; EventRef event; - vector topics; vector::iterator titer; set::iterator siter; PSSubscriptionRef sub; + TopicsRef topics; public: RGWPSHandleObjEvent(RGWDataSyncEnv *_sync_env, PSEnvRef _env, - EventRef& _event) : RGWCoroutine(_sync_env->cct), + EventRef& _event, + TopicsRef& _topics) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), env(_env), - event(_event) { -#warning this will need to change - env->conf->get_topics(sync_env->cct, event->bucket, event->key, &topics); + event(_event), + topics(_topics) { } int operate() override { reenter(this) { ldout(sync_env->cct, 10) << ": handle event: obj: z=" << sync_env->source_zone << " event=" << json_str("event", *event, false) << dendl; - ldout(sync_env->cct, 20) << "pubsub: " << topics.size() << " topics found for path" << dendl; + ldout(sync_env->cct, 20) << "pubsub: " << topics->size() << " topics found for path" << dendl; - for (titer = topics.begin(); 
titer != topics.end(); ++titer) { + for (titer = topics->begin(); titer != topics->end(); ++titer) { ldout(sync_env->cct, 10) << ": notification for " << event->bucket << "/" << event->key << ": topic=" << (*titer)->name << ", has " << (*titer)->subs.size() << " subscriptions" << dendl; for (siter = (*titer)->subs.begin(); siter != (*titer)->subs.end(); ++siter) { @@ -814,13 +845,16 @@ class RGWPSHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR { PSEnvRef env; uint64_t versioned_epoch; EventRef event; + TopicsRef topics; public: RGWPSHandleRemoteObjCBCR(RGWDataSyncEnv *_sync_env, RGWBucketInfo& _bucket_info, rgw_obj_key& _key, - PSEnvRef _env, uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key), + PSEnvRef _env, uint64_t _versioned_epoch, + TopicsRef& _topics) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key), sync_env(_sync_env), env(_env), - versioned_epoch(_versioned_epoch) { + versioned_epoch(_versioned_epoch), + topics(_topics) { } int operate() override { reenter(this) { @@ -833,8 +867,19 @@ public: event->mtime = mtime; event->event = OBJECT_CREATE; event->timestamp = real_clock::now(); + { + for (auto& attr : attrs) { + string k = attr.first; + if (boost::algorithm::starts_with(k, RGW_ATTR_PREFIX)) { + k = k.substr(sizeof(RGW_ATTR_PREFIX) - 1); + } + string v = attr.second.to_str(); + auto p = std::make_pair(k, v); + event->attrs.push_back(p); + } + } - yield call(new RGWPSHandleObjEvent(sync_env, env, event)); + yield call(new RGWPSHandleObjEvent(sync_env, env, event, topics)); if (retcode < 0) { return set_cr_error(retcode); } @@ -847,22 +892,62 @@ public: class RGWPSHandleRemoteObjCR : public RGWCallStatRemoteObjCR { PSEnvRef env; uint64_t versioned_epoch; + TopicsRef topics; public: RGWPSHandleRemoteObjCR(RGWDataSyncEnv *_sync_env, RGWBucketInfo& _bucket_info, rgw_obj_key& _key, - PSEnvRef _env, uint64_t _versioned_epoch) : RGWCallStatRemoteObjCR(_sync_env, _bucket_info, _key), - env(_env), 
versioned_epoch(_versioned_epoch) { + PSEnvRef _env, uint64_t _versioned_epoch, + TopicsRef& _topics) : RGWCallStatRemoteObjCR(_sync_env, _bucket_info, _key), + env(_env), versioned_epoch(_versioned_epoch), + topics(_topics) { } ~RGWPSHandleRemoteObjCR() override {} RGWStatRemoteObjCBCR *allocate_callback() override { -#warning things need to change - /* FIXME: we need to create a pre_callback coroutine that decides whether object should - * actually be handled. Otherwise we fetch info from remote zone about every object, even - * if we don't intend to handle it. - */ - return new RGWPSHandleRemoteObjCBCR(sync_env, bucket_info, key, env, versioned_epoch); + return new RGWPSHandleRemoteObjCBCR(sync_env, bucket_info, key, env, versioned_epoch, topics); + } +}; + +class RGWPSHandleObjCreateCR : public RGWCoroutine { + + RGWDataSyncEnv *sync_env; + RGWBucketInfo bucket_info; + rgw_obj_key key; + PSEnvRef env; + uint64_t versioned_epoch; + TopicsRef topics; +public: + RGWPSHandleObjCreateCR(RGWDataSyncEnv *_sync_env, + RGWBucketInfo& _bucket_info, rgw_obj_key& _key, + PSEnvRef _env, uint64_t _versioned_epoch) : RGWCoroutine(_sync_env->cct), + sync_env(_sync_env), + bucket_info(_bucket_info), + key(_key), + env(_env), + versioned_epoch(_versioned_epoch) { + } + + ~RGWPSHandleObjCreateCR() override {} + + int operate() override { + reenter(this) { + yield call(new RGWPSFindBucketTopicsCR(sync_env, env, bucket_info.bucket, key, &topics)); + if (retcode < 0) { + ldout(sync_env->cct, 0) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl; + return set_cr_error(retcode); + } + if (topics->empty()) { + ldout(sync_env->cct, 20) << "no topics found for " << bucket_info.bucket << "/" << key << dendl; + return set_cr_done(); + } + yield call(new RGWPSHandleRemoteObjCR(sync_env, bucket_info, key, env, versioned_epoch, topics)); + if (retcode < 0) { + return set_cr_error(retcode); + } + return set_cr_done(); + } + return 0; } }; @@ -874,6 +959,7 @@ class 
RGWPSGenericObjEventCBCR : public RGWCoroutine { ceph::real_time mtime; RGWPubSubEventType event_type; EventRef event; + TopicsRef topics; public: RGWPSGenericObjEventCBCR(RGWDataSyncEnv *_sync_env, PSEnvRef _env, @@ -887,6 +973,15 @@ public: reenter(this) { ldout(sync_env->cct, 10) << ": remove remote obj: z=" << sync_env->source_zone << " b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << dendl; + yield call(new RGWPSFindBucketTopicsCR(sync_env, env, bucket_info.bucket, key, &topics)); + if (retcode < 0) { + ldout(sync_env->cct, 0) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl; + return set_cr_error(retcode); + } + if (topics->empty()) { + ldout(sync_env->cct, 20) << "no topics found for " << bucket_info.bucket << "/" << key << dendl; + return set_cr_done(); + } make_event_ref(&event); event->event = event_type; event->bucket = bucket_info.bucket; @@ -894,7 +989,7 @@ public: event->mtime = mtime; event->timestamp = real_clock::now(); - yield call(new RGWPSHandleObjEvent(sync_env, env, event)); + yield call(new RGWPSHandleObjEvent(sync_env, env, event, topics)); if (retcode < 0) { return set_cr_error(retcode); } @@ -925,35 +1020,16 @@ public: } RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { ldout(sync_env->cct, 10) << conf->id << ": sync_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch << dendl; -#warning this should be done correctly -#if 0 - if (!conf->should_handle_operation(bucket_info)) { - ldout(sync_env->cct, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl; - return nullptr; - } -#endif - return new RGWPSHandleRemoteObjCR(sync_env, bucket_info, key, env, versioned_epoch); + return new RGWPSHandleObjCreateCR(sync_env, bucket_info, key, env, versioned_epoch); } RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& 
bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { ldout(sync_env->cct, 10) << conf->id << ": rm_object: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; -#warning this should be done correctly -#if 0 - if (!conf->should_handle_operation(bucket_info)) { - ldout(sync_env->cct, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl; - return nullptr; - } -#endif return new RGWPSGenericObjEventCBCR(sync_env, env, bucket_info, key, mtime, OBJECT_DELETE); } RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { ldout(sync_env->cct, 10) << conf->id << ": create_delete_marker: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; -#warning requests should be filtered correctly -#if 0 - ldout(sync_env->cct, 10) << conf->id << ": skipping operation (not handled)" << dendl; -#endif -#warning delete markers need to be handled too return new RGWPSGenericObjEventCBCR(sync_env, env, bucket_info, key, mtime, DELETE_MARKER_CREATE); } }; -- 2.39.5