From eac4e794717cf5161dc3a2b21c79b07e8752a5ed Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Tue, 7 Jul 2015 13:56:03 -0700 Subject: [PATCH] rgw: initial mdlog clone implementation naive handling, not paging, unbalanced. Signed-off-by: Yehuda Sadeh --- src/cls/log/cls_log.cc | 10 +++-- src/cls/log/cls_log_client.cc | 2 +- src/cls/log/cls_log_client.h | 2 +- src/cls/log/cls_log_ops.h | 11 +++-- src/rgw/rgw_admin.cc | 6 +++ src/rgw/rgw_json_enc.cc | 2 + src/rgw/rgw_metadata.cc | 16 +++++++- src/rgw/rgw_metadata.h | 3 ++ src/rgw/rgw_rados.cc | 12 +++++- src/rgw/rgw_rados.h | 3 +- src/rgw/rgw_sync.cc | 77 ++++++++++++++++++++++++++++++++++- 11 files changed, 130 insertions(+), 14 deletions(-) diff --git a/src/cls/log/cls_log.cc b/src/cls/log/cls_log.cc index 23df866ad4837..89745bb8bb73a 100644 --- a/src/cls/log/cls_log.cc +++ b/src/cls/log/cls_log.cc @@ -120,16 +120,20 @@ static int cls_log_add(cls_method_context_t hctx, bufferlist *in, bufferlist *ou string index; utime_t timestamp = entry.timestamp; - if (timestamp < header.max_time) + if (op.monotonic_inc && timestamp < header.max_time) timestamp = header.max_time; else if (timestamp > header.max_time) header.max_time = timestamp; - get_index(hctx, timestamp, index); + if (entry.id.empty()) { + get_index(hctx, timestamp, index); + entry.id = index; + } else { + index = entry.id; + } CLS_LOG(0, "storing entry at %s", index.c_str()); - entry.id = index; if (index > header.max_marker) header.max_marker = index; diff --git a/src/cls/log/cls_log_client.cc b/src/cls/log/cls_log_client.cc index 0334deb57d3aa..88e104597330d 100644 --- a/src/cls/log/cls_log_client.cc +++ b/src/cls/log/cls_log_client.cc @@ -10,7 +10,7 @@ using namespace librados; -void cls_log_add(librados::ObjectWriteOperation& op, list& entries) +void cls_log_add(librados::ObjectWriteOperation& op, list& entries, bool monotonic_inc) { bufferlist in; cls_log_add_op call; diff --git a/src/cls/log/cls_log_client.h b/src/cls/log/cls_log_client.h index 16229c992b92a..c96dbb085e01c 100644 --- a/src/cls/log/cls_log_client.h +++ b/src/cls/log/cls_log_client.h @@ -12,7 +12,7 @@ void cls_log_add_prepare_entry(cls_log_entry& entry, const utime_t& timestamp, const string& section, const string& name, bufferlist& bl); -void cls_log_add(librados::ObjectWriteOperation& op, list& entry); +void cls_log_add(librados::ObjectWriteOperation& op, list& entries, bool monotonic_inc); void cls_log_add(librados::ObjectWriteOperation& op, cls_log_entry& entry); void cls_log_add(librados::ObjectWriteOperation& op, const utime_t& timestamp, const string& section, const string& name, bufferlist& bl); diff --git a/src/cls/log/cls_log_ops.h b/src/cls/log/cls_log_ops.h index ad251fdcb8439..3c91523332b84 100644 --- a/src/cls/log/cls_log_ops.h +++ b/src/cls/log/cls_log_ops.h @@ -9,18 +9,23 @@ struct cls_log_add_op { list entries; + bool monotonic_inc; - cls_log_add_op() {} + cls_log_add_op() : monotonic_inc(true) {} void encode(bufferlist& bl) const { - ENCODE_START(1, 1, bl); + ENCODE_START(2, 1, bl); ::encode(entries, bl); + ::encode(monotonic_inc, bl); ENCODE_FINISH(bl); } void decode(bufferlist::iterator& bl) { - DECODE_START(1, bl); + DECODE_START(2, bl); ::decode(entries, bl); + if (struct_v >= 2) { + ::decode(monotonic_inc, bl); + } DECODE_FINISH(bl); } }; diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index dc7985ffd7c1a..f506d0f904f5c 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -2899,6 +2899,12 @@ next: return -ret; } + ret = sync.clone_shards(); + if (ret < 0) { + cerr << "ERROR: sync.clone_shards() returned ret=" << ret << std::endl; + return -ret; + } + } if (opt_cmd == OPT_BILOG_LIST) { diff --git a/src/rgw/rgw_json_enc.cc b/src/rgw/rgw_json_enc.cc index d16238fda5aaa..608cdb4899f57 100644 --- a/src/rgw/rgw_json_enc.cc +++ b/src/rgw/rgw_json_enc.cc @@ -15,6 +15,8 @@ #include "common/ceph_json.h" #include "common/Formatter.h" +#define dout_subsys ceph_subsys_rgw + void encode_json(const char *name, const obj_version& v, Formatter *f) { f->open_object_section(name); diff --git a/src/rgw/rgw_metadata.cc b/src/rgw/rgw_metadata.cc index 9e4116fe97690..4cf3c2d0cb573 100644 --- a/src/rgw/rgw_metadata.cc +++ b/src/rgw/rgw_metadata.cc @@ -61,7 +61,8 @@ void RGWMetadataLogData::dump(Formatter *f) const { } void decode_json_obj(RGWMDLogStatus& status, JSONObj *obj) { - string s = obj->get_data(); + string s; + JSONDecoder::decode_json("status", s, obj); if (s == "complete") { status = MDLOG_STATUS_COMPLETE; } else if (s == "write") { @@ -98,6 +99,14 @@ int RGWMetadataLog::add_entry(RGWRados *store, RGWMetadataHandler *handler, cons return store->time_log_add(oid, now, section, key, bl); } +int RGWMetadataLog::store_entries_in_shard(RGWRados *store, list& entries, int shard_id) +{ + string oid; + + store->shard_name(prefix, shard_id, oid); + return store->time_log_add(oid, entries, false); +} + void RGWMetadataLog::init_list_entries(int shard_id, utime_t& from_time, utime_t& end_time, string& marker, void **handle) { @@ -258,6 +267,11 @@ RGWMetadataManager::~RGWMetadataManager() delete md_log; } +int RGWMetadataManager::store_md_log_entries(list& entries, int shard_id) +{ + return md_log->store_entries_in_shard(store, entries, shard_id); +} + int RGWMetadataManager::register_handler(RGWMetadataHandler *handler) { string type = handler->get_type(); diff --git a/src/rgw/rgw_metadata.h b/src/rgw/rgw_metadata.h index de40bd8862037..ba4b3893271b5 100644 --- a/src/rgw/rgw_metadata.h +++ b/src/rgw/rgw_metadata.h @@ -150,6 +150,7 @@ public: RGWMetadataLog(CephContext *_cct, RGWRados *_store) : cct(_cct), store(_store), prefix(META_LOG_OBJ_PREFIX) {} int add_entry(RGWRados *store, RGWMetadataHandler *handler, const string& section, const string& key, bufferlist& bl); + int store_entries_in_shard(RGWRados *store, list& entries, int shard_id); struct LogListCtx { int cur_shard; @@ -227,6 +228,8 @@ public: RGWMetadataHandler *get_handler(const char *type); + int store_md_log_entries(list& entries, int shard_id); + int put_entry(RGWMetadataHandler *handler, const string& key, bufferlist& bl, bool exclusive, RGWObjVersionTracker *objv_tracker, time_t mtime, map *pattrs = NULL); int remove_entry(RGWMetadataHandler *handler, string& key, RGWObjVersionTracker *objv_tracker); diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 88de576b8e888..c592ab262a94c 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -2249,6 +2249,14 @@ void RGWRados::shard_name(const string& prefix, unsigned max_shards, const strin name = prefix + buf; } +void RGWRados::shard_name(const string& prefix, unsigned shard_id, string& name) +{ + char buf[16]; + snprintf(buf, sizeof(buf), "%u", shard_id); + name = prefix + buf; + +} + void RGWRados::time_log_prepare_entry(cls_log_entry& entry, const utime_t& ut, const string& section, const string& key, bufferlist& bl) { cls_log_add_prepare_entry(entry, ut, section, key, bl); @@ -2280,7 +2288,7 @@ int RGWRados::time_log_add(const string& oid, const utime_t& ut, const string& s return r; } -int RGWRados::time_log_add(const string& oid, list& entries) +int RGWRados::time_log_add(const string& oid, list& entries, bool monotonic_inc) { librados::IoCtx io_ctx; @@ -2300,7 +2308,7 @@ int RGWRados::time_log_add(const string& oid, list& entries) return r; ObjectWriteOperation op; - cls_log_add(op, entries); + cls_log_add(op, entries, monotonic_inc); r = io_ctx.operate(oid, &op); return r; diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index fa4047774b923..06af3af91e7c7 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -2165,8 +2165,9 @@ public: void shard_name(const string& prefix, unsigned max_shards, const string& key, string& name); void shard_name(const string& prefix, unsigned max_shards, const string& section, const string& key, string& name); + void shard_name(const string& prefix, unsigned shard_id, string& name); void time_log_prepare_entry(cls_log_entry& entry, const utime_t& ut, const string& section, const string& key, bufferlist& bl); - int time_log_add(const string& oid, list& entries); + int time_log_add(const string& oid, list& entries, bool monotonic_inc = true); int time_log_add(const string& oid, const utime_t& ut, const string& section, const string& key, bufferlist& bl); int time_log_list(const string& oid, utime_t& start_time, utime_t& end_time, int max_entries, list& entries, diff --git a/src/rgw/rgw_sync.cc b/src/rgw/rgw_sync.cc index 6c5594dc299e2..7bca46a234e47 100644 --- a/src/rgw/rgw_sync.cc +++ b/src/rgw/rgw_sync.cc @@ -36,7 +36,7 @@ void rgw_mdlog_entry::decode_json(JSONObj *obj) { JSONDecoder::decode_json("section", section, obj); JSONDecoder::decode_json("name", name, obj); JSONDecoder::decode_json("timestamp", timestamp, obj); - JSONDecoder::decode_json("log_data", log_data, obj); + JSONDecoder::decode_json("data", log_data, obj); } void rgw_mdlog_shard_data::decode_json(JSONObj *obj) { @@ -60,7 +60,12 @@ int RGWRemoteMetaLog::init() ldout(store->ctx(), 20) << "remote mdlog, num_shards=" << log_info.num_shards << dendl; - for (int i = 0; i < log_info.num_shards; i++) { + return 0; +} + +int RGWRemoteMetaLog::list_shards() +{ + for (int i = 0; i < (int)log_info.num_shards; i++) { int ret = list_shard(i); if (ret < 0) { ldout(store->ctx(), 10) << "failed to list shard: ret=" << ret << dendl; @@ -90,6 +95,74 @@ int RGWRemoteMetaLog::list_shard(int shard_id) ldout(store->ctx(), 20) << "remote mdlog, shard_id=" << shard_id << " num of shard entries: " << data.entries.size() << dendl; + vector::iterator iter; + for (iter = data.entries.begin(); iter != data.entries.end(); ++iter) { + rgw_mdlog_entry& entry = *iter; + ldout(store->ctx(), 20) << "entry: name=" << entry.name << dendl; + } + + return 0; +} + +int RGWRemoteMetaLog::clone_shards() +{ + for (int i = 0; i < (int)log_info.num_shards; i++) { + int ret = clone_shard(i); + if (ret < 0) { + ldout(store->ctx(), 10) << "failed to clone shard: ret=" << ret << dendl; + } + } + + return 0; +} + +int RGWRemoteMetaLog::clone_shard(int shard_id) +{ + conn = store->rest_master_conn; + + char buf[32]; + snprintf(buf, sizeof(buf), "%d", shard_id); + + rgw_http_param_pair pairs[] = { { "type", "metadata" }, + { "id", buf }, + { NULL, NULL } }; + + rgw_mdlog_shard_data data; + int ret = conn->get_json_resource("/admin/log", pairs, data); + if (ret < 0) { + ldout(store->ctx(), 0) << "ERROR: failed to fetch mdlog data" << dendl; + return ret; + } + + ldout(store->ctx(), 20) << "remote mdlog, shard_id=" << shard_id << " num of shard entries: " << data.entries.size() << dendl; + + if (data.entries.empty()) { + return 0; + } + + list dest_entries; + + vector::iterator iter; + for (iter = data.entries.begin(); iter != data.entries.end(); ++iter) { + rgw_mdlog_entry& entry = *iter; + ldout(store->ctx(), 20) << "entry: name=" << entry.name << dendl; + + cls_log_entry dest_entry; + dest_entry.id = entry.id; + dest_entry.section = entry.section; + dest_entry.name = entry.name; + dest_entry.timestamp = entry.timestamp; + + ::encode(entry.log_data, dest_entry.data); + + dest_entries.push_back(dest_entry); + } + + ret = store->meta_mgr->store_md_log_entries(dest_entries, shard_id); + if (ret < 0) { + ldout(store->ctx(), 10) << "failed to store md log entries shard_id=" << shard_id << " ret=" << ret << dendl; + } + return 0; } -- 2.39.5