#include "common/errno.h"
#include "common/ceph_json.h"
+#include "common/dout.h"
+
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rgw
RGWBucketReshard::RGWBucketReshard(RGWRados *_store, const RGWBucketInfo& _bucket_info, const map<string, bufferlist>& _bucket_attrs) :
store(_store), bucket_info(_bucket_info), bucket_attrs(_bucket_attrs),
reshard_lock(reshard_lock_name) {
- const rgw_bucket& b = bucket_info.bucket;
+ const rgw_bucket& b = bucket_info.bucket;
reshard_oid = b.tenant + (b.tenant.empty() ? "" : ":") + b.name + ":" + b.bucket_id;
utime_t lock_duration(store->ctx()->_conf->rgw_reshard_bucket_lock_duration, 0);
}
int RGWBucketReshard::execute(int num_shards, int max_op_entries,
- bool verbose, ostream *out, Formatter *formatter)
+ bool verbose, ostream *out, Formatter *formatter, RGWReshard* reshard_log)
{
int ret = lock_bucket();
}
RGWBucketInfo new_bucket_info;
-
ret = create_new_bucket_instance(num_shards, new_bucket_info);
if (ret < 0) {
unlock_bucket();
return ret;
}
+
+ if (reshard_log) {
+ ret = reshard_log->update(bucket_info, new_bucket_info);
+ if (ret < 0) {
+ return ret;
+ }
+ }
+
ret = set_resharding_status(new_bucket_info.bucket.bucket_id, num_shards, CLS_RGW_RESHARD_IN_PROGRESS);
if (ret < 0) {
unlock_bucket();
return 0;
}
+int RGWReshard::update(const RGWBucketInfo& bucket_info, const RGWBucketInfo& new_bucket_info)
+{
+ cls_rgw_reshard_entry entry;
+ entry.bucket_name = bucket_info.bucket.name;
+ entry.bucket_id = bucket_info.bucket.bucket_id;
+
+ int ret = get(entry);
+ if (ret < 0) {
+ return ret;
+ }
+
+ entry.new_instance_id = new_bucket_info.bucket.name + ":" + new_bucket_info.bucket.bucket_id;
+
+ ret = add(entry);
+ if (ret < 0) {
+ ldout(store->ctx(), 0) << __func__ << ":Error in updating entry bucket " << entry.bucket_name << ": " <<
+ cpp_strerror(-ret) << dendl;
+ }
+
+ return ret;
+}
+
int RGWReshard::list(int logshard_num, string& marker, uint32_t max, std::list<cls_rgw_reshard_entry>& entries, bool *is_truncated)
{
int RGWReshard::process_single_logshard(int logshard_num)
{
string marker;
- bool truncated = false;
+ bool truncated = true;
CephContext *cct = store->ctx();
int max_entries = 1000;
}
for(auto& entry: entries) {
- /* resharding has not started */
- RGWObjectCtx obj_ctx(store);
- RGWBucketInfo bucket_info;
- map<string, bufferlist> attrs;
+ if(entry.new_instance_id.empty()) {
- ret = store->get_bucket_info(obj_ctx, entry.tenant, entry.bucket_name, bucket_info, nullptr,
- &attrs);
- if (ret < 0) {
- ldout(cct, 0) << __func__ << ": Error in get_bucket_info: " << cpp_strerror(-ret) << dendl;
- return -ret;
- }
- rgw_bucket bucket = bucket_info.bucket;
- RGWBucketInfo new_bucket_info(bucket_info);
+ ldout(store->ctx(), 20) << __func__ << " resharding " << entry.bucket_name << dendl;
- if(entry.new_instance_id.empty()) {
- ret = create_new_bucket_instance(store, entry.new_num_shards, bucket_info, attrs,
- new_bucket_info);
+ RGWObjectCtx obj_ctx(store);
+ rgw_bucket bucket;
+ RGWBucketInfo bucket_info;
+ map<string, bufferlist> attrs;
+
+ ret = store->get_bucket_info(obj_ctx, entry.tenant, entry.bucket_name, bucket_info, nullptr,
+ &attrs);
if (ret < 0) {
- ldout(cct, 0) << __func__ << " ERROR: could not create new bucket info: " << cpp_strerror(-ret) << dendl;
- return ret;
+ ldout(cct, 0) << __func__ << ": Error in get_bucket_info: " << cpp_strerror(-ret) << dendl;
+ return -ret;
}
- entry.new_instance_id = new_bucket_info.bucket.bucket_id;
-
- ldout(cct, 20) << "reshard: assigning new bucket instance id for bucket=" << bucket.name
- << " new_instance_id=" << entry.new_instance_id << dendl;
+ RGWBucketReshard br(store, bucket_info, attrs);
- ret = add(entry);
+ Formatter* formatter = new JSONFormatter(false);
+ auto formatter_ptr = std::unique_ptr<Formatter>(formatter);
+ ret = br.execute(entry.new_num_shards, max_entries, true,nullptr, formatter, this);
if (ret < 0) {
- ldout(cct, 0) << __func__ << ":Error in updating entry bucket " << entry.bucket_name << ": " <<
- cpp_strerror(-ret) << dendl;
+ ldout (store->ctx(), 0) << __func__ << "ERROR in reshard_bucket " << entry.bucket_name << ":" <<
+ cpp_strerror(-ret)<< dendl;
return ret;
}
- }
- RGWBucketAdminOpState bucket_op;
- RGWBucketReshard reshard_op(store, bucket_info, attrs);
- ret = reshard_op.do_reshard(entry.new_num_shards, new_bucket_info,
- max_entries, verbose, out, formatter);
- if (ret < 0) {
- return ret;
- }
+ ldout (store->ctx(), 20) << " removing entry" << entry.bucket_name<< dendl;
- ret = remove(entry);
- if (ret < 0) {
- ldout(cct, 0)<< __func__ << ":Error removing bucket " << entry.bucket_name << " for resharding queue: "
- << cpp_strerror(-ret) << dendl;
- return ret;
+ ret = remove(entry);
+ if (ret < 0) {
+ ldout(cct, 0)<< __func__ << ":Error removing bucket " << entry.bucket_name << " for resharding queue: "
+ << cpp_strerror(-ret) << dendl;
+ return ret;
+ }
}
utime_t now = ceph_clock_now();
int set_resharding_status(const string& new_instance_id, int32_t num_shards, cls_rgw_reshard_status status);
int clear_resharding();
- int create_new_bucket_instance(int new_num_shards,
- RGWBucketInfo& new_bucket_info);
+ int create_new_bucket_instance(int new_num_shards, RGWBucketInfo& new_bucket_info);
int do_reshard(int num_shards,
const RGWBucketInfo& new_bucket_info,
int max_entries,
int execute(int num_shards, int max_op_entries,
bool verbose = false, ostream *out = nullptr,
- Formatter *formatter = nullptr);
+ Formatter *formatter = nullptr,
+ RGWReshard *reshard_log = nullptr);
int abort();
int get_status(std::list<cls_rgw_bucket_instance_entry> *status);
};
public:
RGWReshard(RGWRados* _store, bool _verbose = false, ostream *_out = nullptr, Formatter *_formatter = nullptr);
int add(cls_rgw_reshard_entry& entry);
+ int update(const RGWBucketInfo& bucket_info, const RGWBucketInfo& new_bucket_info);
int get(cls_rgw_reshard_entry& entry);
int remove(cls_rgw_reshard_entry& entry);
int list(int logshard_num, string& marker, uint32_t max, std::list<cls_rgw_reshard_entry>& entries, bool *is_truncated);