git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw: move RGWReshardBucket lock to its own separate class
author: J. Eric Ivancich <ivancich@redhat.com>
Tue, 16 Oct 2018 20:40:03 +0000 (16:40 -0400)
committer: J. Eric Ivancich <ivancich@redhat.com>
Fri, 26 Oct 2018 15:38:55 +0000 (11:38 -0400)
There are other processes beyond resharding that would need to take a
bucket reshard lock (e.g., correcting bucket resharding flags in the
event of a crash, tools to remove bucket shard information from
earlier versions of ceph). Pulling this logic outside of
RGWReshardBucket allows this code to be re-used.

Signed-off-by: J. Eric Ivancich <ivancich@redhat.com>
src/rgw/rgw_rados.h
src/rgw/rgw_reshard.cc
src/rgw/rgw_reshard.h

index ccf067d197f1b20d71d63ca26359ac9ba2fa1647..6b6bcd381fc97fab348c22eeba04b4324bf35899 100644 (file)
@@ -2118,6 +2118,7 @@ class RGWRados : public AdminSocketHook
   friend class RGWDataSyncProcessorThread;
   friend class RGWReshard;
   friend class RGWBucketReshard;
+  friend class RGWBucketReshardLock;
   friend class BucketIndexLockGuard;
   friend class RGWCompleteMultipart;
 
index 367bdb12a670e9db0fdb87ff6a813d330ba541c5..7fa2725d5fc361ea9ae9e259b9f800f6bf72bbe5 100644 (file)
@@ -205,78 +205,11 @@ public:
 RGWBucketReshard::RGWBucketReshard(RGWRados *_store,
                                   const RGWBucketInfo& _bucket_info,
                                   const map<string, bufferlist>& _bucket_attrs,
-                                  RenewLocksCallback _renew_locks_callback) :
+                                  RGWBucketReshardLock* _outer_reshard_lock) :
   store(_store), bucket_info(_bucket_info), bucket_attrs(_bucket_attrs),
-  reshard_lock(reshard_lock_name),
-  renew_locks_callback(_renew_locks_callback)
-{
-  const rgw_bucket& b = bucket_info.bucket;
-  reshard_oid = b.get_key(':');
-
-  const int lock_dur_secs =
-    store->ctx()->_conf->rgw_reshard_bucket_lock_duration;
-  lock_duration = std::chrono::seconds(lock_dur_secs);
-
-#define COOKIE_LEN 16
-  char cookie_buf[COOKIE_LEN + 1];
-  gen_rand_alphanumeric(store->ctx(), cookie_buf, sizeof(cookie_buf) - 1);
-  cookie_buf[COOKIE_LEN] = '\0';
-
-  reshard_lock.set_cookie(cookie_buf);
-  reshard_lock.set_duration(lock_duration);
-}
-
-int RGWBucketReshard::lock_bucket()
-{
-  reshard_lock.set_must_renew(false);
-  int ret = reshard_lock.lock_exclusive_ephemeral(&store->reshard_pool_ctx,
-                                                 reshard_oid);
-  if (ret < 0) {
-    ldout(store->ctx(), 0) << "RGWReshard::add failed to acquire lock on " <<
-      reshard_oid << " ret=" << ret << dendl;
-    return ret;
-  }
-  lock_start_time = Clock::now();
-  lock_renew_thresh = lock_start_time + lock_duration / 2;
-
-  return 0;
-}
-
-void RGWBucketReshard::unlock_bucket()
-{
-  int ret = reshard_lock.unlock(&store->reshard_pool_ctx, reshard_oid);
-  if (ret < 0) {
-    ldout(store->ctx(), 0) << "WARNING: RGWReshard::add failed to drop lock on " << reshard_oid << " ret=" << ret << dendl;
-  }
-}
-
-int RGWBucketReshard::renew_lock_bucket(const Clock::time_point& now)
-{
-  // assume outer locks have timespans at least the size of ours, so
-  // can call inside conditional
-  if (renew_locks_callback) {
-    int ret = renew_locks_callback(now);
-    if (ret < 0) {
-      return ret;
-    }
-  }
-
-  reshard_lock.set_must_renew(true);
-  int ret = reshard_lock.lock_exclusive_ephemeral(&store->reshard_pool_ctx,
-                                                 reshard_oid);
-  if (ret < 0) { /* expired or already locked by another processor */
-    ldout(store->ctx(), 5) << __func__ << "(): failed to renew lock on " <<
-      reshard_oid << " with " << cpp_strerror(-ret) << dendl;
-    return ret;
-  }
-  reshard_lock.set_must_renew(false);
-  lock_start_time = now;
-  lock_renew_thresh = lock_start_time + lock_duration / 2;
-  ldout(store->ctx(), 20) << __func__ << "(): successfully renewed lock on " <<
-    reshard_oid << dendl;
-
-  return 0;
-}
+  reshard_lock(store, bucket_info, true),
+  outer_reshard_lock(_outer_reshard_lock)
+{ }
 
 int RGWBucketReshard::set_resharding_status(RGWRados* store,
                                            RGWBucketInfo& bucket_info,
@@ -385,14 +318,14 @@ int RGWBucketReshard::create_new_bucket_instance(int new_num_shards,
 
 int RGWBucketReshard::cancel()
 {
-  int ret = lock_bucket();
+  int ret = reshard_lock.lock();
   if (ret < 0) {
     return ret;
   }
 
   ret = clear_resharding();
 
-  unlock_bucket();
+  reshard_lock.unlock();
   return ret;
 }
 
@@ -458,6 +391,78 @@ public:
   }
 };
 
+
+RGWBucketReshardLock::RGWBucketReshardLock(RGWRados* _store,
+                                          const std::string& reshard_lock_oid,
+                                          bool _ephemeral) :
+  store(_store),
+  lock_oid(reshard_lock_oid),
+  ephemeral(_ephemeral),
+  internal_lock(reshard_lock_name)
+{
+  const int lock_dur_secs = store->ctx()->_conf->rgw_reshard_bucket_lock_duration;
+  duration = std::chrono::seconds(lock_dur_secs);
+
+#define COOKIE_LEN 16
+  char cookie_buf[COOKIE_LEN + 1];
+  gen_rand_alphanumeric(store->ctx(), cookie_buf, sizeof(cookie_buf) - 1);
+  cookie_buf[COOKIE_LEN] = '\0';
+
+  internal_lock.set_cookie(cookie_buf);
+  internal_lock.set_duration(duration);
+}
+
+int RGWBucketReshardLock::lock() {
+  internal_lock.set_must_renew(false);
+  int ret;
+  if (ephemeral) {
+    ret = internal_lock.lock_exclusive_ephemeral(&store->reshard_pool_ctx,
+                                                lock_oid);
+  } else {
+    ret = internal_lock.lock_exclusive(&store->reshard_pool_ctx, lock_oid);
+  }
+  if (ret < 0) {
+    ldout(store->ctx(), 0) << "RGWReshardLock::" << __func__ <<
+      " failed to acquire lock on " << lock_oid << " ret=" << ret << dendl;
+    return ret;
+  }
+  reset_time(Clock::now());
+
+  return 0;
+}
+
+void RGWBucketReshardLock::unlock() {
+  int ret = internal_lock.unlock(&store->reshard_pool_ctx, lock_oid);
+  if (ret < 0) {
+    ldout(store->ctx(), 0) << "WARNING: RGWBucketReshardLock::" << __func__ <<
+      " failed to drop lock on " << lock_oid << " ret=" << ret << dendl;
+  }
+}
+
+int RGWBucketReshardLock::renew(const Clock::time_point& now) {
+  internal_lock.set_must_renew(true);
+  int ret;
+  if (ephemeral) {
+    ret = internal_lock.lock_exclusive_ephemeral(&store->reshard_pool_ctx,
+                                                lock_oid);
+  } else {
+    ret = internal_lock.lock_exclusive(&store->reshard_pool_ctx, lock_oid);
+  }
+  if (ret < 0) { /* expired or already locked by another processor */
+    ldout(store->ctx(), 5) << __func__ << "(): failed to renew lock on " <<
+      lock_oid << " with " << cpp_strerror(-ret) << dendl;
+    return ret;
+  }
+  internal_lock.set_must_renew(false);
+
+  reset_time(now);
+  ldout(store->ctx(), 20) << __func__ << "(): successfully renewed lock on " <<
+    lock_oid << dendl;
+
+  return 0;
+}
+
+
 int RGWBucketReshard::do_reshard(int num_shards,
                                 RGWBucketInfo& new_bucket_info,
                                 int max_entries,
@@ -561,8 +566,16 @@ int RGWBucketReshard::do_reshard(int num_shards,
        }
 
        Clock::time_point now = Clock::now();
-       if (now >= lock_renew_thresh) {
-         ret = renew_lock_bucket(now);
+       if (reshard_lock.should_renew(now)) {
+         // assume outer locks have timespans at least the size of ours, so
+         // can call inside conditional
+         if (outer_reshard_lock) {
+           ret = outer_reshard_lock->renew(now);
+           if (ret < 0) {
+             return ret;
+           }
+         }
+         ret = reshard_lock.renew(now);
          if (ret < 0) {
            lderr(store->ctx()) << "Error renewing bucket lock: " << ret << dendl;
            return ret;
@@ -641,7 +654,7 @@ int RGWBucketReshard::execute(int num_shards, int max_op_entries,
                               bool verbose, ostream *out, Formatter *formatter,
                              RGWReshard* reshard_log)
 {
-  int ret = lock_bucket();
+  int ret = reshard_lock.lock();
   if (ret < 0) {
     return ret;
   }
@@ -649,21 +662,22 @@ int RGWBucketReshard::execute(int num_shards, int max_op_entries,
   RGWBucketInfo new_bucket_info;
   ret = create_new_bucket_instance(num_shards, new_bucket_info);
   if (ret < 0) {
-    unlock_bucket();
+    reshard_lock.unlock();
     return ret;
   }
 
   if (reshard_log) {
     ret = reshard_log->update(bucket_info, new_bucket_info);
     if (ret < 0) {
-      unlock_bucket();
+      reshard_lock.unlock();
       return ret;
     }
   }
 
-  ret = set_resharding_status(new_bucket_info.bucket.bucket_id, num_shards, CLS_RGW_RESHARD_IN_PROGRESS);
+  ret = set_resharding_status(new_bucket_info.bucket.bucket_id,
+                             num_shards, CLS_RGW_RESHARD_IN_PROGRESS);
   if (ret < 0) {
-    unlock_bucket();
+    reshard_lock.unlock();
     return ret;
   }
 
@@ -673,17 +687,18 @@ int RGWBucketReshard::execute(int num_shards, int max_op_entries,
                    verbose, out, formatter);
 
   if (ret < 0) {
-    unlock_bucket();
+    reshard_lock.unlock();
     return ret;
   }
 
-  ret = set_resharding_status(new_bucket_info.bucket.bucket_id, num_shards, CLS_RGW_RESHARD_DONE);
+  ret = set_resharding_status(new_bucket_info.bucket.bucket_id, num_shards,
+                             CLS_RGW_RESHARD_DONE);
   if (ret < 0) {
-    unlock_bucket();
+    reshard_lock.unlock();
     return ret;
   }
 
-  unlock_bucket();
+  reshard_lock.unlock();
 
   return 0;
 }
@@ -887,49 +902,25 @@ int RGWReshard::process_single_logshard(int logshard_num)
 
   CephContext *cct = store->ctx();
   constexpr uint32_t max_entries = 1000;
-  int max_secs = store->ctx()->_conf->rgw_reshard_bucket_lock_duration;
-  std::chrono::seconds lock_duration(max_secs);
-
-  rados::cls::lock::Lock l(reshard_lock_name);
-  l.set_duration(lock_duration);
-
-  char cookie_buf[COOKIE_LEN + 1];
-  gen_rand_alphanumeric(store->ctx(), cookie_buf, sizeof(cookie_buf) - 1);
-  cookie_buf[COOKIE_LEN] = '\0';
-  l.set_cookie(cookie_buf);
 
   string logshard_oid;
   get_logshard_oid(logshard_num, &logshard_oid);
 
-  int ret = l.lock_exclusive(&store->reshard_pool_ctx, logshard_oid);
+  RGWBucketReshardLock logshard_lock(store, logshard_oid, false);
+
+  int ret = logshard_lock.lock();
   if (ret == -EBUSY) { /* already locked by another processor */
-    ldout(store->ctx(), 5) << __func__ << "(): failed to acquire lock on " << logshard_oid << dendl;
+    ldout(store->ctx(), 5) << __func__ << "(): failed to acquire lock on " <<
+      logshard_oid << dendl;
     return ret;
   }
-  Clock::time_point lock_start_time = Clock::now();
-  const char* const log_func_name = __func__;
-
-  auto renew_locks_callback =
-    [&l, &lock_start_time, &logshard_oid, &log_func_name, this](const Clock::time_point& now) -> int {
-      l.set_must_renew(true);
-      int ret = l.lock_exclusive(&this->store->reshard_pool_ctx, logshard_oid);
-      if (ret < 0) { /* expired or already locked by another processor */
-       ldout(this->store->ctx(), 5) << log_func_name <<
-         "[lambda](): failed to renew lock on " << logshard_oid <<
-         " with " << cpp_strerror(-ret) << dendl;
-       return ret;
-      }
-      lock_start_time = now;
-      ldout(this->store->ctx(), 20) << log_func_name <<
-       "[lambda](): successfully renewed lock on " << logshard_oid << dendl;
-      return 0;
-    };
 
   do {
     std::list<cls_rgw_reshard_entry> entries;
     ret = list(logshard_num, marker, max_entries, entries, &truncated);
     if (ret < 0) {
-      ldout(cct, 10) << "cannot list all reshards in logshard oid=" << logshard_oid << dendl;
+      ldout(cct, 10) << "cannot list all reshards in logshard oid=" <<
+       logshard_oid << dendl;
       continue;
     }
 
@@ -947,35 +938,39 @@ int RGWReshard::process_single_logshard(int logshard_num)
        ret = store->get_bucket_info(obj_ctx, entry.tenant, entry.bucket_name,
                                     bucket_info, nullptr, &attrs);
        if (ret < 0) {
-         ldout(cct, 0) <<  __func__ << ": Error in get_bucket_info: " << cpp_strerror(-ret) << dendl;
+         ldout(cct, 0) <<  __func__ << ": Error in get_bucket_info: " <<
+           cpp_strerror(-ret) << dendl;
          return -ret;
        }
 
-       RGWBucketReshard br(store, bucket_info, attrs, renew_locks_callback);
+       RGWBucketReshard br(store, bucket_info, attrs, nullptr);
 
        Formatter* formatter = new JSONFormatter(false);
        auto formatter_ptr = std::unique_ptr<Formatter>(formatter);
        ret = br.execute(entry.new_num_shards, max_entries, true, nullptr,
                         formatter, this);
        if (ret < 0) {
-         ldout (store->ctx(), 0) <<  __func__ << "ERROR in reshard_bucket " << entry.bucket_name << ":" <<
+         ldout (store->ctx(), 0) <<  __func__ <<
+           "ERROR in reshard_bucket " << entry.bucket_name << ":" <<
            cpp_strerror(-ret)<< dendl;
          return ret;
        }
 
-       ldout (store->ctx(), 20) <<  " removing entry" << entry.bucket_name<< dendl;
+       ldout (store->ctx(), 20) <<  " removing entry" << entry.bucket_name <<
+         dendl;
 
        ret = remove(entry);
        if (ret < 0) {
-         ldout(cct, 0)<< __func__ << ":Error removing bucket " << entry.bucket_name << " for resharding queue: "
-                      << cpp_strerror(-ret) << dendl;
+         ldout(cct, 0)<< __func__ << ":Error removing bucket " <<
+           entry.bucket_name << " for resharding queue: " <<
+           cpp_strerror(-ret) << dendl;
          return ret;
        }
       }
 
       Clock::time_point now = Clock::now();
-      if (now >= lock_start_time + lock_duration / 2) {
-       ret = renew_locks_callback(now);
+      if (logshard_lock.should_renew(now)) {
+       ret = logshard_lock.renew(now);
        if (ret < 0) {
          return ret;
        }
@@ -985,7 +980,7 @@ int RGWReshard::process_single_logshard(int logshard_num)
     }
   } while (truncated);
 
-  l.unlock(&store->reshard_pool_ctx, logshard_oid);
+  logshard_lock.unlock();
   return 0;
 }
 
index 9ffe6ce8f2b224aba06f9db638606ed1aed7b0a1..26b20e29613cb16b57f9ca4ababef0015c23974f 100644 (file)
 class CephContext;
 class RGWRados;
 
+class RGWBucketReshardLock {
+  using Clock = ceph::coarse_mono_clock;
+
+  RGWRados* store;
+  const std::string lock_oid;
+  const bool ephemeral;
+  rados::cls::lock::Lock internal_lock;
+  std::chrono::seconds duration;
+
+  Clock::time_point start_time;
+  Clock::time_point renew_thresh;
+
+  void reset_time(const Clock::time_point& now) {
+    start_time = now;
+    renew_thresh = start_time + duration / 2;
+  }
+
+public:
+  RGWBucketReshardLock(RGWRados* _store,
+                      const std::string& reshard_lock_oid,
+                      bool _ephemeral);
+  RGWBucketReshardLock(RGWRados* _store,
+                      const RGWBucketInfo& bucket_info,
+                      bool _ephemeral) :
+    RGWBucketReshardLock(_store, bucket_info.bucket.get_key(':'), _ephemeral)
+  {}
+
+  int lock();
+  void unlock();
+  int renew(const Clock::time_point&);
+
+  bool should_renew(const Clock::time_point& now) const {
+    return now >= renew_thresh;
+  }
+}; // class RGWBucketReshardLock
+
 class RGWBucketReshard {
 public:
 
@@ -24,29 +60,19 @@ public:
 
   using Clock = ceph::coarse_mono_clock;
 
-  // returns 0 for success or a negative error code
-  using RenewLocksCallback = std::function<int(const Clock::time_point&)>;
-
 private:
 
   RGWRados *store;
   RGWBucketInfo bucket_info;
   std::map<string, bufferlist> bucket_attrs;
 
-  string reshard_oid;
-  rados::cls::lock::Lock reshard_lock;
-  Clock::time_point lock_start_time;
-  std::chrono::seconds lock_duration;
-  Clock::time_point lock_renew_thresh;
-
-  RenewLocksCallback renew_locks_callback;
+  RGWBucketReshardLock reshard_lock;
+  RGWBucketReshardLock* outer_reshard_lock;
 
-  int lock_bucket();
-  void unlock_bucket();
-  int renew_lock_bucket(const Clock::time_point&);
   int clear_resharding();
 
-  int create_new_bucket_instance(int new_num_shards, RGWBucketInfo& new_bucket_info);
+  int create_new_bucket_instance(int new_num_shards,
+                                RGWBucketInfo& new_bucket_info);
   int do_reshard(int num_shards,
                 RGWBucketInfo& new_bucket_info,
                 int max_entries,
@@ -55,10 +81,11 @@ private:
                 Formatter *formatter);
 public:
 
-  // pass nullptr for the final parameter if no callback is used
+  // pass nullptr for the final parameter if no outer reshard lock to
+  // manage
   RGWBucketReshard(RGWRados *_store, const RGWBucketInfo& _bucket_info,
                    const std::map<string, bufferlist>& _bucket_attrs,
-                  RenewLocksCallback _renew_locks_callback);
+                  RGWBucketReshardLock* _outer_reshard_lock);
   int execute(int num_shards, int max_op_entries,
               bool verbose = false, ostream *out = nullptr,
               Formatter *formatter = nullptr,