class MOSDECSubOpRead : public MOSDFastDispatchOp {
private:
- static constexpr int HEAD_VERSION = 3;
+ static constexpr int HEAD_VERSION = 4;
static constexpr int COMPAT_VERSION = 1;
public:
spg_t pgid;
epoch_t map_epoch = 0, min_epoch = 0;
ECSubRead op;
+ uint64_t cost = 0;
+ /**
+ * Calculate the cost of the SubOp read operation for mClock scheduler.
+ *
+ * @param *cct: *CephContext
+ * @param pair<int, int>&: subchunk_count and subchunk_size
+ */
+ void compute_cost(CephContext *cct, std::pair<int, int> &subchunk_info) {
+ cost = op.cost(cct, subchunk_info);
+ }
int get_cost() const override {
- return 0;
+ return cost;
}
epoch_t get_map_epoch() const override {
return map_epoch;
} else {
min_epoch = map_epoch;
}
+ if (header.version >= 4) {
+ decode(cost, p);
+ }
}
void encode_payload(uint64_t features) override {
encode(op, payload, features);
encode(min_epoch, payload);
encode_trace(payload, features);
+ if (header.version >= 4) {
+ encode(cost, payload);
+ }
}
std::string_view get_type_name() const override { return "MOSDECSubOpRead"; }
ceph_assert(!op.recovery_progress.data_complete);
set<int> want(op.missing_on_shards.begin(), op.missing_on_shards.end());
uint64_t from = op.recovery_progress.data_recovered_to;
+ /* When beginning recovery, the OI may not be known. As such the object
+ * size is not known. For the first read, attempt to read the default
+ * size. If this is larger than the object sizes, then the OSD will
+ * return truncated reads. If the object size is known, then attempt
+ * correctly sized reads, capped at recovery chunk size.
+ * (Ref: ECCommon.cc -> ECCommon::RecoveryBackend::continue_recovery_op())
+ */
uint64_t amount = get_recovery_chunk_size();
+ if (op.obc) {
+ uint64_t remaining = op.obc->obs.oi.size - from;
+ amount = std::min(remaining, amount);
+ }
if (op.recovery_progress.first && op.obc) {
if (auto [r, attrs, size] = ecbackend->get_attrs_n_size_from_disk(op.hoid);
std::optional<ECSubRead> local_read_op;
std::vector<std::pair<int, Message*>> m;
m.reserve(messages.size());
+ std::pair<int, int> subchunk_info =
+ std::make_pair(ec_impl->get_sub_chunk_count(),
+ sinfo.get_chunk_size() / ec_impl->get_sub_chunk_count());
for (auto &&[pg_shard, read]: messages) {
rop.in_progress.insert(pg_shard);
shard_to_read_map[pg_shard].insert(rop.tid);
msg->trace.init("ec sub read", nullptr, &rop.trace);
msg->trace.keyval("shard", pg_shard.shard.id);
}
+ msg->compute_cost(cct, subchunk_info);
m.push_back(std::make_pair(pg_shard.osd, msg));
dout(10) << __func__ << ": will send msg " << *msg
<< " to osd." << pg_shard.osd << dendl;
std::vector<std::pair<int, Message*>> m;
m.reserve(messages.size());
+ std::pair<int, int> subchunk_info =
+ std::make_pair(ec_impl->get_sub_chunk_count(),
+ sinfo.get_chunk_size() / ec_impl->get_sub_chunk_count());
for (map<pg_shard_t, ECSubRead>::iterator i = messages.begin();
i != messages.end();
++i) {
msg->trace.init("ec sub read", nullptr, &op.trace);
msg->trace.keyval("shard", i->first.shard.id);
}
+ msg->compute_cost(cct, subchunk_info);
m.push_back(std::make_pair(i->first.osd, msg));
}
if (!m.empty()) {
}
}
-END_IGNORE_DEPRECATED
\ No newline at end of file
+END_IGNORE_DEPRECATED
#include "ECMsgTypes.h"
+#include "common/ceph_context.h"
+
using std::list;
using std::make_pair;
using std::map;
DECODE_FINISH(bl);
}
+/**
+ * Calculate the cost of the SubOp read operation for mClock scheduler.
+ * Cost is calculated based on whether the complete chunk/shard
+ * or a subchunk needs to be read:
+ * Case 1. Read the complete chunk aligned length:
+ * - Cost is set to the length of the chunk aligned extent size.
+ * Case 2. Fragmented reads:
+ * - Cost is set by considering the subchunk length and count.
+ *
+ * Note: To retain the legacy behavior, a cost of '0' is returned as
+ * before for WeightedPriorityQueue scheduler.
+ */
+uint64_t ECSubRead::cost(CephContext *cct, std::pair<int, int>& subchunk_info)
+{
+ uint64_t total_cost = 0;
+ /**
+ * While the cost is calculated by the primary shard with mClock
+ * scheduler being active, the replica shard could still be
+ * running on the legacy WPQ scheduler. In such a case, the
+ * replica shard's decoding logic would set the cost to 0, which
+ * is consistent with what the legacy WPQ scheduler expects.
+ *
+ * In the converse case, the primary shard running with the
+ * legacy WPQ scheduler sends a cost of 0. The replica shard
+ * running with mClock scheduler will interpret this and set
+ * the cost to 1 accordingly.
+ */
+ if (cct->_conf->osd_op_queue != "mclock_scheduler") {
+ return total_cost; // Legacy behavior for WPQ scheduler
+ }
+
+ uint64_t subchunk_size = subchunk_info.second;
+
+ for (auto &&[hoid, tl] : to_read) {
+ auto it = subchunks.find(hoid);
+ if (it == subchunks.end()) continue;
+
+ auto &sc = it->second;
+ if (sc.empty()) continue;
+
+ // Case 1: Optimized / Complete chunk aligned read
+ if (sc.size() == 1 && sc.front().second == subchunk_info.first) {
+ for ([[maybe_unused]] auto &&[offset, len, flags] : tl) {
+ total_cost += len;
+ }
+ continue;
+ }
+
+ // Case 2: Fragmented / Subchunk Reads
+ uint64_t fragmented_shard_bytes = 0;
+ for (auto &&k : sc) {
+ fragmented_shard_bytes += (uint64_t)k.second * subchunk_size;
+ }
+ total_cost += fragmented_shard_bytes * tl.size();
+ }
+
+ // Safety Boundary: mClock requires non-zero costs for tracking active ops
+ return std::max<uint64_t>(total_cost, 1ULL);
+}
+
std::ostream &operator<<(
std::ostream &lhs, const ECSubRead &rhs)
{
std::map<hobject_t, std::vector<std::pair<int, int>>> subchunks;
std::set<hobject_t> omap_headers_to_read;
std::map<hobject_t, std::pair<std::string, uint64_t>> omap_read_from;
+ /**
+ * Calculate the cost of the SubOp read operation for mClock scheduler.
+ *
+ * @param *cct: *CephContext
+ * @param pair<int, int>&: subchunk_count and subchunk_size
+ * @return uint64_t - Cost of EC SubOpRead (size in Bytes)
+ */
+ uint64_t cost(CephContext *cct, std::pair<int, int>& subchunk_info);
void encode(ceph::buffer::list &bl, uint64_t features) const;
void decode(ceph::buffer::list::const_iterator &bl);
void dump(ceph::Formatter *f) const;