rgw: renew resharding locks to prevent expiration
author     J. Eric Ivancich <ivancich@redhat.com>
           Thu, 27 Sep 2018 17:31:57 +0000 (13:31 -0400)
committer  J. Eric Ivancich <ivancich@redhat.com>
           Wed, 24 Oct 2018 21:52:37 +0000 (17:52 -0400)
Fix a lock expiration problem with resharding. The resharding process
now renews its bucket lock (and the logshard lock, when one is held)
once half of the lock's duration has elapsed. If the lock has expired
and cannot be renewed, the process fails and errors out appropriately.

Signed-off-by: J. Eric Ivancich <ivancich@redhat.com>
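
At its core the change is a simple renewal policy: record when the lock
was acquired, precompute a threshold at half of the lock's duration, and
re-take the lock whenever the work loop crosses that threshold. A minimal
standalone sketch of that policy follows, with hypothetical names (the
real code uses rados::cls::lock::Lock, ceph::coarse_mono_clock, and the
rgw_reshard_bucket_lock_duration config option):

  // Sketch only: RenewableLock and maybe_renew() are illustrative names,
  // not RGW API; std::chrono::steady_clock stands in for
  // ceph::coarse_mono_clock.
  #include <chrono>
  #include <iostream>

  using Clock = std::chrono::steady_clock;

  struct RenewableLock {
    std::chrono::seconds duration{120};  // cf. rgw_reshard_bucket_lock_duration
    Clock::time_point start;
    Clock::time_point renew_thresh;

    void acquired(Clock::time_point now) {
      start = now;
      renew_thresh = start + duration / 2;  // renew once half the duration is gone
    }

    // returns 0 on success, a negative error code on failure (e.g. -EBUSY)
    int maybe_renew(Clock::time_point now) {
      if (now < renew_thresh)
        return 0;          // plenty of time left; skip the round trip
      // ... re-acquire the exclusive lock here; on success reset the clock ...
      acquired(now);
      return 0;
    }
  };

  int main() {
    RenewableLock lock;
    lock.acquired(Clock::now());
    for (int entry = 0; entry < 1000000; ++entry) {
      // process one bucket-index entry, then cheaply check the threshold
      if (lock.maybe_renew(Clock::now()) < 0)
        return 1;          // lock lost; abort the reshard
    }
    std::cout << "done" << std::endl;
    return 0;
  }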
src/cls/lock/cls_lock_client.h
src/cls/lock/cls_lock_ops.h
src/cls/lock/cls_lock_types.h
src/rgw/rgw_admin.cc
src/rgw/rgw_reshard.cc
src/rgw/rgw_reshard.h

diff --git a/src/cls/lock/cls_lock_client.h b/src/cls/lock/cls_lock_client.h
index ae1a238117469568e4b4a632de423ca7d5c23291..002864edecc3c7c786241bdda2c5ead77eea6678 100644
--- a/src/cls/lock/cls_lock_client.h
+++ b/src/cls/lock/cls_lock_client.h
@@ -4,6 +4,8 @@
 #ifndef CEPH_CLS_LOCK_CLIENT_H
 #define CEPH_CLS_LOCK_CLIENT_H
 
+#include <chrono>
+
 #include "cls/lock/cls_lock_types.h"
 
 namespace librados {
diff --git a/src/cls/lock/cls_lock_ops.h b/src/cls/lock/cls_lock_ops.h
index 0138f6e0c2b94a90642faf643fc0a894c7d8f1b3..5d22452b3b2554d6a7ce02cc188b68aa08fcc2f0 100644
--- a/src/cls/lock/cls_lock_ops.h
+++ b/src/cls/lock/cls_lock_ops.h
@@ -1,3 +1,6 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
 #ifndef CEPH_CLS_LOCK_OPS_H
 #define CEPH_CLS_LOCK_OPS_H
 
diff --git a/src/cls/lock/cls_lock_types.h b/src/cls/lock/cls_lock_types.h
index b77bb06ac1d79a7dcdd53fdf2839f5efb30afebe..e8ec4518fa9a8ebd14e518adddbc96d69fa9e1f0 100644
--- a/src/cls/lock/cls_lock_types.h
+++ b/src/cls/lock/cls_lock_types.h
@@ -1,3 +1,6 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
 #ifndef CEPH_CLS_LOCK_TYPES_H
 #define CEPH_CLS_LOCK_TYPES_H
 
diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc
index 047a506248ce25b82c34d7bd42ba16e6f46ca021..6da4d464b56406b57d03c434620dc5afafed47a0 100644
--- a/src/rgw/rgw_admin.cc
+++ b/src/rgw/rgw_admin.cc
@@ -6017,7 +6017,7 @@ next:
       return ret;
     }
 
-    RGWBucketReshard br(store, bucket_info, attrs);
+    RGWBucketReshard br(store, bucket_info, attrs, nullptr /* no callback */);
 
 #define DEFAULT_RESHARD_MAX_ENTRIES 1000
     if (max_entries < 1) {
@@ -6103,7 +6103,6 @@ next:
     return 0;
   }
 
-
   if (opt_cmd == OPT_RESHARD_STATUS) {
     if (bucket_name.empty()) {
       cerr << "ERROR: bucket not specified" << std::endl;
@@ -6119,11 +6118,12 @@ next:
       return -ret;
     }
 
-    RGWBucketReshard br(store, bucket_info, attrs);
+    RGWBucketReshard br(store, bucket_info, attrs, nullptr /* no callback */);
     list<cls_rgw_bucket_instance_entry> status;
     int r = br.get_status(&status);
     if (r < 0) {
-      cerr << "ERROR: could not get resharding status for bucket " << bucket_name << std::endl;
+      cerr << "ERROR: could not get resharding status for bucket " <<
+       bucket_name << std::endl;
       return -r;
     }
 
@@ -6156,7 +6156,7 @@ next:
       return -ret;
     }
 
-    RGWBucketReshard br(store, bucket_info, attrs);
+    RGWBucketReshard br(store, bucket_info, attrs, nullptr /* no callback */);
     int ret = br.cancel();
     if (ret < 0) {
       if (ret == -EBUSY) {
diff --git a/src/rgw/rgw_reshard.cc b/src/rgw/rgw_reshard.cc
index 474ea9baf83f96f01dbe30104836cd8af591bd45..3b563cd51383f8d7251348c3586c967e36207160 100644
--- a/src/rgw/rgw_reshard.cc
+++ b/src/rgw/rgw_reshard.cc
@@ -65,8 +65,10 @@ class BucketReshardShard {
 public:
   BucketReshardShard(RGWRados *_store, const RGWBucketInfo& _bucket_info,
                      int _num_shard,
-                     deque<librados::AioCompletion *>& _completions) : store(_store), bucket_info(_bucket_info), bs(store),
-                                                                       aio_completions(_completions) {
+                     deque<librados::AioCompletion *>& _completions) :
+    store(_store), bucket_info(_bucket_info), bs(store),
+    aio_completions(_completions)
+  {
     num_shard = (bucket_info.num_shards > 0 ? _num_shard : -1);
     bs.init(bucket_info.bucket, num_shard);
   }
@@ -91,8 +93,10 @@ public:
         return ret;
       }
     }
+
     return 0;
   }
+
   int flush() {
     if (entries.size() == 0) {
       return 0;
@@ -156,14 +160,20 @@ public:
     }
   }
 
+  /*
+   * did_flush is set if not nullptr and a flush occurred; otherwise not altered
+   */
   int add_entry(int shard_index,
                 rgw_cls_bi_entry& entry, bool account, uint8_t category,
                 const rgw_bucket_category_stats& entry_stats) {
-    int ret = target_shards[shard_index]->add_entry(entry, account, category, entry_stats);
+    int ret = target_shards[shard_index]->add_entry(entry, account, category,
+                                                   entry_stats);
     if (ret < 0) {
-      derr << "ERROR: target_shards.add_entry(" << entry.idx << ") returned error: " << cpp_strerror(-ret) << dendl;
+      derr << "ERROR: target_shards.add_entry(" << entry.idx <<
+       ") returned error: " << cpp_strerror(-ret) << dendl;
       return ret;
     }
+
     return 0;
   }
 
@@ -189,13 +199,21 @@ public:
   }
 };
 
-RGWBucketReshard::RGWBucketReshard(RGWRados *_store, const RGWBucketInfo& _bucket_info, const map<string, bufferlist>& _bucket_attrs) :
-                                                     store(_store), bucket_info(_bucket_info), bucket_attrs(_bucket_attrs),
-                                                     reshard_lock(reshard_lock_name), locked_bucket(false) {
+RGWBucketReshard::RGWBucketReshard(RGWRados *_store,
+                                  const RGWBucketInfo& _bucket_info,
+                                  const map<string, bufferlist>& _bucket_attrs,
+                                  RenewLocksCallback _renew_locks_callback) :
+  store(_store), bucket_info(_bucket_info), bucket_attrs(_bucket_attrs),
+  reshard_lock(reshard_lock_name),
+  renew_locks_callback(_renew_locks_callback)
+{
   const rgw_bucket& b = bucket_info.bucket;
-  reshard_oid = b.tenant + (b.tenant.empty() ? "" : ":") + b.name + ":" + b.bucket_id;
+  reshard_oid = b.get_key(':');
+
+  const int lock_dur_secs =
+    store->ctx()->_conf->rgw_reshard_bucket_lock_duration;
+  lock_duration = std::chrono::seconds(lock_dur_secs);
 
-  utime_t lock_duration(store->ctx()->_conf->rgw_reshard_bucket_lock_duration, 0);
 #define COOKIE_LEN 16
   char cookie_buf[COOKIE_LEN + 1];
   gen_rand_alphanumeric(store->ctx(), cookie_buf, sizeof(cookie_buf) - 1);
@@ -205,23 +223,17 @@ RGWBucketReshard::RGWBucketReshard(RGWRados *_store, const RGWBucketInfo& _bucke
   reshard_lock.set_duration(lock_duration);
 }
 
-RGWBucketReshard::RGWBucketReshard(RGWRados *_store, const RGWBucketInfo& _bucket_info, const map<string, bufferlist>& _bucket_attrs, rados::cls::lock::Lock& _reshard_lock, const utime_t& _lock_start_time) :
-                                                     store(_store), bucket_info(_bucket_info), bucket_attrs(_bucket_attrs),
-                                                     reshard_lock(_reshard_lock), lock_start_time(_lock_start_time), locked_bucket(true)
-{
-  const rgw_bucket& b = bucket_info.bucket;
-  reshard_oid = b.tenant + (b.tenant.empty() ? "" : ":") + b.name + ":" + b.bucket_id;
-}
-
 int RGWBucketReshard::lock_bucket()
 {
+  reshard_lock.set_must_renew(false);
   int ret = reshard_lock.lock_exclusive(&store->reshard_pool_ctx, reshard_oid);
   if (ret < 0) {
     ldout(store->ctx(), 0) << "RGWReshard::add failed to acquire lock on " << reshard_oid << " ret=" << ret << dendl;
     return ret;
   }
-  lock_start_time = ceph_clock_now();
-  locked_bucket = true;
+  lock_start_time = Clock::now();
+  lock_renew_thresh = lock_start_time + lock_duration / 2;
+
   return 0;
 }
 
@@ -231,27 +243,31 @@ void RGWBucketReshard::unlock_bucket()
   if (ret < 0) {
     ldout(store->ctx(), 0) << "WARNING: RGWReshard::add failed to drop lock on " << reshard_oid << " ret=" << ret << dendl;
   }
-  locked_bucket = false;
 }
 
-
-int RGWBucketReshard::renew_lock_bucket()
+int RGWBucketReshard::renew_lock_bucket(const Clock::time_point& now)
 {
-  if (!locked_bucket) {
-    return 0;
-  }
-
-  utime_t now = ceph_clock_now();
- /* do you need to renew lock? */
-  if (now > lock_start_time + store->ctx()->_conf->rgw_reshard_bucket_lock_duration/ 2) {
-    reshard_lock.set_must_renew(true);
-    int ret = reshard_lock.lock_exclusive(&store->reshard_pool_ctx, reshard_oid);
-    if (ret == -EBUSY) { /* already locked by another processor */
-      ldout(store->ctx(), 5) << __func__ << "(): failed to acquire lock on " << reshard_oid << dendl;
+  // assume outer locks have timespans at least the size of ours, so
+  // can call inside conditional
+  if (renew_locks_callback) {
+    int ret = renew_locks_callback(now);
+    if (ret < 0) {
       return ret;
     }
-    lock_start_time = now;
   }
+
+  reshard_lock.set_must_renew(true);
+  int ret = reshard_lock.lock_exclusive(&store->reshard_pool_ctx, reshard_oid);
+  if (ret < 0) { /* expired or already locked by another processor */
+    ldout(store->ctx(), 5) << __func__ << "(): failed to renew lock on " <<
+      reshard_oid << " with " << cpp_strerror(-ret) << dendl;
+    return ret;
+  }
+  lock_start_time = now;
+  lock_renew_thresh = lock_start_time + lock_duration / 2;
+  ldout(store->ctx(), 20) << __func__ << "(): successfully renewed lock on " <<
+    reshard_oid << dendl;
+
   return 0;
 }
 
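Note the ordering in renew_lock_bucket() above: when a callback was
injected, the outer logshard lock is renewed first, on the stated
assumption that its duration is at least as long as the bucket lock's,
and only then is the bucket lock itself renewed. A rough sketch of that
chaining, with hypothetical names:

  // Sketch only: renew_inner_lock() is an illustrative stand-in for
  // RGWBucketReshard::renew_lock_bucket(); the callback plays the role
  // of RenewLocksCallback renewing the enclosing logshard lock.
  #include <functional>
  #include <iostream>

  using RenewCallback = std::function<int()>;  // 0 on success, negative errno on failure

  int renew_inner_lock(const RenewCallback& renew_outer) {
    if (renew_outer) {        // optional; nullptr when run standalone
      int r = renew_outer();  // renew the outer (logshard) lock first
      if (r < 0)
        return r;             // outer lock lost; propagate the error
    }
    // ... renew the inner (bucket) lock itself here ...
    return 0;
  }

  int main() {
    // standalone use, as in radosgw-admin: no callback
    renew_inner_lock(nullptr);
    // nested use: the logshard processor injects its own renewal
    return renew_inner_lock([] {
      std::cout << "renewing logshard lock" << std::endl;
      return 0;
    });
  }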
@@ -357,12 +373,17 @@ class BucketInfoReshardUpdate
   }
 
 public:
-  BucketInfoReshardUpdate(RGWRados *_store, RGWBucketInfo& _bucket_info,
-                          map<string, bufferlist>& _bucket_attrs, const string& new_bucket_id) : store(_store), 
-                                                                                                 bucket_info(_bucket_info),
-                                                                                                 bucket_attrs(_bucket_attrs) {
+  BucketInfoReshardUpdate(RGWRados *_store,
+                         RGWBucketInfo& _bucket_info,
+                          map<string, bufferlist>& _bucket_attrs,
+                         const string& new_bucket_id) :
+    store(_store),
+    bucket_info(_bucket_info),
+    bucket_attrs(_bucket_attrs)
+  {
     bucket_info.new_bucket_instance_id = new_bucket_id;
   }
+
   ~BucketInfoReshardUpdate() {
     if (in_progress) {
       bucket_info.new_bucket_instance_id.clear();
@@ -389,13 +410,12 @@ public:
   }
 };
 
-int RGWBucketReshard::do_reshard(
-                  int num_shards,
-                  RGWBucketInfo& new_bucket_info,
-                  int max_entries,
-                   bool verbose,
-                   ostream *out,
-                  Formatter *formatter)
+int RGWBucketReshard::do_reshard(int num_shards,
+                                RGWBucketInfo& new_bucket_info,
+                                int max_entries,
+                                bool verbose,
+                                ostream *out,
+                                Formatter *formatter)
 {
   rgw_bucket& bucket = bucket_info.bucket;
 
@@ -452,7 +472,7 @@ int RGWBucketReshard::do_reshard(
       ret = store->bi_list(bucket, i, string(), marker, max_entries, &entries, &is_truncated);
       if (ret < 0 && ret != -ENOENT) {
        derr << "ERROR: bi_list(): " << cpp_strerror(-ret) << dendl;
-       return -ret;
+       return ret;
       }
 
       list<rgw_cls_bi_entry>::iterator iter;
@@ -484,30 +504,30 @@ int RGWBucketReshard::do_reshard(
 
        int shard_index = (target_shard_id > 0 ? target_shard_id : 0);
 
-       ret = target_shards_mgr.add_entry(shard_index, entry, account, category, stats);
+       ret = target_shards_mgr.add_entry(shard_index, entry, account,
+                                         category, stats);
        if (ret < 0) {
          return ret;
        }
+
+       Clock::time_point now = Clock::now();
+       if (now >= lock_renew_thresh) {
+         ret = renew_lock_bucket(now);
+         if (ret < 0) {
+           lderr(store->ctx()) << "Error renewing bucket lock: " << ret << dendl;
+           return ret;
+         }
+       }
+
        if (verbose) {
          formatter->close_section();
          if (out) {
            formatter->flush(*out);
-           formatter->flush(*out);
          }
        } else if (out && !(total_entries % 1000)) {
          (*out) << " " << total_entries;
        }
-      }
-      ret = renew_lock_bucket();
-      if (ret < 0) {
-       lderr(store->ctx()) << "Error renewing bucket lock: " << ret << dendl;
-       return -ret;
-      }
-      ret = renew_lock_bucket();
-      if (ret < 0) {
-       lderr(store->ctx()) << "Error renewing bucket lock: " << ret << dendl;
-       return -ret;
-      }
+      } // entries loop
     }
   }
 
@@ -523,13 +543,13 @@ int RGWBucketReshard::do_reshard(
   ret = target_shards_mgr.finish();
   if (ret < 0) {
     lderr(store->ctx()) << "ERROR: failed to reshard" << dendl;
-    return EIO;
+    return -EIO;
   }
 
   ret = rgw_link_bucket(store, new_bucket_info.owner, new_bucket_info.bucket, bucket_info.creation_time);
   if (ret < 0) {
     lderr(store->ctx()) << "failed to link new bucket instance (bucket_id=" << new_bucket_info.bucket.bucket_id << ": " << cpp_strerror(-ret) << ")" << dendl;
-    return -ret;
+    return ret;
   }
 
   ret = bucket_info_updater.complete();
@@ -537,8 +557,9 @@ int RGWBucketReshard::do_reshard(
     ldout(store->ctx(), 0) << __func__ << ": failed to update bucket info ret=" << ret << dendl;
     /* don't error out, reshard process succeeded */
   }
+
   return 0;
-}
+} // RGWBucketReshard::do_reshard
 
 int RGWBucketReshard::get_status(list<cls_rgw_bucket_instance_entry> *status)
 {
@@ -567,8 +588,8 @@ int RGWBucketReshard::get_status(list<cls_rgw_bucket_instance_entry> *status)
 
 
 int RGWBucketReshard::execute(int num_shards, int max_op_entries,
-                              bool verbose, ostream *out, Formatter *formatter, RGWReshard* reshard_log)
-
+                              bool verbose, ostream *out, Formatter *formatter,
+                             RGWReshard* reshard_log)
 {
   int ret = lock_bucket();
   if (ret < 0) {
@@ -619,8 +640,9 @@ int RGWBucketReshard::execute(int num_shards, int max_op_entries,
 
 
 RGWReshard::RGWReshard(RGWRados* _store, bool _verbose, ostream *_out,
-                       Formatter *_formatter) : store(_store), instance_lock(bucket_instance_lock_name),
-                                                verbose(_verbose), out(_out), formatter(_formatter)
+                       Formatter *_formatter) :
+  store(_store), instance_lock(bucket_instance_lock_name),
+  verbose(_verbose), out(_out), formatter(_formatter)
 {
   num_logshards = store->ctx()->_conf->rgw_reshard_num_logs;
 }
@@ -814,18 +836,16 @@ int RGWReshard::process_single_logshard(int logshard_num)
   bool truncated = true;
 
   CephContext *cct = store->ctx();
-  int max_entries = 1000;
+  constexpr uint32_t max_entries = 1000;
   int max_secs = store->ctx()->_conf->rgw_reshard_bucket_lock_duration;
+  std::chrono::seconds lock_duration(max_secs);
 
   rados::cls::lock::Lock l(reshard_lock_name);
-
-  utime_t time(max_secs, 0);
-  l.set_duration(time);
+  l.set_duration(lock_duration);
 
   char cookie_buf[COOKIE_LEN + 1];
   gen_rand_alphanumeric(store->ctx(), cookie_buf, sizeof(cookie_buf) - 1);
   cookie_buf[COOKIE_LEN] = '\0';
-
   l.set_cookie(cookie_buf);
 
   string logshard_oid;
@@ -836,8 +856,24 @@ int RGWReshard::process_single_logshard(int logshard_num)
     ldout(store->ctx(), 5) << __func__ << "(): failed to acquire lock on " << logshard_oid << dendl;
     return ret;
   }
-
-  utime_t lock_start_time = ceph_clock_now();
+  Clock::time_point lock_start_time = Clock::now();
+  const char* const log_func_name = __func__;
+
+  auto renew_locks_callback =
+    [&l, &lock_start_time, &logshard_oid, &log_func_name, this](const Clock::time_point& now) -> int {
+      l.set_must_renew(true);
+      int ret = l.lock_exclusive(&this->store->reshard_pool_ctx, logshard_oid);
+      if (ret < 0) { /* expired or already locked by another processor */
+       ldout(this->store->ctx(), 5) << log_func_name <<
+         "[lambda](): failed to renew lock on " << logshard_oid <<
+         " with " << cpp_strerror(-ret) << dendl;
+       return ret;
+      }
+      lock_start_time = now;
+      ldout(this->store->ctx(), 20) << log_func_name <<
+       "[lambda](): successfully renewed lock on " << logshard_oid << dendl;
+      return 0;
+    };
 
   do {
     std::list<cls_rgw_reshard_entry> entries;
@@ -847,28 +883,30 @@ int RGWReshard::process_single_logshard(int logshard_num)
       continue;
     }
 
-    for(auto& entry: entries) {
+    for(auto& entry: entries) { // logshard entries
       if(entry.new_instance_id.empty()) {
 
-       ldout(store->ctx(), 20) << __func__ << " resharding " << entry.bucket_name  << dendl;
+       ldout(store->ctx(), 20) << __func__ << " resharding " <<
+         entry.bucket_name  << dendl;
 
        RGWObjectCtx obj_ctx(store);
        rgw_bucket bucket;
        RGWBucketInfo bucket_info;
        map<string, bufferlist> attrs;
 
-       ret = store->get_bucket_info(obj_ctx, entry.tenant, entry.bucket_name, bucket_info, nullptr,
-                                   &attrs);
+       ret = store->get_bucket_info(obj_ctx, entry.tenant, entry.bucket_name,
+                                    bucket_info, nullptr, &attrs);
        if (ret < 0) {
          ldout(cct, 0) <<  __func__ << ": Error in get_bucket_info: " << cpp_strerror(-ret) << dendl;
          return -ret;
        }
 
-       RGWBucketReshard br(store, bucket_info, attrs, l, lock_start_time);
+       RGWBucketReshard br(store, bucket_info, attrs, renew_locks_callback);
 
        Formatter* formatter = new JSONFormatter(false);
        auto formatter_ptr = std::unique_ptr<Formatter>(formatter);
-       ret = br.execute(entry.new_num_shards, max_entries, true,nullptr, formatter, this);
+       ret = br.execute(entry.new_num_shards, max_entries, true, nullptr,
+                        formatter, this);
        if (ret < 0) {
          ldout (store->ctx(), 0) <<  __func__ << "ERROR in reshard_bucket " << entry.bucket_name << ":" <<
            cpp_strerror(-ret)<< dendl;
@@ -883,24 +921,16 @@ int RGWReshard::process_single_logshard(int logshard_num)
                       << cpp_strerror(-ret) << dendl;
          return ret;
        }
-       ret = br.renew_lock_bucket();
+      }
+
+      Clock::time_point now = Clock::now();
+      if (now >= lock_start_time + lock_duration / 2) {
+       ret = renew_locks_callback(now);
        if (ret < 0) {
-         ldout(cct, 0)<< __func__ << ":Error renewing bucket " << entry.bucket_name << " lock: "
-                      << cpp_strerror(-ret) << dendl;
          return ret;
        }
       }
-      utime_t now = ceph_clock_now();
-
-      if (now > lock_start_time + max_secs / 2) { /* do you need to renew lock? */
-        l.set_must_renew(true);
-        ret = l.lock_exclusive(&store->reshard_pool_ctx, logshard_oid);
-        if (ret == -EBUSY) { /* already locked by another processor */
-          ldout(store->ctx(), 5) << __func__ << "(): failed to acquire lock on " << logshard_oid << dendl;
-          return ret;
-        }
-        lock_start_time = now;
-      }
+
       entry.get_key(&marker);
     }
   } while (truncated);
diff --git a/src/rgw/rgw_reshard.h b/src/rgw/rgw_reshard.h
index eeaf901f7a2c2832a6187d901c1bc5ef848d93df..ffff5f29a2aedda64d2d5dc8093b2546f1648f9f 100644
--- a/src/rgw/rgw_reshard.h
+++ b/src/rgw/rgw_reshard.h
@@ -5,30 +5,45 @@
 #define RGW_RESHARD_H
 
 #include <vector>
+#include <functional>
+
 #include "include/rados/librados.hpp"
+#include "common/ceph_time.h"
 #include "cls/rgw/cls_rgw_types.h"
 #include "cls/lock/cls_lock_client.h"
 #include "rgw_bucket.h"
 
+
 class CephContext;
 class RGWRados;
 
-
 class RGWBucketReshard {
+public:
+
   friend class RGWReshard;
 
+  using Clock = ceph::coarse_mono_clock;
+
+  // returns 0 for success or a negative error code
+  using RenewLocksCallback = std::function<int(const Clock::time_point&)>;
+
+private:
+
   RGWRados *store;
   RGWBucketInfo bucket_info;
   std::map<string, bufferlist> bucket_attrs;
 
   string reshard_oid;
   rados::cls::lock::Lock reshard_lock;
-  utime_t lock_start_time;
-  bool locked_bucket;
+  Clock::time_point lock_start_time;
+  std::chrono::seconds lock_duration;
+  Clock::time_point lock_renew_thresh;
+
+  RenewLocksCallback renew_locks_callback;
 
   int lock_bucket();
   void unlock_bucket();
-  int renew_lock_bucket();
+  int renew_lock_bucket(const Clock::time_point&);
   int set_resharding_status(const string& new_instance_id, int32_t num_shards, cls_rgw_reshard_status status);
   int clear_resharding();
 
@@ -40,22 +55,24 @@ class RGWBucketReshard {
                  ostream *os,
                 Formatter *formatter);
 public:
-  RGWBucketReshard(RGWRados *_store, const RGWBucketInfo& _bucket_info,
-                   const std::map<string, bufferlist>& _bucket_attrs);
 
+  // pass nullptr for the final parameter if no callback is used
   RGWBucketReshard(RGWRados *_store, const RGWBucketInfo& _bucket_info,
                    const std::map<string, bufferlist>& _bucket_attrs,
-                  rados::cls::lock::Lock& reshard_lock, const utime_t& lock_start_time);
-
+                  RenewLocksCallback _renew_locks_callback);
   int execute(int num_shards, int max_op_entries,
               bool verbose = false, ostream *out = nullptr,
               Formatter *formatter = nullptr,
              RGWReshard *reshard_log = nullptr);
   int get_status(std::list<cls_rgw_bucket_instance_entry> *status);
   int cancel();
-};
+}; // RGWBucketReshard
 
 class RGWReshard {
+public:
+    using Clock = ceph::coarse_mono_clock;
+
+private:
     RGWRados *store;
     string lock_name;
     rados::cls::lock::Lock instance_lock;