const string BucketIndexShardsManager::KEY_VALUE_SEPARATOR = "#";
const string BucketIndexShardsManager::SHARDS_SEPARATOR = ",";
+
+int CLSRGWConcurrentIO::operator()() {
+ int ret = 0;
+ iter = objs_container.begin();
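+ // issue the initial batch of requests, up to max_aio at once; the
+ // remaining shards are issued below as completions come back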
+ for (; iter != objs_container.end() && max_aio-- > 0; ++iter) {
+ ret = issue_op(iter->first, iter->second);
+ if (ret < 0)
+ break;
+ }
+
+ int num_completions = 0, r = 0;
+ std::map<int, std::string> completed_objs;
+ std::map<int, std::string> retry_objs;
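+ // drain completions; each completion frees a slot, so issue an op
+ // for the next remaining shard, and once this round is done start
+ // another round if any shards need one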
+ while (manager.wait_for_completions(valid_ret_code(), &num_completions, &r,
+ need_multiple_rounds() ? &completed_objs : nullptr,
+ !need_multiple_rounds() ? &retry_objs : nullptr)) {
+ if (r >= 0 && ret >= 0) {
+ for (; num_completions && iter != objs_container.end(); --num_completions, ++iter) {
+ int issue_ret = issue_op(iter->first, iter->second);
+ if (issue_ret < 0) {
+ ret = issue_ret;
+ break;
+ }
+ }
+ } else if (ret >= 0) {
+ ret = r;
+ }
+
+ // if we're at the end of this round, see if another round is needed
+ if (iter == objs_container.end()) {
+ if (need_multiple_rounds() && !completed_objs.empty()) {
+ // For those objects which need another round, use them to reset
+ // the container
+ reset_container(completed_objs);
+ iter = objs_container.begin();
+ } else if (! need_multiple_rounds() && !retry_objs.empty()) {
+ reset_container(retry_objs);
+ iter = objs_container.begin();
+ }
+
+ // re-issue ops if the container was reset above (i.e., iter !=
+ // objs_container.end()); if it was not reset (i.e., iter ==
+ // objs_container.end()) the loop below exits immediately without
+ // iterating
+ for (; num_completions && iter != objs_container.end(); --num_completions, ++iter) {
+ int issue_ret = issue_op(iter->first, iter->second);
+ if (issue_ret < 0) {
+ ret = issue_ret;
+ break;
+ }
+ }
+ }
+ }
+
+ if (ret < 0) {
+ cleanup();
+ }
+ return ret;
+} // CLSRGWConcurrentIO::operator()()
+
+
/**
* This class represents the bucket index object operation callback context.
*/
ClsBucketIndexOpCtx(T* _data, int *_ret_code) : data(_data), ret_code(_ret_code) { ceph_assert(data); }
~ClsBucketIndexOpCtx() override {}
void handle_completion(int r, bufferlist& outbl) override {
- if (r >= 0) {
+ // if successful, or we're asked for a retry, copy result into
+ // destination (*data)
+ if (r >= 0 || r == RGWBIAdvanceAndRetryError) {
try {
auto iter = outbl.cbegin();
decode((*data), iter);
}
bool BucketIndexAioManager::wait_for_completions(int valid_ret_code,
- int *num_completions, int *ret_code, map<int, string> *objs) {
+ int *num_completions,
+ int *ret_code,
+ std::map<int, std::string> *completed_objs,
+ std::map<int, std::string> *retry_objs)
+{
std::unique_lock locker{lock};
if (pendings.empty() && completions.empty()) {
return false;
auto iter = completions.begin();
for (; iter != completions.end(); ++iter) {
int r = iter->second->get_return_value();
- if (objs && r == 0) { /* update list of successfully completed objs */
+
+ // see if we may need to copy completions or retries
+ if (completed_objs || retry_objs) {
auto liter = completion_objs.find(iter->first);
if (liter != completion_objs.end()) {
- (*objs)[liter->second.shard_id] = liter->second.oid;
+ if (completed_objs && r == 0) { /* update list of successfully completed objs */
+ (*completed_objs)[liter->second.shard_id] = liter->second.oid;
+ }
+
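+ // an advance-and-retry result is not an error; record the shard
+ // so another round of ops can be issued against it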
+ if (r == RGWBIAdvanceAndRetryError) {
+ r = 0;
+ if (retry_objs) {
+ (*retry_objs)[liter->second.shard_id] = liter->second.oid;
+ }
+ }
+ } else {
+ // NB: we may want to log an error here, but there is currently
+ // no logging context to use
}
}
- if (ret_code && (r < 0 && r != valid_ret_code))
+
+ if (ret_code && (r < 0 && r != valid_ret_code)) {
(*ret_code) = r;
+ }
+
iter->second->release();
}
- if (num_completions)
+
+ if (num_completions) {
(*num_completions) = completions.size();
+ }
+
completions.clear();
return true;
*
* Return false if there is no pending AIO, true otherwise.
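+ *
+ * If completed_objs is non-null, it is filled (shard id -> oid) with
+ * the shards whose ops returned 0. If retry_objs is non-null, it is
+ * filled with the shards whose ops returned RGWBIAdvanceAndRetryError;
+ * that result is not reported through ret_code and indicates the op
+ * should be issued again.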
*/
- bool wait_for_completions(int valid_ret_code, int *num_completions, int *ret_code,
- std::map<int, std::string> *objs);
+ bool wait_for_completions(int valid_ret_code,
+ int *num_completions = nullptr,
+ int *ret_code = nullptr,
+ std::map<int, std::string> *completed_objs = nullptr,
+ std::map<int, std::string> *retry_objs = nullptr);
/**
* Do aio read operation.
class CLSRGWConcurrentIO {
protected:
librados::IoCtx& io_ctx;
+
+ // map of shard # to oid for the shards that remain to be processed
std::map<int, std::string>& objs_container;
+ // iterator to work through objs_container
std::map<int, std::string>::iterator iter;
+
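+ // maximum number of aio requests in flight at any one time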
uint32_t max_aio;
BucketIndexAioManager manager;
virtual ~CLSRGWConcurrentIO()
{}
- int operator()() {
- int ret = 0;
- iter = objs_container.begin();
- for (; iter != objs_container.end() && max_aio-- > 0; ++iter) {
- ret = issue_op(iter->first, iter->second);
- if (ret < 0)
- break;
- }
+ int operator()();
+}; // class CLSRGWConcurrentIO
- int num_completions = 0, r = 0;
- std::map<int, std::string> objs;
- std::map<int, std::string> *pobjs = (need_multiple_rounds() ? &objs : NULL);
- while (manager.wait_for_completions(valid_ret_code(), &num_completions, &r, pobjs)) {
- if (r >= 0 && ret >= 0) {
- for (; num_completions && iter != objs_container.end(); --num_completions, ++iter) {
- int issue_ret = issue_op(iter->first, iter->second);
- if (issue_ret < 0) {
- ret = issue_ret;
- break;
- }
- }
- } else if (ret >= 0) {
- ret = r;
- }
- if (need_multiple_rounds() && iter == objs_container.end() && !objs.empty()) {
- // For those objects which need another round, use them to reset
- // the container
- reset_container(objs);
- iter = objs_container.begin();
- for (; num_completions && iter != objs_container.end(); --num_completions, ++iter) {
- int issue_ret = issue_op(iter->first, iter->second);
- if (issue_ret < 0) {
- ret = issue_ret;
- break;
- }
- }
- }
- }
-
- if (ret < 0) {
- cleanup();
- }
- return ret;
- }
-};
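+// Typical usage (sketch): construct a concrete subclass over the map of
+// shard id -> index object oid and invoke the functor directly, e.g.
+//
+//   int r = CLSRGWIssueBucketIndexInit(ioctx, bucket_objs, max_aio)();
+//
+// where ioctx, bucket_objs and max_aio stand in for the caller's io
+// context, shard map and aio limit.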
class CLSRGWIssueBucketIndexInit : public CLSRGWConcurrentIO {
protected:
int valid_ret_code() override { return -EEXIST; }
void cleanup() override;
public:
- CLSRGWIssueBucketIndexInit(librados::IoCtx& ioc, std::map<int, std::string>& _bucket_objs,
- uint32_t _max_aio) :
+ CLSRGWIssueBucketIndexInit(librados::IoCtx& ioc,
+ std::map<int, std::string>& _bucket_objs,
+ uint32_t _max_aio) :
CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio) {}
};