]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: add pause/unpause to RGWFrontend interface
authorCasey Bodley <cbodley@redhat.com>
Tue, 15 Sep 2015 19:42:10 +0000 (15:42 -0400)
committerYehuda Sadeh <yehuda@redhat.com>
Fri, 12 Feb 2016 00:13:22 +0000 (16:13 -0800)
RGWProcessFrontend implements pause/unpause through the ThreadPool
interface. RGWMongooseFrontend implements them with a reader/writer
lock that blocks the civetweb_callback() until unpause

Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/rgw_main.cc

index 22db78e57d116ab465f8f19c0816d23a61cd2e22..4989c993e358c76cc7d791644cbadae48ba86671 100644 (file)
@@ -254,6 +254,15 @@ public:
   virtual void run() = 0;
   virtual void handle_request(RGWRequest *req) = 0;
 
+  void pause() {
+    m_tp.pause();
+  }
+
+  void unpause_with_new_config(RGWRados *store) {
+    this->store = store;
+    m_tp.unpause();
+  }
+
   void close_fd() {
     if (sock_fd >= 0) {
       ::close(sock_fd);
@@ -751,24 +760,36 @@ void RGWLoadGenProcess::handle_request(RGWRequest *r)
   delete req;
 }
 
+struct RGWMongooseEnv : public RGWProcessEnv {
+  // every request holds a read lock, so we need to prioritize write locks to
+  // avoid starving pause_for_new_config()
+  static constexpr bool prioritize_write = true;
+  RWLock mutex;
+  RGWMongooseEnv(const RGWProcessEnv &env)
+    : RGWProcessEnv(env),
+      mutex("RGWMongooseFrontend", false, true, prioritize_write) {}
+};
 
 static int civetweb_callback(struct mg_connection *conn) {
   struct mg_request_info *req_info = mg_get_request_info(conn);
-  RGWProcessEnv *pe = static_cast<RGWProcessEnv *>(req_info->user_data);
+  RGWMongooseEnv *pe = static_cast<RGWMongooseEnv *>(req_info->user_data);
   RGWRados *store = pe->store;
   RGWREST *rest = pe->rest;
   OpsLogSocket *olog = pe->olog;
 
-  RGWRequest *req = new RGWRequest(store->get_new_req_id());
+  RGWRequest req(store->get_new_req_id());
   RGWMongoose client_io(conn, pe->port);
 
-  int ret = process_request(store, rest, req, &client_io, olog);
-  if (ret < 0) {
-    /* we don't really care about return code */
-    dout(20) << "process_request() returned " << ret << dendl;
-  }
+  {
+    // hold a read lock over access to pe->store for reconfiguration
+    RWLock::RLocker lock(pe->mutex);
 
-  delete req;
+    int ret = process_request(pe->store, rest, &req, &client_io, olog);
+    if (ret < 0) {
+      /* we don't really care about return code */
+      dout(20) << "process_request() returned " << ret << dendl;
+    }
+  }
 
 // Mark as processed
   return 1;
@@ -895,6 +916,9 @@ public:
   virtual int run() = 0;
   virtual void stop() = 0;
   virtual void join() = 0;
+
+  virtual void pause_for_new_config() = 0;
+  virtual void unpause_with_new_config(RGWRados *store) = 0;
 };
 
 class RGWProcessControlThread : public Thread {
@@ -939,6 +963,15 @@ public:
   void join() {
     thread->join();
   }
+
+  void pause_for_new_config() override {
+    pprocess->pause();
+  }
+
+  void unpause_with_new_config(RGWRados *store) override {
+    env.store = store;
+    pprocess->unpause_with_new_config(store);
+  }
 };
 
 class RGWFCGXFrontend : public RGWProcessFrontend {
@@ -993,7 +1026,7 @@ public:
 class RGWMongooseFrontend : public RGWFrontend {
   RGWFrontendConfig *conf;
   struct mg_context *ctx;
-  RGWProcessEnv env;
+  RGWMongooseEnv env;
 
   void set_conf_default(map<string, string>& m, const string& key, const string& def_val) {
     if (m.find(key) == m.end()) {
@@ -1053,6 +1086,17 @@ public:
 
   void join() {
   }
+
+  void pause_for_new_config() override {
+    // block callbacks until unpause
+    env.mutex.get_write();
+  }
+
+  void unpause_with_new_config(RGWRados *store) override {
+    env.store = store;
+    // unpause callbacks
+    env.mutex.put_write();
+  }
 };
 
 /*