]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: add guard class for bucket index lock
authorOrit Wasserman <owasserm@redhat.com>
Mon, 1 May 2017 06:24:19 +0000 (09:24 +0300)
committerYehuda Sadeh <yehuda@redhat.com>
Mon, 5 Jun 2017 20:17:39 +0000 (13:17 -0700)
Signed-off-by: Orit Wasserman <owasserm@redhat.com>
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h
src/rgw/rgw_reshard.cc
src/rgw/rgw_reshard.h

index 4b0774eebbd7c283685fb9f2930b75a35e2cdf5a..fde3a0a3149120179f749fad5f31991e16b5c82f 100644 (file)
@@ -5391,7 +5391,10 @@ int RGWRados::init_bucket_index(RGWBucketInfo& bucket_info, int num_shards)
   librados::IoCtx index_ctx; // context for new bucket
 
   /* handle on going bucket resharding */
-  int r = reshard->block_while_resharding(bucket_info.bucket.oid);
+  BucketIndexLockGuard guard(cct, this, bucket_info.bucket.bucket_id, bucket_info.bucket.oid,
+                      reshard_pool_ctx);
+
+  int r = reshard->block_while_resharding(bucket_info.bucket.oid, guard);
   if (r < 0) {
     return r;
   }
@@ -5399,7 +5402,6 @@ int RGWRados::init_bucket_index(RGWBucketInfo& bucket_info, int num_shards)
   string dir_oid =  dir_oid_prefix;
   r = open_bucket_index_ctx(bucket_info, index_ctx);
   if (r < 0) {
-    reshard->unlock_bucket_index(bucket_info.bucket.oid);
     return r;
   }
 
@@ -5408,11 +5410,7 @@ int RGWRados::init_bucket_index(RGWBucketInfo& bucket_info, int num_shards)
   map<int, string> bucket_objs;
   get_bucket_index_objects(dir_oid, num_shards, bucket_objs);
 
-  r =  CLSRGWIssueBucketIndexInit(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)();
-
-  reshard->unlock_bucket_index(bucket_info.bucket.oid);
-
-  return r;
+  return CLSRGWIssueBucketIndexInit(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)();
 }
 
 void RGWRados::create_bucket_id(string *bucket_id)
@@ -8229,20 +8227,20 @@ int RGWRados::bucket_check_index(RGWBucketInfo& bucket_info,
   map<int, struct rgw_cls_check_index_ret> bucket_objs_ret;
 
   /* handle on going bucket resharding */
-  int ret = reshard->block_while_resharding(bucket_info.bucket.oid);
+  BucketIndexLockGuard guard(cct, this, bucket_info.bucket.bucket_id, bucket_info.bucket.oid,
+                            reshard_pool_ctx);
+  int ret = reshard->block_while_resharding(bucket_info.bucket.oid, guard);
   if (ret < 0) {
     return ret;
   }
 
   ret = open_bucket_index(bucket_info, index_ctx, oids, bucket_objs_ret);
   if (ret < 0) {
-      reshard->unlock_bucket_index(bucket_info.bucket.oid);
       return ret;
   }
 
   ret = CLSRGWIssueBucketCheck(index_ctx, oids, bucket_objs_ret, cct->_conf->rgw_bucket_index_max_aio)();
   if (ret < 0) {
-      reshard->unlock_bucket_index(bucket_info.bucket.oid);
       return ret;
   }
 
@@ -8262,25 +8260,21 @@ int RGWRados::bucket_rebuild_index(RGWBucketInfo& bucket_info)
   map<int, string> bucket_objs;
 
   /* handle on going bucket resharding */
-  int r = reshard->block_while_resharding(bucket_info.bucket.oid);
+  BucketIndexLockGuard guard(cct, this, bucket_info.bucket.bucket_id, bucket_info.bucket.oid,
+                            reshard_pool_ctx);
+  int r = reshard->block_while_resharding(bucket_info.bucket.oid, guard);
   if (r < 0) {
     return r;
   }
 
   r = open_bucket_index(bucket_info, index_ctx, bucket_objs);
   if (r < 0) {
-    reshard->unlock_bucket_index(bucket_info.bucket.oid);
     return r;
   }
 
-  r =  CLSRGWIssueBucketRebuild(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)();
-
-  reshard->unlock_bucket_index(bucket_info.bucket.oid);
-
-  return r;
+  return CLSRGWIssueBucketRebuild(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)();
 }
 
-
 int RGWRados::defer_gc(void *ctx, const RGWBucketInfo& bucket_info, const rgw_obj& obj)
 {
   RGWObjectCtx *rctx = static_cast<RGWObjectCtx *>(ctx);
@@ -9542,7 +9536,9 @@ int RGWRados::Bucket::UpdateIndex::prepare(RGWModifyOp op, const string *write_t
   BucketShard *bs;
 
   /* handle on going bucket resharding */
-  int ret = store->reshard->block_while_resharding(target->get_bucket().oid);
+  BucketIndexLockGuard guard(store->ctx(), store, target->get_bucket().bucket_id, target->get_bucket().oid,
+                            store->reshard_pool_ctx);
+  int ret = store->reshard->block_while_resharding(target->get_bucket().oid, guard);
   if (ret < 0) {
     return ret;
   }
@@ -9550,7 +9546,6 @@ int RGWRados::Bucket::UpdateIndex::prepare(RGWModifyOp op, const string *write_t
   ret = get_bucket_shard(&bs);
   if (ret < 0) {
     ldout(store->ctx(), 5) << "failed to get BucketShard object: ret=" << ret << dendl;
-    store->reshard->unlock_bucket_index(target->get_bucket().oid);
     return ret;
   }
 
@@ -9564,12 +9559,10 @@ int RGWRados::Bucket::UpdateIndex::prepare(RGWModifyOp op, const string *write_t
 
   int r = store->cls_obj_prepare_op(*bs, op, optag, obj, bilog_flags, zones_trace);
   if (r < 0) {
-    store->reshard->unlock_bucket_index(target->get_bucket().oid);
     return r;
   }
   prepared = true;
 
-  store->reshard->unlock_bucket_index(target->get_bucket().oid);
   return 0;
 }
 
@@ -9588,7 +9581,9 @@ int RGWRados::Bucket::UpdateIndex::complete(int64_t poolid, uint64_t epoch,
   BucketShard *bs;
 
   /* handle on going bucket resharding */
-  int ret = store->reshard->block_while_resharding(target->get_bucket().oid);
+  BucketIndexLockGuard guard(store->ctx(), store, target->get_bucket().bucket_id, target->get_bucket().oid,
+                            store->reshard_pool_ctx);
+  int ret = store->reshard->block_while_resharding(target->get_bucket().oid, guard);
   if (ret < 0) {
     return ret;
   }
@@ -9596,7 +9591,6 @@ int RGWRados::Bucket::UpdateIndex::complete(int64_t poolid, uint64_t epoch,
   ret = get_bucket_shard(&bs);
   if (ret < 0) {
     ldout(store->ctx(), 5) << "failed to get BucketShard object: ret=" << ret << dendl;
-    store->reshard->unlock_bucket_index(target->get_bucket().oid);
     return ret;
   }
 
@@ -9627,8 +9621,6 @@ int RGWRados::Bucket::UpdateIndex::complete(int64_t poolid, uint64_t epoch,
     lderr(store->ctx()) << "ERROR: failed writing data log" << dendl;
   }
 
-  store->reshard->unlock_bucket_index(target->get_bucket().oid);
-
   return ret;
 }
 
@@ -9643,7 +9635,9 @@ int RGWRados::Bucket::UpdateIndex::complete_del(int64_t poolid, uint64_t epoch,
   BucketShard *bs;
 
   /* handle on going bucket resharding */
-  int ret = store->reshard->block_while_resharding(target->get_bucket().oid);
+  BucketIndexLockGuard guard(store->ctx(), store, target->get_bucket().bucket_id, target->get_bucket().oid,
+                            store->reshard_pool_ctx);
+  int ret = store->reshard->block_while_resharding(target->get_bucket().oid, guard);
   if (ret < 0) {
     return ret;
   }
@@ -9651,7 +9645,6 @@ int RGWRados::Bucket::UpdateIndex::complete_del(int64_t poolid, uint64_t epoch,
   ret = get_bucket_shard(&bs);
   if (ret < 0) {
     ldout(store->ctx(), 5) << "failed to get BucketShard object: ret=" << ret << dendl;
-    store->reshard->unlock_bucket_index(target->get_bucket().oid);
     return ret;
   }
 
@@ -9662,7 +9655,6 @@ int RGWRados::Bucket::UpdateIndex::complete_del(int64_t poolid, uint64_t epoch,
     lderr(store->ctx()) << "ERROR: failed writing data log" << dendl;
   }
 
-  store->reshard->unlock_bucket_index(target->get_bucket().oid);
   return ret;
 }
 
@@ -9676,7 +9668,9 @@ int RGWRados::Bucket::UpdateIndex::cancel()
   BucketShard *bs;
 
   /* handle on going bucket resharding */
-  int ret = store->reshard->block_while_resharding(target->get_bucket().oid);
+  BucketIndexLockGuard guard(store->ctx(), store, target->get_bucket().bucket_id, target->get_bucket().oid,
+                            store->reshard_pool_ctx);
+  int ret = store->reshard->block_while_resharding(target->get_bucket().oid, guard);
   if (ret < 0) {
     return ret;
   }
@@ -9684,7 +9678,6 @@ int RGWRados::Bucket::UpdateIndex::cancel()
   ret = get_bucket_shard(&bs);
   if (ret < 0) {
     ldout(store->ctx(), 5) << "failed to get BucketShard object: ret=" << ret << dendl;
-    store->reshard->unlock_bucket_index(target->get_bucket().oid);
     return ret;
   }
 
@@ -9700,7 +9693,6 @@ int RGWRados::Bucket::UpdateIndex::cancel()
     lderr(store->ctx()) << "ERROR: failed writing data log" << dendl;
   }
 
-  store->reshard->unlock_bucket_index(target->get_bucket().oid);
   return ret;
 }
 
@@ -10501,7 +10493,9 @@ int RGWRados::bucket_index_link_olh(const RGWBucketInfo& bucket_info, RGWObjStat
   }
 
   /* handle on going bucket resharding */
-  r = reshard->block_while_resharding(bucket_info.bucket.oid);
+  BucketIndexLockGuard guard(cct, this, bucket_info.bucket.bucket_id, bucket_info.bucket.oid,
+                            reshard_pool_ctx);
+  r = reshard->block_while_resharding(bucket_info.bucket.oid, guard);
   if (r < 0) {
     return r;
   }
@@ -10510,7 +10504,6 @@ int RGWRados::bucket_index_link_olh(const RGWBucketInfo& bucket_info, RGWObjStat
   r = bs.init(obj_instance.bucket, obj_instance);
   if (r < 0) {
     ldout(cct, 5) << "bs.init() returned ret=" << r << dendl;
-    reshard->unlock_bucket_index(bucket_info.bucket.oid);
     return r;
   }
   
@@ -10523,12 +10516,9 @@ int RGWRados::bucket_index_link_olh(const RGWBucketInfo& bucket_info, RGWObjStat
   }
 
   cls_rgw_obj_key key(obj_instance.key.get_index_key_name(), obj_instance.key.instance);
-  r = cls_rgw_bucket_link_olh(bs.index_ctx, bs.bucket_obj, key, olh_state.olh_tag, delete_marker, op_tag, meta, olh_epoch,
-                                unmod_since, high_precision_time,
-                                get_zone().log_data, zones_trace);
-
-  reshard->unlock_bucket_index(bucket_info.bucket.oid);
-    return r;
+  return cls_rgw_bucket_link_olh(bs.index_ctx, bs.bucket_obj, key, olh_state.olh_tag, delete_marker, op_tag, meta, olh_epoch,
+                                 unmod_since, high_precision_time,
+                                 get_zone().log_data, zones_trace);
 }
 
 void RGWRados::bucket_index_guard_olh_op(RGWObjState& olh_state, ObjectOperation& op)
@@ -11949,7 +11939,9 @@ int RGWRados::trim_bi_log_entries(RGWBucketInfo& bucket_info, int shard_id, stri
   map<int, string> bucket_objs;
 
   /* handle on going bucket resharding */
-  int r = reshard->block_while_resharding(bucket_info.bucket.oid);
+  BucketIndexLockGuard guard(cct, this, bucket_info.bucket.bucket_id, bucket_info.bucket.oid,
+                            reshard_pool_ctx);
+  int r = reshard->block_while_resharding(bucket_info.bucket.oid, guard);
   if (r < 0) {
     return r;
   }
@@ -11959,26 +11951,22 @@ int RGWRados::trim_bi_log_entries(RGWBucketInfo& bucket_info, int shard_id, stri
 
   r = open_bucket_index(bucket_info, index_ctx, bucket_objs, shard_id);
   if (r < 0) {
-    reshard->unlock_bucket_index(bucket_info.bucket.oid);
     return r;
   }
 
   r = start_marker_mgr.from_string(start_marker, shard_id);
   if (r < 0) {
-    reshard->unlock_bucket_index(bucket_info.bucket.oid);
     return r;
   }
 
   r = end_marker_mgr.from_string(end_marker, shard_id);
   if (r < 0) {
-    reshard->unlock_bucket_index(bucket_info.bucket.oid);
     return r;
   }
 
-  r = CLSRGWIssueBILogTrim(index_ctx, start_marker_mgr, end_marker_mgr, bucket_objs,
-                          cct->_conf->rgw_bucket_index_max_aio)();
+  return CLSRGWIssueBILogTrim(index_ctx, start_marker_mgr, end_marker_mgr, bucket_objs,
+                             cct->_conf->rgw_bucket_index_max_aio)();
 
-  reshard->unlock_bucket_index(bucket_info.bucket.oid);
   return r;
 }
 
index 0f1e990a21c6d16866a1d7fbd2e01525a80fe7ff..daf7be2388898db158205f3042ef6a0182df3477 100644 (file)
@@ -2181,6 +2181,7 @@ class RGWRados
   friend class RGWStateLog;
   friend class RGWReplicaLogger;
   friend class RGWReshard;
+  friend class BucketIndexLockGuard;
 
   /** Open the pool used as root for this gateway */
   int open_root_pool_ctx();
index 896d9b46205234e0dcd6a00efcbbaae6348b3abc..1ba40cd43209fddb785d890ad75ad67a866e5342 100644 (file)
@@ -100,9 +100,13 @@ int RGWReshard::remove(cls_rgw_reshard_entry& entry)
   return ret;
 }
 
+std::string create_bucket_index_lock_name(const string& bucket_instance_id) {
+  return bucket_instance_lock_name + "." + bucket_instance_id;
+}
+
 int RGWReshard::set_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry)
 {
-  rados::cls::lock::Lock l(reshard_lock_name);
+  rados::cls::lock::Lock l(create_bucket_index_lock_name(entry.old_instance_id));
 
   if (entry.new_instance_id.empty()) {
     ldout(cct, 0) << "RGWReshard::" << __func__ << " missing new bucket instance id" << dendl;
@@ -155,7 +159,7 @@ done:
 
 int RGWReshard::clear_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry)
 {
-  rados::cls::lock::Lock l(bucket_instance_lock_name);
+  rados::cls::lock::Lock l(create_bucket_index_lock_name(entry.old_instance_id));
 
   int ret = l.lock_exclusive(&store->reshard_pool_ctx, bucket_instance_oid);
   if (ret == -EBUSY) {
@@ -214,18 +218,18 @@ int RGWReshard::unlock_bucket_index(const string& oid)
   return 0;
 }
 
-
 const int num_retries = 10;
 const int default_reshard_sleep_duration = 30;
 
-int RGWReshard::block_while_resharding(const string& bucket_instance_oid)
+int RGWReshard::block_while_resharding(const string& bucket_instance_oid,
+                                                                 BucketIndexLockGuard& guard)
 {
   int ret = 0;
   cls_rgw_bucket_instance_entry entry;
   bool resharding = false;
 
   for (int i=0; i< num_retries;i++) {
-    ret = lock_bucket_index_shared(bucket_instance_oid);
+    ret = guard.lock();
     if (ret < 0) {
       return ret;
     }
@@ -239,7 +243,8 @@ int RGWReshard::block_while_resharding(const string& bucket_instance_oid)
     }
 
     if (resharding) {
-      ret = unlock_bucket_index(bucket_instance_oid);
+      /* clear resharding uses the same lock */
+      ret = guard.unlock();
       if (ret < 0) {
        return ret;
       }
@@ -249,9 +254,51 @@ int RGWReshard::block_while_resharding(const string& bucket_instance_oid)
     }
   }
   ldout(cct, 0) << "RGWReshard::" << __func__ << " ERROR: bucket is still resharding, please retry" << dendl;
-  ret = unlock_bucket_index(bucket_instance_oid);
-  if (ret < 0) {
+  return -EAGAIN;
+}
+
+BucketIndexLockGuard::BucketIndexLockGuard(CephContext* _cct, RGWRados* _store,
+                                          const string& bucket_instance_id, const string& _oid, const librados::IoCtx& _io_ctx) :
+  cct(_cct),store(_store),
+  l(create_bucket_index_lock_name(bucket_instance_id)),
+  oid(_oid), io_ctx(_io_ctx),locked(false)
+{
+}
+
+int BucketIndexLockGuard::lock()
+{
+  if (!locked) {
+    int ret = l.lock_shared(&store->reshard_pool_ctx, oid);
+    if (ret == -EBUSY) {
+      ldout(cct,0) << "RGWReshardLog::add failed to acquire lock on " << oid << dendl;
+      return 0;
+    }
+    if (ret < 0) {
+      return ret;
+    }
+    locked = true;
     return ret;
+  } else {
+    ldout(cct,0) << " % alread lock" << oid << dendl;
+    return -EBUSY;
   }
-  return -EAGAIN;
+}
+
+int BucketIndexLockGuard::unlock()
+{
+  if (locked) {
+    int ret = l.unlock(&io_ctx, oid);
+    if (ret <0) {
+      ldout(cct, 0) << "failed to unlock " << oid << dendl;
+    } else {
+      locked = false;
+    }
+    return ret;
+  }
+  return 0;
+}
+
+BucketIndexLockGuard::~BucketIndexLockGuard()
+{
+  unlock();
 }
index 8ddb91109c02d0a04b95f44302a23de67d4acc15..205a7688c14616bff258ee161b969ea5ac3c972a 100644 (file)
 class CephContext;
 class RGWRados;
 
-#define dout_context g_ceph_context
-#define dout_subsys ceph_subsys_rgw
+
+/* gets a locked lock , release it when exiting context */
+class BucketIndexLockGuard
+{
+   CephContext *cct;
+   RGWRados *store;
+   rados::cls::lock::Lock l;
+   string oid;
+   librados::IoCtx io_ctx;
+   bool locked;
+public:
+  BucketIndexLockGuard(CephContext* cct, RGWRados* store, const string& bucket_instance_id,
+                                        const string& oid, const librados::IoCtx& io_ctx);
+  /* unlocks the lock */
+  ~BucketIndexLockGuard();
+protected:
+  friend class RGWReshard;
+  int lock();
+  int unlock();
+};
 
 class RGWReshard {
     CephContext *cct;
@@ -22,6 +40,9 @@ class RGWReshard {
     int max_jobs;
     rados::cls::lock::Lock instance_lock;
 
+    int lock_bucket_index_shared(const string& oid);
+    int unlock_bucket_index(const string& oid);
+
   public:
     RGWReshard(CephContext* cct, RGWRados* _store);
     int add(cls_rgw_reshard_entry& entry);
@@ -30,6 +51,11 @@ class RGWReshard {
     int list(string& marker, uint32_t max, list<cls_rgw_reshard_entry>& entries, bool& is_truncated);
     int set_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry);
     int clear_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry);
+    /*
+      if succefull, keeps the bucket index locked. It will be unlocked
+      in the guard dtor.
+     */
+    int block_while_resharding(const string& bucket_instance_oid, BucketIndexLockGuard& guard);
 };
 
 #endif