From 55db9a282b627de94bfb3d9a233693e6dba73619 Mon Sep 17 00:00:00 2001 From: "J. Eric Ivancich" Date: Sat, 2 Nov 2019 17:54:54 -0400 Subject: [PATCH] rgw: clean up interenal shard logic in ordered bucket list The logic of RGWRados::cls_bucket_list_ordered is complex as it needs to query multiple bucket index shards and manage their returned lists, and walk through them in a manner to produce an appropriately ordered set of dir entries to the caller. The previous implementation used three of parallel std::vectors (e.g., the related data was spread across the vectors), which needed to each be maintained appropriately. This clean-up changes this to a single vector of structs, and the struct both consolidates the related data and encapsulates much of the related logic. Additionally, const correctness is expanded. Signed-off-by: J. Eric Ivancich --- src/cls/rgw/cls_rgw_types.h | 2 +- src/rgw/rgw_rados.cc | 126 ++++++++++++++++++++++++------------ 2 files changed, 87 insertions(+), 41 deletions(-) diff --git a/src/cls/rgw/cls_rgw_types.h b/src/cls/rgw/cls_rgw_types.h index b9724357064..0bd197ae856 100644 --- a/src/cls/rgw/cls_rgw_types.h +++ b/src/cls/rgw/cls_rgw_types.h @@ -1258,4 +1258,4 @@ struct cls_rgw_reshard_entry }; WRITE_CLASS_ENCODER(cls_rgw_reshard_entry) -#endif \ No newline at end of file +#endif diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 85f20574257..b1dd5c84b73 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -8198,14 +8198,15 @@ int RGWRados::cls_bucket_list_ordered(RGWBucketInfo& bucket_info, // key - oid (for different shards if there is any) // value - list result for the corresponding oid (shard), it is filled by // the AIO callback - map oids; + map shard_oids; int r = svc.bi_rados->open_bucket_index(bucket_info, shard_id, - &index_pool, &oids, nullptr); + &index_pool, &shard_oids, + nullptr); if (r < 0) { return r; } - const uint32_t shard_count = oids.size(); + const uint32_t shard_count = shard_oids.size(); uint32_t num_entries_per_shard; if (expansion_factor == 0) { num_entries_per_shard = @@ -8226,29 +8227,61 @@ int RGWRados::cls_bucket_list_ordered(RGWBucketInfo& bucket_info, num_entries << " total entries" << dendl; auto& ioctx = index_pool.ioctx(); - map list_results; + map shard_list_results; cls_rgw_obj_key start_after_key(start_after.name, start_after.instance); r = CLSRGWIssueBucketList(ioctx, start_after_key, prefix, delimiter, num_entries_per_shard, - list_versions, oids, list_results, + list_versions, shard_oids, shard_list_results, cct->_conf->rgw_bucket_index_max_aio)(); if (r < 0) { return r; } - // create a list of iterators that are used to iterate each shard - vector vcurrents; - vector vends; - vector vnames; - vcurrents.reserve(list_results.size()); - vends.reserve(list_results.size()); - vnames.reserve(list_results.size()); - *is_truncated = false; - *cls_filtered = true; - for (auto& r : list_results) { - vcurrents.push_back(r.second.dir.m.begin()); - vends.push_back(r.second.dir.m.end()); - vnames.push_back(oids[r.first]); + // to manage the iterators through each shard's list results + struct ShardTracker { + const size_t shard_idx; + rgw_cls_list_ret& result; + const std::string& oid_name; + RGWRados::ent_map_t::iterator cursor; + RGWRados::ent_map_t::iterator end; + + // manages an iterator through a shard and provides other + // accessors + ShardTracker(size_t _shard_idx, + rgw_cls_list_ret& _result, + const std::string& _oid_name): + shard_idx(_shard_idx), + result(_result), + oid_name(_oid_name), + cursor(_result.dir.m.begin()), + end(_result.dir.m.end()) + {} + + inline const std::string& entry_name() const { + return cursor->first; + } + rgw_bucket_dir_entry& dir_entry() const { + return cursor->second; + } + inline bool is_truncated() const { + return result.is_truncated; + } + inline ShardTracker& advance() { + ++cursor; + // return a self-reference to allow for chaining of calls, such + // as x.advance().at_end() + return *this; + } + inline bool at_end() const { + return cursor == end; + } + }; // ShardTracker + + // one tracker per shard requested (may not be all shards) + std::vector results_trackers; + results_trackers.reserve(shard_list_results.size()); + for (auto& r : shard_list_results) { + results_trackers.emplace_back(r.first, r.second, shard_oids[r.first]); // if any *one* shard's result is trucated, the entire result is // truncated @@ -8259,30 +8292,41 @@ int RGWRados::cls_bucket_list_ordered(RGWBucketInfo& bucket_info, *cls_filtered = *cls_filtered && r.second.cls_filtered; } - // create a map to track the next candidate entry from each shard, - // if the entry from a specified shard is selected/erased, the next - // entry from that shard will be inserted for next round selection + // create a map to track the next candidate entry from ShardTracker + // (key=candidate, value=index into results_trackers); as we consume + // entries from shards, we replace them with the next entries in the + // shards until we run out map candidates; - for (size_t i = 0; i < vcurrents.size(); ++i) { - if (vcurrents[i] != vends[i]) { - candidates[vcurrents[i]->first] = i; + size_t tracker_idx = 0; + for (auto& t : results_trackers) { + if (!t.at_end()) { + // it's important that the values in the map refer to the index + // into the results_trackers vector, which may not be the same + // as the shard number (i.e., when not all shards are requested) + candidates[t.entry_name()] = tracker_idx; } + ++tracker_idx; } + rgw_bucket_dir_entry* + last_entry_visited = nullptr; // to set last_entry (marker) map updates; uint32_t count = 0; - int pos = -1; while (count < num_entries && !candidates.empty()) { r = 0; - // select the next one - pos = candidates.begin()->second; - const string& name = vcurrents[pos]->first; - struct rgw_bucket_dir_entry& dirent = vcurrents[pos]->second; + // select the next entry in lexical order (first key in map); + // again tracker_idx is not necessarily shard number, but is index + // into results_trackers vector + tracker_idx = candidates.begin()->second; + auto& tracker = results_trackers.at(tracker_idx); + last_entry_visited = &tracker.dir_entry(); + const string& name = tracker.entry_name(); + rgw_bucket_dir_entry& dirent = tracker.dir_entry(); ldout(cct, 20) << "RGWRados::" << __func__ << " currently processing " << - dirent.key << " from shard " << pos << dendl; + dirent.key << " from shard " << tracker.shard_idx << dendl; - bool force_check = + const bool force_check = force_check_filter && force_check_filter(dirent.key.name); if ((!dirent.exists && @@ -8296,7 +8340,7 @@ int RGWRados::cls_bucket_list_ordered(RGWBucketInfo& bucket_info, librados::IoCtx sub_ctx; sub_ctx.dup(ioctx); r = check_disk_state(sub_ctx, bucket_info, dirent, dirent, - updates[vnames[pos]], y); + updates[tracker.oid_name], y); if (r < 0 && r != -ENOENT) { return r; } @@ -8316,9 +8360,9 @@ int RGWRados::cls_bucket_list_ordered(RGWBucketInfo& bucket_info, // refresh the candidates map candidates.erase(candidates.begin()); - if (++vcurrents[pos] != vends[pos]) { // note: pre-increment - candidates[vcurrents[pos]->first] = pos; - } else if (list_results[pos].is_truncated) { + if (! tracker.advance().at_end()) { + candidates[tracker.entry_name()] = tracker_idx; + } else if (tracker.is_truncated()) { // once we exhaust one shard that is truncated, we need to stop, // as we cannot be certain that one of the next entries needs to // come from that shard; S3 and swift protocols allow returning @@ -8340,10 +8384,11 @@ int RGWRados::cls_bucket_list_ordered(RGWBucketInfo& bucket_info, } } // updates loop + // determine truncation by checking if all the returned entries are + // consumed or not *is_truncated = false; - // check if all the returned entries are consumed or not - for (size_t i = 0; i < vcurrents.size(); ++i) { - if (vcurrents[i] != vends[i] || list_results[i].is_truncated) { + for (const auto& t : results_trackers) { + if (!t.at_end() || t.is_truncated()) { *is_truncated = true; break; } @@ -8359,8 +8404,9 @@ int RGWRados::cls_bucket_list_ordered(RGWBucketInfo& bucket_info, count << ", which is truncated" << dendl; } - if (pos >= 0) { - *last_entry = std::move((--vcurrents[pos])->first); + if (last_entry_visited != nullptr) { + // since we'll not need this any more, might as well move it... + *last_entry = std::move(last_entry_visited->key); ldout(cct, 20) << "RGWRados::" << __func__ << ": returning, last_entry=" << *last_entry << dendl; } else { -- 2.39.5