From: Yehuda Sadeh
Date: Wed, 9 Mar 2016 23:41:13 +0000 (-0800)
Subject: rgw_admin: show more data sync info
X-Git-Tag: v10.1.0~140^2~4
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=eea64de61c99ce9be50c53b2aec636f7f1d5483a;p=ceph.git

rgw_admin: show more data sync info

in radosgw-admin sync status command

Signed-off-by: Yehuda Sadeh
---

diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc
index a5170a5b5a75..3e933a647b85 100644
--- a/src/rgw/rgw_admin.cc
+++ b/src/rgw/rgw_admin.cc
@@ -1582,9 +1582,12 @@ void flush_ss(stringstream& ss, list<string>& l)
   ss.str("");
 }
 
-stringstream& push_ss(stringstream& ss, list<string>& l)
+stringstream& push_ss(stringstream& ss, list<string>& l, int tab = 0)
 {
   flush_ss(ss, l);
+  if (tab > 0) {
+    ss << setw(tab) << "" << setw(1);
+  }
   return ss;
 }
 
@@ -1724,6 +1727,116 @@ static void get_md_sync_status(list<string>& status)
   flush_ss(ss, status);
 }
 
+static void get_data_sync_status(const string& source_zone, list<string>& status, int tab)
+{
+  RGWDataSyncStatusManager sync(store, store->get_async_rados(), source_zone);
+
+  stringstream ss;
+
+  int ret = sync.init();
+  if (ret < 0) {
+    push_ss(ss, status, tab) << string("failed to retrieve sync info: ") + cpp_strerror(-ret);
+    flush_ss(ss, status);
+    return;
+  }
+
+  ret = sync.read_sync_status();
+  if (ret < 0) {
+    status.push_back(string("failed to read sync status: ") + cpp_strerror(-ret));
+    return;
+  }
+
+  const rgw_data_sync_status& sync_status = sync.get_sync_status();
+
+  string status_str;
+  switch (sync_status.sync_info.state) {
+    case rgw_data_sync_info::StateInit:
+      status_str = "init";
+      break;
+    case rgw_data_sync_info::StateBuildingFullSyncMaps:
+      status_str = "preparing for full sync";
+      break;
+    case rgw_data_sync_info::StateSync:
+      status_str = "syncing";
+      break;
+    default:
+      status_str = "unknown";
+  }
+
+  push_ss(ss, status, tab) << status_str;
+
+  uint64_t full_total = 0;
+  uint64_t full_complete = 0;
+
+  int num_full = 0;
+  int num_inc = 0;
+  int total_shards = 0;
+
+  for (auto marker_iter : sync_status.sync_markers) {
+    full_total += marker_iter.second.total_entries;
+    total_shards++;
+    if (marker_iter.second.state == rgw_data_sync_marker::SyncState::FullSync) {
+      num_full++;
+      full_complete += marker_iter.second.pos;
+    } else {
+      full_complete += marker_iter.second.total_entries;
+    }
+    if (marker_iter.second.state == rgw_data_sync_marker::SyncState::IncrementalSync) {
+      num_inc++;
+    }
+  }
+
+  push_ss(ss, status, tab) << "full sync: " << num_full << "/" << total_shards << " shards";
+
+  if (num_full > 0) {
+    push_ss(ss, status, tab) << "full sync: " << full_total - full_complete << " buckets to sync";
+  }
+
+  push_ss(ss, status, tab) << "incremental sync: " << num_inc << "/" << total_shards << " shards";
+
+  rgw_datalog_info log_info;
+  ret = sync.read_log_info(&log_info);
+  if (ret < 0) {
+    status.push_back(string("failed to fetch local sync status: ") + cpp_strerror(-ret));
+    return;
+  }
+
+
+  map<int, RGWDataChangesLogInfo> source_shards_info;
+
+  ret = sync.read_source_log_shards_info(&source_shards_info);
+  if (ret < 0) {
+    status.push_back(string("failed to fetch master sync status: ") + cpp_strerror(-ret));
+    return;
+  }
+
+  map<int, string> shards_behind;
+
+  for (auto local_iter : sync_status.sync_markers) {
+    int shard_id = local_iter.first;
+    auto iter = source_shards_info.find(shard_id);
+
+    if (iter == source_shards_info.end()) {
+      /* the source should report info for every shard; a missing
+       * entry here is unexpected */
+      derr << "ERROR: could not find remote sync shard status for shard_id=" << shard_id << dendl;
+      continue;
+    }
+    auto master_marker = iter->second.marker;
+    if (master_marker > local_iter.second.marker) {
+      shards_behind[shard_id] = local_iter.second.marker;
+    }
+  }
+
+  int total_behind = shards_behind.size() + (sync_status.sync_info.num_shards - num_inc);
+  if (total_behind == 0) {
+    status.push_back("data is caught up with master");
+  } else {
+    push_ss(ss, status, tab) << "data is behind on " << total_behind << " shards";
+  }
+
+  flush_ss(ss, status);
+}
+
 static void tab_dump(const string& header, int width, const list<string>& entries)
 {
   string s = header;
@@ -1754,6 +1867,23 @@ static void sync_status(Formatter *formatter)
   }
 
   tab_dump("metadata sync", width, md_status);
+
+  list<string> data_status;
+
+  for (auto iter : store->zone_conn_map) {
+    const string& source_id = iter.first;
+    string zone_name;
+    string source_str = "source: ";
+    string s = source_str + source_id;
+    auto siter = store->zone_name_by_id.find(source_id);
+    if (siter != store->zone_name_by_id.end()) {
+      s += string(" (") + siter->second + ")";
+    }
+    data_status.push_back(s);
+    get_data_sync_status(source_id, data_status, source_str.size());
+  }
+
+  tab_dump("data sync", width, data_status);
 }
 
 int main(int argc, char **argv)
diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc
index 437cbc656107..e922e629bf8e 100644
--- a/src/rgw/rgw_data_sync.cc
+++ b/src/rgw/rgw_data_sync.cc
@@ -234,6 +234,33 @@ public:
   }
 };
 
+class RGWReadRemoteDataLogInfoCR : public RGWShardCollectCR {
+  RGWDataSyncEnv *sync_env;
+
+  int num_shards;
+  map<int, RGWDataChangesLogInfo> *datalog_info;
+
+  int shard_id;
+#define READ_DATALOG_MAX_CONCURRENT 10
+
+public:
+  RGWReadRemoteDataLogInfoCR(RGWDataSyncEnv *_sync_env,
+                             int _num_shards,
+                             map<int, RGWDataChangesLogInfo> *_datalog_info) : RGWShardCollectCR(_sync_env->cct, READ_DATALOG_MAX_CONCURRENT),
+                                                                               sync_env(_sync_env), num_shards(_num_shards),
+                                                                               datalog_info(_datalog_info), shard_id(0) {}
+  bool spawn_next();
+};
+
+bool RGWReadRemoteDataLogInfoCR::spawn_next() {
+  if (shard_id >= num_shards) {
+    return false;
+  }
+  spawn(new RGWReadRemoteDataLogShardInfoCR(sync_env, shard_id, &(*datalog_info)[shard_id]), false);
+  shard_id++;
+  return true;
+}
+
 class RGWInitDataSyncStatusCoroutine : public RGWCoroutine {
   RGWDataSyncEnv *sync_env;
 
@@ -353,6 +380,17 @@ int RGWRemoteDataLog::read_log_info(rgw_datalog_info *log_info)
   return 0;
 }
 
+int RGWRemoteDataLog::read_source_log_shards_info(map<int, RGWDataChangesLogInfo> *shards_info)
+{
+  rgw_datalog_info log_info;
+  int ret = read_log_info(&log_info);
+  if (ret < 0) {
+    return ret;
+  }
+
+  return run(new RGWReadRemoteDataLogInfoCR(&sync_env, log_info.num_shards, shards_info));
+}
+
 int RGWRemoteDataLog::init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger)
 {
   if (initialized) {
diff --git a/src/rgw/rgw_data_sync.h b/src/rgw/rgw_data_sync.h
index 1574a7fecd25..ee5d268c0880 100644
--- a/src/rgw/rgw_data_sync.h
+++ b/src/rgw/rgw_data_sync.h
@@ -3,6 +3,7 @@
 
 #include "rgw_coroutine.h"
 #include "rgw_http_client.h"
+#include "rgw_bucket.h"
 
 #include "common/RWLock.h"
 
@@ -191,6 +192,7 @@ public:
   void finish();
 
   int read_log_info(rgw_datalog_info *log_info);
+  int read_source_log_shards_info(map<int, RGWDataChangesLogInfo> *shards_info);
   int get_shard_info(int shard_id);
   int read_sync_status(rgw_data_sync_status *sync_status);
   int init_sync_status(int num_shards);
@@ -236,6 +238,13 @@ public:
   int read_sync_status() { return source_log.read_sync_status(&sync_status); }
   int init_sync_status() { return source_log.init_sync_status(num_shards); }
+
+  int read_log_info(rgw_datalog_info *log_info) {
+    return source_log.read_log_info(log_info);
+  }
+  int read_source_log_shards_info(map<int, RGWDataChangesLogInfo> *shards_info) {
+    return source_log.read_source_log_shards_info(shards_info);
+  }
+
   int run() { return source_log.run_sync(num_shards, sync_status); }
 
   void wakeup(int shard_id, set<string>& keys) { return source_log.wakeup(shard_id, keys); }
diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc
index 4c4137103613..732885b36138 100644
--- a/src/rgw/rgw_rados.cc
+++ b/src/rgw/rgw_rados.cc
@@ -3641,6 +3641,7 @@ int RGWRados::init_complete()
     const string& id = ziter->first;
     RGWZone& z = ziter->second;
     zone_id_by_name[z.name] = id;
+    zone_name_by_id[id] = z.name;
     if (id != zone_id()) {
       if (!z.endpoints.empty()) {
         ldout(cct, 20) << "generating connection object for zone " << z.name << " id " << z.id << dendl;
diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h
index 18f1b917f3e7..f2be37d15353 100644
--- a/src/rgw/rgw_rados.h
+++ b/src/rgw/rgw_rados.h
@@ -1868,6 +1868,7 @@ public:
   map<string, RGWRESTConn *> zonegroup_conn_map;
 
   map<string, string> zone_id_by_name;
+  map<string, string> zone_name_by_id;
 
   RGWRESTConn *get_zone_conn_by_id(const string& id) {
     auto citer = zone_conn_map.find(id);
diff --git a/src/rgw/rgw_sync.cc b/src/rgw/rgw_sync.cc
index 65fd89bed1f4..7fafad3f41e9 100644
--- a/src/rgw/rgw_sync.cc
+++ b/src/rgw/rgw_sync.cc
@@ -137,40 +137,12 @@ void rgw_mdlog_shard_data::decode_json(JSONObj *obj) {
   JSONDecoder::decode_json("entries", entries, obj);
 };
 
-class RGWShardCollectCR : public RGWCoroutine {
-  CephContext *cct;
-
-  int cur_shard;
-  int current_running;
-  int max_concurrent;
-  int status;
-
-public:
-  RGWShardCollectCR(CephContext *_cct, int _max_concurrent) : RGWCoroutine(_cct),
-                                                              current_running(0),
-                                                              max_concurrent(_max_concurrent),
-                                                              status(0) {}
-
-  virtual bool spawn_next() = 0;
+int RGWShardCollectCR::operate() {
+  reenter(this) {
+    while (spawn_next()) {
+      current_running++;
 
-  int operate() {
-    reenter(this) {
-      while (spawn_next()) {
-        current_running++;
-
-        while (current_running >= max_concurrent) {
-          int child_ret;
-          yield wait_for_child();
-          if (collect_next(&child_ret)) {
-            current_running--;
-            if (child_ret < 0 && child_ret != -ENOENT) {
-              ldout(cct, 10) << __func__ << ": failed to fetch log status, ret=" << child_ret << dendl;
-              status = child_ret;
-            }
-          }
-        }
-      }
-      while (current_running > 0) {
+      while (current_running >= max_concurrent) {
         int child_ret;
         yield wait_for_child();
         if (collect_next(&child_ret)) {
@@ -181,15 +153,25 @@ public:
           }
         }
       }
-      if (status < 0) {
-        return set_cr_error(status);
+    }
+    while (current_running > 0) {
+      int child_ret;
+      yield wait_for_child();
+      if (collect_next(&child_ret)) {
+        current_running--;
+        if (child_ret < 0 && child_ret != -ENOENT) {
+          ldout(cct, 10) << __func__ << ": failed to fetch log status, ret=" << child_ret << dendl;
+          status = child_ret;
+        }
       }
-      return set_cr_done();
     }
-    return 0;
+    if (status < 0) {
+      return set_cr_error(status);
+    }
+    return set_cr_done();
   }
-
-};
+  return 0;
+}
 
 class RGWReadRemoteMDLogInfoCR : public RGWShardCollectCR {
   RGWMetaSyncEnv *sync_env;
diff --git a/src/rgw/rgw_sync.h b/src/rgw/rgw_sync.h
index 43ef906bdc32..1ab130b9a2b6 100644
--- a/src/rgw/rgw_sync.h
+++ b/src/rgw/rgw_sync.h
@@ -418,6 +418,23 @@ public:
   int operate();
 };
 
+class RGWShardCollectCR : public RGWCoroutine {
+  CephContext *cct;
+
+  int cur_shard;
+  int current_running;
+  int max_concurrent;
+  int status;
+
+public:
+  RGWShardCollectCR(CephContext *_cct, int _max_concurrent) : RGWCoroutine(_cct),
+                                                              current_running(0),
+                                                              max_concurrent(_max_concurrent),
+                                                              status(0) {}
+
+  virtual bool spawn_next() = 0;
+  int operate();
+};
+
 #endif
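
---

Illustration (not part of the commit): get_data_sync_status() builds its report
with the push_ss()/flush_ss() pair, and the new `tab` parameter pads every
continuation line so it lines up under the "source: " prefix that sync_status()
prints on the first line of each block. The standalone sketch below shows that
idiom in isolation. The flush_ss() body is an assumption (only its tail appears
in the hunk context above), and the zone id, zone name, and shard counts are
made-up sample values.

  #include <iomanip>
  #include <iostream>
  #include <list>
  #include <sstream>
  #include <string>

  using namespace std;

  // Assumed body: push the buffered line, if any, then reset the buffer.
  static void flush_ss(stringstream& ss, list<string>& l)
  {
    if (!ss.str().empty()) {
      l.push_back(ss.str());
    }
    ss.str("");
  }

  // As in the patch: flush the previous line, then start the next one with
  // `tab` spaces of padding (setw(tab) on an empty string emits the padding).
  static stringstream& push_ss(stringstream& ss, list<string>& l, int tab = 0)
  {
    flush_ss(ss, l);
    if (tab > 0) {
      ss << setw(tab) << "" << setw(1);
    }
    return ss;
  }

  int main()
  {
    list<string> status;
    stringstream ss;
    string source_str = "source: ";  // its width becomes the indent

    // First line carries the prefix itself (sample zone id and name).
    status.push_back(source_str + "8d20b8d1 (us-west)");

    // Continuation lines are indented by the width of "source: ".
    push_ss(ss, status, source_str.size()) << "syncing";
    push_ss(ss, status, source_str.size()) << "incremental sync: 128/128 shards";
    flush_ss(ss, status);

    for (auto& line : status) {
      cout << line << endl;
    }
    return 0;
  }

which prints:

  source: 8d20b8d1 (us-west)
          syncing
          incremental sync: 128/128 shards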