From 9171db739ce396e51fee01d68ae05d26d4c8a0a8 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Thu, 13 Aug 2015 22:59:51 -0700 Subject: [PATCH] rgw: build index of all master zone meta keys still wip, but getting there Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_coroutine.cc | 6 +- src/rgw/rgw_coroutine.h | 2 +- src/rgw/rgw_sync.cc | 166 +++++++++++++++++++++++++++++++++++++-- src/rgw/rgw_sync.h | 4 +- 4 files changed, 166 insertions(+), 12 deletions(-) diff --git a/src/rgw/rgw_coroutine.cc b/src/rgw/rgw_coroutine.cc index 133897ae67155..22c66cf8e3f8b 100644 --- a/src/rgw/rgw_coroutine.cc +++ b/src/rgw/rgw_coroutine.cc @@ -236,7 +236,7 @@ void RGWCoroutine::call(RGWCoroutine *op) assert(r == 0); } -void RGWCoroutine::spawn(RGWCoroutine *op) +void RGWCoroutine::spawn(RGWCoroutine *op, bool wait) { op->get(); spawned_ops.push_back(op); @@ -248,7 +248,9 @@ void RGWCoroutine::spawn(RGWCoroutine *op) env->stacks->push_back(stack); - env->stack->set_blocked_by(stack); + if (wait) { + env->stack->set_blocked_by(stack); + } } int RGWCoroutine::complete_spawned() diff --git a/src/rgw/rgw_coroutine.h b/src/rgw/rgw_coroutine.h index 99e7a39af3012..a91fae9f203e4 100644 --- a/src/rgw/rgw_coroutine.h +++ b/src/rgw/rgw_coroutine.h @@ -91,7 +91,7 @@ protected: } void call(RGWCoroutine *op); - void spawn(RGWCoroutine *op); + void spawn(RGWCoroutine *op, bool wait); int complete_spawned(); diff --git a/src/rgw/rgw_sync.cc b/src/rgw/rgw_sync.cc index 3249feb63cab1..a082cc3e55389 100644 --- a/src/rgw/rgw_sync.cc +++ b/src/rgw/rgw_sync.cc @@ -123,6 +123,29 @@ public: bl(_bl), mtime(_mtime) {} }; +class RGWAsyncWriteOmapKeys : public RGWAsyncRadosRequest { + RGWRados *store; + rgw_obj obj; + map entries; + +protected: + int _send_request() { + librados::IoCtx ioctx; + librados::Rados *rados = store->get_rados_handle(); + int r = rados->ioctx_create(obj.bucket.name.c_str(), ioctx); /* system object only! */ + if (r < 0) { + lderr(store->ctx()) << "ERROR: failed to open pool (" << obj.bucket.name << ") ret=" << r << dendl; + return r; + } + + return ioctx.omap_set(obj.get_object(), entries); + } +public: + RGWAsyncWriteOmapKeys(RGWAioCompletionNotifier *cn, RGWRados *_store, rgw_obj& _obj, + map& _entries) : RGWAsyncRadosRequest(cn), store(_store), + obj(_obj), entries(_entries) {} +}; + class RGWAsyncLockSystemObj : public RGWAsyncRadosRequest { RGWRados *store; rgw_obj obj; @@ -546,6 +569,43 @@ public: } }; +class RGWRadosSetOmapKeysCR : public RGWSimpleCoroutine { + RGWAsyncRadosProcessor *async_rados; + RGWRados *store; + map entries; + + rgw_bucket pool; + string oid; + + RGWAsyncWriteOmapKeys *req; + +public: + RGWRadosSetOmapKeysCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, + rgw_bucket& _pool, const string& _oid, + map& _entries) : RGWSimpleCoroutine(_store->ctx()), + async_rados(_async_rados), + store(_store), + entries(_entries), + pool(_pool), oid(_oid), + req(NULL) { + } + + ~RGWRadosSetOmapKeysCR() { + delete req; + } + + int send_request() { + rgw_obj obj = rgw_obj(pool, oid); + req = new RGWAsyncWriteOmapKeys(env->stack->create_completion_notifier(), store, obj, entries); + async_rados->queue(req); + return 0; + } + + int request_complete() { + return req->get_ret_status(); + } +}; + class RGWSimpleRadosLockCR : public RGWSimpleCoroutine { RGWAsyncRadosProcessor *async_rados; RGWRados *store; @@ -715,7 +775,7 @@ public: for (int i = 0; i < (int)status.num_shards; i++) { rgw_meta_sync_marker marker; spawn(new RGWSimpleRadosWriteCR(async_rados, store, store->get_zone_params().log_pool, - RGWMetaSyncStatusManager::shard_obj_name(i), marker)); + RGWMetaSyncStatusManager::shard_obj_name(i), marker), true); } } yield { /* unlock */ @@ -765,7 +825,7 @@ int RGWReadSyncStatusCoroutine::handle_data(rgw_meta_sync_info& data) map& markers = sync_status->sync_markers; for (int i = 0; i < (int)data.num_shards; i++) { spawn(new RGWSimpleRadosReadCR(async_rados, store, obj_ctx, store->get_zone_params().log_pool, - RGWMetaSyncStatusManager::shard_obj_name(i), &markers[i])); + RGWMetaSyncStatusManager::shard_obj_name(i), &markers[i]), true); } return 0; } @@ -775,11 +835,89 @@ int RGWReadSyncStatusCoroutine::finish() return complete_spawned(); } +class RGWOmapAppend : public RGWCoroutine { + RGWAsyncRadosProcessor *async_rados; + RGWRados *store; + RGWCoroutinesEnv *env; + + rgw_bucket pool; + string oid; + + map entries; +public: + + RGWOmapAppend(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, RGWCoroutinesEnv *_env, rgw_bucket& _pool, const string& _oid) + : RGWCoroutine(_store->ctx()), async_rados(_async_rados), + store(_store), env(_env), pool(_pool), oid(_oid) {} + int append(const string& entry) { + entries[entry] = bufferlist(); +#define OMAP_APPEND_MAX_ENTRIES 10 +ldout(store->ctx(), 0) << __FILE__ << ":" << __LINE__ << dendl; + if (entries.size() >= OMAP_APPEND_MAX_ENTRIES) { +ldout(store->ctx(), 0) << __FILE__ << ":" << __LINE__ << dendl; + int r = flush(); + if (r < 0) { + ldout(cct, 0) << "ERROR: " << __func__ << "(): failed to flush entries" << dendl; + return r; + } + entries.clear(); + } + return 0; + } + + int flush() { +ldout(store->ctx(), 0) << __FILE__ << ":" << __LINE__ << dendl; + return env->stack->call(this); + } + + int operate() { +ldout(store->ctx(), 0) << __FILE__ << ":" << __LINE__ << dendl; + reenter(this) { + yield { +ldout(store->ctx(), 0) << __FILE__ << ":" << __LINE__ << dendl; + call(new RGWRadosSetOmapKeysCR(async_rados, store, pool, oid, entries)); + } + if (get_ret_status() < 0) { + return set_state(RGWCoroutine_Error); + } + return set_state(RGWCoroutine_Done); + } + return 0; + } +}; + +class RGWShardedOmapCRManager { + RGWAsyncRadosProcessor *async_rados; + RGWRados *store; + RGWCoroutinesEnv *env; + + int num_shards; + + vector shards; +public: + RGWShardedOmapCRManager(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, RGWCoroutinesEnv *_env, int _num_shards, rgw_bucket& pool, const string& oid_prefix) + : async_rados(_async_rados), + store(_store), env(_env), num_shards(_num_shards) { + shards.reserve(num_shards); + for (int i = 0; i < num_shards; ++i) { + char buf[oid_prefix.size() + 16]; + snprintf(buf, sizeof(buf), "%s.%d", oid_prefix.c_str(), i); + RGWOmapAppend *shard = new RGWOmapAppend(async_rados, store, env, pool, buf); + shards.push_back(shard); + } + } + int append(const string& entry) { + static int counter = 0; + return (shards[++counter % shards.size()]->append(entry)); + } +}; + class RGWFetchAllMetaCR : public RGWCoroutine { RGWRados *store; RGWHTTPManager *http_manager; + RGWAsyncRadosProcessor *async_rados; - int max_entries; + int num_shards; int req_ret; @@ -788,15 +926,23 @@ class RGWFetchAllMetaCR : public RGWCoroutine { list::iterator sections_iter; list result; + RGWShardedOmapCRManager *entries_index; + public: - RGWFetchAllMetaCR(RGWRados *_store, RGWHTTPManager *_mgr) : RGWCoroutine(_store->ctx()), store(_store), + RGWFetchAllMetaCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados, int _num_shards) : RGWCoroutine(_store->ctx()), store(_store), http_manager(_mgr), - req_ret(0) {} + async_rados(_async_rados), + num_shards(_num_shards), + req_ret(0), entries_index(NULL) { + } int operate() { RGWRESTConn *conn = store->rest_master_conn; reenter(this) { +ldout(store->ctx(), 0) << __FILE__ << ":" << __LINE__ << dendl; + entries_index = new RGWShardedOmapCRManager(async_rados, store, env, num_shards, + store->get_zone_params().log_pool, "meta.full-sync.index"); yield { call(new RGWReadRESTResourceCR >(store->ctx(), conn, http_manager, "/admin/metadata", NULL, §ions)); @@ -820,6 +966,12 @@ public: } for (list::iterator iter = result.begin(); iter != result.end(); ++iter) { ldout(store->ctx(), 20) << "list metadata: section=" << *sections_iter << " key=" << *iter << dendl; + string s = *sections_iter + ":" + *iter; + int r = entries_index->append(s); + if (r < 0) { + ldout(store->ctx(), 0) << "ERROR: " << __func__ << "(): failed to append entry into index" << dendl; + return set_state(RGWCoroutine_Error); + } } } } @@ -914,10 +1066,10 @@ int RGWRemoteMetaLog::init_sync_status(int num_shards) return run(new RGWInitSyncStatusCoroutine(async_rados, store, obj_ctx, num_shards)); } -int RGWRemoteMetaLog::run_sync(rgw_meta_sync_status& sync_status) +int RGWRemoteMetaLog::run_sync(int num_shards, rgw_meta_sync_status& sync_status) { RGWObjectCtx obj_ctx(store, NULL); - return run(new RGWFetchAllMetaCR(store, &http_manager)); + return run(new RGWFetchAllMetaCR(store, &http_manager, async_rados, num_shards)); } int RGWCloneMetaLogCoroutine::operate() diff --git a/src/rgw/rgw_sync.h b/src/rgw/rgw_sync.h index 6b5c2cd1eccff..1de14ef6e90ef 100644 --- a/src/rgw/rgw_sync.h +++ b/src/rgw/rgw_sync.h @@ -144,7 +144,7 @@ public: int fetch(int num_shards, vector& clone_markers); int read_sync_status(rgw_meta_sync_status *sync_status); int init_sync_status(int num_shards); - int run_sync(rgw_meta_sync_status& sync_status); + int run_sync(int num_shards, rgw_meta_sync_status& sync_status); }; class RGWMetaSyncStatusManager { @@ -195,7 +195,7 @@ public: int fetch() { return master_log.fetch(num_shards, clone_markers); } int clone_shards() { return master_log.clone_shards(num_shards, clone_markers); } - int run() { return master_log.run_sync(sync_status); } + int run() { return master_log.run_sync(num_shards, sync_status); } }; #endif -- 2.39.5