// key - oid (for different shards if there is any)
// value - list result for the corresponding oid (shard), it is filled by
// the AIO callback
- map<int, string> oids;
+ map<int, string> shard_oids;
int r = svc.bi_rados->open_bucket_index(bucket_info, shard_id,
- &index_pool, &oids, nullptr);
+ &index_pool, &shard_oids,
+ nullptr);
if (r < 0) {
return r;
}
- const uint32_t shard_count = oids.size();
+ const uint32_t shard_count = shard_oids.size();
uint32_t num_entries_per_shard;
if (expansion_factor == 0) {
num_entries_per_shard =
num_entries << " total entries" << dendl;
auto& ioctx = index_pool.ioctx();
- map<int, struct rgw_cls_list_ret> list_results;
+ map<int, rgw_cls_list_ret> shard_list_results;
cls_rgw_obj_key start_after_key(start_after.name, start_after.instance);
r = CLSRGWIssueBucketList(ioctx, start_after_key, prefix, delimiter,
num_entries_per_shard,
- list_versions, oids, list_results,
+ list_versions, shard_oids, shard_list_results,
cct->_conf->rgw_bucket_index_max_aio)();
if (r < 0) {
return r;
}
- // create a list of iterators that are used to iterate each shard
- vector<RGWRados::ent_map_t::iterator> vcurrents;
- vector<RGWRados::ent_map_t::iterator> vends;
- vector<string> vnames;
- vcurrents.reserve(list_results.size());
- vends.reserve(list_results.size());
- vnames.reserve(list_results.size());
- *is_truncated = false;
- *cls_filtered = true;
- for (auto& r : list_results) {
- vcurrents.push_back(r.second.dir.m.begin());
- vends.push_back(r.second.dir.m.end());
- vnames.push_back(oids[r.first]);
+ // to manage the iterators through each shard's list results
+ struct ShardTracker {
+ const size_t shard_idx;
+ rgw_cls_list_ret& result;
+ const std::string& oid_name;
+ RGWRados::ent_map_t::iterator cursor;
+ RGWRados::ent_map_t::iterator end;
+
+ // manages an iterator through a shard and provides other
+ // accessors
+ ShardTracker(size_t _shard_idx,
+ rgw_cls_list_ret& _result,
+ const std::string& _oid_name):
+ shard_idx(_shard_idx),
+ result(_result),
+ oid_name(_oid_name),
+ cursor(_result.dir.m.begin()),
+ end(_result.dir.m.end())
+ {}
+
+ inline const std::string& entry_name() const {
+ return cursor->first;
+ }
+ rgw_bucket_dir_entry& dir_entry() const {
+ return cursor->second;
+ }
+ inline bool is_truncated() const {
+ return result.is_truncated;
+ }
+ inline ShardTracker& advance() {
+ ++cursor;
+ // return a self-reference to allow for chaining of calls, such
+ // as x.advance().at_end()
+ return *this;
+ }
+ inline bool at_end() const {
+ return cursor == end;
+ }
+ }; // ShardTracker
+
+ // one tracker per shard requested (may not be all shards)
+ std::vector<ShardTracker> results_trackers;
+ results_trackers.reserve(shard_list_results.size());
+ for (auto& r : shard_list_results) {
+ results_trackers.emplace_back(r.first, r.second, shard_oids[r.first]);
// if any *one* shard's result is trucated, the entire result is
// truncated
*cls_filtered = *cls_filtered && r.second.cls_filtered;
}
- // create a map to track the next candidate entry from each shard,
- // if the entry from a specified shard is selected/erased, the next
- // entry from that shard will be inserted for next round selection
+ // create a map to track the next candidate entry from ShardTracker
+ // (key=candidate, value=index into results_trackers); as we consume
+ // entries from shards, we replace them with the next entries in the
+ // shards until we run out
map<string, size_t> candidates;
- for (size_t i = 0; i < vcurrents.size(); ++i) {
- if (vcurrents[i] != vends[i]) {
- candidates[vcurrents[i]->first] = i;
+ size_t tracker_idx = 0;
+ for (auto& t : results_trackers) {
+ if (!t.at_end()) {
+ // it's important that the values in the map refer to the index
+ // into the results_trackers vector, which may not be the same
+ // as the shard number (i.e., when not all shards are requested)
+ candidates[t.entry_name()] = tracker_idx;
}
+ ++tracker_idx;
}
+ rgw_bucket_dir_entry*
+ last_entry_visited = nullptr; // to set last_entry (marker)
map<string, bufferlist> updates;
uint32_t count = 0;
- int pos = -1;
while (count < num_entries && !candidates.empty()) {
r = 0;
- // select the next one
- pos = candidates.begin()->second;
- const string& name = vcurrents[pos]->first;
- struct rgw_bucket_dir_entry& dirent = vcurrents[pos]->second;
+ // select the next entry in lexical order (first key in map);
+ // again tracker_idx is not necessarily shard number, but is index
+ // into results_trackers vector
+ tracker_idx = candidates.begin()->second;
+ auto& tracker = results_trackers.at(tracker_idx);
+ last_entry_visited = &tracker.dir_entry();
+ const string& name = tracker.entry_name();
+ rgw_bucket_dir_entry& dirent = tracker.dir_entry();
ldout(cct, 20) << "RGWRados::" << __func__ << " currently processing " <<
- dirent.key << " from shard " << pos << dendl;
+ dirent.key << " from shard " << tracker.shard_idx << dendl;
- bool force_check =
+ const bool force_check =
force_check_filter && force_check_filter(dirent.key.name);
if ((!dirent.exists &&
librados::IoCtx sub_ctx;
sub_ctx.dup(ioctx);
r = check_disk_state(sub_ctx, bucket_info, dirent, dirent,
- updates[vnames[pos]], y);
+ updates[tracker.oid_name], y);
if (r < 0 && r != -ENOENT) {
return r;
}
// refresh the candidates map
candidates.erase(candidates.begin());
- if (++vcurrents[pos] != vends[pos]) { // note: pre-increment
- candidates[vcurrents[pos]->first] = pos;
- } else if (list_results[pos].is_truncated) {
+ if (! tracker.advance().at_end()) {
+ candidates[tracker.entry_name()] = tracker_idx;
+ } else if (tracker.is_truncated()) {
// once we exhaust one shard that is truncated, we need to stop,
// as we cannot be certain that one of the next entries needs to
// come from that shard; S3 and swift protocols allow returning
}
} // updates loop
+ // determine truncation by checking if all the returned entries are
+ // consumed or not
*is_truncated = false;
- // check if all the returned entries are consumed or not
- for (size_t i = 0; i < vcurrents.size(); ++i) {
- if (vcurrents[i] != vends[i] || list_results[i].is_truncated) {
+ for (const auto& t : results_trackers) {
+ if (!t.at_end() || t.is_truncated()) {
*is_truncated = true;
break;
}
count << ", which is truncated" << dendl;
}
- if (pos >= 0) {
- *last_entry = std::move((--vcurrents[pos])->first);
+ if (last_entry_visited != nullptr) {
+ // since we'll not need this any more, might as well move it...
+ *last_entry = std::move(last_entry_visited->key);
ldout(cct, 20) << "RGWRados::" << __func__ <<
": returning, last_entry=" << *last_entry << dendl;
} else {