]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: pubsub: fetch user subs
authorYehuda Sadeh <yehuda@redhat.com>
Tue, 3 Jul 2018 18:39:39 +0000 (11:39 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 11 Dec 2018 08:10:42 +0000 (00:10 -0800)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_sync_module_pubsub.cc

index 6a19d5f7e1d799962c3402ca4ed4b07de04ceff6..8ed7261596632132a315bef873b27e68b5b9b892 100644 (file)
@@ -88,6 +88,14 @@ struct PSSubConfig { /* subscription config */
   string data_bucket_name;
   string data_oid_prefix;
 
+  void from_user_conf(const rgw_pubsub_user_sub_config& uc) {
+    name = uc.name;
+    topic = uc.topic;
+    push_endpoint = uc.dest.push_endpoint;
+    data_bucket_name = uc.dest.bucket_name;
+    data_oid_prefix = uc.dest.oid_prefix;
+  }
+
   void dump(Formatter *f) const {
     encode_json("name", name, f);
     encode_json("topic", topic, f);
@@ -569,19 +577,26 @@ class PSSubscription {
   };
 
 public:
-
   PSSubscription(RGWDataSyncEnv *_sync_env,
                  PSEnvRef _env,
                  PSSubConfigRef& _sub_conf) : sync_env(_sync_env),
                                       env(_env),
                                       sub_conf(_sub_conf),
                                       data_access(std::make_shared<RGWDataAccess>(sync_env->store)) {}
+
+  PSSubscription(RGWDataSyncEnv *_sync_env,
+                 PSEnvRef _env,
+                 rgw_pubsub_user_sub_config& user_sub_conf) : sync_env(_sync_env),
+                                      env(_env),
+                                      data_access(std::make_shared<RGWDataAccess>(sync_env->store)) {
+    sub_conf->from_user_conf(user_sub_conf);
+  }
   virtual ~PSSubscription() {
     if (init_cr) {
       init_cr->put();
     }
   }
-public:
+
   static PSSubscriptionRef& get_shared(RGWDataSyncEnv *_sync_env,
                                 PSEnvRef _env,
                                 PSSubConfigRef& _sub_conf) {
@@ -591,6 +606,15 @@ public:
     sub->init_cr->get();
     return sub->self;
   }
+  static PSSubscriptionRef& get_shared(RGWDataSyncEnv *_sync_env,
+                                PSEnvRef _env,
+                                rgw_pubsub_user_sub_config& _sub_conf) {
+    auto sub = new PSSubscription(_sync_env, _env, _sub_conf);
+    sub->self = std::shared_ptr<PSSubscription>(sub);
+    sub->init_cr = new InitCR(_sync_env, sub->self);
+    sub->init_cr->get();
+    return sub->self;
+  }
 
   int call_init_cr(RGWCoroutine *caller) {
     return init_cr->execute(caller);
@@ -616,19 +640,24 @@ class PSManager
   class GetSubCR : public RGWSingletonCR<PSSubscriptionRef> {
     RGWDataSyncEnv *sync_env;
     PSManagerRef mgr;
-    const string& sub_name;
+    rgw_user owner;
+    string sub_name;
+    string sub_id;
     PSSubscriptionRef *ref;
 
-    PSConfigRef& conf;
+    PSConfigRef conf;
 
     PSSubConfigRef sub_conf;
+    rgw_pubsub_user_sub_config user_sub_conf;
   public:
     GetSubCR(RGWDataSyncEnv *_sync_env,
                       PSManagerRef& _mgr,
+                      const rgw_user& _owner,
                       const string& _sub_name,
                       PSSubscriptionRef *_ref) : RGWSingletonCR<PSSubscriptionRef>(_sync_env->cct),
                                                  sync_env(_sync_env),
                                                  mgr(_mgr),
+                                                 owner(_owner),
                                                  sub_name(_sub_name),
                                                  ref(_ref),
                                                  conf(mgr->env->conf) {
@@ -639,12 +668,30 @@ class PSManager
 
     int operate() override {
       reenter(this) {
-        if (!conf->find_sub(sub_name, &sub_conf)) {
-          ldout(sync_env->cct, 0) << "ERROR: could not find subscription config: name=" << sub_name << dendl;
-          return set_cr_error(-ENOENT);
-        }
+        if (owner.empty()) {
+          if (!conf->find_sub(sub_name, &sub_conf)) {
+            ldout(sync_env->cct, 0) << "ERROR: could not find subscription config: name=" << sub_name << dendl;
+            return set_cr_error(-ENOENT);
+          }
 
-        *ref = PSSubscription::get_shared(sync_env, mgr->env, sub_conf);
+          *ref = PSSubscription::get_shared(sync_env, mgr->env, sub_conf);
+        } else {
+          using ReadInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_user_sub_config>;
+          yield {
+            RGWUserPubSub ups(sync_env->store, owner);
+            rgw_raw_obj obj;
+            ups.get_sub_meta_obj(sub_name, &obj);
+            bool empty_on_enoent = false;
+            call(new ReadInfoCR(sync_env->async_rados, sync_env->store,
+                                obj,
+                                &user_sub_conf, empty_on_enoent));
+          }
+          if (retcode < 0) {
+            return set_cr_error(retcode);
+          }
+
+          *ref = PSSubscription::get_shared(sync_env, mgr->env, user_sub_conf);
+        }
 
         yield (*ref)->call_init_cr(this);
         if (retcode < 0) {
@@ -652,7 +699,9 @@ class PSManager
           return set_cr_error(retcode);
         }
 
-        mgr->subs[sub_name] = *ref;
+        if (owner.empty()) {
+          mgr->subs[sub_name] = *ref;
+        }
 
         return set_cr_done();
       }
@@ -694,16 +743,21 @@ public:
     return mgr->self;
   }
 
-  int call_get_subscription_cr(RGWCoroutine *caller, const string& sub_name, PSSubscriptionRef *ref) {
-    if (find_sub_instance(sub_name, ref)) {
+  int call_get_subscription_cr(RGWCoroutine *caller, const rgw_user& owner, const string& sub_name, PSSubscriptionRef *ref) {
+    string owner_prefix;
+    if (!owner.empty()) {
+      owner_prefix = owner.to_str() + "/";
+    }
+
+    string sub_instance = owner_prefix + sub_name;
+    if (find_sub_instance(sub_instance, ref)) {
       /* found it! nothing to execute */
       ldout(sync_env->cct, 20) << __func__ << "(): found sub instance" << dendl;
-      return 0;
     }
-    auto& gs = get_subs[sub_name];
+    auto& gs = get_subs[sub_instance];
     if (!gs) {
       ldout(sync_env->cct, 20) << __func__ << "(): first get subs" << dendl;
-      gs = new GetSubCR(sync_env, self, sub_name, ref);
+      gs = new GetSubCR(sync_env, self, owner, sub_name, ref);
     }
     ldout(sync_env->cct, 20) << __func__ << "(): executing get subs" << dendl;
     return gs->execute(caller, ref);
@@ -775,13 +829,14 @@ class RGWPSFindBucketTopicsCR : public RGWCoroutine {
 public:
   RGWPSFindBucketTopicsCR(RGWDataSyncEnv *_sync_env,
                       PSEnvRef& _env,
-                      const RGWBucketInfo& _bucket_info,
+                      const rgw_user& _owner,
+                      const rgw_bucket& _bucket,
                       const rgw_obj_key& _key,
                       TopicsRef *_topics) : RGWCoroutine(_sync_env->cct),
                                                           sync_env(_sync_env),
                                                           env(_env),
-                                                          owner(_bucket_info.owner),
-                                                          bucket(_bucket_info.bucket),
+                                                          owner(_owner),
+                                                          bucket(_bucket),
                                                           key(_key),
                                                           ups(_sync_env->store, owner),
                                                           topics(_topics) {
@@ -823,6 +878,10 @@ public:
 class RGWPSHandleObjEvent : public RGWCoroutine {
   RGWDataSyncEnv *sync_env;
   PSEnvRef env;
+  std::array<rgw_user, 2> owners;
+  rgw_user& owner;
+  rgw_user& no_owner;
+  std::array<rgw_user, 2>::iterator oiter;
   EventRef event;
 
   vector<PSTopicConfigRef>::iterator titer;
@@ -832,12 +891,16 @@ class RGWPSHandleObjEvent : public RGWCoroutine {
 public:
   RGWPSHandleObjEvent(RGWDataSyncEnv *_sync_env,
                       PSEnvRef _env,
+                      const rgw_user& _owner,
                       EventRef& _event,
                       TopicsRef& _topics) : RGWCoroutine(_sync_env->cct),
                                           sync_env(_sync_env),
                                           env(_env),
+                                          owner(owners[0]),
+                                          no_owner(owners[1]),
                                           event(_event),
                                           topics(_topics) {
+    owner = _owner;
   }
   int operate() override {
     reenter(this) {
@@ -852,16 +915,21 @@ public:
         for (siter = (*titer)->subs.begin(); siter != (*titer)->subs.end(); ++siter) {
           ldout(sync_env->cct, 10) << ": subscription: " << *siter << dendl;
 
-          yield env->manager->call_get_subscription_cr(this, *siter, &sub);
-          if (retcode < 0) {
-            ldout(sync_env->cct, 10) << "ERROR: failed to find subscription config for subscription=" << *siter << " ret=" << retcode << dendl;
-            continue;
-          }
+          for (oiter = owners.begin(); oiter != owners.end(); ++oiter) {
+            /*
+             * once for the global subscriptions, once for the user specific subscriptions
+             */
+            yield env->manager->call_get_subscription_cr(this, *oiter, *siter, &sub);
+            if (retcode < 0) {
+              ldout(sync_env->cct, 10) << "ERROR: failed to find subscription config for subscription=" << *siter << " ret=" << retcode << dendl;
+              continue;
+            }
 
-          yield call(sub->store_event_cr(event));
-          if (retcode < 0) {
-            ldout(sync_env->cct, 10) << "ERROR: failed to store event for subscription=" << *siter << " ret=" << retcode << dendl;
-            continue;
+            yield call(sub->store_event_cr(event));
+            if (retcode < 0) {
+              ldout(sync_env->cct, 10) << "ERROR: failed to store event for subscription=" << *siter << " ret=" << retcode << dendl;
+              continue;
+            }
           }
 
 #warning push notification
@@ -916,7 +984,7 @@ public:
         }
       }
 
-      yield call(new RGWPSHandleObjEvent(sync_env, env, event, topics));
+      yield call(new RGWPSHandleObjEvent(sync_env, env, bucket_info.owner, event, topics));
       if (retcode < 0) {
         return set_cr_error(retcode);
       }
@@ -969,7 +1037,7 @@ public:
 
   int operate() override {
     reenter(this) {
-      yield call(new RGWPSFindBucketTopicsCR(sync_env, env, bucket_info, key, &topics));
+      yield call(new RGWPSFindBucketTopicsCR(sync_env, env, bucket_info.owner, bucket_info.bucket, key, &topics));
       if (retcode < 0) {
         ldout(sync_env->cct, 0) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl;
         return set_cr_error(retcode);
@@ -991,7 +1059,8 @@ public:
 class RGWPSGenericObjEventCBCR : public RGWCoroutine {
   RGWDataSyncEnv *sync_env;
   PSEnvRef env;
-  RGWBucketInfo bucket_info;
+  rgw_user owner;
+  rgw_bucket bucket;
   rgw_obj_key key;
   ceph::real_time mtime;
   RGWPubSubEventType event_type;
@@ -1004,29 +1073,31 @@ public:
                            RGWPubSubEventType _event_type) : RGWCoroutine(_sync_env->cct),
                                                              sync_env(_sync_env),
                                                              env(_env),
-                                                             bucket_info(_bucket_info), key(_key),
+                                                             owner(_bucket_info.owner),
+                                                             bucket(_bucket_info.bucket),
+                                                             key(_key),
                                                              mtime(_mtime), event_type(_event_type) {}
   int operate() override {
     reenter(this) {
       ldout(sync_env->cct, 10) << ": remove remote obj: z=" << sync_env->source_zone
-                               << " b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << dendl;
-      yield call(new RGWPSFindBucketTopicsCR(sync_env, env, bucket_info, key, &topics));
+                               << " b=" << bucket << " k=" << key << " mtime=" << mtime << dendl;
+      yield call(new RGWPSFindBucketTopicsCR(sync_env, env, owner, bucket, key, &topics));
       if (retcode < 0) {
         ldout(sync_env->cct, 0) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl;
         return set_cr_error(retcode);
       }
       if (topics->empty()) {
-        ldout(sync_env->cct, 20) << "no topics found for " << bucket_info.bucket << "/" << key << dendl;
+        ldout(sync_env->cct, 20) << "no topics found for " << bucket << "/" << key << dendl;
         return set_cr_done();
       }
       make_event_ref(&event);
       event->event = event_type;
-      event->bucket = bucket_info.bucket;
+      event->bucket = bucket;
       event->key = key;
       event->mtime = mtime;
       event->timestamp = real_clock::now();
 
-      yield call(new RGWPSHandleObjEvent(sync_env, env, event, topics));
+      yield call(new RGWPSHandleObjEvent(sync_env, env, owner, event, topics));
       if (retcode < 0) {
         return set_cr_error(retcode);
       }