client's status.
Signed-off-by: Byungsu Park <bspark8@sk.com>
using priority_t = unsigned;
using cost_t = unsigned;
+ using Retn = std::pair<T, dmc::PhaseType>;
typedef std::list<std::pair<cost_t, T> > ListPairs;
T dequeue() override final {
assert(!empty());
- if (!(high_queue.empty())) {
+ if (!high_queue.empty()) {
T ret = std::move(high_queue.rbegin()->second.front().second);
high_queue.rbegin()->second.pop_front();
if (high_queue.rbegin()->second.empty()) {
return std::move(*(retn.request));
}
+ // Dequeue the next request together with the dmc::PhaseType it was
+ // served under, so callers can report the QoS phase back to the client.
+ // Precedence mirrors dequeue(): strict high_queue first, then
+ // queue_front, and only then the mClock scheduler itself.
+ Retn dequeue_distributed() {
+ assert(!empty());
+ // Default-constructed phase is returned for requests that bypass the
+ // mClock scheduler (high_queue / queue_front entries).
+ dmc::PhaseType resp_params = dmc::PhaseType();
+
+ if (!high_queue.empty()) {
+ // rbegin() selects the highest priority bucket — assumes SubQueues is
+ // an ordered map keyed by priority (TODO confirm from declaration).
+ T ret = std::move(high_queue.rbegin()->second.front().second);
+ high_queue.rbegin()->second.pop_front();
+ if (high_queue.rbegin()->second.empty()) {
+ // Drop the priority bucket once its list is drained.
+ high_queue.erase(high_queue.rbegin()->first);
+ }
+ return std::make_pair(std::move(ret), resp_params);
+ }
+
+ if (!queue_front.empty()) {
+ T ret = std::move(queue_front.front().second);
+ queue_front.pop_front();
+ return std::make_pair(std::move(ret), resp_params);
+ }
+
+ // Otherwise pull from the dmclock queue; empty() was asserted above,
+ // so the pull must yield a returned request (hence the is_retn check).
+ auto pr = queue.pull_request();
+ assert(pr.is_retn());
+ auto& retn = pr.get_retn();
+ // Propagate the phase (e.g. reservation vs. priority) chosen by mClock.
+ resp_params = retn.phase;
+ return std::make_pair(std::move(*(retn.request)), resp_params);
+ }
+
void dump(ceph::Formatter *f) const override final {
f->open_array_section("high_queues");
for (typename SubQueues::const_iterator p = high_queue.begin();
flags = m->get_flags() & (CEPH_OSD_FLAG_ACK|CEPH_OSD_FLAG_ONDISK);
MOSDOpReply *reply = new MOSDOpReply(m, err, osdmap->get_epoch(), flags,
- true);
+ true, op->qos_resp);
reply->set_reply_versions(v, uv);
m->get_connection()->send_message(reply);
}
TrackedOp(tracker, req->get_recv_stamp()),
rmw_flags(0), request(req),
hit_flag_points(0), latest_flag_point(0),
- hitset_inserted(false) {
+ hitset_inserted(false), qos_resp(dmc::PhaseType::reservation) {
if (req->get_priority() < tracker->cct->_conf->osd_client_op_priority) {
// don't warn as quickly for low priority ops
warn_interval_multiplier = tracker->cct->_conf->osd_recovery_op_warn_multiple;
#include "include/memory.h"
#include "osd/osd_types.h"
#include "common/TrackedOp.h"
+#include "common/mClockCommon.h"
/**
* The OpRequest takes in a Message* and takes over a single reference
epoch_t min_epoch = 0; ///< min epoch needed to handle this msg
bool hitset_inserted;
+ dmc::PhaseType qos_resp;
const Message *get_req() const { return request; }
Message *get_nonconst_req() { return request; }
// reply
MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(),
CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK,
- false);
+ false, op->qos_resp);
reply->claim_op_out_data(ops);
reply->set_result(result);
reply->set_reply_versions(info.last_update, info.last_user_version);
MOSDOpReply *reply = orig_reply.detach();
if (reply == nullptr) {
reply = new MOSDOpReply(m, r, pg->get_osdmap()->get_epoch(),
- flags, true);
+ flags, true, op->qos_resp);
}
ldpp_dout(pg, 10) << " sending commit on " << *m << " " << reply << dendl;
pg->osd->send_message_osd_client(reply, m->get_connection());
{
const MOSDOp *m = static_cast<const MOSDOp*>(op->get_req());
int flags = m->get_flags() & (CEPH_OSD_FLAG_ACK|CEPH_OSD_FLAG_ONDISK);
- MOSDOpReply *reply = new MOSDOpReply(m, -ENOENT,
- get_osdmap()->get_epoch(), flags, false);
+ MOSDOpReply *reply = new MOSDOpReply(m, -ENOENT, get_osdmap()->get_epoch(),
+ flags, false, op->qos_resp);
request_redirect_t redir(m->get_object_locator(), pool.info.tier_of);
reply->set_redirect(redir);
dout(10) << "sending redirect to pool " << pool.info.tier_of << " for op "
const MOSDOp *m = static_cast<const MOSDOp*>(op->get_req());
OpContext *ctx = new OpContext(op, m->get_reqid(), &prdop->ops, this);
- ctx->reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, false);
+ ctx->reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, false,
+ op->qos_resp);
ctx->user_at_version = prdop->user_version;
ctx->data_off = prdop->data_offset;
ctx->ignore_log_op_stats = true;
if (reply)
pwop->ctx->reply = NULL;
else {
- reply = new MOSDOpReply(m, r, get_osdmap()->get_epoch(), 0, true);
+ reply = new MOSDOpReply(m, r, get_osdmap()->get_epoch(), 0, true,
+ pwop->op->qos_resp);
reply->set_reply_versions(eversion_t(), pwop->user_version);
}
reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
bool successful_write = !ctx->op_t->empty() && op->may_write() && result >= 0;
// prepare the reply
ctx->reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0,
- successful_write);
+ successful_write, op->qos_resp);
// Write operations aren't allowed to return a data payload because
// we can't do so reliably. If the client has to resend the request
if (reply)
ctx->reply = nullptr;
else {
- reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, true);
+ reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, true,
+ ctx->op->qos_resp);
reply->set_reply_versions(ctx->at_version,
ctx->user_at_version);
}
dout(20) << __func__ << " got reqids " << reply_obj.reqids << dendl;
::encode(reply_obj, osd_op.outdata, features);
osd_op.rval = -ENOENT;
- MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, false);
+ MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, false,
+ op->qos_resp);
reply->claim_op_out_data(m->ops);
reply->set_result(-ENOENT);
reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
// Return an op to be dispatched
inline Request mClockClientQueue::dequeue() {
- return queue.dequeue();
+ // Use the distributed variant so the mClock phase travels with the op.
+ std::pair<Request, dmc::PhaseType> retn = queue.dequeue_distributed();
+
+ // Only client ops carry an OpRequestRef; stash the phase on the op so
+ // the eventual MOSDOpReply can echo it back to the client.
+ if (boost::optional<OpRequestRef> _op = retn.first.maybe_get_op()) {
+ (*_op)->qos_resp = retn.second;
+ }
+ // Explicit move of the pair member: NRVO does not apply to subobjects.
+ return std::move(retn.first);
}
} // namespace ceph
/* get it before we call _finish_op() */
auto completion_lock = s->get_lock(op->target.base_oid);
+ if (mclock_service_tracker) {
+ qos_trk->track_resp(op->target.osd, m->get_qos_resp());
+ }
ldout(cct, 15) << "handle_osd_op_reply completed tid " << tid << dendl;
_finish_op(op, 0);
}
+// Verify that dmc request params attached to enqueued ops influence the
+// mClock dequeue order: after the three param-less ops drain, the ops
+// carrying ReqParams come out as r6 (epoch 105), r5 (104), r4 (103),
+// i.e. in increasing order of their first ReqParams argument.
+// NOTE(review): assumes dmc::ReqParams(delta, rho) deprioritizes larger
+// first-argument values — confirm against dmclock's ReqParams semantics.
+TEST_F(MClockClientQueueTest, TestDistributedEnqueue) {
+ Request r1 = create_snaptrim(100, client1);
+ Request r2 = create_snaptrim(101, client2);
+ Request r3 = create_snaptrim(102, client3);
+ Request r4 = create_snaptrim(103, client1);
+ Request r5 = create_snaptrim(104, client2);
+ Request r6 = create_snaptrim(105, client3);
+
+ // Only the second batch carries explicit QoS request parameters.
+ r4.set_qos_params(dmc::ReqParams(50,1));
+ r5.set_qos_params(dmc::ReqParams(30,1));
+ r6.set_qos_params(dmc::ReqParams(10,1));
+
+ q.enqueue(client1, 12, 0, std::move(r1));
+ q.enqueue(client2, 12, 0, std::move(r2));
+ q.enqueue(client3, 12, 0, std::move(r3));
+ q.enqueue(client1, 12, 0, std::move(r4));
+ q.enqueue(client2, 12, 0, std::move(r5));
+ q.enqueue(client3, 12, 0, std::move(r6));
+
+ // Drain the three ops enqueued without params; their order is not
+ // asserted here.
+ Request r = q.dequeue();
+ r = q.dequeue();
+ r = q.dequeue();
+
+ r = q.dequeue();
+ ASSERT_EQ(105u, r.get_map_epoch());
+
+ r = q.dequeue();
+ ASSERT_EQ(104u, r.get_map_epoch());
+
+ r = q.dequeue();
+ ASSERT_EQ(103u, r.get_map_epoch());
+}
+
+
TEST_F(MClockClientQueueTest, TestEnqueueStrict) {
q.enqueue_strict(client1, 12, create_snaptrim(100, client1));
q.enqueue_strict(client2, 13, create_snaptrim(101, client2));