}
}
+ constexpr int allowed_read_attempts = 2;
string skip_after_delim;
- while (truncated && count <= max) {
+ for (int attempt = 0; attempt < allowed_read_attempts; ++attempt) {
std::map<string, rgw_bucket_dir_entry> ent_map;
int r = store->cls_bucket_list_ordered(target->get_bucket_info(),
shard_id,
result->emplace_back(std::move(entry));
count++;
- }
+ } // eiter for loop
if (!params.delim.empty()) {
int marker_delim_pos = cur_marker.name.find(params.delim, cur_prefix.size());
}
}
}
- }
+
+    // if we finished listing, or if we're returning at least half the
+    // requested entries, that's enough; the S3 and Swift protocols
+    // allow returning fewer than max entries
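+    // (e.g., hypothetically: with max = 100, if this pass has already
+    // gathered 60 entries but is still truncated, we return those 60
+    // rather than paying for another round of per-shard reads)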
+ if (!truncated || count >= max / 2) {
+ break;
+ }
+
+ ldout(cct, 1) << "RGWRados::Bucket::List::" << __func__ <<
+ " INFO ordered bucket listing requires read #" << (2 + attempt) <<
+ dendl;
+ } // read attempt loop
done:
if (is_truncated)
}
+uint32_t RGWRados::calc_ordered_bucket_list_per_shard(uint32_t num_entries,
+ uint32_t num_shards)
+{
+  // We want to minimize the chances that, when num_shards >>
+  // num_entries, we return far fewer than num_entries to the
+  // client. Given all the overhead of making a cls call to the osd,
+  // returning a few entries is not much more work than returning one
+  // entry. This minimum might be better tuned based on future
+  // experiments where num_shards >> num_entries. (Note: ">>" should
+  // be interpreted as "much greater than".)
+ constexpr uint32_t min_read = 8;
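+  // (hypothetical illustration: with num_entries = 10 spread across
+  // num_shards = 128, the formula below comes to just 1 entry per
+  // shard, so min_read raises the request to 8)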
+
+  // The following is based on _"Balls into Bins" -- A Simple and
+  // Tight Analysis_ by Raab and Steger. We add 1 as a way to handle
+  // cases when num_shards >> num_entries (it almost serves as a
+  // ceiling calculation). We also assume alpha is 1.0 and factor it
+  // out of the calculation. Future work could memoize the
+  // transcendental functions to avoid repeatedly re-calling them
+  // with the same parameters, which we expect to be the case the
+  // majority of the time.
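+  //
+  // worked example (hypothetical inputs): with num_entries = 1000 and
+  // num_shards = 32, calc_read = 1 + trunc(31 + 14.7) = 46 entries
+  // per shard, where 31 = 1000/32 (integer division) and 14.7 =
+  // sqrt(2000 * log(32) / 32)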
+ uint32_t calc_read =
+ 1 +
+ static_cast<uint32_t>((num_entries / num_shards) +
+ sqrt((2 * num_entries) *
+ log(num_shards) / num_shards));
+
+ return std::max(min_read, calc_read);
+}
+
+
int RGWRados::cls_bucket_list_ordered(RGWBucketInfo& bucket_info,
int shard_id,
const rgw_obj_index_key& start,
// value - list result for the corresponding oid (shard), it is filled by
// the AIO callback
map<int, string> oids;
- map<int, struct rgw_cls_list_ret> list_results;
int r = open_bucket_index(bucket_info, index_ctx, oids, shard_id);
- if (r < 0)
+ if (r < 0) {
return r;
+ }
+
+ const uint32_t shard_count = oids.size();
+ const uint32_t num_entries_per_shard =
+ calc_ordered_bucket_list_per_shard(num_entries, shard_count);
+
+  ldout(cct, 10) << __func__ << ": requesting " <<
+    num_entries_per_shard << " entries from each of " << shard_count <<
+    " shard(s) to get " << num_entries << " total entries" << dendl;
+ map<int, struct rgw_cls_list_ret> list_results;
cls_rgw_obj_key start_key(start.name, start.instance);
- r = CLSRGWIssueBucketList(index_ctx, start_key, prefix, num_entries,
+ r = CLSRGWIssueBucketList(index_ctx, start_key, prefix, num_entries_per_shard,
list_versions, oids, list_results,
cct->_conf->rgw_bucket_index_max_aio)();
- if (r < 0)
+ if (r < 0) {
return r;
+ }
// Create a list of iterators that are used to iterate each shard
vector<map<string, struct rgw_bucket_dir_entry>::iterator> vcurrents;
vcurrents.reserve(list_results.size());
vends.reserve(list_results.size());
vnames.reserve(list_results.size());
- map<int, struct rgw_cls_list_ret>::iterator iter = list_results.begin();
- *is_truncated = false;
- for (; iter != list_results.end(); ++iter) {
- vcurrents.push_back(iter->second.dir.m.begin());
- vends.push_back(iter->second.dir.m.end());
- vnames.push_back(oids[iter->first]);
- *is_truncated = (*is_truncated || iter->second.is_truncated);
+ for (auto& iter : list_results) {
+ vcurrents.push_back(iter.second.dir.m.begin());
+ vends.push_back(iter.second.dir.m.end());
+ vnames.push_back(oids[iter.first]);
}
- // Create a map to track the next candidate entry from each shard, if the entry
- // from a specified shard is selected/erased, the next entry from that shard will
- // be inserted for next round selection
+  // create a map to track the next candidate entry from each shard;
+  // if the entry from a given shard is selected/erased, the next
+  // entry from that shard will be inserted for the next round of
+  // selection
map<string, size_t> candidates;
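+  // (note: because 'candidates' is keyed and therefore sorted by
+  // entry name, this amounts to a k-way merge of the per-shard
+  // listings, with candidates.begin() always identifying the shard
+  // holding the lexicographically smallest next entry)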
for (size_t i = 0; i < vcurrents.size(); ++i) {
if (vcurrents[i] != vends[i]) {
if ((!dirent.exists && !dirent.is_delete_marker()) ||
!dirent.pending_map.empty() ||
force_check) {
- /* there are uncommitted ops. We need to check the current state,
- * and if the tags are old we need to do cleanup as well. */
+ /* there are uncommitted ops. We need to check the current
+ * state, and if the tags are old we need to do clean-up as
+ * well. */
librados::IoCtx sub_ctx;
sub_ctx.dup(index_ctx);
r = check_disk_state(sub_ctx, bucket_info, dirent, dirent,
updates[vnames[pos]]);
if (r < 0 && r != -ENOENT) {
- return r;
+ return r;
}
} else {
- r = 0;
+ r = 0;
}
if (r >= 0) {
ldout(cct, 10) << "RGWRados::cls_bucket_list_ordered: got " <<
++count;
}
- // Refresh the candidates map
+ // refresh the candidates map
candidates.erase(candidates.begin());
- ++vcurrents[pos];
- if (vcurrents[pos] != vends[pos]) {
+ if (++vcurrents[pos] != vends[pos]) { // note: pre-increment
candidates[vcurrents[pos]->first] = pos;
+ } else if (list_results[pos].is_truncated) {
+      // once we exhaust a shard that is truncated, we need to stop,
+      // since we cannot be certain whether one of the next entries in
+      // order would have come from that shard; the S3 and Swift
+      // protocols allow returning fewer entries than were requested
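+      // (as a hypothetical illustration: if this shard was truncated
+      // after "obj5" while another shard still has "obj7" buffered,
+      // an as-yet-unseen "obj6" could exist on this shard)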
+ break;
}
- }
+ } // while we haven't provided requested # of result entries
- // Suggest updates if there is any
- map<string, bufferlist>::iterator miter = updates.begin();
- for (; miter != updates.end(); ++miter) {
- if (miter->second.length()) {
+  // suggest updates if there are any
+ for (auto& miter : updates) {
+ if (miter.second.length()) {
ObjectWriteOperation o;
- cls_rgw_suggest_changes(o, miter->second);
+ cls_rgw_suggest_changes(o, miter.second);
// we don't care if we lose suggested updates, send them off blindly
AioCompletion *c = librados::Rados::aio_create_completion(NULL, NULL, NULL);
- index_ctx.aio_operate(miter->first, c, &o);
+ index_ctx.aio_operate(miter.first, c, &o);
c->release();
}
- }
+ } // updates loop
- // Check if all the returned entries are consumed or not
+ *is_truncated = false;
+  // check whether all the returned entries were consumed
for (size_t i = 0; i < vcurrents.size(); ++i) {
- if (vcurrents[i] != vends[i]) {
+ if (vcurrents[i] != vends[i] || list_results[i].is_truncated) {
*is_truncated = true;
break;
}
}
- if (pos >= 0)
+ if (*is_truncated && count < num_entries) {
+ ldout(cct, 10) << "RGWRados::" << __func__ <<
+ ": INFO requested " << num_entries << " entries but returning " <<
+ count << ", which is truncated" << dendl;
+ }
+
+ if (pos >= 0) {
*last_entry = std::move((--vcurrents[pos])->first);
+ }
return 0;
}