]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw: distribution of events to subscriptions
authorYehuda Sadeh <yehuda@redhat.com>
Sat, 30 Jun 2018 04:10:10 +0000 (21:10 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 11 Dec 2018 08:10:42 +0000 (00:10 -0800)
more work required

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_sync_module_pubsub.cc

index a6e9aaf103811843c62fb5d73be682ea915ade36..3f1cf0109880f595e0e8f72606b361aefa5f880d 100644 (file)
@@ -165,10 +165,10 @@ struct PSConfig {
     sync_instance = instance_id;
   }
 
-  void get_notifs(const RGWBucketInfo& bucket_info, const rgw_obj_key& key, vector<PSNotificationConfig *> *notifs) {
+  void get_topics(CephContext *cct, const RGWBucketInfo& bucket_info, const rgw_obj_key& key, vector<PSTopicConfig *> *result) {
     string path = bucket_info.bucket.name + "/" + key.name;
 
-    notifs->clear();
+    result->clear();
 
     auto iter = notifications.upper_bound(path);
     if (iter == notifications.begin()) {
@@ -184,14 +184,20 @@ struct PSConfig {
         break;
       }
 
-      PSNotificationConfig *target = &iter->second;
+      PSNotificationConfig& target = iter->second;
 
-      if (!target->is_prefix &&
+      if (!target.is_prefix &&
           path.size() != iter->first.size()) {
         continue;
       }
 
-      notifs->push_back(target);
+      auto topic = topics.find(target.topic);
+      if (topic == topics.end()) {
+        continue;
+      }
+
+      ldout(cct, 10) << ": found topic for path=" << bucket_info.bucket << "/" << key << ": id=" << target.id << " target_path=" << target.path << ", topic=" << target.topic << dendl;
+      result->push_back(&topic->second);
     } while (iter != notifications.begin());
   }
 
@@ -208,6 +214,10 @@ struct PSConfig {
 using PSConfigRef = std::shared_ptr<PSConfig>;
 using EventRef = std::shared_ptr<rgw_pubsub_event>;
 
+static void make_event_ref(EventRef *event) {
+  *event = std::make_shared<rgw_pubsub_event>();
+}
+
 class PSManager;
 using PSManagerRef = std::shared_ptr<PSManager>;
 
@@ -530,39 +540,57 @@ public:
 };
 
 class RGWPSHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR {
+  RGWDataSyncEnv *sync_env;
   PSEnvRef env;
   uint64_t versioned_epoch;
-  vector<PSNotificationConfig *> notifs;
-  vector<PSNotificationConfig *>::iterator niter;
+  vector<PSTopicConfig *> topics;
+  vector<PSTopicConfig *>::iterator titer;
+  set<string>::iterator siter;
+  PSSubscriptionRef sub;
+  EventRef event;
 public:
   RGWPSHandleRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
                           RGWBucketInfo& _bucket_info, rgw_obj_key& _key,
-                          PSEnvRef _env, uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key), env(_env),
-                                                                               versioned_epoch(_versioned_epoch) {
+                          PSEnvRef _env, uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key),
+                                                                      sync_env(_sync_env),
+                                                                      env(_env),
+                                                                      versioned_epoch(_versioned_epoch) {
 #warning this will need to change obviously
-    env->conf->get_notifs(_bucket_info, _key, &notifs);
+    env->conf->get_topics(sync_env->cct, _bucket_info, _key, &topics);
   }
   int operate() override {
     reenter(this) {
       ldout(sync_env->cct, 10) << ": stat of remote obj: z=" << sync_env->source_zone
                                << " b=" << bucket_info.bucket << " k=" << key << " size=" << size << " mtime=" << mtime
                                << " attrs=" << attrs << dendl;
+      make_event_ref(&event);
+      event->bucket = bucket_info.bucket;
+      event->key = key;
+      event->event = OBJECT_CREATE;
+      event->timestamp = real_clock::now();
 
+      ldout(sync_env->cct, 20) << "pubsub: " << topics.size() << " topics found for path" << dendl;
+#warning more event init
 
-      for (niter = notifs.begin(); niter != notifs.end(); ++niter) {
-        yield {
-          ldout(sync_env->cct, 10) << ": notification for " << bucket_info.bucket << "/" << key << ": id=" << (*niter)->id << " path=" << (*niter)->path << ", topic=" << (*niter)->topic << dendl;
+      for (titer = topics.begin(); titer != topics.end(); ++titer) {
+        ldout(sync_env->cct, 10) << ": notification for " << bucket_info.bucket << "/" << key << ": topic=" << (*titer)->name << ", has " << (*titer)->subs.size() << " subscriptions" << dendl;
 
-#warning publish notification
-#if 0
-        string path = conf->get_obj_path(bucket_info, key);
-        es_obj_metadata doc(sync_env->cct, conf, bucket_info, key, mtime, size, attrs, versioned_epoch);
+        for (siter = (*titer)->subs.begin(); siter != (*titer)->subs.end(); ++siter) {
+          ldout(sync_env->cct, 10) << ": subscription: " << *siter << dendl;
 
-        call(new RGWPutRESTResourceCR<es_obj_metadata, int>(sync_env->cct, conf->conn.get(),
-                                                            sync_env->http_manager,
-                                                            path, nullptr /* params */,
-                                                            doc, nullptr /* result */));
-#endif
+          yield call(env->manager->get_subscription_cr(*siter, &sub));
+          if (retcode < 0) {
+            ldout(sync_env->cct, 10) << "ERROR: failed to find subscription config for subscription=" << *siter << " ret=" << retcode << dendl;
+            continue;
+          }
+
+          yield call(sub->store_event_cr(event));
+          if (retcode < 0) {
+            ldout(sync_env->cct, 10) << "ERROR: failed to store event for subscription=" << *siter << " ret=" << retcode << dendl;
+            continue;
+          }
+
+#warning publish notification
         }
         if (retcode < 0) {
           return set_cr_error(retcode);