]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
Adjust bucket listing to work with multiple shards.
authorGuang Yang <yguang@yahoo-inc.com>
Mon, 18 Aug 2014 11:46:32 +0000 (11:46 +0000)
committerYehuda Sadeh <yehuda@redhat.com>
Wed, 14 Jan 2015 03:21:23 +0000 (19:21 -0800)
Signed-off-by: Guang Yang (yguang@yahoo-inc.com)
src/cls/rgw/cls_rgw_client.cc
src/cls/rgw/cls_rgw_client.h
src/rgw/rgw_bucket.cc
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h

index 47c9dcb8db899fa2855eea00fa0721ade27d1e5e..0d698e3c8150417b285189d04fc97e2bd8868c79 100644 (file)
 
 using namespace librados;
 
+/**
+ * This class represents the bucket index object operation callback context.
+ */
+template <typename T>
+class ClsBucketIndexOpCtx : public ObjectOperationCompletion {
+private:
+  T *data;
+  int *ret_code;
+public:
+  ClsBucketIndexOpCtx(T* _data, int *_ret_code) : data(_data), ret_code(_ret_code) { assert(data); }
+  ~ClsBucketIndexOpCtx() {}
+  void handle_completion(int r, bufferlist& outbl) {
+    if (r >= 0) {
+      try {
+        bufferlist::iterator iter = outbl.begin();
+        ::decode((*data), iter);
+      } catch (buffer::error& err) {
+        r = -EIO;
+      }
+    }
+    if (ret_code) {
+      *ret_code = r;
+    }
+  }
+};
+
 /*
  * Callback implementation for AIO request.
  */
@@ -91,7 +117,7 @@ int cls_rgw_bucket_index_init_op(librados::IoCtx& io_ctx,
       break;
   }
 
-  int num_completions, r;
+  int num_completions, r = 0;
   while (manager.wait_for_completions(-EEXIST, &num_completions, &r)) {
     if (r >= 0 && ret >= 0) {
       for(int i = 0; i < num_completions && iter != bucket_objs.end(); ++i, ++iter) {
@@ -159,35 +185,57 @@ void cls_rgw_bucket_complete_op(ObjectWriteOperation& o, RGWModifyOp op, string&
   o.exec("rgw", "bucket_complete_op", in);
 }
 
-
-int cls_rgw_list_op(IoCtx& io_ctx, string& oid, string& start_obj,
-                    string& filter_prefix, uint32_t num_entries,
-                    rgw_bucket_dir *dir, bool *is_truncated)
-{
-  bufferlist in, out;
+static bool issue_bucket_list_op(librados::IoCtx& io_ctx,
+    const string& oid, const string& start_obj, const string& filter_prefix,
+    uint32_t num_entries, BucketIndexAioManager *manager,
+    struct rgw_cls_list_ret *pdata) {
+  bufferlist in;
   struct rgw_cls_list_op call;
   call.start_obj = start_obj;
   call.filter_prefix = filter_prefix;
   call.num_entries = num_entries;
   ::encode(call, in);
-  int r = io_ctx.exec(oid, "rgw", "bucket_list", in, out);
-  if (r < 0)
-    return r;
 
-  struct rgw_cls_list_ret ret;
-  try {
-    bufferlist::iterator iter = out.begin();
-    ::decode(ret, iter);
-  } catch (buffer::error& err) {
-    return -EIO;
+  librados::ObjectReadOperation op;
+  op.exec("rgw", "bucket_list", in, new ClsBucketIndexOpCtx<struct rgw_cls_list_ret>(pdata, NULL));
+
+  BucketIndexAioArg *arg = new BucketIndexAioArg(manager->get_next(), manager);
+  AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, NULL, bucket_index_op_completion_cb);
+  int r = io_ctx.aio_operate(oid, c, &op, NULL);
+  if (r >= 0)
+    manager->add_pending(arg->id, c);
+  return r;
+}
+
+int cls_rgw_list_op(IoCtx& io_ctx, const string& start_obj,
+        const string& filter_prefix, uint32_t num_entries,
+        map<string, struct rgw_cls_list_ret>& list_results, uint32_t max_aio)
+{
+  int ret = 0;
+  BucketIndexAioManager manager;
+  map<string, struct rgw_cls_list_ret>::iterator iter = list_results.begin();
+  for (; iter != list_results.end() && max_aio-- > 0; ++iter) {
+    ret = issue_bucket_list_op(io_ctx, iter->first, start_obj, filter_prefix, num_entries, &manager, &iter->second);
+    if (ret < 0)
+      break;
   }
 
-  if (dir)
-    *dir = ret.dir;
-  if (is_truncated)
-    *is_truncated = ret.is_truncated;
+  int num_completions, r = 0;
+  while (manager.wait_for_completions(0, &num_completions, &r)) {
+    if (r >= 0 && ret >= 0) {
+      for (int i = 0; i < num_completions && iter != list_results.end(); ++i, ++iter) {
+        int issue_ret = issue_bucket_list_op(io_ctx, iter->first, start_obj, filter_prefix, num_entries, &manager, &iter->second);
+        if (issue_ret < 0) {
+          ret = issue_ret;
+          break;
+        }
+      }
+    } else if (ret >= 0) {
+      ret = r;
+    }
+  }
 
return r;
 return ret;
 }
 
 int cls_rgw_bucket_check_index_op(IoCtx& io_ctx, string& oid,
index 9cce78f6331ad1255cffb0c4479a3e137d448b17..a49b6422e7e80853a9a63bb63584d8964904b834 100644 (file)
@@ -101,9 +101,26 @@ void cls_rgw_bucket_complete_op(librados::ObjectWriteOperation& o, RGWModifyOp o
                                 rgw_bucket_entry_ver& ver, string& name, rgw_bucket_dir_entry_meta& dir_meta,
                                list<string> *remove_objs, bool log_op);
 
-int cls_rgw_list_op(librados::IoCtx& io_ctx, string& oid, string& start_obj,
-                    string& filter_prefix, uint32_t num_entries,
-                    rgw_bucket_dir *dir, bool *is_truncated);
+/**
+ * List the bucket with the starting object and filter prefix.
+ * NOTE: this method do listing requests for each bucket index shards identified by
+ *       the keys of the *list_results* map, which means the map should be popludated
+ *       by the caller to fill with each bucket index object id.
+ *
+ * io_ctx        - IO context for rados.
+ * start_obj     - marker for the listing.
+ * filter_prefix - filter prefix.
+ * num_entries   - number of entries to request for each object (note the total
+ *                 amount of entries returned depends on the number of shardings).
+ * list_results  - the list results keyed by bucket index object id.
+ * max_aio       - the maximum number of AIO (for throttling).
+ *
+ * Return 0 on success, a failure code otherwise.
+*/
+int cls_rgw_list_op(librados::IoCtx& io_ctx, const string & start_obj,
+                    const string& filter_prefix, uint32_t num_entries,
+                    map<string, struct rgw_cls_list_ret>& list_results,
+                    uint32_t max_aio);
 
 int cls_rgw_bucket_check_index_op(librados::IoCtx& io_ctx, string& oid,
                                  rgw_bucket_dir_header *existing_header,
index 4afe1ae10192a8bc5b4277adaa12aedb8b3ce9c0..c09cdc2f26c809df07864a8e5fa0845fb3b939ff 100644 (file)
@@ -729,9 +729,9 @@ int RGWBucket::check_object_index(RGWBucketAdminOpState& op_state,
   while (is_truncated) {
     map<string, RGWObjEnt> result;
 
-    int r = store->cls_bucket_list(bucket, marker, prefix, 1000, result,
-             &is_truncated, &marker,
-             bucket_object_check_filter);
+    int r = store->cls_bucket_list(bucket, marker, prefix, 1000,
+                                   result, &is_truncated, &marker,
+                                  bucket_object_check_filter);
 
     if (r == -ENOENT) {
       break;
index f53727bb0a331dae7639d769f4194645b2cacdbf..235047e296f5ce96ca7b1f44c526d035a4b7de08 100644 (file)
@@ -19,6 +19,7 @@
 #include "rgw_metadata.h"
 #include "rgw_bucket.h"
 
+#include "cls/rgw/cls_rgw_ops.h"
 #include "cls/rgw/cls_rgw_types.h"
 #include "cls/rgw/cls_rgw_client.h"
 #include "cls/refcount/cls_refcount_client.h"
@@ -2326,6 +2327,10 @@ int RGWRados::list_objects(rgw_bucket& bucket, int max, string& prefix, string&
       result.push_back(ent);
       count++;
     }
+
+    // Either the back-end telling us truncated, or we don't consume all
+    // items returned per the amount caller request
+    truncated = (truncated || eiter != ent_map.end());
   }
 
 done:
@@ -3811,6 +3816,22 @@ int RGWRados::open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx,
   return 0;
 }
 
+template<typename T>
+int RGWRados::open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx,
+    map<string, T>& bucket_objs)
+{
+  vector<string> oids;
+  int ret = open_bucket_index(bucket, index_ctx, oids);
+  if (ret < 0)
+    return ret;
+
+  vector<string>::const_iterator iter = oids.begin();
+  for (; iter != oids.end(); ++iter) {
+    bucket_objs[*iter] = T();
+  }
+  return 0;
+}
+
 int RGWRados::open_bucket_index_shard(rgw_bucket& bucket, librados::IoCtx& index_ctx,
     const string& obj_key, string *bucket_obj)
 {
@@ -6203,31 +6224,56 @@ int RGWRados::cls_obj_set_bucket_tag_timeout(rgw_bucket& bucket, uint64_t timeou
   return r;
 }
 
-int RGWRados::cls_bucket_list(rgw_bucket& bucket, string start, string prefix,
-                             uint32_t num, map<string, RGWObjEnt>& m,
-                             bool *is_truncated, string *last_entry,
-                             bool (*force_check_filter)(const string&  name))
+int RGWRados::cls_bucket_list(rgw_bucket& bucket, const string& start, const string& prefix,
+                             uint32_t num_entries, map<string, RGWObjEnt>& m, bool *is_truncated,
+                             string *last_entry, bool (*force_check_filter)(const string&  name))
 {
-  ldout(cct, 10) << "cls_bucket_list " << bucket << " start " << start << " num " << num << dendl;
+  ldout(cct, 10) << "cls_bucket_list " << bucket << " start " << start << " num_entries " << num_entries << dendl;
 
   librados::IoCtx index_ctx;
-  string oid;
-  int r = open_bucket_index(bucket, index_ctx, oid);
+  // key   - oid (for different shards if there is any)
+  // value - list result for the corresponding oid (shard), it is filled by the AIO callback
+  map<string, struct rgw_cls_list_ret> list_results;
+  int r = open_bucket_index(bucket, index_ctx, list_results);
   if (r < 0)
     return r;
 
-  struct rgw_bucket_dir dir;
-  r = cls_rgw_list_op(index_ctx, oid, start, prefix, num, &dir, is_truncated);
+  r = cls_rgw_list_op(index_ctx, start, prefix, num_entries, list_results, cct->_conf->rgw_bucket_index_max_aio);
   if (r < 0)
     return r;
 
-  map<string, struct rgw_bucket_dir_entry>::iterator miter;
-  bufferlist updates;
-  for (miter = dir.m.begin(); miter != dir.m.end(); ++miter) {
-    RGWObjEnt e;
-    rgw_bucket_dir_entry& dirent = miter->second;
+  // Create a list of iterators that are used to iterate each shard
+  vector<map<string, struct rgw_bucket_dir_entry>::iterator> vcurrents(list_results.size());
+  vector<map<string, struct rgw_bucket_dir_entry>::iterator> vends(list_results.size());
+  vector<string> vnames(list_results.size());
+  map<string, struct rgw_cls_list_ret>::iterator iter = list_results.begin();
+  *is_truncated = false;
+  for (; iter != list_results.end(); ++iter) {
+    vcurrents.push_back(iter->second.dir.m.begin());
+    vends.push_back(iter->second.dir.m.end());
+    vnames.push_back(iter->first);
+    *is_truncated = (*is_truncated || iter->second.is_truncated);
+  }
+
+  // Create a map to track the next candidate entry from each shard, if the entry
+  // from a specified shard is selected/erased, the next entry from that shard will
+  // be inserted for next round selection
+  map<string, size_t> candidates;
+  for (size_t i = 0; i < vcurrents.size(); ++i) {
+    if (vcurrents[i] != vends[i]) {
+      candidates[vcurrents[i]->second.name] = i;
+    }
+  }
+
+  map<string, bufferlist> updates;
+  uint32_t count = 0;
+  while (count < num_entries && !candidates.empty()) {
+    // Select the next one
+    int pos = candidates.begin()->second;
+    struct rgw_bucket_dir_entry& dirent = vcurrents[pos]->second;
 
     // fill it in with initial values; we may correct later
+    RGWObjEnt e;
     e.name = dirent.name;
     e.size = dirent.meta.size;
     e.mtime = dirent.meta.mtime;
@@ -6237,20 +6283,13 @@ int RGWRados::cls_bucket_list(rgw_bucket& bucket, string start, string prefix,
     e.content_type = dirent.meta.content_type;
     e.tag = dirent.tag;
 
-    /* oh, that shouldn't happen! */
-    if (e.name.empty()) {
-      ldout(cct, 0) << "WARNING: got empty dirent name, skipping" << dendl;
-      continue;
-    }
-
     bool force_check = force_check_filter && force_check_filter(dirent.name);
-
     if (!dirent.exists || !dirent.pending_map.empty() || force_check) {
       /* there are uncommitted ops. We need to check the current state,
        * and if the tags are old we need to do cleanup as well. */
       librados::IoCtx sub_ctx;
       sub_ctx.dup(index_ctx);
-      r = check_disk_state(sub_ctx, bucket, dirent, e, updates);
+      r = check_disk_state(sub_ctx, bucket, dirent, e, updates[vnames[pos]]);
       if (r < 0) {
         if (r == -ENOENT)
           continue;
@@ -6260,21 +6299,37 @@ int RGWRados::cls_bucket_list(rgw_bucket& bucket, string start, string prefix,
     }
     m[e.name] = e;
     ldout(cct, 10) << "RGWRados::cls_bucket_list: got " << e.name << dendl;
+
+    // Refresh the candidates map
+    candidates.erase(candidates.begin());
+    ++vcurrents[pos];
+    if (vcurrents[pos] != vends[pos]) {
+      candidates[vcurrents[pos]->second.name] = pos;
+    }
+    ++count;
   }
 
-  if (dir.m.size()) {
-    *last_entry = dir.m.rbegin()->first;
+  // Suggest updates if there is any
+  map<string, bufferlist>::iterator miter = updates.begin();
+  for (; miter != updates.end(); ++miter) {
+    if (miter->second.length()) {
+      ObjectWriteOperation o;
+      cls_rgw_suggest_changes(o, miter->second);
+      // we don't care if we lose suggested updates, send them off blindly
+      AioCompletion *c = librados::Rados::aio_create_completion(NULL, NULL, NULL);
+      index_ctx.aio_operate(miter->first, c, &o);
+        c->release();
+    }
   }
 
-  if (updates.length()) {
-    ObjectWriteOperation o;
-    cls_rgw_suggest_changes(o, updates);
-    // we don't care if we lose suggested updates, send them off blindly
-    AioCompletion *c = librados::Rados::aio_create_completion(NULL, NULL, NULL);
-    r = index_ctx.aio_operate(oid, c, &o);
-    c->release();
+  // Check if all the returned entries are consumed or not
+  for (size_t i = 0; i < vcurrents.size(); ++i) {
+    if (vcurrents[i] != vends[i])
+      *is_truncated = true;
   }
-  return m.size();
+  if (m.size())
+    *last_entry = m.rbegin()->first;
+  return 0;
 }
 
 int RGWRados::cls_obj_usage_log_add(const string& oid, rgw_usage_log_info& info)
index 556b4cf685eff52009644564c25c9fc1a157dd1a..cec51c95367d9974780c95fda534f09f0b7b62df 100644 (file)
@@ -1266,6 +1266,9 @@ class RGWRados
       const string& obj_key, string *bucket_obj);
   int open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx,
       vector<string>& bucket_objs);
+  template<typename T>
+  int open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx,
+      map<string, T>& bucket_objs);
   struct GetObjState {
     librados::IoCtx io_ctx;
     bool sent_data;
@@ -1854,9 +1857,9 @@ public:
   int cls_obj_complete_del(rgw_bucket& bucket, string& tag, int64_t pool, uint64_t epoch, string& name);
   int cls_obj_complete_cancel(rgw_bucket& bucket, string& tag, string& name);
   int cls_obj_set_bucket_tag_timeout(rgw_bucket& bucket, uint64_t timeout);
-  int cls_bucket_list(rgw_bucket& bucket, string start, string prefix, uint32_t num,
-                      map<string, RGWObjEnt>& m, bool *is_truncated,
-                      string *last_entry, bool (*force_check_filter)(const string&  name) = NULL);
+  int cls_bucket_list(rgw_bucket& bucket, const string& start, const string& prefix, uint32_t hint_num,
+                      map<string, RGWObjEnt>& m, bool *is_truncated, string *last_entry,
+                      bool (*force_check_filter)(const string&  name) = NULL);
   int cls_bucket_head(rgw_bucket& bucket, struct rgw_bucket_dir_header& header);
   int cls_bucket_head_async(rgw_bucket& bucket, RGWGetDirHeader_CB *ctx);
   int prepare_update_index(RGWObjState *state, rgw_bucket& bucket,