git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: data sync work
author Yehuda Sadeh <yehuda@redhat.com>
Fri, 2 Oct 2015 01:57:02 +0000 (18:57 -0700)
committer Yehuda Sadeh <yehuda@redhat.com>
Fri, 12 Feb 2016 00:12:56 +0000 (16:12 -0800)
Instead of just keeping map of bucket instances, keep map of all the shards.
Also prepare the CRs that will call into the bucket sync.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_data_sync.cc

index f50b25db821a9729bffd4dece17cc7f45fef6f91..d7362b7fc0ffb24b0787a6fced218611b509b97b 100644 (file)
@@ -373,6 +373,29 @@ int RGWRemoteDataLog::set_sync_info(const rgw_data_sync_info& sync_info)
                                 datalog_sync_status_oid, sync_info));
 }
 
+/*
+ * Build the oid of one shard of the full-sync index object:
+ * "<datalog_sync_full_sync_index_prefix>.<source_zone>.<shard_id>".
+ */
+static string full_data_sync_index_shard_oid(const string& source_zone, int shard_id)
+{
+  /* prefix + '.' + zone + '.' + decimal shard id and NUL; the zone name has to be counted, otherwise snprintf() truncates the oid */
+  char buf[datalog_sync_full_sync_index_prefix.size() + 1 + source_zone.size() + 1 + 16];
+  snprintf(buf, sizeof(buf), "%s.%s.%d", datalog_sync_full_sync_index_prefix.c_str(), source_zone.c_str(), shard_id);
+  return string(buf);
+}
+
+/*
+ * JSON-decodable holder for one bucket.instance metadata entry. It is the
+ * result type of the RGWReadRESTResourceCR request that RGWListBucketIndexesCR
+ * issues against "/admin/metadata/bucket.instance".
+ */
+struct bucket_instance_meta_info {
+  string key;        /* decoded from the "key" field */
+  obj_version ver;   /* decoded from the "ver" field */
+  time_t mtime;      /* decoded from the "mtime" field; 0 until decoded */
+  RGWBucketInstanceMetadataObject data; /* decoded from the "data" field; callers read the bucket info (e.g. num_shards) from it */
+
+  bucket_instance_meta_info() : mtime(0) {}
+
+  /* Fill every member from the identically named field of obj. */
+  void decode_json(JSONObj *obj) {
+    JSONDecoder::decode_json("key", key, obj);
+    JSONDecoder::decode_json("ver", ver, obj);
+    JSONDecoder::decode_json("mtime", mtime, obj);
+    JSONDecoder::decode_json("data", data, obj);
+  }
+};
+
 class RGWListBucketIndexesCR : public RGWCoroutine {
   RGWRados *store;
   RGWHTTPManager *http_manager;
@@ -385,11 +408,18 @@ class RGWListBucketIndexesCR : public RGWCoroutine {
   int req_ret;
 
   list<string> result;
+  list<string>::iterator iter;
 
   RGWShardedOmapCRManager *entries_index;
 
   string oid_prefix;
 
+  string path;
+  bucket_instance_meta_info meta_info;
+  string key;
+  string s;
+  int i;
+
 public:
   RGWListBucketIndexesCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
                          RGWRESTConn *_conn,
@@ -397,8 +427,9 @@ public:
                                                       http_manager(_mgr),
                                                      async_rados(_async_rados),
                                                      conn(_conn), source_zone(_source_zone), num_shards(_num_shards),
-                                                     req_ret(0), entries_index(NULL) {
+                                                     req_ret(0), entries_index(NULL), i(0) {
     oid_prefix = datalog_sync_full_sync_index_prefix + "." + source_zone; 
+    path = "/admin/metadata/bucket.instance";
   }
   ~RGWListBucketIndexesCR() {
     delete entries_index;
@@ -415,17 +446,40 @@ public:
         call(new RGWReadRESTResourceCR<list<string> >(store->ctx(), conn, http_manager,
                                                       entrypoint, NULL, &result));
       }
-      yield {
-        if (get_ret_status() < 0) {
-          ldout(store->ctx(), 0) << "ERROR: failed to fetch metadata for section bucket.index" << dendl;
-          return set_state(RGWCoroutine_Error);
+      if (get_ret_status() < 0) {
+        ldout(store->ctx(), 0) << "ERROR: failed to fetch metadata for section bucket.index" << dendl;
+        return set_state(RGWCoroutine_Error);
+      }
+      for (iter = result.begin(); iter != result.end(); ++iter) {
+        ldout(store->ctx(), 20) << "list metadata: section=bucket.index key=" << *iter << dendl;
+
+        key = *iter;
+
+        yield {
+          rgw_http_param_pair pairs[] = { { "key", key.c_str() },
+                                          { NULL, NULL } };
+
+          int ret = call(new RGWReadRESTResourceCR<bucket_instance_meta_info>(store->ctx(), conn, http_manager, path, pairs, &meta_info));
+          if (ret < 0) {
+            ldout(store->ctx(), 0) << "ERROR: failed to fetch bucket metadata info from zone=" << source_zone << " path=" << path << " key=" << key << " ret=" << ret << dendl;
+            return ret;
+          }
         }
-        for (list<string>::iterator iter = result.begin(); iter != result.end(); ++iter) {
-          ldout(store->ctx(), 20) << "list metadata: section=bucket.index key=" << *iter << dendl;
-          entries_index->append(*iter);
+
+        num_shards = meta_info.data.get_bucket_info().num_shards;
 #warning error handling of shards
+        if (num_shards > 0) {
+          for (i = 0; i < num_shards; i++) {
+            char buf[16];
+            snprintf(buf, sizeof(buf), ":%d", i);
+            s = key + buf;
+            yield entries_index->append(s);
+          }
+        } else {
+          yield entries_index->append(key);
         }
       }
+
       yield entries_index->finish();
       int ret;
       while (collect(&ret)) {
@@ -440,6 +494,306 @@ public:
   }
 };
 
+#define DATA_SYNC_UPDATE_MARKER_WINDOW 10
+
+/*
+ * Marker tracker for one data sync shard. Specializes RGWSyncShardMarkerTrack
+ * with string markers and persists the marker by writing an
+ * rgw_data_sync_marker to marker_oid in the zone's log pool.
+ */
+class RGWDataSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string> {
+  RGWRados *store;
+  RGWAsyncRadosProcessor *async_rados;
+
+  string marker_oid;                /* object that holds the persisted marker */
+  rgw_data_sync_marker sync_marker; /* local copy; its marker field is overwritten on every store_marker() */
+
+
+public:
+  /*
+   * _mgr is accepted but not used by this class.
+   * The base class is constructed with DATA_SYNC_UPDATE_MARKER_WINDOW.
+   */
+  RGWDataSyncShardMarkerTrack(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
+                         const string& _marker_oid,
+                         const rgw_data_sync_marker& _marker) : RGWSyncShardMarkerTrack(DATA_SYNC_UPDATE_MARKER_WINDOW),
+                                                                store(_store),
+                                                                async_rados(_async_rados),
+                                                                marker_oid(_marker_oid),
+                                                                sync_marker(_marker) {}
+
+  /*
+   * Record new_marker in sync_marker and return a newly allocated coroutine
+   * that writes the updated sync_marker to marker_oid.
+   * NOTE(review): presumably the caller takes ownership of the returned
+   * coroutine -- confirm against RGWSyncShardMarkerTrack.
+   */
+  RGWCoroutine *store_marker(const string& new_marker) {
+    sync_marker.marker = new_marker;
+
+    ldout(store->ctx(), 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl;
+    return new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(async_rados, store, store->get_zone_params().log_pool,
+                                marker_oid, sync_marker);
+  }
+};
+
+/*
+ * Coroutine that syncs a single bucket shard, identified by bucket name,
+ * bucket id and shard id, from source_zone. Only the declaration lives here;
+ * operate() is defined out of line further down in this file.
+ */
+class RGWRunBucketSyncCoroutine : public RGWCoroutine {
+  RGWHTTPManager *http_manager;
+  RGWAsyncRadosProcessor *async_rados;
+  RGWRESTConn *conn;
+  RGWRados *store;
+  RGWObjectCtx& obj_ctx;   /* held by reference: must outlive this coroutine */
+  string source_zone;
+  string bucket_name;
+  string bucket_id;
+  RGWBucketInfo bucket_info;
+  int shard_id;            /* callers in this file pass -1 when the key carries no shard suffix */
+  rgw_bucket_shard_sync_info sync_status;
+
+public:
+  /* Stores the arguments; bucket_info and sync_status are default-constructed. */
+  RGWRunBucketSyncCoroutine(RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
+                            RGWRESTConn *_conn, RGWRados *_store,
+                            RGWObjectCtx& _obj_ctx, const string& _source_zone,
+                            const string& _bucket_name, const string _bucket_id, int _shard_id) : RGWCoroutine(_store->ctx()),
+                                                                            http_manager(_mgr), async_rados(_async_rados), conn(_conn),
+                                                                            store(_store),
+                                                                           obj_ctx(_obj_ctx), source_zone(_source_zone),
+                                                                            bucket_name(_bucket_name),
+                                                                           bucket_id(_bucket_id), shard_id(_shard_id) {}
+
+  int operate();
+};
+
+
+/*
+ * Coroutine that syncs one data-sync entry. It parses raw_key
+ * ("<bucket_name>:<bucket_instance>[:<shard_id>]"), runs
+ * RGWRunBucketSyncCoroutine for that bucket shard, and then reports
+ * entry_marker as finished to the shard's marker tracker.
+ */
+class RGWDataSyncSingleEntryCR : public RGWCoroutine {
+  RGWRados *store;
+  RGWHTTPManager *http_manager;
+  RGWAsyncRadosProcessor *async_rados;
+
+  RGWRESTConn *conn;
+  string source_zone;
+
+  string raw_key;
+  string entry_marker;
+
+  RGWObjectCtx obj_ctx;
+
+  ssize_t pos;             /* signed so that a failed find() (npos) reads as negative */
+  string bucket_name;
+  string bucket_instance;
+
+  int sync_status;         /* result of the bucket sync, or of the marker update if the sync succeeded */
+
+  bufferlist md_bl;
+
+  RGWDataSyncShardMarkerTrack *marker_tracker; /* not owned */
+
+public:
+  RGWDataSyncSingleEntryCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
+                           RGWRESTConn *_conn, const string& _source_zone,
+                          const string& _raw_key, const string& _entry_marker, RGWDataSyncShardMarkerTrack *_marker_tracker) : RGWCoroutine(_store->ctx()), store(_store),
+                                                      http_manager(_mgr),
+                                                     async_rados(_async_rados),
+                                                      conn(_conn), source_zone(_source_zone),
+                                                     raw_key(_raw_key), entry_marker(_entry_marker),
+                                                      obj_ctx(_store),
+                                                      pos(0), sync_status(0),
+                                                      marker_tracker(_marker_tracker) {
+  }
+
+  /*
+   * Returns through set_state(): RGWCoroutine_Done with 0 on success,
+   * RGWCoroutine_Error with the first failure code otherwise (-EIO when the
+   * shard suffix of raw_key is not a valid integer).
+   */
+  int operate() {
+    reenter(this) {
+      yield {
+        pos = raw_key.find(':');
+        bucket_name = raw_key.substr(0, pos);
+        bucket_instance = raw_key.substr(pos + 1);
+        pos = bucket_instance.find(':');
+        int shard_id = -1;
+        if (pos >= 0) {
+          string err;
+          string s = bucket_instance.substr(pos + 1);
+          shard_id = strict_strtol(s.c_str(), 10, &err);
+          if (!err.empty()) {
+            ldout(store->ctx(), 0) << "ERROR: failed to parse bucket instance key: " << bucket_instance << dendl;
+            return set_state(RGWCoroutine_Error, -EIO);
+          }
+
+          /* pos is the length of the part before ':'; using pos - 1 here would drop the last character of the instance id */
+          bucket_instance = bucket_instance.substr(0, pos);
+        }
+        int ret = call(new RGWRunBucketSyncCoroutine(http_manager, async_rados, conn, store, obj_ctx, source_zone, bucket_name, bucket_instance, shard_id));
+        if (ret < 0) {
+#warning failed syncing bucket, need to log
+          /* sync_status is still 0 at this point, so report the actual failure */
+          return set_state(RGWCoroutine_Error, ret);
+        }
+      }
+
+      sync_status = retcode;
+#warning what do do in case of error
+      yield {
+        /* update marker */
+        int ret = call(marker_tracker->finish(entry_marker));
+        if (ret < 0) {
+          ldout(store->ctx(), 0) << "ERROR: marker_tracker->finish(" << entry_marker << ") returned ret=" << ret << dendl;
+          /* prefer an earlier sync failure; otherwise report this one rather than 0 */
+          return set_state(RGWCoroutine_Error, (sync_status < 0 ? sync_status : ret));
+        }
+      }
+      if (sync_status == 0) {
+        sync_status = retcode;
+      }
+      if (sync_status < 0) {
+        /* retcode only reflects the marker update; sync_status holds the first failure */
+        return set_state(RGWCoroutine_Error, sync_status);
+      }
+      return set_state(RGWCoroutine_Done, 0);
+    }
+    return 0;
+  }
+};
+
+class RGWDataSyncShardCR : public RGWCoroutine {
+  RGWRados *store;
+  RGWHTTPManager *http_manager;
+  RGWAsyncRadosProcessor *async_rados;
+  RGWRESTConn *conn;
+
+  rgw_bucket pool;
+
+  string source_zone;
+  uint32_t shard_id;
+  rgw_data_sync_marker sync_marker;
+
+  map<string, bufferlist> entries;
+  map<string, bufferlist>::iterator iter;
+
+  string oid;
+
+  RGWDataSyncShardMarkerTrack *marker_tracker;
+
+  list<cls_log_entry> log_entries;
+  list<cls_log_entry>::iterator log_iter;
+  bool truncated;
+
+  string mdlog_marker;
+  string raw_key;
+
+  Mutex inc_lock;
+  Cond inc_cond;
+
+  boost::asio::coroutine incremental_cr;
+  boost::asio::coroutine full_cr;
+
+
+public:
+  /*
+   * Stores the arguments. _pool and _marker are taken by reference but copied
+   * into the pool and sync_marker members. No marker tracker exists until
+   * set_marker_tracker() is called.
+   */
+  RGWDataSyncShardCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
+                     RGWRESTConn *_conn, rgw_bucket& _pool, const string& _source_zone,
+                    uint32_t _shard_id, rgw_data_sync_marker& _marker) : RGWCoroutine(_store->ctx()), store(_store),
+                                                      http_manager(_mgr),
+                                                     async_rados(_async_rados),
+                                                      conn(_conn),
+                                                     pool(_pool),
+                                                      source_zone(_source_zone),
+                                                     shard_id(_shard_id),
+                                                     sync_marker(_marker),
+                                                      marker_tracker(NULL), truncated(false), inc_lock("RGWDataSyncShardCR::inc_lock") {
+  }
+
+  /* The coroutine owns its marker tracker (may be NULL). */
+  ~RGWDataSyncShardCR() {
+    delete marker_tracker;
+  }
+
+  /* Replace the current marker tracker with mt, deleting the previous one; takes ownership of mt. */
+  void set_marker_tracker(RGWDataSyncShardMarkerTrack *mt) {
+    delete marker_tracker;
+    marker_tracker = mt;
+  }
+
+  /*
+   * Dispatch on the persisted sync state: run full_sync() or
+   * incremental_sync(), or fail with -EIO for an unknown state.
+   * Every switch branch returns, so the while loop body runs at most once per
+   * call, and the break and the trailing return are never reached.
+   */
+  int operate() {
+    while (true) {
+      switch (sync_marker.state) {
+      case rgw_data_sync_marker::FullSync:
+        return full_sync();
+      case rgw_data_sync_marker::IncrementalSync:
+        return incremental_sync();
+        break;
+      default:
+        return set_state(RGWCoroutine_Error, -EIO);
+      }
+    }
+    return 0;
+  }
+
+  /*
+   * Full sync of this shard. Pages through the omap keys of the shard's
+   * full-sync index object, spawns one RGWDataSyncSingleEntryCR per key, and
+   * finally persists a marker that switches the shard to IncrementalSync.
+   * Resumable: its position is kept in full_cr, separate from the
+   * coroutine state used by incremental_sync().
+   */
+  int full_sync() {
+    int ret; /* declared but not used in this function */
+
+#define OMAP_GET_MAX_ENTRIES 100
+    int max_entries = OMAP_GET_MAX_ENTRIES;
+    reenter(&full_cr) {
+      oid = full_data_sync_index_shard_oid(source_zone, shard_id);
+      set_marker_tracker(new RGWDataSyncShardMarkerTrack(store, http_manager, async_rados,
+                                                         RGWDataSyncStatusManager::shard_obj_name(source_zone, shard_id),
+                                                         sync_marker));
+      do {
+        /* fetch the next page of keys, starting after sync_marker.marker */
+        yield return call(new RGWRadosGetOmapKeysCR(store, pool, oid, sync_marker.marker, &entries, max_entries));
+        if (retcode < 0) {
+          ldout(store->ctx(), 0) << "ERROR: " << __func__ << "(): RGWRadosGetOmapKeysCR() returned ret=" << retcode << dendl;
+          return set_state(RGWCoroutine_Error, retcode);
+        }
+        /* iter is a member, not a local: it must survive the yields inside the loop */
+        iter = entries.begin();
+        for (; iter != entries.end(); ++iter) {
+          ldout(store->ctx(), 20) << __func__ << ": full sync: " << iter->first << dendl;
+          marker_tracker->start(iter->first);
+            // fetch remote and write locally
+          /* the omap key serves both as the raw key to sync and as the entry's marker */
+          yield spawn(new RGWDataSyncSingleEntryCR(store, http_manager, async_rados, conn, source_zone, iter->first, iter->first, marker_tracker), false);
+          if (retcode < 0) {
+            return set_state(RGWCoroutine_Error, retcode);
+          }
+          /* advance the paging cursor used by the next RGWRadosGetOmapKeysCR */
+          sync_marker.marker = iter->first;
+        }
+      } while ((int)entries.size() == max_entries); /* a short page means the index is exhausted */
+
+      /* NOTE(review): presumably waits for the spawned entry coroutines -- confirm drain_all() semantics */
+      drain_all();
+
+      yield {
+        /* update marker to reflect we're done with full sync */
+        sync_marker.state = rgw_data_sync_marker::IncrementalSync;
+        sync_marker.marker = sync_marker.next_step_marker;
+        sync_marker.next_step_marker.clear();
+        call(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(async_rados, store, store->get_zone_params().log_pool,
+                                                                   RGWDataSyncStatusManager::shard_obj_name(source_zone, shard_id), sync_marker));
+      }
+      if (retcode < 0) {
+        ldout(store->ctx(), 0) << "ERROR: failed to set sync marker: retcode=" << retcode << dendl;
+        return set_state(RGWCoroutine_Error, retcode);
+      }
+    }
+    return 0;
+  }
+    
+
+  /*
+   * Incremental sync of this shard. Not implemented yet: the draft body below
+   * is compiled out and the function currently just returns 0. The return
+   * statement is kept outside the disabled region so that this non-void
+   * function never falls off its end.
+   */
+  int incremental_sync() {
+#if 0
+    reenter(&incremental_cr) {
+      mdlog_marker = sync_marker.marker;
+      set_marker_tracker(new RGWDataSyncShardMarkerTrack(store, http_manager, async_rados,
+                                                         RGWDataSyncStatusManager::shard_obj_name(shard_id),
+                                                         sync_marker));
+      do {
+#define INCREMENTAL_MAX_ENTRIES 100
+       ldout(store->ctx(), 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
+       if (mdlog_marker <= sync_marker.marker) {
+         /* we're at the tip, try to bring more entries */
+          ldout(store->ctx(), 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " syncing mdlog for shard_id=" << shard_id << dendl;
+         yield call(new RGWCloneMetaLogCoroutine(store, http_manager, shard_id, mdlog_marker, &mdlog_marker));
+       }
+       ldout(store->ctx(), 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
+       if (mdlog_marker > sync_marker.marker) {
+          yield call(new RGWReadMDLogEntriesCR(async_rados, store, shard_id, &sync_marker.marker, INCREMENTAL_MAX_ENTRIES, &log_entries, &truncated));
+          for (log_iter = log_entries.begin(); log_iter != log_entries.end(); ++log_iter) {
+            ldout(store->ctx(), 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " log_entry: " << log_iter->id << ":" << log_iter->section << ":" << log_iter->name << ":" << log_iter->timestamp << dendl;
+            marker_tracker->start(log_iter->id);
+            raw_key = log_iter->section + ":" + log_iter->name;
+            yield spawn(new RGWMetaSyncSingleEntryCR(store, http_manager, async_rados, raw_key, log_iter->id, marker_tracker), false);
+            if (retcode < 0) {
+              return set_state(RGWCoroutine_Error, retcode);
+          }
+         }
+       }
+       ldout(store->ctx(), 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
+       if (mdlog_marker == sync_marker.marker) {
+#define INCREMENTAL_INTERVAL 20
+         yield wait(utime_t(INCREMENTAL_INTERVAL, 0));
+       }
+      } while (true);
+    }
+#endif
+    /* TODO */
+    return 0;
+  }
+};
+
 int RGWRemoteDataLog::run_sync(int num_shards, rgw_data_sync_status& sync_status)
 {
   RGWObjectCtx obj_ctx(store, NULL);
@@ -538,22 +892,6 @@ int RGWRemoteBucketLog::init(const string& _source_zone, RGWRESTConn *_conn, con
   return 0;
 }
 
-struct bucket_instance_meta_info {
-  string key;
-  obj_version ver;
-  time_t mtime;
-  RGWBucketInstanceMetadataObject data;
-
-  bucket_instance_meta_info() : mtime(0) {}
-
-  void decode_json(JSONObj *obj) {
-    JSONDecoder::decode_json("key", key, obj);
-    JSONDecoder::decode_json("ver", ver, obj);
-    JSONDecoder::decode_json("mtime", mtime, obj);
-    JSONDecoder::decode_json("data", data, obj);
-  }
-};
-
 struct bucket_index_marker_info {
   string bucket_ver;
   string master_ver;
@@ -1365,34 +1703,6 @@ int RGWBucketShardIncrementalSyncCR::operate()
   return 0;
 }
 
-class RGWRunBucketSyncCoroutine : public RGWCoroutine {
-  RGWHTTPManager *http_manager;
-  RGWAsyncRadosProcessor *async_rados;
-  RGWRESTConn *conn;
-  RGWRados *store;
-  RGWObjectCtx& obj_ctx;
-  string source_zone;
-  string bucket_name;
-  string bucket_id;
-  RGWBucketInfo bucket_info;
-  int shard_id;
-  rgw_bucket_shard_sync_info sync_status;
-
-public:
-  RGWRunBucketSyncCoroutine(RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
-                            RGWRESTConn *_conn, RGWRados *_store,
-                            RGWObjectCtx& _obj_ctx, const string& _source_zone,
-                            const string& _bucket_name, const string _bucket_id, int _shard_id) : RGWCoroutine(_store->ctx()),
-                                                                            http_manager(_mgr), async_rados(_async_rados), conn(_conn),
-                                                                            store(_store),
-                                                                           obj_ctx(_obj_ctx), source_zone(_source_zone),
-                                                                            bucket_name(_bucket_name),
-                                                                           bucket_id(_bucket_id), shard_id(_shard_id) {}
-
-  int operate();
-};
-
-
 int RGWRunBucketSyncCoroutine::operate()
 {
   reenter(this) {