git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: guard resharding in async completions
author Yehuda Sadeh <yehuda@redhat.com>
Sat, 13 May 2017 00:07:04 +0000 (17:07 -0700)
committer Yehuda Sadeh <yehuda@redhat.com>
Mon, 5 Jun 2017 20:17:46 +0000 (13:17 -0700)
Catch return value from async index completions, resend operations
synchronously on a separate worker thread if needed.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h

index 55da65a1ace1281fa2dd5dca39b32df210067875..02d827154ada8586f18fc2ca536a7d3ad5f5a1e5 100644 (file)
@@ -2952,10 +2952,20 @@ class RGWRadosThread {
     Mutex lock;
     Cond cond;
 
+    /* Take `lock` and sleep on `cond` with no timeout. */
+    void wait() {
+      Mutex::Locker locker(lock);
+      cond.Wait(lock);
+    }
+
+    /* Take `lock` and sleep on `cond` for at most `wait_time`. */
+    void wait_interval(const utime_t& wait_time) {
+      Mutex::Locker locker(lock);
+      cond.WaitInterval(lock, wait_time);
+    }
+
   public:
     Worker(CephContext *_cct, RGWRadosThread *_p) : cct(_cct), processor(_p), lock("RGWRadosThread::Worker") {}
     void *entry() override;
-    void stop() {
+    void signal() {
       Mutex::Locker l(lock);
       cond.Signal();
     }
@@ -2987,6 +2997,12 @@ public:
 
   void start();
   void stop();
+
+  /* Forward a wake-up to the worker; a no-op when there is no worker. */
+  void signal() {
+    if (!worker) {
+      return;
+    }
+    worker->signal();
+  }
 };
 
 void RGWRadosThread::start()
@@ -3000,7 +3016,7 @@ void RGWRadosThread::stop()
   down_flag = true;
   stop_process();
   if (worker) {
-    worker->stop();
+    worker->signal();
     worker->join();
   }
   delete worker;
@@ -3037,13 +3053,9 @@ void *RGWRadosThread::Worker::entry() {
       utime_t wait_time = interval;
       wait_time -= end;
 
-      lock.Lock();
-      cond.WaitInterval(lock, wait_time);
-      lock.Unlock();
+      wait_interval(wait_time);
     } else {
-      lock.Lock();
-      cond.Wait(lock);
-      lock.Unlock();
+      wait();
     }
   } while (!processor->going_down());
 
@@ -3368,6 +3380,177 @@ int RGWRados::get_max_chunk_size(const string& placement_rule, const rgw_obj& ob
   return get_max_chunk_size(pool, max_chunk_size);
 }
 
+class RGWIndexCompletionManager;
+
+/*
+ * Everything needed to re-issue one bucket index "complete" operation.
+ *
+ * Instances are allocated and filled in by
+ * RGWIndexCompletionManager::create_completion() and handed to the aio
+ * callback as its opaque argument. Scalar members are value-initialized so a
+ * default-constructed instance never carries indeterminate values.
+ */
+struct complete_op_data {
+  RGWIndexCompletionManager *manager{nullptr}; /* used by the aio callback to dispatch */
+  rgw_obj obj;
+  RGWModifyOp op{};                            /* always overwritten by create_completion() */
+  string tag;
+  rgw_bucket_entry_ver ver;
+  cls_rgw_obj_key key;
+  rgw_bucket_dir_entry_meta dir_meta;
+  list<cls_rgw_obj_key> remove_objs;
+  bool log_op{false};
+  uint16_t bilog_op{0};
+};
+
+/*
+ * Background thread that replays bucket index completions queued through
+ * add_completion(). The queue is protected by completions_lock; process()
+ * drains it.
+ */
+class RGWIndexCompletionThread : public RGWRadosThread {
+  RGWRados *store;
+
+  /* NOTE(review): 0 presumably selects the untimed wait in Worker::entry() -- confirm */
+  uint64_t interval_msec() override {
+    return 0;
+  }
+
+  list<complete_op_data *> completions;
+
+  Mutex completions_lock;
+public:
+  RGWIndexCompletionThread(RGWRados *_store)
+    : RGWRadosThread(_store, "index-complete"),
+      store(_store),
+      completions_lock("RGWIndexCompletionThread::completions_lock") {}
+
+  int process() override;
+
+  /* Queue `completion` for the worker, then wake the worker up. */
+  void add_completion(complete_op_data *completion) {
+    {
+      Mutex::Locker guard(completions_lock);
+      completions.push_back(completion);
+    }
+
+    /* signalled after completions_lock has been released */
+    signal();
+  }
+};
+
+/*
+ * Drain the pending completion queue and re-issue each bucket index
+ * completion synchronously under guard_reshard().
+ *
+ * The queue is swapped out under completions_lock so that the (potentially
+ * slow) index operations run without holding it. Entries are always freed
+ * here; when the thread is going down the remaining entries are freed without
+ * being replayed. Per-entry failures are logged and otherwise ignored.
+ *
+ * Always returns 0.
+ */
+int RGWIndexCompletionThread::process()
+{
+  list<complete_op_data *> comps;
+
+  {
+    Mutex::Locker l(completions_lock);
+    completions.swap(comps);
+  }
+
+  for (auto c: comps) {
+    if (!going_down()) {
+      ldout(store->ctx(), 20) << __func__ << "(): handling completion for key=" << c->key << dendl;
+
+      RGWRados::BucketShard bs(store);
+
+      int r = bs.init(c->obj.bucket, c->obj);
+      if (r < 0) {
+        ldout(cct, 0) << "ERROR: " << __func__ << "(): failed to initialize BucketShard, obj=" << c->obj << " r=" << r << dendl;
+        /* not much to do */
+        delete c;
+        continue;
+      }
+
+      r = store->guard_reshard(&bs, c->obj, [&](RGWRados::BucketShard *bs) -> int {
+                               librados::ObjectWriteOperation o;
+                               cls_rgw_guard_bucket_resharding(o, -ERR_BUSY_RESHARDING);
+                               cls_rgw_bucket_complete_op(o, c->op, c->tag, c->ver, c->key, c->dir_meta, &c->remove_objs,
+                                                          c->log_op, c->bilog_op);
+
+                               return bs->index_ctx.operate(bs->bucket_obj, &o);
+                               });
+      /* only report a failure when there actually was one */
+      if (r < 0) {
+        ldout(cct, 0) << "ERROR: " << __func__ << "(): bucket index completion failed, obj=" << c->obj << " r=" << r << dendl;
+        /* ignoring error, can't do anything about it */
+      }
+    }
+    delete c;
+  }
+
+  return 0;
+}
+
+/*
+ * Tracks outstanding async bucket index completions and owns the
+ * RGWIndexCompletionThread that replays the ones rejected with
+ * -ERR_BUSY_RESHARDING.
+ */
+class RGWIndexCompletionManager {
+  RGWRados *store{nullptr};
+  Mutex lock{"RGWIndexCompletionManager::lock"};
+  set<complete_op_data *> completions; /* in-flight entries, guarded by lock */
+
+  RGWIndexCompletionThread *completion_thread{nullptr};
+
+
+public:
+  RGWIndexCompletionManager(RGWRados *_store) : store(_store) {}
+  ~RGWIndexCompletionManager() {
+    delete completion_thread;
+  }
+
+  void create_completion(const rgw_obj& obj,
+                         RGWModifyOp op, string& tag,
+                         rgw_bucket_entry_ver& ver,
+                         const cls_rgw_obj_key& key,
+                         rgw_bucket_dir_entry_meta& dir_meta,
+                         list<cls_rgw_obj_key>& remove_objs, bool log_op,
+                         uint16_t bilog_op,
+                         complete_op_data **result);
+  void handle_completion(completion_t cb, complete_op_data *arg);
+
+  /* Create and launch the completion thread; returns init()'s error, else 0. */
+  int start() {
+    completion_thread = new RGWIndexCompletionThread(store);
+    const int ret = completion_thread->init();
+    if (ret >= 0) {
+      completion_thread->start();
+      return 0;
+    }
+    return ret;
+  }
+  /* Stop the completion thread, if start() has created one. */
+  void stop() {
+    if (!completion_thread) {
+      return;
+    }
+    completion_thread->stop();
+  }
+};
+
+/*
+ * Allocate a complete_op_data holding copies of the given arguments, return
+ * it through *result and register it in the set of outstanding completions.
+ */
+void RGWIndexCompletionManager::create_completion(const rgw_obj& obj,
+                                                  RGWModifyOp op, string& tag,
+                                                  rgw_bucket_entry_ver& ver,
+                                                  const cls_rgw_obj_key& key,
+                                                  rgw_bucket_dir_entry_meta& dir_meta,
+                                                  list<cls_rgw_obj_key>& remove_objs, bool log_op,
+                                                  uint16_t bilog_op,
+                                                  complete_op_data **result)
+{
+  complete_op_data *data = new complete_op_data;
+
+  /* what to do, and on which object */
+  data->manager = this;
+  data->obj = obj;
+  data->op = op;
+  data->tag = tag;
+
+  /* index entry payload */
+  data->ver = ver;
+  data->key = key;
+  data->dir_meta = dir_meta;
+
+  /* logging flags and objects to remove */
+  data->log_op = log_op;
+  data->bilog_op = bilog_op;
+  data->remove_objs = remove_objs;
+
+  *result = data;
+
+  Mutex::Locker guard(lock);
+  completions.insert(data);
+}
+
+/*
+ * aio completion handler for an entry made by create_completion().
+ *
+ * Entries that are not (or no longer) registered are left untouched.
+ * Otherwise the entry is unregistered; it is handed to the completion thread
+ * when the operation returned -ERR_BUSY_RESHARDING and freed for any other
+ * return value.
+ */
+void RGWIndexCompletionManager::handle_completion(completion_t cb, complete_op_data *arg)
+{
+  {
+    Mutex::Locker guard(lock);
+
+    /* erase-by-key reports how many entries were removed */
+    if (completions.erase(arg) == 0) {
+      return;
+    }
+  }
+
+  const int r = rados_aio_get_return_value(cb);
+  if (r == -ERR_BUSY_RESHARDING) {
+    completion_thread->add_completion(arg);
+  } else {
+    delete arg;
+  }
+}
+
 void RGWRados::finalize()
 {
   if (run_sync_thread) {
@@ -3459,6 +3642,9 @@ void RGWRados::finalize()
   delete binfo_cache;
   delete obj_tombstone_cache;
   delete sync_modules_manager;
+
+  delete reshard;
+  delete index_completion_manager;
 }
 
 /** 
@@ -4271,6 +4457,8 @@ int RGWRados::init_complete()
   }
 
   reshard = new RGWReshard(this);
+  index_completion_manager = new RGWIndexCompletionManager(this);
+  ret = index_completion_manager->start();
 
   return ret;
 }
@@ -9646,7 +9834,7 @@ int RGWRados::Bucket::UpdateIndex::complete(int64_t poolid, uint64_t epoch,
   ent.meta.owner_display_name = owner.get_display_name();
   ent.meta.content_type = content_type;
 
-  ret = store->cls_obj_complete_add(*bs, optag, poolid, epoch, ent, category, remove_objs, bilog_flags, zones_trace);
+  ret = store->cls_obj_complete_add(*bs, obj, optag, poolid, epoch, ent, category, remove_objs, bilog_flags, zones_trace);
 
   int r = store->data_log->add_entry(bs->bucket, bs->shard_id);
   if (r < 0) {
@@ -12213,7 +12401,12 @@ int RGWRados::cls_obj_prepare_op(BucketShard& bs, RGWModifyOp op, string& tag,
   return bs.index_ctx.operate(bs.bucket_obj, &o);
 }
 
-int RGWRados::cls_obj_complete_op(BucketShard& bs, RGWModifyOp op, string& tag,
+/* aio callback trampoline: `arg` is the complete_op_data registered for this op. */
+static void obj_complete_cb(completion_t cb, void *arg)
+{
+  auto *data = static_cast<complete_op_data *>(arg);
+  data->manager->handle_completion(cb, data);
+}
+
+int RGWRados::cls_obj_complete_op(BucketShard& bs, const rgw_obj& obj, RGWModifyOp op, string& tag,
                                   int64_t pool, uint64_t epoch,
                                   rgw_bucket_dir_entry& ent, RGWObjCategory category,
                                  list<rgw_obj_index_key> *remove_objs, uint16_t bilog_flags, rgw_zone_set *_zones_trace)
@@ -12249,18 +12442,21 @@ int RGWRados::cls_obj_complete_op(BucketShard& bs, RGWModifyOp op, string& tag,
   cls_rgw_bucket_complete_op(o, op, tag, ver, key, dir_meta, pro,
                              get_zone().log_data, bilog_flags, zones_trace);
 
-  AioCompletion *c = librados::Rados::aio_create_completion(NULL, NULL, NULL);
+  complete_op_data *arg;
+  index_completion_manager->create_completion(obj, op, tag, ver, key, dir_meta, ro,
+                                              get_zone().log_data, bilog_flags, &arg);
+  AioCompletion *c = librados::Rados::aio_create_completion(arg, NULL, obj_complete_cb);
   int ret = bs.index_ctx.aio_operate(bs.bucket_obj, c, &o);
   c->release();
   return ret;
 }
 
-int RGWRados::cls_obj_complete_add(BucketShard& bs, string& tag,
+int RGWRados::cls_obj_complete_add(BucketShard& bs, const rgw_obj& obj, string& tag,
                                    int64_t pool, uint64_t epoch,
                                    rgw_bucket_dir_entry& ent, RGWObjCategory category,
                                    list<rgw_obj_index_key> *remove_objs, uint16_t bilog_flags, rgw_zone_set *zones_trace)
 {
-  return cls_obj_complete_op(bs, CLS_RGW_OP_ADD, tag, pool, epoch, ent, category, remove_objs, bilog_flags, zones_trace);
+  return cls_obj_complete_op(bs, obj, CLS_RGW_OP_ADD, tag, pool, epoch, ent, category, remove_objs, bilog_flags, zones_trace);
 }
 
 int RGWRados::cls_obj_complete_del(BucketShard& bs, string& tag,
@@ -12274,14 +12470,14 @@ int RGWRados::cls_obj_complete_del(BucketShard& bs, string& tag,
   rgw_bucket_dir_entry ent;
   ent.meta.mtime = removed_mtime;
   obj.key.get_index_key(&ent.key);
-  return cls_obj_complete_op(bs, CLS_RGW_OP_DEL, tag, pool, epoch, ent, RGW_OBJ_CATEGORY_NONE, remove_objs, bilog_flags, zones_trace);
+  return cls_obj_complete_op(bs, obj, CLS_RGW_OP_DEL, tag, pool, epoch, ent, RGW_OBJ_CATEGORY_NONE, remove_objs, bilog_flags, zones_trace);
 }
 
 int RGWRados::cls_obj_complete_cancel(BucketShard& bs, string& tag, rgw_obj& obj, uint16_t bilog_flags, rgw_zone_set *zones_trace)
 {
   rgw_bucket_dir_entry ent;
   obj.key.get_index_key(&ent.key);
-  return cls_obj_complete_op(bs, CLS_RGW_OP_CANCEL, tag, -1 /* pool id */, 0, ent, RGW_OBJ_CATEGORY_NONE, NULL, bilog_flags, zones_trace);
+  return cls_obj_complete_op(bs, obj, CLS_RGW_OP_CANCEL, tag, -1 /* pool id */, 0, ent, RGW_OBJ_CATEGORY_NONE, NULL, bilog_flags, zones_trace);
 }
 
 int RGWRados::cls_obj_set_bucket_tag_timeout(RGWBucketInfo& bucket_info, uint64_t timeout)
index 28fd2dd97937fa2ea5539052a0b4147718f8de35..3b7f5a362e84c0638c12c37f9afecde960a0f27f 100644 (file)
@@ -2169,6 +2169,8 @@ struct tombstone_entry {
       pg_ver(state.pg_ver) {}
 };
 
+class RGWIndexCompletionManager;
+
 class RGWRados
 {
   friend class RGWGC;
@@ -2310,6 +2312,8 @@ protected:
   uint32_t zone_short_id;
 
   RGWPeriod current_period;
+
+  RGWIndexCompletionManager *index_completion_manager{nullptr};
 public:
   RGWRados() : lock("rados_timer_lock"), watchers_lock("watchers_lock"), timer(NULL),
                gc(NULL), lc(NULL), obj_expirer(NULL), use_gc_thread(false), use_lc_thread(false), quota_threads(false),
@@ -3347,9 +3351,9 @@ public:
 
   int cls_rgw_init_index(librados::IoCtx& io_ctx, librados::ObjectWriteOperation& op, string& oid);
   int cls_obj_prepare_op(BucketShard& bs, RGWModifyOp op, string& tag, rgw_obj& obj, uint16_t bilog_flags, rgw_zone_set *zones_trace = nullptr);
-  int cls_obj_complete_op(BucketShard& bs, RGWModifyOp op, string& tag, int64_t pool, uint64_t epoch,
+  int cls_obj_complete_op(BucketShard& bs, const rgw_obj& obj, RGWModifyOp op, string& tag, int64_t pool, uint64_t epoch,
                           rgw_bucket_dir_entry& ent, RGWObjCategory category, list<rgw_obj_index_key> *remove_objs, uint16_t bilog_flags, rgw_zone_set *zones_trace = nullptr);
-  int cls_obj_complete_add(BucketShard& bs, string& tag, int64_t pool, uint64_t epoch, rgw_bucket_dir_entry& ent,
+  int cls_obj_complete_add(BucketShard& bs, const rgw_obj& obj, string& tag, int64_t pool, uint64_t epoch, rgw_bucket_dir_entry& ent,
                            RGWObjCategory category, list<rgw_obj_index_key> *remove_objs, uint16_t bilog_flags, rgw_zone_set *zones_trace = nullptr);
   int cls_obj_complete_del(BucketShard& bs, string& tag, int64_t pool, uint64_t epoch, rgw_obj& obj,
                            ceph::real_time& removed_mtime, list<rgw_obj_index_key> *remove_objs, uint16_t bilog_flags, rgw_zone_set *zones_trace = nullptr);