git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: renew resharding locks to prevent expiration
author: J. Eric Ivancich <ivancich@redhat.com>
Thu, 27 Sep 2018 17:31:57 +0000 (13:31 -0400)
committer: Abhishek Lekshmanan <abhishek@suse.com>
Thu, 29 Nov 2018 12:03:59 +0000 (13:03 +0100)
Fix lock expiration problem with resharding. The resharding process
will renew its bucket lock (and logshard lock if necessary) when half
the remaining time is left on the lock. If the lock has expired and
cannot be renewed, the process fails and errors out appropriately.

Signed-off-by: J. Eric Ivancich <ivancich@redhat.com>
(cherry picked from commit 8cebffa1d8ad4df6fdae4e10e782aad0753545ce)

src/cls/lock/cls_lock_client.h
src/cls/lock/cls_lock_ops.h
src/cls/lock/cls_lock_types.h
src/rgw/rgw_admin.cc
src/rgw/rgw_reshard.cc
src/rgw/rgw_reshard.h

index cf3886eebf20d114d03a2d640c594f10091d974f..e2c21cc9e5b059621169f44a40e973c8087bd3d8 100644 (file)
@@ -4,6 +4,8 @@
 #ifndef CEPH_CLS_LOCK_CLIENT_H
 #define CEPH_CLS_LOCK_CLIENT_H
 
+#include <chrono>
+
 #include "cls/lock/cls_lock_types.h"
 
 namespace librados {
index dbdddfe21407d5182673e4599c689f630fc76dec..b9388e78877c73ddc61801f069ce9f8505c57f49 100644 (file)
@@ -1,3 +1,6 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
 #ifndef CEPH_CLS_LOCK_OPS_H
 #define CEPH_CLS_LOCK_OPS_H
 
index d00d6a3398f5b5dcfcbc217164d3359db5da7032..cc2e5617a0a6c2ca7f69d7fa6455da6440cbf068 100644 (file)
@@ -1,3 +1,6 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
 #ifndef CEPH_CLS_LOCK_TYPES_H
 #define CEPH_CLS_LOCK_TYPES_H
 
index 311596b8820adcf192c610269a8d28ee28d7b92f..73b998a8ac245139761344df039eade5adbca1be 100644 (file)
@@ -5909,7 +5909,7 @@ next:
       return ret;
     }
 
-    RGWBucketReshard br(store, bucket_info, attrs);
+    RGWBucketReshard br(store, bucket_info, attrs, nullptr /* no callback */);
 
 #define DEFAULT_RESHARD_MAX_ENTRIES 1000
     if (max_entries < 1) {
@@ -5995,7 +5995,6 @@ next:
     return 0;
   }
 
-
   if (opt_cmd == OPT_RESHARD_STATUS) {
     if (bucket_name.empty()) {
       cerr << "ERROR: bucket not specified" << std::endl;
@@ -6011,11 +6010,12 @@ next:
       return -ret;
     }
 
-    RGWBucketReshard br(store, bucket_info, attrs);
+    RGWBucketReshard br(store, bucket_info, attrs, nullptr /* no callback */);
     list<cls_rgw_bucket_instance_entry> status;
     int r = br.get_status(&status);
     if (r < 0) {
-      cerr << "ERROR: could not get resharding status for bucket " << bucket_name << std::endl;
+      cerr << "ERROR: could not get resharding status for bucket " <<
+       bucket_name << std::endl;
       return -r;
     }
 
@@ -6048,7 +6048,7 @@ next:
       return -ret;
     }
 
-    RGWBucketReshard br(store, bucket_info, attrs);
+    RGWBucketReshard br(store, bucket_info, attrs, nullptr /* no callback */);
     int ret = br.cancel();
     if (ret < 0) {
       if (ret == -EBUSY) {
index 1cb8d518ed742050026ef570e5e6da9bcdde530d..7a34af8489cff943cc7fc7c446aa2b4f6d6f628e 100644 (file)
@@ -66,8 +66,10 @@ class BucketReshardShard {
 public:
   BucketReshardShard(RGWRados *_store, const RGWBucketInfo& _bucket_info,
                      int _num_shard,
-                     deque<librados::AioCompletion *>& _completions) : store(_store), bucket_info(_bucket_info), bs(store),
-                                                                       aio_completions(_completions) {
+                     deque<librados::AioCompletion *>& _completions) :
+    store(_store), bucket_info(_bucket_info), bs(store),
+    aio_completions(_completions)
+  {
     num_shard = (bucket_info.num_shards > 0 ? _num_shard : -1);
     bs.init(bucket_info.bucket, num_shard);
   }
@@ -92,8 +94,10 @@ public:
         return ret;
       }
     }
+
     return 0;
   }
+
   int flush() {
     if (entries.size() == 0) {
       return 0;
@@ -157,14 +161,20 @@ public:
     }
   }
 
+  /*
+   * did_flush is set if not nullptr and a flush occurred; otherwise not altered
+   */
   int add_entry(int shard_index,
                 rgw_cls_bi_entry& entry, bool account, uint8_t category,
                 const rgw_bucket_category_stats& entry_stats) {
-    int ret = target_shards[shard_index]->add_entry(entry, account, category, entry_stats);
+    int ret = target_shards[shard_index]->add_entry(entry, account, category,
+                                                   entry_stats);
     if (ret < 0) {
-      derr << "ERROR: target_shards.add_entry(" << entry.idx << ") returned error: " << cpp_strerror(-ret) << dendl;
+      derr << "ERROR: target_shards.add_entry(" << entry.idx <<
+       ") returned error: " << cpp_strerror(-ret) << dendl;
       return ret;
     }
+
     return 0;
   }
 
@@ -190,13 +200,21 @@ public:
   }
 };
 
-RGWBucketReshard::RGWBucketReshard(RGWRados *_store, const RGWBucketInfo& _bucket_info, const map<string, bufferlist>& _bucket_attrs) :
-                                                     store(_store), bucket_info(_bucket_info), bucket_attrs(_bucket_attrs),
-                                                     reshard_lock(reshard_lock_name), locked_bucket(false) {
+RGWBucketReshard::RGWBucketReshard(RGWRados *_store,
+                                  const RGWBucketInfo& _bucket_info,
+                                  const map<string, bufferlist>& _bucket_attrs,
+                                  RenewLocksCallback _renew_locks_callback) :
+  store(_store), bucket_info(_bucket_info), bucket_attrs(_bucket_attrs),
+  reshard_lock(reshard_lock_name),
+  renew_locks_callback(_renew_locks_callback)
+{
   const rgw_bucket& b = bucket_info.bucket;
-  reshard_oid = b.tenant + (b.tenant.empty() ? "" : ":") + b.name + ":" + b.bucket_id;
+  reshard_oid = b.get_key(':');
+
+  const int lock_dur_secs =
+    store->ctx()->_conf->rgw_reshard_bucket_lock_duration;
+  lock_duration = std::chrono::seconds(lock_dur_secs);
 
-  utime_t lock_duration(store->ctx()->_conf->rgw_reshard_bucket_lock_duration, 0);
 #define COOKIE_LEN 16
   char cookie_buf[COOKIE_LEN + 1];
   gen_rand_alphanumeric(store->ctx(), cookie_buf, sizeof(cookie_buf) - 1);
@@ -206,23 +224,17 @@ RGWBucketReshard::RGWBucketReshard(RGWRados *_store, const RGWBucketInfo& _bucke
   reshard_lock.set_duration(lock_duration);
 }
 
-RGWBucketReshard::RGWBucketReshard(RGWRados *_store, const RGWBucketInfo& _bucket_info, const map<string, bufferlist>& _bucket_attrs, rados::cls::lock::Lock& _reshard_lock, const utime_t& _lock_start_time) :
-                                                     store(_store), bucket_info(_bucket_info), bucket_attrs(_bucket_attrs),
-                                                     reshard_lock(_reshard_lock), lock_start_time(_lock_start_time), locked_bucket(true)
-{
-  const rgw_bucket& b = bucket_info.bucket;
-  reshard_oid = b.tenant + (b.tenant.empty() ? "" : ":") + b.name + ":" + b.bucket_id;
-}
-
 int RGWBucketReshard::lock_bucket()
 {
+  reshard_lock.set_must_renew(false);
   int ret = reshard_lock.lock_exclusive(&store->reshard_pool_ctx, reshard_oid);
   if (ret < 0) {
     ldout(store->ctx(), 0) << "RGWReshard::add failed to acquire lock on " << reshard_oid << " ret=" << ret << dendl;
     return ret;
   }
-  lock_start_time = ceph_clock_now();
-  locked_bucket = true;
+  lock_start_time = Clock::now();
+  lock_renew_thresh = lock_start_time + lock_duration / 2;
+
   return 0;
 }
 
@@ -232,27 +244,31 @@ void RGWBucketReshard::unlock_bucket()
   if (ret < 0) {
     ldout(store->ctx(), 0) << "WARNING: RGWReshard::add failed to drop lock on " << reshard_oid << " ret=" << ret << dendl;
   }
-  locked_bucket = false;
 }
 
-
-int RGWBucketReshard::renew_lock_bucket()
+int RGWBucketReshard::renew_lock_bucket(const Clock::time_point& now)
 {
-  if (!locked_bucket) {
-    return 0;
-  }
-
-  utime_t now = ceph_clock_now();
- /* do you need to renew lock? */
-  if (now > lock_start_time + store->ctx()->_conf->rgw_reshard_bucket_lock_duration/ 2) {
-    reshard_lock.set_must_renew(true);
-    int ret = reshard_lock.lock_exclusive(&store->reshard_pool_ctx, reshard_oid);
-    if (ret == -EBUSY) { /* already locked by another processor */
-      ldout(store->ctx(), 5) << __func__ << "(): failed to acquire lock on " << reshard_oid << dendl;
+  // assume outer locks have timespans at least the size of ours, so
+  // can call inside conditional
+  if (renew_locks_callback) {
+    int ret = renew_locks_callback(now);
+    if (ret < 0) {
       return ret;
     }
-    lock_start_time = now;
   }
+
+  reshard_lock.set_must_renew(true);
+  int ret = reshard_lock.lock_exclusive(&store->reshard_pool_ctx, reshard_oid);
+  if (ret < 0) { /* expired or already locked by another processor */
+    ldout(store->ctx(), 5) << __func__ << "(): failed to renew lock on " <<
+      reshard_oid << " with " << cpp_strerror(-ret) << dendl;
+    return ret;
+  }
+  lock_start_time = now;
+  lock_renew_thresh = lock_start_time + lock_duration / 2;
+  ldout(store->ctx(), 20) << __func__ << "(): successfully renewed lock on " <<
+    reshard_oid << dendl;
+
   return 0;
 }
 
@@ -358,12 +374,17 @@ class BucketInfoReshardUpdate
   }
 
 public:
-  BucketInfoReshardUpdate(RGWRados *_store, RGWBucketInfo& _bucket_info,
-                          map<string, bufferlist>& _bucket_attrs, const string& new_bucket_id) : store(_store), 
-                                                                                                 bucket_info(_bucket_info),
-                                                                                                 bucket_attrs(_bucket_attrs) {
+  BucketInfoReshardUpdate(RGWRados *_store,
+                         RGWBucketInfo& _bucket_info,
+                          map<string, bufferlist>& _bucket_attrs,
+                         const string& new_bucket_id) :
+    store(_store),
+    bucket_info(_bucket_info),
+    bucket_attrs(_bucket_attrs)
+  {
     bucket_info.new_bucket_instance_id = new_bucket_id;
   }
+
   ~BucketInfoReshardUpdate() {
     if (in_progress) {
       bucket_info.new_bucket_instance_id.clear();
@@ -390,13 +411,12 @@ public:
   }
 };
 
-int RGWBucketReshard::do_reshard(
-                  int num_shards,
-                  RGWBucketInfo& new_bucket_info,
-                  int max_entries,
-                   bool verbose,
-                   ostream *out,
-                  Formatter *formatter)
+int RGWBucketReshard::do_reshard(int num_shards,
+                                RGWBucketInfo& new_bucket_info,
+                                int max_entries,
+                                bool verbose,
+                                ostream *out,
+                                Formatter *formatter)
 {
   rgw_bucket& bucket = bucket_info.bucket;
 
@@ -453,7 +473,7 @@ int RGWBucketReshard::do_reshard(
       ret = store->bi_list(bucket, i, string(), marker, max_entries, &entries, &is_truncated);
       if (ret < 0 && ret != -ENOENT) {
        derr << "ERROR: bi_list(): " << cpp_strerror(-ret) << dendl;
-       return -ret;
+       return ret;
       }
 
       list<rgw_cls_bi_entry>::iterator iter;
@@ -485,30 +505,30 @@ int RGWBucketReshard::do_reshard(
 
        int shard_index = (target_shard_id > 0 ? target_shard_id : 0);
 
-       ret = target_shards_mgr.add_entry(shard_index, entry, account, category, stats);
+       ret = target_shards_mgr.add_entry(shard_index, entry, account,
+                                         category, stats);
        if (ret < 0) {
          return ret;
        }
+
+       Clock::time_point now = Clock::now();
+       if (now >= lock_renew_thresh) {
+         ret = renew_lock_bucket(now);
+         if (ret < 0) {
+           lderr(store->ctx()) << "Error renewing bucket lock: " << ret << dendl;
+           return ret;
+         }
+       }
+
        if (verbose) {
          formatter->close_section();
          if (out) {
            formatter->flush(*out);
-           formatter->flush(*out);
          }
        } else if (out && !(total_entries % 1000)) {
          (*out) << " " << total_entries;
        }
-      }
-      ret = renew_lock_bucket();
-      if (ret < 0) {
-       lderr(store->ctx()) << "Error renewing bucket lock: " << ret << dendl;
-       return -ret;
-      }
-      ret = renew_lock_bucket();
-      if (ret < 0) {
-       lderr(store->ctx()) << "Error renewing bucket lock: " << ret << dendl;
-       return -ret;
-      }
+      } // entries loop
     }
   }
 
@@ -524,13 +544,13 @@ int RGWBucketReshard::do_reshard(
   ret = target_shards_mgr.finish();
   if (ret < 0) {
     lderr(store->ctx()) << "ERROR: failed to reshard" << dendl;
-    return EIO;
+    return -EIO;
   }
 
   ret = rgw_link_bucket(store, new_bucket_info.owner, new_bucket_info.bucket, bucket_info.creation_time);
   if (ret < 0) {
     lderr(store->ctx()) << "failed to link new bucket instance (bucket_id=" << new_bucket_info.bucket.bucket_id << ": " << cpp_strerror(-ret) << ")" << dendl;
-    return -ret;
+    return ret;
   }
 
   ret = bucket_info_updater.complete();
@@ -538,8 +558,9 @@ int RGWBucketReshard::do_reshard(
     ldout(store->ctx(), 0) << __func__ << ": failed to update bucket info ret=" << ret << dendl;
     /* don't error out, reshard process succeeded */
   }
+
   return 0;
-}
+} // RGWBucketReshard::do_reshard
 
 int RGWBucketReshard::get_status(list<cls_rgw_bucket_instance_entry> *status)
 {
@@ -568,8 +589,8 @@ int RGWBucketReshard::get_status(list<cls_rgw_bucket_instance_entry> *status)
 
 
 int RGWBucketReshard::execute(int num_shards, int max_op_entries,
-                              bool verbose, ostream *out, Formatter *formatter, RGWReshard* reshard_log)
-
+                              bool verbose, ostream *out, Formatter *formatter,
+                             RGWReshard* reshard_log)
 {
   int ret = lock_bucket();
   if (ret < 0) {
@@ -620,8 +641,9 @@ int RGWBucketReshard::execute(int num_shards, int max_op_entries,
 
 
 RGWReshard::RGWReshard(RGWRados* _store, bool _verbose, ostream *_out,
-                       Formatter *_formatter) : store(_store), instance_lock(bucket_instance_lock_name),
-                                                verbose(_verbose), out(_out), formatter(_formatter)
+                       Formatter *_formatter) :
+  store(_store), instance_lock(bucket_instance_lock_name),
+  verbose(_verbose), out(_out), formatter(_formatter)
 {
   num_logshards = store->ctx()->_conf->rgw_reshard_num_logs;
 }
@@ -815,18 +837,16 @@ int RGWReshard::process_single_logshard(int logshard_num)
   bool truncated = true;
 
   CephContext *cct = store->ctx();
-  int max_entries = 1000;
+  constexpr uint32_t max_entries = 1000;
   int max_secs = store->ctx()->_conf->rgw_reshard_bucket_lock_duration;
+  std::chrono::seconds lock_duration(max_secs);
 
   rados::cls::lock::Lock l(reshard_lock_name);
-
-  utime_t time(max_secs, 0);
-  l.set_duration(time);
+  l.set_duration(lock_duration);
 
   char cookie_buf[COOKIE_LEN + 1];
   gen_rand_alphanumeric(store->ctx(), cookie_buf, sizeof(cookie_buf) - 1);
   cookie_buf[COOKIE_LEN] = '\0';
-
   l.set_cookie(cookie_buf);
 
   string logshard_oid;
@@ -837,8 +857,24 @@ int RGWReshard::process_single_logshard(int logshard_num)
     ldout(store->ctx(), 5) << __func__ << "(): failed to acquire lock on " << logshard_oid << dendl;
     return ret;
   }
-
-  utime_t lock_start_time = ceph_clock_now();
+  Clock::time_point lock_start_time = Clock::now();
+  const char* const log_func_name = __func__;
+
+  auto renew_locks_callback =
+    [&l, &lock_start_time, &logshard_oid, &log_func_name, this](const Clock::time_point& now) -> int {
+      l.set_must_renew(true);
+      int ret = l.lock_exclusive(&this->store->reshard_pool_ctx, logshard_oid);
+      if (ret < 0) { /* expired or already locked by another processor */
+       ldout(this->store->ctx(), 5) << log_func_name <<
+         "[lambda](): failed to renew lock on " << logshard_oid <<
+         " with " << cpp_strerror(-ret) << dendl;
+       return ret;
+      }
+      lock_start_time = now;
+      ldout(this->store->ctx(), 20) << log_func_name <<
+       "[lambda](): successfully renewed lock on " << logshard_oid << dendl;
+      return 0;
+    };
 
   do {
     std::list<cls_rgw_reshard_entry> entries;
@@ -848,28 +884,30 @@ int RGWReshard::process_single_logshard(int logshard_num)
       continue;
     }
 
-    for(auto& entry: entries) {
+    for(auto& entry: entries) { // logshard entries
       if(entry.new_instance_id.empty()) {
 
-       ldout(store->ctx(), 20) << __func__ << " resharding " << entry.bucket_name  << dendl;
+       ldout(store->ctx(), 20) << __func__ << " resharding " <<
+         entry.bucket_name  << dendl;
 
        RGWObjectCtx obj_ctx(store);
        rgw_bucket bucket;
        RGWBucketInfo bucket_info;
        map<string, bufferlist> attrs;
 
-       ret = store->get_bucket_info(obj_ctx, entry.tenant, entry.bucket_name, bucket_info, nullptr,
-                                   &attrs);
+       ret = store->get_bucket_info(obj_ctx, entry.tenant, entry.bucket_name,
+                                    bucket_info, nullptr, &attrs);
        if (ret < 0) {
          ldout(cct, 0) <<  __func__ << ": Error in get_bucket_info: " << cpp_strerror(-ret) << dendl;
          return -ret;
        }
 
-       RGWBucketReshard br(store, bucket_info, attrs, l, lock_start_time);
+       RGWBucketReshard br(store, bucket_info, attrs, renew_locks_callback);
 
        Formatter* formatter = new JSONFormatter(false);
        auto formatter_ptr = std::unique_ptr<Formatter>(formatter);
-       ret = br.execute(entry.new_num_shards, max_entries, true,nullptr, formatter, this);
+       ret = br.execute(entry.new_num_shards, max_entries, true, nullptr,
+                        formatter, this);
        if (ret < 0) {
          ldout (store->ctx(), 0) <<  __func__ << "ERROR in reshard_bucket " << entry.bucket_name << ":" <<
            cpp_strerror(-ret)<< dendl;
@@ -884,24 +922,16 @@ int RGWReshard::process_single_logshard(int logshard_num)
                       << cpp_strerror(-ret) << dendl;
          return ret;
        }
-       ret = br.renew_lock_bucket();
+      }
+
+      Clock::time_point now = Clock::now();
+      if (now >= lock_start_time + lock_duration / 2) {
+       ret = renew_locks_callback(now);
        if (ret < 0) {
-         ldout(cct, 0)<< __func__ << ":Error renewing bucket " << entry.bucket_name << " lock: "
-                      << cpp_strerror(-ret) << dendl;
          return ret;
        }
       }
-      utime_t now = ceph_clock_now();
-
-      if (now > lock_start_time + max_secs / 2) { /* do you need to renew lock? */
-        l.set_must_renew(true);
-        ret = l.lock_exclusive(&store->reshard_pool_ctx, logshard_oid);
-        if (ret == -EBUSY) { /* already locked by another processor */
-          ldout(store->ctx(), 5) << __func__ << "(): failed to acquire lock on " << logshard_oid << dendl;
-          return ret;
-        }
-        lock_start_time = now;
-      }
+
       entry.get_key(&marker);
     }
   } while (truncated);
index f9d3bf0720c4c7b140d143df14ea31f9f8f9b703..f8f77abbffdc1ef66228f128a7fbba4fd939db7a 100644 (file)
@@ -5,30 +5,45 @@
 #define RGW_RESHARD_H
 
 #include <vector>
+#include <functional>
+
 #include "include/rados/librados.hpp"
+#include "common/ceph_time.h"
 #include "cls/rgw/cls_rgw_types.h"
 #include "cls/lock/cls_lock_client.h"
 #include "rgw_bucket.h"
 
+
 class CephContext;
 class RGWRados;
 
-
 class RGWBucketReshard {
+public:
+
   friend class RGWReshard;
 
+  using Clock = ceph::coarse_mono_clock;
+
+  // returns 0 for success or a negative error code
+  using RenewLocksCallback = std::function<int(const Clock::time_point&)>;
+
+private:
+
   RGWRados *store;
   RGWBucketInfo bucket_info;
   std::map<string, bufferlist> bucket_attrs;
 
   string reshard_oid;
   rados::cls::lock::Lock reshard_lock;
-  utime_t lock_start_time;
-  bool locked_bucket;
+  Clock::time_point lock_start_time;
+  std::chrono::seconds lock_duration;
+  Clock::time_point lock_renew_thresh;
+
+  RenewLocksCallback renew_locks_callback;
 
   int lock_bucket();
   void unlock_bucket();
-  int renew_lock_bucket();
+  int renew_lock_bucket(const Clock::time_point&);
   int set_resharding_status(const string& new_instance_id, int32_t num_shards, cls_rgw_reshard_status status);
   int clear_resharding();
 
@@ -40,22 +55,24 @@ class RGWBucketReshard {
                  ostream *os,
                 Formatter *formatter);
 public:
-  RGWBucketReshard(RGWRados *_store, const RGWBucketInfo& _bucket_info,
-                   const std::map<string, bufferlist>& _bucket_attrs);
 
+  // pass nullptr for the final parameter if no callback is used
   RGWBucketReshard(RGWRados *_store, const RGWBucketInfo& _bucket_info,
                    const std::map<string, bufferlist>& _bucket_attrs,
-                  rados::cls::lock::Lock& reshard_lock, const utime_t& lock_start_time);
-
+                  RenewLocksCallback _renew_locks_callback);
   int execute(int num_shards, int max_op_entries,
               bool verbose = false, ostream *out = nullptr,
               Formatter *formatter = nullptr,
              RGWReshard *reshard_log = nullptr);
   int get_status(std::list<cls_rgw_bucket_instance_entry> *status);
   int cancel();
-};
+}; // RGWBucketReshard
 
 class RGWReshard {
+public:
+    using Clock = ceph::coarse_mono_clock;
+
+private:
     RGWRados *store;
     string lock_name;
     rados::cls::lock::Lock instance_lock;