]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
Adjust bi log listing to work with multiple bucket shards.
authorGuang Yang <yguang@yahoo-inc.com>
Tue, 23 Sep 2014 23:14:24 +0000 (23:14 +0000)
committerYehuda Sadeh <yehuda@redhat.com>
Wed, 14 Jan 2015 03:21:24 +0000 (19:21 -0800)
Signed-off-by: Guang Yang (yguang@yahoo-inc.com)
src/cls/rgw/cls_rgw_client.cc
src/cls/rgw/cls_rgw_client.h
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h

index e253474d7bebc4dce1d82a8b3e9ac165df244ec0..0632b3a7355c044bea68c28f093bb34c84008577 100644 (file)
@@ -11,6 +11,9 @@
 
 using namespace librados;
 
+const string BucketIndexShardsManager::KEY_VALUE_SEPARATOR = "#";
+const string BucketIndexShardsManager::SHARDS_SEPARATOR = ",";
+
 /**
  * This class represents the bucket index object operation callback context.
  */
@@ -37,15 +40,6 @@ public:
   }
 };
 
-/*
- * Callback implementation for AIO request.
- */
-static void bucket_index_op_completion_cb(void* cb, void* arg) {
-  BucketIndexAioArg* cb_arg = (BucketIndexAioArg*) arg;
-  cb_arg->manager->do_completion(cb_arg->id);
-  cb_arg->put();
-}
-
 void BucketIndexAioManager::do_completion(int id) {
   Mutex::Locker l(lock);
 
@@ -95,12 +89,7 @@ static bool issue_bucket_index_init_op(librados::IoCtx& io_ctx,
   librados::ObjectWriteOperation op;
   op.create(true);
   op.exec("rgw", "bucket_init_index", in);
-  BucketIndexAioArg *arg = new BucketIndexAioArg(manager->get_next(), manager);
-  AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, NULL, bucket_index_op_completion_cb);
-  int r = io_ctx.aio_operate(oid, c, &op);
-  if (r >= 0)
-    manager->add_pending(arg->id, c);
-  return r;
+  return manager->aio_operate(io_ctx, oid, &op);
 }
 
 static bool issue_bucket_set_tag_timeout_op(librados::IoCtx& io_ctx,
@@ -111,13 +100,7 @@ static bool issue_bucket_set_tag_timeout_op(librados::IoCtx& io_ctx,
   ::encode(call, in);
   ObjectWriteOperation op;
   op.exec("rgw", "bucket_set_tag_timeout", in);
-  BucketIndexAioArg *arg = new BucketIndexAioArg(manager->get_next(), manager);
-  AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, NULL, bucket_index_op_completion_cb);
-  int r = io_ctx.aio_operate(oid, c, &op);
-  if (r >= 0) {
-    manager->add_pending(arg->id, c);
-  }
-  return r;
+  return manager->aio_operate(io_ctx, oid, &op);
 }
 
 int CLSRGWIssueBucketIndexInit::issue_op()
@@ -184,13 +167,7 @@ static bool issue_bucket_list_op(librados::IoCtx& io_ctx,
 
   librados::ObjectReadOperation op;
   op.exec("rgw", "bucket_list", in, new ClsBucketIndexOpCtx<struct rgw_cls_list_ret>(pdata, NULL));
-
-  BucketIndexAioArg *arg = new BucketIndexAioArg(manager->get_next(), manager);
-  AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, NULL, bucket_index_op_completion_cb);
-  int r = io_ctx.aio_operate(oid, c, &op, NULL);
-  if (r >= 0)
-    manager->add_pending(arg->id, c);
-  return r;
+  return manager->aio_operate(io_ctx, oid, &op);
 }
 
 int CLSRGWIssueBucketList::issue_op()
@@ -198,19 +175,32 @@ int CLSRGWIssueBucketList::issue_op()
   return issue_bucket_list_op(io_ctx, iter->first, start_obj, filter_prefix, num_entries, &manager, &iter->second);
 }
 
+static bool issue_bi_log_list_op(librados::IoCtx& io_ctx,
+    const string& oid, BucketIndexShardsManager& marker_mgr, uint32_t max, BucketIndexAioManager *manager,
+    struct cls_rgw_bi_log_list_ret *pdata) {
+  bufferlist in;
+  cls_rgw_bi_log_list_op call;
+  call.marker = marker_mgr.get(oid, "");
+  call.max = max;
+  ::encode(call, in);
+
+  librados::ObjectReadOperation op;
+  op.exec("rgw", "bi_log_list", in, new ClsBucketIndexOpCtx<struct cls_rgw_bi_log_list_ret>(pdata, NULL));
+  return manager->aio_operate(io_ctx, oid, &op);
+}
+
+int CLSRGWIssueBILogList::issue_op()
+{
+  return issue_bi_log_list_op(io_ctx, iter->first, marker_mgr, max, &manager, &iter->second);
+}
+
 static bool issue_bucket_check_index_op(IoCtx& io_ctx, const string& oid, BucketIndexAioManager *manager,
     struct rgw_cls_check_index_ret *pdata) {
   bufferlist in;
   librados::ObjectReadOperation op;
   op.exec("rgw", "bucket_check_index", in, new ClsBucketIndexOpCtx<struct rgw_cls_check_index_ret>(
         pdata, NULL));
-  BucketIndexAioArg *arg = new BucketIndexAioArg(manager->get_next(), manager);
-  AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, NULL, bucket_index_op_completion_cb);
-  int r = io_ctx.aio_operate(oid, c, &op, NULL);
-  if (r >= 0) {
-    manager->add_pending(arg->id, c);
-  }
-  return r;
+  return manager->aio_operate(io_ctx, oid, &op);
 }
 
 int CLSRGWIssueBucketCheck::issue_op()
@@ -223,13 +213,7 @@ static bool issue_bucket_rebuild_index_op(IoCtx& io_ctx, const string& oid,
   bufferlist in;
   librados::ObjectWriteOperation op;
   op.exec("rgw", "bucket_rebuild_index", in);
-  BucketIndexAioArg *arg = new BucketIndexAioArg(manager->get_next(), manager);
-  AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, NULL, bucket_index_op_completion_cb);
-  int r = io_ctx.aio_operate(oid, c, &op);
-  if (r >= 0) {
-    manager->add_pending(arg->id, c);
-  }
-  return r;
+  return manager->aio_operate(io_ctx, oid, &op);
 }
 
 int CLSRGWIssueBucketRebuild::issue_op()
@@ -291,34 +275,6 @@ int cls_rgw_get_dir_header_async(IoCtx& io_ctx, string& oid, RGWGetDirHeader_CB
   return 0;
 }
 
-int cls_rgw_bi_log_list(IoCtx& io_ctx, string& oid, string& marker, uint32_t max,
-                    list<rgw_bi_log_entry>& entries, bool *truncated)
-{
-  bufferlist in, out;
-  cls_rgw_bi_log_list_op call;
-  call.marker = marker;
-  call.max = max;
-  ::encode(call, in);
-  int r = io_ctx.exec(oid, "rgw", "bi_log_list", in, out);
-  if (r < 0)
-    return r;
-
-  cls_rgw_bi_log_list_ret ret;
-  try {
-    bufferlist::iterator iter = out.begin();
-    ::decode(ret, iter);
-  } catch (buffer::error& err) {
-    return -EIO;
-  }
-
-  entries = ret.entries;
-
-  if (truncated)
-    *truncated = ret.truncated;
-
- return r;
-}
-
 int cls_rgw_bi_log_trim(IoCtx& io_ctx, string& oid, string& start_marker, string& end_marker)
 {
   do {
index ebd005fba335d85dbea4b8e5d00e5b1a6db5d1ec..e64d884caa99d030730f486d07db2e13da2ee50a 100644 (file)
@@ -2,11 +2,26 @@
 #define CEPH_CLS_RGW_CLIENT_H
 
 #include "include/types.h"
+#include "include/str_list.h"
 #include "include/rados/librados.hpp"
 #include "cls_rgw_types.h"
 #include "cls_rgw_ops.h"
 #include "common/RefCountedObj.h"
 
+// Forward declaration
+class BucketIndexAioManager;
+
+/*
+ * Bucket index AIO request argument, this is used to pass a argument
+ * to callback.
+ */
+struct BucketIndexAioArg : public RefCountedObject {
+  BucketIndexAioArg(int _id, BucketIndexAioManager* _manager) :
+    id(_id), manager(_manager) {}
+  int id;
+  BucketIndexAioManager* manager;
+};
+
 /*
  * This class manages AIO completions. This class is not completely thread-safe,
  * methods like *get_next* is not thread-safe and is expected to be called from
@@ -19,12 +34,21 @@ private:
   int next;
   Mutex lock;
   Cond cond;
-public:
   /*
-   * Create a new instance.
+   * Callback implementation for AIO request.
    */
-  BucketIndexAioManager() : pendings(), completions(), next(0),
-      lock("BucketIndexAioManager::lock"), cond() {}
+  static void bucket_index_op_completion_cb(void* cb, void* arg) {
+    BucketIndexAioArg* cb_arg = (BucketIndexAioArg*) arg;
+    cb_arg->manager->do_completion(cb_arg->id);
+    cb_arg->put();
+  }
+
+  /*
+   * Get next request ID. This method is not thread-safe.
+   *
+   * Return next request ID.
+   */
+  int get_next() { return next++; }
 
   /*
    * Add a new pending AIO completion instance.
@@ -36,18 +60,17 @@ public:
     Mutex::Locker l(lock);
     pendings[id] = completion;
   }
-
+public:
   /*
-   * Do completion for the given AIO request.
+   * Create a new instance.
    */
-  void do_completion(int id);
+  BucketIndexAioManager() : pendings(), completions(), next(0),
+      lock("BucketIndexAioManager::lock"), cond() {}
 
   /*
-   * Get next request ID. This method is not thread-safe.
-   *
-   * Return next request ID.
+   * Do completion for the given AIO request.
    */
-  int get_next() { return next++; }
+  void do_completion(int id);
 
   /*
    * Wait for AIO completions.
@@ -59,17 +82,32 @@ public:
    * Return false if there is no pending AIO, true otherwise.
    */
   bool wait_for_completions(int valid_ret_code, int *num_completions, int *ret_code);
-};
 
-/*
- * Bucket index AIO request argument, this is used to pass a argument
- * to callback.
- */
-struct BucketIndexAioArg : public RefCountedObject {
-  BucketIndexAioArg(int _id, BucketIndexAioManager* _manager) :
-    id(_id), manager(_manager) {}
-  int id;
-  BucketIndexAioManager* manager;
+  /**
+   * Do aio read operation.
+   */
+  bool aio_operate(librados::IoCtx& io_ctx, const string& oid, librados::ObjectReadOperation *op) {
+    BucketIndexAioArg *arg = new BucketIndexAioArg(get_next(), this);
+    librados::AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, NULL, bucket_index_op_completion_cb);
+    int r = io_ctx.aio_operate(oid, c, (librados::ObjectReadOperation*)op, NULL);
+    if (r >= 0) {
+      add_pending(arg->id, c);
+    }
+    return r;
+  }
+
+  /**
+   * Do aio write operation.
+   */
+  bool aio_operate(librados::IoCtx& io_ctx, const string& oid, librados::ObjectWriteOperation *op) {
+    BucketIndexAioArg *arg = new BucketIndexAioArg(get_next(), this);
+    librados::AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, NULL, bucket_index_op_completion_cb);
+    int r = io_ctx.aio_operate(oid, c, (librados::ObjectWriteOperation*)op);
+    if (r >= 0) {
+      add_pending(arg->id, c);
+    }
+    return r;
+  }
 };
 
 class RGWGetDirHeader_CB : public RefCountedObject {
@@ -82,12 +120,23 @@ class BucketIndexShardsManager {
 private:
   // Per shard setting manager, for example, marker.
   map<string, string> value_by_shards;
-  const static char KEY_VALUE_SEPARATOR = '#';
-  const static char SHARDS_SEPARATOR = ',';
 public:
-  void add_item(const string& shard, const string& value) {
+  const static string KEY_VALUE_SEPARATOR;
+  const static string SHARDS_SEPARATOR;
+
+  void add(const string& shard, const string& value) {
     value_by_shards[shard] = value;
   }
+
+  const string& get(const string& shard, const string& default_value) {
+    map<string, string>::iterator iter = value_by_shards.find(shard);
+    return (iter == value_by_shards.end() ? default_value : iter->second);
+  }
+
+  bool empty() {
+    return value_by_shards.empty();
+  }
+
   void to_string(string *out) const {
     if (out) {
       map<string, string>::const_iterator iter = value_by_shards.begin();
@@ -98,15 +147,34 @@ public:
         for (; iter != value_by_shards.end(); ++iter) {
           if (out->length()) {
             // Not the first item, append a separator first
-            out->append(1, SHARDS_SEPARATOR);
+            out->append(SHARDS_SEPARATOR);
           }
           out->append(iter->first);
-          out->append(1, KEY_VALUE_SEPARATOR);
+          out->append(KEY_VALUE_SEPARATOR);
           out->append(iter->second);
         }
       }
     }
   }
+
+  int from_string(const string& composed_marker, bool has_shards, const string& oid) {
+    value_by_shards.clear();
+    if (!has_shards) {
+      add(oid, composed_marker);
+    } else {
+      list<string> shards;
+      get_str_list(composed_marker, SHARDS_SEPARATOR.c_str(), shards);
+      list<string>::const_iterator iter = shards.begin();
+      for (; iter != shards.end(); ++iter) {
+        size_t pos = iter->find(KEY_VALUE_SEPARATOR);
+        if (pos == string::npos)
+          return -EINVAL;
+        string name = iter->substr(0, pos);
+        value_by_shards[name] = iter->substr(pos + 1, iter->length() - pos - 1);
+      }
+    }
+    return 0;
+  }
 };
 
 /* bucket index */
@@ -223,6 +291,18 @@ public:
   start_obj(_start_obj), filter_prefix(_filter_prefix), num_entries(_num_entries) {}
 };
 
+class CLSRGWIssueBILogList : public CLSRGWConcurrentIO<map<string, cls_rgw_bi_log_list_ret> > {
+  BucketIndexShardsManager& marker_mgr;
+  uint32_t max;
+protected:
+  int issue_op();
+public:
+  CLSRGWIssueBILogList(librados::IoCtx& io_ctx, BucketIndexShardsManager& _marker_mgr, uint32_t _max,
+      map<string, struct cls_rgw_bi_log_list_ret>& bi_log_lists, uint32_t max_aio) :
+    CLSRGWConcurrentIO<map<string, cls_rgw_bi_log_list_ret> >(io_ctx, bi_log_lists, max_aio),
+    marker_mgr(_marker_mgr), max(_max) {}
+};
+
 /**
  * Check the bucket index.
  *
@@ -266,10 +346,6 @@ void cls_rgw_encode_suggestion(char op, rgw_bucket_dir_entry& dirent, bufferlist
 
 void cls_rgw_suggest_changes(librados::ObjectWriteOperation& o, bufferlist& updates);
 
-/* bucket index log */
-
-int cls_rgw_bi_log_list(librados::IoCtx& io_ctx, string& oid, string& marker, uint32_t max,
-                    list<rgw_bi_log_entry>& entries, bool *truncated);
 int cls_rgw_bi_log_trim(librados::IoCtx& io_ctx, string& oid, string& start_marker, string& end_marker);
 
 /* usage logging */
index 605b3ffee685bcd53a1beac4a58f6532b2f55eb8..93c2f9edd238d3d0978aa017508e9ffd42caf5be 100644 (file)
@@ -1676,6 +1676,15 @@ int RGWRados::open_bucket_data_extra_ctx(rgw_bucket& bucket, librados::IoCtx& da
   return 0;
 }
 
+void RGWRados::build_bucket_index_marker(const string& shard_name, const string& shard_marker,
+      string *marker) {
+  if (marker) {
+    *marker = shard_name;
+    marker->append(BucketIndexShardsManager::KEY_VALUE_SEPARATOR);
+    marker->append(shard_marker);
+  }
+}
+
 int RGWRados::open_bucket_index_ctx(rgw_bucket& bucket, librados::IoCtx& index_ctx)
 {
   int r = open_bucket_pool_ctx(bucket.name, bucket.index_pool, index_ctx);
@@ -5479,10 +5488,10 @@ int RGWRados::get_bucket_stats(rgw_bucket& bucket, string *bucket_ver, string *m
   for(; iter != headers.end(); ++iter) {
     accumulate_raw_stats(iter->second, stats);
     snprintf(buf, sizeof(buf), "%lu", iter->second.ver);
-    ver_mgr.add_item(iter->first, string(buf));
+    ver_mgr.add(iter->first, string(buf));
     snprintf(buf, sizeof(buf), "%lu", iter->second.master_ver);
-    master_ver_mgr.add_item(iter->first, string(buf));
-    marker_mgr.add_item(iter->first, iter->second.max_marker);
+    master_ver_mgr.add(iter->first, string(buf));
+    marker_mgr.add(iter->first, iter->second.max_marker);
   }
   ver_mgr.to_string(bucket_ver);
   master_ver_mgr.to_string(master_ver);
@@ -6096,22 +6105,80 @@ int RGWRados::list_raw_objects(rgw_bucket& pool, const string& prefix_filter,
 int RGWRados::list_bi_log_entries(rgw_bucket& bucket, string& marker, uint32_t max,
                                   std::list<rgw_bi_log_entry>& result, bool *truncated)
 {
+  ldout(cct, 20) << __func__ << bucket << " marker " << marker << " max " << max << dendl;
   result.clear();
 
   librados::IoCtx index_ctx;
-  string oid;
-  int r = open_bucket_index(bucket, index_ctx, oid);
+  map<string, cls_rgw_bi_log_list_ret> bi_log_lists;
+  int r = open_bucket_index(bucket, index_ctx, bi_log_lists);
   if (r < 0)
     return r;
 
-  std::list<rgw_bi_log_entry> entries;
-  int ret = cls_rgw_bi_log_list(index_ctx, oid, marker, max - result.size(), entries, truncated);
-  if (ret < 0)
-    return ret;
+  BucketIndexShardsManager marker_mgr;
+  bool has_shards = (bi_log_lists.size() > 1);
+  // If there are multiple shards for the bucket index object, the marker
+  // should have the pattern '{shard_oid_1}#{shard_marker_1},{shard_oid_2}#
+  // {shard_marker_2}...', if there is no sharding, the bi_log_list should
+  // only contain one record, and the key is the bucket index object id.
+  r = marker_mgr.from_string(marker, has_shards, bi_log_lists.begin()->first);
+  if (r < 0)
+    return r;
+  r = CLSRGWIssueBILogList(index_ctx, marker_mgr, max, bi_log_lists, cct->_conf->rgw_bucket_index_max_aio)();
+  if (r < 0)
+    return r;
+
+  vector<list<rgw_bi_log_entry>::iterator> vcurrents;
+  vector<list<rgw_bi_log_entry>::iterator> vends;
+  vector<string> vnames;
+  if (truncated) {
+    *truncated = false;
+  }
+  map<string, cls_rgw_bi_log_list_ret>::iterator miter = bi_log_lists.begin();
+  for (; miter != bi_log_lists.end(); ++miter) {
+    vnames.push_back(miter->first);
+    vcurrents.push_back(miter->second.entries.begin());
+    vends.push_back(miter->second.entries.end());
+    if (truncated) {
+      *truncated = (*truncated || miter->second.truncated);
+    }
+  }
+
+  bool has_more = true;
+  while (result.size() < max && has_more) {
+    has_more = false;
+    for (size_t i = 0;
+        result.size() < max && i < vcurrents.size() && vcurrents[i] != vends[i];
+        ++vcurrents[i], ++i) {
+      if (vcurrents[i] != vends[i]) {
+        rgw_bi_log_entry& entry = *(vcurrents[i]);
+        if (has_shards) {
+          // Put the shard name as part of the ID, so that caller can easy find out
+          // the next marker
+          string tmp_id;
+          build_bucket_index_marker(vnames[i], entry.id, &tmp_id);
+          entry.id.swap(tmp_id);
+        }
+        marker_mgr.add(vnames[i], entry.id);
+        result.push_back(entry);
+        has_more = true;
+      }
+    }
+  }
 
-  std::list<rgw_bi_log_entry>::iterator iter;
-  for (iter = entries.begin(); iter != entries.end(); ++iter) {
-    result.push_back(*iter);
+  for (size_t i = 0; i < vcurrents.size(); ++i) {
+    if (truncated) {
+      *truncated = (*truncated || (vcurrents[i] != vends[i]));
+    }
+  }
+
+  // Refresh marker, if there are multiple shards, the output will look like
+  // '{shard_oid_1}#{shard_marker_1},{shard_oid_2}#{shard_marker_2}...',
+  // if there is no sharding, the simply marker (without oid) is returned
+  if (has_shards) {
+    marker_mgr.to_string(&marker);
+  } else {
+    marker = result.rbegin()->id;
   }
 
   return 0;
index a8f7c0b6de85c4e32b860712d656bf671cc5f756..b501fa450b2b068b29940bcfa08415a252170eca 100644 (file)
@@ -1261,6 +1261,8 @@ class RGWRados
   template<typename T>
   int open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx,
       map<string, T>& bucket_objs);
+  void build_bucket_index_marker(const string& shard_name, const string& shard_marker,
+      string *marker);
 
   struct GetObjState {
     librados::IoCtx io_ctx;