--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "crimson/common/tmap_helpers.h"
+
+#include "include/buffer.h"
+#include "include/encoding.h"
+#include "include/rados.h"
+
+namespace detail {
+
+#define decode_or_return(v, bp) \
+ try { \
+ ::decode(v, bp); \
+ } catch (...) { \
+ return -EINVAL; \
+ }
+
+class TMapContents {
+ std::map<std::string, bufferlist> keys;
+ bufferlist header;
+public:
+ TMapContents() = default;
+
+ int decode(bufferlist::const_iterator &bliter) {
+ keys.clear();
+ header.clear();
+ if (bliter.end()) {
+ return 0;
+ }
+ decode_or_return(header, bliter);
+ __u32 num_keys;
+ decode_or_return(num_keys, bliter);
+ for (; num_keys > 0; --num_keys) {
+ std::string key;
+ decode_or_return(key, bliter);
+ decode_or_return(keys[key], bliter);
+ }
+ return 0;
+ }
+
+ bufferlist encode() {
+ bufferlist bl;
+ ::encode(header, bl);
+ ::encode(static_cast<__u32>(keys.size()), bl);
+ for (auto &[k, v]: keys) {
+ ::encode(k, bl);
+ ::encode(v, bl);
+ }
+ return bl;
+ }
+
+ int update(bufferlist::const_iterator in) {
+ while (!in.end()) {
+ __u8 op;
+ decode_or_return(op, in);
+
+ if (op == CEPH_OSD_TMAP_HDR) {
+ decode_or_return(header, in);
+ continue;
+ }
+
+ std::string key;
+ decode_or_return(key, in);
+
+ switch (op) {
+ case CEPH_OSD_TMAP_SET: {
+ decode_or_return(keys[key], in);
+ break;
+ }
+ case CEPH_OSD_TMAP_CREATE: {
+ if (keys.contains(key)) {
+ return -EEXIST;
+ }
+ decode_or_return(keys[key], in);
+ break;
+ }
+ case CEPH_OSD_TMAP_RM: {
+ auto kiter = keys.find(key);
+ if (kiter == keys.end()) {
+ return -ENOENT;
+ }
+ keys.erase(kiter);
+ break;
+ }
+ case CEPH_OSD_TMAP_RMSLOPPY: {
+ keys.erase(key);
+ break;
+ }
+ }
+ }
+ return 0;
+ }
+
+ int put(bufferlist::const_iterator in) {
+ return 0;
+ }
+};
+
+}
+
+namespace crimson::common {
+
+using do_tmap_up_ret = tl::expected<bufferlist, int>;
+do_tmap_up_ret do_tmap_up(bufferlist::const_iterator in, bufferlist contents)
+{
+ detail::TMapContents tmap;
+ auto bliter = contents.cbegin();
+ int r = tmap.decode(bliter);
+ if (r < 0) {
+ return tl::unexpected(r);
+ }
+ r = tmap.update(in);
+ if (r < 0) {
+ return tl::unexpected(r);
+ }
+ return tmap.encode();
+}
+
+using do_tmap_up_ret = tl::expected<bufferlist, int>;
+do_tmap_up_ret do_tmap_put(bufferlist::const_iterator in)
+{
+ detail::TMapContents tmap;
+ int r = tmap.decode(in);
+ if (r < 0) {
+ return tl::unexpected(r);
+ }
+ return tmap.encode();
+}
+
+}
return do_const_op([this, &osd_op] (/* const */auto& backend, const auto& os) {
return backend.stat(os, osd_op, delta_stats);
});
+
+ case CEPH_OSD_OP_TMAPPUT:
+ return do_write_op([this, &osd_op](auto& backend, auto& os, auto& txn) {
+ return backend.tmapput(os, osd_op, txn, delta_stats, *osd_op_params);
+ });
case CEPH_OSD_OP_TMAPUP:
- // TODO: there was an effort to kill TMAP in ceph-osd. According to
- // @dzafman this isn't possible yet. Maybe it could be accomplished
- // before crimson's readiness and we'd luckily don't need to carry.
- logger().info("crimson explicitly does not support CEPH_OSD_OP_TMAPUP");
- return dont_do_legacy_op();
+ return do_write_op([this, &osd_op](auto& backend, auto& os, auto &txn) {
+ return backend.tmapup(os, osd_op, txn, delta_stats, *osd_op_params);
+ });
+ case CEPH_OSD_OP_TMAPGET:
+ return do_read_op([this, &osd_op](auto& backend, const auto& os) {
+ return backend.tmapget(os, osd_op, delta_stats);
+ });
// OMAP
case CEPH_OSD_OP_OMAPGETKEYS:
#include "common/Clock.h"
#include "crimson/common/exception.h"
+#include "crimson/common/tmap_helpers.h"
#include "crimson/os/futurized_collection.h"
#include "crimson/os/futurized_store.h"
#include "crimson/osd/osd_operation.h"
return stat_errorator::now();
}
+PGBackend::write_iertr::future<> PGBackend::_writefull(
+ ObjectState& os,
+ off_t truncate_size,
+ const bufferlist& bl,
+ ceph::os::Transaction& txn,
+ osd_op_params_t& osd_op_params,
+ object_stat_sum_t& delta_stats,
+ unsigned flags)
+{
+ const bool existing = maybe_create_new_object(os, txn, delta_stats);
+ if (existing && bl.length() < os.oi.size) {
+
+ txn.truncate(coll->get_cid(), ghobject_t{os.oi.soid}, bl.length());
+ truncate_update_size_and_usage(delta_stats, os.oi, truncate_size);
+
+ osd_op_params.clean_regions.mark_data_region_dirty(
+ bl.length(),
+ os.oi.size - bl.length());
+ }
+ if (bl.length()) {
+ txn.write(
+ coll->get_cid(), ghobject_t{os.oi.soid}, 0, bl.length(),
+ bl, flags);
+ update_size_and_usage(
+ delta_stats, os.oi, 0,
+ bl.length(), true);
+ osd_op_params.clean_regions.mark_data_region_dirty(
+ 0,
+ std::max((uint64_t)bl.length(), os.oi.size));
+ }
+ return seastar::now();
+}
+
bool PGBackend::maybe_create_new_object(
ObjectState& os,
ceph::os::Transaction& txn,
return crimson::ct_error::file_too_large::make();
}
- const bool existing = maybe_create_new_object(os, txn, delta_stats);
- if (existing && op.extent.length < os.oi.size) {
- txn.truncate(coll->get_cid(), ghobject_t{os.oi.soid}, op.extent.length);
- truncate_update_size_and_usage(delta_stats, os.oi, op.extent.truncate_size);
- osd_op_params.clean_regions.mark_data_region_dirty(op.extent.length,
- os.oi.size - op.extent.length);
- }
- if (op.extent.length) {
- txn.write(coll->get_cid(), ghobject_t{os.oi.soid}, 0, op.extent.length,
- osd_op.indata, op.flags);
- update_size_and_usage(delta_stats, os.oi, 0,
- op.extent.length, true);
- osd_op_params.clean_regions.mark_data_region_dirty(0,
- std::max((uint64_t) op.extent.length, os.oi.size));
- }
- return seastar::now();
+ return _writefull(
+ os,
+ op.extent.truncate_size,
+ osd_op.indata,
+ txn,
+ osd_op_params,
+ delta_stats,
+ op.flags);
}
PGBackend::append_ierrorator::future<> PGBackend::append(
return store->fiemap(c, oid, off, len);
}
+PGBackend::write_iertr::future<> PGBackend::tmapput(
+ ObjectState& os,
+ const OSDOp& osd_op,
+ ceph::os::Transaction& txn,
+ object_stat_sum_t& delta_stats,
+ osd_op_params_t& osd_op_params)
+{
+ logger().debug("PGBackend::tmapput: {}", os.oi.soid);
+ auto ret = crimson::common::do_tmap_put(osd_op.indata.cbegin());
+ if (!ret.has_value()) {
+ logger().debug("PGBackend::tmapup: {}, ret={}", os.oi.soid, ret.error());
+ ceph_assert(ret.error() == -EINVAL);
+ return crimson::ct_error::invarg::make();
+ } else {
+ auto bl = std::move(ret.value());
+ return _writefull(
+ os,
+ bl.length(),
+ std::move(bl),
+ txn,
+ osd_op_params,
+ delta_stats,
+ 0);
+ }
+}
+
+PGBackend::tmapup_iertr::future<> PGBackend::tmapup(
+ ObjectState& os,
+ const OSDOp& osd_op,
+ ceph::os::Transaction& txn,
+ object_stat_sum_t& delta_stats,
+ osd_op_params_t& osd_op_params)
+{
+ logger().debug("PGBackend::tmapup: {}", os.oi.soid);
+ return PGBackend::write_iertr::now(
+ ).si_then([this, &os] {
+ return _read(os.oi.soid, 0, os.oi.size, 0);
+ }).handle_error_interruptible(
+ crimson::ct_error::enoent::handle([](auto &) {
+ return seastar::make_ready_future<bufferlist>();
+ }),
+ PGBackend::write_iertr::pass_further{},
+ crimson::ct_error::assert_all{"read error in mutate_object_contents"}
+ ).si_then([this, &os, &osd_op, &txn,
+ &delta_stats, &osd_op_params]
+ (auto &&bl) mutable -> PGBackend::tmapup_iertr::future<> {
+ auto result = crimson::common::do_tmap_up(
+ osd_op.indata.cbegin(),
+ std::move(bl));
+ if (!result.has_value()) {
+ int ret = result.error();
+ logger().debug("PGBackend::tmapup: {}, ret={}", os.oi.soid, ret);
+ switch (ret) {
+ case -EEXIST:
+ return crimson::ct_error::eexist::make();
+ case -ENOENT:
+ return crimson::ct_error::enoent::make();
+ case -EINVAL:
+ return crimson::ct_error::invarg::make();
+ default:
+ ceph_assert(0 == "impossible error");
+ return crimson::ct_error::invarg::make();
+ }
+ }
+
+ logger().debug(
+ "PGBackend::tmapup: {}, result.value.length()={}, ret=0",
+ os.oi.soid, result.value().length());
+ return _writefull(
+ os,
+ result.value().length(),
+ result.value(),
+ txn,
+ osd_op_params,
+ delta_stats,
+ 0);
+ });
+}
+
+PGBackend::read_ierrorator::future<> PGBackend::tmapget(
+ const ObjectState& os,
+ OSDOp& osd_op,
+ object_stat_sum_t& delta_stats)
+{
+ logger().debug("PGBackend::tmapget: {}", os.oi.soid);
+ const auto& oi = os.oi;
+ logger().debug("PGBackend::tmapget: read {} 0~{}", oi.soid, oi.size);
+ if (!os.exists || os.oi.is_whiteout()) {
+ logger().debug("PGBackend::tmapget: {} DNE", os.oi.soid);
+ return crimson::ct_error::enoent::make();
+ }
+
+ return _read(oi.soid, 0, oi.size, 0).safe_then_interruptible_tuple(
+ [&delta_stats, &osd_op](auto&& bl) -> read_errorator::future<> {
+ logger().debug("PGBackend::tmapget: data length: {}", bl.length());
+ osd_op.op.extent.length = bl.length();
+ osd_op.rval = 0;
+ delta_stats.num_rd++;
+ delta_stats.num_rd_kb += shift_round_up(bl.length(), 10);
+ osd_op.outdata = std::move(bl);
+ return read_errorator::now();
+ }, crimson::ct_error::input_output_error::handle([] {
+ return read_errorator::future<>{crimson::ct_error::object_corrupted::make()};
+ }),
+ read_errorator::pass_further{});
+}
+
void PGBackend::on_activate_complete() {
peering.reset();
}
uint64_t off,
uint64_t len);
+ write_iertr::future<> tmapput(
+ ObjectState& os,
+ const OSDOp& osd_op,
+ ceph::os::Transaction& trans,
+ object_stat_sum_t& delta_stats,
+ osd_op_params_t& osd_op_params);
+
+ using tmapup_ertr = write_ertr::extend<
+ crimson::ct_error::enoent,
+ crimson::ct_error::eexist>;
+ using tmapup_iertr = ::crimson::interruptible::interruptible_errorator<
+ ::crimson::osd::IOInterruptCondition,
+ tmapup_ertr>;
+ tmapup_iertr::future<> tmapup(
+ ObjectState& os,
+ const OSDOp& osd_op,
+ ceph::os::Transaction& trans,
+ object_stat_sum_t& delta_stats,
+ osd_op_params_t& osd_op_params);
+
+ read_ierrorator::future<> tmapget(
+ const ObjectState& os,
+ OSDOp& osd_op,
+ object_stat_sum_t& delta_stats);
+
// OMAP
ll_read_ierrorator::future<> omap_get_keys(
const ObjectState& os,
size_t offset,
size_t length,
uint32_t flags) = 0;
+ write_iertr::future<> _writefull(
+ ObjectState& os,
+ off_t truncate_size,
+ const bufferlist& bl,
+ ceph::os::Transaction& txn,
+ osd_op_params_t& osd_op_params,
+ object_stat_sum_t& delta_stats,
+ unsigned flags);
bool maybe_create_new_object(ObjectState& os,
ceph::os::Transaction& txn,