]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd load calc changes
authorsageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Wed, 29 Aug 2007 19:23:23 +0000 (19:23 +0000)
committersageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Wed, 29 Aug 2007 19:23:23 +0000 (19:23 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1733 29311d96-e01e-0410-9327-a35deaab8ce9

trunk/ceph/config.cc
trunk/ceph/config.h
trunk/ceph/messages/MOSDOp.h
trunk/ceph/messages/MOSDOpReply.h
trunk/ceph/messages/MOSDPing.h
trunk/ceph/osd/OSD.cc
trunk/ceph/osd/OSD.h
trunk/ceph/osd/ReplicatedPG.cc
trunk/ceph/osd/ReplicatedPG.h
trunk/ceph/osd/osd_types.h

index 26004c143fa473a09cfc6200aa541d5880e9ad42..4d536089c537b2ffab802a99b7a85ed2114b0cf8 100644 (file)
@@ -245,12 +245,16 @@ md_config_t g_conf = {
   // --- osd ---
   osd_rep: OSD_REP_PRIMARY,
 
-  osd_balance_reads: false,
-  osd_immediate_read_from_cache: true, // osds to read from the cache immediately?
-  osd_exclusive_caching: true,         // replicas evict replicated writes
-  osd_load_diff_percent: 20, // load diff for read forwarding
+  osd_balance_reads: false,  // send from client to replica
   osd_flash_crowd_iat_threshold: 100,
   osd_flash_crowd_iat_alpha: 0.125,
+  
+  osd_shed_reads: false,     // forward from primary to replica
+  osd_shed_reads_min_latency: .001,
+  osd_shed_reads_min_load_diff: .2,
+
+  osd_immediate_read_from_cache: true, // osds to read from the cache immediately?
+  osd_exclusive_caching: true,         // replicas evict replicated writes
 
   osd_pg_bits: 0,  // 0 == let osdmonitor decide
   osd_object_layout: OBJECT_LAYOUT_HASHINO,
@@ -804,17 +808,23 @@ void parse_config_options(std::vector<char*>& args)
 
     else if (strcmp(args[i], "--osd_balance_reads") == 0) 
       g_conf.osd_balance_reads = atoi(args[++i]);
-    else if ( strcmp(args[i],"--osd_immediate_read_from_cache" ) == 0)
-      g_conf.osd_immediate_read_from_cache = atoi(args[++i]);
-    else if ( strcmp(args[i],"--osd_exclusive_caching" ) == 0)
-      g_conf.osd_exclusive_caching = atoi(args[++i]);
-    else if (strcmp(args[i], "--osd_load_diff_percent") == 0) 
-      g_conf.osd_load_diff_percent = atoi(args[++i]);
     else if (strcmp(args[i], "--osd_flash_crowd_iat_threshold") == 0) 
       g_conf.osd_flash_crowd_iat_threshold = atoi(args[++i]);
     else if (strcmp(args[i], "--osd_flash_crowd_iat_alpha") == 0) 
       g_conf.osd_flash_crowd_iat_alpha = atoi(args[++i]);
 
+    else if (strcmp(args[i], "--osd_shed_reads") == 0) 
+      g_conf.osd_shed_reads = atoi(args[++i]);
+    else if (strcmp(args[i], "--osd_shed_reads_min_latency") == 0) 
+      g_conf.osd_shed_reads_min_latency = atof(args[++i]);
+    else if (strcmp(args[i], "--osd_shed_reads_min_load_diff") == 0) 
+      g_conf.osd_shed_reads_min_load_diff = atof(args[++i]);
+
+    else if ( strcmp(args[i],"--osd_immediate_read_from_cache" ) == 0)
+      g_conf.osd_immediate_read_from_cache = atoi(args[++i]);
+    else if ( strcmp(args[i],"--osd_exclusive_caching" ) == 0)
+      g_conf.osd_exclusive_caching = atoi(args[++i]);
+
     else if (strcmp(args[i], "--osd_rep") == 0) 
       g_conf.osd_rep = atoi(args[++i]);
     else if (strcmp(args[i], "--osd_rep_chain") == 0) 
index 4b183fdfd2cc910c232a4dcd265544c68869135a..424cddc0df393eaf23a51b36b7c2b7cbe6b59f44 100644 (file)
@@ -238,13 +238,17 @@ struct md_config_t {
   // osd
   int   osd_rep;
 
-  bool  osd_balance_reads;
-  bool  osd_immediate_read_from_cache;
-  bool  osd_exclusive_caching;
-  int  osd_load_diff_percent;
+  bool osd_balance_reads;
   int osd_flash_crowd_iat_threshold;  // flash crowd interarrival time threshold in ms
   double osd_flash_crowd_iat_alpha;
 
+  bool  osd_shed_reads;
+  double osd_shed_reads_min_latency;
+  double osd_shed_reads_min_load_diff;  // .5 == 50%
+
+  bool  osd_immediate_read_from_cache;
+  bool  osd_exclusive_caching;
+
   int   osd_pg_bits;
   int   osd_object_layout;
   int   osd_pg_layout;
index e6695be8ad02cf9ddeb3893fd28923f5c684b765..0b2a1be0f66e9d12381ddbc325c1984695fa15ee 100644 (file)
@@ -113,6 +113,8 @@ private:
     bool   want_ack;
     bool   want_commit;
     bool   retry_attempt;
+
+    osd_peer_stat_t peer_stat;
   } st;
 
   bufferlist data;
@@ -179,6 +181,8 @@ private:
     return request_received_time;
   }
 
+  void set_peer_stat(osd_peer_stat_t& stat) { st.peer_stat = stat; }
+  const osd_peer_stat_t& get_peer_stat() { return st.peer_stat; }
   
   void set_data(bufferlist &d) {
     data.claim(d);
index ed2971296d9a7f6a501f05d766c3f9d090a86e1b..bb5d413228e82b6a4b6018c0126aa561c5727a47 100644 (file)
@@ -51,6 +51,8 @@ class MOSDOpReply : public Message {
     eversion_t pg_complete_thru;
     
     epoch_t map_epoch;
+
+    osd_peer_stat_t peer_stat;
   } st;
 
   bufferlist data;
@@ -85,6 +87,9 @@ class MOSDOpReply : public Message {
   void set_op(int op) { st.op = op; }
   void set_rep_tid(tid_t t) { st.rep_tid = t; }
 
+  void set_peer_stat(osd_peer_stat_t& stat) { st.peer_stat = stat; }
+  const osd_peer_stat_t& get_peer_stat() { return st.peer_stat; }
+
   // data payload
   void set_data(bufferlist &d) {
     data.claim(d);
index dda21888c31d72852d4b931438e4688757dc985e..37be289c0a9233c79cca0320ecd8742e9decb6b3 100644 (file)
 #include "common/Clock.h"
 
 #include "msg/Message.h"
+#include "osd/osd_types.h"
 
 
 class MOSDPing : public Message {
  public:
   epoch_t map_epoch;
   bool ack;
-  float avg_qlen;
-  double read_mean_time;
+  osd_peer_stat_t peer_stat;
 
-  MOSDPing(epoch_t e, 
-          float aq,
-          double _read_mean_time,
-          bool a=false) : Message(MSG_OSD_PING), map_epoch(e), ack(a), avg_qlen(aq), read_mean_time(_read_mean_time) {
-  }
+  MOSDPing(epoch_t e, osd_peer_stat_t& ps, bool a=false) : 
+    Message(MSG_OSD_PING), map_epoch(e), ack(a), peer_stat(ps) { }
   MOSDPing() {}
 
   virtual void decode_payload() {
     int off = 0;
     ::_decode(map_epoch, payload, off);
     ::_decode(ack, payload, off);
-    ::_decode(avg_qlen, payload, off);
-    ::_decode(read_mean_time, payload, off);
+    ::_decode(peer_stat, payload, off);
   }
   virtual void encode_payload() {
     ::_encode(map_epoch, payload);
     ::_encode(ack, payload);
-    ::_encode(avg_qlen, payload);
-    ::_encode(read_mean_time, payload);
+    ::_encode(peer_stat, payload);
   }
 
   virtual char *get_type_name() { return "osd_ping"; }
index 3dcf3b0de26dacf47824d2b7acf7f9c404ac82e7..e601757bb231bfef52cd0e75ff3b9ef4542d0ff8 100644 (file)
@@ -109,7 +109,9 @@ LogType osd_logtype;
 
 OSD::OSD(int id, Messenger *m, MonMap *mm, char *dev) : 
   timer(osd_lock),
-  load_calc(g_conf.osd_max_opq<1?1:g_conf.osd_max_opq),
+  stat_oprate(5.0),
+  read_latency_calc(g_conf.osd_max_opq<1?1:g_conf.osd_max_opq),
+  qlen_calc(3),
   iat_averager(g_conf.osd_flash_crowd_iat_alpha)
 {
   whoami = id;
@@ -124,8 +126,8 @@ OSD::OSD(int id, Messenger *m, MonMap *mm, char *dev) :
 
   state = STATE_BOOTING;
 
-  hb_stat_ops = 0;
-  hb_stat_qlen = 0;
+  stat_ops = 0;
+  stat_qlen = 0;
 
   pending_ops = 0;
   waiting_for_no_ops = false;
@@ -599,6 +601,31 @@ void OSD::activate_pg(pg_t pgid, epoch_t epoch)
 
 // -------------------------------------
 
+void OSD::_refresh_my_stat(utime_t now)
+{
+  assert(peer_stat_lock.is_locked());
+
+  // refresh?
+  if (now - my_stat.stamp > .5) {
+    my_stat.stamp = now;
+
+    my_stat.oprate = stat_oprate.get(now);
+
+    // qlen
+    //double qlen = 0;
+    //if (stat_ops) qlen = 
+    //qlen_calc.add(qlen);
+    my_stat.qlen = (float)stat_qlen / (float)stat_ops;  //get_average();
+    stat_ops = 0;
+    stat_qlen = 0;
+
+    my_stat.read_latency = read_latency_calc.get_average();
+    if (my_stat.read_latency < 0) my_stat.read_latency = 0;
+
+    dout(-10) << "_refresh_my_stat " << my_stat << dendl;
+  }
+}
+
 void OSD::heartbeat()
 {
   utime_t now = g_clock.now();
@@ -606,21 +633,13 @@ void OSD::heartbeat()
   since.sec_ref() -= g_conf.osd_heartbeat_interval;
 
   // calc my stats
-  float avg_qlen = 0;
-  if (hb_stat_ops) avg_qlen = (float)hb_stat_qlen / (float)hb_stat_ops;
+  peer_stat_lock.Lock();
+  _refresh_my_stat(now);
+  dout(-5) << "heartbeat: " << my_stat << dendl;
+  peer_stat_lock.Unlock();
 
-  double read_mean_time = load_calc.get_average();
-
-  dout(5) << "heartbeat " << now 
-         << ": ops " << hb_stat_ops
-         << ", avg qlen " << avg_qlen
-         << ", mean read time " << read_mean_time
-         << dendl;
+  //load_calc.set_size(stat_ops);
   
-  // reset until next time around
-  hb_stat_ops = 0;
-  hb_stat_qlen = 0;
-
   // send pings
   set<int> pingset;
   for (hash_map<pg_t, PG*>::iterator i = pg_map.begin();
@@ -641,10 +660,8 @@ void OSD::heartbeat()
        i != pingset.end();
        i++) {
     _share_map_outgoing( osdmap->get_inst(*i) );
-    messenger->send_message(new MOSDPing(osdmap->get_epoch(), 
-                               avg_qlen, 
-                               read_mean_time ), 
-                               osdmap->get_inst(*i));
+    messenger->send_message(new MOSDPing(osdmap->get_epoch(), my_stat),
+                           osdmap->get_inst(*i));
   }
 
   if (logger) logger->set("pingset", pingset.size());
@@ -937,8 +954,9 @@ void OSD::handle_osd_ping(MOSDPing *m)
   _share_map_incoming(m->get_source_inst(), ((MOSDPing*)m)->map_epoch);
   
   int from = m->get_source().num();
-  peer_qlen[from] = m->avg_qlen;
-  peer_read_time[from] = m->read_mean_time;
+  peer_stat_lock.Lock();
+  peer_stat[from] = m->peer_stat;
+  peer_stat_lock.Unlock();
 
   delete m;
 }
@@ -1947,11 +1965,13 @@ void OSD::handle_op(MOSDOp *op)
 
   // mark the read request received time for finding the 
   // read througput load.  
-  op->set_received_time(g_clock.now());
+  utime_t now = g_clock.now();
+  op->set_received_time(now);
 
   // update qlen stats
-  hb_stat_ops++;
-  hb_stat_qlen += pending_ops;
+  stat_oprate.hit(now);
+  stat_ops++;
+  stat_qlen += pending_ops;
 
   // require same or newer map
   if (!require_same_or_newer_map(op, op->get_map_epoch())) {
index eaab58511509a6092c7c707d8d719934568d7b90..ed2006b9ec4e00ac14745ceccee0078742edaaf5 100644 (file)
@@ -26,6 +26,8 @@
 #include "ObjectStore.h"
 #include "PG.h"
 
+#include "common/DecayCounter.h"
+
 
 #include <map>
 using namespace std;
@@ -49,24 +51,92 @@ public:
   static const int STATE_STOPPING = 3;
 
 
+
+  /** OSD **/
+protected:
+  Mutex osd_lock;     // global lock
+  SafeTimer timer;    // safe timer
+
+  Messenger   *messenger; 
+  Logger      *logger;
+  ObjectStore *store;
+  MonMap      *monmap;
+
+  int whoami;
+  char dev_path[100];
+
+public:
+  int get_nodeid() { return whoami; }
+  
+private:
+  /** superblock **/
+  OSDSuperblock superblock;
+  epoch_t  boot_epoch;      
+
+  object_t get_osdmap_object_name(epoch_t epoch) { return object_t(0,epoch << 1); }
+  object_t get_inc_osdmap_object_name(epoch_t epoch) { return object_t(0, (epoch << 1) + 1); }
+  
+  void write_superblock();
+  void write_superblock(ObjectStore::Transaction& t);
+  int read_superblock();
+
+
+  // -- state --
+  int state;
+
+public:
+  bool is_booting() { return state == STATE_BOOTING; }
+  bool is_active() { return state == STATE_ACTIVE; }
+  bool is_stopping() { return state == STATE_STOPPING; }
+
+private:
+
+  // heartbeat
+  void heartbeat();
+
+  class C_Heartbeat : public Context {
+    OSD *osd;
+  public:
+    C_Heartbeat(OSD *o) : osd(o) {}
+    void finish(int r) {
+      osd->heartbeat();
+    }
+  };
+
+
+  // -- stats --
+  DecayCounter stat_oprate;
+  int stat_ops;  // ops since last heartbeat
+  int stat_qlen; // cumulative queue length since last refresh
+  
+  Mutex peer_stat_lock;
+  osd_peer_stat_t my_stat;
+  hash_map<int, osd_peer_stat_t> peer_stat;
+
+  void _refresh_my_stat(utime_t now);
+  
   // load calculation
   //current implementation is moving averges.
-  class LoadCalculator {
+  class MovingAverager {
   private:
     Mutex lock;
-    deque<double> m_Data ;
-    unsigned m_Size ;
-    double  m_Total ;
+    deque<double> m_Data;
+    unsigned m_Size;
+    double m_Total;
     
   public:
-    LoadCalculator( unsigned size ) : m_Size(0), m_Total(0) { }
+    MovingAverager(unsigned size) : m_Size(size), m_Total(0) { }
+
+    void set_size(unsigned size) {
+      m_Size = size;
+    }
 
-    void add( double element ) {
+    void add(double value) {
       Mutex::Locker locker(lock);
 
       // add item
-      m_Data.push_back(element);
-      m_Total += element;
+      m_Data.push_back(value);
+      m_Total += value;
 
       // trim
       while (m_Data.size() > m_Size) {
@@ -77,12 +147,10 @@ public:
     
     double get_average() {
       Mutex::Locker locker(lock);
-
-      if (m_Data.empty())
-       return -1;
+      if (m_Data.empty()) return -1;
       return m_Total / (double)m_Data.size();
     }
-  };
+  } read_latency_calc, qlen_calc;
 
   class IATAverager {
   public:
@@ -125,70 +193,8 @@ public:
     }
   };
 
-
-  /** OSD **/
-protected:
-  Mutex osd_lock;     // global lock
-  SafeTimer timer;    // safe timer
-
-  Messenger   *messenger; 
-  Logger      *logger;
-  ObjectStore *store;
-  MonMap      *monmap;
-
-  LoadCalculator load_calc;
   IATAverager    iat_averager;
-  
-  int whoami;
-  char dev_path[100];
-
-public:
-  int get_nodeid() { return whoami; }
-  
-private:
-  /** superblock **/
-  OSDSuperblock superblock;
-  epoch_t  boot_epoch;      
-
-  object_t get_osdmap_object_name(epoch_t epoch) { return object_t(0,epoch << 1); }
-  object_t get_inc_osdmap_object_name(epoch_t epoch) { return object_t(0, (epoch << 1) + 1); }
-  
-  void write_superblock();
-  void write_superblock(ObjectStore::Transaction& t);
-  int read_superblock();
-
-
-  // -- state --
-  int state;
-
-public:
-  bool is_booting() { return state == STATE_BOOTING; }
-  bool is_active() { return state == STATE_ACTIVE; }
-  bool is_stopping() { return state == STATE_STOPPING; }
-
-private:
-
-  // heartbeat
-  void heartbeat();
-
-  class C_Heartbeat : public Context {
-    OSD *osd;
-  public:
-    C_Heartbeat(OSD *o) : osd(o) {}
-    void finish(int r) {
-      osd->heartbeat();
-    }
-  };
-
-
-  // -- stats --
-  int hb_stat_ops;  // ops since last heartbeat
-  int hb_stat_qlen; // cumulative queue length since last hb
-
-  hash_map<int, float>  peer_qlen;
-  hash_map<int, double> peer_read_time;
-  
-
 
   // -- waiters --
   list<class Message*> finished;
index 78723fa3a0bc0d9609ba7a374b7d6fe55a11d024..39b4b6cee14ab982a517737917f5b717d6cca5b8 100644 (file)
@@ -108,7 +108,7 @@ bool ReplicatedPG::preprocess_op(MOSDOp *op)
 
 
   // -- load balance reads --
-  if (g_conf.osd_balance_reads &&
+  if (g_conf.osd_shed_reads &&
       is_primary() &&
       g_conf.osd_rep == OSD_REP_PRIMARY) {
     // -- read on primary+acker ---
@@ -124,7 +124,8 @@ bool ReplicatedPG::preprocess_op(MOSDOp *op)
     }
     
     // -- flash crowd?
-    if (!op->get_source().is_osd() && 
+    if (g_conf.osd_balance_reads &&
+       !op->get_source().is_osd() && 
        is_primary()) {
       // add sample
       osd->iat_averager.add_sample( op->get_oid(), (double)g_clock.now() );
@@ -160,55 +161,56 @@ bool ReplicatedPG::preprocess_op(MOSDOp *op)
       }
     }
     
-    
+    // -- read shedding
+
     // check my load. 
     // TODO xxx we must also compare with our own load
     // if i am x percentage higher than replica , 
     // redirect the read 
     
-    if ( g_conf.osd_balance_reads == LOAD_LATENCY) {
-      double mean_read_time = osd->load_calc.get_average();
-      
-      if ( mean_read_time != -1 ) {
-       
-       for (unsigned i=1; 
-            i<acting.size(); 
-            ++i) {
+    if (g_conf.osd_shed_reads == LOAD_LATENCY) {
+      Mutex::Locker lock(osd->peer_stat_lock);
+
+      // above some minimum?
+      if (osd->my_stat.read_latency >= .001) {  
+       for (unsigned i=1; i<acting.size(); ++i) {
          int peer = acting[i];
+         if (osd->peer_stat.count(peer) == 0) continue;
          
-         dout(10) << "my read time " << mean_read_time 
-                  << "peer_readtime" << osd->peer_read_time[peer] 
+         dout(-10) << "preprocess_op my read latency " << osd->my_stat.read_latency
+                  << ", peer " << peer << " is " << osd->peer_stat[peer].read_latency
                   << " of peer" << peer << dendl;
          
-         if ( osd->peer_read_time.count(peer) &&
-              ( (osd->peer_read_time[peer]*100/mean_read_time) <
-                (100 - g_conf.osd_load_diff_percent))) {
+         if ((osd->peer_stat[peer].read_latency/osd->my_stat.read_latency) <
+             (1.0 - g_conf.osd_shed_reads_min_load_diff)) {
            dout(10) << " forwarding to peer osd" << peer << dendl;
-           
            osd->messenger->send_message(op, osd->osdmap->get_inst(peer));
            return true;
          }
        }
       }
     }
-    else if ( g_conf.osd_balance_reads == LOAD_QUEUE_SIZE ) {
+    else if (g_conf.osd_shed_reads == LOAD_QUEUE_SIZE && osd->stat_ops) {
+      Mutex::Locker lock(osd->peer_stat_lock);
+
       // am i above my average?
-      float my_avg = osd->hb_stat_qlen / osd->hb_stat_ops;
+      float my_avg = osd->stat_qlen / osd->stat_ops;
       
       if (osd->pending_ops > my_avg) {
        // is there a peer who is below my average?
        for (unsigned i=1; i<acting.size(); ++i) {
          int peer = acting[i];
-         if (osd->peer_qlen.count(peer) &&
-             osd->peer_qlen[peer] < my_avg) {
+         Mutex::Locker lock(osd->peer_stat_lock);
+         if (osd->peer_stat.count(peer) &&
+             osd->peer_stat[peer].qlen < my_avg) {
            // calculate a probability that we should redirect
-           float p = (my_avg - osd->peer_qlen[peer]) / my_avg;             // this is dumb.
+           float p = (my_avg - osd->peer_stat[peer].qlen) / my_avg;             // this is dumb.
            
            if (drand48() <= p) {
              // take the first one
              dout(10) << "my qlen " << osd->pending_ops << " > my_avg " << my_avg
                       << ", p=" << p 
-                      << ", fwd to peer w/ qlen " << osd->peer_qlen[peer]
+                      << ", fwd to peer w/ qlen " << osd->peer_stat[peer].qlen
                       << " osd" << peer
                       << dendl;
              osd->messenger->send_message(op, osd->osdmap->get_inst(peer));
@@ -219,31 +221,32 @@ bool ReplicatedPG::preprocess_op(MOSDOp *op)
       }
     }
     
-    else if ( g_conf.osd_balance_reads == LOAD_HYBRID ) {
+    else if ( g_conf.osd_shed_reads == LOAD_HYBRID ) {
       // am i above my average?
-      float my_avg = osd->hb_stat_qlen / osd->hb_stat_ops;
+      float my_avg = osd->stat_qlen / osd->stat_ops;
       
       if (osd->pending_ops > my_avg) {
        // is there a peer who is below my average?
        for (unsigned i=1; i<acting.size(); ++i) {
          int peer = acting[i];
-         if (osd->peer_qlen.count(peer) &&
-             osd->peer_qlen[peer] < my_avg) {
+         Mutex::Locker lock(osd->peer_stat_lock);
+         if (osd->peer_stat.count(peer) &&
+             osd->peer_stat[peer].qlen < my_avg) {
            // calculate a probability that we should redirect
-           //float p = (my_avg - peer_qlen[peer]) / my_avg;             // this is dumb.
+           //float p = (my_avg - peer_stat[peer].qlen) / my_avg;             // this is dumb.
            
-           double mean_read_time = osd->load_calc.get_average();
+           double mean_read_time = osd->my_stat.read_latency;
            
            if ( mean_read_time != -1 &&  
-                osd->peer_read_time.count(peer) &&
-                ( (osd->peer_read_time[peer]*100/mean_read_time) <
-                  ( 100 - g_conf.osd_load_diff_percent) ) )
+                osd->peer_stat.count(peer) &&
+                ( (osd->peer_stat[peer].read_latency/mean_read_time) <
+                  ( 1.0 - g_conf.osd_shed_reads_min_load_diff) ) )
              //if (drand48() <= p) {
              // take the first one
              dout(10) << "using hybrid :my qlen " << osd->pending_ops << " > my_avg " << my_avg
                       << "my read time  "<<  mean_read_time
-                      << "peer read time " << osd->peer_read_time[peer]  
-                      << ", fwd to peer w/ qlen " << osd->peer_qlen[peer]
+                      << "peer read time " << osd->peer_stat[peer].read_latency
+                      << ", fwd to peer w/ qlen " << osd->peer_stat[peer].read_latency
                       << " osd" << peer
                       << dendl;
            osd->messenger->send_message(op, osd->osdmap->get_inst(peer));
@@ -261,11 +264,11 @@ bool ReplicatedPG::preprocess_op(MOSDOp *op)
     if (osd->store->is_cached( op->get_oid() , 
                               op->get_offset(), 
                               op->get_length() ) == 0) {
-      if (!is_primary()) {
+      if (!is_primary() && !op->get_source().is_osd()) {
        // am i allowed?
        bool v;
        if (osd->store->getattr(op->get_oid(), "balance-reads", &v, 1) < 0) {
-         dout(10) << "preprocess_op in-cache but no balance-reads on " << op->get_oid()
+         dout(-10) << "preprocess_op in-cache but no balance-reads on " << op->get_oid()
                    << ", fwd to primary" << dendl;
          osd->messenger->send_message(op, osd->osdmap->get_inst(get_primary()));
          return true;
@@ -439,16 +442,15 @@ void ReplicatedPG::op_read(MOSDOp *op)
   if (r >= 0) {
     reply->set_result(0);
 
-    dout(10) <<  "READ TIME DIFF"
-            << (double)g_clock.now()-op->get_received_time()
-            << dendl;
-    osd->load_calc.add((double)g_clock.now() - op->get_received_time());
+    utime_t diff = g_clock.now();
+    diff -= op->get_received_time();
+    dout(10) <<  "op_read total op latency " << diff << dendl;
+    osd->read_latency_calc.add(diff);
 
   } else {
     reply->set_result(r);   // error
   }
   
   // send it
   osd->messenger->send_message(reply, op->get_client_inst());
   
@@ -803,7 +805,7 @@ void ReplicatedPG::put_rep_gather(RepGather *repop)
 }
 
 
-void ReplicatedPG::issue_repop(MOSDOp *op, int dest)
+void ReplicatedPG::issue_repop(MOSDOp *op, int dest, osd_peer_stat_t& stat)
 {
   object_t oid = op->get_oid();
   
@@ -825,6 +827,8 @@ void ReplicatedPG::issue_repop(MOSDOp *op, int dest)
   
   wr->set_rep_tid(op->get_rep_tid());
   wr->set_pg_trim_to(peers_complete_thru);
+
+  wr->set_peer_stat(stat);
   
   osd->messenger->send_message(wr, osd->osdmap->get_inst(dest));
 }
@@ -1122,6 +1126,13 @@ void ReplicatedPG::op_modify(MOSDOp *op)
     osd->logger->inc("c_wrb", op->get_length());
   }
 
+  // note my stats
+  osd->peer_stat_lock.Lock();
+  osd->_refresh_my_stat(g_clock.now());
+  osd_peer_stat_t stat = osd->my_stat;
+  osd->peer_stat_lock.Unlock();
+
+
   // issue replica writes
   RepGather *repop = 0;
   bool alone = (acting.size() == 1);
@@ -1133,13 +1144,13 @@ void ReplicatedPG::op_modify(MOSDOp *op)
     int next = acting[1];
     if (acting.size() > 2)
       next = acting[2];
-    issue_repop(op, next);
+    issue_repop(op, next, stat);
   } 
   else if (g_conf.osd_rep == OSD_REP_SPLAY && !alone) {
     // splay rep.  send to rest.
     for (unsigned i=1; i<acting.size(); ++i)
     //for (unsigned i=acting.size()-1; i>=1; --i)
-      issue_repop(op, acting[i]);
+      issue_repop(op, acting[i], stat);
   } else {
     // primary rep, or alone.
     repop = new_rep_gather(op);
@@ -1147,7 +1158,7 @@ void ReplicatedPG::op_modify(MOSDOp *op)
     // send to rest.
     if (!alone)
       for (unsigned i=1; i<acting.size(); i++)
-        issue_repop(op, acting[i]);
+        issue_repop(op, acting[i], stat);
   }
 
   if (repop) {    
@@ -1216,6 +1227,13 @@ void ReplicatedPG::op_rep_modify(MOSDOp *op)
            << " " << op->get_offset() << "~" << op->get_length()
            << dendl;  
   
+  // note peer's stat
+  int fromosd = op->get_source().num();
+  osd->peer_stat_lock.Lock();
+  osd->peer_stat[fromosd] = op->get_peer_stat();
+  dout(20) << "op_rep_modify got peer " << fromosd << " stat " << op->get_peer_stat() << dendl;
+  osd->peer_stat_lock.Unlock();
+
   // we better not be missing this.
   assert(!missing.is_missing(oid));
 
@@ -1238,7 +1256,6 @@ void ReplicatedPG::op_rep_modify(MOSDOp *op)
       }
       
       // infer ack from source
-      int fromosd = op->get_source().num();
       get_rep_gather(repop);
       {
         //assert(repop->waitfor_ack.count(fromosd));   // no, we may come thru here twice.
@@ -1252,12 +1269,14 @@ void ReplicatedPG::op_rep_modify(MOSDOp *op)
 
     // chain?  forward?
     if (g_conf.osd_rep == OSD_REP_CHAIN && !is_acker()) {
+      osd_peer_stat_t stat = osd->my_stat; // FIXME
+      
       // chain rep, not at the tail yet.
       int myrank = osd->osdmap->calc_pg_rank(osd->get_nodeid(), acting);
       int next = myrank+1;
       if (next == (int)acting.size())
        next = 1;
-      issue_repop(op, acting[next]);   
+      issue_repop(op, acting[next], stat);     
     }
   }
 
index 8a54129e3981163ea7516e0627b85d0b17cf016e..3d82bcfa7b80668040ffc6fc607017ade8f416b7 100644 (file)
@@ -80,7 +80,7 @@ protected:
   void get_rep_gather(RepGather*);
   void apply_repop(RepGather *repop);
   void put_rep_gather(RepGather*);
-  void issue_repop(MOSDOp *op, int osd);
+  void issue_repop(MOSDOp *op, int osd, osd_peer_stat_t& stat);
   RepGather *new_rep_gather(MOSDOp *op);
   void repop_ack(RepGather *repop,
                  int result, bool commit,
index ca6fd8d6a786e3ee23fbee8ffaae8be126ffb957..916a5e166eabbdde637093990786b215eb81d294 100644 (file)
@@ -250,6 +250,22 @@ struct pg_stat_t {
 
 
 
+struct osd_peer_stat_t {
+  utime_t stamp;
+  double oprate;
+  double qlen;
+  double read_latency;
+  osd_peer_stat_t() : oprate(0), qlen(0), read_latency(0) {}
+};
+
+inline ostream& operator<<(ostream& out, const osd_peer_stat_t &stat) {
+  return out << "stat(" << stat.stamp
+            << " oprate=" << stat.oprate
+            << " qlen=" << stat.qlen
+            << " read_latency=" << stat.read_latency
+            << ")";
+}
+
 // -----------------------------------------
 
 class ObjectExtent {