return -ERR_BUSY_RESHARDING;
}
-BucketIndexLockGuard::BucketIndexLockGuard(CephContext *_cct, RGWRados* _store, const string& bucket_instance_id, const string& _oid, const librados::IoCtx& _io_ctx) :
- cct(_cct), store(_store),
+int RGWReshard::process_single_shard(const string& shard)
+{
+ string marker;
+ bool truncated = false;
+
+ CephContext *cct = store->ctx();
+ int max_entries = 1000;
+ int max_secs = 10;
+
+ rados::cls::lock::Lock l(reshard_lock_name);
+
+ utime_t time(max_secs, 0);
+ l.set_duration(time);
+
+ int ret = l.lock_exclusive(&store->reshard_pool_ctx, shard);
+ if (ret == -EBUSY) { /* already locked by another processor */
+ dout(5) << __func__ << "(): failed to acquire lock on " << shard << dendl;
+ return ret;
+ }
+
+ do {
+ std::list<cls_rgw_reshard_entry> entries;
+ ret = list(marker, max_entries, entries, truncated);
+ if (ret < 0) {
+ ldout(cct, 10) << "cannot list all reshards: " << shard
+ << dendl;
+ continue;
+ }
+ } while (truncated);
+
+ l.unlock(&store->reshard_pool_ctx, shard);
+ return 0;
+}
+
+
+void RGWReshard::get_shard(int shard_num, string& shard)
+{
+ char buf[32];
+ snprintf(buf, sizeof(buf), "%010u", (unsigned)shard_num);
+
+ string objname("bucket_reshard.");
+ shard = objname + buf;
+}
+
+int RGWReshard::inspect_all_shards()
+{
+ CephContext * const cct = store->ctx();
+ int ret = 0;
+
+ for (int i = 0; i < num_shards; i++) {
+ string shard;
+ store->objexp_get_shard(i, shard);
+
+ ldout(store->ctx(), 20) << "proceeding shard = " << shard << dendl;
+
+ ret = process_single_shard(shard);
+ if (ret <0) {
+ return ret;
+ }
+ }
+
+ return 0;
+}
+
+bool RGWReshard::going_down()
+{
+ return down_flag;
+}
+
+void RGWReshard::start_processor()
+{
+ worker = new ReshardWorker(store->ctx(), this);
+ worker->create("rgw_reshard");
+}
+
+void RGWReshard::stop_processor()
+{
+ down_flag = true;
+ if (worker) {
+ worker->stop();
+ worker->join();
+ }
+ delete worker;
+ worker = NULL;
+}
+
+void *RGWReshard::ReshardWorker::entry() {
+ utime_t last_run;
+ do {
+ utime_t start = ceph_clock_now();
+ ldout(cct, 2) << "object expiration: start" << dendl;
+ if (reshard->inspect_all_shards()) {
+ /* All shards have been processed properly. Next time we can start
+ * from this moment. */
+ last_run = start;
+ }
+ ldout(cct, 2) << "object expiration: stop" << dendl;
+
+
+ if (reshard->going_down())
+ break;
+
+ utime_t end = ceph_clock_now();
+ end -= start;
+ int secs = cct->_conf->rgw_objexp_gc_interval;
+
+ if (secs <= end.sec())
+ continue; // next round
+
+ secs -= end.sec();
+
+ lock.Lock();
+ cond.WaitInterval(lock, utime_t(secs, 0));
+ lock.Unlock();
+ } while (!reshard->going_down());
+
+ return NULL;
+}
+
+void RGWReshard::ReshardWorker::stop()
+{
+ Mutex::Locker l(lock);
+ cond.Signal();
+}
+
+BucketIndexLockGuard::BucketIndexLockGuard(CephContext* _cct, RGWRados* _store,
+ const string& bucket_instance_id, const string& _oid, const librados::IoCtx& _io_ctx) :
+ cct(_cct),store(_store),
l(create_bucket_index_lock_name(bucket_instance_id)),
oid(_oid), io_ctx(_io_ctx),locked(false)
{
string oid;
librados::IoCtx io_ctx;
bool locked;
+
public:
BucketIndexLockGuard(CephContext* cct, RGWRados* store, const string& bucket_instance_id,
const string& oid, const librados::IoCtx& io_ctx);
string lock_name;
int max_jobs;
rados::cls::lock::Lock instance_lock;
+ int num_shards;
int lock_bucket_index_shared(const string& oid);
int unlock_bucket_index(const string& oid);
+ void get_shard(int shard_num, string& shard);
+protected:
+ class ReshardWorker : public Thread {
+ CephContext *cct;
+ RGWReshard *reshard;
+ Mutex lock;
+ Cond cond;
+
+ public:
+ ReshardWorker(CephContext * const _cct,
+ RGWReshard * const _reshard)
+ : cct(_cct),
+ reshard(_reshard),
+ lock("ReshardWorker") {
+ }
+
+ void *entry() override;
+ void stop();
+ };
+
+ ReshardWorker *worker;
+ std::atomic<bool> down_flag = { false };
public:
RGWReshard(RGWRados* _store);
int list(string& marker, uint32_t max, list<cls_rgw_reshard_entry>& entries, bool& is_truncated);
int clear_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry);
int block_while_resharding(RGWRados::BucketShard *bs, string *new_bucket_id);
+ /* reshard thread */
+ int process_single_shard(const std::string& shard);
+ int inspect_all_shards();
+ bool going_down();
+ void start_processor();
+ void stop_processor();
};
#endif