]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: de-conflate shard_id and request_id in CLSRGWConcurrentIO
authorJ. Eric Ivancich <ivancich@redhat.com>
Fri, 16 Jul 2021 19:31:35 +0000 (15:31 -0400)
committerDan van der Ster <daniel.vanderster@cern.ch>
Fri, 18 Feb 2022 21:21:24 +0000 (22:21 +0100)
When using asynchronous (concurrent) IO for bucket index requests,
there are two int ids that are used that need to be kept separate --
shard id and request id. In many cases they're the same -- shard 0
gets request 0, and so forth.

But in preparation for re-requests, those ids can diverge, where
request 13 maps to shard 2. The existing code maintained the OIDs that
went with each request. This PR also maintains the shard id as
well. Documentation has been beefed up to help future developers
navigate this.

Signed-off-by: J. Eric Ivancich <ivancich@redhat.com>
(cherry picked from commit 9606346592dfd6261aa2daa4cbec56f9a72c65fc)

Conflicts:
src/cls/rgw/cls_rgw_client.cc
src/cls/rgw/cls_rgw_client.h

In all cases I took the patch code, not mangling anything.
These were all cases of std:: or auto type.

src/cls/rgw/cls_rgw_client.cc
src/cls/rgw/cls_rgw_client.h

index 397dcf2f2941d1376c301648b6bfa7ccb3ca2369..414ecbad345eaca306452fa8c91a8917ddf4c46b 100644 (file)
@@ -39,19 +39,19 @@ public:
   }
 };
 
-void BucketIndexAioManager::do_completion(int id) {
+void BucketIndexAioManager::do_completion(const int request_id) {
   std::lock_guard l{lock};
 
-  map<int, librados::AioCompletion*>::iterator iter = pendings.find(id);
+  auto iter = pendings.find(request_id);
   ceph_assert(iter != pendings.end());
-  completions[id] = iter->second;
+  completions[request_id] = iter->second;
   pendings.erase(iter);
 
   // If the caller needs a list of finished objects, store them
   // for further processing
-  map<int, string>::iterator miter = pending_objs.find(id);
+  auto miter = pending_objs.find(request_id);
   if (miter != pending_objs.end()) {
-    completion_objs[id] = miter->second;
+    completion_objs.emplace(request_id, miter->second);
     pending_objs.erase(miter);
   }
 
@@ -77,7 +77,7 @@ bool BucketIndexAioManager::wait_for_completions(int valid_ret_code,
     if (objs && r == 0) { /* update list of successfully completed objs */
       map<int, string>::iterator liter = completion_objs.find(iter->first);
       if (liter != completion_objs.end()) {
-        (*objs)[liter->first] = liter->second;
+       (*objs)[liter->second.shard_id] = liter->second.oid;
       }
     }
     if (ret_code && (r < 0 && r != valid_ret_code))
@@ -99,38 +99,43 @@ void cls_rgw_bucket_init_index(ObjectWriteOperation& o)
 }
 
 static bool issue_bucket_index_init_op(librados::IoCtx& io_ctx,
+                                      const int shard_id,
                                       const string& oid,
                                       BucketIndexAioManager *manager) {
   bufferlist in;
   librados::ObjectWriteOperation op;
   op.create(true);
   op.exec(RGW_CLASS, RGW_BUCKET_INIT_INDEX, in);
-  return manager->aio_operate(io_ctx, oid, &op);
+  return manager->aio_operate(io_ctx, shard_id, oid, &op);
 }
 
 static bool issue_bucket_index_clean_op(librados::IoCtx& io_ctx,
+                                       const int shard_id,
                                        const string& oid,
                                        BucketIndexAioManager *manager) {
   bufferlist in;
   librados::ObjectWriteOperation op;
   op.remove();
-  return manager->aio_operate(io_ctx, oid, &op);
+  return manager->aio_operate(io_ctx, shard_id, oid, &op);
 }
 
 static bool issue_bucket_set_tag_timeout_op(librados::IoCtx& io_ctx,
-    const string& oid, uint64_t timeout, BucketIndexAioManager *manager) {
+                                           const int shard_id,
+                                           const string& oid,
+                                           uint64_t timeout,
+                                           BucketIndexAioManager *manager) {
   bufferlist in;
   rgw_cls_tag_timeout_op call;
   call.tag_timeout = timeout;
   encode(call, in);
   ObjectWriteOperation op;
   op.exec(RGW_CLASS, RGW_BUCKET_SET_TAG_TIMEOUT, in);
-  return manager->aio_operate(io_ctx, oid, &op);
+  return manager->aio_operate(io_ctx, shard_id, oid, &op);
 }
 
-int CLSRGWIssueBucketIndexInit::issue_op(int shard_id, const string& oid)
+int CLSRGWIssueBucketIndexInit::issue_op(const int shard_id, const string& oid)
 {
-  return issue_bucket_index_init_op(io_ctx, oid, &manager);
+  return issue_bucket_index_init_op(io_ctx, shard_id, oid, &manager);
 }
 
 void CLSRGWIssueBucketIndexInit::cleanup()
@@ -141,14 +146,14 @@ void CLSRGWIssueBucketIndexInit::cleanup()
   }
 }
 
-int CLSRGWIssueBucketIndexClean::issue_op(int shard_id, const string& oid)
+int CLSRGWIssueBucketIndexClean::issue_op(const int shard_id, const string& oid)
 {
-  return issue_bucket_index_clean_op(io_ctx, oid, &manager);
+  return issue_bucket_index_clean_op(io_ctx, shard_id, oid, &manager);
 }
 
-int CLSRGWIssueSetTagTimeout::issue_op(int shard_id, const string& oid)
+int CLSRGWIssueSetTagTimeout::issue_op(const int shard_id, const string& oid)
 {
-  return issue_bucket_set_tag_timeout_op(io_ctx, oid, tag_timeout, &manager);
+  return issue_bucket_set_tag_timeout_op(io_ctx, shard_id, oid, tag_timeout, &manager);
 }
 
 void cls_rgw_bucket_update_stats(librados::ObjectWriteOperation& o,
@@ -229,10 +234,11 @@ void cls_rgw_bucket_list_op(librados::ObjectReadOperation& op,
 }
 
 static bool issue_bucket_list_op(librados::IoCtx& io_ctx,
-                                const string& oid,
+                                const int shard_id,
+                                const std::string& oid,
                                 const cls_rgw_obj_key& start_obj,
-                                const string& filter_prefix,
-                                const string& delimiter,
+                                const std::string& filter_prefix,
+                                const std::string& delimiter,
                                 uint32_t num_entries,
                                 bool list_versions,
                                 BucketIndexAioManager *manager,
@@ -242,12 +248,12 @@ static bool issue_bucket_list_op(librados::IoCtx& io_ctx,
   cls_rgw_bucket_list_op(op,
                         start_obj, filter_prefix, delimiter,
                          num_entries, list_versions, pdata);
-  return manager->aio_operate(io_ctx, oid, &op);
+  return manager->aio_operate(io_ctx, shard_id, oid, &op);
 }
 
-int CLSRGWIssueBucketList::issue_op(int shard_id, const string& oid)
+int CLSRGWIssueBucketList::issue_op(const int shard_id, const string& oid)
 {
-  return issue_bucket_list_op(io_ctx, oid,
+  return issue_bucket_list_op(io_ctx, shard_id, oid,
                              start_obj, filter_prefix, delimiter,
                              num_entries, list_versions, &manager,
                              &result[shard_id]);
@@ -371,7 +377,7 @@ int cls_rgw_bi_list(librados::IoCtx& io_ctx, const std::string& oid,
   return 0;
 }
 
-int cls_rgw_bucket_link_olh(librados::IoCtx& io_ctx, const string& oid, 
+int cls_rgw_bucket_link_olh(librados::IoCtx& io_ctx, const string& oid,
                             const cls_rgw_obj_key& key, bufferlist& olh_tag,
                             bool delete_marker, const string& op_tag, rgw_bucket_dir_entry_meta *meta,
                             uint64_t olh_epoch, ceph::real_time unmod_since, bool high_precision_time, bool log_op, rgw_zone_set& zones_trace)
@@ -508,17 +514,17 @@ void cls_rgw_bilog_list(librados::ObjectReadOperation& op,
   op.exec(RGW_CLASS, RGW_BI_LOG_LIST, in, new ClsBucketIndexOpCtx<cls_rgw_bi_log_list_ret>(pdata, ret));
 }
 
-static bool issue_bi_log_list_op(librados::IoCtx& io_ctx, const string& oid, int shard_id,
+static bool issue_bi_log_list_op(librados::IoCtx& io_ctx, const string& oid, const int shard_id,
                                  BucketIndexShardsManager& marker_mgr, uint32_t max,
                                  BucketIndexAioManager *manager,
                                  cls_rgw_bi_log_list_ret *pdata)
 {
   librados::ObjectReadOperation op;
   cls_rgw_bilog_list(op, marker_mgr.get(shard_id, ""), max, pdata, nullptr);
-  return manager->aio_operate(io_ctx, oid, &op);
+  return manager->aio_operate(io_ctx, shard_id, oid, &op);
 }
 
-int CLSRGWIssueBILogList::issue_op(int shard_id, const string& oid)
+int CLSRGWIssueBILogList::issue_op(const int shard_id, const string& oid)
 {
   return issue_bi_log_list_op(io_ctx, oid, shard_id, marker_mgr, max, &manager, &result[shard_id]);
 }
@@ -536,46 +542,46 @@ void cls_rgw_bilog_trim(librados::ObjectWriteOperation& op,
   op.exec(RGW_CLASS, RGW_BI_LOG_TRIM, in);
 }
 
-static bool issue_bi_log_trim(librados::IoCtx& io_ctx, const string& oid, int shard_id,
+static bool issue_bi_log_trim(librados::IoCtx& io_ctx, const string& oid, const int shard_id,
                               BucketIndexShardsManager& start_marker_mgr,
                               BucketIndexShardsManager& end_marker_mgr, BucketIndexAioManager *manager) {
   cls_rgw_bi_log_trim_op call;
   librados::ObjectWriteOperation op;
   cls_rgw_bilog_trim(op, start_marker_mgr.get(shard_id, ""),
                      end_marker_mgr.get(shard_id, ""));
-  return manager->aio_operate(io_ctx, oid, &op);
+  return manager->aio_operate(io_ctx, shard_id, oid, &op);
 }
 
-int CLSRGWIssueBILogTrim::issue_op(int shard_id, const string& oid)
+int CLSRGWIssueBILogTrim::issue_op(const int shard_id, const string& oid)
 {
   return issue_bi_log_trim(io_ctx, oid, shard_id, start_marker_mgr, end_marker_mgr, &manager);
 }
 
-static bool issue_bucket_check_index_op(IoCtx& io_ctx, const string& oid, BucketIndexAioManager *manager,
+static bool issue_bucket_check_index_op(IoCtx& io_ctx, const int shard_id, const string& oid, BucketIndexAioManager *manager,
     rgw_cls_check_index_ret *pdata) {
   bufferlist in;
   librados::ObjectReadOperation op;
   op.exec(RGW_CLASS, RGW_BUCKET_CHECK_INDEX, in, new ClsBucketIndexOpCtx<rgw_cls_check_index_ret>(
         pdata, NULL));
-  return manager->aio_operate(io_ctx, oid, &op);
+  return manager->aio_operate(io_ctx, shard_id, oid, &op);
 }
 
 int CLSRGWIssueBucketCheck::issue_op(int shard_id, const string& oid)
 {
-  return issue_bucket_check_index_op(io_ctx, oid, &manager, &result[shard_id]);
+  return issue_bucket_check_index_op(io_ctx, shard_id, oid, &manager, &result[shard_id]);
 }
 
-static bool issue_bucket_rebuild_index_op(IoCtx& io_ctx, const string& oid,
+static bool issue_bucket_rebuild_index_op(IoCtx& io_ctx, const int shard_id, const string& oid,
     BucketIndexAioManager *manager) {
   bufferlist in;
   librados::ObjectWriteOperation op;
   op.exec(RGW_CLASS, RGW_BUCKET_REBUILD_INDEX, in);
-  return manager->aio_operate(io_ctx, oid, &op);
+  return manager->aio_operate(io_ctx, shard_id, oid, &op);
 }
 
-int CLSRGWIssueBucketRebuild::issue_op(int shard_id, const string& oid)
+int CLSRGWIssueBucketRebuild::issue_op(const int shard_id, const string& oid)
 {
-  return issue_bucket_rebuild_index_op(io_ctx, oid, &manager);
+  return issue_bucket_rebuild_index_op(io_ctx, shard_id, oid, &manager);
 }
 
 void cls_rgw_encode_suggestion(char op, rgw_bucket_dir_entry& dirent, bufferlist& updates)
@@ -589,40 +595,40 @@ void cls_rgw_suggest_changes(ObjectWriteOperation& o, bufferlist& updates)
   o.exec(RGW_CLASS, RGW_DIR_SUGGEST_CHANGES, updates);
 }
 
-int CLSRGWIssueGetDirHeader::issue_op(int shard_id, const string& oid)
+int CLSRGWIssueGetDirHeader::issue_op(const int shard_id, const string& oid)
 {
   cls_rgw_obj_key empty_key;
   string empty_prefix;
   string empty_delimiter;
-  return issue_bucket_list_op(io_ctx, oid,
+  return issue_bucket_list_op(io_ctx, shard_id, oid,
                              empty_key, empty_prefix, empty_delimiter,
                              0, false, &manager, &result[shard_id]);
 }
 
-static bool issue_resync_bi_log(librados::IoCtx& io_ctx, const string& oid, BucketIndexAioManager *manager)
+static bool issue_resync_bi_log(librados::IoCtx& io_ctx, const int shard_id, const string& oid, BucketIndexAioManager *manager)
 {
   bufferlist in;
   librados::ObjectWriteOperation op;
   op.exec(RGW_CLASS, RGW_BI_LOG_RESYNC, in);
-  return manager->aio_operate(io_ctx, oid, &op);
+  return manager->aio_operate(io_ctx, shard_id, oid, &op);
 }
 
-int CLSRGWIssueResyncBucketBILog::issue_op(int shard_id, const string& oid)
+int CLSRGWIssueResyncBucketBILog::issue_op(const int shard_id, const string& oid)
 {
-  return issue_resync_bi_log(io_ctx, oid, &manager);
+  return issue_resync_bi_log(io_ctx, shard_id, oid, &manager);
 }
 
-static bool issue_bi_log_stop(librados::IoCtx& io_ctx, const string& oid, BucketIndexAioManager *manager)
+static bool issue_bi_log_stop(librados::IoCtx& io_ctx, const int shard_id, const string& oid, BucketIndexAioManager *manager)
 {
   bufferlist in;
   librados::ObjectWriteOperation op;
   op.exec(RGW_CLASS, RGW_BI_LOG_STOP, in);
-  return manager->aio_operate(io_ctx, oid, &op); 
+  return manager->aio_operate(io_ctx, shard_id, oid, &op);
 }
 
-int CLSRGWIssueBucketBILogStop::issue_op(int shard_id, const string& oid)
+int CLSRGWIssueBucketBILogStop::issue_op(const int shard_id, const string& oid)
 {
-  return issue_bi_log_stop(io_ctx, oid, &manager);
+  return issue_bi_log_stop(io_ctx, shard_id, oid, &manager);
 }
 
 class GetDirHeaderCompletion : public ObjectOperationCompletion {
@@ -1064,7 +1070,8 @@ void cls_rgw_guard_bucket_resharding(librados::ObjectOperation& op, int ret_err)
   op.exec(RGW_CLASS, RGW_GUARD_BUCKET_RESHARDING, in);
 }
 
-static bool issue_set_bucket_resharding(librados::IoCtx& io_ctx, const string& oid,
+static bool issue_set_bucket_resharding(librados::IoCtx& io_ctx,
+                                       const int shard_id, const string& oid,
                                         const cls_rgw_bucket_instance_entry& entry,
                                         BucketIndexAioManager *manager) {
   bufferlist in;
@@ -1073,10 +1080,10 @@ static bool issue_set_bucket_resharding(librados::IoCtx& io_ctx, const string& o
   encode(call, in);
   librados::ObjectWriteOperation op;
   op.exec(RGW_CLASS, RGW_SET_BUCKET_RESHARDING, in);
-  return manager->aio_operate(io_ctx, oid, &op);
+  return manager->aio_operate(io_ctx, shard_id, oid, &op);
 }
 
-int CLSRGWIssueSetBucketResharding::issue_op(int shard_id, const string& oid)
+int CLSRGWIssueSetBucketResharding::issue_op(const int shard_id, const string& oid)
 {
-  return issue_set_bucket_resharding(io_ctx, oid, entry, &manager);
+  return issue_set_bucket_resharding(io_ctx, shard_id, oid, entry, &manager);
 }
index 0fbc47f230d0a2287d0a254d2083dafd12a928f9..43225b9dbe61be5ca20309a7712b6bade9153685 100644 (file)
@@ -28,16 +28,33 @@ struct BucketIndexAioArg : public RefCountedObject {
 };
 
 /*
- * This class manages AIO completions. This class is not completely thread-safe,
- * methods like *get_next* is not thread-safe and is expected to be called from
- * within one thread.
+ * This class manages AIO completions. This class is not completely
+ * thread-safe, methods like *get_next_request_id* is not thread-safe
+ * and is expected to be called from within one thread.
  */
 class BucketIndexAioManager {
+public:
+
+  // allows us to reaccess the shard id and shard's oid during and
+  // after the asynchronous call is made
+  struct RequestObj {
+    int shard_id;
+    std::string oid;
+
+    RequestObj(int _shard_id, const std::string& _oid) :
+      shard_id(_shard_id), oid(_oid)
+    {/* empty */}
+  };
+
+
 private:
-  map<int, librados::AioCompletion*> pendings;
-  map<int, librados::AioCompletion*> completions;
-  map<int, string> pending_objs;
-  map<int, string> completion_objs;
+  // NB: the following 4 maps use the request_id as the key; this
+  // is not the same as the shard_id!
+  std::map<int, librados::AioCompletion*> pendings;
+  std::map<int, librados::AioCompletion*> completions;
+  std::map<int, const RequestObj> pending_objs;
+  std::map<int, const RequestObj> completion_objs;
+
   int next = 0;
   ceph::mutex lock = ceph::make_mutex("BucketIndexAioManager::lock");
   ceph::condition_variable cond;
@@ -55,8 +72,8 @@ private:
    *
    * Return next request ID.
    */
-  int get_next() { return next++; }
-    
+  int get_next_request_id() { return next++; }
+
   /*
    * Add a new pending AIO completion instance.
    *
@@ -65,10 +82,11 @@ private:
    * @param oid        - the object id associated with the object, if it is NULL, we don't
    *                     track the object id per callback.
    */
-  void add_pending(int id, librados::AioCompletion* completion, const string& oid) {
-    pendings[id] = completion;
-    pending_objs[id] = oid;
+  void add_pending(int request_id, librados::AioCompletion* completion, const int shard_id, const std::string& oid) {
+    pendings[request_id] = completion;
+    pending_objs.emplace(request_id, RequestObj(shard_id, oid));
   }
+
 public:
   /*
    * Create a new instance.
@@ -78,7 +96,7 @@ public:
   /*
    * Do completion for the given AIO request.
    */
-  void do_completion(int id);
+  void do_completion(int request_id);
 
   /*
    * Wait for AIO completions.
@@ -96,13 +114,14 @@ public:
   /**
    * Do aio read operation.
    */
-  bool aio_operate(librados::IoCtx& io_ctx, const string& oid, librados::ObjectReadOperation *op) {
+  bool aio_operate(librados::IoCtx& io_ctx, const int shard_id, const std::string& oid, librados::ObjectReadOperation *op) {
     std::lock_guard l{lock};
-    BucketIndexAioArg *arg = new BucketIndexAioArg(get_next(), this);
+    const int request_id = get_next_request_id();
+    BucketIndexAioArg *arg = new BucketIndexAioArg(request_id, this);
     librados::AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, bucket_index_op_completion_cb);
     int r = io_ctx.aio_operate(oid, c, (librados::ObjectReadOperation*)op, NULL);
     if (r >= 0) {
-      add_pending(arg->id, c, oid);
+      add_pending(arg->id, c, shard_id, oid);
     } else {
       arg->put();
       c->release();
@@ -113,13 +132,14 @@ public:
   /**
    * Do aio write operation.
    */
-  bool aio_operate(librados::IoCtx& io_ctx, const string& oid, librados::ObjectWriteOperation *op) {
+  bool aio_operate(librados::IoCtx& io_ctx, const int shard_id, const std::string& oid, librados::ObjectWriteOperation *op) {
     std::lock_guard l{lock};
-    BucketIndexAioArg *arg = new BucketIndexAioArg(get_next(), this);
+    const int request_id = get_next_request_id();
+    BucketIndexAioArg *arg = new BucketIndexAioArg(request_id, this);
     librados::AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, bucket_index_op_completion_cb);
     int r = io_ctx.aio_operate(oid, c, (librados::ObjectWriteOperation*)op);
     if (r >= 0) {
-      add_pending(arg->id, c, oid);
+      add_pending(arg->id, c, shard_id, oid);
     } else {
       arg->put();
       c->release();
@@ -434,9 +454,11 @@ class CLSRGWIssueBucketList : public CLSRGWConcurrentIO {
   string delimiter;
   uint32_t num_entries;
   bool list_versions;
-  map<int, rgw_cls_list_ret>& result;
+  std::map<int, rgw_cls_list_ret>& result; // request_id -> return value
+
 protected:
-  int issue_op(int shard_id, const string& oid) override;
+  int issue_op(int shard_id, const std::string& oid) override;
+
 public:
   CLSRGWIssueBucketList(librados::IoCtx& io_ctx,
                        const cls_rgw_obj_key& _start_obj,
@@ -444,8 +466,9 @@ public:
                        const string& _delimiter,
                        uint32_t _num_entries,
                         bool _list_versions,
-                        map<int, string>& oids,
-                        map<int, rgw_cls_list_ret>& list_results,
+                        std::map<int, std::string>& oids, // shard_id -> shard_oid
+                       // shard_id -> return value
+                        std::map<int, rgw_cls_list_ret>& list_results,
                         uint32_t max_aio) :
   CLSRGWConcurrentIO(io_ctx, oids, max_aio),
     start_obj(_start_obj), filter_prefix(_filter_prefix), delimiter(_delimiter),