]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: reshard guard for versioned operations
authorYehuda Sadeh <yehuda@redhat.com>
Fri, 12 May 2017 21:44:06 +0000 (14:44 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Mon, 5 Jun 2017 20:17:46 +0000 (13:17 -0700)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h

index af6d46a5f1cbd5abbe75b77ec2de0ea76aa4916b..55da65a1ace1281fa2dd5dca39b32df210067875 100644 (file)
@@ -9522,7 +9522,7 @@ int RGWRados::stat_system_obj(RGWObjectCtx& obj_ctx,
 }
 
 
-int RGWRados::Bucket::UpdateIndex::guard_reshard(BucketShard *pbs, std::function<int(BucketShard *)> call)
+int RGWRados::Bucket::UpdateIndex::guard_reshard(BucketShard **pbs, std::function<int(BucketShard *)> call)
 {
   RGWRados *store = target->get_store();
   BucketShard *bs;
@@ -9691,7 +9691,7 @@ int RGWRados::Bucket::UpdateIndex::cancel()
   RGWRados *store = target->get_store();
   BucketShard *bs;
 
-  ret = guard_reshard(&bs, [&](BucketShard *bs) -> int { 
+  int ret = guard_reshard(&bs, [&](BucketShard *bs) -> int { 
     return store->cls_obj_complete_cancel(*bs, optag, obj, bilog_flags, zones_trace);
   });
 
@@ -10491,6 +10491,47 @@ int RGWRados::olh_init_modification(const RGWBucketInfo& bucket_info, RGWObjStat
   return ret;
 }
 
+int RGWRados::guard_reshard(BucketShard *bs, const rgw_obj& obj_instance, std::function<int(BucketShard *)> call)
+{
+  rgw_obj obj;
+  const rgw_obj *pobj = &obj_instance;
+  int r;
+
+  for (int i = 0; i < NUM_RESHARD_RETRIES; ++i) {
+    r = bs->init(pobj->bucket, *pobj);
+    if (r < 0) {
+      ldout(cct, 5) << "bs.init() returned ret=" << r << dendl;
+      return r;
+    }
+    r = call(bs);
+    if (r != -ERR_BUSY_RESHARDING) {
+      break;
+    }
+    ldout(cct, 0) << "NOTICE: resharding operation on bucket index detected, blocking" << dendl;
+    RGWReshard reshard(this);
+    string new_bucket_id;
+    r = reshard.block_while_resharding(bs, &new_bucket_id);
+    if (r == -ERR_BUSY_RESHARDING) {
+      continue;
+    }
+    if (r < 0) {
+      return r;
+    }
+    ldout(cct, 20) << "reshard completion identified, new_bucket_id=" << new_bucket_id << dendl;
+    i = 0; /* resharding is finished, make sure we can retry */
+
+    obj = *pobj;
+    obj.bucket.update_bucket_id(new_bucket_id);
+    pobj = &obj;
+  }
+
+  if (r < 0) {
+    return r;
+  }
+
+  return 0;
+}
+
 int RGWRados::bucket_index_link_olh(const RGWBucketInfo& bucket_info, RGWObjState& olh_state, const rgw_obj& obj_instance,
                                     bool delete_marker,
                                     const string& op_tag,
@@ -10504,13 +10545,6 @@ int RGWRados::bucket_index_link_olh(const RGWBucketInfo& bucket_info, RGWObjStat
     return r;
   }
 
-  BucketShard bs(this);
-  r = bs.init(obj_instance.bucket, obj_instance);
-  if (r < 0) {
-    ldout(cct, 5) << "bs.init() returned ret=" << r << dendl;
-    return r;
-  }
-  
   rgw_zone_set zones_trace;
   if (_zones_trace) {
     zones_trace = *_zones_trace;
@@ -10519,10 +10553,23 @@ int RGWRados::bucket_index_link_olh(const RGWBucketInfo& bucket_info, RGWObjStat
     zones_trace.insert(get_zone().id);
   }
 
+  BucketShard bs(this);
+
   cls_rgw_obj_key key(obj_instance.key.get_index_key_name(), obj_instance.key.instance);
-  return cls_rgw_bucket_link_olh(bs.index_ctx, bs.bucket_obj, key, olh_state.olh_tag, delete_marker, op_tag, meta, olh_epoch,
-                                 unmod_since, high_precision_time,
-                                 get_zone().log_data, zones_trace);
+  r = guard_reshard(&bs, obj_instance, [&](BucketShard *bs) -> int { 
+                    librados::ObjectWriteOperation op;
+                    cls_rgw_guard_bucket_resharding(op, -ERR_BUSY_RESHARDING);
+                    return cls_rgw_bucket_link_olh(bs->index_ctx, op,
+                                                   bs->bucket_obj, key, olh_state.olh_tag, delete_marker, op_tag, meta, olh_epoch,
+                                                   unmod_since, high_precision_time,
+                                                   get_zone().log_data, zones_trace);
+                    });
+  if (r < 0) {
+    ldout(cct, 20) << "cls_rgw_bucket_link_olh() returned r=" << r << dendl;
+    return r;
+  }
+
+  return 0;
 }
 
 void RGWRados::bucket_index_guard_olh_op(RGWObjState& olh_state, ObjectOperation& op)
@@ -10540,23 +10587,24 @@ int RGWRados::bucket_index_unlink_instance(const RGWBucketInfo& bucket_info, con
     return r;
   }
 
-  BucketShard bs(this);
-  int ret = bs.init(obj_instance.bucket, obj_instance);
-  if (ret < 0) {
-    ldout(cct, 5) << "bs.init() returned ret=" << ret << dendl;
-    return ret;
-  }
-  
   rgw_zone_set zones_trace;
   if (_zones_trace) {
     zones_trace = *_zones_trace;
   }
   zones_trace.insert(get_zone().id);
 
+  BucketShard bs(this);
+
   cls_rgw_obj_key key(obj_instance.key.get_index_key_name(), obj_instance.key.instance);
-  ret = cls_rgw_bucket_unlink_instance(bs.index_ctx, bs.bucket_obj, key, op_tag, olh_tag, olh_epoch, get_zone().log_data, zones_trace);
-  if (ret < 0) {
-    return ret;
+  r = guard_reshard(&bs, obj_instance, [&](BucketShard *bs) -> int { 
+                    librados::ObjectWriteOperation op;
+                    cls_rgw_guard_bucket_resharding(op, -ERR_BUSY_RESHARDING);
+                    return cls_rgw_bucket_unlink_instance(bs->index_ctx, op, bs->bucket_obj, key, op_tag,
+                                                          olh_tag, olh_epoch, get_zone().log_data, zones_trace);
+                    });
+  if (r < 0) {
+    ldout(cct, 20) << "cls_rgw_bucket_link_olh() returned r=" << r << dendl;
+    return r;
   }
 
   return 0;
@@ -10584,11 +10632,16 @@ int RGWRados::bucket_index_read_olh_log(const RGWBucketInfo& bucket_info, RGWObj
 
   cls_rgw_obj_key key(obj_instance.key.get_index_key_name(), string());
 
-  ObjectReadOperation op;
-
-  ret = cls_rgw_get_olh_log(bs.index_ctx, bs.bucket_obj, op, key, ver_marker, olh_tag, log, is_truncated);
-  if (ret < 0)
+  ret = guard_reshard(&bs, obj_instance, [&](BucketShard *bs) -> int { 
+                      ObjectReadOperation op;
+                      cls_rgw_guard_bucket_resharding(op, -ERR_BUSY_RESHARDING);
+                      return cls_rgw_get_olh_log(bs->index_ctx, bs->bucket_obj, op,
+                                                 key, ver_marker, olh_tag, log, is_truncated);
+                    });
+  if (ret < 0) {
+    ldout(cct, 20) << "cls_rgw_get_olh_log() returned r=" << r << dendl;
     return ret;
+  }
 
   return 0;
 }
@@ -10612,13 +10665,16 @@ int RGWRados::bucket_index_trim_olh_log(const RGWBucketInfo& bucket_info, RGWObj
 
   cls_rgw_obj_key key(obj_instance.key.get_index_key_name(), string());
 
-  ObjectWriteOperation op;
-
-  cls_rgw_trim_olh_log(op, key, ver, olh_tag);
-
-  ret = bs.index_ctx.operate(bs.bucket_obj, &op);
-  if (ret < 0)
+  ret = guard_reshard(&bs, obj_instance, [&](BucketShard *pbs) -> int { 
+                      ObjectWriteOperation op;
+                      cls_rgw_guard_bucket_resharding(op, -ERR_BUSY_RESHARDING);
+                      cls_rgw_trim_olh_log(op, key, ver, olh_tag);
+                      return pbs->index_ctx.operate(pbs->bucket_obj, &op);
+                      });
+  if (ret < 0) {
+    ldout(cct, 20) << "cls_rgw_trim_olh_log() returned r=" << ret << dendl;
     return ret;
+  }
 
   return 0;
 }
@@ -10632,17 +10688,16 @@ int RGWRados::bucket_index_clear_olh(const RGWBucketInfo& bucket_info, RGWObjSta
   }
 
   BucketShard bs(this);
-  int ret = bs.init(obj_instance.bucket, obj_instance);
-  if (ret < 0) {
-    ldout(cct, 5) << "bs.init() returned ret=" << ret << dendl;
-    return ret;
-  }
 
   string olh_tag(state.olh_tag.c_str(), state.olh_tag.length());
 
   cls_rgw_obj_key key(obj_instance.key.get_index_key_name(), string());
 
-  ret = cls_rgw_clear_olh(bs.index_ctx, bs.bucket_obj, key, olh_tag);
+  int ret = guard_reshard(&bs, obj_instance, [&](BucketShard *pbs) -> int { 
+                          ObjectWriteOperation op;
+                          cls_rgw_guard_bucket_resharding(op, -ERR_BUSY_RESHARDING);
+                          return cls_rgw_clear_olh(pbs->index_ctx, op, pbs->bucket_obj, key, olh_tag);
+                          });
   if (ret < 0) {
     ldout(cct, 5) << "cls_rgw_clear_olh() returned ret=" << ret << dendl;
     return ret;
index da129430bfa2c033749d46b331b403b002d8d831..28fd2dd97937fa2ea5539052a0b4147718f8de35 100644 (file)
@@ -2882,7 +2882,7 @@ public:
         bs_initialized = false;
       }
 
-      int guard_reshard(BucketShard *pbs, std::function<int(BucketShard *)> call);
+      int guard_reshard(BucketShard **pbs, std::function<int(BucketShard *)> call);
     public:
 
       UpdateIndex(RGWRados::Bucket *_target, const rgw_obj& _obj) : target(_target), obj(_obj),
@@ -3247,6 +3247,8 @@ public:
   int obj_operate(const RGWBucketInfo& bucket_info, const rgw_obj& obj, librados::ObjectWriteOperation *op);
   int obj_operate(const RGWBucketInfo& bucket_info, const rgw_obj& obj, librados::ObjectReadOperation *op);
 
+  int guard_reshard(BucketShard *bs, const rgw_obj& obj_instance, std::function<int(BucketShard *)> call);
+
   void bucket_index_guard_olh_op(RGWObjState& olh_state, librados::ObjectOperation& op);
   int olh_init_modification(const RGWBucketInfo& bucket_info, RGWObjState& state, const rgw_obj& olh_obj, string *op_tag);
   int olh_init_modification_impl(const RGWBucketInfo& bucket_info, RGWObjState& state, const rgw_obj& olh_obj, string *op_tag);