From: Yehuda Sadeh Date: Sat, 30 Jun 2018 04:10:10 +0000 (-0700) Subject: rgw: distribution of events to subscriptions X-Git-Tag: v14.1.0~616^2~60 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=6cff97f871bd922fb6d795a6c1d7224f9cd89d0c;p=ceph.git rgw: distribution of events to subscriptions more work required Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/rgw_sync_module_pubsub.cc b/src/rgw/rgw_sync_module_pubsub.cc index a6e9aaf10381..3f1cf0109880 100644 --- a/src/rgw/rgw_sync_module_pubsub.cc +++ b/src/rgw/rgw_sync_module_pubsub.cc @@ -165,10 +165,10 @@ struct PSConfig { sync_instance = instance_id; } - void get_notifs(const RGWBucketInfo& bucket_info, const rgw_obj_key& key, vector *notifs) { + void get_topics(CephContext *cct, const RGWBucketInfo& bucket_info, const rgw_obj_key& key, vector *result) { string path = bucket_info.bucket.name + "/" + key.name; - notifs->clear(); + result->clear(); auto iter = notifications.upper_bound(path); if (iter == notifications.begin()) { @@ -184,14 +184,20 @@ struct PSConfig { break; } - PSNotificationConfig *target = &iter->second; + PSNotificationConfig& target = iter->second; - if (!target->is_prefix && + if (!target.is_prefix && path.size() != iter->first.size()) { continue; } - notifs->push_back(target); + auto topic = topics.find(target.topic); + if (topic == topics.end()) { + continue; + } + + ldout(cct, 10) << ": found topic for path=" << bucket_info.bucket << "/" << key << ": id=" << target.id << " target_path=" << target.path << ", topic=" << target.topic << dendl; + result->push_back(&topic->second); } while (iter != notifications.begin()); } @@ -208,6 +214,10 @@ struct PSConfig { using PSConfigRef = std::shared_ptr; using EventRef = std::shared_ptr; +static void make_event_ref(EventRef *event) { + *event = std::make_shared(); +} + class PSManager; using PSManagerRef = std::shared_ptr; @@ -530,39 +540,57 @@ public: }; class RGWPSHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR { + RGWDataSyncEnv *sync_env; PSEnvRef env; uint64_t versioned_epoch; - vector notifs; - vector::iterator niter; + vector topics; + vector::iterator titer; + set::iterator siter; + PSSubscriptionRef sub; + EventRef event; public: RGWPSHandleRemoteObjCBCR(RGWDataSyncEnv *_sync_env, RGWBucketInfo& _bucket_info, rgw_obj_key& _key, - PSEnvRef _env, uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key), env(_env), - versioned_epoch(_versioned_epoch) { + PSEnvRef _env, uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key), + sync_env(_sync_env), + env(_env), + versioned_epoch(_versioned_epoch) { #warning this will need to change obviously - env->conf->get_notifs(_bucket_info, _key, ¬ifs); + env->conf->get_topics(sync_env->cct, _bucket_info, _key, &topics); } int operate() override { reenter(this) { ldout(sync_env->cct, 10) << ": stat of remote obj: z=" << sync_env->source_zone << " b=" << bucket_info.bucket << " k=" << key << " size=" << size << " mtime=" << mtime << " attrs=" << attrs << dendl; + make_event_ref(&event); + event->bucket = bucket_info.bucket; + event->key = key; + event->event = OBJECT_CREATE; + event->timestamp = real_clock::now(); + ldout(sync_env->cct, 20) << "pubsub: " << topics.size() << " topics found for path" << dendl; +#warning more event init - for (niter = notifs.begin(); niter != notifs.end(); ++niter) { - yield { - ldout(sync_env->cct, 10) << ": notification for " << bucket_info.bucket << "/" << key << ": id=" << (*niter)->id << " path=" << (*niter)->path << ", topic=" << (*niter)->topic << dendl; + for (titer = topics.begin(); titer != topics.end(); ++titer) { + ldout(sync_env->cct, 10) << ": notification for " << bucket_info.bucket << "/" << key << ": topic=" << (*titer)->name << ", has " << (*titer)->subs.size() << " subscriptions" << dendl; -#warning publish notification -#if 0 - string path = conf->get_obj_path(bucket_info, key); - es_obj_metadata doc(sync_env->cct, conf, bucket_info, key, mtime, size, attrs, versioned_epoch); + for (siter = (*titer)->subs.begin(); siter != (*titer)->subs.end(); ++siter) { + ldout(sync_env->cct, 10) << ": subscription: " << *siter << dendl; - call(new RGWPutRESTResourceCR(sync_env->cct, conf->conn.get(), - sync_env->http_manager, - path, nullptr /* params */, - doc, nullptr /* result */)); -#endif + yield call(env->manager->get_subscription_cr(*siter, &sub)); + if (retcode < 0) { + ldout(sync_env->cct, 10) << "ERROR: failed to find subscription config for subscription=" << *siter << " ret=" << retcode << dendl; + continue; + } + + yield call(sub->store_event_cr(event)); + if (retcode < 0) { + ldout(sync_env->cct, 10) << "ERROR: failed to store event for subscription=" << *siter << " ret=" << retcode << dendl; + continue; + } + +#warning publish notification } if (retcode < 0) { return set_cr_error(retcode);