]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: paginated mdlog clone
authorYehuda Sadeh <yehuda@redhat.com>
Tue, 7 Jul 2015 23:21:47 +0000 (16:21 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 9 Feb 2016 20:52:07 +0000 (12:52 -0800)
still need to iterate by going through over older entries first

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_sync.cc

index 7bca46a234e478e9319400487bf39bd11c7278b2..5635d8a7ae04df763b8a96eba93068cceec7d6e2 100644 (file)
@@ -1,4 +1,5 @@
 #include "common/ceph_json.h"
+#include "common/RWLock.h"
 
 #include "rgw_common.h"
 #include "rgw_rados.h"
@@ -60,6 +61,13 @@ int RGWRemoteMetaLog::init()
 
   ldout(store->ctx(), 20) << "remote mdlog, num_shards=" << log_info.num_shards << dendl;
 
+  RWLock::WLocker wl(ts_to_shard_lock);
+  for (int i = 0; i < (int)log_info.num_shards; i++) {
+    clone_markers.push_back(string());
+    utime_shard ut;
+    ut.shard_id = i;
+    ts_to_shard[ut] = i;
+  }
   return 0;
 }
 
@@ -104,27 +112,84 @@ int RGWRemoteMetaLog::list_shard(int shard_id)
   return 0;
 }
 
+int RGWRemoteMetaLog::get_shard_info(int shard_id)
+{
+  conn = store->rest_master_conn;
+
+  char buf[32];
+  snprintf(buf, sizeof(buf), "%d", shard_id);
+
+  rgw_http_param_pair pairs[] = { { "type", "metadata" },
+                                  { "id", buf },
+                                  { "info", NULL },
+                                  { NULL, NULL } };
+
+  RGWMetadataLogInfo info;
+  int ret = conn->get_json_resource("/admin/log", pairs, info);
+  if (ret < 0) {
+    ldout(store->ctx(), 0) << "ERROR: failed to fetch mdlog info" << dendl;
+    return ret;
+  }
+
+  ldout(store->ctx(), 20) << "remote mdlog, shard_id=" << shard_id << " marker=" << info.marker << dendl;
+
+  return 0;
+}
+
 int RGWRemoteMetaLog::clone_shards()
 {
+  bool truncated;
+
+  list<int> active_shards;
   for (int i = 0; i < (int)log_info.num_shards; i++) {
-    int ret = clone_shard(i);
-    if (ret < 0) {
-      ldout(store->ctx(), 10) << "failed to clone shard: ret=" << ret << dendl;
-    }
+    active_shards.push_back(i);
   }
+  do {
+    truncated = false;
+
+    list<int> next_active_shards;
+
+    for (list<int>::iterator iter = active_shards.begin(); iter != active_shards.end(); ++iter) {
+      int shard_id = *iter;
+      bool shard_truncated;
+      string& shard_marker = clone_markers[shard_id];
+      int ret = clone_shard(shard_id, shard_marker, &shard_marker, &shard_truncated);
+      if (ret < 0) {
+        ldout(store->ctx(), 0) << "ERROR: failed to clone shard: ret=" << ret << dendl;
+      }
+      truncated |= shard_truncated;
+
+      if (shard_truncated) {
+       next_active_shards.push_back(shard_id);
+      }
+    }
+    active_shards.swap(next_active_shards);
+  } while (truncated);
 
   return 0;
 }
 
-int RGWRemoteMetaLog::clone_shard(int shard_id)
+int RGWRemoteMetaLog::clone_shard(int shard_id, const string& marker, string *new_marker, bool *truncated)
 {
+  *truncated = false;
+
   conn = store->rest_master_conn;
 
   char buf[32];
   snprintf(buf, sizeof(buf), "%d", shard_id);
 
+#define CLONE_MAX_ENTRIES 100
+  int max_entries = CLONE_MAX_ENTRIES;
+
+  char max_entries_buf[32];
+  snprintf(max_entries_buf, sizeof(max_entries_buf), "%d", max_entries);
+
+  const char *marker_key = (marker.empty() ? "" : "marker");
+
   rgw_http_param_pair pairs[] = { { "type", "metadata" },
                                   { "id", buf },
+                                  { "max-entries", max_entries_buf },
+                                  { marker_key, marker.c_str() },
                                   { NULL, NULL } };
 
   rgw_mdlog_shard_data data;
@@ -136,6 +201,8 @@ int RGWRemoteMetaLog::clone_shard(int shard_id)
 
   ldout(store->ctx(), 20) << "remote mdlog, shard_id=" << shard_id << " num of shard entries: " << data.entries.size() << dendl;
 
+  *truncated = ((int)data.entries.size() == max_entries);
+
   if (data.entries.empty()) {
     return 0;
   }
@@ -156,6 +223,8 @@ int RGWRemoteMetaLog::clone_shard(int shard_id)
     ::encode(entry.log_data, dest_entry.data);
 
     dest_entries.push_back(dest_entry);
+
+    *new_marker = entry.id;
   }
 
   ret = store->meta_mgr->store_md_log_entries(dest_entries, shard_id);