]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
balance_reads watches object temperature (hack)
authorsageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Wed, 29 Aug 2007 21:24:08 +0000 (21:24 +0000)
committersageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Wed, 29 Aug 2007 21:24:08 +0000 (21:24 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1735 29311d96-e01e-0410-9327-a35deaab8ce9

trunk/ceph/config.cc
trunk/ceph/config.h
trunk/ceph/osd/PG.cc
trunk/ceph/osd/PG.h
trunk/ceph/osd/ReplicatedPG.cc

index 6205ec2eaceeb7bb7cd048fd58eaae67083e73a7..5a4e97d64acdeb3a06e3dbf0af60c7213bbb7b6f 100644 (file)
@@ -246,8 +246,9 @@ md_config_t g_conf = {
   osd_rep: OSD_REP_PRIMARY,
 
   osd_balance_reads: false,  // send from client to replica
-  osd_flash_crowd_iat_threshold: 100,
+  osd_flash_crowd_iat_threshold: 0,//100,
   osd_flash_crowd_iat_alpha: 0.125,
+  osd_balance_reads_temp: 100,
   
   osd_shed_reads: false,     // forward from primary to replica
   osd_shed_reads_min_latency: .001,       // 
index 0ff1bb72a7316138c6d1512db24827cd2ddd6789..ae70ac690a3638509ea5a4308777d8d3f0af1e89 100644 (file)
@@ -241,6 +241,7 @@ struct md_config_t {
   bool osd_balance_reads;
   int osd_flash_crowd_iat_threshold;  // flash crowd interarrival time threshold in ms
   double osd_flash_crowd_iat_alpha;
+  double osd_balance_reads_temp;
 
   bool  osd_shed_reads;
   double osd_shed_reads_min_latency;
index 5438fe8fec76108a4697a9c94ebb0866b59dea37..5b55c9a88e1def1617c0354bfedd5d0273d191a0 100644 (file)
@@ -571,6 +571,8 @@ void PG::clear_primary_state()
   peer_info.clear();
   peer_missing.clear();
   
+  stat_object_temp_rd.clear();
+
   last_epoch_started_any = info.last_epoch_started;
 }
 
index 833e8da081501f90511e558e32fb7a82109b4eb8..7898cef817c1957a45e4651595bc5ce0f60b341f 100644 (file)
@@ -24,6 +24,8 @@
 #include "ObjectStore.h"
 #include "msg/Messenger.h"
 
+#include "common/DecayCounter.h"
+
 #include <list>
 #include <string>
 using namespace std;
@@ -497,6 +499,8 @@ protected:
   off_t stat_size;
   off_t stat_num_blocks;
 
+  hash_map<object_t, DecayCounter> stat_object_temp_rd;
+
   Mutex pg_stats_lock;
   pg_stat_t pg_stats;
 
index 397dd7aa672dc000b537822457d0764813b54885..4ea5e9d1afd21b1f94e4155e6db6f1560e52b365 100644 (file)
@@ -106,6 +106,7 @@ bool ReplicatedPG::preprocess_op(MOSDOp *op)
   if (!op->is_read()) 
     return false;
 
+  object_t oid = op->get_oid();
 
   // -- load balance reads --
   if (g_conf.osd_shed_reads &&
@@ -123,37 +124,52 @@ bool ReplicatedPG::preprocess_op(MOSDOp *op)
       }
     }
     
-    // -- flash crowd?
+    // -- balance reads?
     if (g_conf.osd_balance_reads &&
        !op->get_source().is_osd() && 
        is_primary()) {
-      // add sample
-      osd->iat_averager.add_sample( op->get_oid(), (double)g_clock.now() );
-      
-      // candidate?
-      bool is_flash_crowd_candidate = osd->iat_averager.is_flash_crowd_candidate( op->get_oid() );
+      // flash crowd?
+      bool is_flash_crowd_candidate = false;
+      if (g_conf.osd_flash_crowd_iat_threshold > 0) {
+       osd->iat_averager.add_sample( oid, (double)g_clock.now() );
+       is_flash_crowd_candidate = osd->iat_averager.is_flash_crowd_candidate( oid );
+      }
+
+      // hot?
+      double temp = 0;
+      if (stat_object_temp_rd.count(oid))
+       temp = stat_object_temp_rd[oid].get(op->request_received_time);
+      bool is_hotly_read = temp > g_conf.osd_balance_reads_temp;
+
+      dout(20) << "balance_reads oid " << oid << " temp " << temp 
+               << (is_hotly_read ? " hotly_read":"")
+               << (is_flash_crowd_candidate ? " flash_crowd_candidate":"")
+               << dendl;
+
+      bool should_balance = is_flash_crowd_candidate || is_hotly_read;
       bool is_balanced = false;
       bool b;
-      if (osd->store->getattr(op->get_oid(), "balance-reads", &b, 1) >= 0)
+      // *** FIXME *** this may block, and we're in the fast past! ***
+      if (osd->store->getattr(oid, "balance-reads", &b, 1) >= 0)
        is_balanced = true;
       
-      if (!is_balanced && is_flash_crowd_candidate &&
-         balancing_reads.count(op->get_oid()) == 0) {
-       dout(-10) << "preprocess_op balance-reads on " << op->get_oid() << dendl;
-       balancing_reads.insert(op->get_oid());
+      if (!is_balanced && should_balance &&
+         balancing_reads.count(oid) == 0) {
+       dout(-10) << "preprocess_op balance-reads on " << oid << dendl;
+       balancing_reads.insert(oid);
        MOSDOp *pop = new MOSDOp(osd->messenger->get_myinst(), 0, osd->get_tid(),
-                                op->get_oid(),
+                                oid,
                                 ObjectLayout(info.pgid),
                                 osd->osdmap->get_epoch(),
                                 OSD_OP_BALANCEREADS);
        do_op(pop);
       }
-      if (is_balanced && !is_flash_crowd_candidate &&
-         !unbalancing_reads.count(op->get_oid()) == 0) {
-       dout(-10) << "preprocess_op unbalance-reads on " << op->get_oid() << dendl;
-       unbalancing_reads.insert(op->get_oid());
+      if (is_balanced && !should_balance &&
+         !unbalancing_reads.count(oid) == 0) {
+       dout(-10) << "preprocess_op unbalance-reads on " << oid << dendl;
+       unbalancing_reads.insert(oid);
        MOSDOp *pop = new MOSDOp(osd->messenger->get_myinst(), 0, osd->get_tid(),
-                                op->get_oid(),
+                                oid,
                                 ObjectLayout(info.pgid),
                                 osd->osdmap->get_epoch(),
                                 OSD_OP_UNBALANCEREADS);
@@ -284,14 +300,14 @@ bool ReplicatedPG::preprocess_op(MOSDOp *op)
   // -- fastpath read?
   // if this is a read and the data is in the cache, do an immediate read.. 
   if ( g_conf.osd_immediate_read_from_cache ) {
-    if (osd->store->is_cached( op->get_oid() , 
+    if (osd->store->is_cached( oid , 
                               op->get_offset(), 
                               op->get_length() ) == 0) {
       if (!is_primary() && !op->get_source().is_osd()) {
        // am i allowed?
        bool v;
-       if (osd->store->getattr(op->get_oid(), "balance-reads", &v, 1) < 0) {
-         dout(-10) << "preprocess_op in-cache but no balance-reads on " << op->get_oid()
+       if (osd->store->getattr(oid, "balance-reads", &v, 1) < 0) {
+         dout(-10) << "preprocess_op in-cache but no balance-reads on " << oid
                    << ", fwd to primary" << dendl;
          osd->messenger->send_message(op, osd->osdmap->get_inst(get_primary()));
          return true;
@@ -469,11 +485,16 @@ void ReplicatedPG::op_read(MOSDOp *op)
   if (r >= 0) {
     reply->set_result(0);
 
-    utime_t diff = g_clock.now();
+    utime_t now = g_clock.now();
+    utime_t diff = now;
     diff -= op->get_received_time();
     dout(10) <<  "op_read total op latency " << diff << dendl;
     osd->read_latency_calc.add(diff);
 
+    if (is_primary() &&
+       g_conf.osd_balance_reads)
+      stat_object_temp_rd[oid].hit(now);  // hit temp.
+
   } else {
     reply->set_result(r);   // error
   }