git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
* recast in terms of read-balance attribute
author sageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Wed, 6 Jun 2007 16:01:43 +0000 (16:01 +0000)
committer sageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Wed, 6 Jun 2007 16:01:43 +0000 (16:01 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1399 29311d96-e01e-0410-9327-a35deaab8ce9

branches/sage/pgs/client/SyntheticClient.cc
branches/sage/pgs/client/SyntheticClient.h
branches/sage/pgs/messages/MOSDOp.h
branches/sage/pgs/osd/OSD.cc
branches/sage/pgs/osd/OSD.h
branches/sage/pgs/osd/PG.cc
branches/sage/pgs/osd/PG.h
branches/sage/pgs/osd/ReplicatedPG.cc
branches/sage/pgs/osd/ReplicatedPG.h

index 7618c627adb6b19dbbdbb75f1269593fde95ee7a..a42a8e8b94632d038e6a33dd4c6e22f832673f81 100644 (file)
@@ -482,7 +482,7 @@ int SyntheticClient::run()
         int iarg1 = iargs.front();  iargs.pop_front();
         int iarg2 = iargs.front();  iargs.pop_front();
         if (run_me())
-          read_file(sarg1, iarg1, iarg2);
+          read_file(sarg1, iarg1, iarg2, true);
       }
       break;
     case SYNCLIENT_MODE_WRITEBATCH:
@@ -1138,7 +1138,7 @@ int SyntheticClient::write_batch(int nfile, int size, int wrsize)
   return 0;
 }
 
-int SyntheticClient::read_file(string& fn, int size, int rdsize)   // size is in MB, wrsize in bytes
+int SyntheticClient::read_file(string& fn, int size, int rdsize, bool ignoreprint)   // size is in MB, wrsize in bytes
 {
   char *buf = new char[rdsize]; 
   memset(buf, 1, rdsize);
@@ -1169,14 +1169,14 @@ int SyntheticClient::read_file(string& fn, int size, int rdsize)   // size is in
       p++;
       if (readoff != wantoff ||
          readclient != client->get_nodeid()) {
-        if (!bad)
+        if (!bad && !ignoreprint)
           dout(0) << "WARNING: wrong data from OSD, block says fileoffset=" << readoff << " client=" << readclient
                  << ", should be offset " << wantoff << " clietn " << client->get_nodeid()
                  << endl;
         bad++;
       }
     }
-    if (bad) 
+    if (bad && !ignoreprint)
       dout(0) << " + " << (bad-1) << " other bad 16-byte bits in this block" << endl;
   }
   
index cd6033f1fcc12a8b3a87fc30fffcff2a42f2fdf6..dc1cf58121d261488330d32e950ed23b8767b00b 100644 (file)
@@ -213,7 +213,7 @@ class SyntheticClient {
 
   int write_file(string& fn, int mb, int chunk);
   int write_batch(int nfile, int mb, int chunk);
-  int read_file(string& fn, int mb, int chunk);
+  int read_file(string& fn, int mb, int chunk, bool ignoreprint=false);
   int read_random(string& fn, int mb, int chunk);
   int read_random_ex(string& fn, int mb, int chunk);
 
index fc3c410094fc1c2583df51c75e47f5cd034952bb..220da427aeb02474486cbbb3d35f44117096554d 100644 (file)
 #define OSD_OP_UPLOCK     24
 #define OSD_OP_DNLOCK     25
 
-#define OSD_OP_PRIMARYLOCK    26
-#define OSD_OP_PRIMARYUNLOCK  27
-
 #define OSD_OP_PULL       30
 #define OSD_OP_PUSH       31
 
+#define OSD_OP_BALANCEREADS    101
+#define OSD_OP_UNBALANCEREADS  102
+
 
 
 class MOSDOp : public Message {
@@ -74,8 +74,8 @@ public:
     case OSD_OP_UPLOCK: return "uplock"; 
     case OSD_OP_DNLOCK: return "dnlock"; 
 
-    case OSD_OP_PRIMARYLOCK: return "primary-lock";
-    case OSD_OP_PRIMARYUNLOCK: return "primary-unlock";
+    case OSD_OP_BALANCEREADS: return "balance-reads";
+    case OSD_OP_UNBALANCEREADS: return "unbalance-reads";
 
     case OSD_OP_PULL: return "pull";
     case OSD_OP_PUSH: return "push";
@@ -129,6 +129,11 @@ private:
   const entity_inst_t& get_client_inst() { return st.client; }
   void set_client_inst(const entity_inst_t& i) { st.client = i; }
 
+  bool wants_reply() {
+    if (st.op < 100) return true;
+    return false;  // no reply needed for balance-reads, unbalance-reads.
+  }
+
   const tid_t       get_rep_tid() { return st.rep_tid; }
   void set_rep_tid(tid_t t) { st.rep_tid = t; }
 
index 3d4af30d3b03df54db5baf7a675f148ded64ac03..96e80e8337c01f6c206a2bcff19d36c12777e6d2 100644 (file)
@@ -616,12 +616,15 @@ void OSD::activate_pg(pg_t pgid, epoch_t epoch)
   }
 
   // finishers?
+  finished_lock.Lock();
   if (finished.empty()) {
+    finished_lock.Unlock();
     osd_lock.Unlock();
   } else {
     list<Message*> waiting;
     waiting.splice(waiting.begin(), finished);
 
+    finished_lock.Unlock();
     osd_lock.Unlock();
     
     for (list<Message*>::iterator it = waiting.begin();
@@ -858,10 +861,12 @@ void OSD::dispatch(Message *m)
   }
 
   // finishers?
+  finished_lock.Lock();
   if (!finished.empty()) {
     list<Message*> waiting;
     waiting.splice(waiting.begin(), finished);
 
+    finished_lock.Unlock();
     osd_lock.Unlock();
     
     for (list<Message*>::iterator it = waiting.begin();
@@ -872,6 +877,7 @@ void OSD::dispatch(Message *m)
     return;
   }
   
+  finished_lock.Unlock();
   osd_lock.Unlock();
 }
 
@@ -1530,17 +1536,17 @@ bool OSD::require_current_map(Message *m, epoch_t ep)
  */
 bool OSD::require_same_or_newer_map(Message *m, epoch_t epoch)
 {
-  dout(10) << "require_same_or_newer_map " << epoch << " (i am " << osdmap->get_epoch() << ")" << dendl;
+  dout(15) << "require_same_or_newer_map " << epoch << " (i am " << osdmap->get_epoch() << ")" << dendl;
 
   // newer map?
   if (epoch > osdmap->get_epoch()) {
-    dout(7) << "  from newer map epoch " << epoch << " > " << osdmap->get_epoch() << dendl;
+    dout(7) << "waiting for newer map epoch " << epoch << " > my " << osdmap->get_epoch() << dendl;
     wait_for_new_map(m);
     return false;
   }
 
   if (epoch < boot_epoch) {
-    dout(7) << "  from pre-boot epoch " << epoch << " < " << boot_epoch << dendl;
+    dout(7) << "from pre-boot epoch " << epoch << " < " << boot_epoch << dendl;
     delete m;
     return false;
   }
@@ -2072,7 +2078,7 @@ void OSD::handle_op(MOSDOp *op)
       return;
     }
 
-    dout(10) << "handle_op " << *op << " in " << *pg << endl;
+    dout(10) << "handle_op " << *op << " in " << *pg << dendl;
 
   } else {
     // REPLICATION OP (it's from another OSD)
@@ -2260,32 +2266,3 @@ void OSD::wait_for_no_ops()
 
 
 
-// ==============================
-// Object locking
-
-//
-// If the target object of the operation op is locked for writing by another client, the function puts op to the waiting queue waiting_for_wr_unlock
-// returns true if object was locked, otherwise returns false
-// 
-bool OSD::block_if_wrlocked(MOSDOp* op)
-{
-  object_t oid = op->get_oid();
-
-  entity_name_t source;
-  int len = store->getattr(oid, "wrlock", &source, sizeof(entity_name_t));
-  //cout << "getattr returns " << len << " on " << oid << dendl;
-
-  if (len == sizeof(source) &&
-      source != op->get_client()) {
-    //the object is locked for writing by someone else -- add the op to the waiting queue      
-    waiting_for_wr_unlock[oid].push_back(op);
-    return true;
-  }
-
-  return false; //the object wasn't locked, so the operation can be handled right away
-}
-
-
-
-// ===============================
-// OPS
index 3701e1d49eb3a0347b5653b29c225aec7bb34463..d067260c4729d22a4da216e2ab6063d28d17e7fb 100644 (file)
@@ -175,24 +175,20 @@ private:
   int hb_stat_ops;  // ops since last heartbeat
   int hb_stat_qlen; // cumulative queue length since last hb
 
-  hash_map<int, float> peer_qlen;
+  hash_map<int, float>  peer_qlen;
   hash_map<int, double> peer_read_time;
   
 
   // -- waiters --
   list<class Message*> finished;
-
+  Mutex finished_lock;
+  
   void take_waiters(list<class Message*>& ls) {
+    finished_lock.Lock();
     finished.splice(finished.end(), ls);
+    finished_lock.Unlock();
   }
   
-  // -- object locking --
-  hash_map<object_t, list<Message*> > waiting_for_wr_unlock; 
-  hash_map<object_t, list<Message*> > waiting_for_primary_unlock; 
-  
-  bool block_if_wrlocked(class MOSDOp* op);
-
-
   // -- op queue --
   class ThreadPool<class OSD*, pg_t>   *threadpool;
   hash_map<pg_t, list<Message*> >       op_queue;
index ad93fb701ad5225338f1385842220153bbdf3c18..2592bd9ca69f8cf07acc017990b33743d7268f6a 100644 (file)
@@ -1141,6 +1141,31 @@ void PG::read_log(ObjectStore *store)
 
 
 
+// ==============================
+// Object locking
+
+//
+// If the target object of the operation op is locked for writing by another client, the function puts op to the waiting queue waiting_for_wr_unlock
+// returns true if object was locked, otherwise returns false
+// 
+bool PG::block_if_wrlocked(MOSDOp* op)
+{
+  object_t oid = op->get_oid();
+
+  entity_name_t source;
+  int len = osd->store->getattr(oid, "wrlock", &source, sizeof(entity_name_t));
+  //cout << "getattr returns " << len << " on " << oid << dendl;
+  
+  if (len == sizeof(source) &&
+      source != op->get_client()) {
+    //the object is locked for writing by someone else -- add the op to the waiting queue      
+    waiting_for_wr_unlock[oid].push_back(op);
+    return true;
+  }
+  
+  return false; //the object wasn't locked, so the operation can be handled right away
+}
+
 
 
 
@@ -1201,3 +1226,4 @@ bool PG::pick_object_rev(object_t& oid)
 
 
 
+
index 591abacda46a0f6715be1d11a0c8f050a02b31ef..e591bd3f457f08c4eb7486f4aa3bc00bc1af39b2 100644 (file)
@@ -461,6 +461,11 @@ protected:
            list<class Message*> > waiting_for_missing_object;   
   map<eversion_t,class MOSDOp*>   replay_queue;
   
+  hash_map<object_t, list<Message*> > waiting_for_wr_unlock; 
+
+  bool block_if_wrlocked(MOSDOp* op);
+
+
   // recovery
   map<object_t, eversion_t> objects_pulling;  // which objects are currently being pulled
   
@@ -549,7 +554,12 @@ public:
   void       set_role(int r) { role = r; }
 
   bool       is_primary() const { return role == PG_ROLE_HEAD; }
-  bool       is_acker() const { return role == PG_ROLE_ACKER; }
+  bool       is_acker() const { 
+    if (g_conf.osd_rep == OSD_REP_PRIMARY)
+      return is_primary();
+    else
+      return role == PG_ROLE_ACKER; 
+  }
   bool       is_head() const { return role == PG_ROLE_HEAD; }
   bool       is_middle() const { return role == PG_ROLE_MIDDLE; }
   bool       is_residual() const { return role == PG_ROLE_STRAY; }
index c1d80baaa223d0fb0dd2d89710a9978187888162..ebadf9ded163f4ef58262a6307bccf4fbe5951e2 100644 (file)
@@ -108,190 +108,142 @@ bool ReplicatedPG::preprocess_op(MOSDOp *op)
 
 
   // -- load balance reads --
-  if (g_conf.osd_balance_reads) {
-
-    // replicate/unreplicate?
-    if (!is_acker()) {
-      // -- replica --
-      if (op->get_op() == OSD_OP_REPLICATE) {
-       dout(-10) << "preprocess_op replicating " << op->get_oid() << endl;
-       replicated_objects.insert(op->get_oid());
-       delete op;
-       return true;
-      }
-      if (op->get_op() == OSD_OP_UNREPLICATE) {
-       dout(-10) << "preprocess_op un-replicating " << op->get_oid() << endl;
-       replicated_objects.erase(op->get_oid());
-       delete op;
+  if (g_conf.osd_balance_reads &&
+      is_primary() &&
+      g_conf.osd_rep == OSD_REP_PRIMARY) {
+    // -- read on primary+acker ---
+    
+    // test
+    if (false) {
+      if (acting.size() > 1) {
+       int peer = acting[1];
+       dout(-10) << "preprocess_op fwd client read op to osd" << peer << " for " << op->get_client() << " " << op->get_client_inst() << dendl;
+       osd->messenger->send_message(op, osd->osdmap->get_inst(peer));
        return true;
       }
+    }
     
-      if (!op->get_source().is_osd()) {
-       // -- read on replica --
-       if (!osd->store->exists(op->get_oid())) {
-         // fwd to primary
-         dout(-10) << "preprocess_op got read on replica, object dne, fwd to primary" << endl;
-         osd->messenger->send_message(op, osd->osdmap->get_inst(get_primary()));
-         return true;
-       }
-
-       // primary lock?
-       //  FIXME: this may cause a (blocking) stat+disk seek.
-       char v;
-       if (osd->store->getattr(op->get_oid(), "primary-lock", &v, 1) >= 0) {
-         dout(-10) << "preprocess_op primary-lock on " << op->get_oid() << " fwd to primary" << endl;
-         osd->messenger->send_message(op, osd->osdmap->get_inst(get_primary()));
-         return true;
-       }
-
-       // in replicate list?
-       if (replicated_objects.count(op->get_oid())) {
-         // yes.  continue.  
-         // note that we've already failed the fastpath above.
-         dout(-10) << "preprocess_op got read on replica, object replicated, processing/queuing as usual" << endl;
-       } else {
-         // no.  forward to primary.
-         dout(-10) << "preprocess_op got read on replica, object not replicated, fwd to primary" << endl;
-         osd->messenger->send_message(op, osd->osdmap->get_inst(get_primary()));
-         return true;
-       }
+    // -- flash crowd?
+    if (!op->get_source().is_osd()) {
+      // candidate?
+      bool is_flash_crowd_candidate = osd->iat_averager.is_flash_crowd_candidate( op->get_oid() );
+      bool is_balanced = false;
+      bool b;
+      if (osd->store->getattr(op->get_oid(), "balance-reads", &b, 1) >= 0)
+       is_balanced = true;
+      
+      if (!is_balanced && is_flash_crowd_candidate &&
+         balancing_reads.count(op->get_oid()) == 0) {
+       dout(-10) << "preprocess_op balance-reads on " << op->get_oid() << dendl;
+       balancing_reads.insert(op->get_oid());
+       MOSDOp *pop = new MOSDOp(osd->messenger->get_myinst(), 0, osd->get_tid(),
+                                op->get_oid(),
+                                ObjectLayout(info.pgid),
+                                osd->osdmap->get_epoch(),
+                                OSD_OP_BALANCEREADS);
+       do_op(pop);
       }
-    }
-  
-    if (is_acker()) {
-      // -- read on acker ---
-
-      // test
-      if (false) {
-       if (acting.size() > 1) {
-         int peer = acting[1];
-         dout(-10) << "preprocess_op fwd client read op to osd" << peer << " for " << op->get_client() << " " << op->get_client_inst() << dendl;
-         osd->messenger->send_message(op, osd->osdmap->get_inst(peer));
-         return true;
-       }
+      if (is_balanced && !is_flash_crowd_candidate &&
+         unbalancing_reads.count(op->get_oid()) == 0) {
+       dout(-10) << "preprocess_op unbalance-reads on " << op->get_oid() << dendl;
+       unbalancing_reads.insert(op->get_oid());
+       MOSDOp *pop = new MOSDOp(osd->messenger->get_myinst(), 0, osd->get_tid(),
+                                op->get_oid(),
+                                ObjectLayout(info.pgid),
+                                osd->osdmap->get_epoch(),
+                                OSD_OP_UNBALANCEREADS);
+       do_op(pop);
       }
+    }
+    
+    
+    // check my load. 
+    // TODO xxx we must also compare with our own load
+    // if i am x percentage higher than replica , 
+    // redirect the read 
+    
+    if ( g_conf.osd_balance_reads == LOAD_LATENCY) {
+      double mean_read_time = osd->load_calc.get_average();
       
-      // -- flash crowd?
-      if (!op->get_source().is_osd()) {
-       // candidate?
-       bool is_flash_crowd_candidate = osd->iat_averager.is_flash_crowd_candidate( op->get_oid() );
-       bool is_replicated = replicated_objects.count( op->get_oid() );
+      if ( mean_read_time != -1 ) {
        
-       if (!is_replicated && is_flash_crowd_candidate) {
-         // replicate
-         dout(-10) << "preprocess_op replicating " << op->get_oid() << endl;
-         replicated_objects.insert(op->get_oid());
-         for (unsigned i=1; i<acting.size(); ++i) {
-           osd->messenger->send_message(new MOSDOp(osd->messenger->get_myinst(), 0, 0,
-                                                   op->get_oid(), ObjectLayout(info.pgid),
-                                                   osd->osdmap->get_epoch(),
-                                                   OSD_OP_REPLICATE),
-                                        osd->osdmap->get_inst(acting[i]));
-         }
-       }
-       if (is_replicated && !is_flash_crowd_candidate) {
-         // unreplicate
-         dout(-10) << "preprocess_op unreplicating " << op->get_oid() << endl;
-         replicated_objects.erase(op->get_oid());
-         for (unsigned i=1; i<acting.size(); ++i) {
-           osd->messenger->send_message(new MOSDOp(osd->messenger->get_myinst(), 0, 0,
-                                                   op->get_oid(), ObjectLayout(info.pgid),
-                                                   osd->osdmap->get_epoch(),
-                                                   OSD_OP_REPLICATE),
-                                        osd->osdmap->get_inst(acting[i]));
+       for (unsigned i=1; 
+            i<acting.size(); 
+            ++i) {
+         int peer = acting[i];
+         
+         dout(10) << "my read time " << mean_read_time 
+                  << "peer_readtime" << osd->peer_read_time[peer] 
+                  << " of peer" << peer << dendl;
+         
+         if ( osd->peer_read_time.count(peer) &&
+              ( (osd->peer_read_time[peer]*100/mean_read_time) <
+                (100 - g_conf.osd_load_diff_percent))) {
+           dout(10) << " forwarding to peer osd" << peer << dendl;
+           
+           osd->messenger->send_message(op, osd->osdmap->get_inst(peer));
+           return true;
          }
        }
       }
+    }
+    else if ( g_conf.osd_balance_reads == LOAD_QUEUE_SIZE ) {
+      // am i above my average?
+      float my_avg = osd->hb_stat_qlen / osd->hb_stat_ops;
       
-    
-      // check my load. 
-      // TODO xxx we must also compare with our own load
-      // if i am x percentage higher than replica , 
-      // redirect the read 
-      
-      if ( g_conf.osd_balance_reads == LOAD_LATENCY) {
-       double mean_read_time = osd->load_calc.get_average();
-       
-       if ( mean_read_time != -1 ) {
-         
-         for (unsigned i=1; 
-              i<acting.size(); 
-              ++i) {
-           int peer = acting[i];
-           
-           dout(10) << "my read time " << mean_read_time 
-                    << "peer_readtime" << osd->peer_read_time[peer] 
-                    << " of peer" << peer << endl;
+      if (osd->pending_ops > my_avg) {
+       // is there a peer who is below my average?
+       for (unsigned i=1; i<acting.size(); ++i) {
+         int peer = acting[i];
+         if (osd->peer_qlen.count(peer) &&
+             osd->peer_qlen[peer] < my_avg) {
+           // calculate a probability that we should redirect
+           float p = (my_avg - osd->peer_qlen[peer]) / my_avg;             // this is dumb.
            
-           if ( osd->peer_read_time.count(peer) &&
-                ( (osd->peer_read_time[peer]*100/mean_read_time) <
-                  (100 - g_conf.osd_load_diff_percent))) {
-             dout(10) << " forwarding to peer osd" << peer << endl;
-             
+           if (drand48() <= p) {
+             // take the first one
+             dout(10) << "my qlen " << osd->pending_ops << " > my_avg " << my_avg
+                      << ", p=" << p 
+                      << ", fwd to peer w/ qlen " << osd->peer_qlen[peer]
+                      << " osd" << peer
+                      << dendl;
              osd->messenger->send_message(op, osd->osdmap->get_inst(peer));
              return true;
            }
          }
        }
       }
-      else if ( g_conf.osd_balance_reads == LOAD_QUEUE_SIZE ) {
-       // am i above my average?
-       float my_avg = osd->hb_stat_qlen / osd->hb_stat_ops;
-       
-       if (osd->pending_ops > my_avg) {
-         // is there a peer who is below my average?
-         for (unsigned i=1; i<acting.size(); ++i) {
-           int peer = acting[i];
-           if (osd->peer_qlen.count(peer) &&
-               osd->peer_qlen[peer] < my_avg) {
-             // calculate a probability that we should redirect
-             float p = (my_avg - osd->peer_qlen[peer]) / my_avg;             // this is dumb.
-             
-             if (drand48() <= p) {
-               // take the first one
-               dout(10) << "my qlen " << osd->pending_ops << " > my_avg " << my_avg
-                        << ", p=" << p 
-                        << ", fwd to peer w/ qlen " << osd->peer_qlen[peer]
-                        << " osd" << peer
-                        << dendl;
-               osd->messenger->send_message(op, osd->osdmap->get_inst(peer));
-               return true;
-             }
-           }
-         }
-       }
-      }
+    }
+    
+    else if ( g_conf.osd_balance_reads == LOAD_HYBRID ) {
+      // am i above my average?
+      float my_avg = osd->hb_stat_qlen / osd->hb_stat_ops;
       
-      else if ( g_conf.osd_balance_reads == LOAD_HYBRID ) {
-       // am i above my average?
-       float my_avg = osd->hb_stat_qlen / osd->hb_stat_ops;
-       
-       if (osd->pending_ops > my_avg) {
-         // is there a peer who is below my average?
-         for (unsigned i=1; i<acting.size(); ++i) {
-           int peer = acting[i];
-           if (osd->peer_qlen.count(peer) &&
-               osd->peer_qlen[peer] < my_avg) {
-             // calculate a probability that we should redirect
-             //float p = (my_avg - peer_qlen[peer]) / my_avg;             // this is dumb.
-             
-             double mean_read_time = osd->load_calc.get_average();
-             
-             if ( mean_read_time != -1 &&  
-                  osd->peer_read_time.count(peer) &&
-                  ( (osd->peer_read_time[peer]*100/mean_read_time) <
-                    ( 100 - g_conf.osd_load_diff_percent) ) )
-               //if (drand48() <= p) {
-               // take the first one
-               dout(10) << "using hybrid :my qlen " << osd->pending_ops << " > my_avg " << my_avg
-                        << "my read time  "<<  mean_read_time
-                        << "peer read time " << osd->peer_read_time[peer]  
-                        << ", fwd to peer w/ qlen " << osd->peer_qlen[peer]
-                        << " osd" << peer
-                        << endl;
-             osd->messenger->send_message(op, osd->osdmap->get_inst(peer));
-             return true;
-           }
+      if (osd->pending_ops > my_avg) {
+       // is there a peer who is below my average?
+       for (unsigned i=1; i<acting.size(); ++i) {
+         int peer = acting[i];
+         if (osd->peer_qlen.count(peer) &&
+             osd->peer_qlen[peer] < my_avg) {
+           // calculate a probability that we should redirect
+           //float p = (my_avg - peer_qlen[peer]) / my_avg;             // this is dumb.
+           
+           double mean_read_time = osd->load_calc.get_average();
+           
+           if ( mean_read_time != -1 &&  
+                osd->peer_read_time.count(peer) &&
+                ( (osd->peer_read_time[peer]*100/mean_read_time) <
+                ( (osd->peer_read_time[peer]*100/mean_read_time) <
+                  ( 100 - g_conf.osd_load_diff_percent) ) ) {
+             //if (drand48() <= p) {
+             // take the first one
+             dout(10) << "using hybrid :my qlen " << osd->pending_ops << " > my_avg " << my_avg
+                      << "my read time  "<<  mean_read_time
+                      << "peer read time " << osd->peer_read_time[peer]  
+                      << ", fwd to peer w/ qlen " << osd->peer_qlen[peer]
+                      << " osd" << peer
+                      << dendl;
+             osd->messenger->send_message(op, osd->osdmap->get_inst(peer));
+             return true;
+           }
          }
        }
       }
@@ -305,8 +257,19 @@ bool ReplicatedPG::preprocess_op(MOSDOp *op)
     if (osd->store->is_cached( op->get_oid() , 
                               op->get_offset(), 
                               op->get_length() ) == 0) {
+      if (!is_primary()) {
+       // am i allowed?
+       bool v;
+       if (osd->store->getattr(op->get_oid(), "balance-reads", &v, 1) < 0) {
+         dout(10) << "preprocess_op in-cache but no balance-reads on " << op->get_oid()
+                   << ", fwd to primary" << dendl;
+         osd->messenger->send_message(op, osd->osdmap->get_inst(get_primary()));
+         return true;
+       }
+      }
+
       // do it now
-      dout(-10) << "preprocess_op data is in cache, reading from cache" << *op <<  endl;
+      dout(-10) << "preprocess_op data is in cache, reading from cache" << *op <<  dendl;
       do_op(op);
       return true;
     }
@@ -330,14 +293,8 @@ void ReplicatedPG::do_op(MOSDOp *op)
     
     // reads
   case OSD_OP_READ:
-    if (osd->block_if_wrlocked(op)) 
-      return;
-    op_read(op);
-    break;
   case OSD_OP_STAT:
-    if (osd->block_if_wrlocked(op)) 
-      return;
-    op_stat(op);
+    op_read(op);
     break;
     
     // rep stuff
@@ -403,83 +360,91 @@ void ReplicatedPG::do_op_reply(MOSDOpReply *r)
 // ========================================================================
 // READS
 
-int ReplicatedPG::op_read(MOSDOp *op)
+void ReplicatedPG::op_read(MOSDOp *op)
 {
   object_t oid = op->get_oid();
 
-  dout(10) << "op_read " << oid 
+  dout(10) << "op_read " << MOSDOp::get_opname(op->get_op())
+          << " " << oid 
            << " " << op->get_offset() << "~" << op->get_length() 
-    //<< " in " << *pg 
            << dendl;
+  
+  // wrlocked?
+  if (block_if_wrlocked(op)) 
+    return;
+
+  // !primary and unbalanced?
+  if (!is_primary()) {
+    // make sure i exist and am balanced, otherwise fw back to acker.
+    bool b;
+    if (!osd->store->exists(oid) || 
+       osd->store->getattr(oid, "balance-reads", &b, 1) < 0) {
+      dout(-10) << "read on replica, object " << oid 
+               << " dne or no balance-reads, fw back to primary" << dendl;
+      osd->messenger->send_message(op, osd->osdmap->get_inst(get_acker()));
+      return;
+    }
+  }
+  
 
+  // set up reply
+  MOSDOpReply *reply = new MOSDOpReply(op, 0, osd->osdmap->get_epoch(), true); 
   long r = 0;
-  bufferlist bl;
 
+  // do it.
   if (oid.rev && !pick_object_rev(oid)) {
     // we have no revision for this request.
     r = -EEXIST;
   } else {
-    // read into a buffer
-    r = osd->store->read(oid, 
-                        op->get_offset(), op->get_length(),
-                        bl);
+    switch (op->get_op()) {
+    case OSD_OP_READ:
+      {
+       // read into a buffer
+       bufferlist bl;
+       r = osd->store->read(oid, 
+                            op->get_offset(), op->get_length(),
+                            bl);
+       reply->set_data(bl);
+       reply->set_length(r);
+       dout(15) << " read got " << r << " / " << op->get_length() << " bytes from obj " << oid << dendl;
+      }
+      break;
+
+    case OSD_OP_STAT:
+      {
+       struct stat st;
+       memset(&st, 0, sizeof(st));
+       r = osd->store->stat(oid, &st);
+       if (r >= 0)
+         reply->set_object_size(st.st_size);
+      }
+      break;
+
+    default:
+      assert(0);
+    }
   }
   
-  // set up reply
-  MOSDOpReply *reply = new MOSDOpReply(op, 0, osd->osdmap->get_epoch(), true); 
   if (r >= 0) {
     reply->set_result(0);
-    reply->set_data(bl);
-    reply->set_length(r);
 
     dout(10) <<  "READ TIME DIFF"
             << (double)g_clock.now()-op->get_received_time()
-            << endl;
+            << dendl;
     osd->load_calc.add((double)g_clock.now() - op->get_received_time());
 
   } else {
     reply->set_result(r);   // error
-    reply->set_length(0);
   }
   
-  dout(10) << " read got " << r << " / " << op->get_length() << " bytes from obj " << oid << dendl;
-  
   // send it
   osd->messenger->send_message(reply, op->get_client_inst());
   
   delete op;
-
-  return r;
 }
 
 
-void ReplicatedPG::op_stat(MOSDOp *op)
-{
-  object_t oid = op->get_oid();
-
-  struct stat st;
-  memset(&st, sizeof(st), 0);
-  int r = 0;
-
-  if (oid.rev && !pick_object_rev(oid)) {
-    // we have no revision for this request.
-    r = -EEXIST;
-  } else {
-    r = osd->store->stat(oid, &st);
-  }
-  
-  dout(3) << "op_stat on " << oid 
-          << " r = " << r
-          << " size = " << st.st_size
-    //<< " in " << *pg
-          << dendl;
-  
-  MOSDOpReply *reply = new MOSDOpReply(op, r, osd->osdmap->get_epoch(), true);
-  reply->set_object_size(st.st_size);
-  osd->messenger->send_message(reply, op->get_client_inst());
-  
-  delete op;
-}
 
 
 
@@ -576,42 +541,24 @@ void ReplicatedPG::prepare_op_transaction(ObjectStore::Transaction& t,
 
   case OSD_OP_WRLOCK:
     { // lock object
-      //r = store->setattr(oid, "wrlock", &op->get_asker(), sizeof(msg_addr_t), oncommit);
       t.setattr(oid, "wrlock", &op->get_client(), sizeof(entity_name_t));
     }
     break;  
-    
   case OSD_OP_WRUNLOCK:
     { // unlock objects
-      //r = store->rmattr(oid, "wrlock", oncommit);
       t.rmattr(oid, "wrlock");
-      
-      // unblock all operations that were waiting for this object to become unlocked
-      if (osd->waiting_for_wr_unlock.count(oid)) {
-        osd->take_waiters(osd->waiting_for_wr_unlock[oid]);
-        osd->waiting_for_wr_unlock.erase(oid);
-      }
     }
     break;
 
-  case OSD_OP_PRIMARYLOCK:
-    { // lock object
-      bool locked = true;
-      t.setattr(oid, "primary-lock", &locked, sizeof(locked));
+  case OSD_OP_BALANCEREADS:
+    {
+      bool bal = true;
+      t.setattr(oid, "balance-reads", &bal, sizeof(bal));
    }
+    break;
-    break;
-
-  case OSD_OP_PRIMARYUNLOCK:
-    { // unlock object
-      t.rmattr(oid, "primary-lock");
-
-      // kick waiters? -- only if we make replicas block ops instead of fwd to primary.
-      if (osd->waiting_for_primary_unlock.count(oid)) {
-        osd->take_waiters(osd->waiting_for_primary_unlock[oid]);
-        osd->waiting_for_primary_unlock.erase(oid);
-      }
+  case OSD_OP_UNBALANCEREADS:
+    {
+      t.rmattr(oid, "balance-reads");
    }
+    break;
-    break;
 
 
     // -- modify --
@@ -656,14 +603,12 @@ void ReplicatedPG::prepare_op_transaction(ObjectStore::Transaction& t,
 
   case OSD_OP_TRUNCATE:
     { // truncate
-      //r = store->truncate(oid, op->get_offset());
       t.truncate(oid, op->get_length() );
     }
     break;
     
   case OSD_OP_DELETE:
     { // delete
-      //r = store->remove(oid);
       t.remove(oid);
     }
     break;
@@ -732,6 +677,39 @@ void ReplicatedPG::apply_repop(RepGather *repop)
   repop->op->get_data().clear();
   
   repop->applied = true;
+
+
+  // any completion stuff to do here?
+  object_t oid = repop->op->get_oid();
+
+  switch (repop->op->get_op()) { 
+  case OSD_OP_UNBALANCEREADS:
+    dout(-10) << "apply_repop  completed unbalance-reads on " << oid << dendl;
+    unbalancing_reads.erase(oid);
+    if (waiting_for_unbalanced_reads.count(oid)) {
+      osd->take_waiters(waiting_for_unbalanced_reads[oid]);
+      waiting_for_unbalanced_reads.erase(oid);
+    }
+    break;
+
+  case OSD_OP_BALANCEREADS:
+    dout(-10) << "apply_repop  completed balance-reads on " << oid << dendl;
+    if (waiting_for_balanced_reads.count(oid)) {
+      osd->take_waiters(waiting_for_balanced_reads[oid]);
+      waiting_for_balanced_reads.erase(oid);
+    }
+    break;
+    
+  case OSD_OP_WRUNLOCK:
+    dout(-10) << "apply_repop  completed wrunlock on " << oid << dendl;
+    if (waiting_for_wr_unlock.count(oid)) {
+      osd->take_waiters(waiting_for_wr_unlock[oid]);
+      waiting_for_wr_unlock.erase(oid);
+    }
+    break;
+  }   
+  
+
 }
 
 void ReplicatedPG::put_rep_gather(RepGather *repop)
@@ -742,9 +720,11 @@ void ReplicatedPG::put_rep_gather(RepGather *repop)
   if (repop->can_send_commit() &&
       repop->op->wants_commit()) {
     // send commit.
-    MOSDOpReply *reply = new MOSDOpReply(repop->op, 0, osd->osdmap->get_epoch(), true);
-    dout(10) << "put_repop  sending commit on " << *repop << " " << reply << dendl;
-    osd->messenger->send_message(reply, repop->op->get_client_inst());
+    if (repop->op->wants_reply()) {
+      MOSDOpReply *reply = new MOSDOpReply(repop->op, 0, osd->osdmap->get_epoch(), true);
+      dout(10) << "put_repop  sending commit on " << *repop << " " << reply << dendl;
+      osd->messenger->send_message(reply, repop->op->get_client_inst());
+    }
     repop->sent_commit = true;
   }
 
@@ -755,9 +735,11 @@ void ReplicatedPG::put_rep_gather(RepGather *repop)
     apply_repop(repop);
 
     // send ack
-    MOSDOpReply *reply = new MOSDOpReply(repop->op, 0, osd->osdmap->get_epoch(), false);
-    dout(10) << "put_repop  sending ack on " << *repop << " " << reply << dendl;
-    osd->messenger->send_message(reply, repop->op->get_client_inst());
+    if (repop->op->wants_reply()) {
+      MOSDOpReply *reply = new MOSDOpReply(repop->op, 0, osd->osdmap->get_epoch(), false);
+      dout(10) << "put_repop  sending ack on " << *repop << " " << reply << dendl;
+      osd->messenger->send_message(reply, repop->op->get_client_inst());
+    }
     repop->sent_ack = true;
 
     utime_t now = g_clock.now();
@@ -1049,12 +1031,38 @@ void ReplicatedPG::op_modify(MOSDOp *op)
   object_t oid = op->get_oid();
   const char *opname = MOSDOp::get_opname(op->get_op());
 
-  // locked by someone else?
-  // for _any_ op type -- eg only the locker can unlock!
+  // --- locking ---
+
+  // wrlock?
   if (op->get_op() != OSD_OP_WRNOOP &&  // except WRNOOP; we just want to flush
-      osd->block_if_wrlocked(op)) 
+      block_if_wrlocked(op)) 
     return; // op will be handled later, after the object unlocks
   
+  // balance-reads set?
+  char v;
+  if (osd->store->getattr(op->get_oid(), "balance-reads", &v, 1) >= 0 ||
+      balancing_reads.count(op->get_oid())) {
+    
+    if (!unbalancing_reads.count(op->get_oid())) {
+      // unbalance
+      dout(-10) << "preprocess_op unbalancing-reads on " << op->get_oid() << dendl;
+      unbalancing_reads.insert(op->get_oid());
+      
+      MOSDOp *pop = new MOSDOp(osd->messenger->get_myinst(), 0, osd->get_tid(),
+                              op->get_oid(),
+                              ObjectLayout(info.pgid),
+                              osd->osdmap->get_epoch(),
+                              OSD_OP_UNBALANCEREADS);
+      do_op(pop);
+    }
+
+    // add to wait queue
+    dout(-10) << "preprocess_op waiting for unbalance-reads on " << op->get_oid() << dendl;
+    waiting_for_unbalanced_reads[op->get_oid()].push_back(op);
+    return;
+  }
+
+
   // share latest osd map with rest of pg?
   osd->osd_lock.Lock();
   {
@@ -1571,10 +1579,6 @@ void ReplicatedPG::on_role_change()
        it++)
     osd->take_waiters(it->second);
   waiting_for_missing_object.clear();
-  
-  // clear object replica list?
-  if (get_role() < 0)
-    replicated_objects.clear();  // hmm, should i be less sloppy about this?  FIXME.
 }
 
 
index 49529e7a49914186c7b8980f6f66726942e60bae..703fb2d7f2bae223c02e90f30360a8d934d371f6 100644 (file)
@@ -73,7 +73,10 @@ protected:
   map<tid_t, list<class Message*> > waiting_for_repop;
 
   // load balancing
-  set<object_t>  replicated_objects;
+  set<object_t> balancing_reads;
+  set<object_t> unbalancing_reads;
+  hash_map<object_t, list<Message*> > waiting_for_balanced_reads;
+  hash_map<object_t, list<Message*> > waiting_for_unbalanced_reads;  // i.e. primary-lock
 
   void get_rep_gather(RepGather*);
   void apply_repop(RepGather *repop);
@@ -117,8 +120,7 @@ protected:
   void clean_replicas();
 
 
-  void op_stat(MOSDOp *op);
-  int op_read(MOSDOp *op);
+  void op_read(MOSDOp *op);
   void op_modify(MOSDOp *op);
   void op_rep_modify(MOSDOp *op);
   void op_push(MOSDOp *op);