From c3206b9b1ccb8259ba676ec9d1c2b058ccf655ad Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Thu, 16 Jul 2015 16:44:39 -0700 Subject: [PATCH] rgw: rework metadata log sync to use async capabilities We now have a window of operations. The next step will be to asynchronously write to the local mdlog, so that we don't just hold the work on a synchronous request. Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_sync.cc | 156 +++++++++++++++++++++++++++++++++++--------- src/rgw/rgw_sync.h | 81 +++++++++++++++++++++++ 2 files changed, 205 insertions(+), 32 deletions(-) create mode 100644 src/rgw/rgw_sync.h diff --git a/src/rgw/rgw_sync.cc b/src/rgw/rgw_sync.cc index 4da713b7ca301..6bf6742959bda 100644 --- a/src/rgw/rgw_sync.cc +++ b/src/rgw/rgw_sync.cc @@ -144,51 +144,122 @@ int RGWRemoteMetaLog::get_shard_info(int shard_id) return 0; } +#define CLONE_MAX_ENTRIES 100 +#define CLONE_OPS_WINDOW 16 + +class RGWCloneMetaLogOp { + RGWRados *store; + RGWHTTPManager *http_manager; + + int shard_id; + string marker; + + int max_entries; + + RGWRESTReadResource *http_op; + + bool finished; + + enum State { + Init = 0, + SentRESTRequest = 1, + ReceivedRESTResponse = 2, + Done = 3, + } state; +public: + RGWCloneMetaLogOp(RGWRados *_store, RGWHTTPManager *_mgr, int _id, const string& _marker) : store(_store), + http_manager(_mgr), shard_id(_id), + marker(_marker), max_entries(CLONE_MAX_ENTRIES), + http_op(NULL), finished(false), + state(RGWCloneMetaLogOp::Init) {} + + int operate(bool *need_wait); + + int send_clone_shard(); + int finish_clone_shard(bool *need_wait); + + bool is_done() { return (state == Done); } +}; + int RGWRemoteMetaLog::clone_shards() { - bool truncated; - - list<int> active_shards; + list<RGWCloneMetaLogOp *> ops; for (int i = 0; i < (int)log_info.num_shards; i++) { - active_shards.push_back(i); + RGWCloneMetaLogOp *op = new RGWCloneMetaLogOp(store, &http_manager, i, clone_markers[i]); + ops.push_back(op); } - do { - truncated = false; - list<int> next_active_shards; + int waiting_count = 0; + 
for (list<RGWCloneMetaLogOp *>::iterator iter = ops.begin(); iter != ops.end(); ++iter) { + bool need_wait; + RGWCloneMetaLogOp *op = *iter; + int ret = op->operate(&need_wait); + if (ret < 0) { + ldout(store->ctx(), 0) << "ERROR: op->operate() returned ret=" << ret << dendl; + } - for (list<int>::iterator iter = active_shards.begin(); iter != active_shards.end(); ++iter) { - int shard_id = *iter; - bool shard_truncated; - string& shard_marker = clone_markers[shard_id]; - int ret = clone_shard(shard_id, shard_marker, &shard_marker, &shard_truncated); + if (need_wait) { + waiting_count++; + } + + if (waiting_count >= ops_window) { + RGWCloneMetaLogOp *op; + int ret = completion_mgr.get_next((void **)&op); if (ret < 0) { - ldout(store->ctx(), 0) << "ERROR: failed to clone shard: ret=" << ret << dendl; + ldout(store->ctx(), 0) << "ERROR: failed to clone shard, completion_mgr.get_next() returned ret=" << ret << dendl; + } else { + waiting_count--; } - truncated |= shard_truncated; - - if (shard_truncated) { - next_active_shards.push_back(shard_id); + if (!op->is_done()) { + ops.push_back(op); + } else { + delete op; } } - active_shards.swap(next_active_shards); - } while (truncated); + } + + while (waiting_count > 0) { + RGWCloneMetaLogOp *op; + int ret = completion_mgr.get_next((void **)&op); + if (ret < 0) { + ldout(store->ctx(), 0) << "ERROR: failed to clone shard, completion_mgr.get_next() returned ret=" << ret << dendl; + return ret; + } else { + waiting_count--; + } + } return 0; } -int RGWRemoteMetaLog::clone_shard(int shard_id, const string& marker, string *new_marker, bool *truncated) +int RGWCloneMetaLogOp::operate(bool *need_wait) { - *truncated = false; + switch (state) { + case Init: + ldout(store->ctx(), 20) << __func__ << ": shard_id=" << shard_id << ": sending request" << dendl; + *need_wait = true; + return send_clone_shard(); + case SentRESTRequest: + ldout(store->ctx(), 20) << __func__ << ": shard_id=" << shard_id << ": handling response" << dendl; + return 
finish_clone_shard(need_wait); + case ReceivedRESTResponse: + assert(0); + break; /* unreachable */ + case Done: + ldout(store->ctx(), 20) << __func__ << ": shard_id=" << shard_id << ": done" << dendl; + break; + } - conn = store->rest_master_conn; + return 0; +} + +int RGWCloneMetaLogOp::send_clone_shard() +{ + RGWRESTConn *conn = store->rest_master_conn; char buf[32]; snprintf(buf, sizeof(buf), "%d", shard_id); -#define CLONE_MAX_ENTRIES 100 - int max_entries = CLONE_MAX_ENTRIES; - char max_entries_buf[32]; snprintf(max_entries_buf, sizeof(max_entries_buf), "%d", max_entries); @@ -200,27 +271,41 @@ int RGWRemoteMetaLog::clone_shard(int shard_id, const string& marker, string *ne { marker_key, marker.c_str() }, { NULL, NULL } }; - RGWRESTReadResource http_op(conn, "/admin/log", pairs, NULL, &http_manager); + http_op = new RGWRESTReadResource(conn, "/admin/log", pairs, NULL, http_manager); - rgw_mdlog_shard_data data; + http_op->set_user_info((void *)this); + + state = SentRESTRequest; - int ret = http_op.aio_read(); + int ret = http_op->aio_read(); if (ret < 0) { ldout(store->ctx(), 0) << "ERROR: failed to fetch mdlog data" << dendl; return ret; } - ret = http_op.wait(&data); + return 0; +} + +int RGWCloneMetaLogOp::finish_clone_shard(bool *need_wait) +{ + rgw_mdlog_shard_data data; + + int ret = http_op->wait(&data); + delete http_op; if (ret < 0) { ldout(store->ctx(), 0) << "ERROR: failed to wait for op, ret=" << ret << dendl; return ret; } + state = ReceivedRESTResponse; + ldout(store->ctx(), 20) << "remote mdlog, shard_id=" << shard_id << " num of shard entries: " << data.entries.size() << dendl; - *truncated = ((int)data.entries.size() == max_entries); + bool truncated = ((int)data.entries.size() == max_entries); + *need_wait = false; if (data.entries.empty()) { + state = Done; return 0; } @@ -241,19 +326,26 @@ int RGWRemoteMetaLog::clone_shard(int shard_id, const string& marker, string *ne dest_entries.push_back(dest_entry); - *new_marker = entry.id; + marker 
= entry.id; } ret = store->meta_mgr->store_md_log_entries(dest_entries, shard_id); if (ret < 0) { ldout(store->ctx(), 10) << "failed to store md log entries shard_id=" << shard_id << " ret=" << ret << dendl; + return ret; + } + + if (truncated) { + *need_wait = true; + return send_clone_shard(); + } else { + state = Done; } return 0; } - int RGWMetadataSync::init() { if (store->is_meta_master()) { diff --git a/src/rgw/rgw_sync.h b/src/rgw/rgw_sync.h new file mode 100644 index 0000000000000..d04d2286517c8 --- /dev/null +++ b/src/rgw/rgw_sync.h @@ -0,0 +1,81 @@ +#ifndef CEPH_RGW_SYNC_H +#define CEPH_RGW_SYNC_H + +#include "rgw_common.h" +#include "rgw_rados.h" +#include "rgw_metadata.h" +#include "rgw_http_client.h" + +#include "common/RWLock.h" + + +#define dout_subsys ceph_subsys_rgw + + +struct rgw_mdlog_info { + uint32_t num_shards; + + rgw_mdlog_info() : num_shards(0) {} + + void decode_json(JSONObj *obj); +}; + +#define RGW_META_LOG_OPS_WINDOW 16 + +class RGWRemoteMetaLog { + RGWRados *store; + RGWRESTConn *conn; + + rgw_mdlog_info log_info; + + struct utime_shard { + utime_t ts; + int shard_id; + + utime_shard() : shard_id(-1) {} + + bool operator<(const utime_shard& rhs) const { + if (ts == rhs.ts) { + return shard_id < rhs.shard_id; + } + return ts < rhs.ts; + } + }; + + RWLock ts_to_shard_lock; + map<utime_shard, int> ts_to_shard; + vector<string> clone_markers; + + RGWCompletionManager completion_mgr; + RGWHTTPManager http_manager; + + int ops_window; + +public: + RGWRemoteMetaLog(RGWRados *_store) : store(_store), conn(NULL), ts_to_shard_lock("ts_to_shard_lock"), + http_manager(store->ctx(), &completion_mgr), + ops_window(RGW_META_LOG_OPS_WINDOW) {} + + int init(); + + int list_shard(int shard_id); + int list_shards(); + int get_shard_info(int shard_id); + int clone_shard(int shard_id, const string& marker, string *new_marker, bool *truncated); + int clone_shards(); +}; + +class RGWMetadataSync { + RGWRados *store; + + RGWRemoteMetaLog master_log; +public: + 
RGWMetadataSync(RGWRados *_store) : store(_store), master_log(store) {} + + int init(); + + int clone_shards() { return master_log.clone_shards(); } + +}; + +#endif -- 2.39.5