]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: convert old replicalog entries if needed
authorYehuda Sadeh <yehuda@redhat.com>
Wed, 21 Jan 2015 00:36:34 +0000 (16:36 -0800)
committerYehuda Sadeh <yehuda@redhat.com>
Wed, 28 Jan 2015 23:56:10 +0000 (15:56 -0800)
If reading a bucket replicalog entry and one doesn't exist, fall back to
the old key, and convert it to the new one. When updating entries, if
entry does not exist do the same.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_replica_log.cc
src/rgw/rgw_replica_log.h
src/rgw/rgw_rest_replica_log.cc

index f88b132bd29f8aaa5468304a89224ca97ae7f234..747cce68cd2f02a6bc36411547abaac9d6dcf734 100644 (file)
@@ -56,7 +56,8 @@ int RGWReplicaLogger::open_ioctx(librados::IoCtx& ctx, const string& pool)
 int RGWReplicaLogger::update_bound(const string& oid, const string& pool,
                                    const string& daemon_id,
                                    const string& marker, const utime_t& time,
-                                   const list<RGWReplicaItemMarker> *entries)
+                                   const list<RGWReplicaItemMarker> *entries,
+                                   bool need_to_exist)
 {
   cls_replica_log_progress_marker progress;
   progress.entity_id = daemon_id;
@@ -71,12 +72,15 @@ int RGWReplicaLogger::update_bound(const string& oid, const string& pool,
   }
 
   librados::ObjectWriteOperation opw;
+  if (need_to_exist) {
+    opw.assert_exists();
+  }
   cls_replica_log_update_bound(opw, progress);
   return ioctx.operate(oid, &opw);
 }
 
-int RGWReplicaLogger::delete_bound(const string& oid, const string& pool,
-                                   const string& daemon_id, bool purge_all)
+int RGWReplicaLogger::write_bounds(const string& oid, const string& pool,
+                                   RGWReplicaBounds& bounds)
 {
   librados::IoCtx ioctx;
   int r = open_ioctx(ioctx, pool);
@@ -84,12 +88,40 @@ int RGWReplicaLogger::delete_bound(const string& oid, const string& pool,
     return r;
   }
 
-  if (purge_all) {
-    return ioctx.remove(oid);
+  librados::ObjectWriteOperation opw;
+  list<RGWReplicaProgressMarker>::iterator iter = bounds.markers.begin();
+  for (; iter != bounds.markers.end(); ++iter) {
+    RGWReplicaProgressMarker& progress = *iter;
+    cls_replica_log_update_bound(opw, progress);
+  }
+
+  r = ioctx.operate(oid, &opw);
+  if (r < 0) {
+    return r;
+  }
+
+  return 0;
+}
+
+int RGWReplicaLogger::delete_bound(const string& oid, const string& pool,
+                                   const string& daemon_id, bool purge_all,
+                                   bool need_to_exist)
+{
+  librados::IoCtx ioctx;
+  int r = open_ioctx(ioctx, pool);
+  if (r < 0) {
+    return r;
   }
 
   librados::ObjectWriteOperation opw;
-  cls_replica_log_delete_bound(opw, daemon_id);
+  if (need_to_exist) {
+    opw.assert_exists();
+  }
+  if (purge_all) {
+    opw.remove();
+  } else {
+    cls_replica_log_delete_bound(opw, daemon_id);
+  }
   return ioctx.operate(oid, &opw);
 }
 
@@ -159,12 +191,14 @@ string RGWReplicaBucketLogger::obj_name(const rgw_bucket& bucket, int shard_id,
 
 int RGWReplicaBucketLogger::update_bound(const rgw_bucket& bucket, int shard_id, const string& daemon_id,
                    const string& marker, const utime_t& time,
-                   const list<RGWReplicaItemMarker> *entries)
+                   const list<RGWReplicaItemMarker> *entries,
+                   bool index_by_instance)
 {
   if (shard_id >= 0 ||
       !BucketIndexShardsManager::is_shards_marker(marker)) {
     return RGWReplicaLogger::update_bound(obj_name(bucket, shard_id, true), pool,
-                                          daemon_id, marker, time, entries);
+                                          daemon_id, marker, time, entries,
+                                          false);
   }
 
   BucketIndexShardsManager sm;
@@ -178,11 +212,30 @@ int RGWReplicaBucketLogger::update_bound(const rgw_bucket& bucket, int shard_id,
 
   ret = 0;
 
+  bool no_shards = (vals.size() == 1);
+
   map<int, string>::iterator iter;
   for (iter = vals.begin(); iter != vals.end(); ++iter) {
+    bool need_to_exist = index_by_instance && no_shards; /*
+                                                          * don't need to exist if not indexing by instance,
+                                                          * also, we only care about non-sharded
+                                                          * buckets, as these are the ones that
+                                                          * might not be indexed by instance id
+                                                          */
     ldout(cct, 20) << "updating bound: bucket=" << bucket << " shard=" << iter->first << " marker=" << marker << dendl;
     int r = RGWReplicaLogger::update_bound(obj_name(bucket, iter->first, true), pool,
-                                          daemon_id, iter->second, time, entries);
+                                          daemon_id, iter->second, time, entries,
+                                          need_to_exist);
+
+    if (r == -ENOENT && need_to_exist) {
+      RGWReplicaBounds bounds;
+      r = convert_old_bounds(bucket, -1, bounds);
+      if (r < 0 && r != -ENOENT) {
+        return r;
+      }
+      r = RGWReplicaLogger::update_bound(obj_name(bucket, iter->first, true), pool,
+                                         daemon_id, marker, time, entries, false);
+    }
     if (r < 0) {
       ldout(cct, 0) << "failed to update bound: bucket=" << bucket << " shard=" << iter->first << " marker=" << marker << dendl;
       ret = r;
@@ -191,3 +244,59 @@ int RGWReplicaBucketLogger::update_bound(const rgw_bucket& bucket, int shard_id,
 
   return ret;
 }
+
+int RGWReplicaBucketLogger::delete_bound(const rgw_bucket& bucket, int shard_id, const string& daemon_id, bool index_by_instance, bool purge_all)
+{
+  bool need_to_exist = index_by_instance; /* don't need to exist if not indexing by instance */
+  int r = RGWReplicaLogger::delete_bound(obj_name(bucket, shard_id, index_by_instance), pool, daemon_id, purge_all, need_to_exist);
+  if (r != -ENOENT) {
+    return r;
+  }
+  /*
+   * can only get here if need_to_exist == true,
+   * entry is not found, let's convert old entry if exists
+   */
+  RGWReplicaBounds bounds;
+  r = convert_old_bounds(bucket, shard_id, bounds);
+  if (r < 0 && r != -ENOENT) {
+    return r;
+  }
+  return RGWReplicaLogger::delete_bound(obj_name(bucket, shard_id, index_by_instance), pool, daemon_id, purge_all, false);
+}
+
+int RGWReplicaBucketLogger::get_bounds(const rgw_bucket& bucket, int shard_id, RGWReplicaBounds& bounds, bool index_by_instance) {
+  int r = RGWReplicaLogger::get_bounds(obj_name(bucket, shard_id, index_by_instance), pool, bounds);
+  if (r != -ENOENT || !index_by_instance) {
+    return r;
+  }
+
+  r = convert_old_bounds(bucket, shard_id, bounds);
+  if (r < 0) {
+    return r;
+  }
+
+  return 0;
+}
+
+int RGWReplicaBucketLogger::convert_old_bounds(const rgw_bucket& bucket, int shard_id, RGWReplicaBounds& bounds) {
+  string old_key = obj_name(bucket, shard_id, false);
+  string new_key = obj_name(bucket, shard_id, true);
+
+  /* couldn't find when indexed by instance, retry with old key by bucket name only */
+  int r = RGWReplicaLogger::get_bounds(old_key, pool, bounds);
+  if (r < 0) {
+    return r;
+  }
+  /* convert to new keys */
+  r = RGWReplicaLogger::write_bounds(new_key, pool, bounds);
+  if (r < 0) {
+    return r;
+  }
+
+  string daemon_id;
+  r = RGWReplicaLogger::delete_bound(old_key, pool, daemon_id, true, false); /* purge all */
+  if (r < 0) {
+    return r;
+  }
+  return 0;
+}
index 16209a08a7b1240b81475c470c962ecb654e30b5..b00080037f2aeb15256df8748e9316a27634378d 100644 (file)
@@ -50,9 +50,13 @@ protected:
   int update_bound(const string& oid, const string& pool,
                    const string& daemon_id, const string& marker,
                    const utime_t& time,
-                   const list<RGWReplicaItemMarker> *entries);
+                   const list<RGWReplicaItemMarker> *entries,
+                   bool need_to_exist);
+  int write_bounds(const string& oid, const string& pool,
+                 RGWReplicaBounds& bounds);
   int delete_bound(const string& oid, const string& pool,
-                   const string& daemon_id, bool purge_all);
+                   const string& daemon_id, bool purge_all,
+                   bool need_to_exist);
   int get_bounds(const string& oid, const string& pool,
                  RGWReplicaBounds& bounds);
 };
@@ -79,13 +83,13 @@ public:
     string oid;
     get_shard_oid(shard, oid);
     return RGWReplicaLogger::update_bound(oid, pool,
-                                          daemon_id, marker, time, entries);
+                                          daemon_id, marker, time, entries, false);
   }
   int delete_bound(int shard, const string& daemon_id, bool purge_all) {
     string oid;
     get_shard_oid(shard, oid);
     return RGWReplicaLogger::delete_bound(oid, pool,
-                                          daemon_id, purge_all);
+                                          daemon_id, purge_all, false);
   }
   int get_bounds(int shard, RGWReplicaBounds& bounds) {
     string oid;
@@ -104,15 +108,11 @@ public:
   RGWReplicaBucketLogger(RGWRados *_store);
   int update_bound(const rgw_bucket& bucket, int shard_id, const string& daemon_id,
                    const string& marker, const utime_t& time,
-                   const list<RGWReplicaItemMarker> *entries);
-  int delete_bound(const rgw_bucket& bucket, int shard_id, const string& daemon_id, bool index_by_instance, bool purge_all) {
-    return RGWReplicaLogger::delete_bound(obj_name(bucket, shard_id, index_by_instance), pool,
-                                          daemon_id, purge_all);
-  }
-  int get_bounds(const rgw_bucket& bucket, int shard_id, RGWReplicaBounds& bounds, bool index_by_instance) {
-    return RGWReplicaLogger::get_bounds(obj_name(bucket, shard_id, index_by_instance), pool,
-                                        bounds);
-  }
+                   const list<RGWReplicaItemMarker> *entries,
+                   bool index_by_instance);
+  int delete_bound(const rgw_bucket& bucket, int shard_id, const string& daemon_id, bool index_by_instance, bool purge_all);
+  int get_bounds(const rgw_bucket& bucket, int shard_id, RGWReplicaBounds& bounds, bool index_by_instance);
+  int convert_old_bounds(const rgw_bucket& bucket, int shard_id, RGWReplicaBounds& bounds);
 };
 
 #endif /* RGW_REPLICA_LOG_H_ */
index 2f6d760fddbf0f9038548baf98e813503290e76e..ddcf635814de9031999ba75fa8c80522aaf801c8 100644 (file)
@@ -248,7 +248,7 @@ void RGWOp_BILog_SetBounds::execute() {
     return;
   }
 
-  http_ret = rl.update_bound(bucket, shard_id, daemon_id, marker, ut, &markers);
+  http_ret = rl.update_bound(bucket, shard_id, daemon_id, marker, ut, &markers, index_by_instance);
 }
 
 void RGWOp_BILog_GetBounds::execute() {