]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw: fetch topics via a cr, other changes
author Yehuda Sadeh <yehuda@redhat.com>
Mon, 2 Jul 2018 03:20:01 +0000 (20:20 -0700)
committer Yehuda Sadeh <yehuda@redhat.com>
Tue, 11 Dec 2018 08:10:42 +0000 (00:10 -0800)
this will allow fetching topics info from rados, so that we
don't need to hold it in memory.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_sync_module_pubsub.cc

index fbd3eb389066241b28e6e252a5e0374c394fd888..1768fb23c33332cefb76d99a602e165972e3c52e 100644 (file)
@@ -102,7 +102,6 @@ struct PSSubConfig { /* subscription config */
     push_endpoint = config["push_endpoint"];
     string default_bucket_name = data_bucket_prefix + name;
     data_bucket_name = config["data_bucket"](default_bucket_name.c_str());
-#warning use data_oid_prefix for oid generation
     data_oid_prefix = config["data_oid_prefix"];
   }
 };
@@ -155,6 +154,8 @@ static string json_str(const char *name, const T& obj, bool pretty = false)
   return ss.str();
 }
 
+using TopicsRef = std::shared_ptr<vector<PSTopicConfig *>>;
+
 
 struct PSConfig {
   string id{"pubsub"};
@@ -236,10 +237,10 @@ struct PSConfig {
     sync_instance = instance_id;
   }
 
-  void get_topics(CephContext *cct, const rgw_bucket& bucket, const rgw_obj_key& key, vector<PSTopicConfig *> *result) {
+  void get_topics(CephContext *cct, const rgw_bucket& bucket, const rgw_obj_key& key, TopicsRef *result) {
     string path = bucket.name + "/" + key.name;
 
-    result->clear();
+    (*result)->clear();
 
     auto iter = notifications.upper_bound(path);
     if (iter == notifications.begin()) {
@@ -268,7 +269,7 @@ struct PSConfig {
       }
 
       ldout(cct, 10) << ": found topic for path=" << bucket << "/" << key << ": id=" << target.id << " target_path=" << target.path << ", topic=" << target.topic << dendl;
-      result->push_back(&topic->second);
+      (*result)->push_back(&topic->second);
     } while (iter != notifications.begin());
   }
 
@@ -718,7 +719,7 @@ class RGWPSInitEnvCBCR : public RGWCoroutine {
   rgw_get_user_info_params get_user_info;
 public:
   RGWPSInitEnvCBCR(RGWDataSyncEnv *_sync_env,
-                       PSEnvRef _env) : RGWCoroutine(_sync_env->cct),
+                       PSEnvRef& _env) : RGWCoroutine(_sync_env->cct),
                                                     sync_env(_sync_env),
                                                     env(_env), conf(env->conf) {}
   int operate() override {
@@ -752,33 +753,63 @@ public:
   }
 };
 
+class RGWPSFindBucketTopicsCR : public RGWCoroutine {
+  RGWDataSyncEnv *sync_env;
+  PSEnvRef env;
+  rgw_bucket bucket;
+  rgw_obj_key key;
+
+  TopicsRef *topics;
+public:
+  RGWPSFindBucketTopicsCR(RGWDataSyncEnv *_sync_env,
+                      PSEnvRef& _env,
+                      const rgw_bucket& _bucket,
+                      const rgw_obj_key& _key,
+                      TopicsRef *_topics) : RGWCoroutine(_sync_env->cct),
+                                                          sync_env(_sync_env),
+                                                          env(_env),
+                                                          bucket(_bucket),
+                                                          key(_key),
+                                                          topics(_topics) {
+    *topics = std::make_shared<vector<PSTopicConfig *> >();
+  }
+  int operate() override {
+    reenter(this) {
+#warning this will need to change
+      env->conf->get_topics(sync_env->cct, bucket, key, topics);
+      return set_cr_done();
+    }
+    return 0;
+  }
+};
+
 class RGWPSHandleObjEvent : public RGWCoroutine {
   RGWDataSyncEnv *sync_env;
   PSEnvRef env;
   EventRef event;
 
-  vector<PSTopicConfig *> topics;
   vector<PSTopicConfig *>::iterator titer;
   set<string>::iterator siter;
   PSSubscriptionRef sub;
+  TopicsRef topics;
 public:
   RGWPSHandleObjEvent(RGWDataSyncEnv *_sync_env,
                       PSEnvRef _env,
-                      EventRef& _event) : RGWCoroutine(_sync_env->cct),
+                      EventRef& _event,
+                      TopicsRef& _topics) : RGWCoroutine(_sync_env->cct),
                                           sync_env(_sync_env),
                                           env(_env),
-                                          event(_event) {
-#warning this will need to change
-    env->conf->get_topics(sync_env->cct, event->bucket, event->key, &topics);
+                                          event(_event),
+                                          topics(_topics) {
   }
   int operate() override {
     reenter(this) {
       ldout(sync_env->cct, 10) << ": handle event: obj: z=" << sync_env->source_zone
                                << " event=" << json_str("event", *event, false) << dendl;
 
-      ldout(sync_env->cct, 20) << "pubsub: " << topics.size() << " topics found for path" << dendl;
+      ldout(sync_env->cct, 20) << "pubsub: " << topics->size() << " topics found for path" << dendl;
 
-      for (titer = topics.begin(); titer != topics.end(); ++titer) {
+      for (titer = topics->begin(); titer != topics->end(); ++titer) {
         ldout(sync_env->cct, 10) << ": notification for " << event->bucket << "/" << event->key << ": topic=" << (*titer)->name << ", has " << (*titer)->subs.size() << " subscriptions" << dendl;
 
         for (siter = (*titer)->subs.begin(); siter != (*titer)->subs.end(); ++siter) {
@@ -814,13 +845,16 @@ class RGWPSHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR {
   PSEnvRef env;
   uint64_t versioned_epoch;
   EventRef event;
+  TopicsRef topics;
 public:
   RGWPSHandleRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
                           RGWBucketInfo& _bucket_info, rgw_obj_key& _key,
-                          PSEnvRef _env, uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key),
+                          PSEnvRef _env, uint64_t _versioned_epoch,
+                          TopicsRef& _topics) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key),
                                                                       sync_env(_sync_env),
                                                                       env(_env),
-                                                                      versioned_epoch(_versioned_epoch) {
+                                                                      versioned_epoch(_versioned_epoch),
+                                                                      topics(_topics) {
   }
   int operate() override {
     reenter(this) {
@@ -833,8 +867,19 @@ public:
       event->mtime = mtime;
       event->event = OBJECT_CREATE;
       event->timestamp = real_clock::now();
+      {
+        for (auto& attr : attrs) {
+          string k = attr.first;
+          if (boost::algorithm::starts_with(k, RGW_ATTR_PREFIX)) {
+            k = k.substr(sizeof(RGW_ATTR_PREFIX) - 1);
+          }
+          string v = attr.second.to_str();
+          auto p = std::make_pair(k, v);
+          event->attrs.push_back(p);
+        }
+      }
 
-      yield call(new RGWPSHandleObjEvent(sync_env, env, event));
+      yield call(new RGWPSHandleObjEvent(sync_env, env, event, topics));
       if (retcode < 0) {
         return set_cr_error(retcode);
       }
@@ -847,22 +892,62 @@ public:
 class RGWPSHandleRemoteObjCR : public RGWCallStatRemoteObjCR {
   PSEnvRef env;
   uint64_t versioned_epoch;
+  TopicsRef topics;
 public:
   RGWPSHandleRemoteObjCR(RGWDataSyncEnv *_sync_env,
                         RGWBucketInfo& _bucket_info, rgw_obj_key& _key,
-                        PSEnvRef _env, uint64_t _versioned_epoch) : RGWCallStatRemoteObjCR(_sync_env, _bucket_info, _key),
-                                                           env(_env), versioned_epoch(_versioned_epoch) {
+                        PSEnvRef _env, uint64_t _versioned_epoch,
+                        TopicsRef& _topics) : RGWCallStatRemoteObjCR(_sync_env, _bucket_info, _key),
+                                                           env(_env), versioned_epoch(_versioned_epoch),
+                                                           topics(_topics) {
   }
 
   ~RGWPSHandleRemoteObjCR() override {}
 
   RGWStatRemoteObjCBCR *allocate_callback() override {
-#warning things need to change
-    /* FIXME: we need to create a pre_callback coroutine that decides whether object should
-     * actually be handled. Otherwise we fetch info from remote zone about every object, even
-     * if we don't intend to handle it.
-     */
-    return new RGWPSHandleRemoteObjCBCR(sync_env, bucket_info, key, env, versioned_epoch);
+    return new RGWPSHandleRemoteObjCBCR(sync_env, bucket_info, key, env, versioned_epoch, topics);
+  }
+};
+
+class RGWPSHandleObjCreateCR : public RGWCoroutine {
+  
+  RGWDataSyncEnv *sync_env;
+  RGWBucketInfo bucket_info;
+  rgw_obj_key key;
+  PSEnvRef env;
+  uint64_t versioned_epoch;
+  TopicsRef topics;
+public:
+  RGWPSHandleObjCreateCR(RGWDataSyncEnv *_sync_env,
+                       RGWBucketInfo& _bucket_info, rgw_obj_key& _key,
+                       PSEnvRef _env, uint64_t _versioned_epoch) : RGWCoroutine(_sync_env->cct),
+                                                                   sync_env(_sync_env),
+                                                                   bucket_info(_bucket_info),
+                                                                   key(_key),
+                                                                   env(_env),
+                                                                   versioned_epoch(_versioned_epoch) {
+  }
+
+  ~RGWPSHandleObjCreateCR() override {}
+
+  int operate() override {
+    reenter(this) {
+      yield call(new RGWPSFindBucketTopicsCR(sync_env, env, bucket_info.bucket, key, &topics));
+      if (retcode < 0) {
+        ldout(sync_env->cct, 0) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl;
+        return set_cr_error(retcode);
+      }
+      if (topics->empty()) {
+        ldout(sync_env->cct, 20) << "no topics found for " << bucket_info.bucket << "/" << key << dendl;
+        return set_cr_done();
+      }
+      yield call(new RGWPSHandleRemoteObjCR(sync_env, bucket_info, key, env, versioned_epoch, topics));
+      if (retcode < 0) {
+        return set_cr_error(retcode);
+      }
+      return set_cr_done();
+    }
+    return 0;
   }
 };
 
@@ -874,6 +959,7 @@ class RGWPSGenericObjEventCBCR : public RGWCoroutine {
   ceph::real_time mtime;
   RGWPubSubEventType event_type;
   EventRef event;
+  TopicsRef topics;
 public:
   RGWPSGenericObjEventCBCR(RGWDataSyncEnv *_sync_env,
                            PSEnvRef _env,
@@ -887,6 +973,15 @@ public:
     reenter(this) {
       ldout(sync_env->cct, 10) << ": remove remote obj: z=" << sync_env->source_zone
                                << " b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << dendl;
+      yield call(new RGWPSFindBucketTopicsCR(sync_env, env, bucket_info.bucket, key, &topics));
+      if (retcode < 0) {
+        ldout(sync_env->cct, 0) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl;
+        return set_cr_error(retcode);
+      }
+      if (topics->empty()) {
+        ldout(sync_env->cct, 20) << "no topics found for " << bucket_info.bucket << "/" << key << dendl;
+        return set_cr_done();
+      }
       make_event_ref(&event);
       event->event = event_type;
       event->bucket = bucket_info.bucket;
@@ -894,7 +989,7 @@ public:
       event->mtime = mtime;
       event->timestamp = real_clock::now();
 
-      yield call(new RGWPSHandleObjEvent(sync_env, env, event));
+      yield call(new RGWPSHandleObjEvent(sync_env, env, event, topics));
       if (retcode < 0) {
         return set_cr_error(retcode);
       }
@@ -925,35 +1020,16 @@ public:
   }
   RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
     ldout(sync_env->cct, 10) << conf->id << ": sync_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch << dendl;
-#warning this should be done correctly
-#if 0
-    if (!conf->should_handle_operation(bucket_info)) {
-      ldout(sync_env->cct, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl;
-      return nullptr;
-    }
-#endif
-    return new RGWPSHandleRemoteObjCR(sync_env, bucket_info, key, env, versioned_epoch);
+    return new RGWPSHandleObjCreateCR(sync_env, bucket_info, key, env, versioned_epoch);
   }
   RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
     ldout(sync_env->cct, 10) << conf->id << ": rm_object: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
-#warning this should be done correctly
-#if 0
-    if (!conf->should_handle_operation(bucket_info)) {
-      ldout(sync_env->cct, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl;
-      return nullptr;
-    }
-#endif
     return new RGWPSGenericObjEventCBCR(sync_env, env, bucket_info, key, mtime, OBJECT_DELETE);
   }
   RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
                                      rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
     ldout(sync_env->cct, 10) << conf->id << ": create_delete_marker: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime
                             << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
-#warning requests should be filtered correctly
-#if 0
-    ldout(sync_env->cct, 10) << conf->id << ": skipping operation (not handled)" << dendl;
-#endif
-#warning delete markers need to be handled too
     return new RGWPSGenericObjEventCBCR(sync_env, env, bucket_info, key, mtime, DELETE_MARKER_CREATE);
   }
 };