]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: pubsub: multiple fixes
authorYehuda Sadeh <yehuda@redhat.com>
Tue, 3 Jul 2018 21:43:44 +0000 (14:43 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 11 Dec 2018 08:10:42 +0000 (00:10 -0800)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_sync_module_pubsub.cc

index 8ed7261596632132a315bef873b27e68b5b9b892..71c0ac2a90bacc9eef658bcbf9d771d9927dd154 100644 (file)
@@ -54,7 +54,7 @@ config:
 {
    "tenant": <tenant>,             # default: <empty>
    "uid": <uid>,                   # default: "pubsub"
-   "data_bucket_prefix": <prefix>  # default: "pubsub"
+   "data_bucket_prefix": <prefix>  # default: "pubsub-"
    "data_oid_prefix": <prefix>     #
 
     # non-dynamic config
@@ -224,10 +224,9 @@ struct PSConfig {
   void init(CephContext *cct, const JSONFormattable& config) {
     string uid = config["uid"]("pubsub");
     user = rgw_user(config["tenant"], uid);
-    data_bucket_prefix = config["data_bucket_prefix"]("pubsub");
+    data_bucket_prefix = config["data_bucket_prefix"]("pubsub-");
     data_oid_prefix = config["data_oid_prefix"];
 
-    /* FIXME: this will be dynamically configured */
     for (auto& c : config["notifications"].array()) {
       PSNotificationConfig nc;
       nc.id = ++max_id;
@@ -454,7 +453,6 @@ class PSSubscription;
 using PSSubscriptionRef = std::shared_ptr<PSSubscription>;
 
 class PSSubscription {
-  PSSubscriptionRef self;
   RGWDataSyncEnv *sync_env;
   PSEnvRef env;
   PSSubConfigRef sub_conf;
@@ -597,31 +595,22 @@ public:
     }
   }
 
-  static PSSubscriptionRef& get_shared(RGWDataSyncEnv *_sync_env,
+  template <class C>
+  static PSSubscriptionRef get_shared(RGWDataSyncEnv *_sync_env,
                                 PSEnvRef _env,
-                                PSSubConfigRef& _sub_conf) {
-    auto sub = new PSSubscription(_sync_env, _env, _sub_conf);
-    sub->self = std::shared_ptr<PSSubscription>(sub);
-    sub->init_cr = new InitCR(_sync_env, sub->self);
+                                C& _sub_conf) {
+    auto sub = std::make_shared<PSSubscription>(_sync_env, _env, _sub_conf);
+    sub->init_cr = new InitCR(_sync_env, sub);
     sub->init_cr->get();
-    return sub->self;
-  }
-  static PSSubscriptionRef& get_shared(RGWDataSyncEnv *_sync_env,
-                                PSEnvRef _env,
-                                rgw_pubsub_user_sub_config& _sub_conf) {
-    auto sub = new PSSubscription(_sync_env, _env, _sub_conf);
-    sub->self = std::shared_ptr<PSSubscription>(sub);
-    sub->init_cr = new InitCR(_sync_env, sub->self);
-    sub->init_cr->get();
-    return sub->self;
+    return sub;
   }
 
   int call_init_cr(RGWCoroutine *caller) {
     return init_cr->execute(caller);
   }
 
-  RGWCoroutine *store_event_cr(EventRef& event) {
-    return new StoreEventCR(sync_env, self, event);
+  static RGWCoroutine *store_event_cr(RGWDataSyncEnv *sync_env, PSSubscriptionRef& sub, EventRef& event) {
+    return new StoreEventCR(sync_env, sub, event);
   }
 
   friend class InitCR;
@@ -630,8 +619,6 @@ public:
 
 class PSManager
 {
-  PSManagerRef self;
-
   RGWDataSyncEnv *sync_env;
   PSEnvRef env;
 
@@ -663,7 +650,6 @@ class PSManager
                                                  conf(mgr->env->conf) {
     }
     ~GetSubCR() {
-      mgr->remove_get_sub(sub_name);
     }
 
     int operate() override {
@@ -671,6 +657,7 @@ class PSManager
         if (owner.empty()) {
           if (!conf->find_sub(sub_name, &sub_conf)) {
             ldout(sync_env->cct, 0) << "ERROR: could not find subscription config: name=" << sub_name << dendl;
+            mgr->remove_get_sub(owner, sub_name);
             return set_cr_error(-ENOENT);
           }
 
@@ -687,6 +674,7 @@ class PSManager
                                 &user_sub_conf, empty_on_enoent));
           }
           if (retcode < 0) {
+            mgr->remove_get_sub(owner, sub_name);
             return set_cr_error(retcode);
           }
 
@@ -696,12 +684,14 @@ class PSManager
         yield (*ref)->call_init_cr(this);
         if (retcode < 0) {
           ldout(sync_env->cct, 0) << "ERROR: failed to init subscription" << dendl;
+          mgr->remove_get_sub(owner, sub_name);
           return set_cr_error(retcode);
         }
 
         if (owner.empty()) {
           mgr->subs[sub_name] = *ref;
         }
+        mgr->remove_get_sub(owner, sub_name);
 
         return set_cr_done();
       }
@@ -716,14 +706,27 @@ class PSManager
     }
   };
 
+  string sub_id(const rgw_user& owner, const string& sub_name) {
+    string owner_prefix;
+    if (!owner.empty()) {
+      owner_prefix = owner.to_str() + "/";
+    }
+
+    return owner_prefix + sub_name;
+  }
+
   map<string, GetSubCR *> get_subs;
 
-  void remove_get_sub(const string& name) {
-    get_subs.erase(name);
+  GetSubCR *& get_get_subs(const rgw_user& owner, const string& name) {
+    return get_subs[sub_id(owner, name)];
   }
 
-  bool find_sub_instance(const string& sub_name, PSSubscriptionRef *sub) {
-    auto iter = subs.find(sub_name);
+  void remove_get_sub(const rgw_user& owner, const string& name) {
+    get_subs.erase(sub_id(owner, name));
+  }
+
+  bool find_sub_instance(const rgw_user& owner, const string& sub_name, PSSubscriptionRef *sub) {
+    auto iter = subs.find(sub_id(owner, sub_name));
     if (iter != subs.end()) {
       *sub = iter->second;
       return true;
@@ -736,28 +739,20 @@ class PSManager
                              env(_env) {}
 
 public:
-  static PSManagerRef& get_shared(RGWDataSyncEnv *_sync_env,
+  static PSManagerRef get_shared(RGWDataSyncEnv *_sync_env,
                                  PSEnvRef _env) {
-    auto mgr = new PSManager(_sync_env, _env);
-    mgr->self = std::shared_ptr<PSManager>(mgr);
-    return mgr->self;
+    return std::shared_ptr<PSManager>(new PSManager(_sync_env, _env));
   }
 
-  int call_get_subscription_cr(RGWCoroutine *caller, const rgw_user& owner, const string& sub_name, PSSubscriptionRef *ref) {
-    string owner_prefix;
-    if (!owner.empty()) {
-      owner_prefix = owner.to_str() + "/";
-    }
-
-    string sub_instance = owner_prefix + sub_name;
-    if (find_sub_instance(sub_instance, ref)) {
+  static int call_get_subscription_cr(RGWDataSyncEnv *sync_env, PSManagerRef& mgr, RGWCoroutine *caller, const rgw_user& owner, const string& sub_name, PSSubscriptionRef *ref) {
+    if (mgr->find_sub_instance(owner, sub_name, ref)) {
       /* found it! nothing to execute */
       ldout(sync_env->cct, 20) << __func__ << "(): found sub instance" << dendl;
     }
-    auto& gs = get_subs[sub_instance];
+    auto& gs = mgr->get_get_subs(owner, sub_name);
     if (!gs) {
       ldout(sync_env->cct, 20) << __func__ << "(): first get subs" << dendl;
-      gs = new GetSubCR(sync_env, self, owner, sub_name, ref);
+      gs = new GetSubCR(sync_env, mgr, owner, sub_name, ref);
     }
     ldout(sync_env->cct, 20) << __func__ << "(): executing get subs" << dendl;
     return gs->execute(caller, ref);
@@ -919,13 +914,13 @@ public:
             /*
              * once for the global subscriptions, once for the user specific subscriptions
              */
-            yield env->manager->call_get_subscription_cr(this, *oiter, *siter, &sub);
+            yield PSManager::call_get_subscription_cr(sync_env, env->manager, this, *oiter, *siter, &sub);
             if (retcode < 0) {
               ldout(sync_env->cct, 10) << "ERROR: failed to find subscription config for subscription=" << *siter << " ret=" << retcode << dendl;
               continue;
             }
 
-            yield call(sub->store_event_cr(event));
+            yield call(PSSubscription::store_event_cr(sync_env, sub, event));
             if (retcode < 0) {
               ldout(sync_env->cct, 10) << "ERROR: failed to store event for subscription=" << *siter << " ret=" << retcode << dendl;
               continue;
@@ -1122,8 +1117,8 @@ public:
     env->init_instance(sync_env->store->get_realm(), instance_id, mgr);
   }
 
-  RGWCoroutine *init_sync(RGWDataSyncEnv *sync_env) override {
-    ldout(sync_env->cct, 5) << conf->id << ": init" << dendl;
+  RGWCoroutine *start_sync(RGWDataSyncEnv *sync_env) override {
+    ldout(sync_env->cct, 5) << conf->id << ": start" << dendl;
     return new RGWPSInitEnvCBCR(sync_env, env);
   }
   RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {