rgw: multiple fixes and adjustments related to resharding scheduler
author    Yehuda Sadeh <yehuda@redhat.com>
          Thu, 18 May 2017 23:21:19 +0000 (16:21 -0700)
committer Yehuda Sadeh <yehuda@redhat.com>
          Mon, 5 Jun 2017 20:17:53 +0000 (13:17 -0700)
still wip, but:
 - get rid of some unneeded rados locking
 - rename shards to logshards where relevant to avoid confusion
 - remove unused code

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/common/config_opts.h
src/rgw/rgw_admin.cc
src/rgw/rgw_reshard.cc
src/rgw/rgw_reshard.h

diff --git a/src/common/config_opts.h b/src/common/config_opts.h
index 0502a8b1f56969d111fdaf1441569e975347ec58..bfbcece9c2cae0673645750ae814ae034a668bc2 100644
--- a/src/common/config_opts.h
+++ b/src/common/config_opts.h
@@ -1733,7 +1733,8 @@ OPTION(debug_deliberately_leak_memory, OPT_BOOL, false)
 OPTION(rgw_swift_custom_header, OPT_STR, "") // option to enable swift custom headers
 
 /* resharding tunables */
-OPTION(rgw_reshard_max_jobs, OPT_INT, 1024)
+OPTION(rgw_reshard_num_logs, OPT_INT, 16)
 OPTION(rgw_reshard_bucket_lock_duration, OPT_INT, 120) // duration of lock on bucket obj during resharding
 OPTION(rgw_dynamic_resharding, OPT_BOOL, true)
 OPTION(rgw_max_objs_per_shard, OPT_INT, 100000)
+OPTION(rgw_reshard_thread_interval, OPT_U32, 60 * 10) // maximum time between rounds of reshard thread processing
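
For reference, the two tunables this hunk introduces would be set in ceph.conf. A minimal sketch with illustrative values (matching the compiled-in defaults above: 16 logshards, 600-second interval), not a recommendation:

[client.rgw]
rgw reshard num logs = 16            # number of reshard log shard objects
rgw reshard thread interval = 600    # max seconds between reshard thread rounds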
diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc
index b0d3380d86dc36e186ba29ed891b2124d4e02eed..0ee585cde5ee815dbda12d62115eeca39f9e145b 100644
--- a/src/rgw/rgw_admin.cc
+++ b/src/rgw/rgw_admin.cc
@@ -5626,31 +5626,40 @@ next:
 
   if (opt_cmd == OPT_RESHARD_LIST) {
     list<cls_rgw_reshard_entry> entries;
-    bool is_truncated = true;
-    string marker;
     int ret;
     int count = 0;
     if (max_entries < 0) {
       max_entries = 1000;
     }
 
+    int num_logshards = store->ctx()->_conf->rgw_reshard_num_logs;
+
     RGWReshard reshard(store);
 
     formatter->open_array_section("reshard");
-    do {
-      entries.clear();
-      ret = reshard.list(marker, max_entries, entries, is_truncated);
-      if (ret < 0) {
-       cerr << "Error listing resharding buckets: " << cpp_strerror(-ret) << std::endl;
-       return ret;
-      }
-      for (auto iter=entries.begin(); iter != entries.end(); ++iter) {
-       cls_rgw_reshard_entry& entry = *iter;
-       encode_json("entry", entry, formatter);
+    for (int i = 0; i < num_logshards; i++) {
+      bool is_truncated = true;
+      string marker;
+      do {
+        entries.clear();
+        ret = reshard.list(i, marker, max_entries, entries, &is_truncated);
+        if (ret < 0) {
+          cerr << "Error listing resharding buckets: " << cpp_strerror(-ret) << std::endl;
+          return ret;
+        }
+        for (auto iter=entries.begin(); iter != entries.end(); ++iter) {
+          cls_rgw_reshard_entry& entry = *iter;
+          encode_json("entry", entry, formatter);
+        }
+        count += entries.size();
+        formatter->flush(cout);
+#warning marker?
+      } while (is_truncated && count < max_entries);
+
+      if (count >= max_entries) {
+        break;
       }
-      count += entries.size();
-      formatter->flush(cout);
-    } while (is_truncated && count < max_entries);
+    }
 
     formatter->close_section();
     formatter->flush(cout);
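
The "#warning marker?" in the loop above flags that the listing marker is never advanced, so a logshard holding more than max_entries entries would re-list the same page. A hedged sketch of the missing step, assuming the marker is rebuilt from the last entry returned (the exact key format is not pinned down in this WIP commit):

// Hypothetical marker advance for the per-logshard loop above; the
// tenant/bucket key format is an assumption, not part of this patch.
if (!entries.empty()) {
  cls_rgw_reshard_entry& last = entries.back();
  marker = last.tenant + ":" + last.bucket_name;
}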
diff --git a/src/rgw/rgw_reshard.cc b/src/rgw/rgw_reshard.cc
index 6376d3a3c7d84a9ddad97920fea4317cf13b5993..7c9e414df10a27ffd70e4c56e717a222c1af9fd8 100644
--- a/src/rgw/rgw_reshard.cc
+++ b/src/rgw/rgw_reshard.cc
@@ -12,7 +12,7 @@
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_rgw
 
-const string reshard_oid = "reshard";
+const string reshard_oid_prefix = "reshard";
 const string reshard_lock_name = "reshard_process";
 const string bucket_instance_lock_name = "bucket_instance_lock";
 
@@ -478,126 +478,101 @@ sleep(10);
 
 RGWReshard::RGWReshard(RGWRados* _store): store(_store), instance_lock(bucket_instance_lock_name)
 {
-  max_jobs = store->ctx()->_conf->rgw_reshard_max_jobs;
+  num_logshards = store->ctx()->_conf->rgw_reshard_num_logs;
+}
+
+string RGWReshard::get_logshard_key(const string& tenant, const string& bucket_name)
+{
+  return bucket_name + ":" + tenant; /* transposed on purpose */
+}
+
+#define MAX_RESHARD_LOGSHARDS_PRIME 7877
+
+void RGWReshard::get_bucket_logshard_oid(const string& tenant, const string& bucket_name, string *oid)
+{
+  string key = get_logshard_key(tenant, bucket_name);
+
+  uint32_t sid = ceph_str_hash_linux(key.c_str(), key.size());
+  uint32_t sid2 = sid ^ ((sid & 0xFF) << 24);
+  sid = sid2 % MAX_RESHARD_LOGSHARDS_PRIME % num_logshards;
+  int logshard = sid % num_logshards;
+
+  get_logshard_oid(logshard, oid);
 }
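
// The mapping above can be exercised standalone. A minimal sketch, with
// std::hash standing in for Ceph's internal ceph_str_hash_linux (so the
// computed shard will differ from RGW's, but the shape is the same):
#include <cstdint>
#include <cstdio>
#include <functional>
#include <string>

std::string bucket_to_logshard_oid(const std::string& tenant,
                                   const std::string& bucket_name,
                                   int num_logshards) {
  const uint32_t kPrime = 7877;  // MAX_RESHARD_LOGSHARDS_PRIME
  // Key is deliberately transposed (bucket before tenant), as in
  // get_logshard_key() above.
  std::string key = bucket_name + ":" + tenant;
  uint32_t sid = static_cast<uint32_t>(std::hash<std::string>{}(key));
  sid = sid ^ ((sid & 0xFFu) << 24);  // fold the low byte into the high bits
  int logshard = (sid % kPrime) % num_logshards;
  char buf[32];
  snprintf(buf, sizeof(buf), "reshard%010d", logshard);
  return buf;
}
// e.g. bucket_to_logshard_oid("", "mybucket", 16) yields one of
// reshard0000000000 .. reshard0000000015.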
 
 int RGWReshard::add(cls_rgw_reshard_entry& entry)
 {
-  rados::cls::lock::Lock l(reshard_lock_name);
+  string logshard_oid;
 
-  int ret = l.lock_exclusive(&store->reshard_pool_ctx, reshard_oid);
-  if (ret == -EBUSY) {
-    ldout(store->ctx(), 0) << "RGWReshard::add failed to acquire lock on " << reshard_oid << dendl;
-    return 0;
-  }
-  if (ret < 0)
-    return ret;
+  get_bucket_logshard_oid(entry.tenant, entry.bucket_name, &logshard_oid);
 
   librados::ObjectWriteOperation op;
   cls_rgw_reshard_add(op, entry);
 
-  ret = store->reshard_pool_ctx.operate(reshard_oid, &op);
-
-  l.unlock(&store->reshard_pool_ctx, reshard_oid);
-  return ret;
+  int ret = store->reshard_pool_ctx.operate(logshard_oid, &op);
+  if (ret < 0) {
+    lderr(store->ctx()) << "ERROR: failed to add entry to reshard log, oid=" << logshard_oid << " tenant=" << entry.tenant << " bucket=" << entry.bucket_name << dendl;
+    return ret;
+  }
+  return 0;
 }
 
 
-int RGWReshard::list(string& marker, uint32_t max, std::list<cls_rgw_reshard_entry>& entries, bool& is_truncated)
+int RGWReshard::list(int logshard_num, string& marker, uint32_t max, std::list<cls_rgw_reshard_entry>& entries, bool *is_truncated)
 {
-  rados::cls::lock::Lock l(reshard_lock_name);
-
-  int ret = l.lock_shared(&store->reshard_pool_ctx, reshard_oid);
-  if (ret == -EBUSY) {
-    ldout(store->ctx(), 0) << "RGWReshard::list failed to acquire lock on " << reshard_oid << dendl;
-    return 0;
-  }
-  if (ret < 0)
-    return ret;
+  string logshard_oid;
 
-  ret =  cls_rgw_reshard_list(store->reshard_pool_ctx, reshard_oid, marker, max, entries, &is_truncated);
+  get_logshard_oid(logshard_num, &logshard_oid);
 
-  l.unlock(&store->reshard_pool_ctx, reshard_oid);
-  return ret;
+  int ret = cls_rgw_reshard_list(store->reshard_pool_ctx, logshard_oid, marker, max, entries, is_truncated);
+  if (ret < 0) {
+    lderr(store->ctx()) << "ERROR: failed to list reshard log entries, oid=" << logshard_oid << dendl;
+    return ret;
+  }
+  return 0;
 }
 
 int RGWReshard::get(cls_rgw_reshard_entry& entry)
 {
-  rados::cls::lock::Lock l(reshard_lock_name);
+  string logshard_oid;
 
-  int ret = l.lock_shared(&store->reshard_pool_ctx, reshard_oid);
-  if (ret == -EBUSY) {
-    ldout(store->ctx(), 0) << "RGWReshardLog::get failed to acquire lock on " << reshard_oid << dendl;
-    return 0;
-  }
-  if (ret < 0)
-    return ret;
+  get_bucket_logshard_oid(entry.tenant, entry.bucket_name, &logshard_oid);
 
-  ret = cls_rgw_reshard_get(store->reshard_pool_ctx, reshard_oid, entry);
+  int ret = cls_rgw_reshard_get(store->reshard_pool_ctx, logshard_oid, entry);
+  if (ret < 0) {
+    lderr(store->ctx()) << "ERROR: failed to get entry from reshard log, oid=" << logshard_oid << " tenant=" << entry.tenant << " bucket=" << entry.bucket_name << dendl;
+    return ret;
+  }
 
-  l.unlock(&store->reshard_pool_ctx, reshard_oid);
-  return ret;
+  return 0;
 }
 
 int RGWReshard::remove(cls_rgw_reshard_entry& entry)
 {
-  rados::cls::lock::Lock l(reshard_lock_name);
+  string logshard_oid;
 
-  int ret = l.lock_exclusive(&store->reshard_pool_ctx, reshard_oid);
-  if (ret == -EBUSY) {
-    ldout(store->ctx(), 0) << "RGWReshardLog::remove failed to acquire lock on " << reshard_oid << dendl;
-    return 0;
-  }
-  if (ret < 0)
-    return ret;
+  get_bucket_logshard_oid(entry.tenant, entry.bucket_name, &logshard_oid);
 
   librados::ObjectWriteOperation op;
   cls_rgw_reshard_remove(op, entry);
 
-  ret =  store->reshard_pool_ctx.operate(reshard_oid, &op);
+  int ret = store->reshard_pool_ctx.operate(logshard_oid, &op);
+  if (ret < 0) {
+    lderr(store->ctx()) << "ERROR: failed to remove entry from reshard log, oid=" << logshard_oid << " tenant=" << entry.tenant << " bucket=" << entry.bucket_name << dendl;
+    return ret;
+  }
 
-  l.unlock(&store->reshard_pool_ctx, reshard_oid);
   return ret;
 }
 
-std::string create_bucket_index_lock_name(const string& bucket_instance_id) {
-  return bucket_instance_lock_name + "." + bucket_instance_id;
-}
-
 int RGWReshard::clear_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry)
 {
-  rados::cls::lock::Lock l(create_bucket_index_lock_name(entry.old_instance_id));
-
-  int ret = l.lock_exclusive(&store->reshard_pool_ctx, bucket_instance_oid);
-  if (ret == -EBUSY) {
-    ldout(store->ctx(), 0) << "RGWReshardLog::add failed to acquire lock on " << reshard_oid << dendl;
-    return 0;
-  }
-  if (ret < 0)
+  int ret = cls_rgw_clear_bucket_resharding(store->reshard_pool_ctx, bucket_instance_oid);
+  if (ret < 0) {
+    lderr(store->ctx()) << "ERROR: failed to clear bucket resharding, bucket_instance_oid=" << bucket_instance_oid << dendl;
     return ret;
-
-  entry.new_instance_id.clear();
-
-  ret = cls_rgw_clear_bucket_resharding(store->reshard_pool_ctx, bucket_instance_oid);
-
-  l.unlock(&store->reshard_pool_ctx, bucket_instance_oid);
-  return ret;
-}
-
-int RGWReshard::lock_bucket_index_shared(const string& oid)
-{
-  int ret = instance_lock.lock_shared(&store->reshard_pool_ctx, oid);
-  if (ret == -EBUSY) {
-    ldout(store->ctx(), 0) << "RGWReshardLog::add failed to acquire lock on " << reshard_oid << dendl;
-    return 0;
   }
 
-  return ret;
-}
-
-int RGWReshard::unlock_bucket_index(const string& oid)
-{
-  instance_lock.unlock(&store->reshard_pool_ctx, oid);
   return 0;
 }
 
@@ -679,32 +654,34 @@ static  int create_new_bucket_instance(RGWRados *store,
   return 0;
 }
 
-int RGWReshard::process_single_shard(const string& shard)
+int RGWReshard::process_single_logshard(int logshard_num)
 {
   string marker;
   bool truncated = false;
 
   CephContext *cct = store->ctx();
   int max_entries = 1000;
-  int max_secs = 10;
-
+  int max_secs = 60;
+  
   rados::cls::lock::Lock l(reshard_lock_name);
 
   utime_t time(max_secs, 0);
   l.set_duration(time);
 
-  int ret = l.lock_exclusive(&store->reshard_pool_ctx, shard);
+  string logshard_oid;
+  get_logshard_oid(logshard_num, &logshard_oid);
+
+  int ret = l.lock_exclusive(&store->reshard_pool_ctx, logshard_oid);
   if (ret == -EBUSY) { /* already locked by another processor */
-    dout(5) << __func__ << "(): failed to acquire lock on " << shard << dendl;
+    ldout(store->ctx(), 5) << __func__ << "(): failed to acquire lock on " << logshard_oid << dendl;
     return ret;
   }
 
   do {
     std::list<cls_rgw_reshard_entry> entries;
-    ret = list(marker, max_entries, entries, truncated);
+    ret = list(logshard_num, marker, max_entries, entries, &truncated);
     if (ret < 0) {
-      ldout(cct, 10) << "cannot list all reshards: " << shard
-                     << dendl;
+      ldout(cct, 10) << "cannot list all reshards in logshard oid=" << logshard_oid << dendl;
       continue;
     }
 
@@ -758,33 +735,35 @@ int RGWReshard::process_single_shard(const string& shard)
        }
       }
     }
+#warning update marker?
+#warning do work here, renew lock
   } while (truncated);
 
-  l.unlock(&store->reshard_pool_ctx, shard);
+  l.unlock(&store->reshard_pool_ctx, logshard_oid);
   return 0;
 }
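
// The "#warning do work here, renew lock" above concerns entries whose
// processing outlives the 60-second lock. A sketch of one renewal pattern,
// assuming cls_lock lets the current holder re-issue lock_exclusive to
// extend its lease (the renewal semantics and timing policy are assumptions):
utime_t lock_start = ceph_clock_now();
for (auto& entry : entries) {
  utime_t elapsed = ceph_clock_now() - lock_start;
  if ((int)elapsed.sec() > max_secs / 2) {   // renew well before expiry
    int r = l.lock_exclusive(&store->reshard_pool_ctx, logshard_oid);
    if (r < 0)
      return r;                              // lost the lease; back off
    lock_start = ceph_clock_now();
  }
  // ... resharding work for this entry goes here ...
}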
 
 
-void  RGWReshard::get_shard(int shard_num, string& shard)
+void  RGWReshard::get_logshard_oid(int shard_num, string *logshard)
 {
   char buf[32];
   snprintf(buf, sizeof(buf), "%010u", (unsigned)shard_num);
 
-  string objname("bucket_reshard.");
-  shard =  objname + buf;
+  string objname(reshard_oid_prefix);
+  *logshard =  objname + buf;
 }
 
-int RGWReshard::inspect_all_shards()
+int RGWReshard::inspect_all_logshards()
 {
   int ret = 0;
 
-  for (int i = 0; i < num_shards; i++) {
-    string shard;
-    store->objexp_get_shard(i, shard);
+  for (int i = 0; i < num_logshards; i++) {
+    string logshard;
+    get_logshard_oid(i, &logshard);
 
-    ldout(store->ctx(), 20) << "proceeding shard = " << shard << dendl;
+    ldout(store->ctx(), 20) << "proceeding logshard = " << logshard << dendl;
 
-    ret = process_single_shard(shard);
+    ret = process_single_logshard(i);
     if (ret <0) {
       return ret;
     }
@@ -820,7 +799,7 @@ void *RGWReshard::ReshardWorker::entry() {
   do {
     utime_t start = ceph_clock_now();
     ldout(cct, 2) << "object expiration: start" << dendl;
-    if (reshard->inspect_all_shards()) {
+    if (reshard->inspect_all_logshards()) {
       /* All shards have been processed properly. Next time we can start
        * from this moment. */
       last_run = start;
@@ -833,7 +812,7 @@ void *RGWReshard::ReshardWorker::entry() {
 
     utime_t end = ceph_clock_now();
     end -= start;
-    int secs = cct->_conf->rgw_objexp_gc_interval;
+    int secs = cct->_conf->rgw_reshard_thread_interval;
 
     if (secs <= end.sec())
       continue; // next round
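
The switch to rgw_reshard_thread_interval above amounts to a fixed-interval scheduler: each round starts interval seconds after the previous round began, unless the work itself ran longer. A self-contained restatement, with std::chrono standing in for utime_t and the worker's condition variable (illustrative only):

#include <atomic>
#include <chrono>
#include <thread>

void worker_loop(std::chrono::seconds interval, std::atomic<bool>& down_flag) {
  while (!down_flag) {
    auto start = std::chrono::steady_clock::now();
    // ... inspect_all_logshards() equivalent runs here ...
    auto elapsed = std::chrono::steady_clock::now() - start;
    if (elapsed < interval)
      std::this_thread::sleep_for(interval - elapsed);  // sleep the remainder
    // a round longer than the interval rolls straight into the next one
  }
}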
@@ -854,6 +833,7 @@ void RGWReshard::ReshardWorker::stop()
   cond.Signal();
 }
 
+#if 0
 BucketIndexLockGuard::BucketIndexLockGuard(RGWRados* _store,
                                           const string& bucket_instance_id, const string& _oid, const librados::IoCtx& _io_ctx) :
   store(_store),
diff --git a/src/rgw/rgw_reshard.h b/src/rgw/rgw_reshard.h
index 1d4366203674c1f5667cb5d90a3900f8b32bf905..d7365810c88b8f2e4f89853601f8b4b78c2b4f30 100644
--- a/src/rgw/rgw_reshard.h
+++ b/src/rgw/rgw_reshard.h
@@ -14,6 +14,7 @@ class CephContext;
 class RGWRados;
 
 
+#if 0
 /* gets a locked lock , release it when exiting context */
 class BucketIndexLockGuard
 {
@@ -33,6 +34,7 @@ protected:
   int lock();
   int unlock();
 };
+#endif
 
 
 class RGWBucketReshard {
@@ -70,13 +72,10 @@ public:
 class RGWReshard {
     RGWRados *store;
     string lock_name;
-    int max_jobs;
     rados::cls::lock::Lock instance_lock;
-    int num_shards;
+    int num_logshards;
 
-    int lock_bucket_index_shared(const string& oid);
-    int unlock_bucket_index(const string& oid);
-    void get_shard(int shard_num, string& shard);
+    void get_logshard_oid(int shard_num, string *shard);
 protected:
   class ReshardWorker : public Thread {
     CephContext *cct;
@@ -99,29 +98,32 @@ protected:
   ReshardWorker *worker;
   std::atomic<bool> down_flag = { false };
 
-  public:
-    RGWReshard(RGWRados* _store);
-    int add(cls_rgw_reshard_entry& entry);
-    int get(cls_rgw_reshard_entry& entry);
-    int remove(cls_rgw_reshard_entry& entry);
-    int list(string& marker, uint32_t max, list<cls_rgw_reshard_entry>& entries, bool& is_truncated);
-    int clear_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry);
-
-    int reshard_bucket(Formatter *formatter,
-                      int num_shards,
-                      rgw_bucket& bucket,
-                      RGWBucketInfo& bucket_info,
-                      RGWBucketInfo& new_bucket_info,
-                      int max_entries,
-                      RGWBucketAdminOpState& bucket_op,
-                      bool verbose = false);
-
-    /* reshard thread */
-    int process_single_shard(const std::string& shard);
-    int inspect_all_shards();
-    bool going_down();
-    void start_processor();
-    void stop_processor();
+  string get_logshard_key(const string& tenant, const string& bucket_name);
+  void get_bucket_logshard_oid(const string& tenant, const string& bucket_name, string *oid);
+
+public:
+  RGWReshard(RGWRados* _store);
+  int add(cls_rgw_reshard_entry& entry);
+  int get(cls_rgw_reshard_entry& entry);
+  int remove(cls_rgw_reshard_entry& entry);
+  int list(int logshard_num, string& marker, uint32_t max, std::list<cls_rgw_reshard_entry>& entries, bool *is_truncated);
+  int clear_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry);
+
+  int reshard_bucket(Formatter *formatter,
+                     int num_shards,
+                     rgw_bucket& bucket,
+                     RGWBucketInfo& bucket_info,
+                     RGWBucketInfo& new_bucket_info,
+                     int max_entries,
+                     RGWBucketAdminOpState& bucket_op,
+                     bool verbose = false);
+
+  /* reshard thread */
+  int process_single_logshard(int logshard_num);
+  int inspect_all_logshards();
+  bool going_down();
+  void start_processor();
+  void stop_processor();
 };
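
Taken together, the reworked interface would be driven roughly as below. A hedged usage sketch: store is assumed to be a valid RGWRados*, and only the cls_rgw_reshard_entry fields this commit itself touches (tenant, bucket_name) are populated:

RGWReshard reshard(store);

cls_rgw_reshard_entry entry;
entry.tenant = "acme";        // illustrative values
entry.bucket_name = "logs";

// add() now hashes tenant/bucket to one of rgw_reshard_num_logs objects;
// no rados lock is taken, matching the locking removed in this commit.
int ret = reshard.add(entry);

// Listing walks one logshard at a time with a per-shard marker.
std::list<cls_rgw_reshard_entry> entries;
bool truncated = false;
string marker;
ret = reshard.list(0 /* logshard_num */, marker, 100, entries, &truncated);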