]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
Implement sharding for bucket creation.
author Guang Yang <yguang@yahoo-inc.com>
Fri, 1 Aug 2014 04:54:13 +0000 (04:54 +0000)
committer Yehuda Sadeh <yehuda@redhat.com>
Wed, 14 Jan 2015 03:21:22 +0000 (19:21 -0800)
Signed-off-by: Guang Yang (yguang@yahoo-inc.com)
src/cls/rgw/cls_rgw_client.cc
src/cls/rgw/cls_rgw_client.h
src/common/config_opts.h
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h

index c13c1a1559c62b7c68cbe7f691925bb013f1df54..47c9dcb8db899fa2855eea00fa0721ade27d1e5e 100644 (file)
 
 using namespace librados;
 
+/*
+ * AIO completion callback: reports the finished request to the manager
+ * recorded in the argument, then drops the argument's reference.
+ *
+ * cb  - unused.
+ * arg - the BucketIndexAioArg that was attached to the request.
+ */
+static void bucket_index_op_completion_cb(void* cb, void* arg) {
+  BucketIndexAioArg* aio_arg = static_cast<BucketIndexAioArg*>(arg);
+  BucketIndexAioManager* owner = aio_arg->manager;
+  owner->do_completion(aio_arg->id);
+  aio_arg->put();
+}
+
+/*
+ * Move the AIO identified by *id* from the pending map to the completed list
+ * and signal the condition that wait_for_completions() waits on. The id must
+ * be present in the pending map (asserted).
+ */
+void BucketIndexAioManager::do_completion(int id) {
+  Mutex::Locker guard(lock);
+
+  map<int, librados::AioCompletion*>::iterator pos = pendings.find(id);
+  assert(pos != pendings.end());
+  librados::AioCompletion* finished = pos->second;
+  pendings.erase(pos);
+  completions.push_back(finished);
+
+  cond.Signal();
+}
+
+bool BucketIndexAioManager::wait_for_completions(int valid_ret_code,
+    int *num_completions, int *ret_code) {
+  lock.Lock();
+  if (pendings.empty() && completions.empty()) {
+    lock.Unlock();
+    return false;
+  }
+  // Wait for AIO completion
+  cond.Wait(lock);
+
+  // Clear the completed AIOs
+  list<librados::AioCompletion*>::iterator iter = completions.begin();
+  for (; iter != completions.end(); ++iter) {
+    int r = (*iter)->get_return_value();
+    if (ret_code && (r < 0 && r != valid_ret_code))
+      (*ret_code) = r;
+    (*iter)->release();
+  }
+  if (num_completions)
+    (*num_completions) = completions.size();
+  completions.clear();
+  lock.Unlock();
+
+  return true;
+}
+
 // Append a call to the "rgw" class method "bucket_init_index", with an empty
 // input buffer, to the given write operation.
 void cls_rgw_bucket_init(ObjectWriteOperation& o)
 {
   bufferlist in;
   o.exec("rgw", "bucket_init_index", in);
 }
 
+/*
+ * Submit an asynchronous operation that exclusively creates one bucket index
+ * object and runs the "rgw" class method "bucket_init_index" on it.
+ *
+ * io_ctx  - IO context used to submit the operation.
+ * oid     - name of the bucket index object.
+ * manager - AIO manager that tracks the request.
+ *
+ * Return the result of submitting the request (negative on failure). The
+ * return type is int, not bool, so that callers testing for a negative value
+ * actually see the error.
+ */
+static int issue_bucket_index_init_op(librados::IoCtx& io_ctx,
+    const string& oid, BucketIndexAioManager *manager) {
+  bufferlist in;
+  librados::ObjectWriteOperation op;
+  op.create(true);
+  op.exec("rgw", "bucket_init_index", in);
+  // Keep the id in a local: the completion callback drops the reference on
+  // *arg*, so *arg* must not be dereferenced once the request is submitted.
+  const int id = manager->get_next();
+  BucketIndexAioArg *arg = new BucketIndexAioArg(id, manager);
+  AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, NULL, bucket_index_op_completion_cb);
+  int r = io_ctx.aio_operate(oid, c, &op);
+  if (r >= 0) {
+    // NOTE(review): the callback may run before the request is registered
+    // here, in which case do_completion() would not find the id -- confirm
+    // whether registration has to happen under the manager's lock.
+    manager->add_pending(id, c);
+  } else {
+    // The request was never queued, so the callback will not run: release
+    // the completion and the callback argument ourselves.
+    c->release();
+    arg->put();
+  }
+  return r;
+}
+
+int cls_rgw_bucket_index_init_op(librados::IoCtx& io_ctx,
+        const vector<string>& bucket_objs, uint32_t max_aio)
+{
+  int ret = 0;
+  vector<string>::const_iterator iter = bucket_objs.begin();
+  BucketIndexAioManager manager;
+  // Issue *max_aio* requests, all subsequent requests are issued upon
+  // pending request finishing
+  for (; iter != bucket_objs.end() && max_aio-- > 0; ++iter) {
+    ret = issue_bucket_index_init_op(io_ctx, *iter, &manager);
+    if (ret < 0)
+      break;
+  }
+
+  int num_completions, r;
+  while (manager.wait_for_completions(-EEXIST, &num_completions, &r)) {
+    if (r >= 0 && ret >= 0) {
+      for(int i = 0; i < num_completions && iter != bucket_objs.end(); ++i, ++iter) {
+        int issue_ret = issue_bucket_index_init_op(io_ctx, *iter, &manager);
+        if(issue_ret < 0) {
+          ret = issue_ret;
+          break;
+        }
+      }
+    } else if (ret >= 0) {
+      ret = r;
+    }
+  }
+
+  if (ret < 0) {
+    // Do best effort removal
+    vector<string>::const_iterator citer = bucket_objs.begin();
+    for(; citer != iter; ++citer) {
+      io_ctx.remove(*citer);
+    }
+  }
+
+  return ret;
+}
+
 void cls_rgw_bucket_set_tag_timeout(ObjectWriteOperation& o, uint64_t tag_timeout)
 {
   bufferlist in;
index c6b5b757fa843d0e587cb2a43ecb94951ceb8f6e..9cce78f6331ad1255cffb0c4479a3e137d448b17 100644 (file)
@@ -6,6 +6,71 @@
 #include "cls_rgw_types.h"
 #include "common/RefCountedObj.h"
 
+/*
+ * This class manages AIO completions. This class is not completely thread-safe,
+ * methods like *get_next* is not thread-safe and is expected to be called from
+ * within one thread.
+ */
+class BucketIndexAioManager {
+private:
+  map<int, librados::AioCompletion*> pendings;
+  list<librados::AioCompletion*> completions;
+  int next;
+  Mutex lock;
+  Cond cond;
+public:
+  /*
+   * Create a new instance.
+   */
+  BucketIndexAioManager() : pendings(), completions(), next(0),
+      lock("BucketIndexAioManager::lock"), cond() {}
+
+  /*
+   * Add a new pending AIO completion instance.
+   *
+   * @param id         - the request ID.
+   * @param completion - the AIO completion instance.
+   */
+  void add_pending(int id, librados::AioCompletion* completion) {
+    Mutex::Locker l(lock);
+    pendings[id] = completion;
+  }
+
+  /*
+   * Do completion for the given AIO request.
+   */
+  void do_completion(int id);
+
+  /*
+   * Get next request ID. This method is not thread-safe.
+   *
+   * Return next request ID.
+   */
+  int get_next() { return next++; }
+
+  /*
+   * Wait for AIO completions.
+   *
+   * valid_ret_code  - valid AIO return code.
+   * num_completions - number of completions.
+   * ret_code        - return code of failed AIO.
+   *
+   * Return false if there is no pending AIO, true otherwise.
+   */
+  bool wait_for_completions(int valid_ret_code, int *num_completions, int *ret_code);
+};
+
+/*
+ * Argument handed to the AIO completion callback of a bucket index request:
+ * the request ID together with the manager that tracks the request.
+ */
+struct BucketIndexAioArg : public RefCountedObject {
+  int id;
+  BucketIndexAioManager* manager;
+
+  BucketIndexAioArg(int _id, BucketIndexAioManager* _manager)
+    : id(_id), manager(_manager) {}
+};
+
 class RGWGetDirHeader_CB : public RefCountedObject {
 public:
   virtual ~RGWGetDirHeader_CB() {}
@@ -15,6 +80,18 @@ public:
 /* bucket index */
 void cls_rgw_bucket_init(librados::ObjectWriteOperation& o);
 
+/**
+ * Init bucket index objects.
+ *
+ * io_ctx      - IO context for rados.
+ * bucket_objs - a list of bucket index objects.
+ * max_aio     - the maximum number of AIO (for throttling).
+ *
+ * Return 0 on success, a failure code otherwise.
+ */
+int cls_rgw_bucket_index_init_op(librados::IoCtx &io_ctx,
+        const vector<string>& bucket_objs, uint32_t max_aio);
+
 void cls_rgw_bucket_set_tag_timeout(librados::ObjectWriteOperation& o, uint64_t tag_timeout);
 
 void cls_rgw_bucket_prepare_op(librados::ObjectWriteOperation& o, RGWModifyOp op, string& tag,
index e0727e4de8e4fcd90dd61b068cbe86f8d2ae25a0..ea2ac25e61c53c14827c907fcfb5bf81fa5978d0 100644 (file)
@@ -851,6 +851,11 @@ OPTION(rgw_max_chunk_size, OPT_INT, 512 * 1024)
  */
 OPTION(rgw_bucket_index_max_shards, OPT_U32, 0)
 
+/**
+ * The maximum number of AIO requests that may be pending at once when
+ * operating on the bucket index object shards.
+ */
+OPTION(rgw_bucket_index_max_aio, OPT_U32, 8)
+
 OPTION(rgw_data, OPT_STR, "/var/lib/ceph/radosgw/$cluster-$id")
 OPTION(rgw_enable_apis, OPT_STR, "s3, swift, swift_auth, admin")
 OPTION(rgw_cache_enabled, OPT_BOOL, true)   // rgw cache enabled
index 7a283e0fa9aa678c86b48d44b37d89e1ecba7dbb..0fd332336f7ae1b8707a0c7ba2f6c1e9789a7ab5 100644 (file)
@@ -2366,13 +2366,10 @@ int RGWRados::init_bucket_index(rgw_bucket& bucket)
   string dir_oid =  dir_oid_prefix;
   dir_oid.append(bucket.marker);
 
-  librados::ObjectWriteOperation op;
-  op.create(true);
-  r = cls_rgw_init_index(index_ctx, op, dir_oid);
-  if (r < 0 && r != -EEXIST)
-    return r;
+  vector<string> bucket_objs;
+  get_bucket_index_objects(dir_oid, bucket_index_max_shards, bucket_objs);
 
-  return 0;
+  return cls_rgw_bucket_index_init_op(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio);
 }
 
 /**
@@ -2439,6 +2436,7 @@ int RGWRados::create_bucket(RGWUserInfo& owner, rgw_bucket& bucket,
     info.owner = owner.user_id;
     info.region = region_name;
     info.placement_rule = selected_placement_rule;
+    info.num_shards = bucket_index_max_shards;
     if (!creation_time)
       time(&info.creation_time);
     else
@@ -2467,11 +2465,16 @@ int RGWRados::create_bucket(RGWUserInfo& owner, rgw_bucket& bucket,
 
         /* remove bucket index */
         librados::IoCtx index_ctx; // context for new bucket
-        int r = open_bucket_index_ctx(bucket, index_ctx);
+        vector<string> bucket_objs;
+        int r = open_bucket_index(bucket, index_ctx, bucket_objs);
         if (r < 0)
           return r;
 
-        index_ctx.remove(dir_oid);
+        vector<string>::const_iterator biter;
+        for (biter = bucket_objs.begin(); biter != bucket_objs.end(); ++biter) {
+          // Do best effort removal
+          index_ctx.remove(*biter);
+        }
       }
       /* ret == -ENOENT here */
     }
@@ -3762,6 +3765,44 @@ int RGWRados::open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx,
   return 0;
 }
 
+/*
+ * Open the index IO context of a bucket and compute the base name of its
+ * bucket index object(s): dir_oid_prefix followed by the bucket marker.
+ *
+ * Return 0 on success, -EINVAL for a system bucket, -EIO if the bucket marker
+ * is empty, or the error from open_bucket_index_ctx().
+ */
+int RGWRados::open_bucket_index_base(rgw_bucket& bucket, librados::IoCtx& index_ctx,
+    string& bucket_oid_base) {
+  if (bucket_is_system(bucket)) {
+    return -EINVAL;
+  }
+
+  const int open_ret = open_bucket_index_ctx(bucket, index_ctx);
+  if (open_ret < 0) {
+    return open_ret;
+  }
+
+  if (bucket.marker.empty()) {
+    ldout(cct, 0) << "ERROR: empty marker for bucket operation" << dendl;
+    return -EIO;
+  }
+
+  bucket_oid_base = dir_oid_prefix;
+  bucket_oid_base.append(bucket.marker);
+  return 0;
+}
+
+/*
+ * Open the index IO context of a bucket and append the names of its bucket
+ * index objects to *bucket_objs*. The number of shards is taken from the
+ * bucket instance info.
+ *
+ * Return 0 on success, a negative error code otherwise.
+ */
+int RGWRados::open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx,
+    vector<string>& bucket_objs) {
+  string oid_base;
+  const int base_ret = open_bucket_index_base(bucket, index_ctx, oid_base);
+  if (base_ret < 0) {
+    return base_ret;
+  }
+
+  // The shard count is stored in the bucket instance info
+  RGWBucketInfo bucket_info;
+  const int info_ret = get_bucket_instance_info(NULL, bucket, bucket_info, NULL, NULL);
+  if (info_ret < 0) {
+    return info_ret;
+  }
+
+  get_bucket_index_objects(oid_base, bucket_info.num_shards, bucket_objs);
+  return 0;
+}
+
 static void translate_raw_stats(rgw_bucket_dir_header& header, map<RGWObjCategory, RGWStorageStats>& stats)
 {
   map<uint8_t, struct rgw_bucket_category_stats>::iterator iter = header.stats.begin();
@@ -6712,6 +6753,20 @@ int RGWRados::remove_temp_objects(string date, string time)
   return 0;
 }
 
+/*
+ * Append the names of the bucket index objects to *bucket_objects*: the base
+ * oid itself when num_shards is 0, otherwise "<base>.<i>" for every shard i
+ * in [0, num_shards).
+ */
+void RGWRados::get_bucket_index_objects(const string& bucket_oid_base,
+    const uint32_t num_shards, vector<string>& bucket_objects)
+{
+  if (!num_shards) {
+    bucket_objects.push_back(bucket_oid_base);
+    return;
+  }
+
+  // ".<shard>" needs at most 1 + 10 digits + NUL, so a small fixed buffer is
+  // enough and the non-standard variable-length array is not needed.
+  char suffix[16];
+  for (uint32_t i = 0; i < num_shards; ++i) {
+    // The shard number is unsigned, hence %u rather than %d
+    snprintf(suffix, sizeof(suffix), ".%u", i);
+    bucket_objects.push_back(bucket_oid_base + suffix);
+  }
+}
+
 int RGWRados::process_intent_log(rgw_bucket& bucket, string& oid,
                                 time_t epoch, int flags, bool purge)
 {
index 62ac3fa92cda15d97ba3e0b334501b229814354d..e7030ab32a82fed81577ce407771a4feeb67c859 100644 (file)
@@ -1260,7 +1260,10 @@ class RGWRados
   int open_bucket_data_ctx(rgw_bucket& bucket, librados::IoCtx&  io_ctx);
   int open_bucket_data_extra_ctx(rgw_bucket& bucket, librados::IoCtx&  io_ctx);
   int open_bucket_index(rgw_bucket& bucket, librados::IoCtx&  index_ctx, string& bucket_oid);
-
+  int open_bucket_index_base(rgw_bucket& bucket, librados::IoCtx&  index_ctx,
+      string& bucket_oid_base);
+  int open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx,
+      vector<string>& bucket_objs);
   struct GetObjState {
     librados::IoCtx io_ctx;
     bool sent_data;
@@ -1367,8 +1370,9 @@ public:
                gc(NULL), use_gc_thread(false), quota_threads(false),
                num_watchers(0), watchers(NULL), watch_handles(NULL),
                watch_initialized(false),
-               bucket_id_lock("rados_bucket_id"), max_bucket_id(0),
+               bucket_id_lock("rados_bucket_id"),
                bucket_index_max_shards(0),
+               max_bucket_id(0),
                cct(NULL), rados(NULL),
                pools_initialized(false),
                quota_handler(NULL),
@@ -1948,6 +1952,17 @@ public:
   }
 
  private:
+  /**
+   * This is a helper method, it generates a list of bucket index objects with the given
+   * bucket base oid and number of shards.
+   *
+   * bucket_oid_base [in] - base name of the bucket index object;
+   * num_shards [in] - number of bucket index object shards.
+   * bucket_objs [out] - filled by this method, a list of bucket index objects.
+   */
+  void get_bucket_index_objects(const string& bucket_oid_base, const uint32_t num_shards,
+      vector<string>& bucket_objs);
+
   int process_intent_log(rgw_bucket& bucket, string& oid,
                         time_t epoch, int flags, bool purge);
   /**