/**
* return: Plain, Instance, OLH or Invalid
*/
-BIIndexType bi_type(const string& s, const string& prefix)
+BIIndexType bi_type(const string& s, const string& prefix ="")
{
int ret = bi_entry_type(s.substr(prefix.size()));
if (ret < 0) {
}
}
+ for (auto& s : op.dec_stats) {
+ auto& dest = header.stats[s.first];
+ if (op.absolute) {
+ CLS_LOG(0, "ERROR: %s: there can not be decribed stats when setting absolutly", __func__);
+ return -EINVAL;
+ } else {
+ dest.total_size -= s.second.total_size;
+ dest.total_size_rounded -= s.second.total_size_rounded;
+ dest.num_entries -= s.second.num_entries;
+ dest.actual_size -= s.second.actual_size;
+ }
+ }
+
return write_bucket_header(hctx, &header);
}
return 0;
}
+/* gain bi_entry based on reshard log */
+static int rgw_bi_get_vals_op(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+ // decode request
+ rgw_cls_bi_get_vals_op op;
+ auto bl_iter = in->cbegin();
+ try {
+ decode(op, bl_iter);
+ } catch (ceph::buffer::error& err) {
+ CLS_LOG(0, "ERROR: %s: failed to decode request", __func__);
+ return -EINVAL;
+ }
+
+ map<string, bufferlist> keys;
+ int ret = cls_cxx_map_get_vals_by_keys(hctx, op.log_entries_wanted, &keys);
+ if (ret < 0) {
+ return ret;
+ }
+
+ rgw_cls_bi_list_ret op_ret;
+ std::map<string, bufferlist>::iterator iter;
+ for (iter = keys.begin(); iter != keys.end(); ++iter) {
+
+ rgw_cls_bi_entry entry;
+ entry.idx = iter->first;
+ entry.type = bi_type(iter->first);
+ entry.data = iter->second;
+
+ auto biter = entry.data.cbegin();
+
+ switch (entry.type) {
+ case BIIndexType::Plain:
+ case BIIndexType::Instance: {
+ rgw_bucket_dir_entry e;
+ try {
+ decode(e, biter);
+ } catch (ceph::buffer::error& err) {
+ CLS_LOG(0, "ERROR: %s: failed to decode buffer", __func__);
+ return -EIO;
+ }
+ break;
+ }
+ case BIIndexType::OLH: {
+ rgw_bucket_olh_entry e;
+ try {
+ decode(e, biter);
+ } catch (ceph::buffer::error& err) {
+ CLS_LOG(0, "ERROR: %s: failed to decode buffer (size=%d)", __func__, entry.data.length());
+ return -EIO;
+ }
+ break;
+ }
+ default:
+ CLS_LOG(0, "%s: invalid entry type: %d", __func__, int(entry.type));
+ return -EINVAL;
+ }
+ CLS_LOG(20, "%s: entry.idx=%s", __func__, escape_str(entry.idx).c_str());
+
+ op_ret.entries.push_back(entry);
+ }
+
+ encode(op_ret, *out);
+
+ return 0;
+}
+
static int rgw_bi_put_op(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
{
CLS_LOG(10, "entered %s", __func__);
cls_method_handle_t h_rgw_obj_check_attrs_prefix;
cls_method_handle_t h_rgw_obj_check_mtime;
cls_method_handle_t h_rgw_bi_get_op;
+ cls_method_handle_t h_rgw_bi_get_vals_op;
cls_method_handle_t h_rgw_bi_put_op;
cls_method_handle_t h_rgw_bi_list_op;
cls_method_handle_t h_rgw_bi_log_list_op;
cls_register_cxx_method(h_class, RGW_OBJ_CHECK_MTIME, CLS_METHOD_RD, rgw_obj_check_mtime, &h_rgw_obj_check_mtime);
cls_register_cxx_method(h_class, RGW_BI_GET, CLS_METHOD_RD, rgw_bi_get_op, &h_rgw_bi_get_op);
+ cls_register_cxx_method(h_class, RGW_BI_GET_VALS, CLS_METHOD_RD, rgw_bi_get_vals_op, &h_rgw_bi_get_vals_op);
cls_register_cxx_method(h_class, RGW_BI_PUT, CLS_METHOD_RD | CLS_METHOD_WR, rgw_bi_put_op, &h_rgw_bi_put_op);
cls_register_cxx_method(h_class, RGW_BI_LIST, CLS_METHOD_RD, rgw_bi_list_op, &h_rgw_bi_list_op);
void cls_rgw_bucket_update_stats(librados::ObjectWriteOperation& o,
bool absolute,
- const map<RGWObjCategory, rgw_bucket_category_stats>& stats)
+ const map<RGWObjCategory, rgw_bucket_category_stats>& stats,
+ const map<RGWObjCategory, rgw_bucket_category_stats>* dec_stats)
{
rgw_cls_bucket_update_stats_op call;
call.absolute = absolute;
call.stats = stats;
+ if (dec_stats != NULL)
+ call.dec_stats = *dec_stats;
bufferlist in;
encode(call, in);
o.exec(RGW_CLASS, RGW_BUCKET_UPDATE_STATS, in);
return 0;
}
+int cls_rgw_bi_get_vals(librados::IoCtx& io_ctx, const std::string oid,
+ std::set<std::string> log_entries_wanted,
+ std::list<rgw_cls_bi_entry> *entries)
+{
+ bufferlist in, out;
+ struct rgw_cls_bi_get_vals_op call;
+ call.log_entries_wanted = log_entries_wanted;
+ encode(call, in);
+ int r = io_ctx.exec(oid, RGW_CLASS, RGW_BI_GET_VALS, in, out);
+ if (r < 0)
+ return r;
+
+ struct rgw_cls_bi_list_ret op_ret;
+ auto iter = out.cbegin();
+ try {
+ decode(op_ret, iter);
+ } catch (ceph::buffer::error& err) {
+ return -EIO;
+ }
+
+ entries->swap(op_ret.entries);
+
+ return 0;
+}
int cls_rgw_bi_put(librados::IoCtx& io_ctx, const string oid, const rgw_cls_bi_entry& entry)
{
void cls_rgw_bucket_update_stats(librados::ObjectWriteOperation& o,
bool absolute,
- const std::map<RGWObjCategory, rgw_bucket_category_stats>& stats);
+ const std::map<RGWObjCategory, rgw_bucket_category_stats>& stats,
+ const std::map<RGWObjCategory, rgw_bucket_category_stats>* dec_stats = nullptr);
void cls_rgw_bucket_prepare_op(librados::ObjectWriteOperation& o, RGWModifyOp op, const std::string& tag,
const cls_rgw_obj_key& key, const std::string& locator, bool log_op,
int cls_rgw_bi_get(librados::IoCtx& io_ctx, const std::string oid,
BIIndexType index_type, const cls_rgw_obj_key& key,
rgw_cls_bi_entry *entry);
+int cls_rgw_bi_get_vals(librados::IoCtx& io_ctx, const std::string oid,
+ std::set<std::string> log_entries_wanted,
+ std::list<rgw_cls_bi_entry> *entries);
int cls_rgw_bi_put(librados::IoCtx& io_ctx, const std::string oid, const rgw_cls_bi_entry& entry);
void cls_rgw_bi_put(librados::ObjectWriteOperation& op, const std::string oid, const rgw_cls_bi_entry& entry);
int cls_rgw_bi_list(librados::IoCtx& io_ctx, const std::string& oid,
#define RGW_OBJ_CHECK_MTIME "obj_check_mtime"
#define RGW_BI_GET "bi_get"
+#define RGW_BI_GET_VALS "bi_get_vals"
#define RGW_BI_PUT "bi_put"
#define RGW_BI_LIST "bi_list"
s.total_size = 1;
s.total_size_rounded = 4096;
s.num_entries = 1;
+ rgw_bucket_category_stats& dec_s = r->dec_stats[RGWObjCategory::None];
+ dec_s.total_size = 1;
+ dec_s.total_size_rounded = 4096;
+ dec_s.num_entries = 1;
o.push_back(r);
o.push_back(new rgw_cls_bucket_update_stats_op);
s[(int)entry.first] = entry.second;
}
encode_json("stats", s, f);
+ map<int, rgw_bucket_category_stats> dec_s;
+ for (auto& entry : dec_stats) {
+ dec_s[(int)entry.first] = entry.second;
+ }
+ encode_json("dec_stats", dec_s, f);
}
void cls_rgw_bi_log_list_op::dump(Formatter *f) const
{
bool absolute{false};
std::map<RGWObjCategory, rgw_bucket_category_stats> stats;
+ std::map<RGWObjCategory, rgw_bucket_category_stats> dec_stats;
rgw_cls_bucket_update_stats_op() {}
void encode(ceph::buffer::list &bl) const {
- ENCODE_START(1, 1, bl);
+ ENCODE_START(2, 1, bl);
encode(absolute, bl);
encode(stats, bl);
+ encode(dec_stats, bl);
ENCODE_FINISH(bl);
}
void decode(ceph::buffer::list::const_iterator &bl) {
- DECODE_START(1, bl);
+ DECODE_START(2, bl);
decode(absolute, bl);
decode(stats, bl);
+ if (struct_v >= 2)
+ decode(dec_stats, bl);
DECODE_FINISH(bl);
}
void dump(ceph::Formatter *f) const;
};
WRITE_CLASS_ENCODER(rgw_cls_bi_get_ret)
+struct rgw_cls_bi_get_vals_op {
+ std::set<std::string> log_entries_wanted;
+
+ void encode(ceph::buffer::list& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(log_entries_wanted, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(ceph::buffer::list::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(log_entries_wanted, bl);
+ DECODE_FINISH(bl);
+ }
+};
+WRITE_CLASS_ENCODER(rgw_cls_bi_get_vals_op)
+
struct rgw_cls_bi_put_op {
rgw_cls_bi_entry entry;
return cls_rgw_bi_get(ref.ioctx, ref.obj.oid, index_type, key, entry);
}
+int RGWRados::bi_get_vals(BucketShard& bs, set<string> log_entries_wanted,
+ list<rgw_cls_bi_entry> *entries, optional_yield y)
+{
+ auto& ref = bs.bucket_obj;
+ int ret = cls_rgw_bi_get_vals(ref.ioctx, ref.obj.oid, log_entries_wanted, entries);
+ if (ret < 0)
+ return ret;
+
+ return 0;
+}
+
void RGWRados::bi_put(ObjectWriteOperation& op, BucketShard& bs, rgw_cls_bi_entry& entry, optional_yield y)
{
auto& ref = bs.bucket_obj;
int bi_get_instance(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const rgw_obj& obj, rgw_bucket_dir_entry *dirent, optional_yield y);
int bi_get_olh(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const rgw_obj& obj, rgw_bucket_olh_entry *olh, optional_yield y);
int bi_get(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const rgw_obj& obj, BIIndexType index_type, rgw_cls_bi_entry *entry, optional_yield y);
+ int bi_get_vals(BucketShard& bs, std::set<std::string> log_entries_wanted, std::list<rgw_cls_bi_entry> *entries, optional_yield y);
void bi_put(librados::ObjectWriteOperation& op, BucketShard& bs, rgw_cls_bi_entry& entry, optional_yield y);
int bi_put(BucketShard& bs, rgw_cls_bi_entry& entry, optional_yield y);
int bi_put(const DoutPrefixProvider *dpp, rgw_bucket& bucket, rgw_obj& obj, rgw_cls_bi_entry& entry, optional_yield y);
}
int add_entry(rgw_cls_bi_entry& entry, bool account, RGWObjCategory category,
- const rgw_bucket_category_stats& entry_stats) {
+ const rgw_bucket_category_stats& entry_stats,
+ bool process_log = false) {
entries.push_back(entry);
if (account) {
rgw_bucket_category_stats& target = stats[category];
target.actual_size += entry_stats.actual_size;
}
if (entries.size() >= reshard_shard_batch_size) {
- int ret = flush();
+ int ret = flush(process_log);
if (ret < 0) {
return ret;
}
return 0;
}
- int flush() {
+ int flush(bool process_log = false) {
if (entries.size() == 0) {
return 0;
}
librados::ObjectWriteOperation op;
- for (auto& entry : entries) {
- store->getRados()->bi_put(op, bs, entry, null_yield);
+ if (process_log) {
+ map<RGWObjCategory, rgw_bucket_category_stats> dec_stats;
+ list<rgw_cls_bi_entry> dec_entries;
+ set<string> dec_entry_names_wanted;
+ for (auto& entry : entries) {
+ store->getRados()->bi_put(op, bs, entry, null_yield);
+ dec_entry_names_wanted.emplace(entry.idx);
+ }
+
+ // getting the index entry in target shard
+ int ret = store->getRados()->bi_get_vals(bs, dec_entry_names_wanted, &dec_entries, null_yield);
+ if(ret < 0) {
+ derr << "ERROR: bi_get_vals(): " << cpp_strerror(-ret) << dendl;
+ return ret;
+ }
+
+ for (auto& dec_entry : dec_entries) {
+ cls_rgw_obj_key cls_key;
+ RGWObjCategory category;
+ rgw_bucket_category_stats accounted_stats;
+ bool account = dec_entry.get_info(&cls_key, &category, &accounted_stats);
+ if (account) {
+ auto& dest = dec_stats[category];
+ dest.total_size += accounted_stats.total_size;
+ dest.total_size_rounded += accounted_stats.total_size_rounded;
+ dest.num_entries += accounted_stats.num_entries;
+ dest.actual_size += accounted_stats.actual_size;
+ }
+ }
+ cls_rgw_bucket_update_stats(op, false, stats, &dec_stats);
+ } else {
+ for (auto& entry : entries) {
+ store->getRados()->bi_put(op, bs, entry, null_yield);
+ }
+ cls_rgw_bucket_update_stats(op, false, stats);
}
- cls_rgw_bucket_update_stats(op, false, stats);
librados::AioCompletion *c;
int ret = get_completion(&c);
int add_entry(int shard_index,
rgw_cls_bi_entry& entry, bool account, RGWObjCategory category,
- const rgw_bucket_category_stats& entry_stats) {
+ const rgw_bucket_category_stats& entry_stats,
+ bool process_log = false) {
int ret = target_shards[shard_index].add_entry(entry, account, category,
- entry_stats);
+ entry_stats, process_log);
if (ret < 0) {
derr << "ERROR: target_shards.add_entry(" << entry.idx <<
") returned error: " << cpp_strerror(-ret) << dendl;
return 0;
}
- int finish() {
+ int finish(bool process_log = false) {
int ret = 0;
for (auto& shard : target_shards) {
- int r = shard.flush();
+ int r = shard.flush(process_log);
if (r < 0) {
derr << "ERROR: target_shards[" << shard.get_shard_id() << "].flush() returned error: " << cpp_strerror(-r) << dendl;
ret = r;
list<rgw_cls_bi_entry> entries;
string stage;
- bool read_reshardlog;
+ bool process_log = false;
switch (reshard_stage) {
case rgw::BucketReshardState::InLogrecord:
stage = "inventory";
- read_reshardlog = false;
+ process_log = false;
break;
case rgw::BucketReshardState::InProgress:
stage = "inc";
- read_reshardlog = true;
+ process_log = true;
break;
default:
ldpp_dout(dpp, 0) << "ERROR: " << __func__ << " unknown reshard stage" << dendl;
int ret = store->getRados()->bi_list(dpp, bucket_info, i, null_object_filter,
marker, max_op_entries, &entries,
- &is_truncated, read_reshardlog, y);
+ &is_truncated, process_log, y);
if (ret == -ENOENT) {
ldpp_dout(dpp, 1) << "WARNING: " << __func__ << " failed to find shard "
<< i << ", skipping" << dendl;
int shard_index = (target_shard_id > 0 ? target_shard_id : 0);
ret = target_shards_mgr.add_entry(shard_index, entry, account,
- category, stats);
+ category, stats, process_log);
if (ret < 0) {
return ret;
}
(*out) << " " << stage_entries << std::endl;
}
- int ret = target_shards_mgr.finish();
+ int ret = target_shards_mgr.finish(process_log);
if (ret < 0) {
ldpp_dout(dpp, -1) << "ERROR: failed to reshard: " << ret << dendl;
return -EIO;