]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: RGWMetaSyncCR loops through period history
authorCasey Bodley <cbodley@redhat.com>
Thu, 18 Feb 2016 16:31:45 +0000 (11:31 -0500)
committerYehuda Sadeh <yehuda@redhat.com>
Thu, 18 Feb 2016 22:04:21 +0000 (14:04 -0800)
RGWMetaSyncCR uses a period history Cursor to track its position. it
uses this to get the max sync markers for each shard from the following
period, so that RGWMetaSyncShardCR knows when to stop syncing and return
control to RGWMetaSyncCR

Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/rgw_sync.cc

index 4a230d09a0000bb8ef2f717b37e4c58154471032..f17e1e3ab00153762534f9d92a1262b23ba65c16 100644 (file)
@@ -1,3 +1,6 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
 #include "common/ceph_json.h"
 #include "common/RWLock.h"
 #include "common/RefCountedObj.h"
@@ -309,14 +312,17 @@ protected:
   }
 public:
   RGWAsyncReadMDLogEntries(RGWAioCompletionNotifier *cn, RGWRados *_store,
-                          int _shard_id, string* _marker, int _max_entries,
-                          list<cls_log_entry> *_entries, bool *_truncated) : RGWAsyncRadosRequest(cn), store(_store), mdlog(store->meta_mgr->get_log()),
-                                                                   shard_id(_shard_id), marker(_marker), max_entries(_max_entries),
-                                                                  entries(_entries), truncated(_truncated) {}
+                           RGWMetadataLog* mdlog, int _shard_id,
+                           string* _marker, int _max_entries,
+                           list<cls_log_entry> *_entries, bool *_truncated)
+    : RGWAsyncRadosRequest(cn), store(_store), mdlog(mdlog),
+      shard_id(_shard_id), marker(_marker), max_entries(_max_entries),
+      entries(_entries), truncated(_truncated) {}
 };
 
 class RGWReadMDLogEntriesCR : public RGWSimpleCoroutine {
   RGWMetaSyncEnv *sync_env;
+  RGWMetadataLog *const mdlog;
   int shard_id;
   string marker;
   string *pmarker;
@@ -327,12 +333,12 @@ class RGWReadMDLogEntriesCR : public RGWSimpleCoroutine {
   RGWAsyncReadMDLogEntries *req;
 
 public:
-  RGWReadMDLogEntriesCR(RGWMetaSyncEnv *_sync_env,
-                      int _shard_id, string*_marker, int _max_entries,
-                      list<cls_log_entry> *_entries, bool *_truncated) : RGWSimpleCoroutine(_sync_env->cct), sync_env(_sync_env),
-                                                shard_id(_shard_id), pmarker(_marker), max_entries(_max_entries),
-                                               entries(_entries), truncated(_truncated) {
-                                               }
+  RGWReadMDLogEntriesCR(RGWMetaSyncEnv *_sync_env, RGWMetadataLog* mdlog,
+                        int _shard_id, string*_marker, int _max_entries,
+                        list<cls_log_entry> *_entries, bool *_truncated)
+    : RGWSimpleCoroutine(_sync_env->cct), sync_env(_sync_env), mdlog(mdlog),
+      shard_id(_shard_id), pmarker(_marker), max_entries(_max_entries),
+      entries(_entries), truncated(_truncated) {}
 
   ~RGWReadMDLogEntriesCR() {
     if (req) {
@@ -343,7 +349,8 @@ public:
   int send_request() {
     marker = *pmarker;
     req = new RGWAsyncReadMDLogEntries(stack->create_completion_notifier(),
-                                  sync_env->store, shard_id, &marker, max_entries, entries, truncated);
+                                       sync_env->store, mdlog, shard_id, &marker,
+                                       max_entries, entries, truncated);
     sync_env->async_rados->queue(req);
     return 0;
   }
@@ -419,14 +426,15 @@ class RGWInitSyncStatusCoroutine : public RGWCoroutine {
   RGWObjectCtx& obj_ctx;
 
   rgw_meta_sync_info status;
-  map<int, RGWMetadataLogInfo> shards_info;
+  vector<RGWMetadataLogInfo> shards_info;
   RGWContinuousLeaseCR *lease_cr;
 public:
   RGWInitSyncStatusCoroutine(RGWMetaSyncEnv *_sync_env,
                              RGWObjectCtx& _obj_ctx,
                              const rgw_meta_sync_info &status)
     : RGWCoroutine(_sync_env->store->ctx()), sync_env(_sync_env),
-      obj_ctx(_obj_ctx), status(status), lease_cr(NULL) {}
+      obj_ctx(_obj_ctx), status(status), shards_info(status.num_shards),
+      lease_cr(NULL) {}
 
   ~RGWInitSyncStatusCoroutine() {
     if (lease_cr) {
@@ -474,8 +482,8 @@ public:
       set_status("fetching remote log position");
       yield {
         for (int i = 0; i < (int)status.num_shards; i++) {
-          spawn(new RGWReadRemoteMDLogShardInfoCR(sync_env, status.period,
-                                                  i, &shards_info[i]), false);
+          spawn(new RGWReadRemoteMDLogShardInfoCR(sync_env, status.period, i,
+                                                  &shards_info[i]), false);
        }
       }
 
@@ -1011,24 +1019,23 @@ class RGWCloneMetaLogCoroutine : public RGWCoroutine {
   const std::string& period;
   int shard_id;
   string marker;
-  bool truncated;
+  bool truncated = false;
   string *new_marker;
 
-  int max_entries;
+  int max_entries = CLONE_MAX_ENTRIES;
 
-  RGWRESTReadResource *http_op;
+  RGWRESTReadResource *http_op = nullptr;
 
-  int req_ret;
+  int req_ret = 0;
   RGWMetadataLogInfo shard_info;
   rgw_mdlog_shard_data data;
 
 public:
-  RGWCloneMetaLogCoroutine(RGWMetaSyncEnv *_sync_env, const std::string& period,
-                           int _id, const string& _marker, string *_new_marker)
-    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
-      mdlog(sync_env->store->meta_mgr->get_log()), period(period),
-      shard_id(_id), marker(_marker), truncated(false), new_marker(_new_marker),
-      max_entries(CLONE_MAX_ENTRIES), http_op(NULL), req_ret(0) {
+  RGWCloneMetaLogCoroutine(RGWMetaSyncEnv *_sync_env, RGWMetadataLog* mdlog,
+                           const std::string& period, int _id,
+                           const string& _marker, string *_new_marker)
+    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), mdlog(mdlog),
+      period(period), shard_id(_id), marker(_marker), new_marker(_new_marker) {
     if (new_marker) {
       *new_marker = marker;
     }
@@ -1053,24 +1060,25 @@ public:
 class RGWMetaSyncShardCR : public RGWCoroutine {
   RGWMetaSyncEnv *sync_env;
 
-  rgw_bucket pool;
-
-  const std::string& period;
+  const rgw_bucket& pool;
+  const std::string& period; //< currently syncing period id
+  RGWMetadataLog* mdlog; //< log of syncing period
   uint32_t shard_id;
-  rgw_meta_sync_marker sync_marker;
+  rgw_meta_sync_marker& sync_marker;
   string marker;
   string max_marker;
+  const std::string& period_marker; //< max marker stored in next period
 
   map<string, bufferlist> entries;
   map<string, bufferlist>::iterator iter;
 
   string oid;
 
-  RGWMetaSyncShardMarkerTrack *marker_tracker;
+  RGWMetaSyncShardMarkerTrack *marker_tracker = nullptr;
 
   list<cls_log_entry> log_entries;
   list<cls_log_entry>::iterator log_iter;
-  bool truncated;
+  bool truncated = false;
 
   string mdlog_marker;
   string raw_key;
@@ -1082,30 +1090,28 @@ class RGWMetaSyncShardCR : public RGWCoroutine {
   boost::asio::coroutine incremental_cr;
   boost::asio::coroutine full_cr;
 
-  RGWContinuousLeaseCR *lease_cr;
-  bool lost_lock;
+  RGWContinuousLeaseCR *lease_cr = nullptr;
+  bool lost_lock = false;
 
   bool *reset_backoff;
 
   map<RGWCoroutinesStack *, string> stack_to_pos;
   map<string, string> pos_to_prev;
 
-  bool can_adjust_marker;
+  bool can_adjust_marker = false;
+  bool done_with_period = false;
 
-  int total_entries;
+  int total_entries = 0;
 
 public:
-  RGWMetaSyncShardCR(RGWMetaSyncEnv *_sync_env,
-                    rgw_bucket& _pool, const std::string& period,
-                    uint32_t _shard_id, rgw_meta_sync_marker& _marker,
-                     bool *_reset_backoff) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
-                                                     pool(_pool),
-                  period(period),
-                                                     shard_id(_shard_id),
-                                                     sync_marker(_marker),
-                                                      marker_tracker(NULL), truncated(false), inc_lock("RGWMetaSyncShardCR::inc_lock"),
-                                                      lease_cr(NULL), lost_lock(false), reset_backoff(_reset_backoff), can_adjust_marker(false),
-                                                      total_entries(0) {
+  RGWMetaSyncShardCR(RGWMetaSyncEnv *_sync_env, const rgw_bucket& _pool,
+                     const std::string& period, RGWMetadataLog* mdlog,
+                     uint32_t _shard_id, rgw_meta_sync_marker& _marker,
+                     const std::string& period_marker, bool *_reset_backoff)
+    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), pool(_pool),
+      period(period), mdlog(mdlog), shard_id(_shard_id), sync_marker(_marker),
+      period_marker(period_marker), inc_lock("RGWMetaSyncShardCR::inc_lock"),
+      reset_backoff(_reset_backoff) {
     *reset_backoff = false;
   }
 
@@ -1211,7 +1217,7 @@ public:
           lease_cr->put();
         }
         RGWRados *store = sync_env->store;
-       lease_cr = new RGWContinuousLeaseCR(sync_env->async_rados, store, store->get_zone_params().log_pool,
+       lease_cr = new RGWContinuousLeaseCR(sync_env->async_rados, store, pool,
                                             sync_env->shard_obj_name(shard_id),
                                             lock_name, lock_duration, this);
         lease_cr->get();
@@ -1287,9 +1293,10 @@ public:
             sync_marker.marker = sync_marker.next_step_marker;
             sync_marker.next_step_marker.clear();
           }
+          // XXX: why write the marker if !can_adjust_marker?
           RGWRados *store = sync_env->store;
           ldout(sync_env->cct, 0) << *this << ": saving marker pos=" << sync_marker.marker << dendl;
-          call(new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(sync_env->async_rados, store, store->get_zone_params().log_pool,
+          call(new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(sync_env->async_rados, store, pool,
                                                                sync_env->shard_obj_name(shard_id), sync_marker));
         }
         if (retcode < 0) {
@@ -1332,7 +1339,7 @@ public:
           uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
           string lock_name = "sync_lock";
           RGWRados *store = sync_env->store;
-          lease_cr = new RGWContinuousLeaseCR(sync_env->async_rados, store, store->get_zone_params().log_pool,
+          lease_cr = new RGWContinuousLeaseCR(sync_env->async_rados, store, pool,
                                               sync_env->shard_obj_name(shard_id),
                                               lock_name, lock_duration, this);
           lease_cr->get();
@@ -1360,6 +1367,7 @@ public:
        * sync_marker: the local sync marker position
        * max_marker: the max mdlog position that we fetched
        * marker: the current position we try to sync
+       * period_marker: the last marker before the next period begins (optional)
        */
       marker = max_marker = sync_marker.marker;
       /* inc sync */
@@ -1369,11 +1377,16 @@ public:
           break;
         }
 #define INCREMENTAL_MAX_ENTRIES 100
-       ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
+       ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " sync_marker.marker=" << sync_marker.marker << " period_marker=" << period_marker << dendl;
+        if (!period_marker.empty() && period_marker <= marker) {
+          done_with_period = true;
+          break;
+        }
        if (mdlog_marker <= max_marker) {
          /* we're at the tip, try to bring more entries */
           ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " syncing mdlog for shard_id=" << shard_id << dendl;
-          yield call(new RGWCloneMetaLogCoroutine(sync_env, period, shard_id,
+          yield call(new RGWCloneMetaLogCoroutine(sync_env, mdlog,
+                                                  period, shard_id,
                                                   mdlog_marker, &mdlog_marker));
        }
         if (retcode < 0) {
@@ -1385,8 +1398,14 @@ public:
        ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
        if (mdlog_marker > max_marker) {
           marker = max_marker;
-          yield call(new RGWReadMDLogEntriesCR(sync_env, shard_id, &max_marker, INCREMENTAL_MAX_ENTRIES, &log_entries, &truncated));
+          yield call(new RGWReadMDLogEntriesCR(sync_env, mdlog, shard_id,
+                                               &max_marker, INCREMENTAL_MAX_ENTRIES,
+                                               &log_entries, &truncated));
           for (log_iter = log_entries.begin(); log_iter != log_entries.end(); ++log_iter) {
+            if (!period_marker.empty() && period_marker < log_iter->id) {
+              done_with_period = true;
+              break;
+            }
             if (!mdlog_entry.convert_from(*log_iter)) {
               ldout(sync_env->cct, 0) << __func__ << ":" << __LINE__ << ": ERROR: failed to convert mdlog entry, shard_id=" << shard_id << " log_entry: " << log_iter->id << ":" << log_iter->section << ":" << log_iter->name << ":" << log_iter->timestamp << " ... skipping entry" << dendl;
               continue;
@@ -1409,7 +1428,11 @@ public:
           }
         }
         collect_children();
-       ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " max_marker=" << max_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
+       ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " max_marker=" << max_marker << " sync_marker.marker=" << sync_marker.marker << " period_marker=" << period_marker << dendl;
+        if (done_with_period) {
+          // return control to RGWMetaSyncCR and advance to the next period
+          break;
+        }
        if (mdlog_marker == max_marker && can_adjust_marker) {
 #define INCREMENTAL_INTERVAL 20
          yield wait(utime_t(INCREMENTAL_INTERVAL, 0));
@@ -1442,66 +1465,122 @@ class RGWMetaSyncShardControlCR : public RGWBackoffControlCR
 {
   RGWMetaSyncEnv *sync_env;
 
-  rgw_bucket pool;
-
+  const rgw_bucket& pool;
   const std::string& period;
+  RGWMetadataLog* mdlog;
   uint32_t shard_id;
   rgw_meta_sync_marker sync_marker;
+  const std::string period_marker;
 
   RGWObjectCtx obj_ctx;
 
 public:
-  RGWMetaSyncShardControlCR(RGWMetaSyncEnv *_sync_env,
-                            rgw_bucket& _pool, const std::string& period,
-                            uint32_t _shard_id, rgw_meta_sync_marker& _marker)
+  RGWMetaSyncShardControlCR(RGWMetaSyncEnv *_sync_env, const rgw_bucket& _pool,
+                            const std::string& period, RGWMetadataLog* mdlog,
+                            uint32_t _shard_id, const rgw_meta_sync_marker& _marker,
+                            std::string&& period_marker)
     : RGWBackoffControlCR(_sync_env->cct), sync_env(_sync_env),
-      pool(_pool), period(period), shard_id(_shard_id),
-      sync_marker(_marker), obj_ctx(sync_env->store) {}
+      pool(_pool), period(period), mdlog(mdlog), shard_id(_shard_id),
+      sync_marker(_marker), period_marker(std::move(period_marker)),
+      obj_ctx(sync_env->store) {}
 
   RGWCoroutine *alloc_cr() {
-    return new RGWMetaSyncShardCR(sync_env, pool, period, shard_id,
-                                  sync_marker, backoff_ptr());
+    return new RGWMetaSyncShardCR(sync_env, pool, period, mdlog, shard_id,
+                                  sync_marker, period_marker, backoff_ptr());
   }
 
   RGWCoroutine *alloc_finisher_cr() {
     RGWRados *store = sync_env->store;
-    return new RGWSimpleRadosReadCR<rgw_meta_sync_marker>(sync_env->async_rados, store, obj_ctx, store->get_zone_params().log_pool,
+    return new RGWSimpleRadosReadCR<rgw_meta_sync_marker>(sync_env->async_rados, store, obj_ctx, pool,
                                                                sync_env->shard_obj_name(shard_id), &sync_marker);
   }
 };
 
 class RGWMetaSyncCR : public RGWCoroutine {
   RGWMetaSyncEnv *sync_env;
-
+  const rgw_bucket& pool;
+  RGWPeriodHistory::Cursor cursor; //< sync position in period history
+  RGWPeriodHistory::Cursor next; //< next period in history
   rgw_meta_sync_status sync_status;
 
   map<int, RGWMetaSyncShardControlCR *> shard_crs;
 
-
 public:
-  RGWMetaSyncCR(RGWMetaSyncEnv *_sync_env, const rgw_meta_sync_status& _sync_status) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
-                                                     sync_status(_sync_status) {
-  }
+  RGWMetaSyncCR(RGWMetaSyncEnv *_sync_env, RGWPeriodHistory::Cursor cursor,
+                const rgw_meta_sync_status& _sync_status)
+    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
+      pool(sync_env->store->get_zone_params().log_pool),
+      cursor(cursor), sync_status(_sync_status) {}
 
   int operate() {
+    int ret = 0;
     reenter(this) {
-      yield {
-       map<uint32_t, rgw_meta_sync_marker>::iterator iter = sync_status.sync_markers.begin();
-       for (; iter != sync_status.sync_markers.end(); ++iter) {
-         uint32_t shard_id = iter->first;
-         rgw_meta_sync_marker marker;
-
-         RGWMetaSyncShardControlCR *shard_cr = new RGWMetaSyncShardControlCR(sync_env,
-                                                                              sync_env->store->get_zone_params().log_pool,
-                                                                              sync_status.sync_info.period, shard_id,
-                                                                              sync_status.sync_markers[shard_id]);
+      // loop through one period at a time
+      for (;;) {
+        if (cursor == sync_env->store->period_history->get_current()) {
+          next = RGWPeriodHistory::Cursor{};
+          if (cursor) {
+            ldout(cct, 10) << "RGWMetaSyncCR on current period="
+                << cursor.get_period().get_id() << dendl;
+          } else {
+            ldout(cct, 10) << "RGWMetaSyncCR with no period" << dendl;
+          }
+        } else {
+          next = cursor;
+          next.next();
+          ldout(cct, 10) << "RGWMetaSyncCR on period="
+              << cursor.get_period().get_id() << ", next="
+              << next.get_period().get_id() << dendl;
+        }
 
+        yield {
+          // get the mdlog for the current period (may be empty)
+          auto& period_id = sync_status.sync_info.period;
+          auto mdlog = sync_env->store->meta_mgr->get_log(period_id);
+
+          // sync this period on each shard
+          for (const auto& m : sync_status.sync_markers) {
+            uint32_t shard_id = m.first;
+            auto& marker = m.second;
+
+            std::string period_marker;
+            if (next) {
+              // read the maximum marker from the next period's sync status
+              period_marker = next.get_period().get_sync_status()[shard_id];
+              if (period_marker.empty()) {
+                // no metadata changes have occurred on this shard, skip it
+                ldout(cct, 10) << "RGWMetaSyncCR: skipping shard " << shard_id
+                    << " with empty period marker" << dendl;
+                continue;
+              }
+            }
 
-         shard_crs[shard_id] = shard_cr;
-          spawn(shard_cr, true);
+            auto cr = new RGWMetaSyncShardControlCR(sync_env, pool, period_id,
+                                                    mdlog, shard_id, marker,
+                                                    std::move(period_marker));
+            // XXX: do we need to hold a ref on cr while it's in shard_crs?
+            shard_crs[shard_id] = cr;
+            spawn(cr, false);
+          }
         }
+        // wait for each shard to complete
+        collect(&ret);
+        if (ret < 0) {
+          return set_cr_error(ret);
+        }
+        drain_all();
+        // advance to the next period
+        assert(next);
+        cursor = next;
+
+        // write the updated sync info
+        sync_status.sync_info.period = cursor.get_period().get_id();
+        sync_status.sync_info.realm_epoch = cursor.get_epoch();
+        yield call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(sync_env->async_rados,
+                                                                 sync_env->store, pool,
+                                                                 sync_env->status_oid(),
+                                                                 sync_status.sync_info));
       }
-      yield return set_cr_done();
     }
     return 0;
   }