From: Orit Wasserman Date: Sun, 14 May 2017 05:34:04 +0000 (+0300) Subject: rgw: add resharding thread X-Git-Tag: v12.1.0~276^2~42 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=ad138a4d7272598831c666c4d06abc2853a2e150;p=ceph-ci.git rgw: add resharding thread Signed-off-by: Orit Wasserman --- diff --git a/src/rgw/rgw_reshard.cc b/src/rgw/rgw_reshard.cc index 8cd657d380d..67ffe30f311 100644 --- a/src/rgw/rgw_reshard.cc +++ b/src/rgw/rgw_reshard.cc @@ -630,8 +630,134 @@ int RGWReshard::block_while_resharding(RGWRados::BucketShard *bs, string *new_bu return -ERR_BUSY_RESHARDING; } -BucketIndexLockGuard::BucketIndexLockGuard(CephContext *_cct, RGWRados* _store, const string& bucket_instance_id, const string& _oid, const librados::IoCtx& _io_ctx) : - cct(_cct), store(_store), +int RGWReshard::process_single_shard(const string& shard) +{ + string marker; + bool truncated = false; + + CephContext *cct = store->ctx(); + int max_entries = 1000; + int max_secs = 10; + + rados::cls::lock::Lock l(reshard_lock_name); + + utime_t time(max_secs, 0); + l.set_duration(time); + + int ret = l.lock_exclusive(&store->reshard_pool_ctx, shard); + if (ret == -EBUSY) { /* already locked by another processor */ + dout(5) << __func__ << "(): failed to acquire lock on " << shard << dendl; + return ret; + } + + do { + std::list<cls_rgw_reshard_entry> entries; + ret = list(marker, max_entries, entries, truncated); + if (ret < 0) { + ldout(cct, 10) << "cannot list all reshards: " << shard + << dendl; + continue; + } + } while (truncated); + + l.unlock(&store->reshard_pool_ctx, shard); + return 0; +} + + +void RGWReshard::get_shard(int shard_num, string& shard) +{ + char buf[32]; + snprintf(buf, sizeof(buf), "%010u", (unsigned)shard_num); + + string objname("bucket_reshard."); + shard = objname + buf; +} + +int RGWReshard::inspect_all_shards() +{ + CephContext * const cct = store->ctx(); + int ret = 0; + + for (int i = 0; i < num_shards; i++) { + string shard; + 
store->objexp_get_shard(i, shard); + + ldout(store->ctx(), 20) << "proceeding shard = " << shard << dendl; + + ret = process_single_shard(shard); + if (ret <0) { + return ret; + } + } + + return 0; +} + +bool RGWReshard::going_down() +{ + return down_flag; +} + +void RGWReshard::start_processor() +{ + worker = new ReshardWorker(store->ctx(), this); + worker->create("rgw_reshard"); +} + +void RGWReshard::stop_processor() +{ + down_flag = true; + if (worker) { + worker->stop(); + worker->join(); + } + delete worker; + worker = NULL; +} + +void *RGWReshard::ReshardWorker::entry() { + utime_t last_run; + do { + utime_t start = ceph_clock_now(); + ldout(cct, 2) << "object expiration: start" << dendl; + if (reshard->inspect_all_shards()) { + /* All shards have been processed properly. Next time we can start + * from this moment. */ + last_run = start; + } + ldout(cct, 2) << "object expiration: stop" << dendl; + + + if (reshard->going_down()) + break; + + utime_t end = ceph_clock_now(); + end -= start; + int secs = cct->_conf->rgw_objexp_gc_interval; + + if (secs <= end.sec()) + continue; // next round + + secs -= end.sec(); + + lock.Lock(); + cond.WaitInterval(lock, utime_t(secs, 0)); + lock.Unlock(); + } while (!reshard->going_down()); + + return NULL; +} + +void RGWReshard::ReshardWorker::stop() +{ + Mutex::Locker l(lock); + cond.Signal(); +} + +BucketIndexLockGuard::BucketIndexLockGuard(CephContext* _cct, RGWRados* _store, + const string& bucket_instance_id, const string& _oid, const librados::IoCtx& _io_ctx) : + cct(_cct),store(_store), l(create_bucket_index_lock_name(bucket_instance_id)), oid(_oid), io_ctx(_io_ctx),locked(false) { diff --git a/src/rgw/rgw_reshard.h b/src/rgw/rgw_reshard.h index 7b21541bd3f..50c9a784592 100644 --- a/src/rgw/rgw_reshard.h +++ b/src/rgw/rgw_reshard.h @@ -22,6 +22,7 @@ class BucketIndexLockGuard string oid; librados::IoCtx io_ctx; bool locked; + public: BucketIndexLockGuard(CephContext* cct, RGWRados* store, const string& 
bucket_instance_id, const string& oid, const librados::IoCtx& io_ctx); @@ -71,9 +72,32 @@ class RGWReshard { string lock_name; int max_jobs; rados::cls::lock::Lock instance_lock; + int num_shards; int lock_bucket_index_shared(const string& oid); int unlock_bucket_index(const string& oid); + void get_shard(int shard_num, string& shard); +protected: + class ReshardWorker : public Thread { + CephContext *cct; + RGWReshard *reshard; + Mutex lock; + Cond cond; + + public: + ReshardWorker(CephContext * const _cct, + RGWReshard * const _reshard) + : cct(_cct), + reshard(_reshard), + lock("ReshardWorker") { + } + + void *entry() override; + void stop(); + }; + + ReshardWorker *worker; + std::atomic<bool> down_flag = { false }; public: RGWReshard(RGWRados* _store); @@ -83,6 +107,12 @@ class RGWReshard { int list(string& marker, uint32_t max, list<cls_rgw_reshard_entry>& entries, bool& is_truncated); int clear_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry); int block_while_resharding(RGWRados::BucketShard *bs, string *new_bucket_id); + /* reshard thread */ + int process_single_shard(const std::string& shard); + int inspect_all_shards(); + bool going_down(); + void start_processor(); + void stop_processor(); }; #endif