]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw: de-conflate shard_id and request_id in CLSRGWConcurrentIO
authorJ. Eric Ivancich <ivancich@redhat.com>
Fri, 16 Jul 2021 19:31:35 +0000 (15:31 -0400)
committerJ. Eric Ivancich <ivancich@redhat.com>
Wed, 4 Aug 2021 04:44:06 +0000 (00:44 -0400)
When using asynchronous (concurrent) IO for bucket index requests,
there are two int ids that are used that need to be kept separate --
shard id and request id. In many cases they're the same -- shard 0
gets request 0, and so forth.

But in preparation for re-requests, those ids can diverge, where
request 13 maps to shard 2. The existing code maintained the OIDs that
went with each request. This PR also maintains the shard id as
well. Documentation has been beefed up to help future developers
navigate this.

Signed-off-by: J. Eric Ivancich <ivancich@redhat.com>
src/cls/rgw/cls_rgw_client.cc
src/cls/rgw/cls_rgw_client.h

index 73bcbac066e51551e0728fd3e47408436a261474..a93b28bd5deac3f92c33cfd3ccd2c3e3fbb22b10 100644 (file)
@@ -47,19 +47,19 @@ public:
   }
 };
 
-void BucketIndexAioManager::do_completion(int id) {
+void BucketIndexAioManager::do_completion(const int request_id) {
   std::lock_guard l{lock};
 
-  auto iter = pendings.find(id);
+  auto iter = pendings.find(request_id);
   ceph_assert(iter != pendings.end());
-  completions[id] = iter->second;
+  completions[request_id] = iter->second;
   pendings.erase(iter);
 
   // If the caller needs a list of finished objects, store them
   // for further processing
-  auto miter = pending_objs.find(id);
+  auto miter = pending_objs.find(request_id);
   if (miter != pending_objs.end()) {
-    completion_objs[id] = miter->second;
+    completion_objs.emplace(request_id, miter->second);
     pending_objs.erase(miter);
   }
 
@@ -85,7 +85,7 @@ bool BucketIndexAioManager::wait_for_completions(int valid_ret_code,
     if (objs && r == 0) { /* update list of successfully completed objs */
       auto liter = completion_objs.find(iter->first);
       if (liter != completion_objs.end()) {
-        (*objs)[liter->first] = liter->second;
+       (*objs)[liter->second.shard_id] = liter->second.oid;
       }
     }
     if (ret_code && (r < 0 && r != valid_ret_code))
@@ -107,38 +107,43 @@ void cls_rgw_bucket_init_index(ObjectWriteOperation& o)
 }
 
 static bool issue_bucket_index_init_op(librados::IoCtx& io_ctx,
+                                      const int shard_id,
                                       const string& oid,
                                       BucketIndexAioManager *manager) {
   bufferlist in;
   librados::ObjectWriteOperation op;
   op.create(true);
   op.exec(RGW_CLASS, RGW_BUCKET_INIT_INDEX, in);
-  return manager->aio_operate(io_ctx, oid, &op);
+  return manager->aio_operate(io_ctx, shard_id, oid, &op);
 }
 
 static bool issue_bucket_index_clean_op(librados::IoCtx& io_ctx,
+                                       const int shard_id,
                                        const string& oid,
                                        BucketIndexAioManager *manager) {
   bufferlist in;
   librados::ObjectWriteOperation op;
   op.remove();
-  return manager->aio_operate(io_ctx, oid, &op);
+  return manager->aio_operate(io_ctx, shard_id, oid, &op);
 }
 
 static bool issue_bucket_set_tag_timeout_op(librados::IoCtx& io_ctx,
-    const string& oid, uint64_t timeout, BucketIndexAioManager *manager) {
+                                           const int shard_id,
+                                           const string& oid,
+                                           uint64_t timeout,
+                                           BucketIndexAioManager *manager) {
   bufferlist in;
   rgw_cls_tag_timeout_op call;
   call.tag_timeout = timeout;
   encode(call, in);
   ObjectWriteOperation op;
   op.exec(RGW_CLASS, RGW_BUCKET_SET_TAG_TIMEOUT, in);
-  return manager->aio_operate(io_ctx, oid, &op);
+  return manager->aio_operate(io_ctx, shard_id, oid, &op);
 }
 
-int CLSRGWIssueBucketIndexInit::issue_op(int shard_id, const string& oid)
+int CLSRGWIssueBucketIndexInit::issue_op(const int shard_id, const string& oid)
 {
-  return issue_bucket_index_init_op(io_ctx, oid, &manager);
+  return issue_bucket_index_init_op(io_ctx, shard_id, oid, &manager);
 }
 
 void CLSRGWIssueBucketIndexInit::cleanup()
@@ -149,14 +154,14 @@ void CLSRGWIssueBucketIndexInit::cleanup()
   }
 }
 
-int CLSRGWIssueBucketIndexClean::issue_op(int shard_id, const string& oid)
+int CLSRGWIssueBucketIndexClean::issue_op(const int shard_id, const string& oid)
 {
-  return issue_bucket_index_clean_op(io_ctx, oid, &manager);
+  return issue_bucket_index_clean_op(io_ctx, shard_id, oid, &manager);
 }
 
-int CLSRGWIssueSetTagTimeout::issue_op(int shard_id, const string& oid)
+int CLSRGWIssueSetTagTimeout::issue_op(const int shard_id, const string& oid)
 {
-  return issue_bucket_set_tag_timeout_op(io_ctx, oid, tag_timeout, &manager);
+  return issue_bucket_set_tag_timeout_op(io_ctx, shard_id, oid, tag_timeout, &manager);
 }
 
 void cls_rgw_bucket_update_stats(librados::ObjectWriteOperation& o,
@@ -237,10 +242,11 @@ void cls_rgw_bucket_list_op(librados::ObjectReadOperation& op,
 }
 
 static bool issue_bucket_list_op(librados::IoCtx& io_ctx,
-                                const string& oid,
+                                const int shard_id,
+                                const std::string& oid,
                                 const cls_rgw_obj_key& start_obj,
-                                const string& filter_prefix,
-                                const string& delimiter,
+                                const std::string& filter_prefix,
+                                const std::string& delimiter,
                                 uint32_t num_entries,
                                 bool list_versions,
                                 BucketIndexAioManager *manager,
@@ -250,12 +256,12 @@ static bool issue_bucket_list_op(librados::IoCtx& io_ctx,
   cls_rgw_bucket_list_op(op,
                         start_obj, filter_prefix, delimiter,
                          num_entries, list_versions, pdata);
-  return manager->aio_operate(io_ctx, oid, &op);
+  return manager->aio_operate(io_ctx, shard_id, oid, &op);
 }
 
-int CLSRGWIssueBucketList::issue_op(int shard_id, const string& oid)
+int CLSRGWIssueBucketList::issue_op(const int shard_id, const string& oid)
 {
-  return issue_bucket_list_op(io_ctx, oid,
+  return issue_bucket_list_op(io_ctx, shard_id, oid,
                              start_obj, filter_prefix, delimiter,
                              num_entries, list_versions, &manager,
                              &result[shard_id]);
@@ -379,7 +385,7 @@ int cls_rgw_bi_list(librados::IoCtx& io_ctx, const std::string& oid,
   return 0;
 }
 
-int cls_rgw_bucket_link_olh(librados::IoCtx& io_ctx, const string& oid, 
+int cls_rgw_bucket_link_olh(librados::IoCtx& io_ctx, const string& oid,
                             const cls_rgw_obj_key& key, bufferlist& olh_tag,
                             bool delete_marker, const string& op_tag, rgw_bucket_dir_entry_meta *meta,
                             uint64_t olh_epoch, ceph::real_time unmod_since, bool high_precision_time, bool log_op, rgw_zone_set& zones_trace)
@@ -516,17 +522,17 @@ void cls_rgw_bilog_list(librados::ObjectReadOperation& op,
   op.exec(RGW_CLASS, RGW_BI_LOG_LIST, in, new ClsBucketIndexOpCtx<cls_rgw_bi_log_list_ret>(pdata, ret));
 }
 
-static bool issue_bi_log_list_op(librados::IoCtx& io_ctx, const string& oid, int shard_id,
+static bool issue_bi_log_list_op(librados::IoCtx& io_ctx, const string& oid, const int shard_id,
                                  BucketIndexShardsManager& marker_mgr, uint32_t max,
                                  BucketIndexAioManager *manager,
                                  cls_rgw_bi_log_list_ret *pdata)
 {
   librados::ObjectReadOperation op;
   cls_rgw_bilog_list(op, marker_mgr.get(shard_id, ""), max, pdata, nullptr);
-  return manager->aio_operate(io_ctx, oid, &op);
+  return manager->aio_operate(io_ctx, shard_id, oid, &op);
 }
 
-int CLSRGWIssueBILogList::issue_op(int shard_id, const string& oid)
+int CLSRGWIssueBILogList::issue_op(const int shard_id, const string& oid)
 {
   return issue_bi_log_list_op(io_ctx, oid, shard_id, marker_mgr, max, &manager, &result[shard_id]);
 }
@@ -544,46 +550,46 @@ void cls_rgw_bilog_trim(librados::ObjectWriteOperation& op,
   op.exec(RGW_CLASS, RGW_BI_LOG_TRIM, in);
 }
 
-static bool issue_bi_log_trim(librados::IoCtx& io_ctx, const string& oid, int shard_id,
+static bool issue_bi_log_trim(librados::IoCtx& io_ctx, const string& oid, const int shard_id,
                               BucketIndexShardsManager& start_marker_mgr,
                               BucketIndexShardsManager& end_marker_mgr, BucketIndexAioManager *manager) {
   cls_rgw_bi_log_trim_op call;
   librados::ObjectWriteOperation op;
   cls_rgw_bilog_trim(op, start_marker_mgr.get(shard_id, ""),
                      end_marker_mgr.get(shard_id, ""));
-  return manager->aio_operate(io_ctx, oid, &op);
+  return manager->aio_operate(io_ctx, shard_id, oid, &op);
 }
 
-int CLSRGWIssueBILogTrim::issue_op(int shard_id, const string& oid)
+int CLSRGWIssueBILogTrim::issue_op(const int shard_id, const string& oid)
 {
   return issue_bi_log_trim(io_ctx, oid, shard_id, start_marker_mgr, end_marker_mgr, &manager);
 }
 
-static bool issue_bucket_check_index_op(IoCtx& io_ctx, const string& oid, BucketIndexAioManager *manager,
+static bool issue_bucket_check_index_op(IoCtx& io_ctx, const int shard_id, const string& oid, BucketIndexAioManager *manager,
     rgw_cls_check_index_ret *pdata) {
   bufferlist in;
   librados::ObjectReadOperation op;
   op.exec(RGW_CLASS, RGW_BUCKET_CHECK_INDEX, in, new ClsBucketIndexOpCtx<rgw_cls_check_index_ret>(
         pdata, NULL));
-  return manager->aio_operate(io_ctx, oid, &op);
+  return manager->aio_operate(io_ctx, shard_id, oid, &op);
 }
 
 int CLSRGWIssueBucketCheck::issue_op(int shard_id, const string& oid)
 {
-  return issue_bucket_check_index_op(io_ctx, oid, &manager, &result[shard_id]);
+  return issue_bucket_check_index_op(io_ctx, shard_id, oid, &manager, &result[shard_id]);
 }
 
-static bool issue_bucket_rebuild_index_op(IoCtx& io_ctx, const string& oid,
+static bool issue_bucket_rebuild_index_op(IoCtx& io_ctx, const int shard_id, const string& oid,
     BucketIndexAioManager *manager) {
   bufferlist in;
   librados::ObjectWriteOperation op;
   op.exec(RGW_CLASS, RGW_BUCKET_REBUILD_INDEX, in);
-  return manager->aio_operate(io_ctx, oid, &op);
+  return manager->aio_operate(io_ctx, shard_id, oid, &op);
 }
 
-int CLSRGWIssueBucketRebuild::issue_op(int shard_id, const string& oid)
+int CLSRGWIssueBucketRebuild::issue_op(const int shard_id, const string& oid)
 {
-  return issue_bucket_rebuild_index_op(io_ctx, oid, &manager);
+  return issue_bucket_rebuild_index_op(io_ctx, shard_id, oid, &manager);
 }
 
 void cls_rgw_encode_suggestion(char op, rgw_bucket_dir_entry& dirent, bufferlist& updates)
@@ -597,40 +603,40 @@ void cls_rgw_suggest_changes(ObjectWriteOperation& o, bufferlist& updates)
   o.exec(RGW_CLASS, RGW_DIR_SUGGEST_CHANGES, updates);
 }
 
-int CLSRGWIssueGetDirHeader::issue_op(int shard_id, const string& oid)
+int CLSRGWIssueGetDirHeader::issue_op(const int shard_id, const string& oid)
 {
   cls_rgw_obj_key empty_key;
   string empty_prefix;
   string empty_delimiter;
-  return issue_bucket_list_op(io_ctx, oid,
+  return issue_bucket_list_op(io_ctx, shard_id, oid,
                              empty_key, empty_prefix, empty_delimiter,
                              0, false, &manager, &result[shard_id]);
 }
 
-static bool issue_resync_bi_log(librados::IoCtx& io_ctx, const string& oid, BucketIndexAioManager *manager)
+static bool issue_resync_bi_log(librados::IoCtx& io_ctx, const int shard_id, const string& oid, BucketIndexAioManager *manager)
 {
   bufferlist in;
   librados::ObjectWriteOperation op;
   op.exec(RGW_CLASS, RGW_BI_LOG_RESYNC, in);
-  return manager->aio_operate(io_ctx, oid, &op);
+  return manager->aio_operate(io_ctx, shard_id, oid, &op);
 }
 
-int CLSRGWIssueResyncBucketBILog::issue_op(int shard_id, const string& oid)
+int CLSRGWIssueResyncBucketBILog::issue_op(const int shard_id, const string& oid)
 {
-  return issue_resync_bi_log(io_ctx, oid, &manager);
+  return issue_resync_bi_log(io_ctx, shard_id, oid, &manager);
 }
 
-static bool issue_bi_log_stop(librados::IoCtx& io_ctx, const string& oid, BucketIndexAioManager *manager)
+static bool issue_bi_log_stop(librados::IoCtx& io_ctx, const int shard_id, const string& oid, BucketIndexAioManager *manager)
 {
   bufferlist in;
   librados::ObjectWriteOperation op;
   op.exec(RGW_CLASS, RGW_BI_LOG_STOP, in);
-  return manager->aio_operate(io_ctx, oid, &op); 
+  return manager->aio_operate(io_ctx, shard_id, oid, &op);
 }
 
-int CLSRGWIssueBucketBILogStop::issue_op(int shard_id, const string& oid)
+int CLSRGWIssueBucketBILogStop::issue_op(const int shard_id, const string& oid)
 {
-  return issue_bi_log_stop(io_ctx, oid, &manager);
+  return issue_bi_log_stop(io_ctx, shard_id, oid, &manager);
 }
 
 class GetDirHeaderCompletion : public ObjectOperationCompletion {
@@ -1072,7 +1078,8 @@ void cls_rgw_guard_bucket_resharding(librados::ObjectOperation& op, int ret_err)
   op.exec(RGW_CLASS, RGW_GUARD_BUCKET_RESHARDING, in);
 }
 
-static bool issue_set_bucket_resharding(librados::IoCtx& io_ctx, const string& oid,
+static bool issue_set_bucket_resharding(librados::IoCtx& io_ctx,
+                                       const int shard_id, const string& oid,
                                         const cls_rgw_bucket_instance_entry& entry,
                                         BucketIndexAioManager *manager) {
   bufferlist in;
@@ -1081,10 +1088,10 @@ static bool issue_set_bucket_resharding(librados::IoCtx& io_ctx, const string& o
   encode(call, in);
   librados::ObjectWriteOperation op;
   op.exec(RGW_CLASS, RGW_SET_BUCKET_RESHARDING, in);
-  return manager->aio_operate(io_ctx, oid, &op);
+  return manager->aio_operate(io_ctx, shard_id, oid, &op);
 }
 
-int CLSRGWIssueSetBucketResharding::issue_op(int shard_id, const string& oid)
+int CLSRGWIssueSetBucketResharding::issue_op(const int shard_id, const string& oid)
 {
-  return issue_set_bucket_resharding(io_ctx, oid, entry, &manager);
+  return issue_set_bucket_resharding(io_ctx, shard_id, oid, entry, &manager);
 }
index b4f0f42349baf2ec608a4e5ca297ce474fdfa401..bcabd51391ff070d8b4f705523b9b14028b80ba9 100644 (file)
@@ -27,16 +27,33 @@ struct BucketIndexAioArg : public RefCountedObject {
 };
 
 /*
- * This class manages AIO completions. This class is not completely thread-safe,
- * methods like *get_next* is not thread-safe and is expected to be called from
- * within one thread.
+ * This class manages AIO completions. This class is not completely
+ * thread-safe, methods like *get_next_request_id* is not thread-safe
+ * and is expected to be called from within one thread.
  */
 class BucketIndexAioManager {
+public:
+
+  // allows us to reaccess the shard id and shard's oid during and
+  // after the asynchronous call is made
+  struct RequestObj {
+    int shard_id;
+    std::string oid;
+
+    RequestObj(int _shard_id, const std::string& _oid) :
+      shard_id(_shard_id), oid(_oid)
+    {/* empty */}
+  };
+
+
 private:
+  // NB: the following 4 maps use the request_id as the key; this
+  // is not the same as the shard_id!
   std::map<int, librados::AioCompletion*> pendings;
   std::map<int, librados::AioCompletion*> completions;
-  std::map<int, std::string> pending_objs;
-  std::map<int, std::string> completion_objs;
+  std::map<int, const RequestObj> pending_objs;
+  std::map<int, const RequestObj> completion_objs;
+
   int next = 0;
   ceph::mutex lock = ceph::make_mutex("BucketIndexAioManager::lock");
   ceph::condition_variable cond;
@@ -54,8 +71,8 @@ private:
    *
    * Return next request ID.
    */
-  int get_next() { return next++; }
-    
+  int get_next_request_id() { return next++; }
+
   /*
    * Add a new pending AIO completion instance.
    *
@@ -64,10 +81,11 @@ private:
    * @param oid        - the object id associated with the object, if it is NULL, we don't
    *                     track the object id per callback.
    */
-  void add_pending(int id, librados::AioCompletion* completion, const std::string& oid) {
-    pendings[id] = completion;
-    pending_objs[id] = oid;
+  void add_pending(int request_id, librados::AioCompletion* completion, const int shard_id, const std::string& oid) {
+    pendings[request_id] = completion;
+    pending_objs.emplace(request_id, RequestObj(shard_id, oid));
   }
+
 public:
   /*
    * Create a new instance.
@@ -77,7 +95,7 @@ public:
   /*
    * Do completion for the given AIO request.
    */
-  void do_completion(int id);
+  void do_completion(int request_id);
 
   /*
    * Wait for AIO completions.
@@ -95,13 +113,14 @@ public:
   /**
    * Do aio read operation.
    */
-  bool aio_operate(librados::IoCtx& io_ctx, const std::string& oid, librados::ObjectReadOperation *op) {
+  bool aio_operate(librados::IoCtx& io_ctx, const int shard_id, const std::string& oid, librados::ObjectReadOperation *op) {
     std::lock_guard l{lock};
-    BucketIndexAioArg *arg = new BucketIndexAioArg(get_next(), this);
+    const int request_id = get_next_request_id();
+    BucketIndexAioArg *arg = new BucketIndexAioArg(request_id, this);
     librados::AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, bucket_index_op_completion_cb);
     int r = io_ctx.aio_operate(oid, c, (librados::ObjectReadOperation*)op, NULL);
     if (r >= 0) {
-      add_pending(arg->id, c, oid);
+      add_pending(arg->id, c, shard_id, oid);
     } else {
       arg->put();
       c->release();
@@ -112,13 +131,14 @@ public:
   /**
    * Do aio write operation.
    */
-  bool aio_operate(librados::IoCtx& io_ctx, const std::string& oid, librados::ObjectWriteOperation *op) {
+  bool aio_operate(librados::IoCtx& io_ctx, const int shard_id, const std::string& oid, librados::ObjectWriteOperation *op) {
     std::lock_guard l{lock};
-    BucketIndexAioArg *arg = new BucketIndexAioArg(get_next(), this);
+    const int request_id = get_next_request_id();
+    BucketIndexAioArg *arg = new BucketIndexAioArg(request_id, this);
     librados::AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, bucket_index_op_completion_cb);
     int r = io_ctx.aio_operate(oid, c, (librados::ObjectWriteOperation*)op);
     if (r >= 0) {
-      add_pending(arg->id, c, oid);
+      add_pending(arg->id, c, shard_id, oid);
     } else {
       arg->put();
       c->release();
@@ -435,9 +455,11 @@ class CLSRGWIssueBucketList : public CLSRGWConcurrentIO {
   std::string delimiter;
   uint32_t num_entries;
   bool list_versions;
-  std::map<int, rgw_cls_list_ret>& result;
+  std::map<int, rgw_cls_list_ret>& result; // request_id -> return value
+
 protected:
   int issue_op(int shard_id, const std::string& oid) override;
+
 public:
   CLSRGWIssueBucketList(librados::IoCtx& io_ctx,
                        const cls_rgw_obj_key& _start_obj,
@@ -445,7 +467,8 @@ public:
                        const std::string& _delimiter,
                        uint32_t _num_entries,
                         bool _list_versions,
-                        std::map<int, std::string>& oids,
+                        std::map<int, std::string>& oids, // shard_id -> shard_oid
+                       // shard_id -> return value
                         std::map<int, rgw_cls_list_ret>& list_results,
                         uint32_t max_aio) :
   CLSRGWConcurrentIO(io_ctx, oids, max_aio),