From 8ff397986671fb423c2fb86de482f0be1e005c89 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Tue, 6 Oct 2015 13:52:11 -0700 Subject: [PATCH] rgw: incremental data sync still needs some more work, but basic structure is there. Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_admin.cc | 17 +++- src/rgw/rgw_bucket.cc | 36 ++++++-- src/rgw/rgw_bucket.h | 32 ++++++- src/rgw/rgw_data_sync.cc | 179 +++++++++++++++++++++++++++++---------- src/rgw/rgw_rest_log.cc | 12 ++- src/rgw/rgw_rest_log.h | 5 +- 6 files changed, 221 insertions(+), 60 deletions(-) diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index 8774e94779bfc..9214b05e52900 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -1413,6 +1413,8 @@ int main(int argc, char **argv) int sync_stats = false; int reset_regions = false; + int extra_info = false; + uint64_t min_rewrite_size = 4 * 1024 * 1024; uint64_t max_rewrite_size = ULLONG_MAX; uint64_t min_rewrite_stripe_size = 0; @@ -1614,6 +1616,9 @@ int main(int argc, char **argv) // do nothing } else if (ceph_argparse_binary_flag(args, i, &reset_regions, NULL, "--reset-regions", (char*)NULL)) { // do nothing + } else if (ceph_argparse_binary_flag(args, i, &extra_info, NULL, "--extra-info", (char*)NULL)) { + // do nothing + } else if (ceph_argparse_binary_flag(args, i, &reset_regions, NULL, "--reset-regions", (char*)NULL)) { } else if (ceph_argparse_witharg(args, i, &val, "--caps", (char*)NULL)) { caps = val; } else if (ceph_argparse_witharg(args, i, &val, "-i", "--infile", (char*)NULL)) { @@ -4093,7 +4098,7 @@ next: RGWDataChangesLog::LogMarker marker; do { - list entries; + list entries; ret = log->list_entries(start_time, end_time, max_entries - count, entries, marker, &truncated); if (ret < 0) { cerr << "ERROR: list_bi_log_entries(): " << cpp_strerror(-ret) << std::endl; @@ -4102,9 +4107,13 @@ next: count += entries.size(); - for (list::iterator iter = entries.begin(); iter != entries.end(); ++iter) { - rgw_data_change& entry = *iter; - encode_json("entry", entry, formatter); + for (list::iterator iter = entries.begin(); iter != entries.end(); ++iter) { + rgw_data_change_log_entry& entry = *iter; + if (!extra_info) { + encode_json("entry", entry.entry, formatter); + } else { + encode_json("entry", entry, formatter); + } } formatter->flush(cout); } while (truncated && count < max_entries); diff --git a/src/rgw/rgw_bucket.cc b/src/rgw/rgw_bucket.cc index 5cae3851d0401..d5a016f6fb28e 100644 --- a/src/rgw/rgw_bucket.cc +++ b/src/rgw/rgw_bucket.cc @@ -1204,6 +1204,30 @@ void rgw_data_change::dump(Formatter *f) const encode_json("timestamp", timestamp, f); } +void rgw_data_change::decode_json(JSONObj *obj) { + string s; + JSONDecoder::decode_json("entity_type", s, obj); + if (s == "bucket") { + entity_type = ENTITY_TYPE_BUCKET; + } else { + entity_type = ENTITY_TYPE_UNKNOWN; + } + JSONDecoder::decode_json("key", key, obj); + JSONDecoder::decode_json("timestamp", timestamp, obj); +} + +void rgw_data_change_log_entry::dump(Formatter *f) const +{ + encode_json("log_id", log_id, f); + encode_json("log_timestamp", log_timestamp, f); + encode_json("entry", entry, f); +} + +void rgw_data_change_log_entry::decode_json(JSONObj *obj) { + JSONDecoder::decode_json("log_id", log_id, obj); + JSONDecoder::decode_json("log_timestamp", log_timestamp, obj); + JSONDecoder::decode_json("entry", entry, obj); +} int RGWDataChangesLog::choose_oid(const rgw_bucket_shard& bs) { const string& name = bs.bucket.name; @@ -1408,7 +1432,7 @@ int RGWDataChangesLog::add_entry(rgw_bucket& bucket, int shard_id) { } int RGWDataChangesLog::list_entries(int shard, utime_t& start_time, utime_t& end_time, int max_entries, - list& entries, + list& entries, const string& marker, string *out_marker, bool *truncated) { @@ -1423,22 +1447,24 @@ int RGWDataChangesLog::list_entries(int shard, utime_t& start_time, utime_t& end list::iterator iter; for (iter = log_entries.begin(); iter != log_entries.end(); ++iter) { - rgw_data_change entry; + rgw_data_change_log_entry log_entry; + log_entry.log_id = iter->id; + log_entry.log_timestamp = iter->timestamp; bufferlist::iterator liter = iter->data.begin(); try { - ::decode(entry, liter); + ::decode(log_entry.entry, liter); } catch (buffer::error& err) { lderr(cct) << "ERROR: failed to decode data changes log entry" << dendl; return -EIO; } - entries.push_back(entry); + entries.push_back(log_entry); } return 0; } int RGWDataChangesLog::list_entries(utime_t& start_time, utime_t& end_time, int max_entries, - list& entries, LogMarker& marker, bool *ptruncated) { + list& entries, LogMarker& marker, bool *ptruncated) { bool truncated; entries.clear(); diff --git a/src/rgw/rgw_bucket.h b/src/rgw/rgw_bucket.h index 65e40641e361d..a9be9b94a2ba7 100644 --- a/src/rgw/rgw_bucket.h +++ b/src/rgw/rgw_bucket.h @@ -306,6 +306,7 @@ public: enum DataLogEntityType { + ENTITY_TYPE_UNKNOWN = 0, ENTITY_TYPE_BUCKET = 1, }; @@ -334,9 +335,36 @@ struct rgw_data_change { } void dump(Formatter *f) const; + void decode_json(JSONObj *obj); }; WRITE_CLASS_ENCODER(rgw_data_change) +struct rgw_data_change_log_entry { + string log_id; + utime_t log_timestamp; + rgw_data_change entry; + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + ::encode(log_id, bl); + ::encode(log_timestamp, bl); + ::encode(entry, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::iterator& bl) { + DECODE_START(1, bl); + ::decode(log_id, bl); + ::decode(log_timestamp, bl); + ::decode(entry, bl); + DECODE_FINISH(bl); + } + + void dump(Formatter *f) const; + void decode_json(JSONObj *obj); +}; +WRITE_CLASS_ENCODER(rgw_data_change_log_entry) + struct RGWDataChangesLogInfo { string marker; utime_t last_update; @@ -426,7 +454,7 @@ public: int add_entry(rgw_bucket& bucket, int shard_id); int renew_entries(); int list_entries(int shard, utime_t& start_time, utime_t& end_time, int max_entries, - list& entries, + list& entries, const string& marker, string *out_marker, bool *truncated); @@ -448,7 +476,7 @@ public: LogMarker() : shard(0) {} }; int list_entries(utime_t& start_time, utime_t& end_time, int max_entries, - list& entries, LogMarker& marker, bool *ptruncated); + list& entries, LogMarker& marker, bool *ptruncated); bool going_down(); }; diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index 209a119444d67..7df764e457442 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -155,6 +155,89 @@ public: } }; +struct read_remote_data_log_response { + string marker; + bool truncated; + list entries; + + read_remote_data_log_response() : truncated(false) {} + + void decode_json(JSONObj *obj) { + JSONDecoder::decode_json("marker", marker, obj); + JSONDecoder::decode_json("truncated", truncated, obj); + JSONDecoder::decode_json("entries", entries, obj); + }; +}; + +class RGWReadRemoteDataLogShardCR : public RGWCoroutine { + RGWRados *store; + RGWHTTPManager *http_manager; + RGWAsyncRadosProcessor *async_rados; + + RGWRESTReadResource *http_op; + + int shard_id; + string marker; + list *entries; + bool *truncated; + + read_remote_data_log_response response; + +public: + RGWReadRemoteDataLogShardCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados, + int _shard_id, const string& _marker, list *_entries, bool *_truncated) : RGWCoroutine(_store->ctx()), store(_store), + http_manager(_mgr), + async_rados(_async_rados), + http_op(NULL), + shard_id(_shard_id), + marker(_marker), + entries(_entries), + truncated(_truncated) { + } + + int operate() { + RGWRESTConn *conn = store->rest_master_conn; + reenter(this) { + yield { + char buf[16]; + snprintf(buf, sizeof(buf), "%d", shard_id); + rgw_http_param_pair pairs[] = { { "type" , "data" }, + { "id", buf }, + { "marker", marker.c_str() }, + { "extra-info", "true" }, + { NULL, NULL } }; + + string p = "/admin/log/"; + + http_op = new RGWRESTReadResource(conn, p, pairs, NULL, http_manager); + + http_op->set_user_info((void *)stack); + + int ret = http_op->aio_read(); + if (ret < 0) { + ldout(store->ctx(), 0) << "ERROR: failed to read from " << p << dendl; + log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl; + http_op->put(); + return set_state(RGWCoroutine_Error, ret); + } + + return io_block(0); + } + yield { + int ret = http_op->wait(&response); + if (ret < 0) { + return set_state(RGWCoroutine_Error, ret); + } + entries->clear(); + entries->swap(response.entries); + *truncated = response.truncated; + return set_state(RGWCoroutine_Done, 0); + } + } + return 0; + } +}; + class RGWInitDataSyncStatusCoroutine : public RGWCoroutine { RGWAsyncRadosProcessor *async_rados; RGWRados *store; @@ -546,6 +629,26 @@ public: int operate(); }; +static int parse_bucket_shard(CephContext *cct, const string& raw_key, string *bucket_name, string *bucket_instance, int *shard_id) +{ + ssize_t pos = raw_key.find(':'); + *bucket_name = raw_key.substr(0, pos); + *bucket_instance = raw_key.substr(pos + 1); + pos = bucket_instance->find(':'); + *shard_id = -1; + if (pos >= 0) { + string err; + string s = bucket_instance->substr(pos + 1); + *shard_id = strict_strtol(s.c_str(), 10, &err); + if (!err.empty()) { + ldout(cct, 0) << "ERROR: failed to parse bucket instance key: " << *bucket_instance << dendl; + return -EINVAL; + } + + *bucket_instance = bucket_instance->substr(0, pos); + } + return 0; +} class RGWDataSyncSingleEntryCR : public RGWCoroutine { RGWRados *store; @@ -558,7 +661,6 @@ class RGWDataSyncSingleEntryCR : public RGWCoroutine { string raw_key; string entry_marker; - ssize_t pos; string bucket_name; string bucket_instance; @@ -576,30 +678,19 @@ public: async_rados(_async_rados), conn(_conn), source_zone(_source_zone), raw_key(_raw_key), entry_marker(_entry_marker), - pos(0), sync_status(0), + sync_status(0), marker_tracker(_marker_tracker) { } int operate() { reenter(this) { yield { - pos = raw_key.find(':'); - bucket_name = raw_key.substr(0, pos); - bucket_instance = raw_key.substr(pos + 1); - pos = bucket_instance.find(':'); - int shard_id = -1; - if (pos >= 0) { - string err; - string s = bucket_instance.substr(pos + 1); - shard_id = strict_strtol(s.c_str(), 10, &err); - if (!err.empty()) { - ldout(store->ctx(), 0) << "ERROR: failed to parse bucket instance key: " << bucket_instance << dendl; - return set_state(RGWCoroutine_Error, -EIO); - } - - bucket_instance = bucket_instance.substr(0, pos); + int shard_id; + int ret = parse_bucket_shard(store->ctx(), raw_key, &bucket_name, &bucket_instance, &shard_id); + if (ret < 0) { + return set_state(RGWCoroutine_Error, -EIO); } - int ret = call(new RGWRunBucketSyncCoroutine(http_manager, async_rados, conn, store, source_zone, bucket_name, bucket_instance, shard_id)); + ret = call(new RGWRunBucketSyncCoroutine(http_manager, async_rados, conn, store, source_zone, bucket_name, bucket_instance, shard_id)); if (ret < 0) { #warning failed syncing bucket, need to log return set_state(RGWCoroutine_Error, sync_status); @@ -647,12 +738,12 @@ class RGWDataSyncShardCR : public RGWCoroutine { RGWDataSyncShardMarkerTrack *marker_tracker; - list log_entries; - list::iterator log_iter; + list log_entries; + list::iterator log_iter; bool truncated; - string mdlog_marker; - string raw_key; + RGWDataChangesLogInfo shard_info; + string datalog_marker; Mutex inc_lock; Cond inc_cond; @@ -746,43 +837,43 @@ public: int incremental_sync() { -#if 0 reenter(&incremental_cr) { - mdlog_marker = sync_marker.marker; set_marker_tracker(new RGWDataSyncShardMarkerTrack(store, http_manager, async_rados, - RGWDataSyncStatusManager::shard_obj_name(shard_id), + RGWDataSyncStatusManager::shard_obj_name(source_zone, shard_id), sync_marker)); do { + yield { + int ret = call(new RGWReadRemoteDataLogShardInfoCR(store, http_manager, async_rados, shard_id, &shard_info)); + if (ret < 0) { + ldout(store->ctx(), 0) << "ERROR: failed to call RGWReadRemoteDataLogShardInfoCR() ret=" << ret << dendl; + return set_state(RGWCoroutine_Error, ret); + } + } + if (retcode < 0) { + ldout(store->ctx(), 0) << "ERROR: failed to fetch remote data log info: ret=" << retcode << dendl; + return set_state(RGWCoroutine_Error, retcode); + } + datalog_marker = shard_info.marker; #define INCREMENTAL_MAX_ENTRIES 100 - ldout(store->ctx(), 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " sync_marker.marker=" << sync_marker.marker << dendl; - if (mdlog_marker <= sync_marker.marker) { - /* we're at the tip, try to bring more entries */ - ldout(store->ctx(), 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " syncing mdlog for shard_id=" << shard_id << dendl; - yield call(new RGWCloneMetaLogCoroutine(store, http_manager, shard_id, mdlog_marker, &mdlog_marker)); - } - ldout(store->ctx(), 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " sync_marker.marker=" << sync_marker.marker << dendl; - if (mdlog_marker > sync_marker.marker) { - yield call(new RGWReadMDLogEntriesCR(async_rados, store, shard_id, &sync_marker.marker, INCREMENTAL_MAX_ENTRIES, &log_entries, &truncated)); + ldout(store->ctx(), 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " datalog_marker=" << datalog_marker << " sync_marker.marker=" << sync_marker.marker << dendl; + if (datalog_marker > sync_marker.marker) { + yield call(new RGWReadRemoteDataLogShardCR(store, http_manager, async_rados, shard_id, sync_marker.marker, &log_entries, &truncated)); for (log_iter = log_entries.begin(); log_iter != log_entries.end(); ++log_iter) { - ldout(store->ctx(), 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " log_entry: " << log_iter->id << ":" << log_iter->section << ":" << log_iter->name << ":" << log_iter->timestamp << dendl; - marker_tracker->start(log_iter->id); - raw_key = log_iter->section + ":" + log_iter->name; - yield spawn(new RGWMetaSyncSingleEntryCR(store, http_manager, async_rados, raw_key, log_iter->id, marker_tracker), false); + ldout(store->ctx(), 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " log_entry: " << log_iter->log_id << ":" << log_iter->log_timestamp << ":" << log_iter->entry.key << dendl; + marker_tracker->start(log_iter->log_id); + yield spawn(new RGWDataSyncSingleEntryCR(store, http_manager, async_rados, conn, source_zone, log_iter->entry.key, log_iter->log_id, marker_tracker), false); if (retcode < 0) { return set_state(RGWCoroutine_Error, retcode); - } + } } } - ldout(store->ctx(), 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " sync_marker.marker=" << sync_marker.marker << dendl; - if (mdlog_marker == sync_marker.marker) { + ldout(store->ctx(), 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " datalog_marker=" << datalog_marker << " sync_marker.marker=" << sync_marker.marker << dendl; + if (datalog_marker == sync_marker.marker) { #define INCREMENTAL_INTERVAL 20 yield wait(utime_t(INCREMENTAL_INTERVAL, 0)); } } while (true); } - /* TODO */ - return 0; -#endif return 0; } }; diff --git a/src/rgw/rgw_rest_log.cc b/src/rgw/rgw_rest_log.cc index 96a628c9089db..dcfd92efeee23 100644 --- a/src/rgw/rgw_rest_log.cc +++ b/src/rgw/rgw_rest_log.cc @@ -514,6 +514,8 @@ void RGWOp_DATALog_List::execute() { ut_et; unsigned shard_id, max_entries = LOG_CLASS_LIST_MAX_ENTRIES; + s->info.args.get_bool("extra-info", &extra_info, false); + shard_id = (unsigned)strict_strtol(shard.c_str(), 10, &err); if (!err.empty()) { dout(5) << "Error parsing shard_id " << shard << dendl; @@ -567,10 +569,14 @@ void RGWOp_DATALog_List::send_response() { s->formatter->dump_bool("truncated", truncated); { s->formatter->open_array_section("entries"); - for (list::iterator iter = entries.begin(); + for (list::iterator iter = entries.begin(); iter != entries.end(); ++iter) { - rgw_data_change& entry = *iter; - encode_json("entry", entry, s->formatter); + rgw_data_change_log_entry& entry = *iter; + if (!extra_info) { + encode_json("entry", entry.entry, s->formatter); + } else { + encode_json("entry", entry, s->formatter); + } flusher.flush(); } s->formatter->close_section(); diff --git a/src/rgw/rgw_rest_log.h b/src/rgw/rgw_rest_log.h index b71b72712f782..ead5d489a737e 100644 --- a/src/rgw/rgw_rest_log.h +++ b/src/rgw/rgw_rest_log.h @@ -188,11 +188,12 @@ public: }; class RGWOp_DATALog_List : public RGWRESTOp { - list entries; + list entries; string last_marker; bool truncated; + bool extra_info; public: - RGWOp_DATALog_List() : truncated(false) {} + RGWOp_DATALog_List() : truncated(false), extra_info(false) {} ~RGWOp_DATALog_List() {} int check_caps(RGWUserCaps& caps) { -- 2.39.5