using namespace librados;
+/*
+ * Callback implementation for AIO request.
+ */
+static void bucket_index_op_completion_cb(void* cb, void* arg) {
+ BucketIndexAioArg* cb_arg = (BucketIndexAioArg*) arg;
+ cb_arg->manager->do_completion(cb_arg->id);
+ cb_arg->put();
+}
+
+void BucketIndexAioManager::do_completion(int id) {
+ Mutex::Locker l(lock);
+
+ map<int, librados::AioCompletion*>::iterator iter = pendings.find(id);
+ assert(iter != pendings.end());
+ completions.push_back(iter->second);
+ pendings.erase(iter);
+
+ cond.Signal();
+}
+
+bool BucketIndexAioManager::wait_for_completions(int valid_ret_code,
+ int *num_completions, int *ret_code) {
+ lock.Lock();
+ if (pendings.empty() && completions.empty()) {
+ lock.Unlock();
+ return false;
+ }
+ // Wait for AIO completion
+ cond.Wait(lock);
+
+ // Clear the completed AIOs
+ list<librados::AioCompletion*>::iterator iter = completions.begin();
+ for (; iter != completions.end(); ++iter) {
+ int r = (*iter)->get_return_value();
+ if (ret_code && (r < 0 && r != valid_ret_code))
+ (*ret_code) = r;
+ (*iter)->release();
+ }
+ if (num_completions)
+ (*num_completions) = completions.size();
+ completions.clear();
+ lock.Unlock();
+
+ return true;
+}
+
void cls_rgw_bucket_init(ObjectWriteOperation& o)
{
bufferlist in;
o.exec("rgw", "bucket_init_index", in);
}
+static bool issue_bucket_index_init_op(librados::IoCtx& io_ctx,
+ const string& oid, BucketIndexAioManager *manager) {
+ bufferlist in;
+ librados::ObjectWriteOperation op;
+ op.create(true);
+ op.exec("rgw", "bucket_init_index", in);
+ BucketIndexAioArg *arg = new BucketIndexAioArg(manager->get_next(), manager);
+ AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, NULL, bucket_index_op_completion_cb);
+ int r = io_ctx.aio_operate(oid, c, &op);
+ if (r >= 0)
+ manager->add_pending(arg->id, c);
+ return r;
+}
+
+int cls_rgw_bucket_index_init_op(librados::IoCtx& io_ctx,
+ const vector<string>& bucket_objs, uint32_t max_aio)
+{
+ int ret = 0;
+ vector<string>::const_iterator iter = bucket_objs.begin();
+ BucketIndexAioManager manager;
+ // Issue *max_aio* requests, all subsequent requests are issued upon
+ // pending request finishing
+ for (; iter != bucket_objs.end() && max_aio-- > 0; ++iter) {
+ ret = issue_bucket_index_init_op(io_ctx, *iter, &manager);
+ if (ret < 0)
+ break;
+ }
+
+ int num_completions, r;
+ while (manager.wait_for_completions(-EEXIST, &num_completions, &r)) {
+ if (r >= 0 && ret >= 0) {
+ for(int i = 0; i < num_completions && iter != bucket_objs.end(); ++i, ++iter) {
+ int issue_ret = issue_bucket_index_init_op(io_ctx, *iter, &manager);
+ if(issue_ret < 0) {
+ ret = issue_ret;
+ break;
+ }
+ }
+ } else if (ret >= 0) {
+ ret = r;
+ }
+ }
+
+ if (ret < 0) {
+ // Do best effort removal
+ vector<string>::const_iterator citer = bucket_objs.begin();
+ for(; citer != iter; ++citer) {
+ io_ctx.remove(*citer);
+ }
+ }
+
+ return ret;
+}
+
void cls_rgw_bucket_set_tag_timeout(ObjectWriteOperation& o, uint64_t tag_timeout)
{
bufferlist in;
#include "cls_rgw_types.h"
#include "common/RefCountedObj.h"
+/*
+ * This class manages AIO completions. This class is not completely thread-safe,
+ * methods like *get_next* is not thread-safe and is expected to be called from
+ * within one thread.
+ */
+class BucketIndexAioManager {
+private:
+ map<int, librados::AioCompletion*> pendings;
+ list<librados::AioCompletion*> completions;
+ int next;
+ Mutex lock;
+ Cond cond;
+public:
+ /*
+ * Create a new instance.
+ */
+ BucketIndexAioManager() : pendings(), completions(), next(0),
+ lock("BucketIndexAioManager::lock"), cond() {}
+
+ /*
+ * Add a new pending AIO completion instance.
+ *
+ * @param id - the request ID.
+ * @param completion - the AIO completion instance.
+ */
+ void add_pending(int id, librados::AioCompletion* completion) {
+ Mutex::Locker l(lock);
+ pendings[id] = completion;
+ }
+
+ /*
+ * Do completion for the given AIO request.
+ */
+ void do_completion(int id);
+
+ /*
+ * Get next request ID. This method is not thread-safe.
+ *
+ * Return next request ID.
+ */
+ int get_next() { return next++; }
+
+ /*
+ * Wait for AIO completions.
+ *
+ * valid_ret_code - valid AIO return code.
+ * num_completions - number of completions.
+ * ret_code - return code of failed AIO.
+ *
+ * Return false if there is no pending AIO, true otherwise.
+ */
+ bool wait_for_completions(int valid_ret_code, int *num_completions, int *ret_code);
+};
+
+/*
+ * Bucket index AIO request argument, this is used to pass a argument
+ * to callback.
+ */
+struct BucketIndexAioArg : public RefCountedObject {
+ BucketIndexAioArg(int _id, BucketIndexAioManager* _manager) :
+ id(_id), manager(_manager) {}
+ int id;
+ BucketIndexAioManager* manager;
+};
+
class RGWGetDirHeader_CB : public RefCountedObject {
public:
virtual ~RGWGetDirHeader_CB() {}
/* bucket index */
void cls_rgw_bucket_init(librados::ObjectWriteOperation& o);
+/**
+ * Init bucket index objects.
+ *
+ * io_ctx - IO context for rados.
+ * bucket_objs - a lit of bucket index objects.
+ * max_io - the maximum number of AIO (for throttling).
+ *
+ * Reutrn 0 on success, a failure code otherwise.
+ */
+int cls_rgw_bucket_index_init_op(librados::IoCtx &io_ctx,
+ const vector<string>& bucket_objs, uint32_t max_aio);
+
void cls_rgw_bucket_set_tag_timeout(librados::ObjectWriteOperation& o, uint64_t tag_timeout);
void cls_rgw_bucket_prepare_op(librados::ObjectWriteOperation& o, RGWModifyOp op, string& tag,
*/
OPTION(rgw_bucket_index_max_shards, OPT_U32, 0)
+/**
+ * Represents the maximum AIO pending requests for the bucket index object shards.
+ */
+OPTION(rgw_bucket_index_max_aio, OPT_U32, 8)
+
OPTION(rgw_data, OPT_STR, "/var/lib/ceph/radosgw/$cluster-$id")
OPTION(rgw_enable_apis, OPT_STR, "s3, swift, swift_auth, admin")
OPTION(rgw_cache_enabled, OPT_BOOL, true) // rgw cache enabled
string dir_oid = dir_oid_prefix;
dir_oid.append(bucket.marker);
- librados::ObjectWriteOperation op;
- op.create(true);
- r = cls_rgw_init_index(index_ctx, op, dir_oid);
- if (r < 0 && r != -EEXIST)
- return r;
+ vector<string> bucket_objs;
+ get_bucket_index_objects(dir_oid, bucket_index_max_shards, bucket_objs);
- return 0;
+ return cls_rgw_bucket_index_init_op(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio);
}
/**
info.owner = owner.user_id;
info.region = region_name;
info.placement_rule = selected_placement_rule;
+ info.num_shards = bucket_index_max_shards;
if (!creation_time)
time(&info.creation_time);
else
/* remove bucket index */
librados::IoCtx index_ctx; // context for new bucket
- int r = open_bucket_index_ctx(bucket, index_ctx);
+ vector<string> bucket_objs;
+ int r = open_bucket_index(bucket, index_ctx, bucket_objs);
if (r < 0)
return r;
- index_ctx.remove(dir_oid);
+ vector<string>::const_iterator biter;
+ for (biter = bucket_objs.begin(); biter != bucket_objs.end(); ++biter) {
+ // Do best effort removal
+ index_ctx.remove(*biter);
+ }
}
/* ret == -ENOENT here */
}
return 0;
}
+int RGWRados::open_bucket_index_base(rgw_bucket& bucket, librados::IoCtx& index_ctx,
+ string& bucket_oid_base) {
+ if (bucket_is_system(bucket))
+ return -EINVAL;
+
+ int r = open_bucket_index_ctx(bucket, index_ctx);
+ if (r < 0)
+ return r;
+
+ if (bucket.marker.empty()) {
+ ldout(cct, 0) << "ERROR: empty marker for bucket operation" << dendl;
+ return -EIO;
+ }
+
+ bucket_oid_base = dir_oid_prefix;
+ bucket_oid_base.append(bucket.marker);
+
+ return 0;
+
+}
+
+int RGWRados::open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx,
+ vector<string>& bucket_objs) {
+ string bucket_oid_base;
+ int ret = open_bucket_index_base(bucket, index_ctx, bucket_oid_base);
+ if (ret < 0)
+ return ret;
+
+ // Get the bucket info
+ RGWBucketInfo binfo;
+ ret = get_bucket_instance_info(NULL, bucket, binfo, NULL, NULL);
+ if (ret < 0)
+ return ret;
+
+ get_bucket_index_objects(bucket_oid_base, binfo.num_shards, bucket_objs);
+ return 0;
+}
+
static void translate_raw_stats(rgw_bucket_dir_header& header, map<RGWObjCategory, RGWStorageStats>& stats)
{
map<uint8_t, struct rgw_bucket_category_stats>::iterator iter = header.stats.begin();
return 0;
}
+void RGWRados::get_bucket_index_objects(const string& bucket_oid_base,
+ const uint32_t num_shards, vector<string>& bucket_objects)
+{
+ if (!num_shards) {
+ bucket_objects.push_back(bucket_oid_base);
+ } else {
+ char buf[bucket_oid_base.size() + 32];
+ for (uint32_t i = 0; i < num_shards; ++i) {
+ snprintf(buf, sizeof(buf), "%s.%d", bucket_oid_base.c_str(), i);
+ bucket_objects.push_back(string(buf));
+ }
+ }
+}
+
int RGWRados::process_intent_log(rgw_bucket& bucket, string& oid,
time_t epoch, int flags, bool purge)
{
int open_bucket_data_ctx(rgw_bucket& bucket, librados::IoCtx& io_ctx);
int open_bucket_data_extra_ctx(rgw_bucket& bucket, librados::IoCtx& io_ctx);
int open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx, string& bucket_oid);
-
+ int open_bucket_index_base(rgw_bucket& bucket, librados::IoCtx& index_ctx,
+ string& bucket_oid_base);
+ int open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx,
+ vector<string>& bucket_objs);
struct GetObjState {
librados::IoCtx io_ctx;
bool sent_data;
gc(NULL), use_gc_thread(false), quota_threads(false),
num_watchers(0), watchers(NULL), watch_handles(NULL),
watch_initialized(false),
- bucket_id_lock("rados_bucket_id"), max_bucket_id(0),
+ bucket_id_lock("rados_bucket_id"),
bucket_index_max_shards(0),
+ max_bucket_id(0),
cct(NULL), rados(NULL),
pools_initialized(false),
quota_handler(NULL),
}
private:
+ /**
+ * This is a helper method, it generates a list of bucket index objects with the given
+ * bucket base oid and number of shards.
+ *
+ * bucket_oid_base [in] - base name of the bucket index object;
+ * num_shards [in] - number of bucket index object shards.
+ * bucket_objs [out] - filled by this method, a list of bucket index objects.
+ */
+ void get_bucket_index_objects(const string& bucket_oid_base, const uint32_t num_shards,
+ vector<string>& bucket_objs);
+
int process_intent_log(rgw_bucket& bucket, string& oid,
time_t epoch, int flags, bool purge);
/**