]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: read full sync indexes
authorYehuda Sadeh <yehuda@redhat.com>
Tue, 18 Aug 2015 23:41:55 +0000 (16:41 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 9 Feb 2016 21:40:55 +0000 (13:40 -0800)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_sync.cc
src/rgw/rgw_sync.h

index f2c9d7862dcc333c555d12686b8b41a74ab6c9a8..6be3865924f372cfe353669225a86cf922f0f68a 100644 (file)
@@ -21,6 +21,7 @@
 
 static string mdlog_sync_status_oid = "mdlog.sync-status";
 static string mdlog_sync_status_shard_prefix = "mdlog.sync-status.shard";
+static string mdlog_sync_full_sync_index_prefix = "meta.full-sync.index";
 
 void rgw_mdlog_info::decode_json(JSONObj *obj) {
   JSONDecoder::decode_json("num_objects", num_shards, obj);
@@ -547,7 +548,6 @@ public:
 };
 
 class RGWRadosSetOmapKeysCR : public RGWSimpleCoroutine {
-  RGWAsyncRadosProcessor *async_rados;
   RGWRados *store;
   map<string, bufferlist> entries;
 
@@ -557,10 +557,9 @@ class RGWRadosSetOmapKeysCR : public RGWSimpleCoroutine {
   RGWAioCompletionNotifier *cn;
 
 public:
-  RGWRadosSetOmapKeysCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
+  RGWRadosSetOmapKeysCR(RGWRados *_store,
                      rgw_bucket& _pool, const string& _oid,
                      map<string, bufferlist>& _entries) : RGWSimpleCoroutine(_store->ctx()),
-                                                async_rados(_async_rados),
                                                store(_store),
                                                entries(_entries),
                                                pool(_pool), oid(_oid), cn(NULL) {
@@ -594,7 +593,6 @@ public:
 };
 
 class RGWRadosGetOmapKeysCR : public RGWSimpleCoroutine {
-  RGWAsyncRadosProcessor *async_rados;
   RGWRados *store;
 
   string marker;
@@ -602,6 +600,7 @@ class RGWRadosGetOmapKeysCR : public RGWSimpleCoroutine {
   int max_entries;
 
   int rval;
+  librados::IoCtx ioctx;
 
   rgw_bucket pool;
   string oid;
@@ -609,11 +608,10 @@ class RGWRadosGetOmapKeysCR : public RGWSimpleCoroutine {
   RGWAioCompletionNotifier *cn;
 
 public:
-  RGWRadosGetOmapKeysCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
+  RGWRadosGetOmapKeysCR(RGWRados *_store,
                      rgw_bucket& _pool, const string& _oid,
                      const string& _marker,
                      map<string, bufferlist> *_entries, int _max_entries) : RGWSimpleCoroutine(_store->ctx()),
-                                                async_rados(_async_rados),
                                                store(_store),
                                                marker(_marker),
                                                entries(_entries), max_entries(_max_entries), rval(0),
@@ -624,7 +622,6 @@ public:
   }
 
   int send_request() {
-    librados::IoCtx ioctx;
     librados::Rados *rados = store->get_rados_handle();
     int r = rados->ioctx_create(pool.name.c_str(), ioctx); /* system object only! */
     if (r < 0) {
@@ -917,7 +914,7 @@ public:
            }
          }
          if (entries.size() >= OMAP_APPEND_MAX_ENTRIES || going_down) {
-           call(new RGWRadosSetOmapKeysCR(async_rados, store, pool, oid, entries));
+           call(new RGWRadosSetOmapKeysCR(store, pool, oid, entries));
            entries.clear();
          }
        }
@@ -1011,7 +1008,7 @@ public:
 
     reenter(this) {
       entries_index = new RGWShardedOmapCRManager(async_rados, store, this, num_shards,
-                                                 store->get_zone_params().log_pool, "meta.full-sync.index");
+                                                 store->get_zone_params().log_pool, mdlog_sync_full_sync_index_prefix);
       yield {
        call(new RGWReadRESTResourceCR<list<string> >(store->ctx(), conn, http_manager,
                                       "/admin/metadata", NULL, &sections));
@@ -1055,6 +1052,115 @@ public:
   }
 };
 
+static string full_sync_index_shard_oid(int shard_id)
+{
+  char buf[mdlog_sync_full_sync_index_prefix.size() + 16];
+  snprintf(buf, sizeof(buf), "%s.%d", mdlog_sync_full_sync_index_prefix.c_str(), shard_id);
+  return string(buf);
+}
+
+class RGWMetaSyncShardCR : public RGWCoroutine {
+  RGWRados *store;
+  RGWHTTPManager *http_manager;
+  RGWAsyncRadosProcessor *async_rados;
+
+  rgw_bucket pool;
+
+  uint32_t shard_id;
+  rgw_meta_sync_marker sync_marker;
+
+  map<string, bufferlist> entries;
+  map<string, bufferlist>::iterator iter;
+
+  string oid;
+
+public:
+  RGWMetaSyncShardCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
+                    rgw_bucket& _pool,
+                    uint32_t _shard_id, rgw_meta_sync_marker& _marker) : RGWCoroutine(_store->ctx()), store(_store),
+                                                      http_manager(_mgr),
+                                                     async_rados(_async_rados),
+                                                     pool(_pool),
+                                                     shard_id(_shard_id),
+                                                     sync_marker(_marker) {
+  }
+
+  int operate() {
+    RGWRESTConn *conn = store->rest_master_conn;
+
+#define OMAP_GET_MAX_ENTRIES 100
+    int max_entries = OMAP_GET_MAX_ENTRIES;
+    reenter(this) {
+      if (sync_marker.state == rgw_meta_sync_marker::FullSync) {
+        oid = full_sync_index_shard_oid(shard_id);
+        do {
+          yield call(new RGWRadosGetOmapKeysCR(store, pool, oid, sync_marker.marker, &entries, max_entries));
+          if (retcode < 0) {
+            ldout(store->ctx(), 0) << "ERROR: " << __func__ << "(): RGWRadosGetOmapKeysCR() returned ret=" << retcode << dendl;
+            return set_state(RGWCoroutine_Error, retcode);
+          }
+          iter = entries.begin();
+          yield {
+            for (; iter != entries.end(); ++iter) {
+              ldout(store->ctx(), 20) << __func__ << ": full sync: " << iter->first << dendl;
+              // fetch remote
+              // write local
+              // update shard marker
+            }
+          }
+        } while (entries.size() == max_entries);
+        // update shard state
+        return set_state(RGWCoroutine_Done, 0);
+      } else if (sync_marker.state == rgw_meta_sync_marker::IncrementalSync) {
+        int r = incremental_sync();
+      } else {
+        return set_state(RGWCoroutine_Error, -EIO);
+      }
+    }
+    return 0;
+  }
+
+  int incremental_sync() {
+    /* TODO */
+    return -ENOTSUP;
+  }
+};
+
+class RGWMetaSyncCR : public RGWCoroutine {
+  RGWRados *store;
+  RGWHTTPManager *http_manager;
+  RGWAsyncRadosProcessor *async_rados;
+
+  rgw_meta_sync_status sync_status;
+
+
+public:
+  RGWMetaSyncCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados, rgw_meta_sync_status& _sync_status) : RGWCoroutine(_store->ctx()), store(_store),
+                                                      http_manager(_mgr),
+                                                     async_rados(_async_rados),
+                                                     sync_status(_sync_status) {
+  }
+
+  int operate() {
+    RGWRESTConn *conn = store->rest_master_conn;
+
+    reenter(this) {
+      yield {
+       map<uint32_t, rgw_meta_sync_marker>::iterator iter = sync_status.sync_markers.begin();
+       for (; iter != sync_status.sync_markers.end(); ++iter) {
+         uint32_t shard_id = iter->first;
+         rgw_meta_sync_marker marker;
+          spawn(new RGWMetaSyncShardCR(store, http_manager, async_rados, store->get_zone_params().log_pool,
+                                      shard_id,
+                                      sync_status.sync_markers[shard_id]), true);
+        }
+      }
+      yield return set_state(RGWCoroutine_Done);
+    }
+    return 0;
+  }
+};
+
 class RGWCloneMetaLogCoroutine : public RGWCoroutine {
   RGWRados *store;
   RGWMetadataLog *mdlog;
@@ -1165,6 +1271,11 @@ int RGWRemoteMetaLog::run_sync(int num_shards, rgw_meta_sync_status& sync_status
       /* fall through */
     case rgw_meta_sync_info::StateSync:
       ldout(store->ctx(), 20) << __func__ << "(): sync" << dendl;
+      r = run(new RGWMetaSyncCR(store, &http_manager, async_rados, sync_status));
+      if (r < 0) {
+        ldout(store->ctx(), 0) << "ERROR: failed to fetch all metadata keys" << dendl;
+        return r;
+      }
       break;
     default:
       ldout(store->ctx(), 0) << "ERROR: bad sync state!" << dendl;
index 1de14ef6e90effd6aed90a195ac4e5c2d6b2b40a..147eb51f07d0b0960734258e962b035621a69ed0 100644 (file)
@@ -63,10 +63,14 @@ struct rgw_meta_sync_info {
 WRITE_CLASS_ENCODER(rgw_meta_sync_info)
 
 struct rgw_meta_sync_marker {
+  enum SyncState {
+    FullSync = 0,
+    IncrementalSync = 1,
+  };
   uint16_t state;
   string marker;
 
-  rgw_meta_sync_marker() : state(0) {}
+  rgw_meta_sync_marker() : state(FullSync) {}
 
   void encode(bufferlist& bl) const {
     ENCODE_START(1, 1, bl);