completions.push_back(iter->second);
pendings.erase(iter);
+ // If the caller needs a list of finished objects, store them
+ // for further processing
+ map<int, string>::iterator miter = pending_objs.find(id);
+ if (miter != pending_objs.end()) {
+ completion_objs.push_back(miter->second);
+ pending_objs.erase(miter);
+ }
+
cond.Signal();
}
bool BucketIndexAioManager::wait_for_completions(int valid_ret_code,
- int *num_completions, int *ret_code) {
+ int *num_completions, int *ret_code, vector<string> *objs) {
lock.Lock();
if (pendings.empty() && completions.empty()) {
lock.Unlock();
// Clear the completed AIOs
list<librados::AioCompletion*>::iterator iter = completions.begin();
- for (; iter != completions.end(); ++iter) {
+ list<string>::iterator liter = completion_objs.begin();
+ for (; iter != completions.end() && liter != completion_objs.end(); ++iter, ++liter) {
int r = (*iter)->get_return_value();
+ if (objs && r == 0) {
+ objs->push_back(*liter);
+ }
if (ret_code && (r < 0 && r != valid_ret_code))
(*ret_code) = r;
(*iter)->release();
return issue_bi_log_list_op(io_ctx, iter->first, marker_mgr, max, &manager, &iter->second);
}
+static bool issue_bi_log_trim(librados::IoCtx& io_ctx,
+ string& oid, BucketIndexShardsManager& start_marker_mgr,
+ BucketIndexShardsManager& end_marker_mgr, BucketIndexAioManager *manager) {
+ bufferlist in;
+ cls_rgw_bi_log_trim_op call;
+ call.start_marker = start_marker_mgr.get(oid, "");
+ call.end_marker = end_marker_mgr.get(oid, "");
+ ::encode(call, in);
+ ObjectWriteOperation op;
+ op.exec("rgw", "bi_log_trim", in);
+ return manager->aio_operate(io_ctx, oid, &op);
+}
+
+int CLSRGWIssueBILogTrim::issue_op()
+{
+ return issue_bi_log_trim(io_ctx, *iter, start_marker_mgr, end_marker_mgr, &manager);
+}
+
static bool issue_bucket_check_index_op(IoCtx& io_ctx, const string& oid, BucketIndexAioManager *manager,
struct rgw_cls_check_index_ret *pdata) {
bufferlist in;
return 0;
}
-int cls_rgw_bi_log_trim(IoCtx& io_ctx, string& oid, string& start_marker, string& end_marker)
-{
- do {
- int r;
- bufferlist in, out;
- cls_rgw_bi_log_trim_op call;
- call.start_marker = start_marker;
- call.end_marker = end_marker;
- ::encode(call, in);
- r = io_ctx.exec(oid, "rgw", "bi_log_trim", in, out);
-
- if (r == -ENODATA)
- break;
-
- if (r < 0)
- return r;
-
- } while (1);
-
- return 0;
-}
-
int cls_rgw_usage_log_read(IoCtx& io_ctx, string& oid, string& user,
uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries,
string& read_iter, map<rgw_user_bucket, rgw_usage_log_entry>& usage,
private:
map<int, librados::AioCompletion*> pendings;
list<librados::AioCompletion*> completions;
+ map<int, string> pending_objs;
+ list<string> completion_objs;
int next;
Mutex lock;
Cond cond;
* Return next request ID.
*/
int get_next() { return next++; }
-
+
/*
* Add a new pending AIO completion instance.
*
* @param id - the request ID.
* @param completion - the AIO completion instance.
+ * @param oid - the object id associated with the object, if it is NULL, we don't
+ * track the object id per callback.
*/
- void add_pending(int id, librados::AioCompletion* completion) {
+ void add_pending(int id, librados::AioCompletion* completion, string *oid = NULL) {
Mutex::Locker l(lock);
pendings[id] = completion;
+ if (oid) {
+ pending_objs[id] = *oid;
+ }
}
public:
/*
* Create a new instance.
*/
- BucketIndexAioManager() : pendings(), completions(), next(0),
- lock("BucketIndexAioManager::lock"), cond() {}
+ BucketIndexAioManager() : pendings(), completions(), pending_objs(), completion_objs(),
+ next(0), lock("BucketIndexAioManager::lock"), cond() {}
+
/*
* Do completion for the given AIO request.
* valid_ret_code - valid AIO return code.
* num_completions - number of completions.
* ret_code - return code of failed AIO.
+ * objs - a list of objects that has been finished the AIO.
*
* Return false if there is no pending AIO, true otherwise.
*/
- bool wait_for_completions(int valid_ret_code, int *num_completions, int *ret_code);
+ bool wait_for_completions(int valid_ret_code, int *num_completions, int *ret_code,
+ vector<string> *objs);
/**
* Do aio read operation.
virtual void cleanup() {}
virtual int valid_ret_code() { return 0; }
+ // Return true if multiple rounds of OPs might be needed, this happens when
+ // OP needs to be re-send until a certain code is returned.
+ virtual bool need_multiple_rounds() { return false; }
+ // Add a new object to the end of the container.
+ virtual void add_object(const string& oid) {}
+ virtual void reset_container(vector<string>& objs) {}
public:
CLSRGWConcurrentIO(librados::IoCtx& ioc, T& _objs_container,
}
int num_completions, r = 0;
- while (manager.wait_for_completions(0, &num_completions, &r)) {
+ vector<string> objs;
+ while (manager.wait_for_completions(valid_ret_code(), &num_completions, &r, &objs)) {
if (r >= 0 && ret >= 0) {
for(int i = 0; i < num_completions && iter != objs_container.end(); ++i, ++iter) {
int issue_ret = issue_op();
} else if (ret >= 0) {
ret = r;
}
+ if (need_multiple_rounds() && iter == objs_container.end() && !objs.empty()) {
+ // For those objects which need another round, use them to reset
+ // the container
+ reset_container(objs);
+ }
}
if (ret < 0) {
marker_mgr(_marker_mgr), max(_max) {}
};
+class CLSRGWIssueBILogTrim : public CLSRGWConcurrentIO<vector<string> > {
+ BucketIndexShardsManager& start_marker_mgr;
+ BucketIndexShardsManager& end_marker_mgr;
+protected:
+ int issue_op();
+ // Trim until -ENODATA is returned.
+ int valid_ret_code() { return -ENODATA; }
+ bool need_multiple_rounds() { return true; }
+ void add_object(const string& oid) { objs_container.push_back(oid); }
+ void reset_container(vector<string>& objs) {
+ objs_container.swap(objs);
+ iter = objs_container.begin();
+ objs.clear();
+ }
+public:
+ CLSRGWIssueBILogTrim(librados::IoCtx& io_ctx, BucketIndexShardsManager& _start_marker_mgr,
+ BucketIndexShardsManager& _end_marker_mgr, vector<string>& _bucket_objs, uint32_t max_aio) :
+ CLSRGWConcurrentIO<vector<string> >(io_ctx, _bucket_objs, max_aio),
+ start_marker_mgr(_start_marker_mgr), end_marker_mgr(_end_marker_mgr) {}
+};
+
/**
* Check the bucket index.
*
void cls_rgw_suggest_changes(librados::ObjectWriteOperation& o, bufferlist& updates);
-int cls_rgw_bi_log_trim(librados::IoCtx& io_ctx, string& oid, string& start_marker, string& end_marker);
-
/* usage logging */
int cls_rgw_usage_log_read(librados::IoCtx& io_ctx, string& oid, string& user,
uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries,
int RGWRados::trim_bi_log_entries(rgw_bucket& bucket, string& start_marker, string& end_marker)
{
librados::IoCtx index_ctx;
- string oid;
- int r = open_bucket_index(bucket, index_ctx, oid);
+ vector<string> bucket_objs;
+ int r = open_bucket_index(bucket, index_ctx, bucket_objs);
if (r < 0)
return r;
- int ret = cls_rgw_bi_log_trim(index_ctx, oid, start_marker, end_marker);
- if (ret < 0)
- return ret;
+ bool has_shards = bucket_objs.size() > 1;
+ BucketIndexShardsManager start_marker_mgr;
+ r = start_marker_mgr.from_string(start_marker, has_shards, bucket_objs.front());
+ if (r < 0)
+ return r;
+ BucketIndexShardsManager end_marker_mgr;
+ r = end_marker_mgr.from_string(end_marker, has_shards, bucket_objs.front());
+ if (r < 0)
+ return r;
- return 0;
+ return CLSRGWIssueBILogTrim(index_ctx, start_marker_mgr, end_marker_mgr, bucket_objs,
+ cct->_conf->rgw_bucket_index_max_aio)();
}
int RGWRados::gc_operate(string& oid, librados::ObjectWriteOperation *op)