rgw: data sync, update and flush high marker
author     Yehuda Sadeh <yehuda@redhat.com>
           Mon, 21 Mar 2016 23:50:32 +0000 (16:50 -0700)
committer  Yehuda Sadeh <yehuda@redhat.com>
           Wed, 23 Mar 2016 17:35:28 +0000 (10:35 -0700)
We need to update the high marker even if we skip entries, and eventually
flush it. This is needed so that our position in the bucket index log that
we follow is reflected correctly.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_data_sync.cc
src/rgw/rgw_sync.h
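
In pattern form, the fix makes every skip path in the incremental sync loop still advance the in-memory high marker, and flushes the marker once listing is exhausted. A minimal standalone sketch of that pattern, with Entry and MarkerTracker as illustrative stand-ins rather than the RGW types (rgw_bi_log_entry, RGWSyncShardMarkerTrack):

    #include <iostream>
    #include <string>
    #include <vector>

    // Illustrative stand-ins, not the RGW types.
    struct Entry {
      std::string id;
      bool skip;   // e.g. non-empty namespace, CLS_RGW_OP_CANCEL, non-complete state
    };

    struct MarkerTracker {
      std::string high_marker;

      // Advance the in-memory high marker even for entries we do not sync.
      void try_update_high_marker(const std::string& id) {
        if (id > high_marker) {
          high_marker = id;
        }
      }

      // Persist the current position; the real flush() also skips no-op writes.
      void flush() {
        std::cout << "stored marker " << high_marker << "\n";
      }
    };

    int main() {
      MarkerTracker tracker;
      std::vector<Entry> entries = {
        {"00000000001", true}, {"00000000002", false}, {"00000000003", true},
      };
      for (const auto& e : entries) {
        tracker.try_update_high_marker(e.id);  // skipped or not, remember our position
        if (e.skip) {
          continue;  // before this fix, skipped entries left the marker behind
        }
        // ... sync the object ...
      }
      tracker.flush();  // make sure the final position is persisted
    }

Without the update on the skip paths, a shard whose tail consists only of skipped entries would never report progress in the bucket index log.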

index f53596f21e6b4713056fd981f0613b462091fdc0..24756de8fc6298f7d4c31c66267a95322d6dfabb 100644 (file)
@@ -2202,6 +2202,8 @@ class RGWBucketShardIncrementalSyncCR : public RGWCoroutine {
   string instance;
   string ns;
 
+  string cur_id;
+
 
 
 public:
@@ -2270,32 +2272,44 @@ int RGWBucketShardIncrementalSyncCR::operate()
       entries_iter = list_result.begin();
       for (; entries_iter != list_result.end(); ++entries_iter) {
         entry = &(*entries_iter);
-        inc_marker.position = entry->id;
+        {
+          ssize_t p = entry->id.find('#'); /* entries might have explicit shard info in them, e.g., 6#00000000004.94.3 */
+          if (p < 0) {
+            cur_id = entry->id;
+          } else {
+            cur_id = entry->id.substr(p + 1);
+          }
+        }
+        inc_marker.position = cur_id;
 
         if (!rgw_obj::parse_raw_oid(entries_iter->object, &name, &instance, &ns)) {
           set_status() << "parse_raw_oid() on " << entries_iter->object << " returned false, skipping entry";
           ldout(sync_env->cct, 20) << "parse_raw_oid() on " << entries_iter->object << " returned false, skipping entry" << dendl;
+          marker_tracker->try_update_high_marker(cur_id, 0, entries_iter->timestamp);
           continue;
         }
 
-        ldout(sync_env->cct, 20) << "parsed entry: iter->object=" << entries_iter->object << " iter->instance=" << entries_iter->instance << " name=" << name << " instance=" << instance << " ns=" << ns << dendl;
+        ldout(sync_env->cct, 20) << "parsed entry: id=" << cur_id << " iter->object=" << entry->object << " iter->instance=" << entry->instance << " name=" << name << " instance=" << instance << " ns=" << ns << dendl;
 
         if (!ns.empty()) {
-          set_status() << "skipping entry in namespace: " << entries_iter->object;
-          ldout(sync_env->cct, 20) << "skipping entry in namespace: " << entries_iter->object << dendl;
+          set_status() << "skipping entry in namespace: " << entry->object;
+          ldout(sync_env->cct, 20) << "skipping entry in namespace: " << entry->object << dendl;
+          marker_tracker->try_update_high_marker(cur_id, 0, entry->timestamp);
           continue;
         }
 
-        key = rgw_obj_key(name, entries_iter->instance);
-        set_status() << "got entry.id=" << entry->id << " key=" << key << " op=" << (int)entry->op;
+        key = rgw_obj_key(name, entry->instance);
+        set_status() << "got entry.id=" << cur_id << " key=" << key << " op=" << (int)entry->op;
         if (entry->op == CLS_RGW_OP_CANCEL) {
           set_status() << "canceled operation, skipping";
           ldout(sync_env->cct, 20) << "[inc sync] skipping object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << key << ": canceled operation" << dendl;
+          marker_tracker->try_update_high_marker(cur_id, 0, entry->timestamp);
           continue;
         }
         if (entry->state != CLS_RGW_STATE_COMPLETE) {
           set_status() << "non-complete operation, skipping";
           ldout(sync_env->cct, 20) << "[inc sync] skipping object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << key << ": non-complete operation" << dendl;
+          marker_tracker->try_update_high_marker(cur_id, 0, entry->timestamp);
           continue;
         }
         ldout(sync_env->cct, 20) << "[inc sync] syncing object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << key << dendl;
@@ -2309,16 +2323,16 @@ int RGWBucketShardIncrementalSyncCR::operate()
           yield wait_for_child();
           
         }
-        if (!marker_tracker->index_key_to_marker(key, entry->op, entry->id)) {
+        if (!marker_tracker->index_key_to_marker(key, entry->op, cur_id)) {
           set_status() << "can't do op, sync already in progress for object";
-          ldout(sync_env->cct, 20) << __func__ << ": skipping sync of entry: " << entry->id << ":" << key << " sync already in progress for object" << dendl;
-          marker_tracker->try_update_high_marker(entry->id, 0, entries_iter->timestamp);
+          ldout(sync_env->cct, 20) << __func__ << ": skipping sync of entry: " << cur_id << ":" << key << " sync already in progress for object" << dendl;
+          marker_tracker->try_update_high_marker(cur_id, 0, entry->timestamp);
           continue;
         }
         // yield {
           set_status() << "start object sync";
-          if (!marker_tracker->start(entry->id, 0, entries_iter->timestamp)) {
-            ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << entry->id << ". Duplicate entry?" << dendl;
+          if (!marker_tracker->start(cur_id, 0, entry->timestamp)) {
+            ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << cur_id << ". Duplicate entry?" << dendl;
           } else {
             uint64_t versioned_epoch = 0;
             bucket_entry_owner owner(entry->owner, entry->owner_display_name);
@@ -2328,7 +2342,7 @@ int RGWBucketShardIncrementalSyncCR::operate()
             ldout(sync_env->cct, 20) << __func__ << "(): entry->timestamp=" << entry->timestamp << dendl;
             spawn(new RGWBucketSyncSingleEntryCR<string, rgw_obj_key>(sync_env, bucket_info, shard_id,
                                                          key, entry->is_versioned(), versioned_epoch, entry->timestamp, owner, entry->op,
-                                                         entry->state, entry->id, marker_tracker), false);
+                                                         entry->state, cur_id, marker_tracker), false);
           }
         // }
         while ((int)num_spawned() > spawn_window) {
@@ -2345,9 +2359,20 @@ int RGWBucketShardIncrementalSyncCR::operate()
       }
     } while (!list_result.empty());
 
+    yield {
+      call(marker_tracker->flush());
+    }
+    if (retcode < 0) {
+      ldout(sync_env->cct, 0) << "ERROR: marker_tracker->flush() returned retcode=" << retcode << dendl;
+      lease_cr->go_down();
+      drain_all();
+      return set_cr_error(retcode);
+    }
+
     lease_cr->go_down();
     /* wait for all operations to complete */
     drain_all();
+
     return set_cr_done();
   }
   return 0;
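
The cur_id handling introduced above strips an optional shard prefix from the bilog entry id, since entries can carry explicit shard info such as 6#00000000004.94.3. A hedged sketch of just that step (strip_shard_prefix is an illustrative helper, not an RGW function):

    #include <cassert>
    #include <string>

    // Illustrative helper: drop an optional "<shard>#" prefix from a bilog entry id.
    std::string strip_shard_prefix(const std::string& id) {
      const auto p = id.find('#');
      if (p == std::string::npos) {
        return id;                 // no explicit shard info
      }
      return id.substr(p + 1);     // "6#00000000004.94.3" -> "00000000004.94.3"
    }

    int main() {
      assert(strip_shard_prefix("6#00000000004.94.3") == "00000000004.94.3");
      assert(strip_shard_prefix("00000000004.94.3") == "00000000004.94.3");
      return 0;
    }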
index 66c639c42f30e388ede737d47faf6ffc6a98a68e..679f91c6dbb27292ecf4665c227154a648f98dbd 100644 (file)
@@ -294,6 +294,7 @@ class RGWSyncShardMarkerTrack {
   typename std::map<T, marker_entry> pending;
 
   T high_marker;
+  T last_stored_marker;
   marker_entry high_entry;
 
   int window_size;
@@ -355,14 +356,19 @@ public:
     updates_since_flush++;
 
     if (is_first && (updates_since_flush >= window_size || pending.empty())) {
-      return update_marker(high_marker, high_entry);
+      return flush();
     }
     return NULL;
   }
 
-  RGWCoroutine *update_marker(const T& new_marker, marker_entry& entry) {
+  RGWCoroutine *flush() {
+    if (last_stored_marker == high_marker) {
+      return NULL;
+    }
+
     updates_since_flush = 0;
-    return store_marker(new_marker, entry.pos, entry.timestamp);
+    last_stored_marker = high_marker;
+    return store_marker(high_marker, high_entry.pos, high_entry.timestamp);
   }
 
   /*
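
On the rgw_sync.h side, the direct update_marker() call is replaced by flush(), which remembers the last stored value and becomes a no-op when nothing has advanced, so the extra flush at the end of the sync loop does not issue redundant writes. A simplified sketch of that bookkeeping, assuming a comparable, printable marker type and a stubbed store_marker() in place of the coroutine the real class returns:

    #include <iostream>
    #include <string>

    // Simplified stand-in for RGWSyncShardMarkerTrack's flush bookkeeping.
    template <typename T>
    class MarkerTrack {
      T high_marker;
      T last_stored_marker;

      // In RGW this returns a coroutine that persists the marker; here we just print.
      void store_marker(const T& marker) {
        std::cout << "store_marker(" << marker << ")\n";
      }

    public:
      void try_update_high_marker(const T& marker) {
        if (marker > high_marker) {
          high_marker = marker;
        }
      }

      // Only persist when the high marker actually moved since the last flush.
      void flush() {
        if (last_stored_marker == high_marker) {
          return;
        }
        last_stored_marker = high_marker;
        store_marker(high_marker);
      }
    };

    int main() {
      MarkerTrack<std::string> track;
      track.try_update_high_marker("00000000002");
      track.flush();   // stores "00000000002"
      track.flush();   // no-op: nothing advanced since the last flush
    }

Tracking last_stored_marker is what lets callers flush defensively (as the sync loop now does after listing completes) without paying for a write when the marker is already up to date.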