From: Guang Yang Date: Fri, 1 Aug 2014 04:54:13 +0000 (+0000) Subject: Implement sharding for bucket creation. X-Git-Tag: v0.92~12^2~36 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=5d004d3eacac36ccfed38928ab60239ed072c04b;p=ceph.git Implement sharding for bucket creation. Signed-off-by: Guang Yang (yguang@yahoo-inc.com) --- diff --git a/src/cls/rgw/cls_rgw_client.cc b/src/cls/rgw/cls_rgw_client.cc index c13c1a1559c6..47c9dcb8db89 100644 --- a/src/cls/rgw/cls_rgw_client.cc +++ b/src/cls/rgw/cls_rgw_client.cc @@ -11,12 +11,112 @@ using namespace librados; +/* + * Callback implementation for AIO request. + */ +static void bucket_index_op_completion_cb(void* cb, void* arg) { + BucketIndexAioArg* cb_arg = (BucketIndexAioArg*) arg; + cb_arg->manager->do_completion(cb_arg->id); + cb_arg->put(); +} + +void BucketIndexAioManager::do_completion(int id) { + Mutex::Locker l(lock); + + map::iterator iter = pendings.find(id); + assert(iter != pendings.end()); + completions.push_back(iter->second); + pendings.erase(iter); + + cond.Signal(); +} + +bool BucketIndexAioManager::wait_for_completions(int valid_ret_code, + int *num_completions, int *ret_code) { + lock.Lock(); + if (pendings.empty() && completions.empty()) { + lock.Unlock(); + return false; + } + // Wait for AIO completion + cond.Wait(lock); + + // Clear the completed AIOs + list::iterator iter = completions.begin(); + for (; iter != completions.end(); ++iter) { + int r = (*iter)->get_return_value(); + if (ret_code && (r < 0 && r != valid_ret_code)) + (*ret_code) = r; + (*iter)->release(); + } + if (num_completions) + (*num_completions) = completions.size(); + completions.clear(); + lock.Unlock(); + + return true; +} + void cls_rgw_bucket_init(ObjectWriteOperation& o) { bufferlist in; o.exec("rgw", "bucket_init_index", in); } +static bool issue_bucket_index_init_op(librados::IoCtx& io_ctx, + const string& oid, BucketIndexAioManager *manager) { + bufferlist in; + librados::ObjectWriteOperation op; + op.create(true); + op.exec("rgw", "bucket_init_index", in); + BucketIndexAioArg *arg = new BucketIndexAioArg(manager->get_next(), manager); + AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, NULL, bucket_index_op_completion_cb); + int r = io_ctx.aio_operate(oid, c, &op); + if (r >= 0) + manager->add_pending(arg->id, c); + return r; +} + +int cls_rgw_bucket_index_init_op(librados::IoCtx& io_ctx, + const vector& bucket_objs, uint32_t max_aio) +{ + int ret = 0; + vector::const_iterator iter = bucket_objs.begin(); + BucketIndexAioManager manager; + // Issue *max_aio* requests, all subsequent requests are issued upon + // pending request finishing + for (; iter != bucket_objs.end() && max_aio-- > 0; ++iter) { + ret = issue_bucket_index_init_op(io_ctx, *iter, &manager); + if (ret < 0) + break; + } + + int num_completions, r; + while (manager.wait_for_completions(-EEXIST, &num_completions, &r)) { + if (r >= 0 && ret >= 0) { + for(int i = 0; i < num_completions && iter != bucket_objs.end(); ++i, ++iter) { + int issue_ret = issue_bucket_index_init_op(io_ctx, *iter, &manager); + if(issue_ret < 0) { + ret = issue_ret; + break; + } + } + } else if (ret >= 0) { + ret = r; + } + } + + if (ret < 0) { + // Do best effort removal + vector::const_iterator citer = bucket_objs.begin(); + for(; citer != iter; ++citer) { + io_ctx.remove(*citer); + } + } + + return ret; +} + void cls_rgw_bucket_set_tag_timeout(ObjectWriteOperation& o, uint64_t tag_timeout) { bufferlist in; diff --git a/src/cls/rgw/cls_rgw_client.h b/src/cls/rgw/cls_rgw_client.h index c6b5b757fa84..9cce78f6331a 100644 --- a/src/cls/rgw/cls_rgw_client.h +++ b/src/cls/rgw/cls_rgw_client.h @@ -6,6 +6,71 @@ #include "cls_rgw_types.h" #include "common/RefCountedObj.h" +/* + * This class manages AIO completions. This class is not completely thread-safe, + * methods like *get_next* is not thread-safe and is expected to be called from + * within one thread. + */ +class BucketIndexAioManager { +private: + map pendings; + list completions; + int next; + Mutex lock; + Cond cond; +public: + /* + * Create a new instance. + */ + BucketIndexAioManager() : pendings(), completions(), next(0), + lock("BucketIndexAioManager::lock"), cond() {} + + /* + * Add a new pending AIO completion instance. + * + * @param id - the request ID. + * @param completion - the AIO completion instance. + */ + void add_pending(int id, librados::AioCompletion* completion) { + Mutex::Locker l(lock); + pendings[id] = completion; + } + + /* + * Do completion for the given AIO request. + */ + void do_completion(int id); + + /* + * Get next request ID. This method is not thread-safe. + * + * Return next request ID. + */ + int get_next() { return next++; } + + /* + * Wait for AIO completions. + * + * valid_ret_code - valid AIO return code. + * num_completions - number of completions. + * ret_code - return code of failed AIO. + * + * Return false if there is no pending AIO, true otherwise. + */ + bool wait_for_completions(int valid_ret_code, int *num_completions, int *ret_code); +}; + +/* + * Bucket index AIO request argument, this is used to pass a argument + * to callback. + */ +struct BucketIndexAioArg : public RefCountedObject { + BucketIndexAioArg(int _id, BucketIndexAioManager* _manager) : + id(_id), manager(_manager) {} + int id; + BucketIndexAioManager* manager; +}; + class RGWGetDirHeader_CB : public RefCountedObject { public: virtual ~RGWGetDirHeader_CB() {} @@ -15,6 +80,18 @@ public: /* bucket index */ void cls_rgw_bucket_init(librados::ObjectWriteOperation& o); +/** + * Init bucket index objects. + * + * io_ctx - IO context for rados. + * bucket_objs - a lit of bucket index objects. + * max_io - the maximum number of AIO (for throttling). + * + * Reutrn 0 on success, a failure code otherwise. + */ +int cls_rgw_bucket_index_init_op(librados::IoCtx &io_ctx, + const vector& bucket_objs, uint32_t max_aio); + void cls_rgw_bucket_set_tag_timeout(librados::ObjectWriteOperation& o, uint64_t tag_timeout); void cls_rgw_bucket_prepare_op(librados::ObjectWriteOperation& o, RGWModifyOp op, string& tag, diff --git a/src/common/config_opts.h b/src/common/config_opts.h index e0727e4de8e4..ea2ac25e61c5 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -851,6 +851,11 @@ OPTION(rgw_max_chunk_size, OPT_INT, 512 * 1024) */ OPTION(rgw_bucket_index_max_shards, OPT_U32, 0) +/** + * Represents the maximum AIO pending requests for the bucket index object shards. + */ +OPTION(rgw_bucket_index_max_aio, OPT_U32, 8) + OPTION(rgw_data, OPT_STR, "/var/lib/ceph/radosgw/$cluster-$id") OPTION(rgw_enable_apis, OPT_STR, "s3, swift, swift_auth, admin") OPTION(rgw_cache_enabled, OPT_BOOL, true) // rgw cache enabled diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 7a283e0fa9aa..0fd332336f7a 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -2366,13 +2366,10 @@ int RGWRados::init_bucket_index(rgw_bucket& bucket) string dir_oid = dir_oid_prefix; dir_oid.append(bucket.marker); - librados::ObjectWriteOperation op; - op.create(true); - r = cls_rgw_init_index(index_ctx, op, dir_oid); - if (r < 0 && r != -EEXIST) - return r; + vector bucket_objs; + get_bucket_index_objects(dir_oid, bucket_index_max_shards, bucket_objs); - return 0; + return cls_rgw_bucket_index_init_op(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio); } /** @@ -2439,6 +2436,7 @@ int RGWRados::create_bucket(RGWUserInfo& owner, rgw_bucket& bucket, info.owner = owner.user_id; info.region = region_name; info.placement_rule = selected_placement_rule; + info.num_shards = bucket_index_max_shards; if (!creation_time) time(&info.creation_time); else @@ -2467,11 +2465,16 @@ int RGWRados::create_bucket(RGWUserInfo& owner, rgw_bucket& bucket, /* remove bucket index */ librados::IoCtx index_ctx; // context for new bucket - int r = open_bucket_index_ctx(bucket, index_ctx); + vector bucket_objs; + int r = open_bucket_index(bucket, index_ctx, bucket_objs); if (r < 0) return r; - index_ctx.remove(dir_oid); + vector::const_iterator biter; + for (biter = bucket_objs.begin(); biter != bucket_objs.end(); ++biter) { + // Do best effort removal + index_ctx.remove(*biter); + } } /* ret == -ENOENT here */ } @@ -3762,6 +3765,44 @@ int RGWRados::open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx, return 0; } +int RGWRados::open_bucket_index_base(rgw_bucket& bucket, librados::IoCtx& index_ctx, + string& bucket_oid_base) { + if (bucket_is_system(bucket)) + return -EINVAL; + + int r = open_bucket_index_ctx(bucket, index_ctx); + if (r < 0) + return r; + + if (bucket.marker.empty()) { + ldout(cct, 0) << "ERROR: empty marker for bucket operation" << dendl; + return -EIO; + } + + bucket_oid_base = dir_oid_prefix; + bucket_oid_base.append(bucket.marker); + + return 0; + +} + +int RGWRados::open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx, + vector& bucket_objs) { + string bucket_oid_base; + int ret = open_bucket_index_base(bucket, index_ctx, bucket_oid_base); + if (ret < 0) + return ret; + + // Get the bucket info + RGWBucketInfo binfo; + ret = get_bucket_instance_info(NULL, bucket, binfo, NULL, NULL); + if (ret < 0) + return ret; + + get_bucket_index_objects(bucket_oid_base, binfo.num_shards, bucket_objs); + return 0; +} + static void translate_raw_stats(rgw_bucket_dir_header& header, map& stats) { map::iterator iter = header.stats.begin(); @@ -6712,6 +6753,20 @@ int RGWRados::remove_temp_objects(string date, string time) return 0; } +void RGWRados::get_bucket_index_objects(const string& bucket_oid_base, + const uint32_t num_shards, vector& bucket_objects) +{ + if (!num_shards) { + bucket_objects.push_back(bucket_oid_base); + } else { + char buf[bucket_oid_base.size() + 32]; + for (uint32_t i = 0; i < num_shards; ++i) { + snprintf(buf, sizeof(buf), "%s.%d", bucket_oid_base.c_str(), i); + bucket_objects.push_back(string(buf)); + } + } +} + int RGWRados::process_intent_log(rgw_bucket& bucket, string& oid, time_t epoch, int flags, bool purge) { diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index 62ac3fa92cda..e7030ab32a82 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -1260,7 +1260,10 @@ class RGWRados int open_bucket_data_ctx(rgw_bucket& bucket, librados::IoCtx& io_ctx); int open_bucket_data_extra_ctx(rgw_bucket& bucket, librados::IoCtx& io_ctx); int open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx, string& bucket_oid); - + int open_bucket_index_base(rgw_bucket& bucket, librados::IoCtx& index_ctx, + string& bucket_oid_base); + int open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx, + vector& bucket_objs); struct GetObjState { librados::IoCtx io_ctx; bool sent_data; @@ -1367,8 +1370,9 @@ public: gc(NULL), use_gc_thread(false), quota_threads(false), num_watchers(0), watchers(NULL), watch_handles(NULL), watch_initialized(false), - bucket_id_lock("rados_bucket_id"), max_bucket_id(0), + bucket_id_lock("rados_bucket_id"), bucket_index_max_shards(0), + max_bucket_id(0), cct(NULL), rados(NULL), pools_initialized(false), quota_handler(NULL), @@ -1948,6 +1952,17 @@ public: } private: + /** + * This is a helper method, it generates a list of bucket index objects with the given + * bucket base oid and number of shards. + * + * bucket_oid_base [in] - base name of the bucket index object; + * num_shards [in] - number of bucket index object shards. + * bucket_objs [out] - filled by this method, a list of bucket index objects. + */ + void get_bucket_index_objects(const string& bucket_oid_base, const uint32_t num_shards, + vector& bucket_objs); + int process_intent_log(rgw_bucket& bucket, string& oid, time_t epoch, int flags, bool purge); /**