]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
Adjust bi log trim implementation to work with multiple bucket shards.
authorGuang Yang <yguang@yahoo-inc.com>
Wed, 24 Sep 2014 06:21:28 +0000 (06:21 +0000)
committerYehuda Sadeh <yehuda@redhat.com>
Wed, 14 Jan 2015 03:21:24 +0000 (19:21 -0800)
Signed-off-by: Guang Yang (yguang@yahoo-inc.com)
src/cls/rgw/cls_rgw_client.cc
src/cls/rgw/cls_rgw_client.h
src/rgw/rgw_rados.cc

index 0632b3a7355c044bea68c28f093bb34c84008577..c0c5d7c7731e7a2a079cd33be7fc22d613e74c77 100644 (file)
@@ -48,11 +48,19 @@ void BucketIndexAioManager::do_completion(int id) {
   completions.push_back(iter->second);
   pendings.erase(iter);
 
+  // If the caller needs a list of finished objects, store them
+  // for further processing
+  map<int, string>::iterator miter = pending_objs.find(id);
+  if (miter != pending_objs.end()) {
+    completion_objs.push_back(miter->second);
+    pending_objs.erase(miter);
+  }
+
   cond.Signal();
 }
 
 bool BucketIndexAioManager::wait_for_completions(int valid_ret_code,
-    int *num_completions, int *ret_code) {
+    int *num_completions, int *ret_code, vector<string> *objs) {
   lock.Lock();
   if (pendings.empty() && completions.empty()) {
     lock.Unlock();
@@ -63,8 +71,12 @@ bool BucketIndexAioManager::wait_for_completions(int valid_ret_code,
 
   // Clear the completed AIOs
   list<librados::AioCompletion*>::iterator iter = completions.begin();
-  for (; iter != completions.end(); ++iter) {
+  list<string>::iterator liter = completion_objs.begin();
+  for (; iter != completions.end() && liter != completion_objs.end(); ++iter, ++liter) {
     int r = (*iter)->get_return_value();
+    if (objs && r == 0) {
+      objs->push_back(*liter);
+    }
     if (ret_code && (r < 0 && r != valid_ret_code))
       (*ret_code) = r;
     (*iter)->release();
@@ -194,6 +206,24 @@ int CLSRGWIssueBILogList::issue_op()
   return issue_bi_log_list_op(io_ctx, iter->first, marker_mgr, max, &manager, &iter->second);
 }
 
+static bool issue_bi_log_trim(librados::IoCtx& io_ctx,
+    string& oid, BucketIndexShardsManager& start_marker_mgr,
+    BucketIndexShardsManager& end_marker_mgr, BucketIndexAioManager *manager) {
+  bufferlist in;
+  cls_rgw_bi_log_trim_op call;
+  call.start_marker = start_marker_mgr.get(oid, "");
+  call.end_marker = end_marker_mgr.get(oid, "");
+  ::encode(call, in);
+  ObjectWriteOperation op;
+  op.exec("rgw", "bi_log_trim", in);
+  return manager->aio_operate(io_ctx, oid, &op);
+}
+
+int CLSRGWIssueBILogTrim::issue_op()
+{
+  return issue_bi_log_trim(io_ctx, *iter, start_marker_mgr, end_marker_mgr, &manager);
+}
+
 static bool issue_bucket_check_index_op(IoCtx& io_ctx, const string& oid, BucketIndexAioManager *manager,
     struct rgw_cls_check_index_ret *pdata) {
   bufferlist in;
@@ -275,28 +305,6 @@ int cls_rgw_get_dir_header_async(IoCtx& io_ctx, string& oid, RGWGetDirHeader_CB
   return 0;
 }
 
-int cls_rgw_bi_log_trim(IoCtx& io_ctx, string& oid, string& start_marker, string& end_marker)
-{
-  do {
-    int r;
-    bufferlist in, out;
-    cls_rgw_bi_log_trim_op call;
-    call.start_marker = start_marker;
-    call.end_marker = end_marker;
-    ::encode(call, in);
-    r = io_ctx.exec(oid, "rgw", "bi_log_trim", in, out);
-
-    if (r == -ENODATA)
-      break;
-
-    if (r < 0)
-      return r;
-
-  } while (1);
-
-  return 0;
-}
-
 int cls_rgw_usage_log_read(IoCtx& io_ctx, string& oid, string& user,
                            uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries,
                            string& read_iter, map<rgw_user_bucket, rgw_usage_log_entry>& usage,
index e64d884caa99d030730f486d07db2e13da2ee50a..14a702cc4296230fe5398f345a59154a1583b1d9 100644 (file)
@@ -31,6 +31,8 @@ class BucketIndexAioManager {
 private:
   map<int, librados::AioCompletion*> pendings;
   list<librados::AioCompletion*> completions;
+  map<int, string> pending_objs;
+  list<string> completion_objs;
   int next;
   Mutex lock;
   Cond cond;
@@ -49,23 +51,29 @@ private:
    * Return next request ID.
    */
   int get_next() { return next++; }
-
+    
   /*
    * Add a new pending AIO completion instance.
    *
    * @param id         - the request ID.
    * @param completion - the AIO completion instance.
+   * @param oid        - the object id associated with the object, if it is NULL, we don't
+   *                     track the object id per callback.
    */
-  void add_pending(int id, librados::AioCompletion* completion) {
+  void add_pending(int id, librados::AioCompletion* completion, string *oid = NULL) {
     Mutex::Locker l(lock);
     pendings[id] = completion;
+    if (oid) {
+      pending_objs[id] = *oid;
+    }
   }
 public:
   /*
    * Create a new instance.
    */
-  BucketIndexAioManager() : pendings(), completions(), next(0),
-      lock("BucketIndexAioManager::lock"), cond() {}
+  BucketIndexAioManager() : pendings(), completions(), pending_objs(), completion_objs(),
+  next(0), lock("BucketIndexAioManager::lock"), cond() {}
+
 
   /*
    * Do completion for the given AIO request.
@@ -78,10 +86,12 @@ public:
    * valid_ret_code  - valid AIO return code.
    * num_completions - number of completions.
    * ret_code        - return code of failed AIO.
+   * objs            - a list of objects that has been finished the AIO.
    *
    * Return false if there is no pending AIO, true otherwise.
    */
-  bool wait_for_completions(int valid_ret_code, int *num_completions, int *ret_code);
+  bool wait_for_completions(int valid_ret_code, int *num_completions, int *ret_code,
+      vector<string> *objs);
 
   /**
    * Do aio read operation.
@@ -193,6 +203,12 @@ protected:
 
   virtual void cleanup() {}
   virtual int valid_ret_code() { return 0; }
+  // Return true if multiple rounds of OPs might be needed, this happens when
+  // OP needs to be re-send until a certain code is returned.
+  virtual bool need_multiple_rounds() { return false; }
+  // Add a new object to the end of the container.
+  virtual void add_object(const string& oid) {}
+  virtual void reset_container(vector<string>& objs) {}
 
 public:
   CLSRGWConcurrentIO(librados::IoCtx& ioc, T& _objs_container,
@@ -209,7 +225,8 @@ public:
     }
 
     int num_completions, r = 0;
-    while (manager.wait_for_completions(0, &num_completions, &r)) {
+    vector<string> objs;
+    while (manager.wait_for_completions(valid_ret_code(), &num_completions, &r, &objs)) {
       if (r >= 0 && ret >= 0) {
         for(int i = 0; i < num_completions && iter != objs_container.end(); ++i, ++iter) {
           int issue_ret = issue_op();
@@ -221,6 +238,11 @@ public:
       } else if (ret >= 0) {
         ret = r;
       }
+      if (need_multiple_rounds() && iter == objs_container.end() && !objs.empty()) {
+        // For those objects which need another round, use them to reset
+        // the container
+        reset_container(objs);
+      }
     }
 
     if (ret < 0) {
@@ -303,6 +325,27 @@ public:
     marker_mgr(_marker_mgr), max(_max) {}
 };
 
+class CLSRGWIssueBILogTrim : public CLSRGWConcurrentIO<vector<string> > {
+  BucketIndexShardsManager& start_marker_mgr;
+  BucketIndexShardsManager& end_marker_mgr;
+protected:
+  int issue_op();
+  // Trim until -ENODATA is returned.
+  int valid_ret_code() { return -ENODATA; }
+  bool need_multiple_rounds() { return true; }
+  void add_object(const string& oid) { objs_container.push_back(oid); }
+  void reset_container(vector<string>& objs) {
+    objs_container.swap(objs);
+    iter = objs_container.begin();
+    objs.clear();
+  }
+public:
+  CLSRGWIssueBILogTrim(librados::IoCtx& io_ctx, BucketIndexShardsManager& _start_marker_mgr,
+      BucketIndexShardsManager& _end_marker_mgr, vector<string>& _bucket_objs, uint32_t max_aio) :
+    CLSRGWConcurrentIO<vector<string> >(io_ctx, _bucket_objs, max_aio),
+    start_marker_mgr(_start_marker_mgr), end_marker_mgr(_end_marker_mgr) {}
+};
+
 /**
  * Check the bucket index.
  *
@@ -346,8 +389,6 @@ void cls_rgw_encode_suggestion(char op, rgw_bucket_dir_entry& dirent, bufferlist
 
 void cls_rgw_suggest_changes(librados::ObjectWriteOperation& o, bufferlist& updates);
 
-int cls_rgw_bi_log_trim(librados::IoCtx& io_ctx, string& oid, string& start_marker, string& end_marker);
-
 /* usage logging */
 int cls_rgw_usage_log_read(librados::IoCtx& io_ctx, string& oid, string& user,
                            uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries,
index 93c2f9edd238d3d0978aa017508e9ffd42caf5be..a90a442122724c4a45d4908d01c7d416889cc4ba 100644 (file)
@@ -6187,16 +6187,23 @@ int RGWRados::list_bi_log_entries(rgw_bucket& bucket, string& marker, uint32_t m
 int RGWRados::trim_bi_log_entries(rgw_bucket& bucket, string& start_marker, string& end_marker)
 {
   librados::IoCtx index_ctx;
-  string oid;
-  int r = open_bucket_index(bucket, index_ctx, oid);
+  vector<string> bucket_objs;
+  int r = open_bucket_index(bucket, index_ctx, bucket_objs);
   if (r < 0)
     return r;
 
-  int ret = cls_rgw_bi_log_trim(index_ctx, oid, start_marker, end_marker);
-  if (ret < 0)
-    return ret;
+  bool has_shards = bucket_objs.size() > 1;
+  BucketIndexShardsManager start_marker_mgr;
+  r = start_marker_mgr.from_string(start_marker, has_shards, bucket_objs.front());
+  if (r < 0)
+    return r;
+  BucketIndexShardsManager end_marker_mgr;
+  r = end_marker_mgr.from_string(end_marker, has_shards, bucket_objs.front()); 
+  if (r < 0)
+    return r;
 
-  return 0;
+  return CLSRGWIssueBILogTrim(index_ctx, start_marker_mgr, end_marker_mgr, bucket_objs,
+      cct->_conf->rgw_bucket_index_max_aio)();
 }
 
 int RGWRados::gc_operate(string& oid, librados::ObjectWriteOperation *op)