]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
Revert "Revert "Merge remote-tracking branch 'origin/wip-bi-sharding-3' into next""
authorYehuda Sadeh <yehuda@redhat.com>
Thu, 22 Jan 2015 01:30:32 +0000 (17:30 -0800)
committerYehuda Sadeh <yehuda@redhat.com>
Thu, 22 Jan 2015 01:30:32 +0000 (17:30 -0800)
Following a merge of next to master, the feature got reverted (because
it was reverted on next). Undoing.

This reverts commit 6613358ddc5339c8e33c409387fd6044db0b6f26.

22 files changed:
src/cls/rgw/cls_rgw.cc
src/cls/rgw/cls_rgw_client.cc
src/cls/rgw/cls_rgw_client.h
src/common/config_opts.h
src/rgw/rgw_admin.cc
src/rgw/rgw_bucket.cc
src/rgw/rgw_bucket.h
src/rgw/rgw_common.cc
src/rgw/rgw_common.h
src/rgw/rgw_json_enc.cc
src/rgw/rgw_op.cc
src/rgw/rgw_quota.cc
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h
src/rgw/rgw_replica_log.cc
src/rgw/rgw_replica_log.h
src/rgw/rgw_rest_log.cc
src/rgw/rgw_rest_log.h
src/rgw/rgw_rest_replica_log.cc
src/rgw/rgw_rest_swift.cc
src/test/Makefile.am
src/test/cls_rgw/test_cls_rgw.cc

index eb4a4232d1892f6a624d0fee2db41f064c5cee7d..6198d62810bac3c49db9615fd1fa88f3b64ce09b 100644 (file)
@@ -814,8 +814,6 @@ static int bi_log_iterate_entries(cls_method_context_t hctx, const string& marke
 
   map<string, bufferlist> keys;
   string filter_prefix, end_key;
-  bufferlist start_bl;
-  bool start_key_added = false;
   uint32_t i = 0;
   string key;
 
@@ -829,10 +827,6 @@ static int bi_log_iterate_entries(cls_method_context_t hctx, const string& marke
     key.append(marker);
 
     start_key = key;
-    int ret = cls_cxx_map_get_val(hctx, start_key, &start_bl);
-    if ((ret < 0) && (ret != -ENOENT)) {
-        return ret;
-    } 
   } else {
     start_key = key_iter;
   }
@@ -856,10 +850,6 @@ static int bi_log_iterate_entries(cls_method_context_t hctx, const string& marke
     if (ret < 0)
       return ret;
 
-    if ((start_bl.length() > 0) && (!start_key_added)) {
-      keys[start_key] = start_bl;
-      start_key_added = true;
-    }
     map<string, bufferlist>::iterator iter = keys.begin();
     if (iter == keys.end())
       break;
index c13c1a1559c62b7c68cbe7f691925bb013f1df54..545b36bcff569e11acba7361e4c468c05a6409f9 100644 (file)
 
 using namespace librados;
 
+const string BucketIndexShardsManager::KEY_VALUE_SEPARATOR = "#";
+const string BucketIndexShardsManager::SHARDS_SEPARATOR = ",";
+
+/**
+ * This class represents the bucket index object operation callback context.
+ */
+template <typename T>
+class ClsBucketIndexOpCtx : public ObjectOperationCompletion {
+private:
+  T *data;
+  int *ret_code;
+public:
+  ClsBucketIndexOpCtx(T* _data, int *_ret_code) : data(_data), ret_code(_ret_code) { assert(data); }
+  ~ClsBucketIndexOpCtx() {}
+  void handle_completion(int r, bufferlist& outbl) {
+    if (r >= 0) {
+      try {
+        bufferlist::iterator iter = outbl.begin();
+        ::decode((*data), iter);
+      } catch (buffer::error& err) {
+        r = -EIO;
+      }
+    }
+    if (ret_code) {
+      *ret_code = r;
+    }
+  }
+};
+
+void BucketIndexAioManager::do_completion(int id) {
+  Mutex::Locker l(lock);
+
+  map<int, librados::AioCompletion*>::iterator iter = pendings.find(id);
+  assert(iter != pendings.end());
+  completions[id] = iter->second;
+  pendings.erase(iter);
+
+  // If the caller needs a list of finished objects, store them
+  // for further processing
+  map<int, string>::iterator miter = pending_objs.find(id);
+  if (miter != pending_objs.end()) {
+    completion_objs[id] = miter->second;
+    pending_objs.erase(miter);
+  }
+
+  cond.Signal();
+}
+
+bool BucketIndexAioManager::wait_for_completions(int valid_ret_code,
+    int *num_completions, int *ret_code, map<int, string> *objs) {
+  lock.Lock();
+  if (pendings.empty() && completions.empty()) {
+    lock.Unlock();
+    return false;
+  }
+
+  if (completions.empty()) {
+    // Wait for AIO completion
+    cond.Wait(lock);
+  }
+
+  // Clear the completed AIOs
+  map<int, librados::AioCompletion*>::iterator iter = completions.begin();
+  for (; iter != completions.end(); ++iter) {
+    int r = iter->second->get_return_value();
+    if (objs && r == 0) { /* update list of successfully completed objs */
+      map<int, string>::iterator liter = completion_objs.find(iter->first);
+      if (liter != completion_objs.end()) {
+        (*objs)[liter->first] = liter->second;
+      }
+    }
+    if (ret_code && (r < 0 && r != valid_ret_code))
+      (*ret_code) = r;
+    iter->second->release();
+  }
+  if (num_completions)
+    (*num_completions) = completions.size();
+  completions.clear();
+  lock.Unlock();
+
+  return true;
+}
+
 void cls_rgw_bucket_init(ObjectWriteOperation& o)
 {
   bufferlist in;
   o.exec("rgw", "bucket_init_index", in);
 }
 
-void cls_rgw_bucket_set_tag_timeout(ObjectWriteOperation& o, uint64_t tag_timeout)
-{
+static bool issue_bucket_index_init_op(librados::IoCtx& io_ctx,
+    const string& oid, BucketIndexAioManager *manager) {
+  bufferlist in;
+  librados::ObjectWriteOperation op;
+  op.create(true);
+  op.exec("rgw", "bucket_init_index", in);
+  return manager->aio_operate(io_ctx, oid, &op);
+}
+
+static bool issue_bucket_set_tag_timeout_op(librados::IoCtx& io_ctx,
+    const string& oid, uint64_t timeout, BucketIndexAioManager *manager) {
   bufferlist in;
   struct rgw_cls_tag_timeout_op call;
-  call.tag_timeout = tag_timeout;
+  call.tag_timeout = timeout;
   ::encode(call, in);
-  o.exec("rgw", "bucket_set_tag_timeout", in);
+  ObjectWriteOperation op;
+  op.exec("rgw", "bucket_set_tag_timeout", in);
+  return manager->aio_operate(io_ctx, oid, &op);
+}
+
+int CLSRGWIssueBucketIndexInit::issue_op(int shard_id, const string& oid)
+{
+  return issue_bucket_index_init_op(io_ctx, oid, &manager);
+}
+
+void CLSRGWIssueBucketIndexInit::cleanup()
+{
+  // Do best effort removal
+  for (map<int, string>::iterator citer = objs_container.begin(); citer != iter; ++citer) {
+    io_ctx.remove(citer->second);
+  }
+}
+
+int CLSRGWIssueSetTagTimeout::issue_op(int shard_id, const string& oid)
+{
+  return issue_bucket_set_tag_timeout_op(io_ctx, oid, tag_timeout, &manager);
 }
 
 void cls_rgw_bucket_prepare_op(ObjectWriteOperation& o, RGWModifyOp op, string& tag,
@@ -59,70 +171,89 @@ void cls_rgw_bucket_complete_op(ObjectWriteOperation& o, RGWModifyOp op, string&
   o.exec("rgw", "bucket_complete_op", in);
 }
 
-
-int cls_rgw_list_op(IoCtx& io_ctx, string& oid, string& start_obj,
-                    string& filter_prefix, uint32_t num_entries,
-                    rgw_bucket_dir *dir, bool *is_truncated)
-{
-  bufferlist in, out;
+static bool issue_bucket_list_op(librados::IoCtx& io_ctx,
+    const string& oid, const string& start_obj, const string& filter_prefix,
+    uint32_t num_entries, BucketIndexAioManager *manager,
+    struct rgw_cls_list_ret *pdata) {
+  bufferlist in;
   struct rgw_cls_list_op call;
   call.start_obj = start_obj;
   call.filter_prefix = filter_prefix;
   call.num_entries = num_entries;
   ::encode(call, in);
-  int r = io_ctx.exec(oid, "rgw", "bucket_list", in, out);
-  if (r < 0)
-    return r;
 
-  struct rgw_cls_list_ret ret;
-  try {
-    bufferlist::iterator iter = out.begin();
-    ::decode(ret, iter);
-  } catch (buffer::error& err) {
-    return -EIO;
-  }
+  librados::ObjectReadOperation op;
+  op.exec("rgw", "bucket_list", in, new ClsBucketIndexOpCtx<struct rgw_cls_list_ret>(pdata, NULL));
+  return manager->aio_operate(io_ctx, oid, &op);
+}
 
-  if (dir)
-    *dir = ret.dir;
-  if (is_truncated)
-    *is_truncated = ret.is_truncated;
+int CLSRGWIssueBucketList::issue_op(int shard_id, const string& oid)
+{
+  return issue_bucket_list_op(io_ctx, oid, start_obj, filter_prefix, num_entries, &manager, &result[shard_id]);
+}
 
- return r;
+static bool issue_bi_log_list_op(librados::IoCtx& io_ctx, const string& oid, int shard_id,
+                                 BucketIndexShardsManager& marker_mgr, uint32_t max, BucketIndexAioManager *manager,
+    struct cls_rgw_bi_log_list_ret *pdata) {
+  bufferlist in;
+  cls_rgw_bi_log_list_op call;
+  call.marker = marker_mgr.get(shard_id, "");
+  call.max = max;
+  ::encode(call, in);
+
+  librados::ObjectReadOperation op;
+  op.exec("rgw", "bi_log_list", in, new ClsBucketIndexOpCtx<struct cls_rgw_bi_log_list_ret>(pdata, NULL));
+  return manager->aio_operate(io_ctx, oid, &op);
 }
 
-int cls_rgw_bucket_check_index_op(IoCtx& io_ctx, string& oid,
-                                 rgw_bucket_dir_header *existing_header,
-                                 rgw_bucket_dir_header *calculated_header)
+int CLSRGWIssueBILogList::issue_op(int shard_id, const string& oid)
 {
-  bufferlist in, out;
-  int r = io_ctx.exec(oid, "rgw", "bucket_check_index", in, out);
-  if (r < 0)
-    return r;
+  return issue_bi_log_list_op(io_ctx, oid, shard_id, marker_mgr, max, &manager, &result[shard_id]);
+}
 
-  struct rgw_cls_check_index_ret ret;
-  try {
-    bufferlist::iterator iter = out.begin();
-    ::decode(ret, iter);
-  } catch (buffer::error& err) {
-    return -EIO;
-  }
+static bool issue_bi_log_trim(librados::IoCtx& io_ctx, const string& oid, int shard_id,
+                              BucketIndexShardsManager& start_marker_mgr,
+                              BucketIndexShardsManager& end_marker_mgr, BucketIndexAioManager *manager) {
+  bufferlist in;
+  cls_rgw_bi_log_trim_op call;
+  call.start_marker = start_marker_mgr.get(shard_id, "");
+  call.end_marker = end_marker_mgr.get(shard_id, "");
+  ::encode(call, in);
+  ObjectWriteOperation op;
+  op.exec("rgw", "bi_log_trim", in);
+  return manager->aio_operate(io_ctx, oid, &op);
+}
 
-  if (existing_header)
-    *existing_header = ret.existing_header;
-  if (calculated_header)
-    *calculated_header = ret.calculated_header;
+int CLSRGWIssueBILogTrim::issue_op(int shard_id, const string& oid)
+{
+  return issue_bi_log_trim(io_ctx, oid, shard_id, start_marker_mgr, end_marker_mgr, &manager);
+}
 
-  return 0;
+static bool issue_bucket_check_index_op(IoCtx& io_ctx, const string& oid, BucketIndexAioManager *manager,
+    struct rgw_cls_check_index_ret *pdata) {
+  bufferlist in;
+  librados::ObjectReadOperation op;
+  op.exec("rgw", "bucket_check_index", in, new ClsBucketIndexOpCtx<struct rgw_cls_check_index_ret>(
+        pdata, NULL));
+  return manager->aio_operate(io_ctx, oid, &op);
 }
 
-int cls_rgw_bucket_rebuild_index_op(IoCtx& io_ctx, string& oid)
+int CLSRGWIssueBucketCheck::issue_op(int shard_id, const string& oid)
 {
-  bufferlist in, out;
-  int r = io_ctx.exec(oid, "rgw", "bucket_rebuild_index", in, out);
-  if (r < 0)
-    return r;
+  return issue_bucket_check_index_op(io_ctx, oid, &manager, &result[shard_id]);
+}
 
-  return 0;
+static bool issue_bucket_rebuild_index_op(IoCtx& io_ctx, const string& oid,
+    BucketIndexAioManager *manager) {
+  bufferlist in;
+  librados::ObjectWriteOperation op;
+  op.exec("rgw", "bucket_rebuild_index", in);
+  return manager->aio_operate(io_ctx, oid, &op);
+}
+
+int CLSRGWIssueBucketRebuild::issue_op(int shard_id, const string& oid)
+{
+  return issue_bucket_rebuild_index_op(io_ctx, oid, &manager);
 }
 
 void cls_rgw_encode_suggestion(char op, rgw_bucket_dir_entry& dirent, bufferlist& updates)
@@ -136,28 +267,9 @@ void cls_rgw_suggest_changes(ObjectWriteOperation& o, bufferlist& updates)
   o.exec("rgw", "dir_suggest_changes", updates);
 }
 
-int cls_rgw_get_dir_header(IoCtx& io_ctx, string& oid, rgw_bucket_dir_header *header)
+int CLSRGWIssueGetDirHeader::issue_op(int shard_id, const string& oid)
 {
-  bufferlist in, out;
-  struct rgw_cls_list_op call;
-  call.num_entries = 0;
-  ::encode(call, in);
-  int r = io_ctx.exec(oid, "rgw", "bucket_list", in, out);
-  if (r < 0)
-    return r;
-
-  struct rgw_cls_list_ret ret;
-  try {
-    bufferlist::iterator iter = out.begin();
-    ::decode(ret, iter);
-  } catch (buffer::error& err) {
-    return -EIO;
-  }
-
-  if (header)
-    *header = ret.dir.header;
-
- return r;
+  return issue_bucket_list_op(io_ctx, oid, "", "", 0, &manager, &result[shard_id]);
 }
 
 class GetDirHeaderCompletion : public ObjectOperationCompletion {
@@ -198,56 +310,6 @@ int cls_rgw_get_dir_header_async(IoCtx& io_ctx, string& oid, RGWGetDirHeader_CB
   return 0;
 }
 
-int cls_rgw_bi_log_list(IoCtx& io_ctx, string& oid, string& marker, uint32_t max,
-                    list<rgw_bi_log_entry>& entries, bool *truncated)
-{
-  bufferlist in, out;
-  cls_rgw_bi_log_list_op call;
-  call.marker = marker;
-  call.max = max;
-  ::encode(call, in);
-  int r = io_ctx.exec(oid, "rgw", "bi_log_list", in, out);
-  if (r < 0)
-    return r;
-
-  cls_rgw_bi_log_list_ret ret;
-  try {
-    bufferlist::iterator iter = out.begin();
-    ::decode(ret, iter);
-  } catch (buffer::error& err) {
-    return -EIO;
-  }
-
-  entries = ret.entries;
-
-  if (truncated)
-    *truncated = ret.truncated;
-
- return r;
-}
-
-int cls_rgw_bi_log_trim(IoCtx& io_ctx, string& oid, string& start_marker, string& end_marker)
-{
-  do {
-    int r;
-    bufferlist in, out;
-    cls_rgw_bi_log_trim_op call;
-    call.start_marker = start_marker;
-    call.end_marker = end_marker;
-    ::encode(call, in);
-    r = io_ctx.exec(oid, "rgw", "bi_log_trim", in, out);
-
-    if (r == -ENODATA)
-      break;
-
-    if (r < 0)
-      return r;
-
-  } while (1);
-
-  return 0;
-}
-
 int cls_rgw_usage_log_read(IoCtx& io_ctx, string& oid, string& user,
                            uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries,
                            string& read_iter, map<rgw_user_bucket, rgw_usage_log_entry>& usage,
index c6b5b757fa843d0e587cb2a43ecb94951ceb8f6e..79de35825eff1ef0b45ee1ea5873b2b2d5dd542b 100644 (file)
 #define CEPH_CLS_RGW_CLIENT_H
 
 #include "include/types.h"
+#include "include/str_list.h"
 #include "include/rados/librados.hpp"
 #include "cls_rgw_types.h"
+#include "cls_rgw_ops.h"
 #include "common/RefCountedObj.h"
 
+// Forward declaration
+class BucketIndexAioManager;
+
+/*
+ * Bucket index AIO request argument, this is used to pass a argument
+ * to callback.
+ */
+struct BucketIndexAioArg : public RefCountedObject {
+  BucketIndexAioArg(int _id, BucketIndexAioManager* _manager) :
+    id(_id), manager(_manager) {}
+  int id;
+  BucketIndexAioManager* manager;
+};
+
+/*
+ * This class manages AIO completions. This class is not completely thread-safe,
+ * methods like *get_next* is not thread-safe and is expected to be called from
+ * within one thread.
+ */
+class BucketIndexAioManager {
+private:
+  map<int, librados::AioCompletion*> pendings;
+  map<int, librados::AioCompletion*> completions;
+  map<int, string> pending_objs;
+  map<int, string> completion_objs;
+  int next;
+  Mutex lock;
+  Cond cond;
+  /*
+   * Callback implementation for AIO request.
+   */
+  static void bucket_index_op_completion_cb(void* cb, void* arg) {
+    BucketIndexAioArg* cb_arg = (BucketIndexAioArg*) arg;
+    cb_arg->manager->do_completion(cb_arg->id);
+    cb_arg->put();
+  }
+
+  /*
+   * Get next request ID. This method is not thread-safe.
+   *
+   * Return next request ID.
+   */
+  int get_next() { return next++; }
+    
+  /*
+   * Add a new pending AIO completion instance.
+   *
+   * @param id         - the request ID.
+   * @param completion - the AIO completion instance.
+   * @param oid        - the object id associated with the object, if it is NULL, we don't
+   *                     track the object id per callback.
+   */
+  void add_pending(int id, librados::AioCompletion* completion, const string& oid) {
+    pendings[id] = completion;
+    pending_objs[id] = oid;
+  }
+public:
+  /*
+   * Create a new instance.
+   */
+  BucketIndexAioManager() : next(0), lock("BucketIndexAioManager::lock") {}
+
+
+  /*
+   * Do completion for the given AIO request.
+   */
+  void do_completion(int id);
+
+  /*
+   * Wait for AIO completions.
+   *
+   * valid_ret_code  - valid AIO return code.
+   * num_completions - number of completions.
+   * ret_code        - return code of failed AIO.
+   * objs            - a list of objects that has been finished the AIO.
+   *
+   * Return false if there is no pending AIO, true otherwise.
+   */
+  bool wait_for_completions(int valid_ret_code, int *num_completions, int *ret_code,
+      map<int, string> *objs);
+
+  /**
+   * Do aio read operation.
+   */
+  bool aio_operate(librados::IoCtx& io_ctx, const string& oid, librados::ObjectReadOperation *op) {
+    Mutex::Locker l(lock);
+    BucketIndexAioArg *arg = new BucketIndexAioArg(get_next(), this);
+    librados::AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, NULL, bucket_index_op_completion_cb);
+    int r = io_ctx.aio_operate(oid, c, (librados::ObjectReadOperation*)op, NULL);
+    if (r >= 0) {
+      add_pending(arg->id, c, oid);
+    }
+    return r;
+  }
+
+  /**
+   * Do aio write operation.
+   */
+  bool aio_operate(librados::IoCtx& io_ctx, const string& oid, librados::ObjectWriteOperation *op) {
+    Mutex::Locker l(lock);
+    BucketIndexAioArg *arg = new BucketIndexAioArg(get_next(), this);
+    librados::AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, NULL, bucket_index_op_completion_cb);
+    int r = io_ctx.aio_operate(oid, c, (librados::ObjectWriteOperation*)op);
+    if (r >= 0) {
+      add_pending(arg->id, c, oid);
+    }
+    return r;
+  }
+};
+
 class RGWGetDirHeader_CB : public RefCountedObject {
 public:
   virtual ~RGWGetDirHeader_CB() {}
   virtual void handle_response(int r, rgw_bucket_dir_header& header) = 0;
 };
 
+class BucketIndexShardsManager {
+private:
+  // Per shard setting manager, for example, marker.
+  map<int, string> value_by_shards;
+public:
+  const static string KEY_VALUE_SEPARATOR;
+  const static string SHARDS_SEPARATOR;
+
+  void add(int shard, const string& value) {
+    value_by_shards[shard] = value;
+  }
+
+  const string& get(int shard, const string& default_value) {
+    map<int, string>::iterator iter = value_by_shards.find(shard);
+    return (iter == value_by_shards.end() ? default_value : iter->second);
+  }
+
+  map<int, string>& get() {
+    return value_by_shards;
+  }
+
+  bool empty() {
+    return value_by_shards.empty();
+  }
+
+  void to_string(string *out) const {
+    if (!out) {
+      return;
+    }
+    out->clear();
+    map<int, string>::const_iterator iter = value_by_shards.begin();
+    for (; iter != value_by_shards.end(); ++iter) {
+      if (out->length()) {
+        // Not the first item, append a separator first
+        out->append(SHARDS_SEPARATOR);
+      }
+      char buf[16];
+      snprintf(buf, sizeof(buf), "%d", iter->first);
+      out->append(buf);
+      out->append(KEY_VALUE_SEPARATOR);
+      out->append(iter->second);
+    }
+  }
+
+  static bool is_shards_marker(const string& marker) {
+    return marker.find(KEY_VALUE_SEPARATOR) != string::npos;
+  }
+
+  /*
+   * convert from string. There are two options of how the string looks like:
+   *
+   * 1. Single shard, no shard id specified, e.g. 000001.23.1
+   *
+   * for this case, if passed shard_id >= 0, use this shard id, otherwise assume that it's a
+   * bucket with no shards.
+   *
+   * 2. One or more shards, shard id specified for each shard, e.g., 0#00002.12,1#00003.23.2
+   *
+   */
+  int from_string(const string& composed_marker, int shard_id) {
+    value_by_shards.clear();
+    vector<string> shards;
+    get_str_vec(composed_marker, SHARDS_SEPARATOR.c_str(), shards);
+    if (shards.size() > 1 && shard_id >= 0) {
+      return -EINVAL;
+    }
+    vector<string>::const_iterator iter = shards.begin();
+    for (; iter != shards.end(); ++iter) {
+      size_t pos = iter->find(KEY_VALUE_SEPARATOR);
+      if (pos == string::npos) {
+        if (!value_by_shards.empty()) {
+          return -EINVAL;
+        }
+        if (shard_id < 0) {
+          add(0, *iter);
+        } else {
+          add(shard_id, *iter);
+        }
+        return 0;
+      }
+      string shard_str = iter->substr(0, pos);
+      string err;
+      int shard = (int)strict_strtol(shard_str.c_str(), 10, &err);
+      if (!err.empty()) {
+        return -EINVAL;
+      }
+      add(shard, iter->substr(pos + 1));
+    }
+    return 0;
+  }
+};
+
 /* bucket index */
 void cls_rgw_bucket_init(librados::ObjectWriteOperation& o);
 
-void cls_rgw_bucket_set_tag_timeout(librados::ObjectWriteOperation& o, uint64_t tag_timeout);
+class CLSRGWConcurrentIO {
+protected:
+  librados::IoCtx& io_ctx;
+  map<int, string>& objs_container;
+  map<int, string>::iterator iter;
+  uint32_t max_aio;
+  BucketIndexAioManager manager;
+
+  virtual int issue_op(int shard_id, const string& oid) = 0;
+
+  virtual void cleanup() {}
+  virtual int valid_ret_code() { return 0; }
+  // Return true if multiple rounds of OPs might be needed, this happens when
+  // OP needs to be re-send until a certain code is returned.
+  virtual bool need_multiple_rounds() { return false; }
+  // Add a new object to the end of the container.
+  virtual void add_object(int shard, const string& oid) {}
+  virtual void reset_container(map<int, string>& objs) {}
+
+public:
+  CLSRGWConcurrentIO(librados::IoCtx& ioc, map<int, string>& _objs_container,
+                     uint32_t _max_aio) : io_ctx(ioc), objs_container(_objs_container), max_aio(_max_aio) {}
+  virtual ~CLSRGWConcurrentIO() {}
+
+  int operator()() {
+    int ret = 0;
+    iter = objs_container.begin();
+    for (; iter != objs_container.end() && max_aio-- > 0; ++iter) {
+      ret = issue_op(iter->first, iter->second);
+      if (ret < 0)
+        break;
+    }
+
+    int num_completions, r = 0;
+    map<int, string> objs;
+    map<int, string> *pobjs = (need_multiple_rounds() ? &objs : NULL);
+    while (manager.wait_for_completions(valid_ret_code(), &num_completions, &r, pobjs)) {
+      if (r >= 0 && ret >= 0) {
+        for(int i = 0; i < num_completions && iter != objs_container.end(); ++i, ++iter) {
+          int issue_ret = issue_op(iter->first, iter->second);
+          if(issue_ret < 0) {
+            ret = issue_ret;
+            break;
+          }
+        }
+      } else if (ret >= 0) {
+        ret = r;
+      }
+      if (need_multiple_rounds() && iter == objs_container.end() && !objs.empty()) {
+        // For those objects which need another round, use them to reset
+        // the container
+        reset_container(objs);
+      }
+    }
+
+    if (ret < 0) {
+      cleanup();
+    }
+    return ret;
+  }
+};
+
+class CLSRGWIssueBucketIndexInit : public CLSRGWConcurrentIO {
+protected:
+  int issue_op(int shard_id, const string& oid);
+  int valid_ret_code() { return -EEXIST; }
+  void cleanup();
+public:
+  CLSRGWIssueBucketIndexInit(librados::IoCtx& ioc, map<int, string>& _bucket_objs,
+                     uint32_t _max_aio) :
+    CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio) {}
+};
+
+class CLSRGWIssueSetTagTimeout : public CLSRGWConcurrentIO {
+  uint64_t tag_timeout;
+protected:
+  int issue_op(int shard_id, const string& oid);
+public:
+  CLSRGWIssueSetTagTimeout(librados::IoCtx& ioc, map<int, string>& _bucket_objs,
+                     uint32_t _max_aio, uint64_t _tag_timeout) :
+    CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio), tag_timeout(_tag_timeout) {}
+};
 
 void cls_rgw_bucket_prepare_op(librados::ObjectWriteOperation& o, RGWModifyOp op, string& tag,
                                string& name, string& locator, bool log_op);
@@ -24,28 +309,118 @@ void cls_rgw_bucket_complete_op(librados::ObjectWriteOperation& o, RGWModifyOp o
                                 rgw_bucket_entry_ver& ver, string& name, rgw_bucket_dir_entry_meta& dir_meta,
                                list<string> *remove_objs, bool log_op);
 
-int cls_rgw_list_op(librados::IoCtx& io_ctx, string& oid, string& start_obj,
-                    string& filter_prefix, uint32_t num_entries,
-                    rgw_bucket_dir *dir, bool *is_truncated);
+/**
+ * List the bucket with the starting object and filter prefix.
+ * NOTE: this method do listing requests for each bucket index shards identified by
+ *       the keys of the *list_results* map, which means the map should be popludated
+ *       by the caller to fill with each bucket index object id.
+ *
+ * io_ctx        - IO context for rados.
+ * start_obj     - marker for the listing.
+ * filter_prefix - filter prefix.
+ * num_entries   - number of entries to request for each object (note the total
+ *                 amount of entries returned depends on the number of shardings).
+ * list_results  - the list results keyed by bucket index object id.
+ * max_aio       - the maximum number of AIO (for throttling).
+ *
+ * Return 0 on success, a failure code otherwise.
+*/
+
+class CLSRGWIssueBucketList : public CLSRGWConcurrentIO {
+  string start_obj;
+  string filter_prefix;
+  uint32_t num_entries;
+  map<int, rgw_cls_list_ret>& result;
+protected:
+  int issue_op(int shard_id, const string& oid);
+public:
+  CLSRGWIssueBucketList(librados::IoCtx& io_ctx, const string& _start_obj,
+                        const string& _filter_prefix, uint32_t _num_entries,
+                        map<int, string>& oids,
+                        map<int, struct rgw_cls_list_ret>& list_results,
+                        uint32_t max_aio) :
+  CLSRGWConcurrentIO(io_ctx, oids, max_aio),
+  start_obj(_start_obj), filter_prefix(_filter_prefix), num_entries(_num_entries), result(list_results) {}
+};
+
+class CLSRGWIssueBILogList : public CLSRGWConcurrentIO {
+  map<int, struct cls_rgw_bi_log_list_ret>& result;
+  BucketIndexShardsManager& marker_mgr;
+  uint32_t max;
+protected:
+  int issue_op(int shard_id, const string& oid);
+public:
+  CLSRGWIssueBILogList(librados::IoCtx& io_ctx, BucketIndexShardsManager& _marker_mgr, uint32_t _max,
+                       map<int, string>& oids,
+                       map<int, struct cls_rgw_bi_log_list_ret>& bi_log_lists, uint32_t max_aio) :
+    CLSRGWConcurrentIO(io_ctx, oids, max_aio), result(bi_log_lists),
+    marker_mgr(_marker_mgr), max(_max) {}
+};
+
+class CLSRGWIssueBILogTrim : public CLSRGWConcurrentIO {
+  BucketIndexShardsManager& start_marker_mgr;
+  BucketIndexShardsManager& end_marker_mgr;
+protected:
+  int issue_op(int shard_id, const string& oid);
+  // Trim until -ENODATA is returned.
+  int valid_ret_code() { return -ENODATA; }
+  bool need_multiple_rounds() { return true; }
+  void add_object(int shard, const string& oid) { objs_container[shard] = oid; }
+  void reset_container(map<int, string>& objs) {
+    objs_container.swap(objs);
+    iter = objs_container.begin();
+    objs.clear();
+  }
+public:
+  CLSRGWIssueBILogTrim(librados::IoCtx& io_ctx, BucketIndexShardsManager& _start_marker_mgr,
+      BucketIndexShardsManager& _end_marker_mgr, map<int, string>& _bucket_objs, uint32_t max_aio) :
+    CLSRGWConcurrentIO(io_ctx, _bucket_objs, max_aio),
+    start_marker_mgr(_start_marker_mgr), end_marker_mgr(_end_marker_mgr) {}
+};
+
+/**
+ * Check the bucket index.
+ *
+ * io_ctx          - IO context for rados.
+ * bucket_objs_ret - check result for all shards.
+ * max_aio         - the maximum number of AIO (for throttling).
+ *
+ * Return 0 on success, a failure code otherwise.
+ */
+class CLSRGWIssueBucketCheck : public CLSRGWConcurrentIO /*<map<string, struct rgw_cls_check_index_ret> >*/ {
+  map<int, struct rgw_cls_check_index_ret>& result;
+protected:
+  int issue_op(int shard_id, const string& oid);
+public:
+  CLSRGWIssueBucketCheck(librados::IoCtx& ioc, map<int, string>& oids, map<int, struct rgw_cls_check_index_ret>& bucket_objs_ret,
+                     uint32_t _max_aio) :
+    CLSRGWConcurrentIO(ioc, oids, _max_aio), result(bucket_objs_ret) {}
+};
+
+class CLSRGWIssueBucketRebuild : public CLSRGWConcurrentIO {
+protected:
+  int issue_op(int shard_id, const string& oid);
+public:
+  CLSRGWIssueBucketRebuild(librados::IoCtx& io_ctx, map<int, string>& bucket_objs,
+                           uint32_t max_aio) : CLSRGWConcurrentIO(io_ctx, bucket_objs, max_aio) {}
+};
+
+class CLSRGWIssueGetDirHeader : public CLSRGWConcurrentIO {
+  map<int, rgw_cls_list_ret>& result;
+protected:
+  int issue_op(int shard_id, const string& oid);
+public:
+  CLSRGWIssueGetDirHeader(librados::IoCtx& io_ctx, map<int, string>& oids, map<int, rgw_cls_list_ret>& dir_headers,
+                          uint32_t max_aio) :
+    CLSRGWConcurrentIO(io_ctx, oids, max_aio), result(dir_headers) {}
+};
 
-int cls_rgw_bucket_check_index_op(librados::IoCtx& io_ctx, string& oid,
-                                 rgw_bucket_dir_header *existing_header,
-                                 rgw_bucket_dir_header *calculated_header);
-int cls_rgw_bucket_rebuild_index_op(librados::IoCtx& io_ctx, string& oid);
-  
-int cls_rgw_get_dir_header(librados::IoCtx& io_ctx, string& oid, rgw_bucket_dir_header *header);
 int cls_rgw_get_dir_header_async(librados::IoCtx& io_ctx, string& oid, RGWGetDirHeader_CB *ctx);
 
 void cls_rgw_encode_suggestion(char op, rgw_bucket_dir_entry& dirent, bufferlist& updates);
 
 void cls_rgw_suggest_changes(librados::ObjectWriteOperation& o, bufferlist& updates);
 
-/* bucket index log */
-
-int cls_rgw_bi_log_list(librados::IoCtx& io_ctx, string& oid, string& marker, uint32_t max,
-                    list<rgw_bi_log_entry>& entries, bool *truncated);
-int cls_rgw_bi_log_trim(librados::IoCtx& io_ctx, string& oid, string& start_marker, string& end_marker);
-
 /* usage logging */
 int cls_rgw_usage_log_read(librados::IoCtx& io_ctx, string& oid, string& user,
                            uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries,
index bb627cea2cbfc8fb8e3a5d3b7e7b160e74090189..bd053b1d372d65586a4f3550f452c790ad7913fc 100644 (file)
@@ -883,6 +883,22 @@ OPTION(nss_db_path, OPT_STR, "") // path to nss db
 
 OPTION(rgw_max_chunk_size, OPT_INT, 512 * 1024)
 
+/**
+ * override max bucket index shards in zone configuration (if not zero)
+ *
+ * Represents the number of shards for the bucket index object, a value of zero
+ * indicates there is no sharding. By default (no sharding, the name of the object
+ * is '.dir.{marker}', with sharding, the name is '.dir.{markder}.{sharding_id}',
+ * sharding_id is zero-based value. It is not recommended to set a too large value
+ * (e.g. thousand) as it increases the cost for bucket listing.
+ */
+OPTION(rgw_override_bucket_index_max_shards, OPT_U32, 0)
+
+/**
+ * Represents the maximum AIO pending requests for the bucket index object shards.
+ */
+OPTION(rgw_bucket_index_max_aio, OPT_U32, 8)
+
 OPTION(rgw_data, OPT_STR, "/var/lib/ceph/radosgw/$cluster-$id")
 OPTION(rgw_enable_apis, OPT_STR, "s3, swift, swift_auth, admin")
 OPTION(rgw_cache_enabled, OPT_BOOL, true)   // rgw cache enabled
index 2c775ca1e25f2593c4a721937ddc6356e5e5adcb..03b51a59685ee018101229254aef00611d3da37c 100644 (file)
@@ -519,7 +519,7 @@ int bucket_stats(rgw_bucket& bucket, Formatter *formatter)
     return r;
 
   map<RGWObjCategory, RGWStorageStats> stats;
-  uint64_t bucket_ver, master_ver;
+  string bucket_ver, master_ver;
   string max_marker;
   int ret = store->get_bucket_stats(bucket, &bucket_ver, &master_ver, stats, &max_marker);
   if (ret < 0) {
@@ -535,8 +535,8 @@ int bucket_stats(rgw_bucket& bucket, Formatter *formatter)
   formatter->dump_string("marker", bucket.marker);
   formatter->dump_string("owner", bucket_info.owner);
   formatter->dump_int("mtime", mtime);
-  formatter->dump_int("ver", bucket_ver);
-  formatter->dump_int("master_ver", master_ver);
+  formatter->dump_string("ver", bucket_ver);
+  formatter->dump_string("master_ver", master_ver);
   formatter->dump_string("max_marker", max_marker);
   dump_bucket_usage(stats, formatter);
   formatter->close_section();
@@ -2350,7 +2350,7 @@ next:
 
     do {
       list<rgw_bi_log_entry> entries;
-      ret = store->list_bi_log_entries(bucket, marker, max_entries - count, entries, &truncated);
+      ret = store->list_bi_log_entries(bucket, shard_id, marker, max_entries - count, entries, &truncated);
       if (ret < 0) {
         cerr << "ERROR: list_bi_log_entries(): " << cpp_strerror(-ret) << std::endl;
         return -ret;
@@ -2382,7 +2382,7 @@ next:
       cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl;
       return -ret;
     }
-    ret = store->trim_bi_log_entries(bucket, start_marker, end_marker);
+    ret = store->trim_bi_log_entries(bucket, shard_id, start_marker, end_marker);
     if (ret < 0) {
       cerr << "ERROR: trim_bi_log_entries(): " << cpp_strerror(-ret) << std::endl;
       return -ret;
@@ -2565,7 +2565,7 @@ next:
       }
 
       RGWReplicaBucketLogger logger(store);
-      ret = logger.get_bounds(bucket, bounds);
+      ret = logger.get_bounds(bucket, shard_id, bounds);
       if (ret < 0)
         return -ret;
     } else { // shouldn't get here
@@ -2616,7 +2616,7 @@ next:
       }
 
       RGWReplicaBucketLogger logger(store);
-      ret = logger.delete_bound(bucket, daemon_id);
+      ret = logger.delete_bound(bucket, shard_id, daemon_id);
       if (ret < 0)
         return -ret;
     }
index 4afe1ae10192a8bc5b4277adaa12aedb8b3ce9c0..48abc4d72e66fb535d244529e9b3937d6c680a60 100644 (file)
@@ -233,6 +233,32 @@ int rgw_bucket_instance_remove_entry(RGWRados *store, string& entry, RGWObjVersi
   return store->meta_mgr->remove_entry(bucket_instance_meta_handler, entry, objv_tracker);
 }
 
+int rgw_bucket_parse_bucket_instance(const string& bucket_instance, string *target_bucket_instance, int *shard_id)
+{
+  ssize_t pos = bucket_instance.rfind(':');
+  if (pos < 0) {
+    return -EINVAL;
+  }
+
+  string first = bucket_instance.substr(0, pos);
+  string second = bucket_instance.substr(pos + 1);
+
+  if (first.find(':') == string::npos) {
+    *shard_id = -1;
+    *target_bucket_instance = bucket_instance;
+    return 0;
+  }
+
+  *target_bucket_instance = first;
+  string err;
+  *shard_id = strict_strtol(second.c_str(), 10, &err);
+  if (!err.empty()) {
+    return -EINVAL;
+  }
+
+  return 0;
+}
+
 int rgw_bucket_set_attrs(RGWRados *store, RGWBucketInfo& bucket_info,
                          map<string, bufferlist>& attrs,
                          map<string, bufferlist>* rmattrs,
@@ -358,7 +384,7 @@ int rgw_remove_bucket(RGWRados *store, const string& bucket_owner, rgw_bucket& b
   RGWBucketInfo info;
   bufferlist bl;
 
-  uint64_t bucket_ver, master_ver;
+  string bucket_ver, master_ver;
 
   ret = store->get_bucket_stats(bucket, &bucket_ver, &master_ver, stats, NULL);
   if (ret < 0)
@@ -729,9 +755,9 @@ int RGWBucket::check_object_index(RGWBucketAdminOpState& op_state,
   while (is_truncated) {
     map<string, RGWObjEnt> result;
 
-    int r = store->cls_bucket_list(bucket, marker, prefix, 1000, result,
-             &is_truncated, &marker,
-             bucket_object_check_filter);
+    int r = store->cls_bucket_list(bucket, marker, prefix, 1000,
+                                   result, &is_truncated, &marker,
+                                  bucket_object_check_filter);
 
     if (r == -ENOENT) {
       break;
@@ -957,7 +983,7 @@ static int bucket_stats(RGWRados *store, std::string&  bucket_name, Formatter *f
 
   bucket = bucket_info.bucket;
 
-  uint64_t bucket_ver, master_ver;
+  string bucket_ver, master_ver;
   string max_marker;
   int ret = store->get_bucket_stats(bucket, &bucket_ver, &master_ver, stats, &max_marker);
   if (ret < 0) {
@@ -972,8 +998,8 @@ static int bucket_stats(RGWRados *store, std::string&  bucket_name, Formatter *f
   formatter->dump_string("id", bucket.bucket_id);
   formatter->dump_string("marker", bucket.marker);
   formatter->dump_string("owner", bucket_info.owner);
-  formatter->dump_int("ver", bucket_ver);
-  formatter->dump_int("master_ver", master_ver);
+  formatter->dump_string("ver", bucket_ver);
+  formatter->dump_string("master_ver", master_ver);
   formatter->dump_int("mtime", mtime);
   formatter->dump_string("max_marker", max_marker);
   dump_bucket_usage(stats, formatter);
@@ -1076,9 +1102,10 @@ void rgw_data_change::dump(Formatter *f) const
 }
 
 
-int RGWDataChangesLog::choose_oid(rgw_bucket& bucket) {
-    string& name = bucket.name;
-    uint32_t r = ceph_str_hash_linux(name.c_str(), name.size()) % num_shards;
+int RGWDataChangesLog::choose_oid(const rgw_bucket_shard& bs) {
+    const string& name = bs.bucket.name;
+    int shard_shift = (bs.shard_id > 0 ? bs.shard_id : 0);
+    uint32_t r = (ceph_str_hash_linux(name.c_str(), name.size()) + shard_shift) % num_shards;
 
     return (int)r;
 }
@@ -1090,19 +1117,22 @@ int RGWDataChangesLog::renew_entries()
 
   /* we can't keep the bucket name as part of the cls_log_entry, and we need
    * it later, so we keep two lists under the map */
-  map<int, pair<list<string>, list<cls_log_entry> > > m;
+  map<int, pair<list<rgw_bucket_shard>, list<cls_log_entry> > > m;
 
   lock.Lock();
-  map<string, rgw_bucket> entries;
+  map<rgw_bucket_shard, bool> entries;
   entries.swap(cur_cycle);
   lock.Unlock();
 
-  map<string, rgw_bucket>::iterator iter;
+  map<rgw_bucket_shard, bool>::iterator iter;
   string section;
   utime_t ut = ceph_clock_now(cct);
   for (iter = entries.begin(); iter != entries.end(); ++iter) {
-    rgw_bucket& bucket = iter->second;
-    int index = choose_oid(bucket);
+    const rgw_bucket_shard& bs = iter->first;
+    const rgw_bucket& bucket = bs.bucket;
+    int shard_id = bs.shard_id;
+
+    int index = choose_oid(bs);
 
     cls_log_entry entry;
 
@@ -1110,16 +1140,21 @@ int RGWDataChangesLog::renew_entries()
     bufferlist bl;
     change.entity_type = ENTITY_TYPE_BUCKET;
     change.key = bucket.name + ":" + bucket.bucket_id;
+    if (shard_id >= 0) {
+      char buf[16];
+      snprintf(buf, sizeof(buf), ":%d", shard_id);
+      change.key += buf;
+    }
     change.timestamp = ut;
     ::encode(change, bl);
 
     store->time_log_prepare_entry(entry, ut, section, bucket.name, bl);
 
-    m[index].first.push_back(bucket.name);
+    m[index].first.push_back(bs);
     m[index].second.push_back(entry);
   }
 
-  map<int, pair<list<string>, list<cls_log_entry> > >::iterator miter;
+  map<int, pair<list<rgw_bucket_shard>, list<cls_log_entry> > >::iterator miter;
   for (miter = m.begin(); miter != m.end(); ++miter) {
     list<cls_log_entry>& entries = miter->second.second;
 
@@ -1136,8 +1171,8 @@ int RGWDataChangesLog::renew_entries()
     utime_t expiration = now;
     expiration += utime_t(cct->_conf->rgw_data_log_window, 0);
 
-    list<string>& buckets = miter->second.first;
-    list<string>::iterator liter;
+    list<rgw_bucket_shard>& buckets = miter->second.first;
+    list<rgw_bucket_shard>::iterator liter;
     for (liter = buckets.begin(); liter != buckets.end(); ++liter) {
       update_renewed(*liter, expiration);
     }
@@ -1146,39 +1181,41 @@ int RGWDataChangesLog::renew_entries()
   return 0;
 }
 
-void RGWDataChangesLog::_get_change(string& bucket_name, ChangeStatusPtr& status)
+void RGWDataChangesLog::_get_change(const rgw_bucket_shard& bs, ChangeStatusPtr& status)
 {
   assert(lock.is_locked());
-  if (!changes.find(bucket_name, status)) {
+  if (!changes.find(bs, status)) {
     status = ChangeStatusPtr(new ChangeStatus);
-    changes.add(bucket_name, status);
+    changes.add(bs, status);
   }
 }
 
-void RGWDataChangesLog::register_renew(rgw_bucket& bucket)
+void RGWDataChangesLog::register_renew(rgw_bucket_shard& bs)
 {
   Mutex::Locker l(lock);
-  cur_cycle[bucket.name] = bucket;
+  cur_cycle[bs] = true;
 }
 
-void RGWDataChangesLog::update_renewed(string& bucket_name, utime_t& expiration)
+void RGWDataChangesLog::update_renewed(rgw_bucket_shard& bs, utime_t& expiration)
 {
   Mutex::Locker l(lock);
   ChangeStatusPtr status;
-  _get_change(bucket_name, status);
+  _get_change(bs, status);
 
-  ldout(cct, 20) << "RGWDataChangesLog::update_renewd() bucket_name=" << bucket_name << " expiration=" << expiration << dendl;
+  ldout(cct, 20) << "RGWDataChangesLog::update_renewd() bucket_name=" << bs.bucket.name << " shard_id=" << bs.shard_id << " expiration=" << expiration << dendl;
   status->cur_expiration = expiration;
 }
 
-int RGWDataChangesLog::add_entry(rgw_bucket& bucket) {
+int RGWDataChangesLog::add_entry(rgw_bucket& bucket, int shard_id) {
   if (!store->need_to_log_data())
     return 0;
 
+  rgw_bucket_shard bs(bucket, shard_id);
+
   lock.Lock();
 
   ChangeStatusPtr status;
-  _get_change(bucket.name, status);
+  _get_change(bs, status);
 
   lock.Unlock();
 
@@ -1186,13 +1223,13 @@ int RGWDataChangesLog::add_entry(rgw_bucket& bucket) {
 
   status->lock->Lock();
 
-  ldout(cct, 20) << "RGWDataChangesLog::add_entry() bucket.name=" << bucket.name << " now=" << now << " cur_expiration=" << status->cur_expiration << dendl;
+  ldout(cct, 20) << "RGWDataChangesLog::add_entry() bucket.name=" << bucket.name << " shard_id=" << shard_id << " now=" << now << " cur_expiration=" << status->cur_expiration << dendl;
 
   if (now < status->cur_expiration) {
     /* no need to send, recently completed */
     status->lock->Unlock();
 
-    register_renew(bucket);
+    register_renew(bs);
     return 0;
   }
 
@@ -1209,7 +1246,7 @@ int RGWDataChangesLog::add_entry(rgw_bucket& bucket) {
     int ret = cond->wait();
     cond->put();
     if (!ret) {
-      register_renew(bucket);
+      register_renew(bs);
     }
     return ret;
   }
@@ -1217,7 +1254,7 @@ int RGWDataChangesLog::add_entry(rgw_bucket& bucket) {
   status->cond = new RefCountedCond;
   status->pending = true;
 
-  string& oid = oids[choose_oid(bucket)];
+  string& oid = oids[choose_oid(bs)];
   utime_t expiration;
 
   int ret;
@@ -1234,6 +1271,11 @@ int RGWDataChangesLog::add_entry(rgw_bucket& bucket) {
     rgw_data_change change;
     change.entity_type = ENTITY_TYPE_BUCKET;
     change.key = bucket.name + ":" + bucket.bucket_id;
+    if (shard_id >= 0) {
+      char buf[16];
+      snprintf(buf, sizeof(buf), ":%d", shard_id);
+      change.key += buf;
+    }
     change.timestamp = now;
     ::encode(change, bl);
     string section;
@@ -1686,7 +1728,7 @@ public:
 
     objv_tracker = bci.info.objv_tracker;
 
-    ret = store->init_bucket_index(bci.info.bucket);
+    ret = store->init_bucket_index(bci.info.bucket, bci.info.num_shards);
     if (ret < 0)
       return ret;
 
index 3bdd68c057d0d0256bbc361422d99d9a3041a4f0..d0c2f4b184938a9cd7cebfddd5d31e51e3d18e40 100644 (file)
@@ -32,6 +32,8 @@ extern int rgw_bucket_instance_store_info(RGWRados *store, string& oid, bufferli
                                  map<string, bufferlist> *pattrs, RGWObjVersionTracker *objv_tracker,
                                  time_t mtime);
 
+extern int rgw_bucket_parse_bucket_instance(const string& bucket_instance, string *target_bucket_instance, int *shard_id);
+
 extern int rgw_bucket_instance_remove_entry(RGWRados *store, string& entry, RGWObjVersionTracker *objv_tracker);
 
 extern int rgw_bucket_delete_bucket_obj(RGWRados *store, string& bucket_name, RGWObjVersionTracker& objv_tracker);
@@ -314,13 +316,13 @@ class RGWDataChangesLog {
 
   typedef ceph::shared_ptr<ChangeStatus> ChangeStatusPtr;
 
-  lru_map<string, ChangeStatusPtr> changes;
+  lru_map<rgw_bucket_shard, ChangeStatusPtr> changes;
 
-  map<string, rgw_bucket> cur_cycle;
+  map<rgw_bucket_shard, bool> cur_cycle;
 
-  void _get_change(string& bucket_name, ChangeStatusPtr& status);
-  void register_renew(rgw_bucket& bucket);
-  void update_renewed(string& bucket_name, utime_t& expiration);
+  void _get_change(const rgw_bucket_shard& bs, ChangeStatusPtr& status);
+  void register_renew(rgw_bucket_shard& bs);
+  void update_renewed(rgw_bucket_shard& bs, utime_t& expiration);
 
   class ChangesRenewThread : public Thread {
     CephContext *cct;
@@ -362,8 +364,8 @@ public:
 
   ~RGWDataChangesLog();
 
-  int choose_oid(rgw_bucket& bucket);
-  int add_entry(rgw_bucket& bucket);
+  int choose_oid(const rgw_bucket_shard& bs);
+  int add_entry(rgw_bucket& bucket, int shard_id);
   int renew_entries();
   int list_entries(int shard, utime_t& start_time, utime_t& end_time, int max_entries,
                   list<rgw_data_change>& entries,
index dfe3361a83997ddbf60ea7e5a1e0b705a1e11fa7..a36d89de2a06861aa1a6d6f20a5ad624538023fc 100644 (file)
@@ -26,6 +26,8 @@
 
 PerfCounters *perfcounter = NULL;
 
+const uint32_t RGWBucketInfo::NUM_SHARDS_BLIND_BUCKET(UINT32_MAX);
+
 int rgw_perf_start(CephContext *cct)
 {
   PerfCountersBuilder plb(cct, cct->_conf->name.to_str(), l_rgw_first, l_rgw_last);
index d9175e8dc93e1e6e84cbae33ac097ac0d3038d21..a4c9b41ef167b8b956f69cd7583970ed490418f5 100644 (file)
@@ -140,6 +140,10 @@ using ceph::crypto::MD5;
 #define ERR_USER_SUSPENDED       2100
 #define ERR_INTERNAL_ERROR       2200
 
+#ifndef UINT32_MAX
+#define UINT32_MAX (4294967295)
+#endif
+
 typedef void *RGWAccessHandle;
 
 
@@ -677,6 +681,25 @@ inline ostream& operator<<(ostream& out, const rgw_bucket &b) {
   return out;
 }
 
+struct rgw_bucket_shard {
+  rgw_bucket bucket;
+  int shard_id;
+
+  rgw_bucket_shard() : shard_id(-1) {}
+  rgw_bucket_shard(rgw_bucket& _b, int _sid) : bucket(_b), shard_id(_sid) {}
+
+  bool operator<(const rgw_bucket_shard& b) const {
+    if (bucket < b.bucket) {
+      return true;
+    }
+    if (b.bucket < bucket) {
+      return false;
+    }
+    return shard_id < b.shard_id;
+  }
+};
+
+
 struct RGWObjVersionTracker {
   obj_version read_version;
   obj_version write_version;
@@ -721,6 +744,10 @@ enum RGWBucketFlags {
 
 struct RGWBucketInfo
 {
+  enum BIShardsHashType {
+    MOD = 0
+  };
+
   rgw_bucket bucket;
   string owner;
   uint32_t flags;
@@ -732,8 +759,20 @@ struct RGWBucketInfo
   obj_version ep_objv; /* entry point object version, for runtime tracking only */
   RGWQuotaInfo quota;
 
+  // Represents the number of bucket index object shards:
+  //   - value of 0 indicates there is no sharding (this is by default before this
+  //     feature is implemented).
+  //   - value of UINT32_T::MAX indicates this is a blind bucket.
+  uint32_t num_shards;
+
+  // Represents the bucket index shard hash type.
+  uint8_t bucket_index_shard_hash_type;
+
+  // Represents the shard number for blind bucket.
+  const static uint32_t NUM_SHARDS_BLIND_BUCKET;
+
   void encode(bufferlist& bl) const {
-     ENCODE_START(9, 4, bl);
+     ENCODE_START(11, 4, bl);
      ::encode(bucket, bl);
      ::encode(owner, bl);
      ::encode(flags, bl);
@@ -743,6 +782,8 @@ struct RGWBucketInfo
      ::encode(placement_rule, bl);
      ::encode(has_instance_obj, bl);
      ::encode(quota, bl);
+     ::encode(num_shards, bl);
+     ::encode(bucket_index_shard_hash_type, bl);
      ENCODE_FINISH(bl);
   }
   void decode(bufferlist::iterator& bl) {
@@ -765,6 +806,10 @@ struct RGWBucketInfo
        ::decode(has_instance_obj, bl);
      if (struct_v >= 9)
        ::decode(quota, bl);
+     if (struct_v >= 10)
+       ::decode(num_shards, bl);
+     if (struct_v >= 11)
+       ::decode(bucket_index_shard_hash_type, bl);
      DECODE_FINISH(bl);
   }
   void dump(Formatter *f) const;
@@ -772,7 +817,7 @@ struct RGWBucketInfo
 
   void decode_json(JSONObj *obj);
 
-  RGWBucketInfo() : flags(0), creation_time(0), has_instance_obj(false) {}
+  RGWBucketInfo() : flags(0), creation_time(0), has_instance_obj(false), num_shards(0), bucket_index_shard_hash_type(MOD) {}
 };
 WRITE_CLASS_ENCODER(RGWBucketInfo)
 
@@ -1022,6 +1067,9 @@ public:
 
   bool in_extra_data; /* in-memory only member, does not serialize */
 
+  // Represents the hash index source for this object once it is set (non-empty)
+  std::string index_hash_source;
+
   rgw_obj() : in_extra_data(false) {}
   rgw_obj(const char *b, const char *o) : in_extra_data(false) {
     rgw_bucket _b(b);
@@ -1120,6 +1168,9 @@ public:
       return orig_key;
   }
 
+  string& get_hash_object() {
+    return index_hash_source.empty() ? object : index_hash_source;
+  }
   /**
    * Translate a namespace-mangled object name to the user-facing name
    * existing in the given namespace.
index cd731b78a5920a36a14055e8baf0a563ff2e1c41..f4ce380d20cdc9616663f3a02a977037185c8e72 100644 (file)
@@ -545,6 +545,8 @@ void RGWBucketInfo::dump(Formatter *f) const
   encode_json("placement_rule", placement_rule, f);
   encode_json("has_instance_obj", has_instance_obj, f);
   encode_json("quota", quota, f);
+  encode_json("num_shards", num_shards, f);
+  encode_json("bi_shard_hash_type", (uint32_t)bucket_index_shard_hash_type, f);
 }
 
 void RGWBucketInfo::decode_json(JSONObj *obj) {
@@ -556,6 +558,10 @@ void RGWBucketInfo::decode_json(JSONObj *obj) {
   JSONDecoder::decode_json("placement_rule", placement_rule, obj);
   JSONDecoder::decode_json("has_instance_obj", has_instance_obj, obj);
   JSONDecoder::decode_json("quota", quota, obj);
+  JSONDecoder::decode_json("num_shards", num_shards, obj);
+  uint32_t hash_type;
+  JSONDecoder::decode_json("bi_shard_hash_type", hash_type, obj);
+  bucket_index_shard_hash_type = (uint8_t)hash_type;
 }
 
 void RGWObjEnt::dump(Formatter *f) const
@@ -658,6 +664,7 @@ void RGWZone::dump(Formatter *f) const
   encode_json("endpoints", endpoints, f);
   encode_json("log_meta", log_meta, f);
   encode_json("log_data", log_data, f);
+  encode_json("bucket_index_max_shards", bucket_index_max_shards, f);
 }
 
 void RGWZone::decode_json(JSONObj *obj)
@@ -666,6 +673,7 @@ void RGWZone::decode_json(JSONObj *obj)
   JSONDecoder::decode_json("endpoints", endpoints, obj);
   JSONDecoder::decode_json("log_meta", log_meta, obj);
   JSONDecoder::decode_json("log_data", log_data, obj);
+  JSONDecoder::decode_json("bucket_index_max_shards", bucket_index_max_shards, obj);
 }
 
 void RGWRegionPlacementTarget::dump(Formatter *f) const
index 11bb93d2d57a0b3df697a3e1e4877ebac0c69157..679b70bd14c2348750d2435742850f457be310df 100644 (file)
@@ -323,7 +323,14 @@ static int rgw_build_policies(RGWRados *store, struct req_state *s, bool only_bu
   string obj_str;
   RGWUserInfo bucket_owner_info;
 
-  s->bucket_instance_id = s->info.args.get(RGW_SYS_PARAM_PREFIX "bucket-instance");
+  string bi = s->info.args.get(RGW_SYS_PARAM_PREFIX "bucket-instance");
+  if (!bi.empty()) {
+    int shard_id;
+    ret = rgw_bucket_parse_bucket_instance(bi, &s->bucket_instance_id, &shard_id);
+    if (ret < 0) {
+      return ret;
+    }
+  }
 
   s->bucket_acl = new RGWAccessControlPolicy(s->cct);
 
@@ -1454,6 +1461,7 @@ int RGWPutObjProcessor_Multipart::prepare(RGWRados *store, void *obj_ctx, string
   }
 
   head_obj = manifest_gen.get_cur_obj();
+  head_obj.index_hash_source = obj_str;
   cur_obj = head_obj;
 
   return 0;
@@ -2531,6 +2539,7 @@ void RGWInitMultipart::execute()
     obj.init_ns(s->bucket, tmp_obj_name, mp_ns);
     // the meta object will be indexed with 0 size, we c
     obj.set_in_extra_data(true);
+    obj.index_hash_source = s->object_str;
     ret = store->put_obj_meta(s->obj_ctx, obj, 0, NULL, attrs, RGW_OBJ_CATEGORY_MULTIMETA, PUT_OBJ_CREATE_EXCL, s->owner.get_id());
   } while (ret == -EEXIST);
 }
@@ -2739,6 +2748,7 @@ void RGWCompleteMultipart::execute()
 
   meta_obj.init_ns(s->bucket, meta_oid, mp_ns);
   meta_obj.set_in_extra_data(true);
+  meta_obj.index_hash_source = s->object_str;
 
   ret = get_obj_attrs(store, s, meta_obj, attrs, NULL, NULL);
   if (ret < 0) {
@@ -2890,6 +2900,7 @@ void RGWAbortMultipart::execute()
         string oid = mp.get_part(obj_iter->second.num);
         rgw_obj obj;
         obj.init_ns(s->bucket, oid, mp_ns);
+        obj.index_hash_source = s->object_str;
         ret = store->delete_obj(s->obj_ctx, owner, obj);
         if (ret < 0 && ret != -ENOENT)
           return;
@@ -2898,6 +2909,7 @@ void RGWAbortMultipart::execute()
         RGWObjManifest::obj_iterator oiter;
         for (oiter = manifest.obj_begin(); oiter != manifest.obj_end(); ++oiter) {
           rgw_obj loc = oiter.get_location();
+          loc.index_hash_source = s->object_str;
           ret = store->delete_obj(s->obj_ctx, owner, loc);
           if (ret < 0 && ret != -ENOENT)
             return;
@@ -2909,6 +2921,7 @@ void RGWAbortMultipart::execute()
   // and also remove the metadata obj
   meta_obj.init_ns(s->bucket, meta_oid, mp_ns);
   meta_obj.set_in_extra_data(true);
+  meta_obj.index_hash_source = s->object_str;
   ret = store->delete_obj(s->obj_ctx, owner, meta_obj);
   if (ret == -ENOENT) {
     ret = -ERR_NO_SUCH_BUCKET;
index a48ce69890bdf34031808c239e42cb9f409c32b8..910da2fffb7c2f15d30da68aee2856c4e4d1dfc8 100644 (file)
@@ -318,8 +318,8 @@ int RGWBucketStatsCache::fetch_stats_from_storage(const string& user, rgw_bucket
 {
   RGWBucketInfo bucket_info;
 
-  uint64_t bucket_ver;
-  uint64_t master_ver;
+  string bucket_ver;
+  string master_ver;
 
   map<RGWObjCategory, RGWStorageStats> bucket_stats;
   int r = store->get_bucket_stats(bucket, &bucket_ver, &master_ver, bucket_stats, NULL);
index bb110142f49128c275be17a64bee4c6fffc8e6a1..164363f83034a144aed94130cb1300702241f80a 100644 (file)
@@ -19,6 +19,7 @@
 #include "rgw_metadata.h"
 #include "rgw_bucket.h"
 
+#include "cls/rgw/cls_rgw_ops.h"
 #include "cls/rgw/cls_rgw_types.h"
 #include "cls/rgw/cls_rgw_client.h"
 #include "cls/refcount/cls_refcount_client.h"
@@ -48,6 +49,8 @@ using namespace librados;
 
 #define dout_subsys ceph_subsys_rgw
 
+#define MAX_BUCKET_INDEX_SHARDS_PRIME 7877
+
 using namespace std;
 
 static RGWCache<RGWRados> cached_rados_provider;
@@ -77,7 +80,6 @@ static RGWObjCategory main_category = RGW_OBJ_CATEGORY_MAIN;
 
 #define RGW_STATELOG_OBJ_PREFIX "statelog."
 
-
 #define dout_subsys ceph_subsys_rgw
 
 void RGWDefaultRegionInfo::dump(Formatter *f) const {
@@ -1451,6 +1453,15 @@ int RGWRados::init_complete()
 
   quota_handler = RGWQuotaHandler::generate_handler(this, quota_threads);
 
+  bucket_index_max_shards = (cct->_conf->rgw_override_bucket_index_max_shards ? cct->_conf->rgw_override_bucket_index_max_shards :
+                             zone_public_config.bucket_index_max_shards);
+  if (bucket_index_max_shards > MAX_BUCKET_INDEX_SHARDS_PRIME) {
+    bucket_index_max_shards = MAX_BUCKET_INDEX_SHARDS_PRIME;
+    ldout(cct, 1) << __func__ << " bucket index max shards is too large, reset to value: "
+      << MAX_BUCKET_INDEX_SHARDS_PRIME << dendl;
+  }
+  ldout(cct, 20) << __func__ << " bucket index max shards: " << bucket_index_max_shards << dendl;
+
   return ret;
 }
 
@@ -1672,6 +1683,15 @@ int RGWRados::open_bucket_data_extra_ctx(rgw_bucket& bucket, librados::IoCtx& da
   return 0;
 }
 
+void RGWRados::build_bucket_index_marker(const string& shard_id_str, const string& shard_marker,
+      string *marker) {
+  if (marker) {
+    *marker = shard_id_str;
+    marker->append(BucketIndexShardsManager::KEY_VALUE_SEPARATOR);
+    marker->append(shard_marker);
+  }
+}
+
 int RGWRados::open_bucket_index_ctx(rgw_bucket& bucket, librados::IoCtx& index_ctx)
 {
   int r = open_bucket_pool_ctx(bucket.name, bucket.index_pool, index_ctx);
@@ -1994,7 +2014,7 @@ void RGWRados::shard_name(const string& prefix, unsigned max_shards, const strin
   name = prefix + buf;
 }
 
-void RGWRados::time_log_prepare_entry(cls_log_entry& entry, const utime_t& ut, string& section, string& key, bufferlist& bl)
+void RGWRados::time_log_prepare_entry(cls_log_entry& entry, const utime_t& ut, const string& section, const string& key, bufferlist& bl)
 {
   cls_log_add_prepare_entry(entry, ut, section, key, bl);
 }
@@ -2322,6 +2342,10 @@ int RGWRados::list_objects(rgw_bucket& bucket, int max, string& prefix, string&
       result.push_back(ent);
       count++;
     }
+
+    // Either the back-end telling us truncated, or we don't consume all
+    // items returned per the amount caller request
+    truncated = (truncated || eiter != ent_map.end());
   }
 
 done:
@@ -2358,7 +2382,7 @@ int RGWRados::create_pool(rgw_bucket& bucket)
   return 0;
 }
 
-int RGWRados::init_bucket_index(rgw_bucket& bucket)
+int RGWRados::init_bucket_index(rgw_bucket& bucket, int num_shards)
 {
   librados::IoCtx index_ctx; // context for new bucket
 
@@ -2369,13 +2393,10 @@ int RGWRados::init_bucket_index(rgw_bucket& bucket)
   string dir_oid =  dir_oid_prefix;
   dir_oid.append(bucket.marker);
 
-  librados::ObjectWriteOperation op;
-  op.create(true);
-  r = cls_rgw_init_index(index_ctx, op, dir_oid);
-  if (r < 0 && r != -EEXIST)
-    return r;
+  map<int, string> bucket_objs;
+  get_bucket_index_objects(dir_oid, num_shards, bucket_objs);
 
-  return 0;
+  return CLSRGWIssueBucketIndexInit(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)();
 }
 
 /**
@@ -2426,7 +2447,7 @@ int RGWRados::create_bucket(RGWUserInfo& owner, rgw_bucket& bucket,
     string dir_oid =  dir_oid_prefix;
     dir_oid.append(bucket.marker);
 
-    r = init_bucket_index(bucket);
+    r = init_bucket_index(bucket, bucket_index_max_shards);
     if (r < 0)
       return r;
 
@@ -2442,6 +2463,8 @@ int RGWRados::create_bucket(RGWUserInfo& owner, rgw_bucket& bucket,
     info.owner = owner.user_id;
     info.region = region_name;
     info.placement_rule = selected_placement_rule;
+    info.num_shards = bucket_index_max_shards;
+    info.bucket_index_shard_hash_type = RGWBucketInfo::MOD;
     if (!creation_time)
       time(&info.creation_time);
     else
@@ -2470,11 +2493,16 @@ int RGWRados::create_bucket(RGWUserInfo& owner, rgw_bucket& bucket,
 
         /* remove bucket index */
         librados::IoCtx index_ctx; // context for new bucket
-        int r = open_bucket_index_ctx(bucket, index_ctx);
+        map<int, string> bucket_objs;
+        int r = open_bucket_index(bucket, index_ctx, bucket_objs);
         if (r < 0)
           return r;
 
-        index_ctx.remove(dir_oid);
+        map<int, string>::const_iterator biter;
+        for (biter = bucket_objs.begin(); biter != bucket_objs.end(); ++biter) {
+          // Do best effort removal
+          index_ctx.remove(biter->second);
+        }
       }
       /* ret == -ENOENT here */
     }
@@ -2807,6 +2835,25 @@ int RGWRados::get_obj_ref(const rgw_obj& obj, rgw_rados_ref *ref, rgw_bucket *bu
   return 0;
 }
 
+int RGWRados::BucketShard::init(rgw_bucket& _bucket, rgw_obj& obj)
+{
+  bucket = _bucket;
+
+  if (store->bucket_is_system(bucket)) {
+    return 0;
+  }
+
+  int ret = store->open_bucket_index_shard(bucket, index_ctx, obj.get_hash_object(), &bucket_obj, &shard_id);
+  if (ret < 0) {
+    ldout(store->ctx(), 0) << "ERROR: open_bucket_index_shard() returned ret=" << ret << dendl;
+    return ret;
+  }
+  ldout(store->ctx(), 20) << " bucket index object: " << bucket_obj << dendl;
+
+  return 0;
+}
+
+
 /**
  * Write/overwrite an object to the bucket storage.
  * bucket: the bucket to store the object in
@@ -2928,7 +2975,14 @@ int RGWRados::put_obj_meta_impl(void *ctx, rgw_obj& obj,  uint64_t size,
     index_tag = state->write_tag;
   }
 
-  r = prepare_update_index(NULL, bucket, CLS_RGW_OP_ADD, obj, index_tag);
+  librados::IoCtx index_ctx;
+  BucketShard bs(this);
+  r = bs.init(bucket, obj);
+  if (r < 0) {
+    return r;
+  }
+
+  r = prepare_update_index(NULL, bs, CLS_RGW_OP_ADD, obj, index_tag);
   if (r < 0)
     return r;
 
@@ -2950,8 +3004,8 @@ int RGWRados::put_obj_meta_impl(void *ctx, rgw_obj& obj,  uint64_t size,
     ldout(cct, 0) << "ERROR: complete_atomic_overwrite returned r=" << r << dendl;
   }
 
-  r = complete_update_index(bucket, obj.object, index_tag, poolid, epoch, size,
-                            ut, etag, content_type, &acl_bl, category, remove_objs);
+  r = complete_update_index(bs, obj, index_tag, poolid, epoch, size,
+      ut, etag, content_type, &acl_bl, category, remove_objs);
   if (r < 0)
     goto done_cancel;
 
@@ -2967,7 +3021,7 @@ int RGWRados::put_obj_meta_impl(void *ctx, rgw_obj& obj,  uint64_t size,
   return 0;
 
 done_cancel:
-  int ret = complete_update_index_cancel(bucket, obj.object, index_tag);
+  int ret = complete_update_index_cancel(bs, obj, index_tag);
   if (ret < 0) {
     ldout(cct, 0) << "ERROR: complete_update_index_cancel() returned ret=" << ret << dendl;
   }
@@ -3765,7 +3819,86 @@ int RGWRados::open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx,
   return 0;
 }
 
-static void translate_raw_stats(rgw_bucket_dir_header& header, map<RGWObjCategory, RGWStorageStats>& stats)
+int RGWRados::open_bucket_index_base(rgw_bucket& bucket, librados::IoCtx& index_ctx,
+    string& bucket_oid_base) {
+  if (bucket_is_system(bucket))
+    return -EINVAL;
+
+  int r = open_bucket_index_ctx(bucket, index_ctx);
+  if (r < 0)
+    return r;
+
+  if (bucket.marker.empty()) {
+    ldout(cct, 0) << "ERROR: empty marker for bucket operation" << dendl;
+    return -EIO;
+  }
+
+  bucket_oid_base = dir_oid_prefix;
+  bucket_oid_base.append(bucket.marker);
+
+  return 0;
+
+}
+
+int RGWRados::open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx,
+    map<int, string>& bucket_objs, int shard_id, map<int, string> *bucket_instance_ids) {
+  string bucket_oid_base;
+  int ret = open_bucket_index_base(bucket, index_ctx, bucket_oid_base);
+  if (ret < 0)
+    return ret;
+
+  // Get the bucket info
+  RGWBucketInfo binfo;
+  ret = get_bucket_instance_info(NULL, bucket, binfo, NULL, NULL);
+  if (ret < 0)
+    return ret;
+
+  get_bucket_index_objects(bucket_oid_base, binfo.num_shards, bucket_objs, shard_id);
+  if (bucket_instance_ids) {
+    get_bucket_instance_ids(binfo, shard_id, bucket_instance_ids);
+  }
+  return 0;
+}
+
+template<typename T>
+int RGWRados::open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx,
+                                map<int, string>& oids, map<int, T>& bucket_objs,
+                                int shard_id, map<int, string> *bucket_instance_ids)
+{
+  int ret = open_bucket_index(bucket, index_ctx, oids, shard_id, bucket_instance_ids);
+  if (ret < 0)
+    return ret;
+
+  map<int, string>::const_iterator iter = oids.begin();
+  for (; iter != oids.end(); ++iter) {
+    bucket_objs[iter->first] = T();
+  }
+  return 0;
+}
+
+int RGWRados::open_bucket_index_shard(rgw_bucket& bucket, librados::IoCtx& index_ctx,
+    const string& obj_key, string *bucket_obj, int *shard_id)
+{
+  string bucket_oid_base;
+  int ret = open_bucket_index_base(bucket, index_ctx, bucket_oid_base);
+  if (ret < 0)
+    return ret;
+
+  // Get the bucket info
+  RGWBucketInfo binfo;
+  ret = get_bucket_instance_info(NULL, bucket, binfo, NULL, NULL);
+  if (ret < 0)
+    return ret;
+
+  ret = get_bucket_index_object(bucket_oid_base, obj_key, binfo.num_shards,
+        (RGWBucketInfo::BIShardsHashType)binfo.bucket_index_shard_hash_type, bucket_obj, shard_id);
+  if (ret < 0) {
+    ldout(cct, 10) << "get_bucket_index_object() returned ret=" << ret << dendl;
+  }
+  return 0;
+}
+
+static void accumulate_raw_stats(rgw_bucket_dir_header& header, map<RGWObjCategory, RGWStorageStats>& stats)
 {
   map<uint8_t, struct rgw_bucket_category_stats>::iterator iter = header.stats.begin();
   for (; iter != header.stats.end(); ++iter) {
@@ -3773,9 +3906,9 @@ static void translate_raw_stats(rgw_bucket_dir_header& header, map<RGWObjCategor
     RGWStorageStats& s = stats[category];
     struct rgw_bucket_category_stats& header_stats = iter->second;
     s.category = (RGWObjCategory)iter->first;
-    s.num_kb = ((header_stats.total_size + 1023) / 1024);
-    s.num_kb_rounded = ((header_stats.total_size_rounded + 1023) / 1024);
-    s.num_objects = header_stats.num_entries;
+    s.num_kb += ((header_stats.total_size + 1023) / 1024);
+    s.num_kb_rounded += ((header_stats.total_size_rounded + 1023) / 1024);
+    s.num_objects += header_stats.num_entries;
   }
 }
 
@@ -3784,21 +3917,24 @@ int RGWRados::bucket_check_index(rgw_bucket& bucket,
                                 map<RGWObjCategory, RGWStorageStats> *calculated_stats)
 {
   librados::IoCtx index_ctx;
-  string oid;
-
-  int ret = open_bucket_index(bucket, index_ctx, oid);
+  // key - bucket index object id
+  // value - bucket index check OP returned result with the given bucket index object (shard)
+  map<int, string> oids;
+  map<int, struct rgw_cls_check_index_ret> bucket_objs_ret;
+  int ret = open_bucket_index(bucket, index_ctx, oids, bucket_objs_ret);
   if (ret < 0)
     return ret;
 
-  rgw_bucket_dir_header existing_header;
-  rgw_bucket_dir_header calculated_header;
-
-  ret = cls_rgw_bucket_check_index_op(index_ctx, oid, &existing_header, &calculated_header);
+  ret = CLSRGWIssueBucketCheck(index_ctx, oids, bucket_objs_ret, cct->_conf->rgw_bucket_index_max_aio)();
   if (ret < 0)
     return ret;
 
-  translate_raw_stats(existing_header, *existing_stats);
-  translate_raw_stats(calculated_header, *calculated_stats);
+  // Aggregate results (from different shards if there is any)
+  map<int, struct rgw_cls_check_index_ret>::iterator iter;
+  for (iter = bucket_objs_ret.begin(); iter != bucket_objs_ret.end(); ++iter) {
+    accumulate_raw_stats(iter->second.existing_header, *existing_stats);
+    accumulate_raw_stats(iter->second.calculated_header, *calculated_stats);
+  }
 
   return 0;
 }
@@ -3806,13 +3942,12 @@ int RGWRados::bucket_check_index(rgw_bucket& bucket,
 int RGWRados::bucket_rebuild_index(rgw_bucket& bucket)
 {
   librados::IoCtx index_ctx;
-  string oid;
-
-  int ret = open_bucket_index(bucket, index_ctx, oid);
-  if (ret < 0)
-    return ret;
+  map<int, string> bucket_objs;
+  int r = open_bucket_index(bucket, index_ctx, bucket_objs);
+  if (r < 0)
+    return r;
 
-  return cls_rgw_bucket_rebuild_index_op(index_ctx, oid);
+  return CLSRGWIssueBucketRebuild(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)();
 }
 
 
@@ -3874,8 +4009,14 @@ int RGWRados::delete_obj_impl(void *ctx, const string& bucket_owner, rgw_obj& ob
 
   bool ret_not_existed = (state && !state->exists);
 
+  BucketShard bs(this);
+  r = bs.init(bucket, obj);
+  if (r < 0) {
+    return r;
+  }
+
   string tag;
-  r = prepare_update_index(state, bucket, CLS_RGW_OP_DEL, obj, tag);
+  r = prepare_update_index(state, bs, CLS_RGW_OP_DEL, obj, tag);
   if (r < 0)
     return r;
 
@@ -3890,9 +4031,9 @@ int RGWRados::delete_obj_impl(void *ctx, const string& bucket_owner, rgw_obj& ob
   int64_t poolid = ref.ioctx.get_id();
   if (r >= 0 || r == -ENOENT) {
     uint64_t epoch = ref.ioctx.get_last_version();
-    r = complete_update_index_del(bucket, obj.object, tag, poolid, epoch);
+    r = complete_update_index_del(bs, obj, tag, poolid, epoch);
   } else {
-    int ret = complete_update_index_cancel(bucket, obj.object, tag);
+    int ret = complete_update_index_cancel(bs, obj, tag);
     if (ret < 0) {
       ldout(cct, 0) << "ERROR: complete_update_index_cancel returned ret=" << ret << dendl;
     }
@@ -3950,8 +4091,14 @@ int RGWRados::delete_obj_index(rgw_obj& obj)
   std::string oid, key;
   get_obj_bucket_and_oid_key(obj, bucket, oid, key);
 
+  BucketShard bs(this);
+  int r = bs.init(bucket, obj);
+  if (r < 0) {
+    return r;
+  }
+
   string tag;
-  int r = complete_update_index_del(bucket, obj.object, tag, -1 /* pool */, 0);
+  r = complete_update_index_del(bs, obj, tag, -1 /* pool */, 0);
 
   return r;
 }
@@ -4287,9 +4434,17 @@ int RGWRados::set_attrs(void *ctx, rgw_obj& obj,
   if (!op.size())
     return 0;
 
+  librados::IoCtx index_ctx;
+  BucketShard bs(this);
+  r = bs.init(bucket, obj);
+  if (r < 0) {
+    ldout(cct, 10) << "bs.init() returned r=" << r << dendl;
+    return r;
+  }
+
   string tag;
   if (state) {
-    r = prepare_update_index(state, bucket, CLS_RGW_OP_ADD, obj, tag);
+    r = prepare_update_index(state, bs, CLS_RGW_OP_ADD, obj, tag);
     if (r < 0)
       return r;
   }
@@ -4305,10 +4460,10 @@ int RGWRados::set_attrs(void *ctx, rgw_obj& obj,
       uint64_t epoch = ref.ioctx.get_last_version();
       int64_t poolid = ref.ioctx.get_id();
       utime_t mtime = ceph_clock_now(cct);
-      r = complete_update_index(bucket, obj.object, tag, poolid, epoch, state->size,
+      r = complete_update_index(bs, obj, tag, poolid, epoch, state->size,
                                 mtime, etag, content_type, &acl_bl, RGW_OBJ_CATEGORY_MAIN, NULL);
     } else {
-      int ret = complete_update_index_cancel(bucket, obj.object, tag);
+      int ret = complete_update_index_cancel(bs, obj, tag);
       if (ret < 0) {
         ldout(cct, 0) << "ERROR: comlete_update_index_cancel() returned r=" << r << dendl;
       }
@@ -4504,13 +4659,13 @@ done_err:
   return r;
 }
 
-int RGWRados::prepare_update_index(RGWObjState *state, rgw_bucket& bucket,
+int RGWRados::prepare_update_index(RGWObjState *state, BucketShard& bs,
                                    RGWModifyOp op, rgw_obj& obj, string& tag)
 {
-  if (bucket_is_system(bucket))
+  if (bucket_is_system(bs.bucket))
     return 0;
 
-  int ret = data_log->add_entry(obj.bucket);
+  int ret = data_log->add_entry(bs.bucket, bs.shard_id);
   if (ret < 0) {
     lderr(cct) << "ERROR: failed writing data log" << dendl;
     return ret;
@@ -4527,21 +4682,20 @@ int RGWRados::prepare_update_index(RGWObjState *state, rgw_bucket& bucket,
       append_rand_alpha(cct, tag, tag, 32);
     }
   }
-  ret = cls_obj_prepare_op(bucket, op, tag,
-                               obj.object, obj.key);
+  ret = cls_obj_prepare_op(bs, op, tag, obj.object, obj.key);
 
   return ret;
 }
 
-int RGWRados::complete_update_index(rgw_bucket& bucket, string& oid, string& tag, int64_t poolid, uint64_t epoch, uint64_t size,
+int RGWRados::complete_update_index(BucketShard& bs, rgw_obj& oid, string& tag, int64_t poolid, uint64_t epoch, uint64_t size,
                                     utime_t& ut, string& etag, string& content_type, bufferlist *acl_bl, RGWObjCategory category,
                                     list<string> *remove_objs)
 {
-  if (bucket_is_system(bucket))
+  if (bucket_is_system(bs.bucket))
     return 0;
 
   RGWObjEnt ent;
-  ent.name = oid;
+  ent.name = oid.object;
   ent.size = size;
   ent.mtime = ut;
   ent.etag = etag;
@@ -4556,7 +4710,7 @@ int RGWRados::complete_update_index(rgw_bucket& bucket, string& oid, string& tag
   ent.owner_display_name = owner.get_display_name();
   ent.content_type = content_type;
 
-  int ret = cls_obj_complete_add(bucket, tag, poolid, epoch, ent, category, remove_objs);
+  int ret = cls_obj_complete_add(bs, tag, poolid, epoch, ent, category, remove_objs);
 
   return ret;
 }
@@ -4580,6 +4734,7 @@ int RGWRados::clone_objs_impl(void *ctx, rgw_obj& dst_obj,
   string etag;
   string content_type;
   bufferlist acl_bl;
+  BucketShard bs(this);
   bool update_index = (category == RGW_OBJ_CATEGORY_MAIN ||
                        category == RGW_OBJ_CATEGORY_MULTIMETA);
 
@@ -4660,8 +4815,13 @@ int RGWRados::clone_objs_impl(void *ctx, rgw_obj& dst_obj,
   int64_t poolid = io_ctx.get_id();
   int ret;
 
+  ret = bs.init(bucket, dst_obj);
+  if (ret < 0) {
+    goto done;
+  }
+
   if (update_index) {
-    ret = prepare_update_index(state, bucket, CLS_RGW_OP_ADD, dst_obj, tag);
+    ret = prepare_update_index(state, bs, CLS_RGW_OP_ADD, dst_obj, tag);
     if (ret < 0)
       goto done;
   }
@@ -4675,10 +4835,10 @@ done:
 
   if (update_index) {
     if (ret >= 0) {
-      ret = complete_update_index(bucket, dst_obj.object, tag, poolid, epoch, size,
+      ret = complete_update_index(bs, dst_obj, tag, poolid, epoch, size,
                                   ut, etag, content_type, &acl_bl, category, NULL);
     } else {
-      int r = complete_update_index_cancel(bucket, dst_obj.object, tag);
+      int r = complete_update_index_cancel(bs, dst_obj, tag);
       if (r < 0) {
         ldout(cct, 0) << "ERROR: comlete_update_index_cancel() returned r=" << r << dendl;
       }
@@ -5377,57 +5537,95 @@ int RGWRados::obj_stat(void *ctx, rgw_obj& obj, uint64_t *psize, time_t *pmtime,
   return 0;
 }
 
-int RGWRados::get_bucket_stats(rgw_bucket& bucket, uint64_t *bucket_ver, uint64_t *master_ver, map<RGWObjCategory, RGWStorageStats>& stats,
-                               string *max_marker)
+int RGWRados::get_bucket_stats(rgw_bucket& bucket, string *bucket_ver, string *master_ver,
+    map<RGWObjCategory, RGWStorageStats>& stats, string *max_marker)
 {
-  rgw_bucket_dir_header header;
-  int r = cls_bucket_head(bucket, header);
+  map<string, rgw_bucket_dir_header> headers;
+  map<int, string> bucket_instance_ids;
+  int r = cls_bucket_head(bucket, headers, &bucket_instance_ids);
   if (r < 0)
     return r;
 
-  stats.clear();
-
-  translate_raw_stats(header, stats);
-
-  *bucket_ver = header.ver;
-  *master_ver = header.master_ver;
-
-  if (max_marker)
-    *max_marker = header.max_marker;
-
+  assert(headers.size() == bucket_instance_ids.size());
+
+  map<string, rgw_bucket_dir_header>::iterator iter = headers.begin();
+  map<int, string>::iterator viter = bucket_instance_ids.begin();
+  BucketIndexShardsManager ver_mgr;
+  BucketIndexShardsManager master_ver_mgr;
+  BucketIndexShardsManager marker_mgr;
+  char buf[64];
+  for(; iter != headers.end(); ++iter, ++viter) {
+    accumulate_raw_stats(iter->second, stats);
+    snprintf(buf, sizeof(buf), "%lu", iter->second.ver);
+    ver_mgr.add(viter->first, string(buf));
+    snprintf(buf, sizeof(buf), "%lu", iter->second.master_ver);
+    master_ver_mgr.add(viter->first, string(buf));
+    marker_mgr.add(viter->first, iter->second.max_marker);
+  }
+  ver_mgr.to_string(bucket_ver);
+  master_ver_mgr.to_string(master_ver);
+  marker_mgr.to_string(max_marker);
   return 0;
 }
 
 class RGWGetBucketStatsContext : public RGWGetDirHeader_CB {
   RGWGetBucketStats_CB *cb;
+  uint32_t pendings;
+  map<RGWObjCategory, RGWStorageStats> stats;
+  int ret_code;
+  bool should_cb;
+  Mutex lock;
 
 public:
-  RGWGetBucketStatsContext(RGWGetBucketStats_CB *_cb) : cb(_cb) {}
+  RGWGetBucketStatsContext(RGWGetBucketStats_CB *_cb, uint32_t _pendings)
+    : cb(_cb), pendings(_pendings), stats(), ret_code(0), should_cb(true),
+    lock("RGWGetBucketStatsContext") {}
+
   void handle_response(int r, rgw_bucket_dir_header& header) {
-    map<RGWObjCategory, RGWStorageStats> stats;
+    Mutex::Locker l(lock);
+    if (should_cb) {
+      if ( r >= 0) {
+        accumulate_raw_stats(header, stats);
+      } else {
+        ret_code = r;
+      }
 
-    if (r >= 0) {
-      translate_raw_stats(header, stats);
-      cb->set_response(header.ver, header.master_ver, &stats, header.max_marker);
+      // Are we all done?
+      if (--pendings == 0) {
+        if (!ret_code) {
+          cb->set_response(&stats);
+        }
+        cb->handle_response(ret_code);
+        cb->put();
+      }
     }
+  }
 
-    cb->handle_response(r);
-
-    cb->put();
+  void unset_cb() {
+    Mutex::Locker l(lock);
+    should_cb = false;
   }
 };
 
 int RGWRados::get_bucket_stats_async(rgw_bucket& bucket, RGWGetBucketStats_CB *ctx)
 {
-  RGWGetBucketStatsContext *get_ctx = new RGWGetBucketStatsContext(ctx);
-  int r = cls_bucket_head_async(bucket, get_ctx);
+  RGWBucketInfo binfo;
+  int r = get_bucket_instance_info(NULL, bucket, binfo, NULL, NULL);
+  if (r < 0)
+    return r;
+
+  int num_aio = 0;
+  RGWGetBucketStatsContext *get_ctx = new RGWGetBucketStatsContext(ctx, binfo.num_shards);
+  assert(get_ctx);
+  r = cls_bucket_head_async(bucket, get_ctx, &num_aio);
+  get_ctx->put();
   if (r < 0) {
     ctx->put();
-    delete get_ctx;
-    return r;
+    if (num_aio) {
+      get_ctx->unset_cb();
+    }
   }
-
-  return 0;
+  return r;
 }
 
 class RGWGetUserStatsContext : public RGWGetUserHeader_CB {
@@ -5521,7 +5719,7 @@ int RGWRados::get_bucket_instance_info(void *ctx, rgw_bucket& bucket, RGWBucketI
                                        time_t *pmtime, map<string, bufferlist> *pattrs)
 {
   string oid;
-  if (!bucket.oid.empty()) {
+  if (bucket.oid.empty()) {
     get_bucket_meta_oid(bucket, oid);
   } else {
     oid = bucket.oid;
@@ -5836,21 +6034,21 @@ int RGWRados::update_containers_stats(map<string, RGWBucketEnt>& m)
     RGWBucketEnt& ent = iter->second;
     rgw_bucket& bucket = ent.bucket;
 
-    rgw_bucket_dir_header header;
-    int r = cls_bucket_head(bucket, header);
+    map<string, rgw_bucket_dir_header> headers;
+    int r = cls_bucket_head(bucket, headers);
     if (r < 0)
       return r;
 
-    ent.count = 0;
-    ent.size = 0;
-
-    RGWObjCategory category = main_category;
-    map<uint8_t, struct rgw_bucket_category_stats>::iterator iter = header.stats.find((uint8_t)category);
-    if (iter != header.stats.end()) {
-      struct rgw_bucket_category_stats& stats = iter->second;
-      ent.count = stats.num_entries;
-      ent.size = stats.total_size;
-      ent.size_rounded = stats.total_size_rounded;
+    map<string, rgw_bucket_dir_header>::iterator hiter = headers.begin();
+    for (; hiter != headers.end(); ++hiter) {
+      RGWObjCategory category = main_category;
+      map<uint8_t, struct rgw_bucket_category_stats>::iterator iter = (hiter->second.stats).find((uint8_t)category);
+      if (iter != hiter->second.stats.end()) {
+        struct rgw_bucket_category_stats& stats = iter->second;
+        ent.count += stats.num_entries;
+        ent.size += stats.total_size;
+        ent.size_rounded += stats.total_size_rounded;
+      }
     }
   }
 
@@ -5973,43 +6171,125 @@ int RGWRados::list_raw_objects(rgw_bucket& pool, const string& prefix_filter,
   return oids.size();
 }
 
-int RGWRados::list_bi_log_entries(rgw_bucket& bucket, string& marker, uint32_t max,
+int RGWRados::list_bi_log_entries(rgw_bucket& bucket, int shard_id, string& marker, uint32_t max,
                                   std::list<rgw_bi_log_entry>& result, bool *truncated)
 {
+  ldout(cct, 20) << __func__ << ": " << bucket << " marker " << marker << " shard_id=" << shard_id << " max " << max << dendl;
   result.clear();
 
   librados::IoCtx index_ctx;
-  string oid;
-  int r = open_bucket_index(bucket, index_ctx, oid);
+  map<int, string> oids;
+  map<int, cls_rgw_bi_log_list_ret> bi_log_lists;
+  map<int, string> bucket_instance_ids;
+  int r = open_bucket_index(bucket, index_ctx, oids, shard_id, &bucket_instance_ids);
   if (r < 0)
     return r;
 
-  std::list<rgw_bi_log_entry> entries;
-  int ret = cls_rgw_bi_log_list(index_ctx, oid, marker, max - result.size(), entries, truncated);
-  if (ret < 0)
-    return ret;
+  BucketIndexShardsManager marker_mgr;
+  bool has_shards = (oids.size() > 1 || shard_id >= 0);
+  // If there are multiple shards for the bucket index object, the marker
+  // should have the pattern '{shard_id_1}#{shard_marker_1},{shard_id_2}#
+  // {shard_marker_2}...', if there is no sharding, the bi_log_list should
+  // only contain one record, and the key is the bucket instance id.
+  r = marker_mgr.from_string(marker, shard_id);
+  if (r < 0)
+    return r;
+  r = CLSRGWIssueBILogList(index_ctx, marker_mgr, max, oids, bi_log_lists, cct->_conf->rgw_bucket_index_max_aio)();
+  if (r < 0)
+    return r;
 
-  std::list<rgw_bi_log_entry>::iterator iter;
-  for (iter = entries.begin(); iter != entries.end(); ++iter) {
-    result.push_back(*iter);
+  vector<string> shard_ids_str;
+  map<int, list<rgw_bi_log_entry>::iterator> vcurrents;
+  map<int, list<rgw_bi_log_entry>::iterator> vends;
+  if (truncated) {
+    *truncated = false;
+  }
+  map<int, cls_rgw_bi_log_list_ret>::iterator miter = bi_log_lists.begin();
+  for (; miter != bi_log_lists.end(); ++miter) {
+    int shard_id = miter->first;
+    vcurrents[shard_id] = miter->second.entries.begin();
+    vends[shard_id] = miter->second.entries.end();
+    if (truncated) {
+      *truncated = (*truncated || miter->second.truncated);
+    }
+  }
+
+  size_t total = 0;
+  bool has_more = true;
+  map<int, list<rgw_bi_log_entry>::iterator>::iterator viter;
+  map<int, list<rgw_bi_log_entry>::iterator>::iterator eiter;
+  while (total < max && has_more) {
+    has_more = false;
+
+    viter = vcurrents.begin();
+    eiter = vends.begin();
+
+    for (; total < max && viter != vcurrents.end(); ++viter, ++eiter) {
+      assert (eiter != vends.end());
+
+      int shard_id = viter->first;
+      list<rgw_bi_log_entry>::iterator& liter = viter->second;
+
+      if (liter == eiter->second){
+        continue;
+      }
+      rgw_bi_log_entry& entry = *(liter);
+      if (has_shards) {
+        char buf[16];
+        snprintf(buf, sizeof(buf), "%d", shard_id);
+        string tmp_id;
+        build_bucket_index_marker(buf, entry.id, &tmp_id);
+        entry.id.swap(tmp_id);
+      }
+      marker_mgr.add(shard_id, entry.id);
+      result.push_back(entry);
+      total++;
+      has_more = true;
+      ++liter;
+    }
+  }
+
+  if (truncated) {
+    for (viter = vcurrents.begin(), eiter = vends.begin(); viter != vcurrents.end(); ++viter, ++eiter) {
+      assert (eiter != vends.end());
+      *truncated = (*truncated || (viter->second != eiter->second));
+    }
+  }
+
+  // Refresh marker, if there are multiple shards, the output will look like
+  // '{shard_oid_1}#{shard_marker_1},{shard_oid_2}#{shard_marker_2}...',
+  // if there is no sharding, the simply marker (without oid) is returned
+  if (has_shards) {
+    marker_mgr.to_string(&marker);
+  } else {
+    if (!result.empty()) {
+      marker = result.rbegin()->id;
+    }
   }
 
   return 0;
 }
 
-int RGWRados::trim_bi_log_entries(rgw_bucket& bucket, string& start_marker, string& end_marker)
+int RGWRados::trim_bi_log_entries(rgw_bucket& bucket, int shard_id, string& start_marker, string& end_marker)
 {
   librados::IoCtx index_ctx;
-  string oid;
-  int r = open_bucket_index(bucket, index_ctx, oid);
+  map<int, string> bucket_objs;
+  int r = open_bucket_index(bucket, index_ctx, bucket_objs, shard_id);
   if (r < 0)
     return r;
 
-  int ret = cls_rgw_bi_log_trim(index_ctx, oid, start_marker, end_marker);
-  if (ret < 0)
-    return ret;
+  BucketIndexShardsManager start_marker_mgr;
+  r = start_marker_mgr.from_string(start_marker, shard_id);
+  if (r < 0)
+    return r;
+  BucketIndexShardsManager end_marker_mgr;
+  r = end_marker_mgr.from_string(end_marker, shard_id);
+  if (r < 0)
+    return r;
 
-  return 0;
+  return CLSRGWIssueBILogTrim(index_ctx, start_marker_mgr, end_marker_mgr, bucket_objs,
+      cct->_conf->rgw_bucket_index_max_aio)();
 }
 
 int RGWRados::gc_operate(string& oid, librados::ObjectWriteOperation *op)
@@ -6048,34 +6328,20 @@ int RGWRados::cls_rgw_init_index(librados::IoCtx& index_ctx, librados::ObjectWri
   return r;
 }
 
-int RGWRados::cls_obj_prepare_op(rgw_bucket& bucket, RGWModifyOp op, string& tag,
+int RGWRados::cls_obj_prepare_op(BucketShard& bs, RGWModifyOp op, string& tag,
                                  string& name, string& locator)
 {
-  librados::IoCtx index_ctx;
-  string oid;
-
-  int r = open_bucket_index(bucket, index_ctx, oid);
-  if (r < 0)
-    return r;
-
   ObjectWriteOperation o;
   cls_rgw_bucket_prepare_op(o, op, tag, name, locator, zone_public_config.log_data);
-  r = index_ctx.operate(oid, &o);
-  return r;
+  int ret = bs.index_ctx.operate(bs.bucket_obj, &o);
+  return ret;
 }
 
-int RGWRados::cls_obj_complete_op(rgw_bucket& bucket, RGWModifyOp op, string& tag,
+int RGWRados::cls_obj_complete_op(BucketShard& bs, RGWModifyOp op, string& tag,
                                   int64_t pool, uint64_t epoch,
                                   RGWObjEnt& ent, RGWObjCategory category,
                                  list<string> *remove_objs)
 {
-  librados::IoCtx index_ctx;
-  string oid;
-
-  int r = open_bucket_index(bucket, index_ctx, oid);
-  if (r < 0)
-    return r;
-
   ObjectWriteOperation o;
   rgw_bucket_dir_entry_meta dir_meta;
   dir_meta.size = ent.size;
@@ -6092,77 +6358,97 @@ int RGWRados::cls_obj_complete_op(rgw_bucket& bucket, RGWModifyOp op, string& ta
   cls_rgw_bucket_complete_op(o, op, tag, ver, ent.name, dir_meta, remove_objs, zone_public_config.log_data);
 
   AioCompletion *c = librados::Rados::aio_create_completion(NULL, NULL, NULL);
-  r = index_ctx.aio_operate(oid, c, &o);
+  int ret = bs.index_ctx.aio_operate(bs.bucket_obj, c, &o);
   c->release();
-  return r;
+  return ret;
 }
 
-int RGWRados::cls_obj_complete_add(rgw_bucket& bucket, string& tag,
+int RGWRados::cls_obj_complete_add(BucketShard& bs, string& tag,
                                    int64_t pool, uint64_t epoch,
                                    RGWObjEnt& ent, RGWObjCategory category,
-                                   list<string> *remove_objs)
+                                   list<string> *remove_obj)
 {
-  return cls_obj_complete_op(bucket, CLS_RGW_OP_ADD, tag, pool, epoch, ent, category, remove_objs);
+  return cls_obj_complete_op(bs, CLS_RGW_OP_ADD, tag, pool, epoch, ent, category, remove_obj);
 }
 
-int RGWRados::cls_obj_complete_del(rgw_bucket& bucket, string& tag,
+int RGWRados::cls_obj_complete_del(BucketShard& bs, string& tag,
                                    int64_t pool, uint64_t epoch,
                                    string& name)
 {
   RGWObjEnt ent;
   ent.name = name;
-  return cls_obj_complete_op(bucket, CLS_RGW_OP_DEL, tag, pool, epoch, ent, RGW_OBJ_CATEGORY_NONE, NULL);
+  return cls_obj_complete_op(bs, CLS_RGW_OP_DEL, tag, pool, epoch, ent, RGW_OBJ_CATEGORY_NONE, NULL);
 }
 
-int RGWRados::cls_obj_complete_cancel(rgw_bucket& bucket, string& tag, string& name)
+int RGWRados::cls_obj_complete_cancel(BucketShard& bs, string& tag, string& name)
 {
   RGWObjEnt ent;
   ent.name = name;
-  return cls_obj_complete_op(bucket, CLS_RGW_OP_ADD, tag, -1 /* pool id */, 0, ent, RGW_OBJ_CATEGORY_NONE, NULL);
+  return cls_obj_complete_op(bs, CLS_RGW_OP_ADD, tag, -1 /* pool id */, 0, ent, RGW_OBJ_CATEGORY_NONE, NULL);
 }
 
 int RGWRados::cls_obj_set_bucket_tag_timeout(rgw_bucket& bucket, uint64_t timeout)
 {
   librados::IoCtx index_ctx;
-  string oid;
-
-  int r = open_bucket_index(bucket, index_ctx, oid);
+  map<int, string> bucket_objs;
+  int r = open_bucket_index(bucket, index_ctx, bucket_objs);
   if (r < 0)
     return r;
 
-  ObjectWriteOperation o;
-  cls_rgw_bucket_set_tag_timeout(o, timeout);
-
-  r = index_ctx.operate(oid, &o);
-
-  return r;
+  return CLSRGWIssueSetTagTimeout(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio, timeout)();
 }
 
-int RGWRados::cls_bucket_list(rgw_bucket& bucket, string start, string prefix,
-                             uint32_t num, map<string, RGWObjEnt>& m,
-                             bool *is_truncated, string *last_entry,
-                             bool (*force_check_filter)(const string&  name))
+int RGWRados::cls_bucket_list(rgw_bucket& bucket, const string& start, const string& prefix,
+                             uint32_t num_entries, map<string, RGWObjEnt>& m, bool *is_truncated,
+                             string *last_entry, bool (*force_check_filter)(const string&  name))
 {
-  ldout(cct, 10) << "cls_bucket_list " << bucket << " start " << start << " num " << num << dendl;
+  ldout(cct, 10) << "cls_bucket_list " << bucket << " start " << start << " num_entries " << num_entries << dendl;
 
   librados::IoCtx index_ctx;
-  string oid;
-  int r = open_bucket_index(bucket, index_ctx, oid);
+  // key   - oid (for different shards if there is any)
+  // value - list result for the corresponding oid (shard), it is filled by the AIO callback
+  map<int, string> oids;
+  map<int, struct rgw_cls_list_ret> list_results;
+  int r = open_bucket_index(bucket, index_ctx, oids);
   if (r < 0)
     return r;
 
-  struct rgw_bucket_dir dir;
-  r = cls_rgw_list_op(index_ctx, oid, start, prefix, num, &dir, is_truncated);
+  r = CLSRGWIssueBucketList(index_ctx, start, prefix, num_entries, oids, list_results, cct->_conf->rgw_bucket_index_max_aio)();
   if (r < 0)
     return r;
 
-  map<string, struct rgw_bucket_dir_entry>::iterator miter;
-  bufferlist updates;
-  for (miter = dir.m.begin(); miter != dir.m.end(); ++miter) {
-    RGWObjEnt e;
-    rgw_bucket_dir_entry& dirent = miter->second;
+  // Create a list of iterators that are used to iterate each shard
+  vector<map<string, struct rgw_bucket_dir_entry>::iterator> vcurrents(list_results.size());
+  vector<map<string, struct rgw_bucket_dir_entry>::iterator> vends(list_results.size());
+  vector<string> vnames(list_results.size());
+  map<int, struct rgw_cls_list_ret>::iterator iter = list_results.begin();
+  *is_truncated = false;
+  for (; iter != list_results.end(); ++iter) {
+    vcurrents.push_back(iter->second.dir.m.begin());
+    vends.push_back(iter->second.dir.m.end());
+    vnames.push_back(oids[iter->first]);
+    *is_truncated = (*is_truncated || iter->second.is_truncated);
+  }
+
+  // Create a map to track the next candidate entry from each shard, if the entry
+  // from a specified shard is selected/erased, the next entry from that shard will
+  // be inserted for next round selection
+  map<string, size_t> candidates;
+  for (size_t i = 0; i < vcurrents.size(); ++i) {
+    if (vcurrents[i] != vends[i]) {
+      candidates[vcurrents[i]->second.name] = i;
+    }
+  }
+
+  map<string, bufferlist> updates;
+  uint32_t count = 0;
+  while (count < num_entries && !candidates.empty()) {
+    // Select the next one
+    int pos = candidates.begin()->second;
+    struct rgw_bucket_dir_entry& dirent = vcurrents[pos]->second;
 
     // fill it in with initial values; we may correct later
+    RGWObjEnt e;
     e.name = dirent.name;
     e.size = dirent.meta.size;
     e.mtime = dirent.meta.mtime;
@@ -6172,44 +6458,53 @@ int RGWRados::cls_bucket_list(rgw_bucket& bucket, string start, string prefix,
     e.content_type = dirent.meta.content_type;
     e.tag = dirent.tag;
 
-    /* oh, that shouldn't happen! */
-    if (e.name.empty()) {
-      ldout(cct, 0) << "WARNING: got empty dirent name, skipping" << dendl;
-      continue;
-    }
-
     bool force_check = force_check_filter && force_check_filter(dirent.name);
-
     if (!dirent.exists || !dirent.pending_map.empty() || force_check) {
       /* there are uncommitted ops. We need to check the current state,
        * and if the tags are old we need to do cleanup as well. */
       librados::IoCtx sub_ctx;
       sub_ctx.dup(index_ctx);
-      r = check_disk_state(sub_ctx, bucket, dirent, e, updates);
-      if (r < 0) {
-        if (r == -ENOENT)
-          continue;
-        else
+      r = check_disk_state(sub_ctx, bucket, dirent, e, updates[vnames[pos]]);
+      if (r < 0 && r != -ENOENT) {
           return r;
       }
     }
-    m[e.name] = e;
-    ldout(cct, 10) << "RGWRados::cls_bucket_list: got " << e.name << dendl;
+    if (r >= 0) {
+      m[e.name] = e;
+      ldout(cct, 10) << "RGWRados::cls_bucket_list: got " << e.name << dendl;
+      ++count;
+    }
+
+    // Refresh the candidates map
+    candidates.erase(candidates.begin());
+    ++vcurrents[pos];
+    if (vcurrents[pos] != vends[pos]) {
+      candidates[vcurrents[pos]->second.name] = pos;
+    }
   }
 
-  if (dir.m.size()) {
-    *last_entry = dir.m.rbegin()->first;
+  // Suggest updates if there is any
+  map<string, bufferlist>::iterator miter = updates.begin();
+  for (; miter != updates.end(); ++miter) {
+    if (miter->second.length()) {
+      ObjectWriteOperation o;
+      cls_rgw_suggest_changes(o, miter->second);
+      // we don't care if we lose suggested updates, send them off blindly
+      AioCompletion *c = librados::Rados::aio_create_completion(NULL, NULL, NULL);
+      index_ctx.aio_operate(miter->first, c, &o);
+        c->release();
+    }
   }
 
-  if (updates.length()) {
-    ObjectWriteOperation o;
-    cls_rgw_suggest_changes(o, updates);
-    // we don't care if we lose suggested updates, send them off blindly
-    AioCompletion *c = librados::Rados::aio_create_completion(NULL, NULL, NULL);
-    r = index_ctx.aio_operate(oid, c, &o);
-    c->release();
+  // Check if all the returned entries are consumed or not
+  for (size_t i = 0; i < vcurrents.size(); ++i) {
+    if (vcurrents[i] != vends[i])
+      *is_truncated = true;
   }
-  return m.size();
+  if (m.size())
+    *last_entry = m.rbegin()->first;
+
+  return 0;
 }
 
 int RGWRados::cls_obj_usage_log_add(const string& oid, rgw_usage_log_info& info)
@@ -6402,34 +6697,45 @@ int RGWRados::check_disk_state(librados::IoCtx io_ctx,
   return 0;
 }
 
-int RGWRados::cls_bucket_head(rgw_bucket& bucket, struct rgw_bucket_dir_header& header)
+int RGWRados::cls_bucket_head(rgw_bucket& bucket, map<string, struct rgw_bucket_dir_header>& headers, map<int, string> *bucket_instance_ids)
 {
   librados::IoCtx index_ctx;
-  string oid;
-  int r = open_bucket_index(bucket, index_ctx, oid);
+  map<int, string> oids;
+  map<int, struct rgw_cls_list_ret> list_results;
+  int r = open_bucket_index(bucket, index_ctx, oids, list_results, -1, bucket_instance_ids);
   if (r < 0)
     return r;
 
-  r = cls_rgw_get_dir_header(index_ctx, oid, &header);
+  r = CLSRGWIssueGetDirHeader(index_ctx, oids, list_results, cct->_conf->rgw_bucket_index_max_aio)();
   if (r < 0)
     return r;
 
+  map<int, struct rgw_cls_list_ret>::iterator iter = list_results.begin();
+  for(; iter != list_results.end(); ++iter) {
+    headers[oids[iter->first]] = iter->second.dir.header;
+  }
   return 0;
 }
 
-int RGWRados::cls_bucket_head_async(rgw_bucket& bucket, RGWGetDirHeader_CB *ctx)
+int RGWRados::cls_bucket_head_async(rgw_bucket& bucket, RGWGetDirHeader_CB *ctx, int *num_aio)
 {
   librados::IoCtx index_ctx;
-  string oid;
-  int r = open_bucket_index(bucket, index_ctx, oid);
+  map<int, string> bucket_objs;
+  int r = open_bucket_index(bucket, index_ctx, bucket_objs);
   if (r < 0)
     return r;
 
-  r = cls_rgw_get_dir_header_async(index_ctx, oid, ctx);
-  if (r < 0)
-    return r;
-
-  return 0;
+  map<int, string>::iterator iter = bucket_objs.begin();
+  for (; iter != bucket_objs.end(); ++iter) {
+    r = cls_rgw_get_dir_header_async(index_ctx, iter->second, static_cast<RGWGetDirHeader_CB*>(ctx->get()));
+    if (r < 0) {
+      ctx->put();
+      break;
+    } else {
+      (*num_aio)++;
+    }
+  }
+  return r;
 }
 
 int RGWRados::cls_user_get_header(const string& user_id, cls_user_header *header)
@@ -6480,8 +6786,8 @@ int RGWRados::cls_user_get_header_async(const string& user_id, RGWGetUserHeader_
 
 int RGWRados::cls_user_sync_bucket_stats(rgw_obj& user_obj, rgw_bucket& bucket)
 {
-  rgw_bucket_dir_header header;
-  int r = cls_bucket_head(bucket, header);
+  map<string, struct rgw_bucket_dir_header> headers;
+  int r = cls_bucket_head(bucket, headers);
   if (r < 0) {
     ldout(cct, 20) << "cls_bucket_header() returned " << r << dendl;
     return r;
@@ -6491,12 +6797,15 @@ int RGWRados::cls_user_sync_bucket_stats(rgw_obj& user_obj, rgw_bucket& bucket)
 
   bucket.convert(&entry.bucket);
 
-  map<uint8_t, struct rgw_bucket_category_stats>::iterator iter = header.stats.begin();
-  for (; iter != header.stats.end(); ++iter) {
-    struct rgw_bucket_category_stats& header_stats = iter->second;
-    entry.size += header_stats.total_size;
-    entry.size_rounded += header_stats.total_size_rounded;
-    entry.count += header_stats.num_entries;
+  map<string, struct rgw_bucket_dir_header>::iterator hiter = headers.begin();
+  for (; hiter != headers.end(); ++hiter) {
+    map<uint8_t, struct rgw_bucket_category_stats>::iterator iter = hiter->second.stats.begin();
+    for (; iter != hiter->second.stats.end(); ++iter) {
+      struct rgw_bucket_category_stats& header_stats = iter->second;
+      entry.size += header_stats.total_size;
+      entry.size_rounded += header_stats.total_size_rounded;
+      entry.count += header_stats.num_entries;
+    }
   }
 
   list<cls_user_bucket_entry> entries;
@@ -6715,6 +7024,81 @@ int RGWRados::remove_temp_objects(string date, string time)
   return 0;
 }
 
+void RGWRados::get_bucket_index_objects(const string& bucket_oid_base,
+    uint32_t num_shards, map<int, string>& bucket_objects, int shard_id)
+{
+  if (!num_shards) {
+    bucket_objects[0] = bucket_oid_base;
+  } else {
+    char buf[bucket_oid_base.size() + 32];
+    if (shard_id < 0) {
+      for (uint32_t i = 0; i < num_shards; ++i) {
+        snprintf(buf, sizeof(buf), "%s.%d", bucket_oid_base.c_str(), i);
+        bucket_objects[i] = buf;
+      }
+    } else {
+      if ((uint32_t)shard_id > num_shards) {
+        return;
+      }
+      snprintf(buf, sizeof(buf), "%s.%d", bucket_oid_base.c_str(), shard_id);
+      bucket_objects[shard_id] = buf;
+    }
+  }
+}
+
+void RGWRados::get_bucket_instance_ids(RGWBucketInfo& bucket_info, int shard_id, map<int, string> *result)
+{
+  rgw_bucket& bucket = bucket_info.bucket;
+  string plain_id = bucket.name + ":" + bucket.bucket_id;
+  if (!bucket_info.num_shards) {
+    (*result)[0] = plain_id;
+  } else {
+    char buf[16];
+    if (shard_id < 0) {
+      for (uint32_t i = 0; i < bucket_info.num_shards; ++i) {
+        snprintf(buf, sizeof(buf), ":%d", i);
+        (*result)[i] = plain_id + buf;
+      }
+    } else {
+      if ((uint32_t)shard_id > bucket_info.num_shards) {
+        return;
+      }
+      snprintf(buf, sizeof(buf), ":%d", shard_id);
+      (*result)[shard_id] = plain_id + buf;
+    }
+  }
+}
+
+int RGWRados::get_bucket_index_object(const string& bucket_oid_base, const string& obj_key,
+    uint32_t num_shards, RGWBucketInfo::BIShardsHashType hash_type, string *bucket_obj, int *shard_id)
+{
+  int r = 0;
+  switch (hash_type) {
+    case RGWBucketInfo::MOD:
+      if (!num_shards) {
+        // By default with no sharding, we use the bucket oid as itself
+        (*bucket_obj) = bucket_oid_base;
+        if (shard_id) {
+          *shard_id = -1;
+        }
+      } else {
+        uint32_t sid = ceph_str_hash_linux(obj_key.c_str(), obj_key.size());
+        uint32_t sid2 = sid ^ ((sid & 0xFF) << 24);
+        sid = sid2 % MAX_BUCKET_INDEX_SHARDS_PRIME % num_shards;
+        char buf[bucket_oid_base.size() + 32];
+        snprintf(buf, sizeof(buf), "%s.%d", bucket_oid_base.c_str(), sid);
+        (*bucket_obj) = buf;
+        if (shard_id) {
+          *shard_id = (int)sid;
+        }
+      }
+      break;
+    default:
+      r = -ENOTSUP;
+  }
+  return r;
+}
+
 int RGWRados::process_intent_log(rgw_bucket& bucket, string& oid,
                                 time_t epoch, int flags, bool purge)
 {
index 57338b860163045603f2c65529339dd067b60f01..1071b2f1ae619c0291df1c28c3fb087ec5faa699 100644 (file)
@@ -913,14 +913,24 @@ struct RGWZone {
   bool log_meta;
   bool log_data;
 
-  RGWZone() : log_meta(false), log_data(false) {}
+/**
+ * Represents the number of shards for the bucket index object, a value of zero
+ * indicates there is no sharding. By default (no sharding, the name of the object
+ * is '.dir.{marker}', with sharding, the name is '.dir.{markder}.{sharding_id}',
+ * sharding_id is zero-based value. It is not recommended to set a too large value
+ * (e.g. thousand) as it increases the cost for bucket listing.
+ */
+  uint32_t bucket_index_max_shards;
+
+  RGWZone() : log_meta(false), log_data(false), bucket_index_max_shards(0) {}
 
   void encode(bufferlist& bl) const {
-    ENCODE_START(2, 1, bl);
+    ENCODE_START(3, 1, bl);
     ::encode(name, bl);
     ::encode(endpoints, bl);
     ::encode(log_meta, bl);
     ::encode(log_data, bl);
+    ::encode(bucket_index_max_shards, bl);
     ENCODE_FINISH(bl);
   }
 
@@ -932,6 +942,9 @@ struct RGWZone {
       ::decode(log_meta, bl);
       ::decode(log_data, bl);
     }
+    if (struct_v >= 3) {
+      ::decode(bucket_index_max_shards, bl);
+    }
     DECODE_FINISH(bl);
   }
   void dump(Formatter *f) const;
@@ -1190,21 +1203,13 @@ public:
 class RGWGetBucketStats_CB : public RefCountedObject {
 protected:
   rgw_bucket bucket;
-  uint64_t bucket_ver;
-  uint64_t master_ver;
   map<RGWObjCategory, RGWStorageStats> *stats;
-  string max_marker;
 public:
   RGWGetBucketStats_CB(rgw_bucket& _bucket) : bucket(_bucket), stats(NULL) {}
   virtual ~RGWGetBucketStats_CB() {}
   virtual void handle_response(int r) = 0;
-  virtual void set_response(uint64_t _bucket_ver, uint64_t _master_ver,
-                            map<RGWObjCategory, RGWStorageStats> *_stats,
-                            const string &_max_marker) {
-    bucket_ver = _bucket_ver;
-    master_ver = _master_ver;
+  virtual void set_response(map<RGWObjCategory, RGWStorageStats> *_stats) {
     stats = _stats;
-    max_marker = _max_marker;
   }
 };
 
@@ -1261,6 +1266,20 @@ class RGWRados
   int open_bucket_data_ctx(rgw_bucket& bucket, librados::IoCtx&  io_ctx);
   int open_bucket_data_extra_ctx(rgw_bucket& bucket, librados::IoCtx&  io_ctx);
   int open_bucket_index(rgw_bucket& bucket, librados::IoCtx&  index_ctx, string& bucket_oid);
+  int open_bucket_index_base(rgw_bucket& bucket, librados::IoCtx&  index_ctx,
+      string& bucket_oid_base);
+  int open_bucket_index_shard(rgw_bucket& bucket, librados::IoCtx& index_ctx,
+      const string& obj_key, string *bucket_obj, int *shard_id);
+  int open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx,
+      map<int, string>& bucket_objs, int shard_id = -1, map<int, string> *bucket_instance_ids = NULL);
+  template<typename T>
+  int open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx,
+                        map<int, string>& oids, map<int, T>& bucket_objs,
+                        int shard_id = -1, map<int, string> *bucket_instance_ids = NULL);
+  void build_bucket_index_marker(const string& shard_id_str, const string& shard_marker,
+      string *marker);
+
+  void get_bucket_instance_ids(RGWBucketInfo& bucket_info, int shard_id, map<int, string> *result);
 
   struct GetObjState {
     librados::IoCtx io_ctx;
@@ -1294,6 +1313,9 @@ class RGWRados
 
   Mutex bucket_id_lock;
 
+  // This field represents the number of bucket index object shards
+  uint32_t bucket_index_max_shards;
+
   int get_obj_ioctx(const rgw_obj& obj, librados::IoCtx *ioctx);
   int get_obj_ref(const rgw_obj& obj, rgw_rados_ref *ref, rgw_bucket *bucket, bool ref_system_obj = false);
   uint64_t max_bucket_id;
@@ -1365,7 +1387,9 @@ public:
                gc(NULL), use_gc_thread(false), quota_threads(false),
                num_watchers(0), watchers(NULL), watch_handles(NULL),
                watch_initialized(false),
-               bucket_id_lock("rados_bucket_id"), max_bucket_id(0),
+               bucket_id_lock("rados_bucket_id"),
+               bucket_index_max_shards(0),
+               max_bucket_id(0),
                cct(NULL), rados(NULL),
                pools_initialized(false),
                quota_handler(NULL),
@@ -1479,7 +1503,7 @@ public:
    * create a bucket with name bucket and the given list of attrs
    * returns 0 on success, -ERR# otherwise.
    */
-  virtual int init_bucket_index(rgw_bucket& bucket);
+  virtual int init_bucket_index(rgw_bucket& bucket, int num_shards);
   int select_bucket_placement(RGWUserInfo& user_info, const string& region_name, const std::string& rule,
                               const std::string& bucket_name, rgw_bucket& bucket, string *pselected_rule);
   int select_legacy_bucket_placement(const string& bucket_name, rgw_bucket& bucket);
@@ -1811,8 +1835,8 @@ public:
   }
 
   int decode_policy(bufferlist& bl, ACLOwner *owner);
-  int get_bucket_stats(rgw_bucket& bucket, uint64_t *bucket_ver, uint64_t *master_ver, map<RGWObjCategory, RGWStorageStats>& stats,
-                       string *max_marker);
+  int get_bucket_stats(rgw_bucket& bucket, string *bucket_ver, string *master_ver,
+      map<RGWObjCategory, RGWStorageStats>& stats, string *max_marker);
   int get_bucket_stats_async(rgw_bucket& bucket, RGWGetBucketStats_CB *cb);
   int get_user_stats(const string& user, RGWStorageStats& stats);
   int get_user_stats_async(const string& user, RGWGetUserStats_CB *cb);
@@ -1836,39 +1860,50 @@ public:
   virtual int put_linked_bucket_info(RGWBucketInfo& info, bool exclusive, time_t mtime, obj_version *pep_objv,
                                      map<string, bufferlist> *pattrs, bool create_entry_point);
 
+  struct BucketShard {
+    RGWRados *store;
+    rgw_bucket bucket;
+    int shard_id;
+    librados::IoCtx index_ctx;
+    string bucket_obj;
+
+    BucketShard(RGWRados *_store) : store(_store), shard_id(-1) {}
+    int init(rgw_bucket& _bucket, rgw_obj& obj);
+  };
+
   int cls_rgw_init_index(librados::IoCtx& io_ctx, librados::ObjectWriteOperation& op, string& oid);
-  int cls_obj_prepare_op(rgw_bucket& bucket, RGWModifyOp op, string& tag,
-                         string& name, string& locator);
-  int cls_obj_complete_op(rgw_bucket& bucket, RGWModifyOp op, string& tag, int64_t pool, uint64_t epoch,
+  int cls_obj_prepare_op(BucketShard& bs, RGWModifyOp op, string& tag, string& name, string& locator);
+  int cls_obj_complete_op(BucketShard& bs, RGWModifyOp op, string& tag, int64_t pool, uint64_t epoch,
                           RGWObjEnt& ent, RGWObjCategory category, list<string> *remove_objs);
-  int cls_obj_complete_add(rgw_bucket& bucket, string& tag, int64_t pool, uint64_t epoch, RGWObjEnt& ent, RGWObjCategory category, list<string> *remove_objs);
-  int cls_obj_complete_del(rgw_bucket& bucket, string& tag, int64_t pool, uint64_t epoch, string& name);
-  int cls_obj_complete_cancel(rgw_bucket& bucket, string& tag, string& name);
+  int cls_obj_complete_add(BucketShard& bs, string& tag, int64_t pool, uint64_t epoch, RGWObjEnt& ent, RGWObjCategory category, list<string> *remove_objs);
+  int cls_obj_complete_del(BucketShard& bs, string& tag, int64_t pool, uint64_t epoch, string& name);
+  int cls_obj_complete_cancel(BucketShard& bs, string& tag, string& name);
   int cls_obj_set_bucket_tag_timeout(rgw_bucket& bucket, uint64_t timeout);
-  int cls_bucket_list(rgw_bucket& bucket, string start, string prefix, uint32_t num,
-                      map<string, RGWObjEnt>& m, bool *is_truncated,
-                      string *last_entry, bool (*force_check_filter)(const string&  name) = NULL);
-  int cls_bucket_head(rgw_bucket& bucket, struct rgw_bucket_dir_header& header);
-  int cls_bucket_head_async(rgw_bucket& bucket, RGWGetDirHeader_CB *ctx);
-  int prepare_update_index(RGWObjState *state, rgw_bucket& bucket,
+  int cls_bucket_list(rgw_bucket& bucket, const string& start, const string& prefix, uint32_t hint_num,
+                      map<string, RGWObjEnt>& m, bool *is_truncated, string *last_entry,
+                      bool (*force_check_filter)(const string&  name) = NULL);
+  int cls_bucket_head(rgw_bucket& bucket, map<string, struct rgw_bucket_dir_header>& headers, map<int, string> *bucket_instance_ids = NULL);
+  int cls_bucket_head_async(rgw_bucket& bucket, RGWGetDirHeader_CB *ctx, int *num_aio);
+
+  int prepare_update_index(RGWObjState *state, BucketShard& bucket_shard,
                            RGWModifyOp op, rgw_obj& oid, string& tag);
-  int complete_update_index(rgw_bucket& bucket, string& oid, string& tag, int64_t poolid, uint64_t epoch, uint64_t size,
+  int complete_update_index(BucketShard& bucket_shard, rgw_obj& oid, string& tag, int64_t poolid, uint64_t epoch, uint64_t size,
                             utime_t& ut, string& etag, string& content_type, bufferlist *acl_bl, RGWObjCategory category,
                            list<string> *remove_objs);
-  int complete_update_index_del(rgw_bucket& bucket, string& oid, string& tag, int64_t pool, uint64_t epoch) {
-    if (bucket_is_system(bucket))
+  int complete_update_index_del(BucketShard& bucket_shard, rgw_obj& oid, string& tag, int64_t pool, uint64_t epoch) {
+    if (bucket_is_system(bucket_shard.bucket))
       return 0;
 
-    return cls_obj_complete_del(bucket, tag, pool, epoch, oid);
+    return cls_obj_complete_del(bucket_shard, tag, pool, epoch, oid.object);
   }
-  int complete_update_index_cancel(rgw_bucket& bucket, string& oid, string& tag) {
-    if (bucket_is_system(bucket))
+  int complete_update_index_cancel(BucketShard& bucket_shard, rgw_obj& oid, string& tag) {
+    if (bucket_is_system(bucket_shard.bucket))
       return 0;
 
-    return cls_obj_complete_cancel(bucket, tag, oid);
+    return cls_obj_complete_cancel(bucket_shard, tag, oid.object);
   }
-  int list_bi_log_entries(rgw_bucket& bucket, string& marker, uint32_t max, std::list<rgw_bi_log_entry>& result, bool *truncated);
-  int trim_bi_log_entries(rgw_bucket& bucket, string& marker, string& end_marker);
+  int list_bi_log_entries(rgw_bucket& bucket, int shard_id, string& marker, uint32_t max, std::list<rgw_bi_log_entry>& result, bool *truncated);
+  int trim_bi_log_entries(rgw_bucket& bucket, int shard_id, string& marker, string& end_marker);
 
   int cls_obj_usage_log_add(const string& oid, rgw_usage_log_info& info);
   int cls_obj_usage_log_read(string& oid, string& user, uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries,
@@ -1877,7 +1912,7 @@ public:
 
   void shard_name(const string& prefix, unsigned max_shards, const string& key, string& name);
   void shard_name(const string& prefix, unsigned max_shards, const string& section, const string& key, string& name);
-  void time_log_prepare_entry(cls_log_entry& entry, const utime_t& ut, string& section, string& key, bufferlist& bl);
+  void time_log_prepare_entry(cls_log_entry& entry, const utime_t& ut, const string& section, const string& key, bufferlist& bl);
   int time_log_add(const string& oid, list<cls_log_entry>& entries);
   int time_log_add(const string& oid, const utime_t& ut, const string& section, const string& key, bufferlist& bl);
   int time_log_list(const string& oid, utime_t& start_time, utime_t& end_time,
@@ -1945,6 +1980,32 @@ public:
   }
 
  private:
+  /**
+   * This is a helper method, it generates a list of bucket index objects with the given
+   * bucket base oid and number of shards.
+   *
+   * bucket_oid_base [in] - base name of the bucket index object;
+   * num_shards [in] - number of bucket index object shards.
+   * bucket_objs [out] - filled by this method, a list of bucket index objects.
+   */
+  void get_bucket_index_objects(const string& bucket_oid_base, uint32_t num_shards,
+      map<int, string>& bucket_objs, int shard_id = -1);
+
+  /**
+   * Get the bucket index object with the given base bucket index object and object key,
+   * and the number of bucket index shards.
+   *
+   * bucket_oid_base [in] - bucket object base name.
+   * obj_key [in] - object key.
+   * num_shards [in] - number of bucket index shards.
+   * hash_type [in] - type of hash to find the shard ID.
+   * bucket_obj [out] - the bucket index object for the given object.
+   *
+   * Return 0 on success, a failure code otherwise.
+   */
+  int get_bucket_index_object(const string& bucket_oid_base, const string& obj_key,
+      uint32_t num_shards, RGWBucketInfo::BIShardsHashType hash_type, string *bucket_obj, int *shard);
+
   int process_intent_log(rgw_bucket& bucket, string& oid,
                         time_t epoch, int flags, bool purge);
   /**
index 961abc2e188061deec4170b255f699b13e4ba532..d72b6af23c305bf7fdf0b9d56b4ba69939e5481d 100644 (file)
 
 #include "rgw_replica_log.h"
 #include "cls/replica_log/cls_replica_log_client.h"
+#include "cls/rgw/cls_rgw_client.h"
 #include "rgw_rados.h"
 
+#define dout_subsys ceph_subsys_rgw
 
 void RGWReplicaBounds::dump(Formatter *f) const
 {
@@ -132,3 +134,50 @@ RGWReplicaBucketLogger::RGWReplicaBucketLogger(RGWRados *_store) :
   prefix = _store->ctx()->_conf->rgw_replica_log_obj_prefix;
   prefix.append(".");
 }
+
+string RGWReplicaBucketLogger::obj_name(const rgw_bucket& bucket, int shard_id)
+{
+  string s = prefix + bucket.name;
+
+  if (shard_id >= 0) {
+    char buf[16];
+    snprintf(buf, sizeof(buf), ".%d", shard_id);
+    s += buf;
+  }
+  return s;
+}
+
+int RGWReplicaBucketLogger::update_bound(const rgw_bucket& bucket, int shard_id, const string& daemon_id,
+                   const string& marker, const utime_t& time,
+                   const list<RGWReplicaItemMarker> *entries)
+{
+  if (shard_id >= 0 ||
+      !BucketIndexShardsManager::is_shards_marker(marker)) {
+    return RGWReplicaLogger::update_bound(obj_name(bucket, shard_id), pool,
+                                          daemon_id, marker, time, entries);
+  }
+
+  BucketIndexShardsManager sm;
+  int ret = sm.from_string(marker, shard_id);
+  if (ret < 0) {
+    ldout(cct, 0) << "ERROR: could not parse shards marker: " << marker << dendl;
+    return ret;
+  }
+
+  map<int, string>& vals = sm.get();
+
+  ret = 0;
+
+  map<int, string>::iterator iter;
+  for (iter = vals.begin(); iter != vals.end(); ++iter) {
+    ldout(cct, 20) << "updating bound: bucket=" << bucket << " shard=" << iter->first << " marker=" << marker << dendl;
+    int r = RGWReplicaLogger::update_bound(obj_name(bucket, iter->first), pool,
+                                          daemon_id, iter->second, time, entries);
+    if (r < 0) {
+      ldout(cct, 0) << "failed to update bound: bucket=" << bucket << " shard=" << iter->first << " marker=" << marker << dendl;
+      ret = r;
+    }
+  }
+
+  return ret;
+}
index 456b230a6520e3a3f31d1659b0c281be367f12f3..a9adc9eedbe6ff1ee89ccba9c27df522ed47cdd7 100644 (file)
@@ -97,20 +97,19 @@ public:
 class RGWReplicaBucketLogger : private RGWReplicaLogger {
   string pool;
   string prefix;
+
+  string obj_name(const rgw_bucket& bucket, int shard_id);
 public:
   RGWReplicaBucketLogger(RGWRados *_store);
-  int update_bound(const rgw_bucket& bucket, const string& daemon_id,
+  int update_bound(const rgw_bucket& bucket, int shard_id, const string& daemon_id,
                    const string& marker, const utime_t& time,
-                   const list<RGWReplicaItemMarker> *entries) {
-    return RGWReplicaLogger::update_bound(prefix+bucket.name, pool,
-                                          daemon_id, marker, time, entries);
-  }
-  int delete_bound(const rgw_bucket& bucket, const string& daemon_id) {
-    return RGWReplicaLogger::delete_bound(prefix+bucket.name, pool,
+                   const list<RGWReplicaItemMarker> *entries);
+  int delete_bound(const rgw_bucket& bucket, int shard_id, const string& daemon_id) {
+    return RGWReplicaLogger::delete_bound(obj_name(bucket, shard_id), pool,
                                           daemon_id);
   }
-  int get_bounds(const rgw_bucket& bucket, RGWReplicaBounds& bounds) {
-    return RGWReplicaLogger::get_bounds(prefix+bucket.name, pool,
+  int get_bounds(const rgw_bucket& bucket, int shard_id, RGWReplicaBounds& bounds) {
+    return RGWReplicaLogger::get_bounds(obj_name(bucket, shard_id), pool,
                                         bounds);
   }
 };
index 9f32fc9ebd3f96a0ded857ab4b14da3f47fdea54..1db6cadb0d8e0335288d75d4161d16cf4be41239 100644 (file)
@@ -282,6 +282,12 @@ void RGWOp_BILog_List::execute() {
     return;
   }
 
+  int shard_id;
+  http_ret = rgw_bucket_parse_bucket_instance(bucket_instance, &bucket_instance, &shard_id);
+  if (http_ret < 0) {
+    return;
+  }
+
   if (!bucket_instance.empty()) {
     http_ret = store->get_bucket_instance_info(NULL, bucket_instance, bucket_info, NULL, NULL);
     if (http_ret < 0) {
@@ -307,7 +313,7 @@ void RGWOp_BILog_List::execute() {
   send_response();
   do {
     list<rgw_bi_log_entry> entries;
-    int ret = store->list_bi_log_entries(bucket_info.bucket,
+    int ret = store->list_bi_log_entries(bucket_info.bucket, shard_id,
                                           marker, max_entries - count, 
                                           entries, &truncated);
     if (ret < 0) {
@@ -419,6 +425,13 @@ void RGWOp_BILog_Delete::execute() {
     http_ret = -EINVAL;
     return;
   }
+
+  int shard_id;
+  http_ret = rgw_bucket_parse_bucket_instance(bucket_instance, &bucket_instance, &shard_id);
+  if (http_ret < 0) {
+    return;
+  }
+
   if (!bucket_instance.empty()) {
     http_ret = store->get_bucket_instance_info(NULL, bucket_instance, bucket_info, NULL, NULL);
     if (http_ret < 0) {
@@ -432,7 +445,7 @@ void RGWOp_BILog_Delete::execute() {
       return;
     }
   }
-  http_ret = store->trim_bi_log_entries(bucket_info.bucket, start_marker, end_marker);
+  http_ret = store->trim_bi_log_entries(bucket_info.bucket, shard_id, start_marker, end_marker);
   if (http_ret < 0) {
     dout(5) << "ERROR: trim_bi_log_entries() " << dendl;
   }
index ff1bf3466d378558aab620613a97d3a6f903dc68..22221d4078c7dec1e0143c9651a0064a9b41193b 100644 (file)
@@ -38,11 +38,11 @@ public:
 };
 
 class RGWOp_BILog_Info : public RGWRESTOp {
-  uint64_t bucket_ver;
-  uint64_t master_ver;
+  string bucket_ver;
+  string master_ver;
   string max_marker;
 public:
-  RGWOp_BILog_Info() : bucket_ver(0), master_ver(0) {}
+  RGWOp_BILog_Info() : bucket_ver(), master_ver() {}
   ~RGWOp_BILog_Info() {}
 
   int check_caps(RGWUserCaps& caps) {
index e7dd962f0f7647bc0a93e2d3b156a56941a336d5..0309a2cabd25b56bed87cb46fa109978acec7195 100644 (file)
@@ -181,6 +181,13 @@ void RGWOp_BILog_SetBounds::execute() {
     return;
   }
 
+  int shard_id;
+  http_ret = rgw_bucket_parse_bucket_instance(bucket_instance, &bucket_instance, &shard_id);
+  if (http_ret < 0) {
+    dout(5) << "failed to parse bucket instance" << dendl;
+    return;
+  }
+
   rgw_bucket bucket;
   if ((http_ret = bucket_instance_to_bucket(store, bucket_instance, bucket)) < 0) 
     return;
@@ -194,7 +201,7 @@ void RGWOp_BILog_SetBounds::execute() {
     return;
   }
 
-  http_ret = rl.update_bound(bucket, daemon_id, marker, ut, &markers);
+  http_ret = rl.update_bound(bucket, shard_id, daemon_id, marker, ut, &markers);
 }
 
 void RGWOp_BILog_GetBounds::execute() {
@@ -206,12 +213,19 @@ void RGWOp_BILog_GetBounds::execute() {
     return;
   }
 
+  int shard_id;
+  http_ret = rgw_bucket_parse_bucket_instance(bucket_instance, &bucket_instance, &shard_id);
+  if (http_ret < 0) {
+    dout(5) << "failed to parse bucket instance" << dendl;
+    return;
+  }
+
   rgw_bucket bucket;
   if ((http_ret = bucket_instance_to_bucket(store, bucket_instance, bucket)) < 0) 
     return;
 
   RGWReplicaBucketLogger rl(store);
-  http_ret = rl.get_bounds(bucket, bounds);
+  http_ret = rl.get_bounds(bucket, shard_id, bounds);
 }
 
 void RGWOp_BILog_GetBounds::send_response() {
@@ -237,12 +251,19 @@ void RGWOp_BILog_DeleteBounds::execute() {
     return;
   }
   
+  int shard_id;
+  http_ret = rgw_bucket_parse_bucket_instance(bucket_instance, &bucket_instance, &shard_id);
+  if (http_ret < 0) {
+    dout(5) << "failed to parse bucket instance" << dendl;
+    return;
+  }
+
   rgw_bucket bucket;
   if ((http_ret = bucket_instance_to_bucket(store, bucket_instance, bucket)) < 0) 
     return;
   
   RGWReplicaBucketLogger rl(store);
-  http_ret = rl.delete_bound(bucket, daemon_id);
+  http_ret = rl.delete_bound(bucket, shard_id, daemon_id);
 }
 
 RGWOp *RGWHandler_ReplicaLog::op_get() {
index 960cf9c3d02c0099f2545bd57258a036ab2cf2d5..20ac37e9990b503a319a690d050bf802ce9f5b1d 100644 (file)
@@ -559,7 +559,6 @@ void RGWCopyObj_ObjStore_SWIFT::send_response()
 int RGWGetObj_ObjStore_SWIFT::send_response_data(bufferlist& bl, off_t bl_ofs, off_t bl_len)
 {
   const char *content_type = NULL;
-  int orig_ret = ret;
   map<string, string> response_attrs;
   map<string, string>::iterator riter;
 
@@ -601,11 +600,7 @@ int RGWGetObj_ObjStore_SWIFT::send_response_data(bufferlist& bl, off_t bl_ofs, o
     }
   }
 
-  if (partial_content && !ret)
-    ret = -STATUS_PARTIAL_CONTENT;
-
-  if (ret)
-    set_req_state_err(s, ret);
+  set_req_state_err(s, (partial_content && !ret) ? STATUS_PARTIAL_CONTENT : ret);
   dump_errno(s);
 
   for (riter = response_attrs.begin(); riter != response_attrs.end(); ++riter) {
@@ -619,7 +614,7 @@ int RGWGetObj_ObjStore_SWIFT::send_response_data(bufferlist& bl, off_t bl_ofs, o
   sent_header = true;
 
 send_data:
-  if (get_data && !orig_ret) {
+  if (get_data && !ret) {
     int r = s->cio->write(bl.c_str() + bl_ofs, bl_len);
     if (r < 0)
       return r;
index ca200a840c356426ddc51afab5ce5fc1e189812a..8e1aeaed2f1920a3f38eff9478ac487309d1ad7e 100644 (file)
@@ -849,8 +849,8 @@ bin_DEBUGPROGRAMS += ceph_test_cls_hello
 if WITH_RADOSGW
 ceph_test_cls_rgw_SOURCES = test/cls_rgw/test_cls_rgw.cc
 ceph_test_cls_rgw_LDADD = \
-       $(LIBRADOS) libcls_rgw_client.la \
-       $(LIBCOMMON) $(UNITTEST_LDADD) $(RADOS_TEST_LDADD)
+       $(LIBRADOS) $(CRYPTO_LIBS) libcls_rgw_client.la \
+       $(LIBCOMMON) $(UNITTEST_LDADD) $(CEPH_GLOBAL) $(RADOS_TEST_LDADD)
 ceph_test_cls_rgw_CXXFLAGS = $(UNITTEST_CXXFLAGS)
 bin_DEBUGPROGRAMS += ceph_test_cls_rgw
 endif # WITH_RADOSGW
index 44cb30307245d2117e4adb7cb628d1d08719aa1c..0df7ab2a1874387f42fd2d242b14f2bbf869d9a1 100644 (file)
@@ -3,6 +3,7 @@
 
 #include "include/types.h"
 #include "cls/rgw/cls_rgw_client.h"
+#include "cls/rgw/cls_rgw_ops.h"
 
 #include "gtest/gtest.h"
 #include "test/librados/test.h"
@@ -10,6 +11,7 @@
 #include <errno.h>
 #include <string>
 #include <vector>
+#include <map>
 
 using namespace librados;
 
@@ -66,12 +68,20 @@ public:
 
 void test_stats(librados::IoCtx& ioctx, string& oid, int category, uint64_t num_entries, uint64_t total_size)
 {
-  rgw_bucket_dir_header header;
-  ASSERT_EQ(0, cls_rgw_get_dir_header(ioctx, oid, &header));
-
-  rgw_bucket_category_stats& stats = header.stats[category];
-  ASSERT_EQ(total_size, stats.total_size);
-  ASSERT_EQ(num_entries, stats.num_entries);
+  map<int, struct rgw_cls_list_ret> results;
+  map<int, string> oids;
+  oids[0] = oid;
+  ASSERT_EQ(0, CLSRGWIssueGetDirHeader(ioctx, oids, results, 8)());
+
+  uint64_t entries = 0;
+  uint64_t size = 0;
+  map<int, struct rgw_cls_list_ret>::iterator iter = results.begin();
+  for (; iter != results.end(); ++iter) {
+    entries += (iter->second).dir.header.stats[category].num_entries;
+    size += (iter->second).dir.header.stats[category].total_size;
+  }
+  ASSERT_EQ(total_size, size);
+  ASSERT_EQ(num_entries, entries);
 }
 
 void index_prepare(OpMgr& mgr, librados::IoCtx& ioctx, string& oid, RGWModifyOp index_op, string& tag, string& obj, string& loc)
@@ -339,9 +349,10 @@ TEST(cls_rgw, index_suggest)
     cls_rgw_encode_suggestion(suggest_op, dirent, updates);
   }
 
-  op = mgr.write_op();
-  cls_rgw_bucket_set_tag_timeout(*op, 1); // short tag timeout
-  ASSERT_EQ(0, ioctx.operate(bucket_oid, op));
+  map<int, string> bucket_objs;
+  bucket_objs[0] = bucket_oid;
+  int r = CLSRGWIssueSetTagTimeout(ioctx, bucket_objs, 8 /* max aio */, 1)();
+  ASSERT_EQ(0, r);
 
   sleep(1);