rgw_admin: show more data sync info
author      Yehuda Sadeh <yehuda@redhat.com>
            Wed, 9 Mar 2016 23:41:13 +0000 (15:41 -0800)
committer   Yehuda Sadeh <yehuda@redhat.com>
            Thu, 10 Mar 2016 01:02:33 +0000 (17:02 -0800)
in radosgw-admin sync status command

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
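
With this change, radosgw-admin sync status prints a "data sync" section after the
existing "metadata sync" block, with one entry per source zone taken from
store->zone_conn_map. A rough sketch of what the new portion might look like (the
zone id, zone name, and shard counts below are invented for illustration, and the
alignment is approximate; the actual layout is produced by tab_dump() and the new
tab argument to push_ss()):

        data sync source: 1c974a66-7d3b-4aa5-b03f-e49a2a9a7c3f (us-west)
                          syncing
                          full sync: 0/128 shards
                          incremental sync: 128/128 shards
                          data is caught up with master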
src/rgw/rgw_admin.cc
src/rgw/rgw_data_sync.cc
src/rgw/rgw_data_sync.h
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h
src/rgw/rgw_sync.cc
src/rgw/rgw_sync.h

diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc
index a5170a5b5a751458329142b7811bafb436683d8d..3e933a647b852e4b3af81cf65b12933034b5d8ae 100644 (file)
@@ -1582,9 +1582,12 @@ void flush_ss(stringstream& ss, list<string>& l)
   ss.str("");
 }
 
-stringstream& push_ss(stringstream& ss, list<string>& l)
+stringstream& push_ss(stringstream& ss, list<string>& l, int tab = 0)
 {
   flush_ss(ss, l);
+  if (tab > 0) {
+    ss << setw(tab) << "" << setw(1);
+  }
   return ss;
 }
 
@@ -1724,6 +1727,116 @@ static void get_md_sync_status(list<string>& status)
   flush_ss(ss, status);
 }
 
+static void get_data_sync_status(const string& source_zone, list<string>& status, int tab)
+{
+  RGWDataSyncStatusManager sync(store, store->get_async_rados(), source_zone);
+
+  stringstream ss;
+
+  int ret = sync.init();
+  if (ret < 0) {
+    push_ss(ss, status, tab) << string("failed to retrieve sync info: ") + cpp_strerror(-ret);
+    flush_ss(ss, status);
+    return;
+  }
+
+  ret = sync.read_sync_status();
+  if (ret < 0) {
+    status.push_back(string("failed to read sync status: ") + cpp_strerror(-ret));
+    return;
+  }
+
+  const rgw_data_sync_status& sync_status = sync.get_sync_status();
+
+  string status_str;
+  switch (sync_status.sync_info.state) {
+    case rgw_data_sync_info::StateInit:
+      status_str = "init";
+      break;
+    case rgw_data_sync_info::StateBuildingFullSyncMaps:
+      status_str = "preparing for full sync";
+      break;
+    case rgw_data_sync_info::StateSync:
+      status_str = "syncing";
+      break;
+    default:
+      status_str = "unknown";
+  }
+
+  push_ss(ss, status, tab) << status_str;
+  
+  uint64_t full_total = 0;
+  uint64_t full_complete = 0;
+
+  int num_full = 0;
+  int num_inc = 0;
+  int total_shards = 0;
+
+  for (auto marker_iter : sync_status.sync_markers) {
+    full_total += marker_iter.second.total_entries;
+    total_shards++;
+    if (marker_iter.second.state == rgw_data_sync_marker::SyncState::FullSync) {
+      num_full++;
+      full_complete += marker_iter.second.pos;
+    } else {
+      full_complete += marker_iter.second.total_entries;
+    }
+    if (marker_iter.second.state == rgw_data_sync_marker::SyncState::IncrementalSync) {
+      num_inc++;
+    }
+  }
+
+  push_ss(ss, status, tab) << "full sync: " << num_full << "/" << total_shards << " shards";
+
+  if (num_full > 0) {
+    push_ss(ss, status, tab) << "full sync: " << full_total - full_complete << " buckets to sync";
+  }
+
+  push_ss(ss, status, tab) << "incremental sync: " << num_inc << "/" << total_shards << " shards";
+
+  rgw_datalog_info log_info;
+  ret = sync.read_log_info(&log_info);
+  if (ret < 0) {
+    status.push_back(string("failed to fetch local sync status: ") + cpp_strerror(-ret));
+    return;
+  }
+
+
+  map<int, RGWDataChangesLogInfo> source_shards_info;
+
+  ret = sync.read_source_log_shards_info(&source_shards_info);
+  if (ret < 0) {
+    status.push_back(string("failed to fetch master sync status: ") + cpp_strerror(-ret));
+    return;
+  }
+
+  map<int, string> shards_behind;
+
+  for (auto local_iter : sync_status.sync_markers) {
+    int shard_id = local_iter.first;
+    auto iter = source_shards_info.find(shard_id);
+
+    if (iter == source_shards_info.end()) {
+      /* huh? */
+      derr << "ERROR: could not find remote sync shard status for shard_id=" << shard_id << dendl;
+      continue;
+    }
+    auto master_marker = iter->second.marker;
+    if (master_marker > local_iter.second.marker) {
+      shards_behind[shard_id] = local_iter.second.marker;
+    }
+  }
+
+  int total_behind = shards_behind.size() + (sync_status.sync_info.num_shards - num_inc);
+  if (total_behind == 0) {
+    status.push_back("data is caught up with master");
+  } else {
+    push_ss(ss, status, tab) << "data is behind on " << total_behind << " shards";
+  }
+
+  flush_ss(ss, status);
+}
+
 static void tab_dump(const string& header, int width, const list<string>& entries)
 {
   string s = header;
@@ -1754,6 +1867,23 @@ static void sync_status(Formatter *formatter)
   }
 
   tab_dump("metadata sync", width, md_status);
+
+  list<string> data_status;
+
+  for (auto iter : store->zone_conn_map) {
+    const string& source_id = iter.first;
+    string zone_name;
+    string source_str = "source: ";
+    string s = source_str + source_id;
+    auto siter = store->zone_name_by_id.find(source_id);
+    if (siter != store->zone_name_by_id.end()) {
+      s += string(" (") + siter->second + ")";
+    }
+    data_status.push_back(s);
+    get_data_sync_status(source_id, data_status, source_str.size());
+  }
+
+  tab_dump("data sync", width, data_status);
 }
 
 int main(int argc, char **argv) 
diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc
index 437cbc656107c2269d79b506bda6b2813e2f104c..e922e629bf8ea7355977ecc36360b226c78a087c 100644 (file)
@@ -234,6 +234,33 @@ public:
   }
 };
 
+class RGWReadRemoteDataLogInfoCR : public RGWShardCollectCR {
+  RGWDataSyncEnv *sync_env;
+
+  int num_shards;
+  map<int, RGWDataChangesLogInfo> *datalog_info;
+
+  int shard_id;
+#define READ_DATALOG_MAX_CONCURRENT 10
+
+public:
+  RGWReadRemoteDataLogInfoCR(RGWDataSyncEnv *_sync_env,
+                     int _num_shards,
+                     map<int, RGWDataChangesLogInfo> *_datalog_info) : RGWShardCollectCR(_sync_env->cct, READ_DATALOG_MAX_CONCURRENT),
+                                                                 sync_env(_sync_env), num_shards(_num_shards),
+                                                                 datalog_info(_datalog_info), shard_id(0) {}
+  bool spawn_next();
+};
+
+bool RGWReadRemoteDataLogInfoCR::spawn_next() {
+  if (shard_id >= num_shards) {
+    return false;
+  }
+  spawn(new RGWReadRemoteDataLogShardInfoCR(sync_env, shard_id, &(*datalog_info)[shard_id]), false);
+  shard_id++;
+  return true;
+}
+
 class RGWInitDataSyncStatusCoroutine : public RGWCoroutine {
   RGWDataSyncEnv *sync_env;
 
@@ -353,6 +380,17 @@ int RGWRemoteDataLog::read_log_info(rgw_datalog_info *log_info)
   return 0;
 }
 
+int RGWRemoteDataLog::read_source_log_shards_info(map<int, RGWDataChangesLogInfo> *shards_info)
+{
+  rgw_datalog_info log_info;
+  int ret = read_log_info(&log_info);
+  if (ret < 0) {
+    return ret;
+  }
+
+  return run(new RGWReadRemoteDataLogInfoCR(&sync_env, log_info.num_shards, shards_info));
+}
+
 int RGWRemoteDataLog::init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger)
 {
   if (initialized) {
index 1574a7fecd25815c6c5efcfb9e9f140cb222f27d..ee5d268c08809b71717099a1f3f8b0def9859305 100644 (file)
@@ -3,6 +3,7 @@
 
 #include "rgw_coroutine.h"
 #include "rgw_http_client.h"
+#include "rgw_bucket.h"
 
 #include "common/RWLock.h"
 
@@ -191,6 +192,7 @@ public:
   void finish();
 
   int read_log_info(rgw_datalog_info *log_info);
+  int read_source_log_shards_info(map<int, RGWDataChangesLogInfo> *shards_info);
   int get_shard_info(int shard_id);
   int read_sync_status(rgw_data_sync_status *sync_status);
   int init_sync_status(int num_shards);
@@ -236,6 +238,13 @@ public:
   int read_sync_status() { return source_log.read_sync_status(&sync_status); }
   int init_sync_status() { return source_log.init_sync_status(num_shards); }
 
+  int read_log_info(rgw_datalog_info *log_info) {
+    return source_log.read_log_info(log_info);
+  }
+  int read_source_log_shards_info(map<int, RGWDataChangesLogInfo> *shards_info) {
+    return source_log.read_source_log_shards_info(shards_info);
+  }
+
   int run() { return source_log.run_sync(num_shards, sync_status); }
 
   void wakeup(int shard_id, set<string>& keys) { return source_log.wakeup(shard_id, keys); }
diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc
index 4c4137103613ad31b922106336c6ad1ca6b06f89..732885b36138d38512c488b8fb7abddeee60ac3e 100644 (file)
@@ -3641,6 +3641,7 @@ int RGWRados::init_complete()
     const string& id = ziter->first;
     RGWZone& z = ziter->second;
     zone_id_by_name[z.name] = id;
+    zone_name_by_id[id] = z.name;
     if (id != zone_id()) {
       if (!z.endpoints.empty()) {
         ldout(cct, 20) << "generating connection object for zone " << z.name << " id " << z.id << dendl;
diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h
index 18f1b917f3e7264f1855b55cb09ab682663e5040..f2be37d153539a94b6f87dbba0b1c788be878bba 100644 (file)
@@ -1868,6 +1868,7 @@ public:
   map<string, RGWRESTConn *> zonegroup_conn_map;
 
   map<string, string> zone_id_by_name;
+  map<string, string> zone_name_by_id;
 
   RGWRESTConn *get_zone_conn_by_id(const string& id) {
     auto citer = zone_conn_map.find(id);
diff --git a/src/rgw/rgw_sync.cc b/src/rgw/rgw_sync.cc
index 65fd89bed1f496c30800a57f058de9538b86cb59..7fafad3f41e91d5b0c84c3a144b38e313dac82d0 100644 (file)
@@ -137,40 +137,12 @@ void rgw_mdlog_shard_data::decode_json(JSONObj *obj) {
   JSONDecoder::decode_json("entries", entries, obj);
 };
 
-class RGWShardCollectCR : public RGWCoroutine {
-  CephContext *cct;
-
-  int cur_shard;
-  int current_running;
-  int max_concurrent;
-  int status;
-
-public:
-  RGWShardCollectCR(CephContext *_cct, int _max_concurrent) : RGWCoroutine(_cct),
-                                                             current_running(0),
-                                                             max_concurrent(_max_concurrent),
-                                                             status(0) {}
-
-  virtual bool spawn_next() = 0;
+int RGWShardCollectCR::operate() {
+  reenter(this) {
+    while (spawn_next()) {
+      current_running++;
 
-  int operate() {
-    reenter(this) {
-      while (spawn_next()) {
-        current_running++;
-
-        while (current_running >= max_concurrent) {
-          int child_ret;
-          yield wait_for_child();
-          if (collect_next(&child_ret)) {
-            current_running--;
-            if (child_ret < 0 && child_ret != -ENOENT) {
-              ldout(cct, 10) << __func__ << ": failed to fetch log status, ret=" << child_ret << dendl;
-              status = child_ret;
-            }
-          }
-        }
-      }
-      while (current_running > 0) {
+      while (current_running >= max_concurrent) {
         int child_ret;
         yield wait_for_child();
         if (collect_next(&child_ret)) {
@@ -181,15 +153,25 @@ public:
           }
         }
       }
-      if (status < 0) {
-        return set_cr_error(status);
+    }
+    while (current_running > 0) {
+      int child_ret;
+      yield wait_for_child();
+      if (collect_next(&child_ret)) {
+        current_running--;
+        if (child_ret < 0 && child_ret != -ENOENT) {
+          ldout(cct, 10) << __func__ << ": failed to fetch log status, ret=" << child_ret << dendl;
+          status = child_ret;
+        }
       }
-      return set_cr_done();
     }
-    return 0;
+    if (status < 0) {
+      return set_cr_error(status);
+    }
+    return set_cr_done();
   }
-
-};
+  return 0;
+}
 
 class RGWReadRemoteMDLogInfoCR : public RGWShardCollectCR {
   RGWMetaSyncEnv *sync_env;
diff --git a/src/rgw/rgw_sync.h b/src/rgw/rgw_sync.h
index 43ef906bdc32b2dd8587fdefb14da34f74584e69..1ab130b9a2b629cb18cb1ac7621cecd614be218c 100644 (file)
@@ -418,6 +418,23 @@ public:
   int operate();
 };
 
+class RGWShardCollectCR : public RGWCoroutine {
+  CephContext *cct;
+
+  int cur_shard;
+  int current_running;
+  int max_concurrent;
+  int status;
+
+public:
+  RGWShardCollectCR(CephContext *_cct, int _max_concurrent) : RGWCoroutine(_cct),
+                                                             current_running(0),
+                                                             max_concurrent(_max_concurrent),
+                                                             status(0) {}
+
+  virtual bool spawn_next() = 0;
+  int operate();
+};
 
 
 #endif