]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw_admin: report oldest data not synced
authorYehuda Sadeh <yehuda@redhat.com>
Thu, 10 Mar 2016 00:01:51 +0000 (16:01 -0800)
committerYehuda Sadeh <yehuda@redhat.com>
Thu, 10 Mar 2016 01:02:33 +0000 (17:02 -0800)
in radosgw-admin sync status

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_admin.cc
src/rgw/rgw_data_sync.cc
src/rgw/rgw_data_sync.h

index 3e933a647b852e4b3af81cf65b12933034b5d8ae..bce9f5859afd394a5bb727419d56a323deeaea13 100644 (file)
@@ -1832,6 +1832,30 @@ static void get_data_sync_status(const string& source_zone, list<string>& status
     status.push_back("data is caught up with master");
   } else {
     push_ss(ss, status, tab) << "data is behind on " << total_behind << " shards";
+
+    map<int, rgw_datalog_shard_data> master_pos;
+    ret = sync.read_source_log_shards_next(shards_behind, &master_pos);
+    if (ret < 0) {
+      derr << "ERROR: failed to fetch next positions (" << cpp_strerror(-ret) << ")" << dendl;
+    } else {
+      utime_t oldest;
+      for (auto iter : master_pos) {
+        rgw_datalog_shard_data& shard_data = iter.second;
+
+        if (!shard_data.entries.empty()) {
+          rgw_datalog_entry& entry = shard_data.entries.front();
+          if (oldest.is_zero()) {
+            oldest = entry.timestamp;
+          } else if (!entry.timestamp.is_zero() && entry.timestamp < oldest) {
+            oldest = entry.timestamp;
+          }
+        }
+      }
+
+      if (!oldest.is_zero()) {
+        push_ss(ss, status, tab) << "oldest change not applied: " << oldest;
+      }
+    }
   }
 
   flush_ss(ss, status);
index e922e629bf8ea7355977ecc36360b226c78a087c..314cd682c99c5da61ccb5f5e094f4d3f0dd0a264 100644 (file)
@@ -30,21 +30,6 @@ void rgw_datalog_info::decode_json(JSONObj *obj) {
   JSONDecoder::decode_json("num_objects", num_shards, obj);
 }
 
-struct rgw_datalog_entry {
-  string key;
-  utime_t timestamp;
-
-  void decode_json(JSONObj *obj);
-};
-
-struct rgw_datalog_shard_data {
-  string marker;
-  bool truncated;
-  vector<rgw_datalog_entry> entries;
-
-  void decode_json(JSONObj *obj);
-};
-
 
 void rgw_datalog_entry::decode_json(JSONObj *obj) {
   JSONDecoder::decode_json("key", key, obj);
@@ -261,6 +246,99 @@ bool RGWReadRemoteDataLogInfoCR::spawn_next() {
   return true;
 }
 
+class RGWListRemoteDataLogShardCR : public RGWSimpleCoroutine {
+  RGWDataSyncEnv *sync_env;
+  RGWRESTReadResource *http_op;
+
+  int shard_id;
+  string marker;
+  uint32_t max_entries;
+  rgw_datalog_shard_data *result;
+
+public:
+  RGWListRemoteDataLogShardCR(RGWDataSyncEnv *env, int _shard_id,
+                              const string& _marker, uint32_t _max_entries,
+                              rgw_datalog_shard_data *_result)
+    : RGWSimpleCoroutine(env->store->ctx()), sync_env(env), http_op(NULL),
+      shard_id(_shard_id), marker(_marker), max_entries(_max_entries), result(_result) {}
+
+  int send_request() {
+    RGWRESTConn *conn = sync_env->conn;
+    RGWRados *store = sync_env->store;
+
+    char buf[32];
+    snprintf(buf, sizeof(buf), "%d", shard_id);
+
+    char max_entries_buf[32];
+    snprintf(max_entries_buf, sizeof(max_entries_buf), "%d", (int)max_entries);
+
+    const char *marker_key = (marker.empty() ? "" : "marker");
+
+    rgw_http_param_pair pairs[] = { { "type", "data" },
+      { "id", buf },
+      { "max-entries", max_entries_buf },
+      { marker_key, marker.c_str() },
+      { NULL, NULL } };
+
+    string p = "/admin/log/";
+
+    http_op = new RGWRESTReadResource(conn, p, pairs, NULL, sync_env->http_manager);
+    http_op->set_user_info((void *)stack);
+
+    int ret = http_op->aio_read();
+    if (ret < 0) {
+      ldout(store->ctx(), 0) << "ERROR: failed to read from " << p << dendl;
+      log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
+      http_op->put();
+      return ret;
+    }
+
+    return 0;
+  }
+
+  int request_complete() {
+    int ret = http_op->wait(result);
+    if (ret < 0 && ret != -ENOENT) {
+      ldout(sync_env->store->ctx(), 0) << "ERROR: failed to list remote datalog shard, ret=" << ret << dendl;
+      return ret;
+    }
+    return 0;
+  }
+};
+
+class RGWListRemoteDataLogCR : public RGWShardCollectCR {
+  RGWDataSyncEnv *sync_env;
+
+  map<int, string> shards;
+  int max_entries_per_shard;
+  map<int, rgw_datalog_shard_data> *result;
+
+  map<int, string>::iterator iter;
+#define READ_DATALOG_MAX_CONCURRENT 10
+
+public:
+  RGWListRemoteDataLogCR(RGWDataSyncEnv *_sync_env,
+                     map<int, string>& _shards,
+                     int _max_entries_per_shard,
+                     map<int, rgw_datalog_shard_data> *_result) : RGWShardCollectCR(_sync_env->cct, READ_DATALOG_MAX_CONCURRENT),
+                                                                 sync_env(_sync_env), max_entries_per_shard(_max_entries_per_shard),
+                                                                 result(_result) {
+    shards.swap(_shards);
+    iter = shards.begin();
+  }
+  bool spawn_next();
+};
+
+bool RGWListRemoteDataLogCR::spawn_next() {
+  if (iter == shards.end()) {
+    return false;
+  }
+
+  spawn(new RGWListRemoteDataLogShardCR(sync_env, iter->first, iter->second, max_entries_per_shard, &(*result)[iter->first]), false);
+  ++iter;
+  return true;
+}
+
 class RGWInitDataSyncStatusCoroutine : public RGWCoroutine {
   RGWDataSyncEnv *sync_env;
 
@@ -391,6 +469,15 @@ int RGWRemoteDataLog::read_source_log_shards_info(map<int, RGWDataChangesLogInfo
   return run(new RGWReadRemoteDataLogInfoCR(&sync_env, log_info.num_shards, shards_info));
 }
 
+int RGWRemoteDataLog::read_source_log_shards_next(map<int, string> shard_markers, map<int, rgw_datalog_shard_data> *result)
+{
+  if (store->is_meta_master()) {
+    return 0;
+  }
+
+  return run(new RGWListRemoteDataLogCR(&sync_env, shard_markers, 1, result));
+}
+
 int RGWRemoteDataLog::init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger)
 {
   if (initialized) {
index ee5d268c08809b71717099a1f3f8b0def9859305..ac3abfb9454f53efdd8a28d0040e48b42e78948a 100644 (file)
@@ -138,6 +138,21 @@ struct rgw_data_sync_status {
 };
 WRITE_CLASS_ENCODER(rgw_data_sync_status)
 
+struct rgw_datalog_entry {
+  string key;
+  utime_t timestamp;
+
+  void decode_json(JSONObj *obj);
+};
+
+struct rgw_datalog_shard_data {
+  string marker;
+  bool truncated;
+  vector<rgw_datalog_entry> entries;
+
+  void decode_json(JSONObj *obj);
+};
+
 class RGWAsyncRadosProcessor;
 class RGWDataSyncStatusManager;
 class RGWDataSyncControlCR;
@@ -193,6 +208,7 @@ public:
 
   int read_log_info(rgw_datalog_info *log_info);
   int read_source_log_shards_info(map<int, RGWDataChangesLogInfo> *shards_info);
+  int read_source_log_shards_next(map<int, string> shard_markers, map<int, rgw_datalog_shard_data> *result);
   int get_shard_info(int shard_id);
   int read_sync_status(rgw_data_sync_status *sync_status);
   int init_sync_status(int num_shards);
@@ -244,6 +260,9 @@ public:
   int read_source_log_shards_info(map<int, RGWDataChangesLogInfo> *shards_info) {
     return source_log.read_source_log_shards_info(shards_info);
   }
+  int read_source_log_shards_next(map<int, string> shard_markers, map<int, rgw_datalog_shard_data> *result) {
+    return source_log.read_source_log_shards_next(shard_markers, result);
+  }
 
   int run() { return source_log.run_sync(num_shards, sync_status); }