rgw: add resharding thread
author Orit Wasserman <owasserm@redhat.com>
Sun, 14 May 2017 05:34:04 +0000 (08:34 +0300)
committer Yehuda Sadeh <yehuda@redhat.com>
Mon, 5 Jun 2017 20:17:49 +0000 (13:17 -0700)
Signed-off-by: Orit Wasserman <owasserm@redhat.com>
src/rgw/rgw_reshard.cc
src/rgw/rgw_reshard.h

index 8cd657d380db3106f369bb07f2038ae2a57be2a5..67ffe30f311fed78246b2306e4c8050b972f7cb3 100644 (file)
@@ -630,8 +630,134 @@ int RGWReshard::block_while_resharding(RGWRados::BucketShard *bs, string *new_bu
   return -ERR_BUSY_RESHARDING;
 }
 
-BucketIndexLockGuard::BucketIndexLockGuard(CephContext *_cct, RGWRados* _store, const string& bucket_instance_id, const string& _oid, const librados::IoCtx& _io_ctx) :
-  cct(_cct), store(_store),
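+/* process a single shard of the reshard log: lock the shard object and list
+ * the reshard entries queued in it */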
+int RGWReshard::process_single_shard(const string& shard)
+{
+  string marker;
+  bool truncated = false;
+
+  CephContext *cct = store->ctx();
+  int max_entries = 1000;
+  int max_secs = 10;
+  
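+  /* take a time-limited exclusive lock on the shard object so that only one
+   * radosgw instance processes it at a time */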
+  rados::cls::lock::Lock l(reshard_lock_name);
+
+  utime_t time(max_secs, 0);
+  l.set_duration(time);
+
+  int ret = l.lock_exclusive(&store->reshard_pool_ctx, shard);
+  if (ret == -EBUSY) { /* already locked by another processor */
+    dout(5) << __func__ << "(): failed to acquire lock on " << shard << dendl;
+    return ret;
+  }
+
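+  /* page through the reshard log entries recorded in this shard */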
+  do {
+    std::list<cls_rgw_reshard_entry> entries;
+    ret = list(marker, max_entries, entries, truncated);
+    if (ret < 0) {
+      ldout(cct, 10) << "failed to list reshard log entries in shard "
+                     << shard << dendl;
+      l.unlock(&store->reshard_pool_ctx, shard);
+      return ret;
+    }
+  } while (truncated);
+
+  l.unlock(&store->reshard_pool_ctx, shard);
+  return 0;
+}
+
+
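+/* build the rados object name of a reshard log shard,
+ * e.g. "bucket_reshard.0000000003" */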
+void RGWReshard::get_shard(int shard_num, string& shard)
+{
+  char buf[32];
+  snprintf(buf, sizeof(buf), "%010u", (unsigned)shard_num);
+
+  string objname("bucket_reshard.");
+  shard = objname + buf;
+}
+
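+/* iterate over all reshard log shards and process each one in turn */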
+int RGWReshard::inspect_all_shards()
+{
+  CephContext * const cct = store->ctx();
+  int ret = 0;
+
+  for (int i = 0; i < num_shards; i++) {
+    string shard;
+    get_shard(i, shard);
+
+    ldout(cct, 20) << "processing shard = " << shard << dendl;
+
+    ret = process_single_shard(shard);
+    if (ret < 0) {
+      return ret;
+    }
+  }
+
+  return 0;
+}
+
+bool RGWReshard::going_down()
+{
+  return down_flag;
+}
+
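+/* start the background resharding thread; stop_processor() raises the down
+ * flag, wakes the worker and joins it before deleting it */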
+void RGWReshard::start_processor()
+{
+  worker = new ReshardWorker(store->ctx(), this);
+  worker->create("rgw_reshard");
+}
+
+void RGWReshard::stop_processor()
+{
+  down_flag = true;
+  if (worker) {
+    worker->stop();
+    worker->join();
+  }
+  delete worker;
+  worker = NULL;
+}
+
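+/* worker thread entry point: scan all reshard log shards, then sleep for the
+ * configured interval until the gateway is going down */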
+void *RGWReshard::ReshardWorker::entry() {
+  utime_t last_run;
+  do {
+    utime_t start = ceph_clock_now();
+    ldout(cct, 2) << "resharding: start" << dendl;
+    if (reshard->inspect_all_shards() == 0) {
+      /* All shards have been processed properly. Next time we can start
+       * from this moment. */
+      last_run = start;
+    }
+    ldout(cct, 2) << "resharding: stop" << dendl;
+
+    if (reshard->going_down())
+      break;
+
+    utime_t end = ceph_clock_now();
+    end -= start;
+    int secs = cct->_conf->rgw_objexp_gc_interval;
+
+    if (secs <= end.sec())
+      continue; // next round
+
+    secs -= end.sec();
+
+    lock.Lock();
+    cond.WaitInterval(lock, utime_t(secs, 0));
+    lock.Unlock();
+  } while (!reshard->going_down());
+
+  return NULL;
+}
+
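+/* wake the worker thread so it can notice the down flag and exit its loop */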
+void RGWReshard::ReshardWorker::stop()
+{
+  Mutex::Locker l(lock);
+  cond.Signal();
+}
+
+BucketIndexLockGuard::BucketIndexLockGuard(CephContext* _cct, RGWRados* _store,
+                                          const string& bucket_instance_id, const string& _oid, const librados::IoCtx& _io_ctx) :
+  cct(_cct), store(_store),
   l(create_bucket_index_lock_name(bucket_instance_id)),
   oid(_oid), io_ctx(_io_ctx),locked(false)
 {
index 7b21541bd3f9e7d95120b8e31ffbd069f656f353..50c9a784592ff551a779481bb21fc7db243a7aa3 100644 (file)
@@ -22,6 +22,7 @@ class BucketIndexLockGuard
    string oid;
    librados::IoCtx io_ctx;
    bool locked;
+
 public:
   BucketIndexLockGuard(CephContext* cct, RGWRados* store, const string& bucket_instance_id,
                                         const string& oid, const librados::IoCtx& io_ctx);
@@ -71,9 +72,32 @@ class RGWReshard {
     string lock_name;
     int max_jobs;
     rados::cls::lock::Lock instance_lock;
+    int num_shards;
 
     int lock_bucket_index_shared(const string& oid);
     int unlock_bucket_index(const string& oid);
+    void get_shard(int shard_num, string& shard);
+protected:
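+  /* background thread that periodically walks the reshard log shards */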
+  class ReshardWorker : public Thread {
+    CephContext *cct;
+    RGWReshard *reshard;
+    Mutex lock;
+    Cond cond;
+
+  public:
+    ReshardWorker(CephContext * const _cct,
+                  RGWReshard * const _reshard)
+      : cct(_cct),
+        reshard(_reshard),
+        lock("ReshardWorker") {
+    }
+
+    void *entry() override;
+    void stop();
+  };
+
+  ReshardWorker *worker;
+  std::atomic<bool> down_flag = { false };
 
   public:
     RGWReshard(RGWRados* _store);
@@ -83,6 +107,12 @@ class RGWReshard {
     int list(string& marker, uint32_t max, list<cls_rgw_reshard_entry>& entries, bool& is_truncated);
     int clear_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry);
     int block_while_resharding(RGWRados::BucketShard *bs, string *new_bucket_id);
+    /* reshard thread */
+    int process_single_shard(const std::string& shard);
+    int inspect_all_shards();
+    bool going_down();
+    void start_processor();
+    void stop_processor();
 };
 
 #endif