#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rgw
-const string reshard_oid = "reshard";
+// Prefix for the per-logshard reshard queue objects; a zero-padded shard
+// number is appended by get_logshard_oid().
+const string reshard_oid_prefix = "reshard";
const string reshard_lock_name = "reshard_process";
const string bucket_instance_lock_name = "bucket_instance_lock";
RGWReshard::RGWReshard(RGWRados* _store): store(_store), instance_lock(bucket_instance_lock_name)
{
- max_jobs = store->ctx()->_conf->rgw_reshard_max_jobs;
+ // Single global queue (capped by rgw_reshard_max_jobs) is replaced by a
+ // fixed number of log shards taken from rgw_reshard_num_logs.
+ num_logshards = store->ctx()->_conf->rgw_reshard_num_logs;
+}
+
+// Build the hash key used to map a bucket to a logshard; format is
+// "<bucket>:<tenant>" (order deliberately swapped, per the inline note).
+string RGWReshard::get_logshard_key(const string& tenant, const string& bucket_name)
+{
+ return bucket_name + ":" + tenant; /* transposed on purpose */
+}
+
+#define MAX_RESHARD_LOGSHARDS_PRIME 7877
+
+// Map (tenant, bucket) deterministically onto one of num_logshards log
+// objects and return that object's oid.
+void RGWReshard::get_bucket_logshard_oid(const string& tenant, const string& bucket_name, string *oid)
+{
+ string key = get_logshard_key(tenant, bucket_name);
+
+ uint32_t sid = ceph_str_hash_linux(key.c_str(), key.size());
+ uint32_t sid2 = sid ^ ((sid & 0xFF) << 24);
+ sid = sid2 % MAX_RESHARD_LOGSHARDS_PRIME % num_logshards;
+ // NOTE(review): the previous line already reduces sid mod num_logshards, so
+ // this second modulo is a no-op — confirm whether only the PRIME reduction
+ // was intended above.
+ int logshard = sid % num_logshards;
+
+ get_logshard_oid(logshard, oid);
}
int RGWReshard::add(cls_rgw_reshard_entry& entry)
{
- rados::cls::lock::Lock l(reshard_lock_name);
+ string logshard_oid;
- int ret = l.lock_exclusive(&store->reshard_pool_ctx, reshard_oid);
- if (ret == -EBUSY) {
- ldout(store->ctx(), 0) << "RGWReshard::add failed to acquire lock on " << reshard_oid << dendl;
- return 0;
- }
- if (ret < 0)
- return ret;
+ // NOTE(review): the exclusive lock on the single global "reshard" object is
+ // dropped; the entry now goes to the bucket's deterministic logshard,
+ // presumably relying on per-object cls op atomicity — confirm.
+ get_bucket_logshard_oid(entry.tenant, entry.bucket_name, &logshard_oid);
librados::ObjectWriteOperation op;
cls_rgw_reshard_add(op, entry);
- ret = store->reshard_pool_ctx.operate(reshard_oid, &op);
-
- l.unlock(&store->reshard_pool_ctx, reshard_oid);
- return ret;
+ int ret = store->reshard_pool_ctx.operate(logshard_oid, &op);
+ if (ret < 0) {
+ lderr(store->ctx()) << "ERROR: failed to add entry to reshard log, oid=" << logshard_oid << " tenant=" << entry.tenant << " bucket=" << entry.bucket_name << dendl;
+ return ret;
+ }
+ return 0;
}
-int RGWReshard::list(string& marker, uint32_t max, std::list<cls_rgw_reshard_entry>& entries, bool& is_truncated)
+// List entries from one reshard logshard. Interface change: the caller now
+// selects the logshard number, and is_truncated becomes an out pointer.
+int RGWReshard::list(int logshard_num, string& marker, uint32_t max, std::list<cls_rgw_reshard_entry>& entries, bool *is_truncated)
{
- rados::cls::lock::Lock l(reshard_lock_name);
-
- int ret = l.lock_shared(&store->reshard_pool_ctx, reshard_oid);
- if (ret == -EBUSY) {
- ldout(store->ctx(), 0) << "RGWReshard::list failed to acquire lock on " << reshard_oid << dendl;
- return 0;
- }
- if (ret < 0)
- return ret;
+ string logshard_oid;
- ret = cls_rgw_reshard_list(store->reshard_pool_ctx, reshard_oid, marker, max, entries, &is_truncated);
+ get_logshard_oid(logshard_num, &logshard_oid);
- l.unlock(&store->reshard_pool_ctx, reshard_oid);
- return ret;
+ int ret = cls_rgw_reshard_list(store->reshard_pool_ctx, logshard_oid, marker, max, entries, is_truncated);
+ if (ret < 0) {
+ lderr(store->ctx()) << "ERROR: failed to list reshard log entries, oid=" << logshard_oid << dendl;
+ return ret;
+ }
+ return 0;
}
int RGWReshard::get(cls_rgw_reshard_entry& entry)
{
- rados::cls::lock::Lock l(reshard_lock_name);
+ string logshard_oid;
- int ret = l.lock_shared(&store->reshard_pool_ctx, reshard_oid);
- if (ret == -EBUSY) {
- ldout(store->ctx(), 0) << "RGWReshardLog::get failed to acquire lock on " << reshard_oid << dendl;
- return 0;
- }
- if (ret < 0)
- return ret;
+ // Shared-lock dance on the global object is gone; look the entry up on the
+ // bucket's own logshard object instead.
+ get_bucket_logshard_oid(entry.tenant, entry.bucket_name, &logshard_oid);
- ret = cls_rgw_reshard_get(store->reshard_pool_ctx, reshard_oid, entry);
+ int ret = cls_rgw_reshard_get(store->reshard_pool_ctx, logshard_oid, entry);
+ if (ret < 0) {
+ lderr(store->ctx()) << "ERROR: failed to get entry from reshard log, oid=" << logshard_oid << " tenant=" << entry.tenant << " bucket=" << entry.bucket_name << dendl;
+ return ret;
+ }
- l.unlock(&store->reshard_pool_ctx, reshard_oid);
- return ret;
+ return 0;
}
int RGWReshard::remove(cls_rgw_reshard_entry& entry)
{
- rados::cls::lock::Lock l(reshard_lock_name);
+ string logshard_oid;
- int ret = l.lock_exclusive(&store->reshard_pool_ctx, reshard_oid);
- if (ret == -EBUSY) {
- ldout(store->ctx(), 0) << "RGWReshardLog::remove failed to acquire lock on " << reshard_oid << dendl;
- return 0;
- }
- if (ret < 0)
- return ret;
+ // Remove the entry from the bucket's logshard object (global lock dropped,
+ // same pattern as add()).
+ get_bucket_logshard_oid(entry.tenant, entry.bucket_name, &logshard_oid);
librados::ObjectWriteOperation op;
cls_rgw_reshard_remove(op, entry);
- ret = store->reshard_pool_ctx.operate(reshard_oid, &op);
+ int ret = store->reshard_pool_ctx.operate(logshard_oid, &op);
+ if (ret < 0) {
+ lderr(store->ctx()) << "ERROR: failed to remove entry from reshard log, oid=" << logshard_oid << " tenant=" << entry.tenant << " bucket=" << entry.bucket_name << dendl;
+ return ret;
+ }
- l.unlock(&store->reshard_pool_ctx, reshard_oid);
+ // NOTE(review): ret is guaranteed 0 here; sibling methods return the
+ // literal 0 — consider aligning for consistency.
return ret;
}
-std::string create_bucket_index_lock_name(const string& bucket_instance_id) {
- return bucket_instance_lock_name + "." + bucket_instance_id;
-}
-
int RGWReshard::clear_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry)
{
- rados::cls::lock::Lock l(create_bucket_index_lock_name(entry.old_instance_id));
-
- int ret = l.lock_exclusive(&store->reshard_pool_ctx, bucket_instance_oid);
- if (ret == -EBUSY) {
- ldout(store->ctx(), 0) << "RGWReshardLog::add failed to acquire lock on " << reshard_oid << dendl;
- return 0;
- }
- if (ret < 0)
+ // NOTE(review): the bucket-instance exclusive lock and the
+ // entry.new_instance_id.clear() call are both dropped here, and 'entry' is
+ // now unused by this function — confirm callers don't rely on either.
+ int ret = cls_rgw_clear_bucket_resharding(store->reshard_pool_ctx, bucket_instance_oid);
+ if (ret < 0) {
+ lderr(store->ctx()) << "ERROR: failed to clear bucket resharding, bucket_instance_oid=" << bucket_instance_oid << dendl;
return ret;
-
- entry.new_instance_id.clear();
-
- ret = cls_rgw_clear_bucket_resharding(store->reshard_pool_ctx, bucket_instance_oid);
-
- l.unlock(&store->reshard_pool_ctx, bucket_instance_oid);
- return ret;
-}
-
-int RGWReshard::lock_bucket_index_shared(const string& oid)
-{
- int ret = instance_lock.lock_shared(&store->reshard_pool_ctx, oid);
- if (ret == -EBUSY) {
- ldout(store->ctx(), 0) << "RGWReshardLog::add failed to acquire lock on " << reshard_oid << dendl;
- return 0;
  }
- return ret;
-}
-
-int RGWReshard::unlock_bucket_index(const string& oid)
-{
- instance_lock.unlock(&store->reshard_pool_ctx, oid);
return 0;
}
return 0;
}
-int RGWReshard::process_single_shard(const string& shard)
+// Process every queued reshard entry on one logshard, holding an exclusive
+// cls lock on the logshard object while doing so.
+int RGWReshard::process_single_logshard(int logshard_num)
{
string marker;
bool truncated = false;
CephContext *cct = store->ctx();
int max_entries = 1000;
- int max_secs = 10;
-
+ // NOTE(review): lock TTL raised from 10s to 60s — presumably to cover
+ // longer per-shard work; see the renew-lock #warning below.
+ int max_secs = 60;
+
rados::cls::lock::Lock l(reshard_lock_name);
utime_t time(max_secs, 0);
l.set_duration(time);
- int ret = l.lock_exclusive(&store->reshard_pool_ctx, shard);
+ string logshard_oid;
+ get_logshard_oid(logshard_num, &logshard_oid);
+
+ int ret = l.lock_exclusive(&store->reshard_pool_ctx, logshard_oid);
if (ret == -EBUSY) { /* already locked by another processor */
- dout(5) << __func__ << "(): failed to acquire lock on " << shard << dendl;
+ ldout(store->ctx(), 5) << __func__ << "(): failed to acquire lock on " << logshard_oid << dendl;
return ret;
}
do {
std::list<cls_rgw_reshard_entry> entries;
- ret = list(marker, max_entries, entries, truncated);
+ ret = list(logshard_num, marker, max_entries, entries, &truncated);
if (ret < 0) {
- ldout(cct, 10) << "cannot list all reshards: " << shard
- << dendl;
+ ldout(cct, 10) << "cannot list all reshards in logshard oid=" << logshard_oid << dendl;
+ // NOTE(review): continue re-tests 'truncated' (false on the first pass),
+ // so a list() failure exits the loop rather than retrying — confirm
+ // intended. Brace nesting just below also looks truncated in this
+ // excerpt.
continue;
}
}
}
}
+#warning update marker?
+#warning do work here, renew lock
} while (truncated);
- l.unlock(&store->reshard_pool_ctx, shard);
+ l.unlock(&store->reshard_pool_ctx, logshard_oid);
return 0;
}
-void RGWReshard::get_shard(int shard_num, string& shard)
+// Compose the RADOS object name for logshard N: reshard_oid_prefix plus a
+// 10-digit zero-padded shard number (previously "bucket_reshard.<N>").
+void RGWReshard::get_logshard_oid(int shard_num, string *logshard)
{
char buf[32];
snprintf(buf, sizeof(buf), "%010u", (unsigned)shard_num);
- string objname("bucket_reshard.");
- shard = objname + buf;
+ string objname(reshard_oid_prefix);
+ *logshard = objname + buf;
}
-int RGWReshard::inspect_all_shards()
+// Walk all logshards in order, processing each; stops at the first error.
+int RGWReshard::inspect_all_logshards()
{
int ret = 0;
- for (int i = 0; i < num_shards; i++) {
- string shard;
- store->objexp_get_shard(i, shard);
+ for (int i = 0; i < num_logshards; i++) {
+ string logshard;
+ // Fixes the previous misuse of store->objexp_get_shard() (the object
+ // expirer's shard naming) to derive reshard logshard oids.
+ get_logshard_oid(i, &logshard);
- ldout(store->ctx(), 20) << "proceeding shard = " << shard << dendl;
+ ldout(store->ctx(), 20) << "proceeding logshard = " << logshard << dendl;
- ret = process_single_shard(shard);
+ ret = process_single_logshard(i);
if (ret <0) {
return ret;
}
do {
utime_t start = ceph_clock_now();
ldout(cct, 2) << "object expiration: start" << dendl;
- if (reshard->inspect_all_shards()) {
+ if (reshard->inspect_all_logshards()) {
/* All shards have been processed properly. Next time we can start
* from this moment. */
last_run = start;
utime_t end = ceph_clock_now();
end -= start;
- int secs = cct->_conf->rgw_objexp_gc_interval;
+ int secs = cct->_conf->rgw_reshard_thread_interval;
if (secs <= end.sec())
continue; // next round
cond.Signal();
}
+#if 0
BucketIndexLockGuard::BucketIndexLockGuard(RGWRados* _store,
const string& bucket_instance_id, const string& _oid, const librados::IoCtx& _io_ctx) :
store(_store),
class RGWRados;
+#if 0
/* gets a locked lock , release it when exiting context */
class BucketIndexLockGuard
{
int lock();
int unlock();
};
+#endif
class RGWBucketReshard {
class RGWReshard {
RGWRados *store;
string lock_name;
- int max_jobs;
rados::cls::lock::Lock instance_lock;
- int num_shards;
+ // Number of reshard log shards (from rgw_reshard_num_logs); replaces both
+ // num_shards and the removed max_jobs queue cap.
+ int num_logshards;
- int lock_bucket_index_shared(const string& oid);
- int unlock_bucket_index(const string& oid);
- void get_shard(int shard_num, string& shard);
+ // Compose the oid of logshard shard_num into *shard.
+ void get_logshard_oid(int shard_num, string *shard);
protected:
class ReshardWorker : public Thread {
CephContext *cct;
ReshardWorker *worker;
std::atomic<bool> down_flag = { false };
- public:
- RGWReshard(RGWRados* _store);
- int add(cls_rgw_reshard_entry& entry);
- int get(cls_rgw_reshard_entry& entry);
- int remove(cls_rgw_reshard_entry& entry);
- int list(string& marker, uint32_t max, list<cls_rgw_reshard_entry>& entries, bool& is_truncated);
- int clear_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry);
-
- int reshard_bucket(Formatter *formatter,
- int num_shards,
- rgw_bucket& bucket,
- RGWBucketInfo& bucket_info,
- RGWBucketInfo& new_bucket_info,
- int max_entries,
- RGWBucketAdminOpState& bucket_op,
- bool verbose = false);
-
- /* reshard thread */
- int process_single_shard(const std::string& shard);
- int inspect_all_shards();
- bool going_down();
- void start_processor();
- void stop_processor();
+ // Helpers mapping a bucket (tenant, name) to its logshard object.
+ string get_logshard_key(const string& tenant, const string& bucket_name);
+ void get_bucket_logshard_oid(const string& tenant, const string& bucket_name, string *oid);
+
+public:
+ RGWReshard(RGWRados* _store);
+ int add(cls_rgw_reshard_entry& entry);
+ int get(cls_rgw_reshard_entry& entry);
+ int remove(cls_rgw_reshard_entry& entry);
+ int list(int logshard_num, string& marker, uint32_t max, std::list<cls_rgw_reshard_entry>& entries, bool *is_truncated);
+ int clear_bucket_resharding(const string& bucket_instance_oid, cls_rgw_reshard_entry& entry);
+
+ int reshard_bucket(Formatter *formatter,
+ int num_shards,
+ rgw_bucket& bucket,
+ RGWBucketInfo& bucket_info,
+ RGWBucketInfo& new_bucket_info,
+ int max_entries,
+ RGWBucketAdminOpState& bucket_op,
+ bool verbose = false);
+
+ /* reshard thread */
+ int process_single_logshard(int logshard_num);
+ int inspect_all_logshards();
+ bool going_down();
+ void start_processor();
+ void stop_processor();
};