git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: md log cloning completely async
author: Yehuda Sadeh <yehuda@redhat.com>
Fri, 17 Jul 2015 23:29:19 +0000 (16:29 -0700)
committer: Yehuda Sadeh <yehuda@redhat.com>
Tue, 9 Feb 2016 20:58:59 +0000 (12:58 -0800)
Hook librados AIO completions into the rgw completion manager through a
notifier, and split the md log write into two states: issuing the async
write, and resuming once its completion has been signalled.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_bucket.cc
src/rgw/rgw_metadata.cc
src/rgw/rgw_metadata.h
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h
src/rgw/rgw_sync.cc

index 5ac344804f358ac5b583f56dd880ebc0f09946c7..c448e65adf6ab71ff49d76368a6cb8d5c86e5be4 100644 (file)
@@ -1263,7 +1263,7 @@ int RGWDataChangesLog::renew_entries()
 
     utime_t now = ceph_clock_now(cct);
 
-    int ret = store->time_log_add(oids[miter->first], entries);
+    int ret = store->time_log_add(oids[miter->first], entries, NULL);
     if (ret < 0) {
       /* we don't really need to have a special handling for failed cases here,
        * as this is just an optimization. */
index 4cf3c2d0cb5733b84e509422a6597f47945e2c71..b4cbba750d7ad18a095d6e05d0d79ed84973fae3 100644 (file)
@@ -99,12 +99,12 @@ int RGWMetadataLog::add_entry(RGWRados *store, RGWMetadataHandler *handler, cons
   return store->time_log_add(oid, now, section, key, bl);
 }
 
-int RGWMetadataLog::store_entries_in_shard(RGWRados *store, list<cls_log_entry>& entries, int shard_id)
+int RGWMetadataLog::store_entries_in_shard(RGWRados *store, list<cls_log_entry>& entries, int shard_id, librados::AioCompletion *completion)
 {
   string oid;
 
   store->shard_name(prefix, shard_id, oid);
-  return store->time_log_add(oid, entries, false);
+  return store->time_log_add(oid, entries, completion, false);
 }
 
 void RGWMetadataLog::init_list_entries(int shard_id, utime_t& from_time, utime_t& end_time, 
@@ -267,9 +267,9 @@ RGWMetadataManager::~RGWMetadataManager()
   delete md_log;
 }
 
-int RGWMetadataManager::store_md_log_entries(list<cls_log_entry>& entries, int shard_id)
+int RGWMetadataManager::store_md_log_entries(list<cls_log_entry>& entries, int shard_id, librados::AioCompletion *completion)
 {
-  return md_log->store_entries_in_shard(store, entries, shard_id);
+  return md_log->store_entries_in_shard(store, entries, shard_id, completion);
 }
 
 int RGWMetadataManager::register_handler(RGWMetadataHandler *handler)
index ba4b3893271b51dd6ea93f9673a94001d222206d..981f43782c52aae948eea7c03fe5241a28eb143c 100644 (file)
@@ -150,7 +150,7 @@ public:
   RGWMetadataLog(CephContext *_cct, RGWRados *_store) : cct(_cct), store(_store), prefix(META_LOG_OBJ_PREFIX) {}
 
   int add_entry(RGWRados *store, RGWMetadataHandler *handler, const string& section, const string& key, bufferlist& bl);
-  int store_entries_in_shard(RGWRados *store, list<cls_log_entry>& entries, int shard_id);
+  int store_entries_in_shard(RGWRados *store, list<cls_log_entry>& entries, int shard_id, librados::AioCompletion *completion);
 
   struct LogListCtx {
     int cur_shard;
@@ -228,7 +228,7 @@ public:
 
   RGWMetadataHandler *get_handler(const char *type);
 
-  int store_md_log_entries(list<cls_log_entry>& entries, int shard_id);
+  int store_md_log_entries(list<cls_log_entry>& entries, int shard_id, librados::AioCompletion *completion);
 
   int put_entry(RGWMetadataHandler *handler, const string& key, bufferlist& bl, bool exclusive,
                 RGWObjVersionTracker *objv_tracker, time_t mtime, map<string, bufferlist> *pattrs = NULL);
index b36d4cd1d7b95b664d00fe56188e46f7fb7d243b..2c598d0ef7c6c73ed33b8818fde3f0e68f3d81c4 100644 (file)
@@ -2263,10 +2263,8 @@ void RGWRados::time_log_prepare_entry(cls_log_entry& entry, const utime_t& ut, c
   cls_log_add_prepare_entry(entry, ut, section, key, bl);
 }
 
-int RGWRados::time_log_add(const string& oid, const utime_t& ut, const string& section, const string& key, bufferlist& bl)
+int RGWRados::time_log_add_init(librados::IoCtx& io_ctx)
 {
-  librados::IoCtx io_ctx;
-
   const char *log_pool = zone.log_pool.name.c_str();
   librados::Rados *rad = get_rados_handle();
   int r = rad->ioctx_create(log_pool, io_ctx);
@@ -2282,6 +2280,19 @@ int RGWRados::time_log_add(const string& oid, const utime_t& ut, const string& s
   if (r < 0)
     return r;
 
+  return 0;
+
+}
+
+int RGWRados::time_log_add(const string& oid, const utime_t& ut, const string& section, const string& key, bufferlist& bl)
+{
+  librados::IoCtx io_ctx;
+
+  int r = time_log_add_init(io_ctx);
+  if (r < 0) {
+    return r;
+  }
+
   ObjectWriteOperation op;
   cls_log_add(op, ut, section, key, bl);
 
@@ -2289,29 +2300,24 @@ int RGWRados::time_log_add(const string& oid, const utime_t& ut, const string& s
   return r;
 }
 
-int RGWRados::time_log_add(const string& oid, list<cls_log_entry>& entries, bool monotonic_inc)
+int RGWRados::time_log_add(const string& oid, list<cls_log_entry>& entries,
+                          librados::AioCompletion *completion, bool monotonic_inc)
 {
   librados::IoCtx io_ctx;
 
-  const char *log_pool = zone.log_pool.name.c_str();
-  librados::Rados *rad = get_rados_handle();
-  int r = rad->ioctx_create(log_pool, io_ctx);
-  if (r == -ENOENT) {
-    rgw_bucket pool(log_pool);
-    r = create_pool(pool);
-    if (r < 0)
-      return r;
-    // retry
-    r = rad->ioctx_create(log_pool, io_ctx);
-  }
-  if (r < 0)
+  int r = time_log_add_init(io_ctx);
+  if (r < 0) {
     return r;
+  }
 
   ObjectWriteOperation op;
   cls_log_add(op, entries, monotonic_inc);
 
-  r = io_ctx.operate(oid, &op);
+  if (!completion) {
+    r = io_ctx.operate(oid, &op);
+  } else {
+    r = io_ctx.aio_operate(oid, completion, &op);
+  }
   return r;
 }
 
index 1aaa54ba4030f7efa96c4150502fdd09e63693c8..56cfef29f4c9e50573f8982b026f694ae91a4336 100644 (file)
@@ -2167,7 +2167,9 @@ public:
   void shard_name(const string& prefix, unsigned max_shards, const string& section, const string& key, string& name);
   void shard_name(const string& prefix, unsigned shard_id, string& name);
   void time_log_prepare_entry(cls_log_entry& entry, const utime_t& ut, const string& section, const string& key, bufferlist& bl);
-  int time_log_add(const string& oid, list<cls_log_entry>& entries, bool monotonic_inc = true);
+  int time_log_add_init(librados::IoCtx& io_ctx);
+  int time_log_add(const string& oid, list<cls_log_entry>& entries,
+                  librados::AioCompletion *completion, bool monotonic_inc = true);
   int time_log_add(const string& oid, const utime_t& ut, const string& section, const string& key, bufferlist& bl);
   int time_log_list(const string& oid, utime_t& start_time, utime_t& end_time,
                     int max_entries, list<cls_log_entry>& entries,
index 6bf6742959bda8f82b013393d61d71914cb5905e..88bc87a61f466282d72aaed69809edfcbb6d8d3f 100644 (file)
@@ -1,5 +1,6 @@
 #include "common/ceph_json.h"
 #include "common/RWLock.h"
+#include "common/RefCountedObj.h"
 
 #include "rgw_common.h"
 #include "rgw_rados.h"
@@ -144,39 +145,80 @@ int RGWRemoteMetaLog::get_shard_info(int shard_id)
   return 0;
 }
 
+static void _aio_completion_notifier_cb(librados::completion_t cb, void *arg);
+
+/* a single use librados aio completion notifier that hooks into the RGWCompletionManager */
+class AioCompletionNotifier : public RefCountedObject {
+  librados::AioCompletion *c;
+  RGWCompletionManager *completion_mgr;
+  void *user_data;
+
+public:
+  AioCompletionNotifier(RGWCompletionManager *_mgr, void *_user_data) : completion_mgr(_mgr), user_data(_user_data) {
+    c = librados::Rados::aio_create_completion((void *)this, _aio_completion_notifier_cb, NULL);
+  }
+
+  ~AioCompletionNotifier() {
+    c->release();
+  }
+
+  librados::AioCompletion *completion() {
+    return c;
+  }
+
+  void cb() {
+    completion_mgr->complete(user_data);
+    put();
+  }
+};
+
+static void _aio_completion_notifier_cb(librados::completion_t cb, void *arg)
+{
+  ((AioCompletionNotifier *)arg)->cb();
+}
+
 #define CLONE_MAX_ENTRIES 100
 #define CLONE_OPS_WINDOW 16
 
 class RGWCloneMetaLogOp {
   RGWRados *store;
   RGWHTTPManager *http_manager;
+  RGWCompletionManager *completion_mgr;
 
   int shard_id;
   string marker;
+  bool truncated;
 
   int max_entries;
 
   RGWRESTReadResource *http_op;
 
+  AioCompletionNotifier *md_op_notifier;
+
   bool finished;
 
   enum State {
     Init = 0,
     SentRESTRequest = 1,
     ReceivedRESTResponse = 2,
-    Done = 3,
+    StoringMDLogEntries = 3,
+    Done = 4,
   } state;
+#warning need an error state
 public:
-  RGWCloneMetaLogOp(RGWRados *_store, RGWHTTPManager *_mgr, int _id, const string& _marker) : store(_store),
-                                                            http_manager(_mgr), shard_id(_id),
-                                                            marker(_marker), max_entries(CLONE_MAX_ENTRIES),
-                                                           http_op(NULL), finished(false),
-                                                            state(RGWCloneMetaLogOp::Init) {}
+  RGWCloneMetaLogOp(RGWRados *_store, RGWHTTPManager *_mgr, RGWCompletionManager *_completion_mgr,
+                   int _id, const string& _marker) : store(_store),
+                                                      http_manager(_mgr), completion_mgr(_completion_mgr), shard_id(_id),
+                                                      marker(_marker), truncated(false), max_entries(CLONE_MAX_ENTRIES),
+                                                     http_op(NULL), md_op_notifier(NULL),
+                                                     finished(false),
+                                                      state(RGWCloneMetaLogOp::Init) {}
 
   int operate(bool *need_wait);
 
-  int send_clone_shard();
-  int finish_clone_shard(bool *need_wait);
+  int state_init(bool *need_wait);
+  int state_sent_rest_request(bool *need_wait);
+  int state_storing_mdlog_entries(bool *need_wait);
 
   bool is_done() { return (state == Done); }
 };
@@ -185,7 +227,7 @@ int RGWRemoteMetaLog::clone_shards()
 {
   list<RGWCloneMetaLogOp *> ops;
   for (int i = 0; i < (int)log_info.num_shards; i++) {
-    RGWCloneMetaLogOp *op = new RGWCloneMetaLogOp(store, &http_manager, i, clone_markers[i]);
+    RGWCloneMetaLogOp *op = new RGWCloneMetaLogOp(store, &http_manager, &completion_mgr, i, clone_markers[i]);
     ops.push_back(op);
   }
 
@@ -237,14 +279,15 @@ int RGWCloneMetaLogOp::operate(bool *need_wait)
   switch (state) {
     case Init:
       ldout(store->ctx(), 20) << __func__ << ": shard_id=" << shard_id << ": sending request" << dendl;
-      *need_wait = true;
-      return send_clone_shard();
+      return state_init(need_wait);
     case SentRESTRequest:
       ldout(store->ctx(), 20) << __func__ << ": shard_id=" << shard_id << ": handling response" << dendl;
-      return finish_clone_shard(need_wait);
+      return state_sent_rest_request(need_wait);
     case ReceivedRESTResponse:
       assert(0);
       break; /* unreachable */
+    case StoringMDLogEntries:
+      return state_storing_mdlog_entries(need_wait);
     case Done:
       ldout(store->ctx(), 20) << __func__ << ": shard_id=" << shard_id << ": done" << dendl;
       break;
@@ -253,7 +296,7 @@ int RGWCloneMetaLogOp::operate(bool *need_wait)
   return 0;
 }
 
-int RGWCloneMetaLogOp::send_clone_shard()
+int RGWCloneMetaLogOp::state_init(bool *need_wait)
 {
   RGWRESTConn *conn = store->rest_master_conn;
 
@@ -275,18 +318,19 @@ int RGWCloneMetaLogOp::send_clone_shard()
 
   http_op->set_user_info((void *)this);
 
-  state = SentRESTRequest;
-
   int ret = http_op->aio_read();
   if (ret < 0) {
     ldout(store->ctx(), 0) << "ERROR: failed to fetch mdlog data" << dendl;
     return ret;
   }
 
+  *need_wait = true;
+  state = SentRESTRequest;
+
   return 0;
 }
 
-int RGWCloneMetaLogOp::finish_clone_shard(bool *need_wait)
+int RGWCloneMetaLogOp::state_sent_rest_request(bool *need_wait)
 {
   rgw_mdlog_shard_data data;
 
@@ -301,7 +345,7 @@ int RGWCloneMetaLogOp::finish_clone_shard(bool *need_wait)
 
   ldout(store->ctx(), 20) << "remote mdlog, shard_id=" << shard_id << " num of shard entries: " << data.entries.size() << dendl;
 
-  bool truncated = ((int)data.entries.size() == max_entries);
+  truncated = ((int)data.entries.size() == max_entries);
 
   *need_wait = false;
   if (data.entries.empty()) {
@@ -329,16 +373,25 @@ int RGWCloneMetaLogOp::finish_clone_shard(bool *need_wait)
     marker = entry.id;
   }
 
-  ret = store->meta_mgr->store_md_log_entries(dest_entries, shard_id);
+  state = StoringMDLogEntries;
+
+  md_op_notifier = new AioCompletionNotifier(completion_mgr, (void *)this);
+
+  ret = store->meta_mgr->store_md_log_entries(dest_entries, shard_id, md_op_notifier->completion());
   if (ret < 0) {
     ldout(store->ctx(), 10) << "failed to store md log entries shard_id=" << shard_id << " ret=" << ret << dendl;
     return ret;
   }
+  *need_wait = true;
+  return 0;
+}
 
+int RGWCloneMetaLogOp::state_storing_mdlog_entries(bool *need_wait)
+{
   if (truncated) {
-    *need_wait = true;
-    return send_clone_shard();
+    return state_init(need_wait);
   } else {
+    *need_wait = false;
     state = Done;
   }