]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: build index of all master zone meta keys
authorYehuda Sadeh <yehuda@redhat.com>
Fri, 14 Aug 2015 05:59:51 +0000 (22:59 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 9 Feb 2016 21:40:54 +0000 (13:40 -0800)
still wip, but getting there

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_coroutine.cc
src/rgw/rgw_coroutine.h
src/rgw/rgw_sync.cc
src/rgw/rgw_sync.h

index 133897ae671550f20a23e71110bd2d37e94e3f55..22c66cf8e3f8b3292f0c57bca6ee53be8475ca24 100644 (file)
@@ -236,7 +236,7 @@ void RGWCoroutine::call(RGWCoroutine *op)
   assert(r == 0);
 }
 
-void RGWCoroutine::spawn(RGWCoroutine *op)
+void RGWCoroutine::spawn(RGWCoroutine *op, bool wait)
 {
   op->get();
   spawned_ops.push_back(op);
@@ -248,7 +248,9 @@ void RGWCoroutine::spawn(RGWCoroutine *op)
 
   env->stacks->push_back(stack);
 
-  env->stack->set_blocked_by(stack);
+  if (wait) {
+    env->stack->set_blocked_by(stack);
+  }
 }
 
 int RGWCoroutine::complete_spawned()
index 99e7a39af3012065faddddda9ab475eedb79e77a..a91fae9f203e4e31573c0e7d9b6ec55e50064995 100644 (file)
@@ -91,7 +91,7 @@ protected:
   }
 
   void call(RGWCoroutine *op);
-  void spawn(RGWCoroutine *op);
+  void spawn(RGWCoroutine *op, bool wait);
 
   int complete_spawned();
 
index 3249feb63cab13952dbc7d725ee3d7fc39b031b2..a082cc3e553893612dec190a513c8c36c696ae6e 100644 (file)
@@ -123,6 +123,29 @@ public:
                                                                   bl(_bl), mtime(_mtime) {}
 };
 
+class RGWAsyncWriteOmapKeys : public RGWAsyncRadosRequest {
+  RGWRados *store;
+  rgw_obj obj;
+  map<string, bufferlist> entries;
+
+protected:
+  int _send_request() {
+    librados::IoCtx ioctx;
+    librados::Rados *rados = store->get_rados_handle();
+    int r = rados->ioctx_create(obj.bucket.name.c_str(), ioctx); /* system object only! */
+    if (r < 0) {
+      lderr(store->ctx()) << "ERROR: failed to open pool (" << obj.bucket.name << ") ret=" << r << dendl;
+      return r;
+    }
+
+    return ioctx.omap_set(obj.get_object(), entries);
+  }
+public:
+  RGWAsyncWriteOmapKeys(RGWAioCompletionNotifier *cn, RGWRados *_store, rgw_obj& _obj,
+                    map<string, bufferlist>& _entries) : RGWAsyncRadosRequest(cn), store(_store),
+                                                                obj(_obj), entries(_entries) {}
+};
+
 class RGWAsyncLockSystemObj : public RGWAsyncRadosRequest {
   RGWRados *store;
   rgw_obj obj;
@@ -546,6 +569,43 @@ public:
   }
 };
 
+class RGWRadosSetOmapKeysCR : public RGWSimpleCoroutine {
+  RGWAsyncRadosProcessor *async_rados;
+  RGWRados *store;
+  map<string, bufferlist> entries;
+
+  rgw_bucket pool;
+  string oid;
+
+  RGWAsyncWriteOmapKeys *req;
+
+public:
+  RGWRadosSetOmapKeysCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
+                     rgw_bucket& _pool, const string& _oid,
+                     map<string, bufferlist>& _entries) : RGWSimpleCoroutine(_store->ctx()),
+                                                async_rados(_async_rados),
+                                               store(_store),
+                                               entries(_entries),
+                                               pool(_pool), oid(_oid),
+                                                req(NULL) {
+  }
+
+  ~RGWRadosSetOmapKeysCR() {
+    delete req;
+  }
+
+  int send_request() {
+    rgw_obj obj = rgw_obj(pool, oid);
+    req = new RGWAsyncWriteOmapKeys(env->stack->create_completion_notifier(), store, obj, entries);
+    async_rados->queue(req);
+    return 0;
+  }
+
+  int request_complete() {
+    return req->get_ret_status();
+  }
+};
+
 class RGWSimpleRadosLockCR : public RGWSimpleCoroutine {
   RGWAsyncRadosProcessor *async_rados;
   RGWRados *store;
@@ -715,7 +775,7 @@ public:
         for (int i = 0; i < (int)status.num_shards; i++) {
          rgw_meta_sync_marker marker;
           spawn(new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(async_rados, store, store->get_zone_params().log_pool,
-                                                         RGWMetaSyncStatusManager::shard_obj_name(i), marker));
+                                                         RGWMetaSyncStatusManager::shard_obj_name(i), marker), true);
         }
       }
       yield { /* unlock */
@@ -765,7 +825,7 @@ int RGWReadSyncStatusCoroutine::handle_data(rgw_meta_sync_info& data)
   map<uint32_t, rgw_meta_sync_marker>& markers = sync_status->sync_markers;
   for (int i = 0; i < (int)data.num_shards; i++) {
     spawn(new RGWSimpleRadosReadCR<rgw_meta_sync_marker>(async_rados, store, obj_ctx, store->get_zone_params().log_pool,
-                                                   RGWMetaSyncStatusManager::shard_obj_name(i), &markers[i]));
+                                                   RGWMetaSyncStatusManager::shard_obj_name(i), &markers[i]), true);
   }
   return 0;
 }
@@ -775,11 +835,89 @@ int RGWReadSyncStatusCoroutine::finish()
   return complete_spawned();
 }
 
+class RGWOmapAppend : public RGWCoroutine {
+  RGWAsyncRadosProcessor *async_rados;
+  RGWRados *store;
+  RGWCoroutinesEnv *env;
+
+  rgw_bucket pool;
+  string oid;
+
+  map<string, bufferlist> entries;
+public:
+
+  RGWOmapAppend(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, RGWCoroutinesEnv *_env, rgw_bucket& _pool, const string& _oid)
+                      : RGWCoroutine(_store->ctx()), async_rados(_async_rados),
+                       store(_store), env(_env), pool(_pool), oid(_oid) {}
+  int append(const string& entry) {
+    entries[entry] = bufferlist();
+#define OMAP_APPEND_MAX_ENTRIES 10
+ldout(store->ctx(), 0) << __FILE__ << ":" << __LINE__ << dendl;
+    if (entries.size() >= OMAP_APPEND_MAX_ENTRIES) {
+ldout(store->ctx(), 0) << __FILE__ << ":" << __LINE__ << dendl;
+      int r = flush();
+      if (r < 0) {
+       ldout(cct, 0) << "ERROR: " << __func__ << "(): failed to flush entries" << dendl;
+       return r;
+      }
+      entries.clear();
+    }
+    return 0;
+  }
+
+  int flush() {
+ldout(store->ctx(), 0) << __FILE__ << ":" << __LINE__ << dendl;
+    return env->stack->call(this);
+  }
+
+  int operate() {
+ldout(store->ctx(), 0) << __FILE__ << ":" << __LINE__ << dendl;
+    reenter(this) {
+      yield {
+ldout(store->ctx(), 0) << __FILE__ << ":" << __LINE__ << dendl;
+       call(new RGWRadosSetOmapKeysCR(async_rados, store, pool, oid, entries));
+      }
+      if (get_ret_status() < 0) {
+       return set_state(RGWCoroutine_Error);
+      }
+      return set_state(RGWCoroutine_Done);
+    }
+    return 0;
+  }
+};
+
+class RGWShardedOmapCRManager {
+  RGWAsyncRadosProcessor *async_rados;
+  RGWRados *store;
+  RGWCoroutinesEnv *env;
+
+  int num_shards;
+
+  vector<RGWOmapAppend *> shards;
+public:
+  RGWShardedOmapCRManager(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, RGWCoroutinesEnv *_env, int _num_shards, rgw_bucket& pool, const string& oid_prefix)
+                      : async_rados(_async_rados),
+                       store(_store), env(_env), num_shards(_num_shards) {
+    shards.reserve(num_shards);
+    for (int i = 0; i < num_shards; ++i) {
+      char buf[oid_prefix.size() + 16];
+      snprintf(buf, sizeof(buf), "%s.%d", oid_prefix.c_str(), i);
+      RGWOmapAppend *shard = new RGWOmapAppend(async_rados, store, env, pool, buf);
+      shards.push_back(shard);
+    }
+  }
+  int append(const string& entry) {
+    static int counter = 0;
+    return (shards[++counter % shards.size()]->append(entry));
+  }
+};
+
 class RGWFetchAllMetaCR : public RGWCoroutine {
   RGWRados *store;
   RGWHTTPManager *http_manager;
+  RGWAsyncRadosProcessor *async_rados;
 
-  int max_entries;
+  int num_shards;
 
 
   int req_ret;
@@ -788,15 +926,23 @@ class RGWFetchAllMetaCR : public RGWCoroutine {
   list<string>::iterator sections_iter;
   list<string> result;
 
+  RGWShardedOmapCRManager *entries_index;
+
 public:
-  RGWFetchAllMetaCR(RGWRados *_store, RGWHTTPManager *_mgr) : RGWCoroutine(_store->ctx()), store(_store),
+  RGWFetchAllMetaCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados, int _num_shards) : RGWCoroutine(_store->ctx()), store(_store),
                                                       http_manager(_mgr),
-                                                     req_ret(0) {}
+                                                     async_rados(_async_rados),
+                                                     num_shards(_num_shards),
+                                                     req_ret(0), entries_index(NULL) {
+  }
 
   int operate() {
     RGWRESTConn *conn = store->rest_master_conn;
 
     reenter(this) {
+ldout(store->ctx(), 0) << __FILE__ << ":" << __LINE__ << dendl;
+      entries_index = new RGWShardedOmapCRManager(async_rados, store, env, num_shards,
+                                                 store->get_zone_params().log_pool, "meta.full-sync.index");
       yield {
        call(new RGWReadRESTResourceCR<list<string> >(store->ctx(), conn, http_manager,
                                       "/admin/metadata", NULL, &sections));
@@ -820,6 +966,12 @@ public:
          }
          for (list<string>::iterator iter = result.begin(); iter != result.end(); ++iter) {
            ldout(store->ctx(), 20) << "list metadata: section=" << *sections_iter << " key=" << *iter << dendl;
+           string s = *sections_iter + ":" + *iter;
+           int r = entries_index->append(s);
+           if (r < 0) {
+             ldout(store->ctx(), 0) << "ERROR: " << __func__ << "(): failed to append entry into index" << dendl;
+             return set_state(RGWCoroutine_Error);
+           }
          }
        }
       }
@@ -914,10 +1066,10 @@ int RGWRemoteMetaLog::init_sync_status(int num_shards)
   return run(new RGWInitSyncStatusCoroutine(async_rados, store, obj_ctx, num_shards));
 }
 
-int RGWRemoteMetaLog::run_sync(rgw_meta_sync_status& sync_status)
+int RGWRemoteMetaLog::run_sync(int num_shards, rgw_meta_sync_status& sync_status)
 {
   RGWObjectCtx obj_ctx(store, NULL);
-  return run(new RGWFetchAllMetaCR(store, &http_manager));
+  return run(new RGWFetchAllMetaCR(store, &http_manager, async_rados, num_shards));
 }
 
 int RGWCloneMetaLogCoroutine::operate()
index 6b5c2cd1eccfffb0dd91a10974bfab8608b990ef..1de14ef6e90effd6aed90a195ac4e5c2d6b2b40a 100644 (file)
@@ -144,7 +144,7 @@ public:
   int fetch(int num_shards, vector<string>& clone_markers);
   int read_sync_status(rgw_meta_sync_status *sync_status);
   int init_sync_status(int num_shards);
-  int run_sync(rgw_meta_sync_status& sync_status);
+  int run_sync(int num_shards, rgw_meta_sync_status& sync_status);
 };
 
 class RGWMetaSyncStatusManager {
@@ -195,7 +195,7 @@ public:
   int fetch() { return master_log.fetch(num_shards, clone_markers); }
   int clone_shards() { return master_log.clone_shards(num_shards, clone_markers); }
 
-  int run() { return master_log.run_sync(sync_status); }
+  int run() { return master_log.run_sync(num_shards, sync_status); }
 };
 
 #endif