From 9e233880b5bfe7aa66b6b24743e56e60e13dbe70 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Fri, 17 Jul 2015 16:29:19 -0700 Subject: [PATCH] rgw: md log cloning completely async hook librados completions into the rgw completion notifier, split the md log write into two different states. Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_bucket.cc | 2 +- src/rgw/rgw_metadata.cc | 8 ++-- src/rgw/rgw_metadata.h | 4 +- src/rgw/rgw_rados.cc | 42 +++++++++++-------- src/rgw/rgw_rados.h | 4 +- src/rgw/rgw_sync.cc | 93 ++++++++++++++++++++++++++++++++--------- 6 files changed, 107 insertions(+), 46 deletions(-) diff --git a/src/rgw/rgw_bucket.cc b/src/rgw/rgw_bucket.cc index 5ac344804f358..c448e65adf6ab 100644 --- a/src/rgw/rgw_bucket.cc +++ b/src/rgw/rgw_bucket.cc @@ -1263,7 +1263,7 @@ int RGWDataChangesLog::renew_entries() utime_t now = ceph_clock_now(cct); - int ret = store->time_log_add(oids[miter->first], entries); + int ret = store->time_log_add(oids[miter->first], entries, NULL); if (ret < 0) { /* we don't really need to have a special handling for failed cases here, * as this is just an optimization. */ diff --git a/src/rgw/rgw_metadata.cc b/src/rgw/rgw_metadata.cc index 4cf3c2d0cb573..b4cbba750d7ad 100644 --- a/src/rgw/rgw_metadata.cc +++ b/src/rgw/rgw_metadata.cc @@ -99,12 +99,12 @@ int RGWMetadataLog::add_entry(RGWRados *store, RGWMetadataHandler *handler, cons return store->time_log_add(oid, now, section, key, bl); } -int RGWMetadataLog::store_entries_in_shard(RGWRados *store, list& entries, int shard_id) +int RGWMetadataLog::store_entries_in_shard(RGWRados *store, list& entries, int shard_id, librados::AioCompletion *completion) { string oid; store->shard_name(prefix, shard_id, oid); - return store->time_log_add(oid, entries, false); + return store->time_log_add(oid, entries, completion, false); } void RGWMetadataLog::init_list_entries(int shard_id, utime_t& from_time, utime_t& end_time, @@ -267,9 +267,9 @@ RGWMetadataManager::~RGWMetadataManager() delete md_log; } -int RGWMetadataManager::store_md_log_entries(list& entries, int shard_id) +int RGWMetadataManager::store_md_log_entries(list& entries, int shard_id, librados::AioCompletion *completion) { - return md_log->store_entries_in_shard(store, entries, shard_id); + return md_log->store_entries_in_shard(store, entries, shard_id, completion); } int RGWMetadataManager::register_handler(RGWMetadataHandler *handler) diff --git a/src/rgw/rgw_metadata.h b/src/rgw/rgw_metadata.h index ba4b3893271b5..981f43782c52a 100644 --- a/src/rgw/rgw_metadata.h +++ b/src/rgw/rgw_metadata.h @@ -150,7 +150,7 @@ public: RGWMetadataLog(CephContext *_cct, RGWRados *_store) : cct(_cct), store(_store), prefix(META_LOG_OBJ_PREFIX) {} int add_entry(RGWRados *store, RGWMetadataHandler *handler, const string& section, const string& key, bufferlist& bl); - int store_entries_in_shard(RGWRados *store, list& entries, int shard_id); + int store_entries_in_shard(RGWRados *store, list& entries, int shard_id, librados::AioCompletion *completion); struct LogListCtx { int cur_shard; @@ -228,7 +228,7 @@ public: RGWMetadataHandler *get_handler(const char *type); - int store_md_log_entries(list& entries, int shard_id); + int store_md_log_entries(list& entries, int shard_id, librados::AioCompletion *completion); int put_entry(RGWMetadataHandler *handler, const string& key, bufferlist& bl, bool exclusive, RGWObjVersionTracker *objv_tracker, time_t mtime, map *pattrs = NULL); diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index b36d4cd1d7b95..2c598d0ef7c6c 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -2263,10 +2263,8 @@ void RGWRados::time_log_prepare_entry(cls_log_entry& entry, const utime_t& ut, c cls_log_add_prepare_entry(entry, ut, section, key, bl); } -int RGWRados::time_log_add(const string& oid, const utime_t& ut, const string& section, const string& key, bufferlist& bl) +int RGWRados::time_log_add_init(librados::IoCtx& io_ctx) { - librados::IoCtx io_ctx; - const char *log_pool = zone.log_pool.name.c_str(); librados::Rados *rad = get_rados_handle(); int r = rad->ioctx_create(log_pool, io_ctx); @@ -2282,6 +2280,19 @@ int RGWRados::time_log_add(const string& oid, const utime_t& ut, const string& s if (r < 0) return r; + return 0; + +} + +int RGWRados::time_log_add(const string& oid, const utime_t& ut, const string& section, const string& key, bufferlist& bl) +{ + librados::IoCtx io_ctx; + + int r = time_log_add_init(io_ctx); + if (r < 0) { + return r; + } + ObjectWriteOperation op; cls_log_add(op, ut, section, key, bl); @@ -2289,29 +2300,24 @@ int RGWRados::time_log_add(const string& oid, const utime_t& ut, const string& s return r; } -int RGWRados::time_log_add(const string& oid, list& entries, bool monotonic_inc) +int RGWRados::time_log_add(const string& oid, list& entries, + librados::AioCompletion *completion, bool monotonic_inc) { librados::IoCtx io_ctx; - const char *log_pool = zone.log_pool.name.c_str(); - librados::Rados *rad = get_rados_handle(); - int r = rad->ioctx_create(log_pool, io_ctx); - if (r == -ENOENT) { - rgw_bucket pool(log_pool); - r = create_pool(pool); - if (r < 0) - return r; - - // retry - r = rad->ioctx_create(log_pool, io_ctx); - } - if (r < 0) + int r = time_log_add_init(io_ctx); + if (r < 0) { return r; + } ObjectWriteOperation op; cls_log_add(op, entries, monotonic_inc); - r = io_ctx.operate(oid, &op); + if (!completion) { + r = io_ctx.operate(oid, &op); + } else { + r = io_ctx.aio_operate(oid, completion, &op); + } return r; } diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index 1aaa54ba4030f..56cfef29f4c9e 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -2167,7 +2167,9 @@ public: void shard_name(const string& prefix, unsigned max_shards, const string& section, const string& key, string& name); void shard_name(const string& prefix, unsigned shard_id, string& name); void time_log_prepare_entry(cls_log_entry& entry, const utime_t& ut, const string& section, const string& key, bufferlist& bl); - int time_log_add(const string& oid, list& entries, bool monotonic_inc = true); + int time_log_add_init(librados::IoCtx& io_ctx); + int time_log_add(const string& oid, list& entries, + librados::AioCompletion *completion, bool monotonic_inc = true); int time_log_add(const string& oid, const utime_t& ut, const string& section, const string& key, bufferlist& bl); int time_log_list(const string& oid, utime_t& start_time, utime_t& end_time, int max_entries, list& entries, diff --git a/src/rgw/rgw_sync.cc b/src/rgw/rgw_sync.cc index 6bf6742959bda..88bc87a61f466 100644 --- a/src/rgw/rgw_sync.cc +++ b/src/rgw/rgw_sync.cc @@ -1,5 +1,6 @@ #include "common/ceph_json.h" #include "common/RWLock.h" +#include "common/RefCountedObj.h" #include "rgw_common.h" #include "rgw_rados.h" @@ -144,39 +145,80 @@ int RGWRemoteMetaLog::get_shard_info(int shard_id) return 0; } +static void _aio_completion_notifier_cb(librados::completion_t cb, void *arg); + +/* a single use librados aio completion notifier that hooks into the RGWCompletionManager */ +class AioCompletionNotifier : public RefCountedObject { + librados::AioCompletion *c; + RGWCompletionManager *completion_mgr; + void *user_data; + +public: + AioCompletionNotifier(RGWCompletionManager *_mgr, void *_user_data) : completion_mgr(_mgr), user_data(_user_data) { + c = librados::Rados::aio_create_completion((void *)this, _aio_completion_notifier_cb, NULL); + } + + ~AioCompletionNotifier() { + c->release(); + } + + librados::AioCompletion *completion() { + return c; + } + + void cb() { + completion_mgr->complete(user_data); + put(); + } +}; + +static void _aio_completion_notifier_cb(librados::completion_t cb, void *arg) +{ + ((AioCompletionNotifier *)arg)->cb(); +} + #define CLONE_MAX_ENTRIES 100 #define CLONE_OPS_WINDOW 16 class RGWCloneMetaLogOp { RGWRados *store; RGWHTTPManager *http_manager; + RGWCompletionManager *completion_mgr; int shard_id; string marker; + bool truncated; int max_entries; RGWRESTReadResource *http_op; + AioCompletionNotifier *md_op_notifier; + bool finished; enum State { Init = 0, SentRESTRequest = 1, ReceivedRESTResponse = 2, - Done = 3, + StoringMDLogEntries = 3, + Done = 4, } state; +#warning need an error state public: - RGWCloneMetaLogOp(RGWRados *_store, RGWHTTPManager *_mgr, int _id, const string& _marker) : store(_store), - http_manager(_mgr), shard_id(_id), - marker(_marker), max_entries(CLONE_MAX_ENTRIES), - http_op(NULL), finished(false), - state(RGWCloneMetaLogOp::Init) {} + RGWCloneMetaLogOp(RGWRados *_store, RGWHTTPManager *_mgr, RGWCompletionManager *_completion_mgr, + int _id, const string& _marker) : store(_store), + http_manager(_mgr), completion_mgr(_completion_mgr), shard_id(_id), + marker(_marker), truncated(false), max_entries(CLONE_MAX_ENTRIES), + http_op(NULL), md_op_notifier(NULL), + finished(false), + state(RGWCloneMetaLogOp::Init) {} int operate(bool *need_wait); - int send_clone_shard(); - int finish_clone_shard(bool *need_wait); + int state_init(bool *need_wait); + int state_sent_rest_request(bool *need_wait); + int state_storing_mdlog_entries(bool *need_wait); bool is_done() { return (state == Done); } }; @@ -185,7 +227,7 @@ int RGWRemoteMetaLog::clone_shards() { list ops; for (int i = 0; i < (int)log_info.num_shards; i++) { - RGWCloneMetaLogOp *op = new RGWCloneMetaLogOp(store, &http_manager, i, clone_markers[i]); + RGWCloneMetaLogOp *op = new RGWCloneMetaLogOp(store, &http_manager, &completion_mgr, i, clone_markers[i]); ops.push_back(op); } @@ -237,14 +279,15 @@ int RGWCloneMetaLogOp::operate(bool *need_wait) switch (state) { case Init: ldout(store->ctx(), 20) << __func__ << ": shard_id=" << shard_id << ": sending request" << dendl; - *need_wait = true; - return send_clone_shard(); + return state_init(need_wait); case SentRESTRequest: ldout(store->ctx(), 20) << __func__ << ": shard_id=" << shard_id << ": handling response" << dendl; - return finish_clone_shard(need_wait); + return state_sent_rest_request(need_wait); case ReceivedRESTResponse: assert(0); break; /* unreachable */ + case StoringMDLogEntries: + return state_storing_mdlog_entries(need_wait); case Done: ldout(store->ctx(), 20) << __func__ << ": shard_id=" << shard_id << ": done" << dendl; break; @@ -253,7 +296,7 @@ int RGWCloneMetaLogOp::operate(bool *need_wait) return 0; } -int RGWCloneMetaLogOp::send_clone_shard() +int RGWCloneMetaLogOp::state_init(bool *need_wait) { RGWRESTConn *conn = store->rest_master_conn; @@ -275,18 +318,19 @@ int RGWCloneMetaLogOp::send_clone_shard() http_op->set_user_info((void *)this); - state = SentRESTRequest; - int ret = http_op->aio_read(); if (ret < 0) { ldout(store->ctx(), 0) << "ERROR: failed to fetch mdlog data" << dendl; return ret; } + *need_wait = true; + state = SentRESTRequest; + return 0; } -int RGWCloneMetaLogOp::finish_clone_shard(bool *need_wait) +int RGWCloneMetaLogOp::state_sent_rest_request(bool *need_wait) { rgw_mdlog_shard_data data; @@ -301,7 +345,7 @@ int RGWCloneMetaLogOp::finish_clone_shard(bool *need_wait) ldout(store->ctx(), 20) << "remote mdlog, shard_id=" << shard_id << " num of shard entries: " << data.entries.size() << dendl; - bool truncated = ((int)data.entries.size() == max_entries); + truncated = ((int)data.entries.size() == max_entries); *need_wait = false; if (data.entries.empty()) { @@ -329,16 +373,25 @@ int RGWCloneMetaLogOp::finish_clone_shard(bool *need_wait) marker = entry.id; } - ret = store->meta_mgr->store_md_log_entries(dest_entries, shard_id); + state = StoringMDLogEntries; + + md_op_notifier = new AioCompletionNotifier(completion_mgr, (void *)this); + + ret = store->meta_mgr->store_md_log_entries(dest_entries, shard_id, md_op_notifier->completion()); if (ret < 0) { ldout(store->ctx(), 10) << "failed to store md log entries shard_id=" << shard_id << " ret=" << ret << dendl; return ret; } + *need_wait = true; + return 0; +} +int RGWCloneMetaLogOp::state_storing_mdlog_entries(bool *need_wait) +{ if (truncated) { - *need_wait = true; - return send_clone_shard(); + return state_init(need_wait); } else { + *need_wait = false; state = Done; } -- 2.39.5