]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: rework incremental md sync error handling
authorYehuda Sadeh <yehuda@redhat.com>
Fri, 30 Oct 2015 23:59:44 +0000 (16:59 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Fri, 12 Feb 2016 00:13:27 +0000 (16:13 -0800)
similar to what we do with the full md sync. Identify transient
errors, and if so return -EAGAIN so that caller would retry.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_sync.cc

index 67f08e0d8a70984efdc0d0b9d34f4914fa79e434..c3d29db4b8fe11950e3e31547177c2b4b648d16f 100644 (file)
@@ -890,6 +890,7 @@ public:
         }
 
         if (sync_status < 0) {
+#warning need to store entry for non-transient errors
           ldout(sync_env->cct, 10) << *this << ": failed to send read remote metadata entry: section=" << section << " key=" << key << " status=" << sync_status << dendl;
           log_error() << "failed to send read remote metadata entry: section=" << section << " key=" << key << " status=" << sync_status << std::endl;
           return set_cr_error(sync_status);
@@ -902,15 +903,15 @@ public:
 
       sync_status = retcode;
 
-      yield {
-        /* update marker */
-        int ret = call(marker_tracker->finish(entry_marker));
-        if (ret < 0) {
-          ldout(sync_env->cct, 0) << "ERROR: marker_tracker->finish(" << entry_marker << ") returned ret=" << ret << dendl;
-          return set_cr_error(sync_status);
-        }
-      }
       if (sync_status == 0) {
+        yield {
+          /* update marker */
+          int ret = call(marker_tracker->finish(entry_marker));
+          if (ret < 0) {
+            ldout(sync_env->cct, 0) << "ERROR: marker_tracker->finish(" << entry_marker << ") returned ret=" << ret << dendl;
+            return set_cr_error(sync_status);
+          }
+        }
         sync_status = retcode;
       }
       if (sync_status < 0) {
@@ -974,6 +975,7 @@ class RGWMetaSyncShardCR : public RGWCoroutine {
   uint32_t shard_id;
   rgw_meta_sync_marker sync_marker;
   string marker;
+  string max_marker;
 
   map<string, bufferlist> entries;
   map<string, bufferlist>::iterator iter;
@@ -1227,6 +1229,7 @@ public:
 
   int incremental_sync() {
     reenter(&incremental_cr) {
+      can_adjust_marker = true;
       /* grab lock */
       if (!lease_cr) { /* could have had  a lease_cr lock from previous state */
         yield {
@@ -1255,6 +1258,14 @@ public:
       set_marker_tracker(new RGWMetaSyncShardMarkerTrack(sync_env,
                                                          sync_env->shard_obj_name(shard_id),
                                                          sync_marker));
+
+      /*
+       * mdlog_marker: the remote sync marker positiion
+       * sync_marker: the local sync marker position
+       * max_marker: the max mdlog position that we fetched
+       * marker: the current position we try to sync
+       */
+      marker = max_marker = sync_marker.marker;
       /* inc sync */
       do {
         if (!lease_cr->is_locked()) {
@@ -1263,30 +1274,49 @@ public:
         }
 #define INCREMENTAL_MAX_ENTRIES 100
        ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
-       if (mdlog_marker <= sync_marker.marker) {
+       if (mdlog_marker <= max_marker) {
          /* we're at the tip, try to bring more entries */
           ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " syncing mdlog for shard_id=" << shard_id << dendl;
          yield call(new RGWCloneMetaLogCoroutine(sync_env, shard_id, mdlog_marker, &mdlog_marker));
        }
+        if (retcode < 0) {
+          ldout(sync_env->cct, 10) << *this << ": failed to fetch more log entries, retcode=" << retcode << dendl;
+          return retcode;
+        }
        ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
-       if (mdlog_marker > sync_marker.marker) {
-          yield call(new RGWReadMDLogEntriesCR(sync_env, shard_id, &sync_marker.marker, INCREMENTAL_MAX_ENTRIES, &log_entries, &truncated));
+       if (mdlog_marker > max_marker) {
+          marker = max_marker;
+          yield call(new RGWReadMDLogEntriesCR(sync_env, shard_id, &max_marker, INCREMENTAL_MAX_ENTRIES, &log_entries, &truncated));
           for (log_iter = log_entries.begin(); log_iter != log_entries.end(); ++log_iter) {
             ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " log_entry: " << log_iter->id << ":" << log_iter->section << ":" << log_iter->name << ":" << log_iter->timestamp << dendl;
             marker_tracker->start(log_iter->id);
             raw_key = log_iter->section + ":" + log_iter->name;
-            yield spawn(new RGWMetaSyncSingleEntryCR(sync_env, raw_key, log_iter->id, marker_tracker), false);
-            if (retcode < 0) {
-              return retcode;
+            yield {
+              RGWCoroutinesStack *stack = spawn(new RGWMetaSyncSingleEntryCR(sync_env, raw_key, log_iter->id, marker_tracker), false);
+              if (retcode < 0) {
+                return retcode;
+              }
+              assert(stack);
+              stack->get();
+
+              stack_to_pos[stack] = log_iter->id;
+              pos_to_prev[log_iter->id] = marker;
+              marker = log_iter->id;
+            }
           }
-         }
-       }
-       ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
-       if (mdlog_marker == sync_marker.marker) {
+        }
+        collect_children();
+       ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " max_marker=" << max_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
+       if (mdlog_marker == max_marker && can_adjust_marker) {
 #define INCREMENTAL_INTERVAL 20
          yield wait(utime_t(INCREMENTAL_INTERVAL, 0));
        }
-      } while (true);
+      } while (can_adjust_marker);
+
+      while (num_spawned() > 1) {
+        yield wait_for_child();
+        collect_children();
+      }
 
       yield lease_cr->go_down();
 
@@ -1295,6 +1325,10 @@ public:
       if (lost_lock) {
         return -EBUSY;
       }
+
+      if (!can_adjust_marker) {
+        return -EAGAIN;
+      }
     }
     /* TODO */
     return 0;