From 9799d4928710c4a2b3e0d92fc57b098aea833063 Mon Sep 17 00:00:00 2001 From: sageweil Date: Wed, 29 Aug 2007 20:58:06 +0000 Subject: [PATCH] cleaned up read shedding git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1734 29311d96-e01e-0410-9327-a35deaab8ce9 --- trunk/ceph/config.cc | 8 +- trunk/ceph/config.h | 2 +- trunk/ceph/messages/MOSDOp.h | 10 +- trunk/ceph/osd/OSD.cc | 18 ++- trunk/ceph/osd/ReplicatedPG.cc | 211 +++++++++++++++++++-------------- trunk/ceph/osd/osd_types.h | 6 +- 6 files changed, 148 insertions(+), 107 deletions(-) diff --git a/trunk/ceph/config.cc b/trunk/ceph/config.cc index 4d536089c537b..6205ec2eaceeb 100644 --- a/trunk/ceph/config.cc +++ b/trunk/ceph/config.cc @@ -250,8 +250,8 @@ md_config_t g_conf = { osd_flash_crowd_iat_alpha: 0.125, osd_shed_reads: false, // forward from primary to replica - osd_shed_reads_min_latency: .001, - osd_shed_reads_min_load_diff: .2, + osd_shed_reads_min_latency: .001, // + osd_shed_reads_min_latency_ratio: 1.2, // 1.2 == 20% higher than peer osd_immediate_read_from_cache: true, // osds to read from the cache immediately? osd_exclusive_caching: true, // replicas evict replicated writes @@ -817,8 +817,8 @@ void parse_config_options(std::vector& args) g_conf.osd_shed_reads = atoi(args[++i]); else if (strcmp(args[i], "--osd_shed_reads_min_latency") == 0) g_conf.osd_shed_reads_min_latency = atof(args[++i]); - else if (strcmp(args[i], "--osd_shed_reads_min_load_diff") == 0) - g_conf.osd_shed_reads_min_load_diff = atof(args[++i]); + else if (strcmp(args[i], "--osd_shed_reads_min_latency_ratio") == 0) + g_conf.osd_shed_reads_min_latency_ratio = atof(args[++i]); else if ( strcmp(args[i],"--osd_immediate_read_from_cache" ) == 0) g_conf.osd_immediate_read_from_cache = atoi(args[++i]); diff --git a/trunk/ceph/config.h b/trunk/ceph/config.h index 424cddc0df393..0ff1bb72a7316 100644 --- a/trunk/ceph/config.h +++ b/trunk/ceph/config.h @@ -244,7 +244,7 @@ struct md_config_t { bool osd_shed_reads; double osd_shed_reads_min_latency; - double osd_shed_reads_min_load_diff; // .5 == 50% + double osd_shed_reads_min_latency_ratio; bool osd_immediate_read_from_cache; bool osd_exclusive_caching; diff --git a/trunk/ceph/messages/MOSDOp.h b/trunk/ceph/messages/MOSDOp.h index 0b2a1be0f66e9..2ba8842239523 100644 --- a/trunk/ceph/messages/MOSDOp.h +++ b/trunk/ceph/messages/MOSDOp.h @@ -119,12 +119,14 @@ private: bufferlist data; map attrset; - double request_received_time; + +public: // ugh + utime_t request_received_time; friend class MOSDOpReply; - public: +public: const osdreqid_t& get_reqid() { return st.reqid; } const tid_t get_client_tid() { return st.reqid.tid; } int get_client_inc() { return st.reqid.inc; } @@ -174,10 +176,10 @@ private: const bool wants_ack() { return st.want_ack; } const bool wants_commit() { return st.want_commit; } - void set_received_time(double time) { + void set_received_time(utime_t time) { request_received_time = time; } - double get_received_time() { + utime_t get_received_time() { return request_received_time; } diff --git a/trunk/ceph/osd/OSD.cc b/trunk/ceph/osd/OSD.cc index e601757bb231b..112b281cd56bc 100644 --- a/trunk/ceph/osd/OSD.cc +++ b/trunk/ceph/osd/OSD.cc @@ -250,8 +250,15 @@ int OSD::init() osd_logtype.add_inc("r_wr"); osd_logtype.add_inc("r_wrb"); + osd_logtype.add_set("qlen"); + osd_logtype.add_set("rqlen"); + osd_logtype.add_set("rdlat"); + osd_logtype.add_inc("shdout"); + osd_logtype.add_inc("shdin"); + + osd_logtype.add_inc("rlsum"); osd_logtype.add_inc("rlnum"); - + osd_logtype.add_set("numpg"); osd_logtype.add_set("pingset"); @@ -612,16 +619,19 @@ void OSD::_refresh_my_stat(utime_t now) my_stat.oprate = stat_oprate.get(now); // qlen - //double qlen = 0; - //if (stat_ops) qlen = - //qlen_calc.add(qlen); my_stat.qlen = (float)stat_qlen / (float)stat_ops; //get_average(); stat_ops = 0; stat_qlen = 0; + + qlen_calc.add(my_stat.qlen); + my_stat.recent_qlen = qlen_calc.get_average(); my_stat.read_latency = read_latency_calc.get_average(); if (my_stat.read_latency < 0) my_stat.read_latency = 0; + logger->fset("qlen", my_stat.qlen); + logger->fset("rqlen", my_stat.recent_qlen); + logger->fset("readlat", my_stat.read_latency); dout(-10) << "_refresh_my_stat " << my_stat << dendl; } } diff --git a/trunk/ceph/osd/ReplicatedPG.cc b/trunk/ceph/osd/ReplicatedPG.cc index 39b4b6cee14ab..397dd7aa672dc 100644 --- a/trunk/ceph/osd/ReplicatedPG.cc +++ b/trunk/ceph/osd/ReplicatedPG.cc @@ -162,97 +162,120 @@ bool ReplicatedPG::preprocess_op(MOSDOp *op) } // -- read shedding - - // check my load. - // TODO xxx we must also compare with our own load - // if i am x percentage higher than replica , - // redirect the read - - if (g_conf.osd_shed_reads == LOAD_LATENCY) { + if (g_conf.osd_shed_reads) { Mutex::Locker lock(osd->peer_stat_lock); - // above some minimum? - if (osd->my_stat.read_latency >= .001) { - for (unsigned i=1; ipeer_stat.count(peer) == 0) continue; - - dout(-10) << "preprocess_op my read latency " << osd->my_stat.read_latency - << ", peer " << peer << " is " << osd->peer_stat[peer].read_latency - << " of peer" << peer << dendl; - - if ((osd->peer_stat[peer].read_latency/osd->my_stat.read_latency) < - (1.0 - g_conf.osd_shed_reads_min_load_diff)) { - dout(10) << " forwarding to peer osd" << peer << dendl; - osd->messenger->send_message(op, osd->osdmap->get_inst(peer)); - return true; - } - } - } - } - else if (g_conf.osd_shed_reads == LOAD_QUEUE_SIZE && osd->stat_ops) { - Mutex::Locker lock(osd->peer_stat_lock); + osd->_refresh_my_stat(op->request_received_time); - // am i above my average? - float my_avg = osd->stat_qlen / osd->stat_ops; + // check my load. + // TODO xxx we must also compare with our own load + // if i am x percentage higher than replica , + // redirect the read + + int shedto = -1; + double bestscore = 0.0; // highest positive score wins - if (osd->pending_ops > my_avg) { - // is there a peer who is below my average? - for (unsigned i=1; ipeer_stat_lock); - if (osd->peer_stat.count(peer) && - osd->peer_stat[peer].qlen < my_avg) { - // calculate a probability that we should redirect - float p = (my_avg - osd->peer_stat[peer].qlen) / my_avg; // this is dumb. - - if (drand48() <= p) { - // take the first one - dout(10) << "my qlen " << osd->pending_ops << " > my_avg " << my_avg - << ", p=" << p - << ", fwd to peer w/ qlen " << osd->peer_stat[peer].qlen - << " osd" << peer - << dendl; - osd->messenger->send_message(op, osd->osdmap->get_inst(peer)); - return true; + // we calculate score values such that we can interpret them as a probability. + + switch (g_conf.osd_shed_reads) { + case LOAD_LATENCY: + // above some minimum? + if (osd->my_stat.read_latency >= .001) { + for (unsigned i=1; ipeer_stat.count(peer) == 0) continue; + + double c = .002; // add in a constant to smooth it a bit + double latratio = + (c+osd->my_stat.read_latency) / + (c+osd->peer_stat[peer].read_latency); + double p = (latratio - 1.0) / 2.0; + dout(-15) << "preprocess_op my read latency " << osd->my_stat.read_latency + << ", peer osd" << peer << " is " << osd->peer_stat[peer].read_latency + << ", latratio " << latratio + << ", p=" << p + << dendl; + if (latratio > g_conf.osd_shed_reads_min_latency_ratio && + p > bestscore && + drand48() < p) { + shedto = peer; + bestscore = p; } } } - } - } - - else if ( g_conf.osd_shed_reads == LOAD_HYBRID ) { - // am i above my average? - float my_avg = osd->stat_qlen / osd->stat_ops; - - if (osd->pending_ops > my_avg) { - // is there a peer who is below my average? - for (unsigned i=1; ipeer_stat_lock); - if (osd->peer_stat.count(peer) && - osd->peer_stat[peer].qlen < my_avg) { - // calculate a probability that we should redirect - //float p = (my_avg - peer_stat[peer].qlen) / my_avg; // this is dumb. - - double mean_read_time = osd->my_stat.read_latency; - - if ( mean_read_time != -1 && - osd->peer_stat.count(peer) && - ( (osd->peer_stat[peer].read_latency/mean_read_time) < - ( 1.0 - g_conf.osd_shed_reads_min_load_diff) ) ) - //if (drand48() <= p) { - // take the first one - dout(10) << "using hybrid :my qlen " << osd->pending_ops << " > my_avg " << my_avg - << "my read time "<< mean_read_time - << "peer read time " << osd->peer_stat[peer].read_latency - << ", fwd to peer w/ qlen " << osd->peer_stat[peer].read_latency - << " osd" << peer + break; + + /* + case LOAD_QUEUE_SIZE: + // am i above my average? + if (osd->pending_ops > osd->my_stat.qlen) { + // yes. is there a peer who is below my average? + for (unsigned i=1; ipeer_stat.count(peer) == 0) continue; + if (osd->peer_stat[peer].qlen < osd->my_stat.qlen) { + // calculate a probability that we should redirect + float p = (osd->my_stat.qlen - osd->peer_stat[peer].qlen) / osd->my_stat.qlen; // this is dumb. + float v = 1.0 - p; + + dout(10) << "my qlen " << osd->pending_ops << " > my_avg " << osd->my_stat.qlen + << ", peer osd" << peer << " has qlen " << osd->peer_stat[peer].qlen + << ", p=" << p + << ", v= "<< v << dendl; - osd->messenger->send_message(op, osd->osdmap->get_inst(peer)); - return true; + + if (v > bestscore) { + shedto = peer; + bestscore = v; + } + } + } + } + break;*/ + + case LOAD_HYBRID: + if (osd->pending_ops > osd->my_stat.qlen && + osd->my_stat.read_latency > 0.0) { + // is there a peer who is below my average? + for (unsigned i=1; ipeer_stat.count(peer) == 0) continue; + if (osd->peer_stat[peer].qlen < osd->my_stat.qlen) { + + double qratio = osd->pending_ops / osd->peer_stat[peer].qlen; + + double c = .002; // add in a constant to smooth it a bit + double latratio = + (c+osd->my_stat.read_latency)/ + (c+osd->peer_stat[peer].read_latency); + double p = (latratio - 1.0) / 2.0; + + dout(-15) << "preprocess_op my qlen / read latency " + << osd->pending_ops << " " << osd->my_stat.read_latency + << ", peer osd" << peer << " is " + << osd->peer_stat[peer].qlen << " " << osd->peer_stat[peer].read_latency + << ", qratio " << qratio + << ", latratio " << latratio + << ", p=" << p + << dendl; + if (latratio > g_conf.osd_shed_reads_min_latency_ratio && + p > bestscore && + drand48() < p) { + shedto = peer; + bestscore = p; + } + } } } + break; + } + + // shed? + if (shedto >= 0) { + dout(-10) << "preprocess_op shedding read to peer osd" << shedto << dendl; + osd->messenger->send_message(op, osd->osdmap->get_inst(shedto)); + osd->logger->inc("shdout"); + return true; } } } // endif balance reads @@ -384,17 +407,21 @@ void ReplicatedPG::op_read(MOSDOp *op) // !primary and unbalanced? // (ignore ops forwarded from the primary) - if (!is_primary() && - !(op->get_source().is_osd() && - op->get_source().num() == get_primary())) { - // make sure i exist and am balanced, otherwise fw back to acker. - bool b; - if (!osd->store->exists(oid) || - osd->store->getattr(oid, "balance-reads", &b, 1) < 0) { - dout(-10) << "read on replica, object " << oid - << " dne or no balance-reads, fw back to primary" << dendl; - osd->messenger->send_message(op, osd->osdmap->get_inst(get_acker())); - return; + if (!is_primary()) { + if (op->get_source().is_osd() && + op->get_source().num() == get_primary()) { + // read was shed to me by the primary + osd->logger->inc("shdin"); + } else { + // make sure i exist and am balanced, otherwise fw back to acker. + bool b; + if (!osd->store->exists(oid) || + osd->store->getattr(oid, "balance-reads", &b, 1) < 0) { + dout(-10) << "read on replica, object " << oid + << " dne or no balance-reads, fw back to primary" << dendl; + osd->messenger->send_message(op, osd->osdmap->get_inst(get_acker())); + return; + } } } diff --git a/trunk/ceph/osd/osd_types.h b/trunk/ceph/osd/osd_types.h index 916a5e166eabb..32cade616f216 100644 --- a/trunk/ceph/osd/osd_types.h +++ b/trunk/ceph/osd/osd_types.h @@ -254,14 +254,16 @@ struct osd_peer_stat_t { utime_t stamp; double oprate; double qlen; + double recent_qlen; double read_latency; - osd_peer_stat_t() : oprate(0), qlen(0), read_latency(0) {} + osd_peer_stat_t() : oprate(0), qlen(0), recent_qlen(0), read_latency(0) {} }; inline ostream& operator<<(ostream& out, const osd_peer_stat_t &stat) { return out << "stat(" << stat.stamp << " oprate=" << stat.oprate - << " qlen=" << stat.qlen + << " qlen=" << stat.qlen + << " recent_qlen=" << stat.recent_qlen << " read_latency=" << stat.read_latency << ")"; } -- 2.39.5