case CEPH_OSD_OP_SYNC_READ:
[[fallthrough]];
case CEPH_OSD_OP_READ:
- return do_read_op([&osd_op] (auto& backend, const auto& os) {
- return backend.read(os, osd_op);
+ return do_read_op([this, &osd_op] (auto& backend, const auto& os) {
+ return backend.read(os, osd_op, delta_stats);
});
case CEPH_OSD_OP_SPARSE_READ:
- return do_read_op([&osd_op] (auto& backend, const auto& os) {
- return backend.sparse_read(os, osd_op);
+ return do_read_op([this, &osd_op] (auto& backend, const auto& os) {
+ return backend.sparse_read(os, osd_op, delta_stats);
});
case CEPH_OSD_OP_CHECKSUM:
return do_read_op([&osd_op] (auto& backend, const auto& os) {
return backend.cmp_ext(os, osd_op);
});
case CEPH_OSD_OP_GETXATTR:
- return do_read_op([&osd_op] (auto& backend, const auto& os) {
- return backend.getxattr(os, osd_op);
+ return do_read_op([this, &osd_op] (auto& backend, const auto& os) {
+ return backend.getxattr(os, osd_op, delta_stats);
});
case CEPH_OSD_OP_GETXATTRS:
- return do_read_op([&osd_op] (auto& backend, const auto& os) {
- return backend.get_xattrs(os, osd_op);
+ return do_read_op([this, &osd_op] (auto& backend, const auto& os) {
+ return backend.get_xattrs(os, osd_op, delta_stats);
});
case CEPH_OSD_OP_CMPXATTR:
- return do_read_op([&osd_op] (auto& backend, const auto& os) {
- return backend.cmp_xattr(os, osd_op);
+ return do_read_op([this, &osd_op] (auto& backend, const auto& os) {
+ return backend.cmp_xattr(os, osd_op, delta_stats);
});
case CEPH_OSD_OP_RMXATTR:
return do_write_op(
return backend.rm_xattr(os, osd_op, txn);
}, true);
case CEPH_OSD_OP_CREATE:
- return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) {
- return backend.create(os, osd_op, txn);
+ return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
+ return backend.create(os, osd_op, txn, delta_stats);
}, true);
case CEPH_OSD_OP_WRITE:
return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
- return backend.write(os, osd_op, txn, *osd_op_params);
+ return backend.write(os, osd_op, txn, *osd_op_params, delta_stats);
}, true);
case CEPH_OSD_OP_WRITESAME:
return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
- return backend.write_same(os, osd_op, txn, *osd_op_params);
+ return backend.write_same(os, osd_op, txn, *osd_op_params, delta_stats);
}, true);
case CEPH_OSD_OP_WRITEFULL:
return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
- return backend.writefull(os, osd_op, txn, *osd_op_params);
+ return backend.writefull(os, osd_op, txn, *osd_op_params, delta_stats);
}, true);
case CEPH_OSD_OP_APPEND:
return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
- return backend.append(os, osd_op, txn, *osd_op_params);
+ return backend.append(os, osd_op, txn, *osd_op_params, delta_stats);
}, true);
case CEPH_OSD_OP_TRUNCATE:
return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
// FIXME: rework needed. Move this out to do_write_op(), introduce
// do_write_op_no_user_modify()...
- return backend.truncate(os, osd_op, txn, *osd_op_params);
+ return backend.truncate(os, osd_op, txn, *osd_op_params, delta_stats);
}, true);
case CEPH_OSD_OP_ZERO:
return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
- return backend.zero(os, osd_op, txn, *osd_op_params);
+ return backend.zero(os, osd_op, txn, *osd_op_params, delta_stats);
}, true);
case CEPH_OSD_OP_SETALLOCHINT:
return osd_op_errorator::now();
case CEPH_OSD_OP_SETXATTR:
- return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) {
- return backend.setxattr(os, osd_op, txn);
+ return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
+ return backend.setxattr(os, osd_op, txn, delta_stats);
}, true);
case CEPH_OSD_OP_DELETE:
- return do_write_op([] (auto& backend, auto& os, auto& txn) {
- return backend.remove(os, txn);
+ return do_write_op([this] (auto& backend, auto& os, auto& txn) {
+ return backend.remove(os, txn, delta_stats);
}, true);
case CEPH_OSD_OP_CALL:
return this->do_op_call(osd_op);
case CEPH_OSD_OP_STAT:
// note: stat does not require RD
- return do_const_op([&osd_op] (/* const */auto& backend, const auto& os) {
- return backend.stat(os, osd_op);
+ return do_const_op([this, &osd_op] (/* const */auto& backend, const auto& os) {
+ return backend.stat(os, osd_op, delta_stats);
});
case CEPH_OSD_OP_TMAPUP:
// TODO: there was an effort to kill TMAP in ceph-osd. According to
// OMAP
case CEPH_OSD_OP_OMAPGETKEYS:
- return do_read_op([&osd_op] (auto& backend, const auto& os) {
- return backend.omap_get_keys(os, osd_op);
+ return do_read_op([this, &osd_op] (auto& backend, const auto& os) {
+ return backend.omap_get_keys(os, osd_op, delta_stats);
});
case CEPH_OSD_OP_OMAPGETVALS:
- return do_read_op([&osd_op] (auto& backend, const auto& os) {
- return backend.omap_get_vals(os, osd_op);
+ return do_read_op([this, &osd_op] (auto& backend, const auto& os) {
+ return backend.omap_get_vals(os, osd_op, delta_stats);
});
case CEPH_OSD_OP_OMAPGETHEADER:
- return do_read_op([&osd_op] (auto& backend, const auto& os) {
- return backend.omap_get_header(os, osd_op);
+ return do_read_op([this, &osd_op] (auto& backend, const auto& os) {
+ return backend.omap_get_header(os, osd_op, delta_stats);
});
case CEPH_OSD_OP_OMAPGETVALSBYKEYS:
- return do_read_op([&osd_op] (auto& backend, const auto& os) {
- return backend.omap_get_vals_by_keys(os, osd_op);
+ return do_read_op([this, &osd_op] (auto& backend, const auto& os) {
+ return backend.omap_get_vals_by_keys(os, osd_op, delta_stats);
});
case CEPH_OSD_OP_OMAPSETVALS:
#if 0
}
#endif
return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
- return backend.omap_set_vals(os, osd_op, txn, *osd_op_params);
+ return backend.omap_set_vals(os, osd_op, txn, *osd_op_params, delta_stats);
}, true);
case CEPH_OSD_OP_OMAPSETHEADER:
#if 0
return crimson::ct_error::operation_not_supported::make();
}
#endif
- return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) {
- return backend.omap_set_header(os, osd_op, txn);
+ return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
+ return backend.omap_set_header(os, osd_op, txn, *osd_op_params,
+ delta_stats);
}, true);
case CEPH_OSD_OP_OMAPRMKEYRANGE:
#if 0
return crimson::ct_error::operation_not_supported::make();
}
#endif
- return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) {
- return backend.omap_remove_range(os, osd_op, txn);
+ return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
+ return backend.omap_remove_range(os, osd_op, txn, delta_stats);
}, true);
case CEPH_OSD_OP_OMAPCLEAR:
return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
- return backend.omap_clear(os, osd_op, txn, *osd_op_params);
+ return backend.omap_clear(os, osd_op, txn, *osd_op_params, delta_stats);
}, true);
// watch/notify
}
PGBackend::read_ierrorator::future<>
-PGBackend::read(const ObjectState& os, OSDOp& osd_op)
+PGBackend::read(const ObjectState& os, OSDOp& osd_op,
+ object_stat_sum_t& delta_stats)
{
const auto& oi = os.oi;
const ceph_osd_op& op = osd_op.op;
length = size;
}
return _read(oi.soid, offset, length, op.flags).safe_then_interruptible_tuple(
- [&oi, &osd_op](auto&& bl) -> read_errorator::future<> {
+ [&delta_stats, &oi, &osd_op](auto&& bl) -> read_errorator::future<> {
if (!_read_verify_data(oi, bl)) {
// crc mismatches
return crimson::ct_error::object_corrupted::make();
}
logger().debug("read: data length: {}", bl.length());
osd_op.rval = bl.length();
+ delta_stats.num_rd++;
+ delta_stats.num_rd_kb += shift_round_up(bl.length(), 10);
osd_op.outdata = std::move(bl);
return read_errorator::now();
}, crimson::ct_error::input_output_error::handle([] {
}
PGBackend::read_ierrorator::future<>
-PGBackend::sparse_read(const ObjectState& os, OSDOp& osd_op)
+PGBackend::sparse_read(const ObjectState& os, OSDOp& osd_op,
+ object_stat_sum_t& delta_stats)
{
const auto& op = osd_op.op;
logger().trace("sparse_read: {} {}~{}",
return interruptor::make_interruptible(store->fiemap(coll, ghobject_t{os.oi.soid},
op.extent.offset,
op.extent.length)).then_interruptible(
- [&os, &osd_op, this](auto&& m) {
+ [&delta_stats, &os, &osd_op, this](auto&& m) {
return seastar::do_with(interval_set<uint64_t>{std::move(m)},
- [&os, &osd_op, this](auto&& extents) {
+ [&delta_stats, &os, &osd_op, this](auto&& extents) {
return interruptor::make_interruptible(store->readv(coll, ghobject_t{os.oi.soid},
extents, osd_op.op.flags)).safe_then_interruptible_tuple(
- [&os, &osd_op, &extents](auto&& bl) -> read_errorator::future<> {
+ [&delta_stats, &os, &osd_op, &extents](auto&& bl) -> read_errorator::future<> {
if (_read_verify_data(os.oi, bl)) {
osd_op.op.extent.length = bl.length();
// re-encode since it might be modified
encode_destructively(bl, osd_op.outdata);
logger().trace("sparse_read got {} bytes from object {}",
osd_op.op.extent.length, os.oi.soid);
+ delta_stats.num_rd++;
+ delta_stats.num_rd_kb += shift_round_up(osd_op.op.extent.length, 10);
return read_errorator::make_ready_future<>();
} else {
// crc mismatches
PGBackend::stat_ierrorator::future<>
PGBackend::stat(
const ObjectState& os,
- OSDOp& osd_op)
+ OSDOp& osd_op,
+ object_stat_sum_t& delta_stats)
{
if (os.exists/* TODO: && !os.is_whiteout() */) {
logger().debug("stat os.oi.size={}, os.oi.mtime={}", os.oi.size, os.oi.mtime);
logger().debug("stat object does not exist");
return crimson::ct_error::enoent::make();
}
+ delta_stats.num_rd++;
return stat_errorator::now();
- // TODO: ctx->delta_stats.num_rd++;
}
bool PGBackend::maybe_create_new_object(
ObjectState& os,
- ceph::os::Transaction& txn)
+ ceph::os::Transaction& txn,
+ object_stat_sum_t& delta_stats)
{
if (!os.exists) {
ceph_assert(!os.oi.is_whiteout());
os.oi.new_object();
txn.touch(coll->get_cid(), ghobject_t{os.oi.soid});
- // TODO: delta_stats.num_objects++
+ delta_stats.num_objects++;
return false;
} else if (os.oi.is_whiteout()) {
os.oi.clear_flag(object_info_t::FLAG_WHITEOUT);
- // TODO: delta_stats.num_whiteouts--
+ delta_stats.num_whiteouts--;
}
return true;
}
+void PGBackend::update_size_and_usage(object_stat_sum_t& delta_stats,
+ object_info_t& oi, uint64_t offset,
+ uint64_t length, bool write_full)
+{
+ if (write_full ||
+ (offset + length > oi.size && length)) {
+ uint64_t new_size = offset + length;
+ delta_stats.num_bytes -= oi.size;
+ delta_stats.num_bytes += new_size;
+ oi.size = new_size;
+ }
+ delta_stats.num_wr++;
+ delta_stats.num_wr_kb += shift_round_up(length, 10);
+}
+
+void PGBackend::truncate_update_size_and_usage(object_stat_sum_t& delta_stats,
+ object_info_t& oi,
+ uint64_t truncate_size)
+{
+ if (oi.size != truncate_size) {
+ delta_stats.num_bytes -= oi.size;
+ delta_stats.num_bytes += truncate_size;
+ oi.size = truncate_size;
+ }
+}
+
static bool is_offset_and_length_valid(
const std::uint64_t offset,
const std::uint64_t length)
ObjectState& os,
const OSDOp& osd_op,
ceph::os::Transaction& txn,
- osd_op_params_t& osd_op_params)
+ osd_op_params_t& osd_op_params,
+ object_stat_sum_t& delta_stats)
{
const ceph_osd_op& op = osd_op.op;
uint64_t offset = op.extent.offset;
ghobject_t{os.oi.soid}, op.extent.truncate_size);
if (op.extent.truncate_size != os.oi.size) {
os.oi.size = length;
- // TODO: truncate_update_size_and_usage()
if (op.extent.truncate_size > os.oi.size) {
osd_op_params.clean_regions.mark_data_region_dirty(os.oi.size,
op.extent.truncate_size - os.oi.size);
os.oi.size - op.extent.truncate_size);
}
}
+ truncate_update_size_and_usage(delta_stats, os.oi, op.extent.truncate_size);
}
os.oi.truncate_seq = op.extent.truncate_seq;
os.oi.truncate_size = op.extent.truncate_size;
}
- maybe_create_new_object(os, txn);
+ maybe_create_new_object(os, txn, delta_stats);
if (length == 0) {
if (offset > os.oi.size) {
txn.truncate(coll->get_cid(), ghobject_t{os.oi.soid}, op.extent.offset);
+ truncate_update_size_and_usage(delta_stats, os.oi, op.extent.offset);
} else {
txn.nop();
}
} else {
txn.write(coll->get_cid(), ghobject_t{os.oi.soid},
offset, length, std::move(buf), op.flags);
- os.oi.size = std::max(offset + length, os.oi.size);
+ update_size_and_usage(delta_stats, os.oi, offset, length);
}
osd_op_params.clean_regions.mark_data_region_dirty(op.extent.offset,
op.extent.length);
ObjectState& os,
const OSDOp& osd_op,
ceph::os::Transaction& txn,
- osd_op_params_t& osd_op_params)
+ osd_op_params_t& osd_op_params,
+ object_stat_sum_t& delta_stats)
{
const ceph_osd_op& op = osd_op.op;
const uint64_t len = op.writesame.length;
for (uint64_t size = 0; size < len; size += op.writesame.data_length) {
repeated_indata.append(osd_op.indata);
}
- maybe_create_new_object(os, txn);
+ maybe_create_new_object(os, txn, delta_stats);
txn.write(coll->get_cid(), ghobject_t{os.oi.soid},
op.writesame.offset, len,
std::move(repeated_indata), op.flags);
- os.oi.size = len;
+ update_size_and_usage(delta_stats, os.oi, op.writesame.offset, len);
osd_op_params.clean_regions.mark_data_region_dirty(op.writesame.offset, len);
return seastar::now();
}
ObjectState& os,
const OSDOp& osd_op,
ceph::os::Transaction& txn,
- osd_op_params_t& osd_op_params)
+ osd_op_params_t& osd_op_params,
+ object_stat_sum_t& delta_stats)
{
const ceph_osd_op& op = osd_op.op;
if (op.extent.length != osd_op.indata.length()) {
throw crimson::osd::invalid_argument();
}
- const bool existing = maybe_create_new_object(os, txn);
+ const bool existing = maybe_create_new_object(os, txn, delta_stats);
if (existing && op.extent.length < os.oi.size) {
txn.truncate(coll->get_cid(), ghobject_t{os.oi.soid}, op.extent.length);
+ truncate_update_size_and_usage(delta_stats, os.oi, op.extent.truncate_size);
osd_op_params.clean_regions.mark_data_region_dirty(op.extent.length,
os.oi.size - op.extent.length);
}
if (op.extent.length) {
txn.write(coll->get_cid(), ghobject_t{os.oi.soid}, 0, op.extent.length,
osd_op.indata, op.flags);
- os.oi.size = op.extent.length;
+ update_size_and_usage(delta_stats, os.oi, 0,
+ op.extent.length, true);
osd_op_params.clean_regions.mark_data_region_dirty(0,
std::max((uint64_t) op.extent.length, os.oi.size));
}
ObjectState& os,
OSDOp& osd_op,
ceph::os::Transaction& txn,
- osd_op_params_t& osd_op_params)
+ osd_op_params_t& osd_op_params,
+ object_stat_sum_t& delta_stats)
{
const ceph_osd_op& op = osd_op.op;
if (op.extent.length != osd_op.indata.length()) {
return crimson::ct_error::invarg::make();
}
- maybe_create_new_object(os, txn);
+ maybe_create_new_object(os, txn, delta_stats);
if (op.extent.length) {
txn.write(coll->get_cid(), ghobject_t{os.oi.soid},
os.oi.size /* offset */, op.extent.length,
std::move(osd_op.indata), op.flags);
- os.oi.size += op.extent.length;
+ update_size_and_usage(delta_stats, os.oi, os.oi.size,
+ op.extent.length);
osd_op_params.clean_regions.mark_data_region_dirty(os.oi.size,
op.extent.length);
}
ObjectState& os,
const OSDOp& osd_op,
ceph::os::Transaction& txn,
- osd_op_params_t& osd_op_params)
+ osd_op_params_t& osd_op_params,
+ object_stat_sum_t& delta_stats)
{
if (!os.exists || os.oi.is_whiteout()) {
logger().debug("{} object dne, truncate is a no-op", __func__);
os.oi.truncate_size = op.extent.truncate_size;
}
}
- maybe_create_new_object(os, txn);
+ maybe_create_new_object(os, txn, delta_stats);
if (os.oi.size != op.extent.offset) {
txn.truncate(coll->get_cid(),
ghobject_t{os.oi.soid}, op.extent.offset);
os.oi.size,
op.extent.offset - os.oi.size);
}
- os.oi.size = op.extent.offset;
+ truncate_update_size_and_usage(delta_stats, os.oi, op.extent.offset);
os.oi.clear_data_digest();
}
- // TODO: truncate_update_size_and_usage()
- // TODO: ctx->delta_stats.num_wr++;
+ delta_stats.num_wr++;
// ----
// do no set exists, or we will break above DELETE -> TRUNCATE munging.
return write_ertr::now();
ObjectState& os,
const OSDOp& osd_op,
ceph::os::Transaction& txn,
- osd_op_params_t& osd_op_params)
+ osd_op_params_t& osd_op_params,
+ object_stat_sum_t& delta_stats)
{
if (!os.exists || os.oi.is_whiteout()) {
logger().debug("{} object dne, zero is a no-op", __func__);
// TODO: modified_ranges.union_of(zeroed);
osd_op_params.clean_regions.mark_data_region_dirty(op.extent.offset,
op.extent.length);
- // TODO: ctx->delta_stats.num_wr++;
+ delta_stats.num_wr++;
os.oi.clear_data_digest();
return write_ertr::now();
}
PGBackend::interruptible_future<> PGBackend::create(
ObjectState& os,
const OSDOp& osd_op,
- ceph::os::Transaction& txn)
+ ceph::os::Transaction& txn,
+ object_stat_sum_t& delta_stats)
{
if (os.exists && !os.oi.is_whiteout() &&
(osd_op.op.flags & CEPH_OSD_OP_FLAG_EXCL)) {
throw crimson::osd::invalid_argument();
}
}
- maybe_create_new_object(os, txn);
+ maybe_create_new_object(os, txn, delta_stats);
txn.nop();
return seastar::now();
}
return seastar::now();
}
+PGBackend::interruptible_future<>
+PGBackend::remove(ObjectState& os, ceph::os::Transaction& txn,
+ object_stat_sum_t& delta_stats)
+{
+ // todo: snapset
+ txn.remove(coll->get_cid(),
+ ghobject_t{os.oi.soid, ghobject_t::NO_GEN, shard});
+ delta_stats.num_bytes -= os.oi.size;
+ os.oi.size = 0;
+ os.oi.new_object();
+ os.exists = false;
+ // todo: update watchers
+ if (os.oi.is_whiteout()) {
+ os.oi.clear_flag(object_info_t::FLAG_WHITEOUT);
+ delta_stats.num_whiteouts--;
+ }
+ delta_stats.num_objects--;
+ return seastar::now();
+}
+
PGBackend::interruptible_future<std::tuple<std::vector<hobject_t>, hobject_t>>
PGBackend::list_objects(const hobject_t& start, uint64_t limit) const
{
PGBackend::interruptible_future<> PGBackend::setxattr(
ObjectState& os,
const OSDOp& osd_op,
- ceph::os::Transaction& txn)
+ ceph::os::Transaction& txn,
+ object_stat_sum_t& delta_stats)
{
if (local_conf()->osd_max_attr_size > 0 &&
osd_op.op.xattr.value_len > local_conf()->osd_max_attr_size) {
throw crimson::osd::make_error(-ENAMETOOLONG);
}
- maybe_create_new_object(os, txn);
+ maybe_create_new_object(os, txn, delta_stats);
std::string name{"_"};
ceph::bufferlist val;
bp.copy(osd_op.op.xattr.value_len, val);
}
logger().debug("setxattr on obj={} for attr={}", os.oi.soid, name);
-
txn.setattr(coll->get_cid(), ghobject_t{os.oi.soid}, name, val);
+ delta_stats.num_wr++;
return seastar::now();
- //ctx->delta_stats.num_wr++;
}
PGBackend::get_attr_ierrorator::future<> PGBackend::getxattr(
const ObjectState& os,
- OSDOp& osd_op) const
+ OSDOp& osd_op,
+ object_stat_sum_t& delta_stats) const
{
std::string name;
ceph::bufferlist val;
}
logger().debug("getxattr on obj={} for attr={}", os.oi.soid, name);
return getxattr(os.oi.soid, name).safe_then_interruptible(
- [&osd_op] (ceph::bufferlist&& val) {
+ [&delta_stats, &osd_op] (ceph::bufferlist&& val) {
osd_op.outdata = std::move(val);
osd_op.op.xattr.value_len = osd_op.outdata.length();
+ delta_stats.num_rd++;
+ delta_stats.num_rd_kb += shift_round_up(osd_op.outdata.length(), 10);
return get_attr_errorator::now();
});
}
PGBackend::get_attr_ierrorator::future<> PGBackend::get_xattrs(
const ObjectState& os,
- OSDOp& osd_op) const
+ OSDOp& osd_op,
+ object_stat_sum_t& delta_stats) const
{
if (__builtin_expect(stopping, false)) {
throw crimson::common::system_shutdown_exception();
}
return store->get_attrs(coll, ghobject_t{os.oi.soid}).safe_then(
- [&osd_op](auto&& attrs) {
+ [&delta_stats, &osd_op](auto&& attrs) {
std::vector<std::pair<std::string, bufferlist>> user_xattrs;
+ ceph::bufferlist bl;
for (auto& [key, val] : attrs) {
if (key.size() > 1 && key[0] == '_') {
- ceph::bufferlist bl;
bl.append(std::move(val));
user_xattrs.emplace_back(key.substr(1), std::move(bl));
}
}
ceph::encode(user_xattrs, osd_op.outdata);
+ delta_stats.num_rd++;
+ delta_stats.num_rd_kb += shift_round_up(bl.length(), 10);
return get_attr_errorator::now();
});
}
PGBackend::cmp_xattr_ierrorator::future<> PGBackend::cmp_xattr(
const ObjectState& os,
- OSDOp& osd_op) const
+ OSDOp& osd_op,
+ object_stat_sum_t& delta_stats) const
{
std::string name{"_"};
auto bp = osd_op.indata.cbegin();
logger().debug("cmpxattr on obj={} for attr={}", os.oi.soid, name);
return getxattr(os.oi.soid, name).safe_then_interruptible(
- [&osd_op] (auto &&xattr) {
+ [&delta_stats, &osd_op] (auto &&xattr) {
int result = 0;
auto bp = osd_op.indata.cbegin();
bp += osd_op.op.xattr.name_len;
} else {
osd_op.rval = result;
}
+ delta_stats.num_rd++;
+ delta_stats.num_rd_kb += shift_round_up(osd_op.op.xattr.value_len, 10);
});
}
PGBackend::ll_read_ierrorator::future<>
PGBackend::omap_get_header(
const ObjectState& os,
- OSDOp& osd_op) const
+ OSDOp& osd_op,
+ object_stat_sum_t& delta_stats) const
{
return omap_get_header(coll, ghobject_t{os.oi.soid}).safe_then_interruptible(
- [&osd_op] (ceph::bufferlist&& header) {
+ [&delta_stats, &osd_op] (ceph::bufferlist&& header) {
osd_op.outdata = std::move(header);
+ delta_stats.num_rd_kb += shift_round_up(osd_op.outdata.length(), 10);
+ delta_stats.num_rd++;
return seastar::now();
});
}
PGBackend::ll_read_ierrorator::future<>
PGBackend::omap_get_keys(
const ObjectState& os,
- OSDOp& osd_op) const
+ OSDOp& osd_op,
+ object_stat_sum_t& delta_stats) const
{
if (__builtin_expect(stopping, false)) {
throw crimson::common::system_shutdown_exception();
max_return =
std::min(max_return, local_conf()->osd_max_omap_entries_per_request);
+
// TODO: truly chunk the reading
return maybe_get_omap_vals(store, coll, os.oi, start_after).safe_then_interruptible(
- [=, &osd_op](auto ret) {
+ [=,&delta_stats, &osd_op](auto ret) {
ceph::bufferlist result;
bool truncated = false;
uint32_t num = 0;
encode(num, osd_op.outdata);
osd_op.outdata.claim_append(result);
encode(truncated, osd_op.outdata);
+ delta_stats.num_rd_kb += shift_round_up(osd_op.outdata.length(), 10);
+ delta_stats.num_rd++;
return seastar::now();
}).handle_error_interruptible(
crimson::ct_error::enodata::handle([&osd_op] {
}),
ll_read_errorator::pass_further{}
);
- // TODO:
- //ctx->delta_stats.num_rd_kb += shift_round_up(osd_op.outdata.length(), 10);
- //ctx->delta_stats.num_rd++;
}
PGBackend::ll_read_ierrorator::future<>
PGBackend::omap_get_vals(
const ObjectState& os,
- OSDOp& osd_op) const
+ OSDOp& osd_op,
+ object_stat_sum_t& delta_stats) const
{
if (__builtin_expect(stopping, false)) {
throw crimson::common::system_shutdown_exception();
max_return = \
std::min(max_return, local_conf()->osd_max_omap_entries_per_request);
+ delta_stats.num_rd_kb += shift_round_up(osd_op.outdata.length(), 10);
+ delta_stats.num_rd++;
// TODO: truly chunk the reading
return maybe_get_omap_vals(store, coll, os.oi, start_after)
}),
ll_read_errorator::pass_further{}
);
-
- // TODO:
- //ctx->delta_stats.num_rd_kb += shift_round_up(osd_op.outdata.length(), 10);
- //ctx->delta_stats.num_rd++;
}
PGBackend::ll_read_ierrorator::future<>
PGBackend::omap_get_vals_by_keys(
const ObjectState& os,
- OSDOp& osd_op) const
+ OSDOp& osd_op,
+ object_stat_sum_t& delta_stats) const
{
if (__builtin_expect(stopping, false)) {
throw crimson::common::system_shutdown_exception();
} catch (buffer::error&) {
throw crimson::osd::invalid_argument();
}
+ delta_stats.num_rd_kb += shift_round_up(osd_op.outdata.length(), 10);
+ delta_stats.num_rd++;
return maybe_get_omap_vals_by_keys(store, coll, os.oi, keys_to_get)
.safe_then_interruptible(
[&osd_op] (crimson::os::FuturizedStore::omap_values_t&& vals) {
}),
ll_read_errorator::pass_further{}
);
-
- // TODO:
- //ctx->delta_stats.num_rd_kb += shift_round_up(osd_op.outdata.length(), 10);
- //ctx->delta_stats.num_rd++;
}
PGBackend::interruptible_future<>
ObjectState& os,
const OSDOp& osd_op,
ceph::os::Transaction& txn,
- osd_op_params_t& osd_op_params)
+ osd_op_params_t& osd_op_params,
+ object_stat_sum_t& delta_stats)
{
- maybe_create_new_object(os, txn);
+ maybe_create_new_object(os, txn, delta_stats);
ceph::bufferlist to_set_bl;
try {
}
txn.omap_setkeys(coll->get_cid(), ghobject_t{os.oi.soid}, to_set_bl);
-
- // TODO:
- //ctx->clean_regions.mark_omap_dirty();
-
- // TODO:
- //ctx->delta_stats.num_wr++;
- //ctx->delta_stats.num_wr_kb += shift_round_up(to_set_bl.length(), 10);
+ osd_op_params.clean_regions.mark_omap_dirty();
+ delta_stats.num_wr++;
+ delta_stats.num_wr_kb += shift_round_up(to_set_bl.length(), 10);
os.oi.set_flag(object_info_t::FLAG_OMAP);
os.oi.clear_omap_digest();
- osd_op_params.clean_regions.mark_omap_dirty();
return seastar::now();
}
PGBackend::omap_set_header(
ObjectState& os,
const OSDOp& osd_op,
- ceph::os::Transaction& txn)
+ ceph::os::Transaction& txn,
+ osd_op_params_t& osd_op_params,
+ object_stat_sum_t& delta_stats)
{
- maybe_create_new_object(os, txn);
+ maybe_create_new_object(os, txn, delta_stats);
txn.omap_setheader(coll->get_cid(), ghobject_t{os.oi.soid}, osd_op.indata);
- //TODO:
- //ctx->clean_regions.mark_omap_dirty();
- //ctx->delta_stats.num_wr++;
+ osd_op_params.clean_regions.mark_omap_dirty();
+ delta_stats.num_wr++;
os.oi.set_flag(object_info_t::FLAG_OMAP);
os.oi.clear_omap_digest();
return seastar::now();
PGBackend::interruptible_future<> PGBackend::omap_remove_range(
ObjectState& os,
const OSDOp& osd_op,
- ceph::os::Transaction& txn)
+ ceph::os::Transaction& txn,
+ object_stat_sum_t& delta_stats)
{
std::string key_begin, key_end;
try {
throw crimson::osd::invalid_argument{};
}
txn.omap_rmkeyrange(coll->get_cid(), ghobject_t{os.oi.soid}, key_begin, key_end);
- //TODO:
- //ctx->delta_stats.num_wr++;
+ delta_stats.num_wr++;
os.oi.clear_omap_digest();
return seastar::now();
}
ObjectState& os,
OSDOp& osd_op,
ceph::os::Transaction& txn,
- osd_op_params_t& osd_op_params)
+ osd_op_params_t& osd_op_params,
+ object_stat_sum_t& delta_stats)
{
if (__builtin_expect(stopping, false)) {
throw crimson::common::system_shutdown_exception();
}
txn.omap_clear(coll->get_cid(), ghobject_t{os.oi.soid});
osd_op_params.clean_regions.mark_omap_dirty();
+ delta_stats.num_wr++;
os.oi.clear_omap_digest();
os.oi.clear_flag(object_info_t::FLAG_OMAP);
return omap_clear_ertr::now();
read_errorator>;
read_ierrorator::future<> read(
const ObjectState& os,
- OSDOp& osd_op);
+ OSDOp& osd_op,
+ object_stat_sum_t& delta_stats);
read_ierrorator::future<> sparse_read(
const ObjectState& os,
- OSDOp& osd_op);
+ OSDOp& osd_op,
+ object_stat_sum_t& delta_stats);
using checksum_errorator = ll_read_errorator::extend<
crimson::ct_error::object_corrupted,
crimson::ct_error::invarg>;
stat_errorator>;
stat_ierrorator::future<> stat(
const ObjectState& os,
- OSDOp& osd_op);
+ OSDOp& osd_op,
+ object_stat_sum_t& delta_stats);
// TODO: switch the entire write family to errorator.
using write_ertr = crimson::errorator<
interruptible_future<> create(
ObjectState& os,
const OSDOp& osd_op,
- ceph::os::Transaction& trans);
+ ceph::os::Transaction& trans,
+ object_stat_sum_t& delta_stats);
+ interruptible_future<> remove(
+ ObjectState& os,
+ ceph::os::Transaction& txn,
+ object_stat_sum_t& delta_stats);
interruptible_future<> remove(
ObjectState& os,
ceph::os::Transaction& txn);
ObjectState& os,
const OSDOp& osd_op,
ceph::os::Transaction& trans,
- osd_op_params_t& osd_op_params);
+ osd_op_params_t& osd_op_params,
+ object_stat_sum_t& delta_stats);
interruptible_future<> write_same(
ObjectState& os,
const OSDOp& osd_op,
ceph::os::Transaction& trans,
- osd_op_params_t& osd_op_params);
+ osd_op_params_t& osd_op_params,
+ object_stat_sum_t& delta_stats);
interruptible_future<> writefull(
ObjectState& os,
const OSDOp& osd_op,
ceph::os::Transaction& trans,
- osd_op_params_t& osd_op_params);
+ osd_op_params_t& osd_op_params,
+ object_stat_sum_t& delta_stats);
using append_errorator = crimson::errorator<
crimson::ct_error::invarg>;
using append_ierrorator =
ObjectState& os,
OSDOp& osd_op,
ceph::os::Transaction& trans,
- osd_op_params_t& osd_op_params);
+ osd_op_params_t& osd_op_params,
+ object_stat_sum_t& delta_stats);
write_iertr::future<> truncate(
ObjectState& os,
const OSDOp& osd_op,
ceph::os::Transaction& trans,
- osd_op_params_t& osd_op_params);
+ osd_op_params_t& osd_op_params,
+ object_stat_sum_t& delta_stats);
write_iertr::future<> zero(
ObjectState& os,
const OSDOp& osd_op,
ceph::os::Transaction& trans,
- osd_op_params_t& osd_op_params);
+ osd_op_params_t& osd_op_params,
+ object_stat_sum_t& delta_stats);
rep_op_fut_t mutate_object(
std::set<pg_shard_t> pg_shards,
crimson::osd::ObjectContextRef &&obc,
interruptible_future<> setxattr(
ObjectState& os,
const OSDOp& osd_op,
- ceph::os::Transaction& trans);
+ ceph::os::Transaction& trans,
+ object_stat_sum_t& delta_stats);
using get_attr_errorator = crimson::os::FuturizedStore::get_attr_errorator;
using get_attr_ierrorator =
::crimson::interruptible::interruptible_errorator<
get_attr_errorator>;
get_attr_ierrorator::future<> getxattr(
const ObjectState& os,
- OSDOp& osd_op) const;
+ OSDOp& osd_op,
+ object_stat_sum_t& delta_stats) const;
get_attr_ierrorator::future<ceph::bufferlist> getxattr(
const hobject_t& soid,
std::string_view key) const;
get_attr_ierrorator::future<> get_xattrs(
const ObjectState& os,
- OSDOp& osd_op) const;
+ OSDOp& osd_op,
+ object_stat_sum_t& delta_stats) const;
using cmp_xattr_errorator = ::crimson::os::FuturizedStore::get_attr_errorator;
using cmp_xattr_ierrorator =
::crimson::interruptible::interruptible_errorator<
cmp_xattr_errorator>;
cmp_xattr_ierrorator::future<> cmp_xattr(
const ObjectState& os,
- OSDOp& osd_op) const;
+ OSDOp& osd_op,
+ object_stat_sum_t& delta_stats) const;
using rm_xattr_ertr = crimson::errorator<crimson::ct_error::enoent>;
using rm_xattr_iertr =
::crimson::interruptible::interruptible_errorator<
// OMAP
ll_read_ierrorator::future<> omap_get_keys(
const ObjectState& os,
- OSDOp& osd_op) const;
+ OSDOp& osd_op,
+ object_stat_sum_t& delta_stats) const;
ll_read_ierrorator::future<> omap_get_vals(
const ObjectState& os,
- OSDOp& osd_op) const;
+ OSDOp& osd_op,
+ object_stat_sum_t& delta_stats) const;
ll_read_ierrorator::future<> omap_get_vals_by_keys(
const ObjectState& os,
- OSDOp& osd_op) const;
+ OSDOp& osd_op,
+ object_stat_sum_t& delta_stats) const;
interruptible_future<> omap_set_vals(
ObjectState& os,
const OSDOp& osd_op,
ceph::os::Transaction& trans,
- osd_op_params_t& osd_op_params);
+ osd_op_params_t& osd_op_params,
+ object_stat_sum_t& delta_stats);
ll_read_ierrorator::future<ceph::bufferlist> omap_get_header(
const crimson::os::CollectionRef& c,
const ghobject_t& oid) const;
ll_read_ierrorator::future<> omap_get_header(
const ObjectState& os,
- OSDOp& osd_op) const;
+ OSDOp& osd_op,
+ object_stat_sum_t& delta_stats) const;
interruptible_future<> omap_set_header(
ObjectState& os,
const OSDOp& osd_op,
- ceph::os::Transaction& trans);
+ ceph::os::Transaction& trans,
+ osd_op_params_t& osd_op_params,
+ object_stat_sum_t& delta_stats);
interruptible_future<> omap_remove_range(
ObjectState& os,
const OSDOp& osd_op,
- ceph::os::Transaction& trans);
+ ceph::os::Transaction& trans,
+ object_stat_sum_t& delta_stats);
using omap_clear_ertr = crimson::errorator<crimson::ct_error::enoent>;
using omap_clear_iertr =
::crimson::interruptible::interruptible_errorator<
ObjectState& os,
OSDOp& osd_op,
ceph::os::Transaction& trans,
- osd_op_params_t& osd_op_params);
+ osd_op_params_t& osd_op_params,
+ object_stat_sum_t& delta_stats);
virtual void got_rep_op_reply(const MOSDRepOpReply&) {}
virtual seastar::future<> stop() = 0;
size_t length,
uint32_t flags) = 0;
- bool maybe_create_new_object(ObjectState& os, ceph::os::Transaction& txn);
+ bool maybe_create_new_object(ObjectState& os,
+ ceph::os::Transaction& txn,
+ object_stat_sum_t& delta_stats);
+ void update_size_and_usage(object_stat_sum_t& delta_stats,
+ object_info_t& oi, uint64_t offset,
+ uint64_t length, bool write_full = false);
+ void truncate_update_size_and_usage(
+ object_stat_sum_t& delta_stats,
+ object_info_t& oi,
+ uint64_t truncate_size);
virtual rep_op_fut_t
_submit_transaction(std::set<pg_shard_t>&& pg_shards,
const hobject_t& hoid,