]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: introduce cr singleton, use for subscriptions init
authorYehuda Sadeh <yehuda@redhat.com>
Sun, 1 Jul 2018 23:46:13 +0000 (16:46 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 11 Dec 2018 08:10:42 +0000 (00:10 -0800)
Prevent concurrent initalizations of the same subscription

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_coroutine.cc
src/rgw/rgw_coroutine.h
src/rgw/rgw_sync_module_pubsub.cc

index d756cfd52a7fda556d94986f9278d48f3d655aaa..5910df6de7fb6c353f081c75dea3ceb30a66c5f0 100644 (file)
@@ -222,7 +222,7 @@ int RGWCoroutinesStack::operate(RGWCoroutinesEnv *_env)
   RGWCoroutine *op = *pos;
   op->stack = this;
   ldout(cct, 20) << *op << ": operate()" << dendl;
-  int r = op->operate();
+  int r = op->operate_wrapper();
   if (r < 0) {
     ldout(cct, 20) << *op << ": operate() returned r=" << r << dendl;
   }
index 8c1e25dc58a7051908f919c6a14021dfa062c160..6055eb3b70f9c1b00012718076593d77f9fe0244 100644 (file)
@@ -258,6 +258,9 @@ protected:
     return status;
   }
 
+  virtual int operate_wrapper() {
+    return operate();
+  }
 public:
   RGWCoroutine(CephContext *_cct) : status(_cct), _yield_ret(false), cct(_cct), stack(NULL), retcode(0), state(RGWCoroutine_Run) {}
   ~RGWCoroutine() override;
index 0eec2d94729c1170dfdddeb9a814d00c5f487542..23e5d14c8e86c0bb4c09ed3b002f75001402f475 100644 (file)
@@ -340,6 +340,98 @@ public:
         
 };
 
+template <class T>
+class RGWSingletonCR : public RGWCoroutine {
+  friend class WrapperCR;
+
+  boost::asio::coroutine wrapper_state;
+  bool started{false};
+  int operate_ret{0};
+
+  struct WaiterInfo {
+    RGWCoroutine *cr{nullptr};
+    T *result;
+  };
+  using WaiterInfoRef = std::shared_ptr<WaiterInfo>;
+
+  deque<WaiterInfoRef> waiters;
+
+  void add_waiter(RGWCoroutine *cr, T *result) {
+    auto waiter = std::make_shared<WaiterInfo>();
+    waiter->cr = cr;
+    waiter->result = result;
+    waiters.push_back(waiter);
+  };
+
+  bool get_next_waiter(WaiterInfoRef *waiter) {
+    if (waiters.empty()) {
+      waiter->reset();
+      return false;
+    }
+
+    *waiter = waiters.front();
+    waiters.pop_front();
+    return true;
+  }
+
+  int operate_wrapper() override {
+    reenter(&wrapper_state) {
+      while (!is_done()) {
+        ldout(cct, 20) << __func__ << "(): operate_wrapper() -> operate()" << dendl;
+        operate_ret = operate();
+        if (operate_ret < 0) {
+          ldout(cct, 20) << *this << ": operate() returned r=" << operate_ret << dendl;
+        }
+        if (!is_done()) {
+          yield;
+        }
+      }
+
+      ldout(cct, 20) << __func__ << "(): RGWSingletonCR: operate_wrapper() done, need to wake up " << waiters.size() << " waiters" << dendl;
+      /* we're done, can't yield anymore */
+
+      WaiterInfoRef waiter;
+      while (get_next_waiter(&waiter)) {
+        ldout(cct, 20) << __func__ << "(): RGWSingletonCR: waking up waiter" << dendl;
+        waiter->cr->set_retcode(retcode);
+        waiter->cr->set_sleeping(false);
+        return_result(waiter->result);
+        put();
+      }
+
+      return retcode;
+    }
+    return 0;
+  }
+
+  virtual void return_result(T *result) {}
+
+public:
+  RGWSingletonCR(CephContext *_cct)
+    : RGWCoroutine(_cct) {}
+
+  int execute(RGWCoroutine *caller, T *result = nullptr) {
+    if (!started) {
+      ldout(cct, 20) << __func__ << "(): singleton not started, starting" << dendl;
+      started = true;
+      caller->call(this);
+      return 0;
+    } else if (!is_done()) {
+      ldout(cct, 20) << __func__ << "(): singleton not done yet, registering as waiter" << dendl;
+      get();
+      add_waiter(caller, result);
+      caller->set_sleeping(true);
+      return 0;
+    }
+
+    ldout(cct, 20) << __func__ << "(): singleton done, returning retcode=" << retcode << dendl;
+    caller->set_retcode(retcode);
+    return_result(result);
+    return retcode;
+  }
+};
+
+
 class PSSubscription;
 using PSSubscriptionRef = std::shared_ptr<PSSubscription>;
 
@@ -353,9 +445,10 @@ class PSSubscription {
   RGWDataAccessRef data_access;
   RGWDataAccess::BucketRef bucket;
 
-public:
+  class InitCR;
+  InitCR *init_cr{nullptr};
 
-  class InitCR : public RGWCoroutine {
+  class InitCR : public RGWSingletonCR<bool> {
     RGWDataSyncEnv *sync_env;
     PSSubscriptionRef sub;
     rgw_get_bucket_info_params get_bucket_info;
@@ -365,7 +458,7 @@ public:
     int i;
   public:
     InitCR(RGWDataSyncEnv *_sync_env,
-           PSSubscriptionRef& _sub) : RGWCoroutine(_sync_env->cct),
+           PSSubscriptionRef& _sub) : RGWSingletonCR<bool>(_sync_env->cct),
                                     sync_env(_sync_env),
                                     sub(_sub), conf(sub->env->conf),
                                     sub_conf(sub->sub_conf) {
@@ -409,7 +502,6 @@ public:
               << get_bucket_info.tenant << " name=" << get_bucket_info.bucket_name << ": ret=" << retcode << dendl;
             return set_cr_error(retcode);
           }
-ldout(sync_env->cct, 20) << "pubsub: bucket create: after user info: " << json_str("obj", *sub->env->data_user_info, true) << dendl;
 
           /* second iteration: we got -ENOENT and created a bucket */
         }
@@ -464,23 +556,32 @@ ldout(sync_env->cct, 20) << "pubsub: bucket create: after user info: " << json_s
     }
   };
 
+public:
+
   PSSubscription(RGWDataSyncEnv *_sync_env,
                  PSEnvRef _env,
                  PSSubConfigRef& _sub_conf) : sync_env(_sync_env),
                                       env(_env),
                                       sub_conf(_sub_conf),
                                       data_access(std::make_shared<RGWDataAccess>(sync_env->store)) {}
+  virtual ~PSSubscription() {
+    if (init_cr) {
+      init_cr->put();
+    }
+  }
 public:
   static PSSubscriptionRef& get_shared(RGWDataSyncEnv *_sync_env,
                                 PSEnvRef _env,
                                 PSSubConfigRef& _sub_conf) {
     auto sub = new PSSubscription(_sync_env, _env, _sub_conf);
     sub->self = std::shared_ptr<PSSubscription>(sub);
+    sub->init_cr = new InitCR(_sync_env, sub->self);
+    sub->init_cr->get();
     return sub->self;
   }
 
-  RGWCoroutine *init_cr() {
-    return new InitCR(sync_env, self);
+  int call_init_cr(RGWCoroutine *caller) {
+    return init_cr->execute(caller);
   }
 
   RGWCoroutine *store_event_cr(EventRef& event) {
@@ -500,7 +601,7 @@ class PSManager
 
   map<string, PSSubscriptionRef> subs;
 
-  class GetSubCR : public RGWCoroutine {
+  class GetSubCR : public RGWSingletonCR<PSSubscriptionRef> {
     RGWDataSyncEnv *sync_env;
     PSManagerRef mgr;
     const string& sub_name;
@@ -513,13 +614,16 @@ class PSManager
     GetSubCR(RGWDataSyncEnv *_sync_env,
                       PSManagerRef& _mgr,
                       const string& _sub_name,
-                      PSSubscriptionRef *_ref) : RGWCoroutine(_sync_env->cct),
+                      PSSubscriptionRef *_ref) : RGWSingletonCR<PSSubscriptionRef>(_sync_env->cct),
                                                  sync_env(_sync_env),
                                                  mgr(_mgr),
                                                  sub_name(_sub_name),
                                                  ref(_ref),
                                                  conf(mgr->env->conf) {
     }
+    ~GetSubCR() {
+      mgr->remove_get_sub(sub_name);
+    }
 
     int operate() override {
       reenter(this) {
@@ -530,20 +634,33 @@ class PSManager
 
         *ref = PSSubscription::get_shared(sync_env, mgr->env, sub_conf);
 
-        yield call((*ref)->init_cr());
+        yield (*ref)->call_init_cr(this);
         if (retcode < 0) {
           ldout(sync_env->cct, 0) << "ERROR: failed to init subscription" << dendl;
-          ref->reset();
           return set_cr_error(retcode);
         }
 
         mgr->subs[sub_name] = *ref;
+
         return set_cr_done();
       }
       return 0;
     }
+
+    void return_result(PSSubscriptionRef *result) override {
+      ldout(cct, 20) << __func__ << "(): returning result: retcode=" << retcode << " resultp=" << (void *)result << dendl;
+      if (retcode >= 0) {
+        *result = *ref;
+      }
+    }
   };
 
+  map<string, GetSubCR *> get_subs;
+
+  void remove_get_sub(const string& name) {
+    get_subs.erase(name);
+  }
+
   bool find_sub_instance(const string& sub_name, PSSubscriptionRef *sub) {
     auto iter = subs.find(sub_name);
     if (iter != subs.end()) {
@@ -565,12 +682,19 @@ public:
     return mgr->self;
   }
 
-  RGWCoroutine *get_subscription_cr(const string& sub_name, PSSubscriptionRef *ref) {
+  int call_get_subscription_cr(RGWCoroutine *caller, const string& sub_name, PSSubscriptionRef *ref) {
     if (find_sub_instance(sub_name, ref)) {
       /* found it! nothing to execute */
-      return nullptr;
+      ldout(sync_env->cct, 20) << __func__ << "(): found sub instance" << dendl;
+      return 0;
+    }
+    auto& gs = get_subs[sub_name];
+    if (!gs) {
+      ldout(sync_env->cct, 20) << __func__ << "(): first get subs" << dendl;
+      gs = new GetSubCR(sync_env, self, sub_name, ref);
     }
-    return new GetSubCR(sync_env, self, sub_name, ref);
+    ldout(sync_env->cct, 20) << __func__ << "(): executing get subs" << dendl;
+    return gs->execute(caller, ref);
   }
 
   friend class GetSubCR;
@@ -624,6 +748,7 @@ public:
   }
 };
 
+
 class RGWPSHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR {
   RGWDataSyncEnv *sync_env;
   PSEnvRef env;
@@ -663,7 +788,7 @@ public:
         for (siter = (*titer)->subs.begin(); siter != (*titer)->subs.end(); ++siter) {
           ldout(sync_env->cct, 10) << ": subscription: " << *siter << dendl;
 
-          yield call(env->manager->get_subscription_cr(*siter, &sub));
+          yield env->manager->call_get_subscription_cr(this, *siter, &sub);
           if (retcode < 0) {
             ldout(sync_env->cct, 10) << "ERROR: failed to find subscription config for subscription=" << *siter << " ret=" << retcode << dendl;
             continue;