rgw: data sync, update high marker when skipping duplicate sync entries
author Yehuda Sadeh <yehuda@redhat.com>
Wed, 23 Dec 2015 00:09:45 +0000 (16:09 -0800)
committer Yehuda Sadeh <yehuda@redhat.com>
Fri, 12 Feb 2016 00:13:48 +0000 (16:13 -0800)
Need to update the high marker when skipping an entry, so that when the entry
currently being processed finishes we set the markers correctly.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
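
For context, the marker tracker keeps a high-water mark over the log positions it has seen and only persists a marker once the corresponding in-flight entries finish. The sketch below is a simplified, assumed model of that bookkeeping (it is not the actual RGWSyncShardMarkerTrack code; the names SimpleMarkerTracker, start, finish and the string marker type are illustrative), showing why a skipped duplicate must still bump the high marker, otherwise the persisted marker can never advance past the skipped position.

// Simplified sketch of marker-tracker bookkeeping; illustrative only,
// names and types are assumptions, not the RGW interface.
#include <map>
#include <string>

struct SimpleMarkerTracker {
  std::map<std::string, bool> pending;  // log positions with a sync in flight
  std::string high_marker;              // highest log position seen so far

  // Entry accepted for processing: remember it and advance the high marker.
  bool start(const std::string& pos) {
    try_update_high_marker(pos);
    return pending.emplace(pos, true).second;
  }

  // Entry skipped (e.g. a sync for the same key is already in progress).
  // The high marker must still advance, otherwise the stored marker can
  // never move past the skipped log position.
  void try_update_high_marker(const std::string& pos) {
    if (!(pos <= high_marker)) {
      high_marker = pos;
    }
  }

  // Entry finished: drop it and report how far the stored marker may advance.
  std::string finish(const std::string& pos) {
    pending.erase(pos);
    // With nothing left in flight the marker can jump to the high marker,
    // which now covers any skipped duplicates as well.
    return pending.empty() ? high_marker : pending.begin()->first;
  }
};

With the change below, the skip paths call try_update_high_marker(), so once the in-flight entries drain, finish() can advance the stored marker past the skipped duplicates instead of stalling at the last non-skipped position.
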
src/rgw/rgw_data_sync.cc
src/rgw/rgw_sync.h
src/test/rgw/test_multi.py

diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc
index 36a1a645b9f71f7a6905d5db1a316d82843c88be..b16dbe11a6a8f94f831e7664c65ee422a40d470e 100644
--- a/src/rgw/rgw_data_sync.cc
+++ b/src/rgw/rgw_data_sync.cc
@@ -975,6 +975,7 @@ public:
             ldout(store->ctx(), 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " log_entry: " << log_iter->log_id << ":" << log_iter->log_timestamp << ":" << log_iter->entry.key << dendl;
             if (!marker_tracker->index_key_to_marker(log_iter->entry.key, log_iter->log_id)) {
               ldout(store->ctx(), 20) << __func__ << ": skipping sync of entry: " << log_iter->log_id << ":" << log_iter->entry.key << " sync already in progress for bucket shard" << dendl;
+              marker_tracker->try_update_high_marker(log_iter->log_id, 0, log_iter->log_timestamp);
               continue;
             }
             if (!marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp)) {
@@ -2126,6 +2127,11 @@ class RGWBucketShardIncrementalSyncCR : public RGWCoroutine {
   RGWContinuousLeaseCR *lease_cr;
   string status_oid;
 
+  string name;
+  string instance;
+  string ns;
+
+
 
 public:
   RGWBucketShardIncrementalSyncCR(RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
@@ -2195,10 +2201,25 @@ int RGWBucketShardIncrementalSyncCR::operate()
       }
       entries_iter = list_result.begin();
       for (; entries_iter != list_result.end(); ++entries_iter) {
-        key = rgw_obj_key(entries_iter->object, entries_iter->instance);
         entry = &(*entries_iter);
-        set_status() << "got entry.id=" << entry->id << " key=" << key << " op=" << (int)entry->op;
         inc_marker.position = entry->id;
+
+        if (!rgw_obj::parse_raw_oid(entries_iter->object, &name, &instance, &ns)) {
+          set_status() << "parse_raw_oid() on " << entries_iter->object << " returned false, skipping entry";
+          ldout(store->ctx(), 20) << "parse_raw_oid() on " << entries_iter->object << " returned false, skipping entry" << dendl;
+          continue;
+        }
+
+        ldout(store->ctx(), 20) << "parsed entry: iter->object=" << entries_iter->object << " iter->instance=" << entries_iter->instance << " name=" << name << " instance=" << instance << " ns=" << ns << dendl;
+
+        if (!ns.empty()) {
+          set_status() << "skipping entry in namespace: " << entries_iter->object;
+          ldout(store->ctx(), 20) << "skipping entry in namespace: " << entries_iter->object << dendl;
+          continue;
+        }
+
+        key = rgw_obj_key(name, entries_iter->instance);
+        set_status() << "got entry.id=" << entry->id << " key=" << key << " op=" << (int)entry->op;
         if (entry->op == CLS_RGW_OP_CANCEL) {
           set_status() << "canceled operation, skipping";
           ldout(store->ctx(), 20) << "[inc sync] skipping object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << key << ": canceled operation" << dendl;
@@ -2223,6 +2244,7 @@ int RGWBucketShardIncrementalSyncCR::operate()
         if (!marker_tracker->index_key_to_marker(key, entry->op, entry->id)) {
           set_status() << "can't do op, sync already in progress for object";
           ldout(store->ctx(), 20) << __func__ << ": skipping sync of entry: " << entry->id << ":" << key << " sync already in progress for object" << dendl;
+          marker_tracker->try_update_high_marker(entry->id, 0, entries_iter->timestamp);
           continue;
         }
         // yield {
diff --git a/src/rgw/rgw_sync.h b/src/rgw/rgw_sync.h
index 556aec4bbd65746a249faf4e8d63a4fa4403a2a1..e386329f4f59ac73c28f97aa7828141f1d23c36b 100644
--- a/src/rgw/rgw_sync.h
+++ b/src/rgw/rgw_sync.h
@@ -221,6 +221,13 @@ public:
     return true;
   }
 
+  void try_update_high_marker(const T& pos, int index_pos, const utime_t& timestamp) {
+    if (!(pos <= high_marker)) {
+      high_marker = pos;
+      high_entry = marker_entry(index_pos, timestamp);
+    }
+  }
+
   RGWCoroutine *finish(const T& pos) {
     if (pending.empty()) {
       /* can happen, due to a bug that ended up with multiple objects with the same name and version
diff --git a/src/test/rgw/test_multi.py b/src/test/rgw/test_multi.py
index 28115f207bf6c3c193752fb309804e314b25efff..d8edcfb2f1dd70ba96e09b470d474be95d287b22 100644
--- a/src/test/rgw/test_multi.py
+++ b/src/test/rgw/test_multi.py
@@ -584,13 +584,14 @@ def test_object_sync():
     for z in zone_bucket:
         all_zones.append(z)
 
-    objname = 'myobj'
+    objnames = [ 'myobj', '_myobj', ':', '&' ]
     content = 'asdasd'
 
     # don't wait for meta sync just yet
     for zone, bucket_name in zone_bucket.iteritems():
-        k = new_key(zone, bucket_name, objname)
-        k.set_contents_from_string(content)
+        for objname in objnames:
+            k = new_key(zone, bucket_name, objname)
+            k.set_contents_from_string(content)
 
     realm.meta_checkpoint()