return manager->aio_operate(io_ctx, shard_id, oid, &op);
}
+static bool issue_bi_dir_suggest_op(librados::IoCtx& io_ctx,
+ const int shard_id,
+ const std::string& shard_oid,
+ bufferlist& updates,
+ BucketIndexAioManager* manager)
+{
+ librados::ObjectWriteOperation op;
+ op.exec(RGW_CLASS, RGW_DIR_SUGGEST_CHANGES, updates);
+ return manager->aio_operate(io_ctx, shard_id, shard_oid, &op);
+}
+
int CLSRGWIssueBILogList::issue_op(const int shard_id, const string& oid)
{
return issue_bi_log_list_op(io_ctx, oid, shard_id, marker_mgr, max, &manager, &result[shard_id]);
return issue_bi_log_stop(io_ctx, shard_id, oid, &manager);
}
+int CLSRGWIssueBucketBIDirSuggest::issue_op(int shard_id,
+ const std::string& oid)
+{
+ auto update_iter = updates.find(shard_id);
+ if (update_iter == updates.end()) {
+ // we did not have an update for that shard, so do nothing
+ return 0;
+ }
+ return issue_bi_dir_suggest_op(io_ctx, shard_id, oid,
+ update_iter->second, &manager);
+}
+
class GetDirHeaderCompletion : public ObjectOperationCompletion {
RGWGetDirHeader_CB *ret_ctx;
public:
CLSRGWConcurrentIO(io_ctx, _bucket_objs, max_aio) {}
};
+class CLSRGWIssueBucketBIDirSuggest : public CLSRGWConcurrentIO {
+ std::map<int, bufferlist>& updates;
+protected:
+ virtual int issue_op(int shard_id, const std::string& oid);
+public:
+ CLSRGWIssueBucketBIDirSuggest(librados::IoCtx& io_ctx,
+ std::map<int, std::string>& _bucket_objs,
+ std::map<int, bufferlist>& _updates,
+ uint32_t max_aio)
+ : CLSRGWConcurrentIO(io_ctx, _bucket_objs, max_aio),
+ updates(_updates)
+ {/* empty */ }
+};
+
+
int cls_rgw_get_dir_header_async(librados::IoCtx& io_ctx, std::string& oid, RGWGetDirHeader_CB *ctx);
void cls_rgw_encode_suggestion(char op, rgw_bucket_dir_entry& dirent, ceph::buffer::list& updates);
}
-int RGWRados::remove_objs_from_index(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, list<rgw_obj_index_key>& oid_list)
+int RGWRados::remove_objs_from_index(const DoutPrefixProvider *dpp,
+ RGWBucketInfo& bucket_info,
+ const std::list<rgw_obj_index_key>& entry_key_list)
{
RGWSI_RADOS::Pool index_pool;
- string dir_oid;
+ const auto& latest_log = bucket_info.layout.logs.back();
+ const rgw::bucket_index_layout_generation& current_index =
+ rgw::log_to_index_layout(latest_log);
+
+ const uint32_t num_shards = current_index.layout.normal.num_shards;
uint8_t suggest_flag = (svc.zone->get_zone().log_data ? CEPH_RGW_DIR_SUGGEST_LOG_OP : 0);
+ std::map<int, std::string> index_oids;
- int r = svc.bi_rados->open_bucket_index(dpp, bucket_info, &index_pool, &dir_oid);
- if (r < 0)
+ int r = svc.bi_rados->open_bucket_index(dpp, bucket_info, std::nullopt,
+ current_index, &index_pool,
+ &index_oids, nullptr);
+ if (r < 0) {
return r;
+ }
+
+ // needed so objclass won't skip req
+ constexpr uint64_t highest_epoch = uint64_t(-1);
- bufferlist updates;
+ // split up removals by shard
+ std::map<int, bufferlist> sharded_updates;
+ for (const auto& entry_key : entry_key_list) {
+ const rgw_obj_key obj_key(entry_key);
+ const uint32_t shard = rgw_bucket_shard_index(obj_key, num_shards);
+ bufferlist& updates = sharded_updates[shard];
- for (auto iter = oid_list.begin(); iter != oid_list.end(); ++iter) {
rgw_bucket_dir_entry entry;
- entry.key = *iter;
- ldpp_dout(dpp, 2) << "RGWRados::remove_objs_from_index bucket=" << bucket_info.bucket << " obj=" << entry.key.name << ":" << entry.key.instance << dendl;
- entry.ver.epoch = (uint64_t)-1; // ULLONG_MAX, needed to that objclass doesn't skip out request
+ entry.key = entry_key;
+ entry.ver.epoch = highest_epoch;
+
+ ldpp_dout(dpp, 5) << __func__ << "bucket=" << bucket_info.bucket <<
+ " shard=" << shard <<
+ " obj=" << entry.key.name << ":" << entry.key.instance << dendl;
+
updates.append(CEPH_RGW_REMOVE | suggest_flag);
encode(entry, updates);
}
- bufferlist out;
-
- r = index_pool.ioctx().exec(dir_oid, RGW_CLASS, RGW_DIR_SUGGEST_CHANGES, updates, out);
-
- return r;
+ // this operation ignores shards for which there's not an update, so
+ // we can send the complete list of bucket index shard oids
+ return CLSRGWIssueBucketBIDirSuggest(index_pool.ioctx(),
+ index_oids,
+ sharded_updates,
+ cct->_conf->rgw_bucket_index_max_aio)();
}
int RGWRados::check_disk_state(const DoutPrefixProvider *dpp,
std::map<RGWObjCategory, RGWStorageStats> *calculated_stats);
int bucket_rebuild_index(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info);
int bucket_set_reshard(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const cls_rgw_bucket_instance_entry& entry);
- int remove_objs_from_index(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, std::list<rgw_obj_index_key>& oid_list);
+ int remove_objs_from_index(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info,
+ const std::list<rgw_obj_index_key>& oid_list);
int move_rados_obj(const DoutPrefixProvider *dpp,
librados::IoCtx& src_ioctx,
const std::string& src_oid, const std::string& src_locator,
name = prefix + buf;
}
+uint32_t rgw_bucket_shard_index(const rgw_obj_key& obj_key,
+ int num_shards)
+{
+ std::string sharding_key;
+ if (obj_key.ns == RGW_OBJ_NS_MULTIPART) {
+ RGWMPObj mp;
+ mp.from_meta(obj_key.name);
+ sharding_key = mp.get_key();
+ } else {
+ sharding_key = obj_key.name;
+ }
+
+ uint32_t sid = ceph_str_hash_linux(sharding_key.c_str(), sharding_key.size());
+ uint32_t sid2 = sid ^ ((sid & 0xFF) << 24);
+ return rgw_shards_mod(sid2, num_shards);
+}
+
int rgw_parse_list_of_flags(struct rgw_name_to_flag *mapping,
const string& str, uint32_t *perm)
{
return hval % RGW_SHARDS_PRIME_1 % max_shards;
}
+uint32_t rgw_bucket_shard_index(const rgw_obj_key& obj_key,
+ int num_shards);
+
// used for logging and tagging
inline int rgw_shard_id(const std::string& key, int max_shards)
{