]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw: clean up interenal shard logic in ordered bucket list
authorJ. Eric Ivancich <ivancich@redhat.com>
Sat, 2 Nov 2019 21:54:54 +0000 (17:54 -0400)
committerJ. Eric Ivancich <ivancich@redhat.com>
Wed, 19 Feb 2020 19:30:31 +0000 (14:30 -0500)
The logic of RGWRados::cls_bucket_list_ordered is complex as it needs
to query multiple bucket index shards and manage their returned lists,
and walk through them in a manner to produce an appropriately ordered
set of dir entries to the caller. The previous implementation used
three of parallel std::vectors (e.g., the related data was spread
across the vectors), which needed to each be maintained
appropriately. This clean-up changes this to a single vector of
structs, and the struct both consolidates the related data and
encapsulates much of the related logic.

Additionally, const correctness is expanded.

Signed-off-by: J. Eric Ivancich <ivancich@redhat.com>
src/cls/rgw/cls_rgw_types.h
src/rgw/rgw_rados.cc

index b97243570641e154d9e6b363960b09c436b32bc1..0bd197ae856ad61c0d6c4c832e19a6e7dc400292 100644 (file)
@@ -1258,4 +1258,4 @@ struct cls_rgw_reshard_entry
 };
 WRITE_CLASS_ENCODER(cls_rgw_reshard_entry)
 
-#endif
\ No newline at end of file
+#endif
index 85f20574257f93b52c122766ef58240565b4570d..b1dd5c84b732ff5e6fd64f34781db7c79b9c0a6d 100644 (file)
@@ -8198,14 +8198,15 @@ int RGWRados::cls_bucket_list_ordered(RGWBucketInfo& bucket_info,
   // key   - oid (for different shards if there is any)
   // value - list result for the corresponding oid (shard), it is filled by
   //         the AIO callback
-  map<int, string> oids;
+  map<int, string> shard_oids;
   int r = svc.bi_rados->open_bucket_index(bucket_info, shard_id,
-                                         &index_pool, &oids, nullptr);
+                                         &index_pool, &shard_oids,
+                                         nullptr);
   if (r < 0) {
     return r;
   }
 
-  const uint32_t shard_count = oids.size();
+  const uint32_t shard_count = shard_oids.size();
   uint32_t num_entries_per_shard;
   if (expansion_factor == 0) {
     num_entries_per_shard =
@@ -8226,29 +8227,61 @@ int RGWRados::cls_bucket_list_ordered(RGWBucketInfo& bucket_info,
     num_entries << " total entries" << dendl;
 
   auto& ioctx = index_pool.ioctx();
-  map<int, struct rgw_cls_list_ret> list_results;
+  map<int, rgw_cls_list_ret> shard_list_results;
   cls_rgw_obj_key start_after_key(start_after.name, start_after.instance);
   r = CLSRGWIssueBucketList(ioctx, start_after_key, prefix, delimiter,
                            num_entries_per_shard,
-                           list_versions, oids, list_results,
+                           list_versions, shard_oids, shard_list_results,
                            cct->_conf->rgw_bucket_index_max_aio)();
   if (r < 0) {
     return r;
   }
 
-  // create a list of iterators that are used to iterate each shard
-  vector<RGWRados::ent_map_t::iterator> vcurrents;
-  vector<RGWRados::ent_map_t::iterator> vends;
-  vector<string> vnames;
-  vcurrents.reserve(list_results.size());
-  vends.reserve(list_results.size());
-  vnames.reserve(list_results.size());
-  *is_truncated = false;
-  *cls_filtered = true;
-  for (auto& r : list_results) {
-    vcurrents.push_back(r.second.dir.m.begin());
-    vends.push_back(r.second.dir.m.end());
-    vnames.push_back(oids[r.first]);
+  // to manage the iterators through each shard's list results
+  struct ShardTracker {
+    const size_t shard_idx;
+    rgw_cls_list_ret& result;
+    const std::string& oid_name;
+    RGWRados::ent_map_t::iterator cursor;
+    RGWRados::ent_map_t::iterator end;
+
+    // manages an iterator through a shard and provides other
+    // accessors
+    ShardTracker(size_t _shard_idx,
+                rgw_cls_list_ret& _result,
+                const std::string& _oid_name):
+      shard_idx(_shard_idx),
+      result(_result),
+      oid_name(_oid_name),
+      cursor(_result.dir.m.begin()),
+      end(_result.dir.m.end())
+    {}
+
+    inline const std::string& entry_name() const {
+      return cursor->first;
+    }
+    rgw_bucket_dir_entry& dir_entry() const {
+      return cursor->second;
+    }
+    inline bool is_truncated() const {
+      return result.is_truncated;
+    }
+    inline ShardTracker& advance() {
+      ++cursor;
+      // return a self-reference to allow for chaining of calls, such
+      // as x.advance().at_end()
+      return *this;
+    }
+    inline bool at_end() const {
+      return cursor == end;
+    }
+  }; // ShardTracker
+
+  // one tracker per shard requested (may not be all shards)
+  std::vector<ShardTracker> results_trackers;
+  results_trackers.reserve(shard_list_results.size());
+  for (auto& r : shard_list_results) {
+    results_trackers.emplace_back(r.first, r.second, shard_oids[r.first]);
 
     // if any *one* shard's result is trucated, the entire result is
     // truncated
@@ -8259,30 +8292,41 @@ int RGWRados::cls_bucket_list_ordered(RGWBucketInfo& bucket_info,
     *cls_filtered = *cls_filtered && r.second.cls_filtered;
   }
 
-  // create a map to track the next candidate entry from each shard,
-  // if the entry from a specified shard is selected/erased, the next
-  // entry from that shard will be inserted for next round selection
+  // create a map to track the next candidate entry from ShardTracker
+  // (key=candidate, value=index into results_trackers); as we consume
+  // entries from shards, we replace them with the next entries in the
+  // shards until we run out
   map<string, size_t> candidates;
-  for (size_t i = 0; i < vcurrents.size(); ++i) {
-    if (vcurrents[i] != vends[i]) {
-      candidates[vcurrents[i]->first] = i;
+  size_t tracker_idx = 0;
+  for (auto& t : results_trackers) {
+    if (!t.at_end()) {
+      // it's important that the values in the map refer to the index
+      // into the results_trackers vector, which may not be the same
+      // as the shard number (i.e., when not all shards are requested)
+      candidates[t.entry_name()] = tracker_idx;
     }
+    ++tracker_idx;
   }
 
+  rgw_bucket_dir_entry*
+    last_entry_visited = nullptr; // to set last_entry (marker)
   map<string, bufferlist> updates;
   uint32_t count = 0;
-  int pos = -1;
   while (count < num_entries && !candidates.empty()) {
     r = 0;
-    // select the next one
-    pos = candidates.begin()->second;
-    const string& name = vcurrents[pos]->first;
-    struct rgw_bucket_dir_entry& dirent = vcurrents[pos]->second;
+    // select the next entry in lexical order (first key in map);
+    // again tracker_idx is not necessarily shard number, but is index
+    // into results_trackers vector
+    tracker_idx = candidates.begin()->second;
+    auto& tracker = results_trackers.at(tracker_idx);
+    last_entry_visited = &tracker.dir_entry();
+    const string& name = tracker.entry_name();
+    rgw_bucket_dir_entry& dirent = tracker.dir_entry();
 
     ldout(cct, 20) << "RGWRados::" << __func__ << " currently processing " <<
-      dirent.key << " from shard " << pos << dendl;
+      dirent.key << " from shard " << tracker.shard_idx << dendl;
 
-    bool force_check =
+    const bool force_check =
       force_check_filter && force_check_filter(dirent.key.name);
 
     if ((!dirent.exists &&
@@ -8296,7 +8340,7 @@ int RGWRados::cls_bucket_list_ordered(RGWBucketInfo& bucket_info,
       librados::IoCtx sub_ctx;
       sub_ctx.dup(ioctx);
       r = check_disk_state(sub_ctx, bucket_info, dirent, dirent,
-                          updates[vnames[pos]], y);
+                          updates[tracker.oid_name], y);
       if (r < 0 && r != -ENOENT) {
        return r;
       }
@@ -8316,9 +8360,9 @@ int RGWRados::cls_bucket_list_ordered(RGWBucketInfo& bucket_info,
 
     // refresh the candidates map
     candidates.erase(candidates.begin());
-    if (++vcurrents[pos] != vends[pos]) { // note: pre-increment
-      candidates[vcurrents[pos]->first] = pos;
-    } else if (list_results[pos].is_truncated) {
+    if (! tracker.advance().at_end()) {
+      candidates[tracker.entry_name()] = tracker_idx;
+    } else if (tracker.is_truncated()) {
       // once we exhaust one shard that is truncated, we need to stop,
       // as we cannot be certain that one of the next entries needs to
       // come from that shard; S3 and swift protocols allow returning
@@ -8340,10 +8384,11 @@ int RGWRados::cls_bucket_list_ordered(RGWBucketInfo& bucket_info,
     }
   } // updates loop
 
+  // determine truncation by checking if all the returned entries are
+  // consumed or not
   *is_truncated = false;
-  // check if all the returned entries are consumed or not
-  for (size_t i = 0; i < vcurrents.size(); ++i) {
-    if (vcurrents[i] != vends[i] || list_results[i].is_truncated) {
+  for (const auto& t : results_trackers) {
+    if (!t.at_end() || t.is_truncated()) {
       *is_truncated = true;
       break;
     }
@@ -8359,8 +8404,9 @@ int RGWRados::cls_bucket_list_ordered(RGWBucketInfo& bucket_info,
       count << ", which is truncated" << dendl;
   }
 
-  if (pos >= 0) {
-    *last_entry = std::move((--vcurrents[pos])->first);
+  if (last_entry_visited != nullptr) {
+    // since we'll not need this any more, might as well move it...
+    *last_entry = std::move(last_entry_visited->key);
     ldout(cct, 20) << "RGWRados::" << __func__ <<
       ": returning, last_entry=" << *last_entry << dendl;
   } else {