]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: split per-bucket resharding logic out from general resharding
authorYehuda Sadeh <yehuda@redhat.com>
Tue, 9 May 2017 20:41:30 +0000 (13:41 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Mon, 5 Jun 2017 20:17:43 +0000 (13:17 -0700)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/cls/rgw/cls_rgw_client.cc
src/cls/rgw/cls_rgw_client.h
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h
src/rgw/rgw_reshard.cc
src/rgw/rgw_reshard.h

index f67701ec27874126d1edf8f8b4e1c190f13a62d2..6e6bdc801ea7148ea0193381b85892da650a929b 100644 (file)
@@ -894,3 +894,21 @@ void cls_rgw_guard_bucket_resharding(librados::ObjectWriteOperation& op, int ret
   ::encode(call, in);
   op.exec("rgw", "guard_bucket_resharding", in);
 }
+
+static bool issue_set_bucket_resharding(librados::IoCtx& io_ctx, const string& oid,
+                                        const cls_rgw_bucket_instance_entry& entry,
+                                        BucketIndexAioManager *manager) {
+  bufferlist in;
+  struct cls_rgw_set_bucket_resharding_op call;
+  call.entry = entry;
+  ::encode(call, in);
+  librados::ObjectWriteOperation op;
+  op.exec("rgw", "set_bucket_resharding", in);
+  return manager->aio_operate(io_ctx, oid, &op);
+}
+
+int CLSRGWIssueSetBucketResharding::issue_op(int shard_id, const string& oid)
+{
+  return issue_set_bucket_resharding(io_ctx, oid, entry, &manager);
+}
+
index 8472cd0b59d1240c1766ceccf4dceec72c585c74..8db31c4135f9069dd806384c9ebdffeae8deb97c 100644 (file)
@@ -456,6 +456,16 @@ public:
     CLSRGWConcurrentIO(io_ctx, oids, max_aio), result(dir_headers) {}
 };
 
+class CLSRGWIssueSetBucketResharding : public CLSRGWConcurrentIO {
+  cls_rgw_bucket_instance_entry entry;
+protected:
+  int issue_op(int shard_id, const string& oid) override;
+public:
+  CLSRGWIssueSetBucketResharding(librados::IoCtx& ioc, map<int, string>& _bucket_objs,
+                                 const cls_rgw_bucket_instance_entry& entry,
+                                 uint32_t _max_aio) : CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio) {}
+};
+
 int cls_rgw_get_dir_header_async(librados::IoCtx& io_ctx, string& oid, RGWGetDirHeader_CB *ctx);
 
 void cls_rgw_encode_suggestion(char op, rgw_bucket_dir_entry& dirent, bufferlist& updates);
index e835079f3466cf8bf8fb81565eef946cb4feb209..003c1a0db7c9a1b419f893a4289904da38916ecd 100644 (file)
@@ -8250,6 +8250,19 @@ int RGWRados::bucket_rebuild_index(RGWBucketInfo& bucket_info)
   return CLSRGWIssueBucketRebuild(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)();
 }
 
+int RGWRados::bucket_set_reshard(RGWBucketInfo& bucket_info, const cls_rgw_bucket_instance_entry& entry)
+{
+  librados::IoCtx index_ctx;
+  map<int, string> bucket_objs;
+
+  int r = open_bucket_index(bucket_info, index_ctx, bucket_objs);
+  if (r < 0) {
+    return r;
+  }
+
+  return CLSRGWIssueSetBucketResharding(index_ctx, bucket_objs, entry, cct->_conf->rgw_bucket_index_max_aio)();
+}
+
 int RGWRados::defer_gc(void *ctx, const RGWBucketInfo& bucket_info, const rgw_obj& obj)
 {
   RGWObjectCtx *rctx = static_cast<RGWObjectCtx *>(ctx);
index daf7be2388898db158205f3042ef6a0182df3477..b79cc209cbc3bafd8fc37208fd90de9a75c6926b 100644 (file)
@@ -2181,6 +2181,7 @@ class RGWRados
   friend class RGWStateLog;
   friend class RGWReplicaLogger;
   friend class RGWReshard;
+  friend class RGWBucketReshard;
   friend class BucketIndexLockGuard;
 
   /** Open the pool used as root for this gateway */
@@ -3427,6 +3428,7 @@ public:
                          map<RGWObjCategory, RGWStorageStats> *existing_stats,
                          map<RGWObjCategory, RGWStorageStats> *calculated_stats);
   int bucket_rebuild_index(RGWBucketInfo& bucket_info);
+  int bucket_set_reshard(RGWBucketInfo& bucket_info, const cls_rgw_bucket_instance_entry& entry);
   int remove_objs_from_index(RGWBucketInfo& bucket_info, list<rgw_obj_index_key>& oid_list);
   int move_rados_obj(librados::IoCtx& src_ioctx,
                     const string& src_oid, const string& src_locator,
index 389d1dcca940d69f403bd4240ea273d0506bc29c..7c2fb7a0f7ffeb37a588fd3a6fdee3377300eb33 100644 (file)
@@ -14,6 +14,65 @@ const string reshard_oid = "reshard";
 const string reshard_lock_name = "reshard_process";
 const string bucket_instance_lock_name = "bucket_instance_lock";
 
+RGWBucketReshard::RGWBucketReshard(RGWRados *_store, const RGWBucketInfo& _bucket_info) :
+                                                     store(_store), bucket_info(_bucket_info),
+                                                     reshard_lock(reshard_lock_name) {
+  const rgw_bucket& b = bucket_info.bucket;                                                       
+  reshard_oid = b.tenant + (b.tenant.empty() ? "" : ":") + b.name + ":" + b.bucket_id;
+}
+
+int RGWBucketReshard::lock_bucket()
+{
+#warning set timeout for guard lock
+
+  int ret = reshard_lock.lock_exclusive(&store->reshard_pool_ctx, reshard_oid);
+  if (ret < 0) {
+    ldout(store->ctx(), 0) << "RGWReshard::add failed to acquire lock on " << reshard_oid << " ret=" << ret << dendl;
+    return ret;
+  }
+  return 0;
+}
+
+void RGWBucketReshard::unlock_bucket()
+{
+  int ret = reshard_lock.unlock(&store->reshard_pool_ctx, reshard_oid);
+  if (ret < 0) {
+    ldout(store->ctx(), 0) << "WARNING: RGWReshard::add failed to drop lock on " << reshard_oid << " ret=" << ret << dendl;
+  }
+}
+
+int RGWBucketReshard::init_resharding(const cls_rgw_reshard_entry& entry)
+{
+  if (entry.new_instance_id.empty()) {
+    ldout(store->ctx(), 0) << __func__ << " missing new bucket instance id" << dendl;
+    return -EINVAL;
+  }
+
+  cls_rgw_bucket_instance_entry instance_entry;
+  instance_entry.new_bucket_instance_id = entry.new_instance_id;
+
+  int ret = store->bucket_set_reshard(bucket_info, instance_entry);
+  if (ret < 0) {
+    ldout(store->ctx(), 0) << "RGWReshard::" << __func__ << " ERROR: error setting bucket resharding flag on bucket index: "
+                 << cpp_strerror(-ret) << dendl;
+    return ret;
+  }
+  return 0;
+}
+
+int RGWBucketReshard::clear_resharding()
+{
+  cls_rgw_bucket_instance_entry instance_entry;
+  
+  int ret = store->bucket_set_reshard(bucket_info, instance_entry);
+  if (ret < 0) {
+    ldout(store->ctx(), 0) << "RGWReshard::" << __func__ << " ERROR: error setting bucket resharding flag on bucket index: "
+                 << cpp_strerror(-ret) << dendl;
+    return ret;
+  }
+  return 0;
+}
+
 RGWReshard::RGWReshard(CephContext *_cct, RGWRados* _store):cct(_cct), store(_store),
                                                            instance_lock(bucket_instance_lock_name)
 {
@@ -104,38 +163,6 @@ std::string create_bucket_index_lock_name(const string& bucket_instance_id) {
   return bucket_instance_lock_name + "." + bucket_instance_id;
 }
 
-int RGWReshard::set_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry)
-{
-  rados::cls::lock::Lock l(create_bucket_index_lock_name(entry.old_instance_id));
-
-  if (entry.new_instance_id.empty()) {
-    ldout(cct, 0) << "RGWReshard::" << __func__ << " missing new bucket instance id" << dendl;
-    return -EEXIST;
-  }
-
-  int ret = l.lock_exclusive(&store->reshard_pool_ctx, bucket_instance_oid);
-  if (ret == -EBUSY) {
-    ldout(cct, 0) << "RGWReshardLog::add failed to acquire lock on " << reshard_oid << dendl;
-    return 0;
-  }
-  if (ret < 0)
-    return ret;
-
-  cls_rgw_bucket_instance_entry instance_entry;
-  instance_entry.new_bucket_instance_id = entry.new_instance_id;
-
-  ret =   cls_rgw_set_bucket_resharding(store->reshard_pool_ctx, bucket_instance_oid, instance_entry);
-  if (ret < 0) {
-    ldout(cct, 0) << "RGWReshard::" << __func__ << " ERROR: cls_rgw_set_bucket_resharding: "
-                 << cpp_strerror(-ret) << dendl;
-    goto done;
-  }
-
-done:
-  l.unlock(&store->reshard_pool_ctx, bucket_instance_oid);
-  return ret;
-}
-
 int RGWReshard::clear_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry)
 {
   rados::cls::lock::Lock l(create_bucket_index_lock_name(entry.old_instance_id));
@@ -150,7 +177,7 @@ int RGWReshard::clear_bucket_resharding(const string& bucket_instance_oid, cls_r
 
   entry.new_instance_id.clear();
 
-  ret =   cls_rgw_clear_bucket_resharding(store->reshard_pool_ctx, bucket_instance_oid);
+  ret = cls_rgw_clear_bucket_resharding(store->reshard_pool_ctx, bucket_instance_oid);
 
   l.unlock(&store->reshard_pool_ctx, bucket_instance_oid);
   return ret;
index 205a7688c14616bff258ee161b969ea5ac3c972a..8f8a46df77849c848b94a4f052ec469684e06e04 100644 (file)
@@ -33,6 +33,25 @@ protected:
   int unlock();
 };
 
+
+class RGWBucketReshard {
+  RGWRados *store;
+  RGWBucketInfo bucket_info;
+
+  string reshard_oid;
+  rados::cls::lock::Lock reshard_lock;
+
+  int lock_bucket();
+  void unlock_bucket();
+  int init_resharding(const cls_rgw_reshard_entry& entry);
+  int clear_resharding();
+public:
+  RGWBucketReshard(RGWRados *_store, const RGWBucketInfo& _bucket_info);
+
+  int reshard();
+  int abort_reshard();
+};
+
 class RGWReshard {
     CephContext *cct;
     RGWRados *store;
@@ -49,7 +68,6 @@ class RGWReshard {
     int get(cls_rgw_reshard_entry& entry);
     int remove(cls_rgw_reshard_entry& entry);
     int list(string& marker, uint32_t max, list<cls_rgw_reshard_entry>& entries, bool& is_truncated);
-    int set_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry);
     int clear_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry);
     /*
       if succefull, keeps the bucket index locked. It will be unlocked