* @param string &key, omap string key
* @param string &value, mapped value corresponding key
*/
- using omap_set_key_iertr = base_iertr;
+ using omap_set_key_iertr = base_iertr::extend<
+ crimson::ct_error::value_too_large>;
using omap_set_key_ret = omap_set_key_iertr::future<>;
virtual omap_set_key_ret omap_set_key(
omap_root_t &omap_root,
const std::string &key,
const ceph::bufferlist &value) = 0;
- using omap_set_keys_iertr = base_iertr;
+ using omap_set_keys_iertr = omap_set_key_iertr;
using omap_set_keys_ret = omap_set_keys_iertr::future<>;
virtual omap_set_keys_ret omap_set_keys(
omap_root_t &omap_root,
omap_context_t oc,
const std::string &key) = 0;
- using insert_iertr = base_iertr;
+ using insert_iertr = base_iertr::extend<
+ crimson::ct_error::value_too_large>;
using insert_ret = insert_iertr::future<mutation_result_t>;
virtual insert_ret insert(
omap_context_t oc,
virtual ~OMapNode() = default;
+ virtual bool exceeds_max_kv_limit(
+ const std::string &key,
+ const ceph::bufferlist &value) const = 0;
+
void init_range(std::string _begin, std::string _end) {
assert(begin.empty());
assert(end.empty());
LOG_PREFIX(OMapInnerNode::insert);
DEBUGT("{}->{}, this: {}", oc.t, key, value, *this);
auto child_pt = get_containing_child(key);
+ if (exceeds_max_kv_limit(key, value)) {
+ return crimson::ct_error::value_too_large::make();
+ }
return get_child_node(oc, child_pt).si_then(
[oc, &key, &value] (auto extent) {
ceph_assert(!extent->is_btree_root());
return extent->insert(oc, key, value);
- }).si_then([this, oc, child_pt] (auto mresult) {
+ }).si_then([this, oc, child_pt] (auto mresult) -> insert_ret {
if (mresult.status == mutation_status_t::SUCCESS) {
return insert_iertr::make_ready_future<mutation_result_t>(mresult);
} else if (mresult.status == mutation_status_t::WAS_SPLIT) {
std::nullopt, std::nullopt));
}
case mutation_status_t::WAS_SPLIT:
- return handle_split(oc, child_pt, mresult);
+ return handle_split(oc, child_pt, mresult
+ ).handle_error_interruptible(
+ rm_key_iertr::pass_further{},
+ crimson::ct_error::assert_all{"unexpected error"}
+ );
default:
return rm_key_iertr::make_ready_future<mutation_result_t>(mresult);
}
{
LOG_PREFIX(OMapLeafNode::insert);
DEBUGT("{} -> {}, this: {}", oc.t, key, value, *this);
+ if (exceeds_max_kv_limit(key, value)) {
+ return crimson::ct_error::value_too_large::make();
+ }
bool overflow = extent_will_overflow(key.size(), value.length());
if (!overflow) {
if (!is_mutable()) {
const std::string &key,
const ceph::bufferlist &value) final;
+ bool exceeds_max_kv_limit(
+ const std::string &key,
+ const ceph::bufferlist &value) const final {
+ return (key.length() + sizeof(laddr_le_t)) > (capacity() / 4);
+ }
+
rm_key_ret rm_key(
omap_context_t oc,
const std::string &key) final;
omap_context_t oc,
internal_const_iterator_t iter, OMapNodeRef entry);
- using handle_split_iertr = base_iertr;
+ using handle_split_iertr = base_iertr::extend<
+ crimson::ct_error::value_too_large>;
using handle_split_ret = handle_split_iertr::future<mutation_result_t>;
handle_split_ret handle_split(
omap_context_t oc, internal_const_iterator_t iter,
const std::string &key,
const ceph::bufferlist &value) final;
+ bool exceeds_max_kv_limit(
+ const std::string &key,
+ const ceph::bufferlist &value) const final {
+ return (key.length() + value.length()) > (capacity() / 4);
+ }
+
rm_key_ret rm_key(
omap_context_t oc, const std::string &key) final;
seastar::stop_iteration>(
seastar::stop_iteration::no);
}
- });
+ }).handle_error_interruptible(
+ base_iertr::pass_further{},
+ crimson::ct_error::assert_all{"unexpected value_too_large"}
+ );
});
});
});
}
-SeaStore::base_iertr::future<>
+SeaStore::Shard::omaptree_set_keys_iertr::future<>
SeaStore::Shard::omaptree_set_keys(
Transaction& t,
omap_root_t&& root,
uint64_t off,
uint64_t len) const;
- using tm_iertr = base_iertr;
+ using tm_iertr = base_iertr::extend<
+ crimson::ct_error::value_too_large>;
using tm_ret = tm_iertr::future<>;
tm_ret _do_transaction_step(
internal_context_t &ctx,
omap_root_t&& root,
const std::optional<std::string>& start) const;
- base_iertr::future<> omaptree_set_keys(
+ using omaptree_set_keys_iertr = base_iertr::extend<
+ crimson::ct_error::value_too_large>;
+ omaptree_set_keys_iertr::future<> omaptree_set_keys(
Transaction& t,
omap_root_t&& root,
Onode& onode,