rgw: clean up unused bucket index shards
author     J. Eric Ivancich <ivancich@redhat.com>
           Tue, 20 Nov 2018 14:52:39 +0000 (09:52 -0500)
committer  Abhishek Lekshmanan <abhishek@suse.com>
           Fri, 7 Dec 2018 15:00:17 +0000 (16:00 +0100)
Clean up old bucket index shards when a resharding is complete. Also,
when a resharding fails, clean up unfinished bucket index shards. Do
both clean-ups asynchronously.
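
In code terms, the cleanup is a set of asynchronous removes issued per
index shard object through the existing CLSRGWConcurrentIO machinery.
A minimal sketch of the success-path call site (assuming an initialized
RGWRados *store and a loaded RGWBucketInfo, as in the diff below):

    // best effort: log and continue rather than failing the reshard
    int r = store->clean_bucket_index(bucket_info, bucket_info.num_shards);
    if (r < 0) {
      lderr(store->ctx()) << "failed to clean up old bucket index shards: "
                          << r << dendl;
    }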

Signed-off-by: J. Eric Ivancich <ivancich@redhat.com>
(cherry picked from commit f84f70d451036ee99011b8e2c7b974f15d3a005a)

 Conflicts:
src/rgw/rgw_rados.h
merge conflict because the bucket_placement functions were moved after
the rgw rados refactor

src/cls/rgw/cls_rgw_client.cc
src/cls/rgw/cls_rgw_client.h
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h
src/rgw/rgw_reshard.cc

index 2144feb4ffcd79e9e14446f893116c6083deeeca..93ef2b522d16d66ea8ebeb5094a8a038d62ae6cb 100644 (file)
@@ -92,6 +92,7 @@ bool BucketIndexAioManager::wait_for_completions(int valid_ret_code,
   return true;
 }
 
+// note: currently only called by testing code
 void cls_rgw_bucket_init_index(ObjectWriteOperation& o)
 {
   bufferlist in;
@@ -99,7 +100,8 @@ void cls_rgw_bucket_init_index(ObjectWriteOperation& o)
 }
 
 static bool issue_bucket_index_init_op(librados::IoCtx& io_ctx,
-    const string& oid, BucketIndexAioManager *manager) {
+                                      const string& oid,
+                                      BucketIndexAioManager *manager) {
   bufferlist in;
   librados::ObjectWriteOperation op;
   op.create(true);
@@ -107,6 +109,15 @@ static bool issue_bucket_index_init_op(librados::IoCtx& io_ctx,
   return manager->aio_operate(io_ctx, oid, &op);
 }
 
+static bool issue_bucket_index_clean_op(librados::IoCtx& io_ctx,
+                                       const string& oid,
+                                       BucketIndexAioManager *manager) {
+  bufferlist in;
+  librados::ObjectWriteOperation op;
+  op.remove();
+  return manager->aio_operate(io_ctx, oid, &op);
+}
+
 static bool issue_bucket_set_tag_timeout_op(librados::IoCtx& io_ctx,
     const string& oid, uint64_t timeout, BucketIndexAioManager *manager) {
   bufferlist in;
@@ -126,11 +137,16 @@ int CLSRGWIssueBucketIndexInit::issue_op(int shard_id, const string& oid)
 void CLSRGWIssueBucketIndexInit::cleanup()
 {
   // Do best effort removal
-  for (map<int, string>::iterator citer = objs_container.begin(); citer != iter; ++citer) {
+  for (auto citer = objs_container.begin(); citer != iter; ++citer) {
     io_ctx.remove(citer->second);
   }
 }
 
+int CLSRGWIssueBucketIndexClean::issue_op(int shard_id, const string& oid)
+{
+  return issue_bucket_index_clean_op(io_ctx, oid, &manager);
+}
+
 int CLSRGWIssueSetTagTimeout::issue_op(int shard_id, const string& oid)
 {
   return issue_bucket_set_tag_timeout_op(io_ctx, oid, tag_timeout, &manager);
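
The new clean op is the mirror image of the init op above: instead of
creating the shard object and invoking the class init method, it queues a
plain object remove. Stripped of the AIO manager, each per-shard
operation reduces to the following sketch (assuming an open
librados::IoCtx on the bucket's index pool; remove_shard_object is a
hypothetical helper, not part of this change):

    #include <string>
    #include "include/rados/librados.hpp"

    // synchronously remove one bucket index shard object; the caller
    // treats -ENOENT (shard already gone) as success, mirroring
    // valid_ret_code() in CLSRGWIssueBucketIndexClean
    int remove_shard_object(librados::IoCtx& io_ctx, const std::string& oid)
    {
      librados::ObjectWriteOperation op;
      op.remove();
      return io_ctx.operate(oid, &op);
    }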
index d3d01ca3cfa8d8366ee565c6ff62ca0def0de1d8..97a950cf0242ca30dd5817b60d2a489f038e643c 100644 (file)
@@ -1,3 +1,6 @@
+// -*- mode:C; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
 #ifndef CEPH_CLS_RGW_CLIENT_H
 #define CEPH_CLS_RGW_CLIENT_H
 
@@ -252,9 +255,15 @@ protected:
   virtual void reset_container(map<int, string>& objs) {}
 
 public:
-  CLSRGWConcurrentIO(librados::IoCtx& ioc, map<int, string>& _objs_container,
-                     uint32_t _max_aio) : io_ctx(ioc), objs_container(_objs_container), max_aio(_max_aio) {}
-  virtual ~CLSRGWConcurrentIO() {}
+
+  CLSRGWConcurrentIO(librados::IoCtx& ioc,
+                    map<int, string>& _objs_container,
+                    uint32_t _max_aio) :
+  io_ctx(ioc), objs_container(_objs_container), max_aio(_max_aio)
+  {}
+
+  virtual ~CLSRGWConcurrentIO()
+  {}
 
   int operator()() {
     int ret = 0;
@@ -305,6 +314,23 @@ public:
     CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio) {}
 };
 
+
+class CLSRGWIssueBucketIndexClean : public CLSRGWConcurrentIO {
+protected:
+  int issue_op(int shard_id, const string& oid) override;
+  int valid_ret_code() override {
+    return -ENOENT;
+  }
+
+public:
+  CLSRGWIssueBucketIndexClean(librados::IoCtx& ioc,
+                             map<int, string>& _bucket_objs,
+                             uint32_t _max_aio) :
+  CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio)
+  {}
+};
+
+
 class CLSRGWIssueSetTagTimeout : public CLSRGWConcurrentIO {
   uint64_t tag_timeout;
 protected:
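
CLSRGWIssueBucketIndexClean follows the same functor pattern as its
siblings: construct it with the io context, the shard-id-to-oid map, and
the AIO window, then invoke operator()(). Because valid_ret_code()
returns -ENOENT, a shard object that is already gone does not count as a
failure. A usage sketch (index_ctx, bucket_objs, and max_aio assumed
populated as in RGWRados::clean_bucket_index below):

    // issues up to max_aio removes in flight and waits for completions
    int r = CLSRGWIssueBucketIndexClean(index_ctx, bucket_objs, max_aio)();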
index cf748044476b5c7c76b25800fc0edced3adf9d9a..d97464387e033455cea7ae333796aac7fbc41a16 100644 (file)
@@ -5905,9 +5905,9 @@ int RGWRados::create_pool(const rgw_pool& pool)
 
 int RGWRados::init_bucket_index(RGWBucketInfo& bucket_info, int num_shards)
 {
-  librados::IoCtx index_ctx; // context for new bucket
+  librados::IoCtx index_ctx;
 
-  string dir_oid =  dir_oid_prefix;
+  string dir_oid = dir_oid_prefix;
   int r = open_bucket_index_ctx(bucket_info, index_ctx);
   if (r < 0) {
     return r;
@@ -5918,7 +5918,29 @@ int RGWRados::init_bucket_index(RGWBucketInfo& bucket_info, int num_shards)
   map<int, string> bucket_objs;
   get_bucket_index_objects(dir_oid, num_shards, bucket_objs);
 
-  return CLSRGWIssueBucketIndexInit(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)();
+  return CLSRGWIssueBucketIndexInit(index_ctx,
+                                   bucket_objs,
+                                   cct->_conf->rgw_bucket_index_max_aio)();
+}
+
+int RGWRados::clean_bucket_index(RGWBucketInfo& bucket_info, int num_shards)
+{
+  librados::IoCtx index_ctx;
+
+  std::string dir_oid = dir_oid_prefix;
+  int r = open_bucket_index_ctx(bucket_info, index_ctx);
+  if (r < 0) {
+    return r;
+  }
+
+  dir_oid.append(bucket_info.bucket.bucket_id);
+
+  std::map<int, std::string> bucket_objs;
+  get_bucket_index_objects(dir_oid, num_shards, bucket_objs);
+
+  return CLSRGWIssueBucketIndexClean(index_ctx,
+                                    bucket_objs,
+                                    cct->_conf->rgw_bucket_index_max_aio)();
 }
 
 void RGWRados::create_bucket_id(string *bucket_id)
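
clean_bucket_index is deliberately the mirror of init_bucket_index: both
open the index pool, rebuild the shard object names from dir_oid_prefix
plus the bucket instance id, and hand the same map to a concurrent-IO
functor. For illustration, the shard oid map produced by
get_bucket_index_objects looks roughly like this (a sketch of the naming
scheme, not the actual implementation):

    // hypothetical illustration: one oid per shard, or a single oid for
    // an unsharded index
    std::map<int, std::string> objs;
    if (num_shards == 0) {
      objs[0] = dir_oid;
    } else {
      for (int i = 0; i < num_shards; ++i) {
        objs[i] = dir_oid + "." + std::to_string(i);
      }
    }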
index 12d42f6806184cb8ed67f36826482cfe293d31f3..cd462fe20893f865ac511912508ee899a794aae4 100644 (file)
@@ -2612,6 +2612,7 @@ public:
   int create_pool(const rgw_pool& pool);
 
   int init_bucket_index(RGWBucketInfo& bucket_info, int num_shards);
+  int clean_bucket_index(RGWBucketInfo& bucket_info, int num_shards);
   int select_bucket_placement(RGWUserInfo& user_info, const string& zonegroup_id, const string& rule,
                               string *pselected_rule_name, RGWZonePlacementInfo *rule_info);
   int select_legacy_bucket_placement(RGWZonePlacementInfo *rule_info);
index 199bb9c89cab69fc396557db4a7d1bd465508d09..509a7e59626b8a078c2f07267c8b263a8150bfa0 100644 (file)
@@ -137,7 +137,8 @@ public:
     }
     return ret;
   }
-};
+}; // class BucketReshardShard
+
 
 class BucketReshardManager {
   RGWRados *store;
@@ -147,8 +148,12 @@ class BucketReshardManager {
   vector<BucketReshardShard *> target_shards;
 
 public:
-  BucketReshardManager(RGWRados *_store, const RGWBucketInfo& _target_bucket_info, int _num_target_shards) : store(_store), target_bucket_info(_target_bucket_info),
-                                                                                                       num_target_shards(_num_target_shards) {
+  BucketReshardManager(RGWRados *_store,
+                      const RGWBucketInfo& _target_bucket_info,
+                      int _num_target_shards) :
+    store(_store), target_bucket_info(_target_bucket_info),
+    num_target_shards(_num_target_shards)
+  {
     target_shards.resize(num_target_shards);
     for (int i = 0; i < num_target_shards; ++i) {
       target_shards[i] = new BucketReshardShard(store, target_bucket_info, i, completions);
@@ -159,14 +164,12 @@ public:
     for (auto& shard : target_shards) {
       int ret = shard->wait_all_aio();
       if (ret < 0) {
-        ldout(store->ctx(), 20) << __func__ << ": shard->wait_all_aio() returned ret=" << ret << dendl;
+        ldout(store->ctx(), 20) << __func__ <<
+         ": shard->wait_all_aio() returned ret=" << ret << dendl;
       }
     }
   }
 
-  /*
-   * did_flush is set if not nullptr and a flush occurred; otherwise not altered
-   */
   int add_entry(int shard_index,
                 rgw_cls_bi_entry& entry, bool account, uint8_t category,
                 const rgw_bucket_category_stats& entry_stats) {
@@ -201,7 +204,7 @@ public:
     target_shards.clear();
     return ret;
   }
-};
+}; // class BucketReshardManager
 
 RGWBucketReshard::RGWBucketReshard(RGWRados *_store,
                                   const RGWBucketInfo& _bucket_info,
@@ -315,7 +318,8 @@ static int create_new_bucket_instance(RGWRados *store,
 int RGWBucketReshard::create_new_bucket_instance(int new_num_shards,
                                                  RGWBucketInfo& new_bucket_info)
 {
-  return ::create_new_bucket_instance(store, new_num_shards, bucket_info, bucket_attrs, new_bucket_info);
+  return ::create_new_bucket_instance(store, new_num_shards,
+                                     bucket_info, bucket_attrs, new_bucket_info);
 }
 
 int RGWBucketReshard::cancel()
@@ -363,6 +367,7 @@ public:
 
   ~BucketInfoReshardUpdate() {
     if (in_progress) {
+      // resharding must not have ended correctly, clean up
       int ret =
        RGWBucketReshard::clear_index_shard_reshard_status(store, bucket_info);
       if (ret < 0) {
@@ -370,7 +375,7 @@ public:
          " clear_index_shard_status returned " << ret << dendl;
       }
       bucket_info.new_bucket_instance_id.clear();
-      set_status(CLS_RGW_RESHARD_NONE); // saves new_bucket_instance as well
+      set_status(CLS_RGW_RESHARD_NONE); // clears new_bucket_instance as well
     }
   }
 
@@ -477,19 +482,20 @@ int RGWBucketReshard::do_reshard(int num_shards,
   int ret = 0;
 
   if (out) {
-    (*out) << "*** NOTICE: operation will not remove old bucket index objects ***" << std::endl;
-    (*out) << "***         these will need to be removed manually             ***" << std::endl;
     (*out) << "tenant: " << bucket_info.bucket.tenant << std::endl;
     (*out) << "bucket name: " << bucket_info.bucket.name << std::endl;
-    (*out) << "old bucket instance id: " << bucket_info.bucket.bucket_id << std::endl;
-    (*out) << "new bucket instance id: " << new_bucket_info.bucket.bucket_id << std::endl;
+    (*out) << "old bucket instance id: " << bucket_info.bucket.bucket_id <<
+      std::endl;
+    (*out) << "new bucket instance id: " << new_bucket_info.bucket.bucket_id <<
+      std::endl;
   }
 
-  /* update bucket info  -- in progress*/
+  /* update bucket info -- in progress */
   list<rgw_cls_bi_entry> entries;
 
   if (max_entries < 0) {
-    ldout(store->ctx(), 0) << __func__ << ": can't reshard, negative max_entries" << dendl;
+    ldout(store->ctx(), 0) << __func__ <<
+      ": can't reshard, negative max_entries" << dendl;
     return -EINVAL;
   }
 
@@ -519,7 +525,8 @@ int RGWBucketReshard::do_reshard(int num_shards,
     cout << "total entries:";
   }
 
-  int num_source_shards = (bucket_info.num_shards > 0 ? bucket_info.num_shards : 1);
+  const int num_source_shards =
+    (bucket_info.num_shards > 0 ? bucket_info.num_shards : 1);
   string marker;
   for (int i = 0; i < num_source_shards; ++i) {
     bool is_truncated = true;
@@ -532,8 +539,7 @@ int RGWBucketReshard::do_reshard(int num_shards,
        return ret;
       }
 
-      list<rgw_cls_bi_entry>::iterator iter;
-      for (iter = entries.begin(); iter != entries.end(); ++iter) {
+      for (auto iter = entries.begin(); iter != entries.end(); ++iter) {
        rgw_cls_bi_entry& entry = *iter;
        if (verbose) {
          formatter->open_object_section("entry");
@@ -624,6 +630,7 @@ int RGWBucketReshard::do_reshard(int num_shards,
   }
 
   return 0;
+  // NB: some error clean-up is done by ~BucketInfoReshardUpdate
 } // RGWBucketReshard::do_reshard
 
 int RGWBucketReshard::get_status(list<cls_rgw_bucket_instance_entry> *status)
@@ -656,6 +663,8 @@ int RGWBucketReshard::execute(int num_shards, int max_op_entries,
                               bool verbose, ostream *out, Formatter *formatter,
                              RGWReshard* reshard_log)
 {
+  Clock::time_point now;
+
   int ret = reshard_lock.lock();
   if (ret < 0) {
     return ret;
@@ -664,18 +673,19 @@ int RGWBucketReshard::execute(int num_shards, int max_op_entries,
   RGWBucketInfo new_bucket_info;
   ret = create_new_bucket_instance(num_shards, new_bucket_info);
   if (ret < 0) {
-    reshard_lock.unlock();
-    return ret;
+    // the state of the new index shards is uncertain, but the error
+    // path will attempt to remove them anyway
+    goto error_out;
   }
 
   if (reshard_log) {
     ret = reshard_log->update(bucket_info, new_bucket_info);
     if (ret < 0) {
-      reshard_lock.unlock();
-      return ret;
+      goto error_out;
     }
   }
 
+  // set resharding status of current bucket_info & shards with
+  // information about planned resharding
   ret = set_resharding_status(new_bucket_info.bucket.bucket_id,
                              num_shards, CLS_RGW_RESHARD_IN_PROGRESS);
   if (ret < 0) {
@@ -687,23 +697,65 @@ int RGWBucketReshard::execute(int num_shards, int max_op_entries,
                   new_bucket_info,
                   max_op_entries,
                    verbose, out, formatter);
+  if (ret < 0) {
+    goto error_out;
+  }
+
+  // at this point we've done the main work; we'll make a best effort
+  // to clean up but will not report any errors encountered
 
+  reshard_lock.unlock();
+
+  // resharding successful, so remove old bucket index shards; use
+  // best effort and don't report an error; the lock isn't needed at
+  // this point since we're only making a best-effort attempt to
+  // remove the old shard objects
+  ret = store->clean_bucket_index(bucket_info, bucket_info.num_shards);
   if (ret < 0) {
-    reshard_lock.unlock();
-    return ret;
+    lderr(store->ctx()) << "Error: " << __func__ <<
+      " failed to clean up old shards; " <<
+      "RGWRados::clean_bucket_index returned " << ret << dendl;
   }
 
-  ret = set_resharding_status(new_bucket_info.bucket.bucket_id, num_shards,
-                             CLS_RGW_RESHARD_DONE);
+  ret = rgw_bucket_instance_remove_entry(store,
+                                        bucket_info.bucket.get_key(),
+                                        nullptr);
   if (ret < 0) {
-    reshard_lock.unlock();
-    return ret;
+    lderr(store->ctx()) << "Error: " << __func__ <<
+      " failed to clean old bucket info object \"" <<
+      bucket_info.bucket.get_key() <<
+      "\"created after successufl resharding with error " << ret << dendl;
   }
 
+  return 0;
+
+error_out:
+
   reshard_lock.unlock();
 
-  return 0;
-}
+  // since the real problem is the issue that led to this error code
+  // path, we won't touch ret and instead use another variable to
+  // temporarily hold error codes
+  int ret2 = store->clean_bucket_index(new_bucket_info,
+                                      new_bucket_info.num_shards);
+  if (ret2 < 0) {
+    lderr(store->ctx()) << "Error: " << __func__ <<
+      " failed to clean up shards from failed incomplete resharding; " <<
+      "RGWRados::clean_bucket_index returned " << ret2 << dendl;
+  }
+
+  ret2 = rgw_bucket_instance_remove_entry(store,
+                                         new_bucket_info.bucket.get_key(),
+                                         nullptr);
+  if (ret2 < 0) {
+    lderr(store->ctx()) << "Error: " << __func__ <<
+      " failed to clean bucket info object \"" <<
+      new_bucket_info.bucket.get_key() <<
+      "\"created during incomplete resharding with error " << ret2 << dendl;
+  }
+
+  return ret;
+} // execute
 
 
 RGWReshard::RGWReshard(RGWRados* _store, bool _verbose, ostream *_out,
@@ -714,7 +766,8 @@ RGWReshard::RGWReshard(RGWRados* _store, bool _verbose, ostream *_out,
   num_logshards = store->ctx()->_conf->rgw_reshard_num_logs;
 }
 
-string RGWReshard::get_logshard_key(const string& tenant, const string& bucket_name)
+string RGWReshard::get_logshard_key(const string& tenant,
+                                   const string& bucket_name)
 {
   return tenant + ":" + bucket_name;
 }
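
Taken together, RGWBucketReshard::execute() now has two cleanup paths: on
success it makes a best-effort removal of the old bucket's index shards
and instance entry, and on any failure it jumps to error_out and removes
the shards and instance entry of the half-built new bucket instead.
Condensed control-flow sketch (error logging elided; details as in the
diff above):

    ret = create_new_bucket_instance(num_shards, new_bucket_info);
    if (ret < 0) goto error_out;
    ...
    ret = do_reshard(num_shards, new_bucket_info, max_op_entries,
                     verbose, out, formatter);
    if (ret < 0) goto error_out;

    reshard_lock.unlock();
    // success: best-effort removal of the *old* instance's shards
    store->clean_bucket_index(bucket_info, bucket_info.num_shards);
    rgw_bucket_instance_remove_entry(store, bucket_info.bucket.get_key(),
                                     nullptr);
    return 0;

  error_out:
    reshard_lock.unlock();
    // failure: best-effort removal of the *new* instance's shards
    store->clean_bucket_index(new_bucket_info, new_bucket_info.num_shards);
    rgw_bucket_instance_remove_entry(store,
                                     new_bucket_info.bucket.get_key(),
                                     nullptr);
    return ret;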