common/Cycles.cc
common/scrub_types.cc
common/bit_str.cc
- dmclock/src/dmclock_util.cc
- dmclock/support/src/run_every.cc
osdc/Striper.cc
osdc/Objecter.cc
common/Graylog.cc
OPTION(objecter_inject_no_watch_ping, OPT_BOOL) // suppress watch pings
OPTION(objecter_retry_writes_after_first_reply, OPT_BOOL) // ignore the first reply for each write, and resend the osd op instead
OPTION(objecter_debug_inject_relock_delay, OPT_BOOL)
-OPTION(objecter_mclock_service_tracker, OPT_BOOL)
// Max number of deletes at once in a single Filer::purge call
OPTION(filer_max_purge_ops, OPT_U32)
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2017 SK Telecom
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-#pragma once
-
-#include <cstdint>
-#include "include/encoding.h"
-#include "dmclock/src/dmclock_recs.h"
-
-// the following is done to unclobber _ASSERT_H so it returns to the
-// way ceph likes it
-#include "include/assert.h"
-
-
-namespace ceph {
-namespace dmc = ::crimson::dmclock;
-}
-
-namespace crimson {
-namespace dmclock {
-
-WRITE_RAW_ENCODER(ReqParams)
-
-inline void encode(const PhaseType &phase, bufferlist& bl,
- uint64_t features=0)
-{
- using ceph::encode;
- encode(static_cast<std::uint8_t>(phase), bl);
-}
-
-inline void decode(PhaseType &phase, bufferlist::iterator& p)
-{
- using ceph::decode;
- std::uint8_t int_phase;
- decode((std::uint8_t&)int_phase, p);
- phase = static_cast<dmc::PhaseType>(int_phase);
-}
-}
-}
using priority_t = unsigned;
using cost_t = unsigned;
- using Retn = std::pair<T, dmc::PhaseType>;
typedef std::list<std::pair<cost_t, T> > ListPairs;
queue.add_request(std::move(item), cl, cost);
}
- void enqueue_distributed(K cl, unsigned priority, unsigned cost, T&& item,
- const dmc::ReqParams& req_params) {
- // priority is ignored
- queue.add_request(std::move(item), cl, req_params, cost);
- }
-
void enqueue_front(K cl,
unsigned priority,
unsigned cost,
return std::move(*(retn.request));
}
- Retn dequeue_distributed() {
- assert(!empty());
- dmc::PhaseType resp_params = dmc::PhaseType();
-
- if (!high_queue.empty()) {
- T ret = std::move(high_queue.rbegin()->second.front().second);
- high_queue.rbegin()->second.pop_front();
- if (high_queue.rbegin()->second.empty()) {
- high_queue.erase(high_queue.rbegin()->first);
- }
- return std::make_pair(std::move(ret), resp_params);
- }
-
- if (!queue_front.empty()) {
- T ret = std::move(queue_front.front().second);
- queue_front.pop_front();
- return std::make_pair(std::move(ret), resp_params);
- }
-
- auto pr = queue.pull_request();
- assert(pr.is_retn());
- auto& retn = pr.get_retn();
- resp_params = retn.phase;
- return std::make_pair(std::move(*(retn.request)), resp_params);
- }
-
void dump(ceph::Formatter *f) const override final {
f->open_array_section("high_queues");
for (typename SubQueues::const_iterator p = high_queue.begin();
.set_default(false)
.set_description(""),
- Option("objecter_mclock_service_tracker", Option::TYPE_BOOL, Option::LEVEL_ADVANCED)
- .set_default(false)
- .set_description("whether to enable mclock service tracker for tracking completed IOs in a distributed environment")
- .set_long_description("When using the client-side dmclock qos service in a distributed environment, you must enable mclock service tracker for tracking completed IOs.")
- .add_see_also("osd_op_queue"),
-
Option("filer_max_purge_ops", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
.set_default(10)
.set_description("Max in-flight operations for purging a striped range (e.g., MDS journal)"),
DEFINE_CEPH_FEATURE_RETIRED(16, 1, QUERY_T, JEWEL, LUMINOUS)
DEFINE_CEPH_FEATURE(16, 3, SERVER_O)
DEFINE_CEPH_FEATURE_RETIRED(17, 1, INDEP_PG_MAP, JEWEL, LUMINOUS)
-DEFINE_CEPH_FEATURE(17, 2, QOS_DMC)
DEFINE_CEPH_FEATURE(17, 2, OS_PERF_STAT_NS)
DEFINE_CEPH_FEATURE(18, 1, CRUSH_TUNABLES)
CEPH_FEATURE_OSDREPLYMUX | \
CEPH_FEATURE_OSDENC | \
CEPH_FEATURE_MONENC | \
- CEPH_FEATURE_QOS_DMC | \
CEPH_FEATURE_CRUSH_TUNABLES | \
CEPH_FEATURE_MSG_AUTH | \
CEPH_FEATURE_CRUSH_TUNABLES2 | \
#include "include/ceph_features.h"
#include "common/hobject.h"
#include <atomic>
-#include "common/mClockCommon.h"
/*
* OSD op
class MOSDOp : public MOSDFastDispatchOp {
- static const int HEAD_VERSION = 9;
+ static const int HEAD_VERSION = 8;
static const int COMPAT_VERSION = 3;
private:
uint64_t features;
bool bdata_encode;
osd_reqid_t reqid; // reqid explicitly set by sender
- dmc::ReqParams qos_params;
public:
friend class MOSDOpReply;
void set_spg(spg_t p) {
pgid = p;
}
- void set_qos_params(const dmc::ReqParams& p) { qos_params = p; }
// Fields decoded in partial decoding
pg_t get_pg() const {
header.tid);
}
}
- const dmc::ReqParams& get_qos_params() const {
- assert(!partial_decode_needed);
- return qos_params;
- }
// Fields decoded in final decoding
int get_client_inc() const {
encode(retry_attempt, payload);
encode(features, payload);
} else {
- // v9 encoding for dmclock use, otherwise v8.
- // v8 encoding with hobject_t hash separate from pgid, no
- // reassert version.
- if (HAVE_FEATURE(features, QOS_DMC)) {
- header.version = HEAD_VERSION;
- } else {
- header.version = 8;
- }
+ // latest v8 encoding with hobject_t hash separate from pgid, no
+ // reassert version
+ header.version = HEAD_VERSION;
encode(pgid, payload);
encode(hobj.get_hash(), payload);
encode(osdmap_epoch, payload);
encode(flags, payload);
encode(reqid, payload);
- if (header.version >= 9) {
- encode(qos_params, payload);
- }
encode_trace(payload, features);
// -- above decoded up front; below decoded post-dispatch thread --
p = payload.begin();
// Always keep here the newest version of decoding order/rule
- if (header.version >= 8) {
+ if (header.version == HEAD_VERSION) {
decode(pgid, p); // actual pgid
uint32_t hash;
decode(hash, p); // raw hash value
decode(osdmap_epoch, p);
decode(flags, p);
decode(reqid, p);
- if (header.version >= 9)
- decode(qos_params, p);
decode_trace(p);
} else if (header.version == 7) {
decode(pgid.pgid, p); // raw pgid
#include "MOSDOp.h"
#include "os/ObjectStore.h"
#include "common/errno.h"
-#include "common/mClockCommon.h"
/*
* OSD op reply
class MOSDOpReply : public Message {
- static const int HEAD_VERSION = 9;
+ static const int HEAD_VERSION = 8;
static const int COMPAT_VERSION = 2;
object_t oid;
int32_t retry_attempt = -1;
bool do_redirect;
request_redirect_t redirect;
- dmc::PhaseType qos_resp;
public:
const object_t& get_oid() const { return oid; }
void set_redirect(const request_redirect_t& redir) { redirect = redir; }
const request_redirect_t& get_redirect() const { return redirect; }
bool is_redirect_reply() const { return do_redirect; }
- void set_qos_resp(const dmc::PhaseType qresp) { qos_resp = qresp; }
- dmc::PhaseType get_qos_resp() const { return qos_resp; }
void add_flags(int f) { flags |= f; }
bdata_encode(false) {
do_redirect = false;
}
- MOSDOpReply(const MOSDOp *req, int r, epoch_t e, int acktype, bool ignore_out_data,
- dmc::PhaseType qresp = dmc::PhaseType::reservation)
+ MOSDOpReply(const MOSDOp *req, int r, epoch_t e, int acktype,
+ bool ignore_out_data)
: Message(CEPH_MSG_OSD_OPREPLY, HEAD_VERSION, COMPAT_VERSION),
oid(req->hobj.oid), pgid(req->pgid.pgid), ops(req->ops),
- bdata_encode(false), qos_resp(qresp) {
+ bdata_encode(false) {
set_tid(req->get_tid());
result = r;
if (do_redirect) {
encode(redirect, payload);
}
- if ((features & CEPH_FEATURE_QOS_DMC) == 0) {
- header.version = 8;
- } else {
- encode(qos_resp, payload);
- }
}
encode_trace(payload, features);
}
decode(do_redirect, p);
if (do_redirect)
decode(redirect, p);
- decode(qos_resp, p);
decode_trace(p);
} else if (header.version < 2) {
ceph_osd_reply_head head;
}
}
if (header.version >= 8) {
- if (header.version >= 9) {
- decode(qos_resp, p);
- }
decode_trace(p);
}
}
int flags;
flags = m->get_flags() & (CEPH_OSD_FLAG_ACK|CEPH_OSD_FLAG_ONDISK);
- MOSDOpReply *reply = new MOSDOpReply(m, err, osdmap->get_epoch(), flags,
- true, op->qos_resp);
+ MOSDOpReply *reply = new MOSDOpReply(m, err, osdmap->get_epoch(), flags, true);
reply->set_reply_versions(v, uv);
m->get_connection()->send_message(reply);
}
#include "include/utime.h"
#include "osd/OpRequest.h"
#include "osd/PG.h"
-#include "common/mClockCommon.h"
-#include "messages/MOSDOp.h"
#include "PGPeeringEvent.h"
class OSD;
private:
OpQueueable::Ref qitem;
int cost;
- dmc::ReqParams qos_params;
unsigned priority;
utime_t start_time;
uint64_t owner; ///< global id (e.g., client.XXX)
start_time(start_time),
owner(owner),
map_epoch(e)
- {
- if (auto op = maybe_get_op()) {
- auto req = (*op)->get_req();
- if (req->get_type() == CEPH_MSG_OSD_OP) {
- const MOSDOp *m = static_cast<const MOSDOp*>(req);
- qos_params = m->get_qos_params();
- }
- }
- }
+ {}
OpQueueItem(OpQueueItem &&) = default;
OpQueueItem(const OpQueueItem &) = delete;
OpQueueItem &operator=(OpQueueItem &&) = default;
utime_t get_start_time() const { return start_time; }
uint64_t get_owner() const { return owner; }
epoch_t get_map_epoch() const { return map_epoch; }
- dmc::ReqParams get_qos_params() const { return qos_params; }
- void set_qos_params(dmc::ReqParams qparams) { qos_params = qparams; }
bool is_peering() const {
return qitem->is_peering();
TrackedOp(tracker, req->get_recv_stamp()),
rmw_flags(0), request(req),
hit_flag_points(0), latest_flag_point(0),
- hitset_inserted(false), qos_resp(dmc::PhaseType::reservation) {
+ hitset_inserted(false)
+{
if (req->get_priority() < tracker->cct->_conf->osd_client_op_priority) {
// don't warn as quickly for low priority ops
warn_interval_multiplier = tracker->cct->_conf->osd_recovery_op_warn_multiple;
#include "osd/osd_types.h"
#include "common/TrackedOp.h"
-#include "common/mClockCommon.h"
/**
* The OpRequest takes in a Message* and takes over a single reference
epoch_t min_epoch = 0; ///< min epoch needed to handle this msg
bool hitset_inserted;
- dmc::PhaseType qos_resp;
const Message *get_req() const { return request; }
Message *get_nonconst_req() { return request; }
// reply
MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(),
CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK,
- false, op->qos_resp);
+ false);
reply->claim_op_out_data(ops);
reply->set_result(result);
reply->set_reply_versions(info.last_update, info.last_user_version);
MOSDOpReply *reply = orig_reply.detach();
if (reply == nullptr) {
reply = new MOSDOpReply(m, r, pg->get_osdmap()->get_epoch(),
- flags, true, op->qos_resp);
+ flags, true);
}
ldpp_dout(pg, 10) << " sending commit on " << *m << " " << reply << dendl;
pg->osd->send_message_osd_client(reply, m->get_connection());
const MOSDOp *m = static_cast<const MOSDOp*>(op->get_req());
int flags = m->get_flags() & (CEPH_OSD_FLAG_ACK|CEPH_OSD_FLAG_ONDISK);
MOSDOpReply *reply = new MOSDOpReply(m, -ENOENT, get_osdmap()->get_epoch(),
- flags, false, op->qos_resp);
+ flags, false);
request_redirect_t redir(m->get_object_locator(), pool.info.tier_of);
reply->set_redirect(redir);
dout(10) << "sending redirect to pool " << pool.info.tier_of << " for op "
const MOSDOp *m = static_cast<const MOSDOp*>(op->get_req());
OpContext *ctx = new OpContext(op, m->get_reqid(), &prdop->ops, this);
- ctx->reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, false,
- op->qos_resp);
+ ctx->reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, false);
ctx->user_at_version = prdop->user_version;
ctx->data_off = prdop->data_offset;
ctx->ignore_log_op_stats = true;
if (reply)
pwop->ctx->reply = NULL;
else {
- reply = new MOSDOpReply(m, r, get_osdmap()->get_epoch(), 0, true,
- pwop->op->qos_resp);
+ reply = new MOSDOpReply(m, r, get_osdmap()->get_epoch(), 0, true);
reply->set_reply_versions(eversion_t(), pwop->user_version);
}
reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
bool successful_write = !ctx->op_t->empty() && op->may_write() && result >= 0;
// prepare the reply
ctx->reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0,
- successful_write, op->qos_resp);
+ successful_write);
// Write operations aren't allowed to return a data payload because
// we can't do so reliably. If the client has to resend the request
if (reply)
ctx->reply = nullptr;
else {
- reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, true,
- ctx->op->qos_resp);
+ reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, true);
reply->set_reply_versions(ctx->at_version,
ctx->user_at_version);
}
dout(20) << __func__ << " got reqids " << reply_obj.reqids << dendl;
encode(reply_obj, osd_op.outdata, features);
osd_op.rval = -ENOENT;
- MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, false,
- op->qos_resp);
+ MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, false);
reply->claim_op_out_data(m->ops);
reply->set_result(-ENOENT);
reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
unsigned priority,
unsigned cost,
Request&& item) {
- auto qos_params = item.get_qos_params();
- queue.enqueue_distributed(get_inner_client(cl, item), priority, cost,
- std::move(item), qos_params);
+ queue.enqueue(get_inner_client(cl, item), priority, cost, std::move(item));
}
// Enqueue the op in the front of the regular queue
// Return an op to be dispatched
inline Request mClockClientQueue::dequeue() {
- std::pair<Request, dmc::PhaseType> retn = queue.dequeue_distributed();
-
- if (boost::optional<OpRequestRef> _op = retn.first.maybe_get_op()) {
- (*_op)->qos_resp = retn.second;
- }
- return std::move(retn.first);
+ return queue.dequeue();
}
} // namespace ceph
if(WITH_LTTNG AND WITH_EVENTTRACE)
add_dependencies(osdc eventtrace_tp)
endif()
-target_link_libraries(osdc dmclock)
static const char *config_keys[] = {
"crush_location",
- "objecter_mclock_service_tracker",
NULL
};
if (changed.count("crush_location")) {
update_crush_location();
}
- if (changed.count("objecter_mclock_service_tracker")) {
- update_mclock_service_tracker();
- }
}
void Objecter::update_crush_location()
crush_location = cct->crush_location.get_location();
}
-void Objecter::update_mclock_service_tracker()
-{
- unique_lock wl(rwlock);
- if (cct->_conf->objecter_mclock_service_tracker && (!mclock_service_tracker)) {
- qos_trk = std::make_unique<dmc::ServiceTracker<int>>();
- } else if (!cct->_conf->objecter_mclock_service_tracker) {
- qos_trk.reset();
- }
- mclock_service_tracker = cct->_conf->objecter_mclock_service_tracker;
-}
-
// messages ------------------------------
/*
m->set_reqid(op->reqid);
}
- if (mclock_service_tracker) {
- dmc::ReqParams rp = qos_trk->get_req_params(op->target.osd);
- m->set_qos_params(rp);
- }
-
logger->inc(l_osdc_op_send);
ssize_t sum = 0;
for (unsigned i = 0; i < m->ops.size(); i++) {
/* get it before we call _finish_op() */
auto completion_lock = s->get_lock(op->target.base_oid);
- if (mclock_service_tracker) {
- qos_trk->track_resp(op->target.osd, m->get_qos_resp());
- }
ldout(cct, 15) << "handle_osd_op_reply completed tid " << tid << dendl;
_finish_op(op, 0);
#include <boost/thread/shared_mutex.hpp>
-#include "dmclock/src/dmclock_client.h"
-
#include "include/assert.h"
#include "include/buffer.h"
#include "include/types.h"
MonClient *monc;
Finisher *finisher;
ZTracer::Endpoint trace_endpoint;
- std::unique_ptr<dmc::ServiceTracker<int>> qos_trk;
- std::atomic<bool> mclock_service_tracker;
private:
OSDMap *osdmap;
public:
void start_tick();
void tick();
void update_crush_location();
- void update_mclock_service_tracker();
class RequestStateHook;
double osd_timeout) :
Dispatcher(cct_), messenger(m), monc(mc), finisher(fin),
trace_endpoint("0.0.0.0", 0, "Objecter"),
- mclock_service_tracker(cct->_conf->objecter_mclock_service_tracker),
osdmap(new OSDMap),
max_linger_id(0),
keep_balanced_budget(false), honor_osdmap_full(true), osdmap_full_try(false),
op_throttle_ops(cct, "objecter_ops", cct->_conf->objecter_inflight_ops),
epoch_barrier(0),
retry_writes_after_first_reply(cct->_conf->objecter_retry_writes_after_first_reply)
- {
- if (cct->_conf->objecter_mclock_service_tracker) {
- qos_trk = std::make_unique<dmc::ServiceTracker<int>>();
- }
- }
+ { }
~Objecter() override;
void init();
}
-TEST_F(MClockClientQueueTest, TestDistributedEnqueue) {
- Request r1 = create_snaptrim(100, client1);
- Request r2 = create_snaptrim(101, client2);
- Request r3 = create_snaptrim(102, client3);
- Request r4 = create_snaptrim(103, client1);
- Request r5 = create_snaptrim(104, client2);
- Request r6 = create_snaptrim(105, client3);
-
- r4.set_qos_params(dmc::ReqParams(50,1));
- r5.set_qos_params(dmc::ReqParams(30,1));
- r6.set_qos_params(dmc::ReqParams(10,1));
-
- q.enqueue(client1, 12, 0, std::move(r1));
- q.enqueue(client2, 12, 0, std::move(r2));
- q.enqueue(client3, 12, 0, std::move(r3));
- q.enqueue(client1, 12, 0, std::move(r4));
- q.enqueue(client2, 12, 0, std::move(r5));
- q.enqueue(client3, 12, 0, std::move(r6));
-
- Request r = q.dequeue();
- r = q.dequeue();
- r = q.dequeue();
-
- r = q.dequeue();
- ASSERT_EQ(105u, r.get_map_epoch());
-
- r = q.dequeue();
- ASSERT_EQ(104u, r.get_map_epoch());
-
- r = q.dequeue();
- ASSERT_EQ(103u, r.get_map_epoch());
-}
-
-
TEST_F(MClockClientQueueTest, TestEnqueueStrict) {
q.enqueue_strict(client1, 12, create_snaptrim(100, client1));
q.enqueue_strict(client2, 13, create_snaptrim(101, client2));