From e611b136d6b9fdc6032780829359fb7088249f72 Mon Sep 17 00:00:00 2001 From: sageweil Date: Wed, 29 Aug 2007 19:23:23 +0000 Subject: [PATCH] osd load calc changes git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1733 29311d96-e01e-0410-9327-a35deaab8ce9 --- trunk/ceph/config.cc | 30 ++++-- trunk/ceph/config.h | 12 ++- trunk/ceph/messages/MOSDOp.h | 4 + trunk/ceph/messages/MOSDOpReply.h | 5 + trunk/ceph/messages/MOSDPing.h | 17 ++-- trunk/ceph/osd/OSD.cc | 70 +++++++++----- trunk/ceph/osd/OSD.h | 156 ++++++++++++++++-------------- trunk/ceph/osd/ReplicatedPG.cc | 113 +++++++++++++--------- trunk/ceph/osd/ReplicatedPG.h | 2 +- trunk/ceph/osd/osd_types.h | 16 +++ 10 files changed, 252 insertions(+), 173 deletions(-) diff --git a/trunk/ceph/config.cc b/trunk/ceph/config.cc index 26004c143fa47..4d536089c537b 100644 --- a/trunk/ceph/config.cc +++ b/trunk/ceph/config.cc @@ -245,12 +245,16 @@ md_config_t g_conf = { // --- osd --- osd_rep: OSD_REP_PRIMARY, - osd_balance_reads: false, - osd_immediate_read_from_cache: true, // osds to read from the cache immediately? - osd_exclusive_caching: true, // replicas evict replicated writes - osd_load_diff_percent: 20, // load diff for read forwarding + osd_balance_reads: false, // send from client to replica osd_flash_crowd_iat_threshold: 100, osd_flash_crowd_iat_alpha: 0.125, + + osd_shed_reads: false, // forward from primary to replica + osd_shed_reads_min_latency: .001, + osd_shed_reads_min_load_diff: .2, + + osd_immediate_read_from_cache: true, // osds to read from the cache immediately? + osd_exclusive_caching: true, // replicas evict replicated writes osd_pg_bits: 0, // 0 == let osdmonitor decide osd_object_layout: OBJECT_LAYOUT_HASHINO, @@ -804,17 +808,23 @@ void parse_config_options(std::vector& args) else if (strcmp(args[i], "--osd_balance_reads") == 0) g_conf.osd_balance_reads = atoi(args[++i]); - else if ( strcmp(args[i],"--osd_immediate_read_from_cache" ) == 0) - g_conf.osd_immediate_read_from_cache = atoi(args[++i]); - else if ( strcmp(args[i],"--osd_exclusive_caching" ) == 0) - g_conf.osd_exclusive_caching = atoi(args[++i]); - else if (strcmp(args[i], "--osd_load_diff_percent") == 0) - g_conf.osd_load_diff_percent = atoi(args[++i]); else if (strcmp(args[i], "--osd_flash_crowd_iat_threshold") == 0) g_conf.osd_flash_crowd_iat_threshold = atoi(args[++i]); else if (strcmp(args[i], "--osd_flash_crowd_iat_alpha") == 0) g_conf.osd_flash_crowd_iat_alpha = atoi(args[++i]); + else if (strcmp(args[i], "--osd_shed_reads") == 0) + g_conf.osd_shed_reads = atoi(args[++i]); + else if (strcmp(args[i], "--osd_shed_reads_min_latency") == 0) + g_conf.osd_shed_reads_min_latency = atof(args[++i]); + else if (strcmp(args[i], "--osd_shed_reads_min_load_diff") == 0) + g_conf.osd_shed_reads_min_load_diff = atof(args[++i]); + + else if ( strcmp(args[i],"--osd_immediate_read_from_cache" ) == 0) + g_conf.osd_immediate_read_from_cache = atoi(args[++i]); + else if ( strcmp(args[i],"--osd_exclusive_caching" ) == 0) + g_conf.osd_exclusive_caching = atoi(args[++i]); + else if (strcmp(args[i], "--osd_rep") == 0) g_conf.osd_rep = atoi(args[++i]); else if (strcmp(args[i], "--osd_rep_chain") == 0) diff --git a/trunk/ceph/config.h b/trunk/ceph/config.h index 4b183fdfd2cc9..424cddc0df393 100644 --- a/trunk/ceph/config.h +++ b/trunk/ceph/config.h @@ -238,13 +238,17 @@ struct md_config_t { // osd int osd_rep; - bool osd_balance_reads; - bool osd_immediate_read_from_cache; - bool osd_exclusive_caching; - int osd_load_diff_percent; + bool osd_balance_reads; int osd_flash_crowd_iat_threshold; // flash crowd interarrival time threshold in ms double osd_flash_crowd_iat_alpha; + bool osd_shed_reads; + double osd_shed_reads_min_latency; + double osd_shed_reads_min_load_diff; // .5 == 50% + + bool osd_immediate_read_from_cache; + bool osd_exclusive_caching; + int osd_pg_bits; int osd_object_layout; int osd_pg_layout; diff --git a/trunk/ceph/messages/MOSDOp.h b/trunk/ceph/messages/MOSDOp.h index e6695be8ad02c..0b2a1be0f66e9 100644 --- a/trunk/ceph/messages/MOSDOp.h +++ b/trunk/ceph/messages/MOSDOp.h @@ -113,6 +113,8 @@ private: bool want_ack; bool want_commit; bool retry_attempt; + + osd_peer_stat_t peer_stat; } st; bufferlist data; @@ -179,6 +181,8 @@ private: return request_received_time; } + void set_peer_stat(osd_peer_stat_t& stat) { st.peer_stat = stat; } + const osd_peer_stat_t& get_peer_stat() { return st.peer_stat; } void set_data(bufferlist &d) { data.claim(d); diff --git a/trunk/ceph/messages/MOSDOpReply.h b/trunk/ceph/messages/MOSDOpReply.h index ed2971296d9a7..bb5d413228e82 100644 --- a/trunk/ceph/messages/MOSDOpReply.h +++ b/trunk/ceph/messages/MOSDOpReply.h @@ -51,6 +51,8 @@ class MOSDOpReply : public Message { eversion_t pg_complete_thru; epoch_t map_epoch; + + osd_peer_stat_t peer_stat; } st; bufferlist data; @@ -85,6 +87,9 @@ class MOSDOpReply : public Message { void set_op(int op) { st.op = op; } void set_rep_tid(tid_t t) { st.rep_tid = t; } + void set_peer_stat(osd_peer_stat_t& stat) { st.peer_stat = stat; } + const osd_peer_stat_t& get_peer_stat() { return st.peer_stat; } + // data payload void set_data(bufferlist &d) { data.claim(d); diff --git a/trunk/ceph/messages/MOSDPing.h b/trunk/ceph/messages/MOSDPing.h index dda21888c31d7..37be289c0a923 100644 --- a/trunk/ceph/messages/MOSDPing.h +++ b/trunk/ceph/messages/MOSDPing.h @@ -18,34 +18,29 @@ #include "common/Clock.h" #include "msg/Message.h" +#include "osd/osd_types.h" class MOSDPing : public Message { public: epoch_t map_epoch; bool ack; - float avg_qlen; - double read_mean_time; + osd_peer_stat_t peer_stat; - MOSDPing(epoch_t e, - float aq, - double _read_mean_time, - bool a=false) : Message(MSG_OSD_PING), map_epoch(e), ack(a), avg_qlen(aq), read_mean_time(_read_mean_time) { - } + MOSDPing(epoch_t e, osd_peer_stat_t& ps, bool a=false) : + Message(MSG_OSD_PING), map_epoch(e), ack(a), peer_stat(ps) { } MOSDPing() {} virtual void decode_payload() { int off = 0; ::_decode(map_epoch, payload, off); ::_decode(ack, payload, off); - ::_decode(avg_qlen, payload, off); - ::_decode(read_mean_time, payload, off); + ::_decode(peer_stat, payload, off); } virtual void encode_payload() { ::_encode(map_epoch, payload); ::_encode(ack, payload); - ::_encode(avg_qlen, payload); - ::_encode(read_mean_time, payload); + ::_encode(peer_stat, payload); } virtual char *get_type_name() { return "osd_ping"; } diff --git a/trunk/ceph/osd/OSD.cc b/trunk/ceph/osd/OSD.cc index 3dcf3b0de26da..e601757bb231b 100644 --- a/trunk/ceph/osd/OSD.cc +++ b/trunk/ceph/osd/OSD.cc @@ -109,7 +109,9 @@ LogType osd_logtype; OSD::OSD(int id, Messenger *m, MonMap *mm, char *dev) : timer(osd_lock), - load_calc(g_conf.osd_max_opq<1?1:g_conf.osd_max_opq), + stat_oprate(5.0), + read_latency_calc(g_conf.osd_max_opq<1?1:g_conf.osd_max_opq), + qlen_calc(3), iat_averager(g_conf.osd_flash_crowd_iat_alpha) { whoami = id; @@ -124,8 +126,8 @@ OSD::OSD(int id, Messenger *m, MonMap *mm, char *dev) : state = STATE_BOOTING; - hb_stat_ops = 0; - hb_stat_qlen = 0; + stat_ops = 0; + stat_qlen = 0; pending_ops = 0; waiting_for_no_ops = false; @@ -599,6 +601,31 @@ void OSD::activate_pg(pg_t pgid, epoch_t epoch) // ------------------------------------- +void OSD::_refresh_my_stat(utime_t now) +{ + assert(peer_stat_lock.is_locked()); + + // refresh? + if (now - my_stat.stamp > .5) { + my_stat.stamp = now; + + my_stat.oprate = stat_oprate.get(now); + + // qlen + //double qlen = 0; + //if (stat_ops) qlen = + //qlen_calc.add(qlen); + my_stat.qlen = (float)stat_qlen / (float)stat_ops; //get_average(); + stat_ops = 0; + stat_qlen = 0; + + my_stat.read_latency = read_latency_calc.get_average(); + if (my_stat.read_latency < 0) my_stat.read_latency = 0; + + dout(-10) << "_refresh_my_stat " << my_stat << dendl; + } +} + void OSD::heartbeat() { utime_t now = g_clock.now(); @@ -606,21 +633,13 @@ void OSD::heartbeat() since.sec_ref() -= g_conf.osd_heartbeat_interval; // calc my stats - float avg_qlen = 0; - if (hb_stat_ops) avg_qlen = (float)hb_stat_qlen / (float)hb_stat_ops; + peer_stat_lock.Lock(); + _refresh_my_stat(now); + dout(-5) << "heartbeat: " << my_stat << dendl; + peer_stat_lock.Unlock(); - double read_mean_time = load_calc.get_average(); - - dout(5) << "heartbeat " << now - << ": ops " << hb_stat_ops - << ", avg qlen " << avg_qlen - << ", mean read time " << read_mean_time - << dendl; + //load_calc.set_size(stat_ops); - // reset until next time around - hb_stat_ops = 0; - hb_stat_qlen = 0; - // send pings set pingset; for (hash_map::iterator i = pg_map.begin(); @@ -641,10 +660,8 @@ void OSD::heartbeat() i != pingset.end(); i++) { _share_map_outgoing( osdmap->get_inst(*i) ); - messenger->send_message(new MOSDPing(osdmap->get_epoch(), - avg_qlen, - read_mean_time ), - osdmap->get_inst(*i)); + messenger->send_message(new MOSDPing(osdmap->get_epoch(), my_stat), + osdmap->get_inst(*i)); } if (logger) logger->set("pingset", pingset.size()); @@ -937,8 +954,9 @@ void OSD::handle_osd_ping(MOSDPing *m) _share_map_incoming(m->get_source_inst(), ((MOSDPing*)m)->map_epoch); int from = m->get_source().num(); - peer_qlen[from] = m->avg_qlen; - peer_read_time[from] = m->read_mean_time; + peer_stat_lock.Lock(); + peer_stat[from] = m->peer_stat; + peer_stat_lock.Unlock(); delete m; } @@ -1947,11 +1965,13 @@ void OSD::handle_op(MOSDOp *op) // mark the read request received time for finding the // read througput load. - op->set_received_time(g_clock.now()); + utime_t now = g_clock.now(); + op->set_received_time(now); // update qlen stats - hb_stat_ops++; - hb_stat_qlen += pending_ops; + stat_oprate.hit(now); + stat_ops++; + stat_qlen += pending_ops; // require same or newer map if (!require_same_or_newer_map(op, op->get_map_epoch())) { diff --git a/trunk/ceph/osd/OSD.h b/trunk/ceph/osd/OSD.h index eaab58511509a..ed2006b9ec4e0 100644 --- a/trunk/ceph/osd/OSD.h +++ b/trunk/ceph/osd/OSD.h @@ -26,6 +26,8 @@ #include "ObjectStore.h" #include "PG.h" +#include "common/DecayCounter.h" + #include using namespace std; @@ -49,24 +51,92 @@ public: static const int STATE_STOPPING = 3; + + /** OSD **/ +protected: + Mutex osd_lock; // global lock + SafeTimer timer; // safe timer + + Messenger *messenger; + Logger *logger; + ObjectStore *store; + MonMap *monmap; + + int whoami; + char dev_path[100]; + +public: + int get_nodeid() { return whoami; } + +private: + /** superblock **/ + OSDSuperblock superblock; + epoch_t boot_epoch; + + object_t get_osdmap_object_name(epoch_t epoch) { return object_t(0,epoch << 1); } + object_t get_inc_osdmap_object_name(epoch_t epoch) { return object_t(0, (epoch << 1) + 1); } + + void write_superblock(); + void write_superblock(ObjectStore::Transaction& t); + int read_superblock(); + + + // -- state -- + int state; + +public: + bool is_booting() { return state == STATE_BOOTING; } + bool is_active() { return state == STATE_ACTIVE; } + bool is_stopping() { return state == STATE_STOPPING; } + +private: + + // heartbeat + void heartbeat(); + + class C_Heartbeat : public Context { + OSD *osd; + public: + C_Heartbeat(OSD *o) : osd(o) {} + void finish(int r) { + osd->heartbeat(); + } + }; + + + // -- stats -- + DecayCounter stat_oprate; + int stat_ops; // ops since last heartbeat + int stat_qlen; // cumulative queue length since last refresh + + Mutex peer_stat_lock; + osd_peer_stat_t my_stat; + hash_map peer_stat; + + void _refresh_my_stat(utime_t now); + // load calculation //current implementation is moving averges. - class LoadCalculator { + class MovingAverager { private: Mutex lock; - deque m_Data ; - unsigned m_Size ; - double m_Total ; + deque m_Data; + unsigned m_Size; + double m_Total; public: - LoadCalculator( unsigned size ) : m_Size(0), m_Total(0) { } + MovingAverager(unsigned size) : m_Size(size), m_Total(0) { } + + void set_size(unsigned size) { + m_Size = size; + } - void add( double element ) { + void add(double value) { Mutex::Locker locker(lock); // add item - m_Data.push_back(element); - m_Total += element; + m_Data.push_back(value); + m_Total += value; // trim while (m_Data.size() > m_Size) { @@ -77,12 +147,10 @@ public: double get_average() { Mutex::Locker locker(lock); - - if (m_Data.empty()) - return -1; + if (m_Data.empty()) return -1; return m_Total / (double)m_Data.size(); } - }; + } read_latency_calc, qlen_calc; class IATAverager { public: @@ -125,70 +193,8 @@ public: } }; - - /** OSD **/ -protected: - Mutex osd_lock; // global lock - SafeTimer timer; // safe timer - - Messenger *messenger; - Logger *logger; - ObjectStore *store; - MonMap *monmap; - - LoadCalculator load_calc; IATAverager iat_averager; - - int whoami; - char dev_path[100]; - -public: - int get_nodeid() { return whoami; } - -private: - /** superblock **/ - OSDSuperblock superblock; - epoch_t boot_epoch; - - object_t get_osdmap_object_name(epoch_t epoch) { return object_t(0,epoch << 1); } - object_t get_inc_osdmap_object_name(epoch_t epoch) { return object_t(0, (epoch << 1) + 1); } - - void write_superblock(); - void write_superblock(ObjectStore::Transaction& t); - int read_superblock(); - - - // -- state -- - int state; - -public: - bool is_booting() { return state == STATE_BOOTING; } - bool is_active() { return state == STATE_ACTIVE; } - bool is_stopping() { return state == STATE_STOPPING; } - -private: - - // heartbeat - void heartbeat(); - - class C_Heartbeat : public Context { - OSD *osd; - public: - C_Heartbeat(OSD *o) : osd(o) {} - void finish(int r) { - osd->heartbeat(); - } - }; - - - // -- stats -- - int hb_stat_ops; // ops since last heartbeat - int hb_stat_qlen; // cumulative queue length since last hb - - hash_map peer_qlen; - hash_map peer_read_time; - - + // -- waiters -- list finished; diff --git a/trunk/ceph/osd/ReplicatedPG.cc b/trunk/ceph/osd/ReplicatedPG.cc index 78723fa3a0bc0..39b4b6cee14ab 100644 --- a/trunk/ceph/osd/ReplicatedPG.cc +++ b/trunk/ceph/osd/ReplicatedPG.cc @@ -108,7 +108,7 @@ bool ReplicatedPG::preprocess_op(MOSDOp *op) // -- load balance reads -- - if (g_conf.osd_balance_reads && + if (g_conf.osd_shed_reads && is_primary() && g_conf.osd_rep == OSD_REP_PRIMARY) { // -- read on primary+acker --- @@ -124,7 +124,8 @@ bool ReplicatedPG::preprocess_op(MOSDOp *op) } // -- flash crowd? - if (!op->get_source().is_osd() && + if (g_conf.osd_balance_reads && + !op->get_source().is_osd() && is_primary()) { // add sample osd->iat_averager.add_sample( op->get_oid(), (double)g_clock.now() ); @@ -160,55 +161,56 @@ bool ReplicatedPG::preprocess_op(MOSDOp *op) } } - + // -- read shedding + // check my load. // TODO xxx we must also compare with our own load // if i am x percentage higher than replica , // redirect the read - if ( g_conf.osd_balance_reads == LOAD_LATENCY) { - double mean_read_time = osd->load_calc.get_average(); - - if ( mean_read_time != -1 ) { - - for (unsigned i=1; - ipeer_stat_lock); + + // above some minimum? + if (osd->my_stat.read_latency >= .001) { + for (unsigned i=1; ipeer_stat.count(peer) == 0) continue; - dout(10) << "my read time " << mean_read_time - << "peer_readtime" << osd->peer_read_time[peer] + dout(-10) << "preprocess_op my read latency " << osd->my_stat.read_latency + << ", peer " << peer << " is " << osd->peer_stat[peer].read_latency << " of peer" << peer << dendl; - if ( osd->peer_read_time.count(peer) && - ( (osd->peer_read_time[peer]*100/mean_read_time) < - (100 - g_conf.osd_load_diff_percent))) { + if ((osd->peer_stat[peer].read_latency/osd->my_stat.read_latency) < + (1.0 - g_conf.osd_shed_reads_min_load_diff)) { dout(10) << " forwarding to peer osd" << peer << dendl; - osd->messenger->send_message(op, osd->osdmap->get_inst(peer)); return true; } } } } - else if ( g_conf.osd_balance_reads == LOAD_QUEUE_SIZE ) { + else if (g_conf.osd_shed_reads == LOAD_QUEUE_SIZE && osd->stat_ops) { + Mutex::Locker lock(osd->peer_stat_lock); + // am i above my average? - float my_avg = osd->hb_stat_qlen / osd->hb_stat_ops; + float my_avg = osd->stat_qlen / osd->stat_ops; if (osd->pending_ops > my_avg) { // is there a peer who is below my average? for (unsigned i=1; ipeer_qlen.count(peer) && - osd->peer_qlen[peer] < my_avg) { + Mutex::Locker lock(osd->peer_stat_lock); + if (osd->peer_stat.count(peer) && + osd->peer_stat[peer].qlen < my_avg) { // calculate a probability that we should redirect - float p = (my_avg - osd->peer_qlen[peer]) / my_avg; // this is dumb. + float p = (my_avg - osd->peer_stat[peer].qlen) / my_avg; // this is dumb. if (drand48() <= p) { // take the first one dout(10) << "my qlen " << osd->pending_ops << " > my_avg " << my_avg << ", p=" << p - << ", fwd to peer w/ qlen " << osd->peer_qlen[peer] + << ", fwd to peer w/ qlen " << osd->peer_stat[peer].qlen << " osd" << peer << dendl; osd->messenger->send_message(op, osd->osdmap->get_inst(peer)); @@ -219,31 +221,32 @@ bool ReplicatedPG::preprocess_op(MOSDOp *op) } } - else if ( g_conf.osd_balance_reads == LOAD_HYBRID ) { + else if ( g_conf.osd_shed_reads == LOAD_HYBRID ) { // am i above my average? - float my_avg = osd->hb_stat_qlen / osd->hb_stat_ops; + float my_avg = osd->stat_qlen / osd->stat_ops; if (osd->pending_ops > my_avg) { // is there a peer who is below my average? for (unsigned i=1; ipeer_qlen.count(peer) && - osd->peer_qlen[peer] < my_avg) { + Mutex::Locker lock(osd->peer_stat_lock); + if (osd->peer_stat.count(peer) && + osd->peer_stat[peer].qlen < my_avg) { // calculate a probability that we should redirect - //float p = (my_avg - peer_qlen[peer]) / my_avg; // this is dumb. + //float p = (my_avg - peer_stat[peer].qlen) / my_avg; // this is dumb. - double mean_read_time = osd->load_calc.get_average(); + double mean_read_time = osd->my_stat.read_latency; if ( mean_read_time != -1 && - osd->peer_read_time.count(peer) && - ( (osd->peer_read_time[peer]*100/mean_read_time) < - ( 100 - g_conf.osd_load_diff_percent) ) ) + osd->peer_stat.count(peer) && + ( (osd->peer_stat[peer].read_latency/mean_read_time) < + ( 1.0 - g_conf.osd_shed_reads_min_load_diff) ) ) //if (drand48() <= p) { // take the first one dout(10) << "using hybrid :my qlen " << osd->pending_ops << " > my_avg " << my_avg << "my read time "<< mean_read_time - << "peer read time " << osd->peer_read_time[peer] - << ", fwd to peer w/ qlen " << osd->peer_qlen[peer] + << "peer read time " << osd->peer_stat[peer].read_latency + << ", fwd to peer w/ qlen " << osd->peer_stat[peer].read_latency << " osd" << peer << dendl; osd->messenger->send_message(op, osd->osdmap->get_inst(peer)); @@ -261,11 +264,11 @@ bool ReplicatedPG::preprocess_op(MOSDOp *op) if (osd->store->is_cached( op->get_oid() , op->get_offset(), op->get_length() ) == 0) { - if (!is_primary()) { + if (!is_primary() && !op->get_source().is_osd()) { // am i allowed? bool v; if (osd->store->getattr(op->get_oid(), "balance-reads", &v, 1) < 0) { - dout(10) << "preprocess_op in-cache but no balance-reads on " << op->get_oid() + dout(-10) << "preprocess_op in-cache but no balance-reads on " << op->get_oid() << ", fwd to primary" << dendl; osd->messenger->send_message(op, osd->osdmap->get_inst(get_primary())); return true; @@ -439,16 +442,15 @@ void ReplicatedPG::op_read(MOSDOp *op) if (r >= 0) { reply->set_result(0); - dout(10) << "READ TIME DIFF" - << (double)g_clock.now()-op->get_received_time() - << dendl; - osd->load_calc.add((double)g_clock.now() - op->get_received_time()); + utime_t diff = g_clock.now(); + diff -= op->get_received_time(); + dout(10) << "op_read total op latency " << diff << dendl; + osd->read_latency_calc.add(diff); } else { reply->set_result(r); // error } - // send it osd->messenger->send_message(reply, op->get_client_inst()); @@ -803,7 +805,7 @@ void ReplicatedPG::put_rep_gather(RepGather *repop) } -void ReplicatedPG::issue_repop(MOSDOp *op, int dest) +void ReplicatedPG::issue_repop(MOSDOp *op, int dest, osd_peer_stat_t& stat) { object_t oid = op->get_oid(); @@ -825,6 +827,8 @@ void ReplicatedPG::issue_repop(MOSDOp *op, int dest) wr->set_rep_tid(op->get_rep_tid()); wr->set_pg_trim_to(peers_complete_thru); + + wr->set_peer_stat(stat); osd->messenger->send_message(wr, osd->osdmap->get_inst(dest)); } @@ -1122,6 +1126,13 @@ void ReplicatedPG::op_modify(MOSDOp *op) osd->logger->inc("c_wrb", op->get_length()); } + // note my stats + osd->peer_stat_lock.Lock(); + osd->_refresh_my_stat(g_clock.now()); + osd_peer_stat_t stat = osd->my_stat; + osd->peer_stat_lock.Unlock(); + + // issue replica writes RepGather *repop = 0; bool alone = (acting.size() == 1); @@ -1133,13 +1144,13 @@ void ReplicatedPG::op_modify(MOSDOp *op) int next = acting[1]; if (acting.size() > 2) next = acting[2]; - issue_repop(op, next); + issue_repop(op, next, stat); } else if (g_conf.osd_rep == OSD_REP_SPLAY && !alone) { // splay rep. send to rest. for (unsigned i=1; i=1; --i) - issue_repop(op, acting[i]); + issue_repop(op, acting[i], stat); } else { // primary rep, or alone. repop = new_rep_gather(op); @@ -1147,7 +1158,7 @@ void ReplicatedPG::op_modify(MOSDOp *op) // send to rest. if (!alone) for (unsigned i=1; iget_offset() << "~" << op->get_length() << dendl; + // note peer's stat + int fromosd = op->get_source().num(); + osd->peer_stat_lock.Lock(); + osd->peer_stat[fromosd] = op->get_peer_stat(); + dout(20) << "op_rep_modify got peer " << fromosd << " stat " << op->get_peer_stat() << dendl; + osd->peer_stat_lock.Unlock(); + // we better not be missing this. assert(!missing.is_missing(oid)); @@ -1238,7 +1256,6 @@ void ReplicatedPG::op_rep_modify(MOSDOp *op) } // infer ack from source - int fromosd = op->get_source().num(); get_rep_gather(repop); { //assert(repop->waitfor_ack.count(fromosd)); // no, we may come thru here twice. @@ -1252,12 +1269,14 @@ void ReplicatedPG::op_rep_modify(MOSDOp *op) // chain? forward? if (g_conf.osd_rep == OSD_REP_CHAIN && !is_acker()) { + osd_peer_stat_t stat = osd->my_stat; // FIXME + // chain rep, not at the tail yet. int myrank = osd->osdmap->calc_pg_rank(osd->get_nodeid(), acting); int next = myrank+1; if (next == (int)acting.size()) next = 1; - issue_repop(op, acting[next]); + issue_repop(op, acting[next], stat); } } diff --git a/trunk/ceph/osd/ReplicatedPG.h b/trunk/ceph/osd/ReplicatedPG.h index 8a54129e39811..3d82bcfa7b806 100644 --- a/trunk/ceph/osd/ReplicatedPG.h +++ b/trunk/ceph/osd/ReplicatedPG.h @@ -80,7 +80,7 @@ protected: void get_rep_gather(RepGather*); void apply_repop(RepGather *repop); void put_rep_gather(RepGather*); - void issue_repop(MOSDOp *op, int osd); + void issue_repop(MOSDOp *op, int osd, osd_peer_stat_t& stat); RepGather *new_rep_gather(MOSDOp *op); void repop_ack(RepGather *repop, int result, bool commit, diff --git a/trunk/ceph/osd/osd_types.h b/trunk/ceph/osd/osd_types.h index ca6fd8d6a786e..916a5e166eabb 100644 --- a/trunk/ceph/osd/osd_types.h +++ b/trunk/ceph/osd/osd_types.h @@ -250,6 +250,22 @@ struct pg_stat_t { +struct osd_peer_stat_t { + utime_t stamp; + double oprate; + double qlen; + double read_latency; + osd_peer_stat_t() : oprate(0), qlen(0), read_latency(0) {} +}; + +inline ostream& operator<<(ostream& out, const osd_peer_stat_t &stat) { + return out << "stat(" << stat.stamp + << " oprate=" << stat.oprate + << " qlen=" << stat.qlen + << " read_latency=" << stat.read_latency + << ")"; +} + // ----------------------------------------- class ObjectExtent { -- 2.39.5