]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: cr: introduce io channels
authorYehuda Sadeh <yehuda@redhat.com>
Thu, 2 Nov 2017 00:25:02 +0000 (17:25 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 10 Apr 2018 15:05:39 +0000 (08:05 -0700)
ios can have multiple channels, so that we can differentiate between
waiting on read/write/control events. We can then block on a specific,
any, or multiple events.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_coroutine.cc
src/rgw/rgw_coroutine.h
src/rgw/rgw_cr_rest.cc
src/rgw/rgw_cr_rest.h
src/rgw/rgw_http_client.cc
src/rgw/rgw_http_client.h
src/rgw/rgw_rest_conn.h

index 852fee41f38359cfec1d270856d3ac4987f63337..ecc30fad808253fc4d7704a102a120e076c8e5d6 100644 (file)
@@ -34,7 +34,7 @@ RGWCompletionManager::~RGWCompletionManager()
   timer.shutdown();
 }
 
-void RGWCompletionManager::complete(RGWAioCompletionNotifier *cn, int64_t io_id, void *user_info)
+void RGWCompletionManager::complete(RGWAioCompletionNotifier *cn, const rgw_io_id& io_id, void *user_info)
 {
   Mutex::Locker l(lock);
   _complete(cn, io_id, user_info);
@@ -56,7 +56,7 @@ void RGWCompletionManager::unregister_completion_notifier(RGWAioCompletionNotifi
   }
 }
 
-void RGWCompletionManager::_complete(RGWAioCompletionNotifier *cn, int64_t io_id, void *user_info)
+void RGWCompletionManager::_complete(RGWAioCompletionNotifier *cn, const rgw_io_id& io_id, void *user_info)
 {
   if (cn) {
     cns.erase(cn);
@@ -121,7 +121,7 @@ void RGWCompletionManager::_wakeup(void *opaque)
   if (iter != waiters.end()) {
     void *user_id = iter->second;
     waiters.erase(iter);
-    _complete(NULL, -1 /* no IO id */, user_id);
+    _complete(NULL, rgw_io_id() /* no IO id */, user_id);
   }
 }
 
@@ -145,6 +145,10 @@ void RGWCoroutine::set_sleeping(bool flag) {
 }
 
 int RGWCoroutine::io_block(int ret, int64_t io_id) {
+  return io_block(ret, rgw_io_id{io_id, -1});
+}
+
+int RGWCoroutine::io_block(int ret, const rgw_io_id& io_id) {
   if (stack->consume_io_finish(io_id)) {
     return 0;
   }
@@ -153,7 +157,7 @@ int RGWCoroutine::io_block(int ret, int64_t io_id) {
   return ret;
 }
 
-void RGWCoroutine::io_complete(int64_t io_id) {
+void RGWCoroutine::io_complete(const rgw_io_id& io_id) {
   stack->io_complete(io_id);
 }
 
@@ -311,7 +315,7 @@ void RGWCoroutinesStack::wakeup()
   completion_mgr->wakeup((void *)this);
 }
 
-void RGWCoroutinesStack::io_complete(int64_t io_id)
+void RGWCoroutinesStack::io_complete(const rgw_io_id& io_id)
 {
   RGWCompletionManager *completion_mgr = env->manager->get_completion_mgr();
   completion_mgr->complete(nullptr, io_id, (void *)this);
@@ -414,7 +418,7 @@ static void _aio_completion_notifier_cb(librados::completion_t cb, void *arg)
   ((RGWAioCompletionNotifier *)arg)->cb();
 }
 
-RGWAioCompletionNotifier::RGWAioCompletionNotifier(RGWCompletionManager *_mgr, int64_t _io_id, void *_user_data) : completion_mgr(_mgr),
+RGWAioCompletionNotifier::RGWAioCompletionNotifier(RGWCompletionManager *_mgr, const rgw_io_id& _io_id, void *_user_data) : completion_mgr(_mgr),
                                                                          io_id(_io_id),
                                                                          user_data(_user_data), lock("RGWAioCompletionNotifier"), registered(true) {
   c = librados::Rados::aio_create_completion((void *)this, NULL,
@@ -472,27 +476,40 @@ void RGWCoroutinesStack::dump(Formatter *f) const {
 void RGWCoroutinesStack::init_new_io(RGWIOProvider *io_provider)
 {
   io_provider->set_io_user_info((void *)this);
-  io_provider->set_io_id(env->manager->get_next_io_id());
+  io_provider->assign_io(env->manager->get_io_id_provider());
 }
 
-bool RGWCoroutinesStack::try_io_unblock(int64_t io_id)
+bool RGWCoroutinesStack::try_io_unblock(const rgw_io_id& io_id)
 {
   if (!can_io_unblock(io_id)) {
-    io_finish_ids.insert(io_id);
+#warning io_finish_ids needs to be cleaned up when owning stack finishes
+    auto p = io_finish_ids.emplace(io_id.id, io_id);
+    auto& iter = p.first;
+    bool inserted = p.second;
+    if (!inserted) { /* could not insert, entry already existed, add channel to completion mask */
+      iter->second.channels |= io_id.channels;
+    }
     return false;
   }
 
   return true;
 }
 
-bool RGWCoroutinesStack::consume_io_finish(int64_t io_id)
+bool RGWCoroutinesStack::consume_io_finish(const rgw_io_id& io_id)
 {
-  auto iter = io_finish_ids.find(io_id);
+  auto iter = io_finish_ids.find(io_id.id);
   if (iter == io_finish_ids.end()) {
     return false;
   }
-  io_finish_ids.erase(iter);
-  return true;
+  int finish_mask = iter->second.channels;
+  bool found = (finish_mask & io_id.channels) != 0;
+
+  finish_mask &= ~(finish_mask & io_id.channels);
+
+  if (finish_mask == 0) {
+    io_finish_ids.erase(iter);
+  }
+  return found;
 }
 
 
@@ -546,7 +563,7 @@ void RGWCoroutinesManager::set_sleeping(RGWCoroutine *cr, bool flag)
   cr->set_sleeping(flag);
 }
 
-void RGWCoroutinesManager::io_complete(RGWCoroutine *cr, int64_t io_id)
+void RGWCoroutinesManager::io_complete(RGWCoroutine *cr, const rgw_io_id& io_id)
 {
   RWLock::WLocker wl(lock);
   cr->io_complete(io_id);
@@ -576,6 +593,7 @@ int RGWCoroutinesManager::run(list<RGWCoroutinesStack *>& stacks)
   env.scheduled_stacks = &scheduled_stacks;
 
   for (list<RGWCoroutinesStack *>::iterator iter = scheduled_stacks.begin(); iter != scheduled_stacks.end() && !going_down;) {
+    RGWCompletionManager::io_completion io;
     RGWCoroutinesStack *stack = *iter;
     ++iter;
     scheduled_stacks.pop_front();
@@ -647,7 +665,6 @@ int RGWCoroutinesManager::run(list<RGWCoroutinesStack *>& stacks)
       stack->run_count = 0;
     }
 
-    RGWCompletionManager::io_completion io;
     while (completion_mgr->try_get_next(&io)) {
       handle_unblocked_stack(context_stacks, scheduled_stacks, io, &blocked_count);
     }
@@ -741,7 +758,8 @@ int RGWCoroutinesManager::run(RGWCoroutine *op)
 
 RGWAioCompletionNotifier *RGWCoroutinesManager::create_completion_notifier(RGWCoroutinesStack *stack)
 {
-  RGWAioCompletionNotifier *cn = new RGWAioCompletionNotifier(completion_mgr, get_next_io_id(), (void *)stack);
+  rgw_io_id io_id{get_next_io_id(), -1};
+  RGWAioCompletionNotifier *cn = new RGWAioCompletionNotifier(completion_mgr, io_id, (void *)stack);
   completion_mgr->register_completion_notifier(cn);
   return cn;
 }
index 25dc55660ea7da21277f1f8cc4f4aa794d342100..a7b2797264c9c0fe740bc55384b3f717c168cf07 100644 (file)
@@ -39,7 +39,7 @@ class RGWCompletionManager : public RefCountedObject {
   CephContext *cct;
 
   struct io_completion {
-    int64_t io_id;
+    rgw_io_id io_id;
     void *user_info;
   };
   list<io_completion> complete_reqs;
@@ -59,12 +59,12 @@ class RGWCompletionManager : public RefCountedObject {
 
 protected:
   void _wakeup(void *opaque);
-  void _complete(RGWAioCompletionNotifier *cn, int64_t io_id, void *user_info);
+  void _complete(RGWAioCompletionNotifier *cn, const rgw_io_id& io_id, void *user_info);
 public:
   RGWCompletionManager(CephContext *_cct);
   ~RGWCompletionManager() override;
 
-  void complete(RGWAioCompletionNotifier *cn, int64_t io_id, void *user_info);
+  void complete(RGWAioCompletionNotifier *cn, const rgw_io_id& io_id, void *user_info);
   int get_next(io_completion *io);
   bool try_get_next(io_completion *io);
 
@@ -84,13 +84,13 @@ public:
 class RGWAioCompletionNotifier : public RefCountedObject {
   librados::AioCompletion *c;
   RGWCompletionManager *completion_mgr;
-  int64_t io_id;
+  rgw_io_id io_id;
   void *user_data;
   Mutex lock;
   bool registered;
 
 public:
-  RGWAioCompletionNotifier(RGWCompletionManager *_mgr, int64_t _io_id, void *_user_data);
+  RGWAioCompletionNotifier(RGWCompletionManager *_mgr, const rgw_io_id& _io_id, void *_user_data);
   ~RGWAioCompletionNotifier() override {
     c->release();
     lock.Lock();
@@ -293,10 +293,17 @@ public:
 
   void dump(Formatter *f) const;
 
-  void init_new_io(RGWIOProvider *io_provider);
+  void init_new_io(RGWIOProvider *io_provider); /* only links the default io id */
 
-  int io_block(int ret = 0, int64_t io_id = -1);
-  void io_complete(int64_t io_id = -1);
+  int io_block(int ret = 0) {
+    return io_block(ret, -1);
+  }
+  int io_block(int ret, int64_t io_id);
+  int io_block(int ret, const rgw_io_id& io_id);
+  void io_complete() {
+    io_complete(rgw_io_id{});
+  }
+  void io_complete(const rgw_io_id& io_id);
 };
 
 ostream& operator<<(ostream& out, const RGWCoroutine& cr);
@@ -367,8 +374,8 @@ class RGWCoroutinesStack : public RefCountedObject {
   set<RGWCoroutinesStack *> blocked_by_stack;
   set<RGWCoroutinesStack *> blocking_stacks;
 
-  set<int64_t> io_finish_ids;
-  int64_t io_blocked_id{-1};
+  map<int64_t, rgw_io_id> io_finish_ids;
+  rgw_io_id io_blocked_id;
 
   bool done_flag;
   bool error_flag;
@@ -409,18 +416,18 @@ public:
   void set_io_blocked(bool flag) {
     blocked_flag = flag;
   }
-  void set_io_blocked_id(int64_t io_id) {
+  void set_io_blocked_id(const rgw_io_id& io_id) {
     io_blocked_id = io_id;
   }
   bool is_io_blocked() {
     return blocked_flag && !done_flag;
   }
-  bool can_io_unblock(int64_t io_id) {
-    return (io_blocked_id == io_id) ||
-           (io_blocked_id < 0);
+  bool can_io_unblock(const rgw_io_id& io_id) {
+    return ((io_blocked_id.id < 0) ||
+            io_blocked_id.intersects(io_id));
   }
-  bool try_io_unblock(int64_t io_id);
-  bool consume_io_finish(int64_t io_id);
+  bool try_io_unblock(const rgw_io_id& io_id);
+  bool consume_io_finish(const rgw_io_id& io_id);
   void set_interval_wait(bool flag) {
     interval_wait_flag = flag;
   }
@@ -461,7 +468,10 @@ public:
 
   int wait(const utime_t& interval);
   void wakeup();
-  void io_complete(int64_t io_id = -1);
+  void io_complete() {
+    io_complete(rgw_io_id{});
+  }
+  void io_complete(const rgw_io_id& io_id);
 
   bool collect(int *ret, RGWCoroutinesStack *skip_stack); /* returns true if needs to be called again */
 
@@ -542,6 +552,8 @@ class RGWCoroutinesManager {
 
   RWLock lock;
 
+  RGWIOIDProvider io_id_provider;
+
   void handle_unblocked_stack(set<RGWCoroutinesStack *>& context_stacks, list<RGWCoroutinesStack *>& scheduled_stacks,
                               RGWCompletionManager::io_completion& io, int *waiting_count);
 protected:
@@ -590,10 +602,14 @@ public:
   int64_t get_next_io_id();
 
   void set_sleeping(RGWCoroutine *cr, bool flag);
-  void io_complete(RGWCoroutine *cr, int64_t io_id = -1);
+  void io_complete(RGWCoroutine *cr, const rgw_io_id& io_id);
 
   virtual string get_id();
   void dump(Formatter *f) const;
+
+  RGWIOIDProvider& get_io_id_provider() {
+    return io_id_provider;
+  }
 };
 
 class RGWSimpleCoroutine : public RGWCoroutine {
index 3c0b597fb979b4a42411aa084464833324ecdae2..f489c239ad27cacf54d03f07780bef3168df7505 100644 (file)
@@ -14,12 +14,12 @@ class RGWCRHTTPGetDataCB : public RGWGetDataCB {
   Mutex lock;
   RGWCoroutinesEnv *env;
   RGWCoroutine *cr;
-  int64_t io_id;
+  rgw_io_id io_id;
   bufferlist data;
   bufferlist extra_data;
   bool got_all_extra_data{false};
 public:
-  RGWCRHTTPGetDataCB(RGWCoroutinesEnv *_env, RGWCoroutine *_cr, int64_t _io_id) : lock("RGWCRHTTPGetDataCB"), env(_env), cr(_cr), io_id(_io_id) {}
+  RGWCRHTTPGetDataCB(RGWCoroutinesEnv *_env, RGWCoroutine *_cr, const rgw_io_id& _io_id) : lock("RGWCRHTTPGetDataCB"), env(_env), cr(_cr), io_id(_io_id) {}
 
   int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override {
     {
@@ -86,7 +86,7 @@ int RGWStreamReadHTTPResourceCRF::init()
 {
   env->stack->init_new_io(req);
 
-  in_cb = new RGWCRHTTPGetDataCB(env, caller, req->get_io_id());
+  in_cb = new RGWCRHTTPGetDataCB(env, caller, req->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_READ));
 
   req->set_in_cb(in_cb);
 
@@ -134,11 +134,12 @@ int RGWStreamReadHTTPResourceCRF::decode_rest_obj(map<string, string>& headers,
 int RGWStreamReadHTTPResourceCRF::read(bufferlist *out, uint64_t max_size, bool *io_pending)
 {
     reenter(&read_state) {
+    io_read_mask = req->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_READ | RGWHTTPClient::HTTPCLIENT_IO_CONTROL);
     while (!req->is_done() ||
            in_cb->has_data()) {
       *io_pending = true;
       if (!in_cb->has_data()) {
-        yield caller->io_block(0, req->get_io_id());
+        yield caller->io_block(0, io_read_mask);
       }
       got_attrs = true;
       if (need_extra_data() && !got_extra_data) {
@@ -202,7 +203,7 @@ int RGWStreamWriteHTTPResourceCRF::drain_writes(bool *need_retry)
     yield req->finish_write();
     *need_retry = !req->is_done();
     while (!req->is_done()) {
-      yield caller->io_block(0, req->get_io_id());
+      yield caller->io_block(0, req->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_CONTROL));
       *need_retry = !req->is_done();
     }
 
index 005ed10cbac77bd2c43dfb98bb0f1d180d047d61..c8bf4763ae5e15d1c37a0e928308eb9c3cd7cbbd 100644 (file)
@@ -368,6 +368,8 @@ class RGWStreamReadHTTPResourceCRF : public RGWStreamReadResourceCRF {
   bool got_attrs{false};
   bool got_extra_data{false};
 
+  rgw_io_id io_read_mask;
+
 protected:
   rgw_rest_obj rest_obj;
 
index 4fdf635e5153dd65a8e11a19bbf92e2e45cbd25b..757c6a5c7bffb243ad6bdab6d217638e1fb89125 100644 (file)
@@ -34,7 +34,7 @@ struct rgw_http_req_data : public RefCountedObject {
   int ret{0};
   std::atomic<bool> done = { false };
   RGWHTTPClient *client{nullptr};
-  int64_t io_id{-1};
+  rgw_io_id control_io_id;
   void *user_info{nullptr};
   bool registered{false};
   RGWHTTPManager *mgr{nullptr};
@@ -268,6 +268,13 @@ void rgw_release_all_curl_handles()
   delete handles;
 }
 
+void RGWIOProvider::assign_io(RGWIOIDProvider& io_id_provider, int io_type)
+{
+  if (id == 0) {
+    id = io_id_provider.get_next();
+  }
+}
+
 /*
  * the simple set of callbacks will be called on RGWHTTPClient::process()
  */
@@ -828,7 +835,7 @@ void RGWHTTPManager::_complete_request(rgw_http_req_data *req_data)
     req_data->mgr = nullptr;
   }
   if (completion_mgr) {
-    completion_mgr->complete(NULL, req_data->io_id, req_data->user_info);
+    completion_mgr->complete(NULL, req_data->control_io_id, req_data->user_info);
   }
 
   req_data->put();
@@ -949,7 +956,7 @@ int RGWHTTPManager::add_request(RGWHTTPClient *client, bool send_data_hint)
 
   req_data->mgr = this;
   req_data->client = client;
-  req_data->io_id = client->get_io_id();
+  req_data->control_io_id = client->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_CONTROL);
   req_data->user_info = client->get_io_user_info();
 
   register_request(req_data);
index b5a8e32143be465a1da55032a470cac5af2f5eb5..ad1301531fa5ac406d9c3a886d99d5020923c5a7 100644 (file)
@@ -20,14 +20,42 @@ void rgw_http_client_cleanup();
 struct rgw_http_req_data;
 class RGWHTTPManager;
 
+class RGWIOIDProvider
+{
+  std::atomic<int64_t> max = {0};
+
+public:
+  RGWIOIDProvider() {}
+  int64_t get_next() {
+    return ++max;
+  }
+};
+
+struct rgw_io_id {
+  int64_t id{0};
+  int channels{0};
+
+  rgw_io_id() {}
+  rgw_io_id(int64_t _id, int _channels) : id(_id), channels(_channels) {}
+
+  bool intersects(const rgw_io_id& rhs) {
+    return (id == rhs.id && ((channels | rhs.channels) != 0));
+  }
+};
+
 class RGWIOProvider
 {
+  int64_t id{-1};
+
 public:
   RGWIOProvider() {}
 
-  virtual void set_io_id(int64_t _io_id) = 0;
+  void assign_io(RGWIOIDProvider& io_id_provider, int io_type = -1);
+  rgw_io_id get_io_id(int io_type) {
+    return rgw_io_id{id, io_type};
+  }
+
   virtual void set_io_user_info(void *_user_info) = 0;
-  virtual int64_t get_io_id() = 0;
   virtual void *get_io_user_info() = 0;
 };
 
@@ -41,7 +69,6 @@ class RGWHTTPClient : public RGWIOProvider
   bool has_send_len;
   long http_status;
 
-  int64_t io_id{-1};
   void *user_info{nullptr};
 
   rgw_http_req_data *req_data;
@@ -113,6 +140,10 @@ public:
   static const long HTTP_STATUS_UNAUTHORIZED = 401;
   static const long HTTP_STATUS_NOTFOUND     = 404;
 
+  static constexpr int HTTPCLIENT_IO_READ    = 0x1;
+  static constexpr int HTTPCLIENT_IO_WRITE   = 0x2;
+  static constexpr int HTTPCLIENT_IO_CONTROL = 0x4;
+
   virtual ~RGWHTTPClient();
   explicit RGWHTTPClient(CephContext *cct,
                          const string& _method,
@@ -164,18 +195,10 @@ public:
     method = _method;
   }
 
-  void set_io_id(int64_t _io_id) override {
-    io_id = _io_id;
-  }
-
   void set_io_user_info(void *_user_info) override {
     user_info = _user_info;
   }
 
-  int64_t get_io_id() override {
-    return io_id;
-  }
-
   void *get_io_user_info() override {
     return user_info;
   }
index aa91c65faeff3ddc55f230f38d503c8391068e9d..537077bb83f75d49df2dde23388b61d927b0a580 100644 (file)
@@ -217,12 +217,8 @@ public:
                      param_vec_t *extra_headers,
                      RGWHTTPManager *_mgr);
 
-  void set_io_id(int64_t _io_id) override {
-    req.set_io_id(_io_id);
-  }
-
-  int64_t get_io_id() override {
-    return req.get_io_id();
+  rgw_io_id get_io_id(int io_type) {
+    return req.get_io_id(io_type);
   }
 
   void set_io_user_info(void *user_info) override {
@@ -343,12 +339,8 @@ public:
                      param_vec_t *extra_headers,
                      RGWHTTPManager *_mgr);
 
-  void set_io_id(int64_t _io_id) override {
-    req.set_io_id(_io_id);
-  }
-
-  int64_t get_io_id() override {
-    return req.get_io_id();
+  rgw_io_id get_io_id(int io_type) {
+    return req.get_io_id(io_type);
   }
 
   void set_io_user_info(void *user_info) override {