]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
read threading changes. locking fixes.
authorsage <sage@29311d96-e01e-0410-9327-a35deaab8ce9>
Wed, 13 Sep 2006 20:59:50 +0000 (20:59 +0000)
committersage <sage@29311d96-e01e-0410-9327-a35deaab8ce9>
Wed, 13 Sep 2006 20:59:50 +0000 (20:59 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@854 29311d96-e01e-0410-9327-a35deaab8ce9

ceph/osd/OSD.cc
ceph/osd/OSD.h
ceph/osd/PG.cc
ceph/osd/PG.h

index 2a6aa5d1af30cbeb7877785cea8ebaa893e24773..302718ae88db3e5994faf5a5fb5e72aa0a135837 100644 (file)
@@ -32,8 +32,6 @@
 #include "msg/Messenger.h"
 #include "msg/Message.h"
 
-//#include "msg/HostMonitor.h"
-
 #include "messages/MGenericMessage.h"
 #include "messages/MPing.h"
 #include "messages/MPingAck.h"
@@ -217,26 +215,6 @@ int OSD::init()
          assert(whoami == superblock.whoami);
        }
 
-       // monitor
-       /*
-    char s[80];
-       sprintf(s, "osd%d", whoami);
-       string st = s;
-       monitor = new HostMonitor(messenger, st);
-       monitor->init();
-
-       // <hack> for testing monitoring
-       int i = whoami;
-       if (++i == g_conf.num_osd) i = 0;
-       monitor->get_hosts().insert(MSG_ADDR_OSD(i));
-       if (++i == g_conf.num_osd) i = 0;
-       monitor->get_hosts().insert(MSG_ADDR_OSD(i));
-       if (++i == g_conf.num_osd) i = 0;  
-       monitor->get_hosts().insert(MSG_ADDR_OSD(i));
-       
-       monitor->get_notify().insert(MSG_ADDR_MON(0));
-       // </hack>
-       */
        
        // log
        char name[80];
@@ -890,6 +868,7 @@ void OSD::handle_osd_map(MOSDMap *m)
                  for (map<tid_t,PG::RepOpGather*>::iterator p = pg->repop_gather.begin();
                           p != pg->repop_gather.end();
                           p++) {
+                       dout(-1) << "chekcing repop tid " << p->first << endl;
                        if (p->second->waitfor_ack.count(osd) ||
                                p->second->waitfor_commit.count(osd)) 
                          ls.push_back(p->second);
@@ -1367,36 +1346,7 @@ PG *OSD::create_pg(pg_t pgid, ObjectStore::Transaction& t)
 }
 
 
-/*
-void OSD::get_pg_list(list<pg_t>& ls)
-{
-  // just list collections; assume they're all pg's (for now)
-  store->list_collections(ls);
-}
-
-PG *OSD::get_pg(pg_t pgid)
-{
-  // already open?
-  if (pg_map.count(pgid)) 
-       return pg_map[pgid];
-
-  // exists?
-  if (!pg_exists(pgid))
-       return 0;
-
-  // open, stat collection
-  PG *pg = new PG(this, pgid);
-  pg_map[pgid] = pg;
-
-  // read pg info
-  store->collection_getattr(pgid, "info", &pg->info, sizeof(pg->info));
-  
-  // read pg log
-  pg->read_log(store);
 
-  return pg;
-}
-*/
 
 PG *OSD::get_pg(pg_t pgid)
 {
@@ -1952,13 +1902,6 @@ void OSD::op_pull(MOSDOp *op, PG *pg)
                  << " from " << op->get_source()
                  << endl;
 
-  // am i missing it?
-  if (waitfor_missing_object(op, pg)) {
-       // ok, i better the primary...  (or else there's a map mismatch, or the primary is wrong about my objects, or?)
-       assert(pg->is_primary());
-       return;
-  }
-
   // is a replica asking?  are they missing it?
   if (pg->is_primary() &&
          (pg->peer_missing.count(from) == 0 ||
@@ -2239,6 +2182,8 @@ void OSD::handle_op(MOSDOp *op)
   _share_map_incoming(op->get_source(), op->get_map_epoch());
 
   // what kind of op?
+  bool read = op->get_op() < 10;   // read, stat.  but not pull.
+
   if (!op->get_source().is_osd()) {
        // REGULAR OP (non-replication)
 
@@ -2251,8 +2196,6 @@ void OSD::handle_op(MOSDOp *op)
          return;
        }
        
-       bool read = op->get_op() < 10;
-
        if (read) {
          // read. am i the (same) acker?
          if (pg->get_acker() != whoami ||
@@ -2294,6 +2237,10 @@ void OSD::handle_op(MOSDOp *op)
          return;
        }
        
+       // missing object?
+       if (op->get_op() != OSD_OP_PUSH &&
+               waitfor_missing_object(op, pg)) return;
+
        dout(7) << "handle_op " << op << " in " << *pg << endl;
        
   } else {
@@ -2326,9 +2273,15 @@ void OSD::handle_op(MOSDOp *op)
   }
   
   if (g_conf.osd_maxthreads < 1) {
+       _lock_pg(pgid);
        do_op(op, pg); // do it now
+       _unlock_pg(pgid);
   } else {
-       enqueue_op(pgid, op);   // queue for worker threads
+       // queue for worker threads
+       if (read) 
+         enqueue_op(0, op);     // no locking needed for reads
+       else 
+         enqueue_op(pgid, op);         
   }
 }
 
@@ -2359,7 +2312,9 @@ void OSD::handle_op_reply(MOSDOpReply *op)
   }
 
   if (g_conf.osd_maxthreads < 1) {
+       _lock_pg(pgid);
        do_op(op, pg); // do it now
+       _unlock_pg(pgid);
   } else {
        enqueue_op(pgid, op);   // queue for worker threads
   }
@@ -2388,40 +2343,49 @@ void OSD::enqueue_op(pg_t pgid, Message *op)
  */
 void OSD::dequeue_op(pg_t pgid)
 {
-  Message *op;
-  PG *pg;
+  Message *op = 0;
+  PG *pg = 0;
 
   osd_lock.Lock();
   {
-       // lock pg
-       pg = _lock_pg(pgid);  
+       if (pgid) {
+         // lock pg
+         pg = _lock_pg(pgid);  
+       }
 
        // get pending op
        list<Message*> &ls  = op_queue[pgid];
        assert(!ls.empty());
        op = ls.front();
        ls.pop_front();
-
-       dout(10) << "dequeue_op pg " << hex << pgid << dec << " op " << op << ", " 
-                        << ls.size() << " / " << (pending_ops-1) << " more pending" << endl;
+       
+       if (pgid) {
+         dout(10) << "dequeue_op write pg " << hex << pgid << dec << " op " << op << ", " 
+                          << ls.size() << " / " << (pending_ops-1) << " more pending" << endl;
+       } else {
+         dout(10) << "dequeue_op read op " << op << ", " 
+                          << ls.size() << " / " << (pending_ops-1) << " more pending" << endl;
+       }
        
        if (ls.empty())
          op_queue.erase(pgid);
   }
   osd_lock.Unlock();
-  
+
   // do it
   do_op(op, pg);
 
-  // unlock pg
-  unlock_pg(pgid);
-  
   // finish
   osd_lock.Lock();
   {
+       if (pgid) {
+         // unlock pg
+         _unlock_pg(pgid);
+       }
+       
        dout(10) << "dequeue_op finish op " << op << endl;
        assert(pending_ops > 0);
-
+       
        if (pending_ops > g_conf.osd_max_opq) 
          op_queue_cond.Signal();
        
@@ -2450,6 +2414,14 @@ void OSD::do_op(Message *m, PG *pg)
 
        switch (op->get_op()) {
          
+         // reads
+       case OSD_OP_READ:
+         op_read(op);//, pg);
+         break;
+       case OSD_OP_STAT:
+         op_stat(op);//, pg);
+         break;
+         
          // rep stuff
        case OSD_OP_PULL:
          op_pull(op, pg);
@@ -2458,14 +2430,6 @@ void OSD::do_op(Message *m, PG *pg)
          op_push(op, pg);
          break;
          
-         // reads
-       case OSD_OP_READ:
-         op_read(op, pg);
-         break;
-       case OSD_OP_STAT:
-         op_stat(op, pg);
-         break;
-         
          // writes
        case OSD_OP_WRNOOP:
        case OSD_OP_WRITE:
@@ -2594,19 +2558,17 @@ bool OSD::waitfor_missing_object(MOSDOp *op, PG *pg)
  * client read op
  * NOTE: called from opqueue.
  */
-void OSD::op_read(MOSDOp *op, PG *pg)
+void OSD::op_read(MOSDOp *op)//, PG *pg)
 {
   const object_t oid = op->get_oid();
   
-  if (waitfor_missing_object(op, pg)) return;
-  
   // if the target object is locked for writing by another client, put 'op' to the waiting queue
   // for _any_ op type -- eg only the locker can unlock!
   if (block_if_wrlocked(op)) return; // op will be handled later, after the object unlocks
  
   dout(10) << "op_read " << hex << oid << dec 
                   << " " << op->get_offset() << "~" << op->get_length() 
-                  << " in " << *pg 
+       //<< " in " << *pg 
                   << endl;
 
   // read into a buffer
@@ -2645,12 +2607,10 @@ void OSD::op_read(MOSDOp *op, PG *pg)
  * client stat
  * NOTE: called from opqueue
  */
-void OSD::op_stat(MOSDOp *op, PG *pg)
+void OSD::op_stat(MOSDOp *op)//, PG *pg)
 {
   object_t oid = op->get_oid();
 
-  if (waitfor_missing_object(op, pg)) return;
-
   // if the target object is locked for writing by another client, put 'op' to the waiting queue
   if (block_if_wrlocked(op)) return; //read will be handled later, after the object unlocks
 
@@ -2661,7 +2621,7 @@ void OSD::op_stat(MOSDOp *op, PG *pg)
   dout(3) << "op_stat on " << hex << oid << dec 
                  << " r = " << r
                  << " size = " << st.st_size
-                 << " in " << *pg
+       //<< " in " << *pg
                  << endl;
   
   MOSDOpReply *reply = new MOSDOpReply(op, r, osdmap->get_epoch(), true);
@@ -2694,7 +2654,7 @@ void OSD::put_repop_gather(PG *pg, PG::RepOpGather *repop)
          repop->op->wants_commit()) {
        // send commit.
        MOSDOpReply *reply = new MOSDOpReply(repop->op, 0, osdmap->get_epoch(), true);
-       dout(10) << "put_repop sending commit on " << *repop << " " << reply << endl;
+       dout(10) << "put_repop  sending commit on " << *repop << " " << reply << endl;
        messenger->send_message(reply, repop->op->get_client());
        repop->sent_commit = true;
   }
@@ -2703,17 +2663,17 @@ void OSD::put_repop_gather(PG *pg, PG::RepOpGather *repop)
   else if (repop->can_send_ack() &&
                   repop->op->wants_ack()) {
        // apply
-       dout(10) << "put_repop applying update on " << *repop << endl;
+       dout(10) << "put_repop  applying update on " << *repop << endl;
        ObjectStore::Transaction t;
        prepare_op_transaction(t, repop->op, repop->new_version, pg);
        Context *oncommit = new C_OSD_WriteCommit(this, pg->info.pgid, repop->rep_tid, repop->pg_local_last_complete);
        unsigned r = store->apply_transaction(t, oncommit);
        if (r)
-         dout(-10) << "put_repop apply transaction return " << r << " on " << *repop << endl;
+         dout(-10) << "put_repop  apply transaction return " << r << " on " << *repop << endl;
 
        // send ack
        MOSDOpReply *reply = new MOSDOpReply(repop->op, 0, osdmap->get_epoch(), false);
-       dout(10) << "put_repop sending ack on " << *repop << " " << reply << endl;
+       dout(10) << "put_repop  sending ack on " << *repop << " " << reply << endl;
        messenger->send_message(reply, repop->op->get_client());
        repop->sent_ack = true;
 
@@ -2734,15 +2694,20 @@ void OSD::put_repop_gather(PG *pg, PG::RepOpGather *repop)
          }
          
          if (min > pg->peers_complete_thru) {
-               dout(10) << *pg << "put_repop peers_complete_thru " << pg->peers_complete_thru << " -> " << min << endl;
+               dout(10) << *pg << "put_repop  peers_complete_thru " << pg->peers_complete_thru << " -> " << min << endl;
                pg->peers_complete_thru = min;
          }
        }
 
-       dout(10) << "put_repop deleting " << *repop << endl;
+       dout(10) << "put_repop  deleting " << *repop << endl;
        //repop->lock.Unlock();  
+
+       assert(pg->repop_gather.count(repop->rep_tid));
+       //pg->repop_gather.erase(repop->rep_tid);
+
        delete repop->op;
        delete repop;
+
   } else {
        //repop->lock.Unlock();
   }
@@ -2849,12 +2814,7 @@ void OSD::repop_ack(PG *pg, PG::RepOpGather *repop,
          assert(repop->waitfor_commit.count(fromosd));   
          repop->waitfor_commit.erase(fromosd);
          repop->waitfor_ack.erase(fromosd);
-         
          repop->pg_complete_thru[fromosd] = pg_complete_thru;
-         
-         if (repop->waitfor_commit.empty()) {
-               pg->repop_gather.erase(repop->rep_tid);
-         }
        } else {
          // ack
          repop->waitfor_ack.erase(fromosd);
@@ -2885,6 +2845,7 @@ void OSD::op_modify_commit(pg_t pgid, tid_t rep_tid, eversion_t pg_complete_thru
                repop->pg_complete_thru[whoami] = pg_complete_thru;
          }
          put_repop_gather(pg, repop);
+         dout(10) << "op_modify_commit done on " << repop << endl;
        } else {
          dout(10) << "op_modify_commit pg " << hex << pgid << dec << " rep_tid " << rep_tid << " dne" << endl;
        }
@@ -2906,9 +2867,6 @@ void OSD::op_modify(MOSDOp *op, PG *pg)
 
   const char *opname = MOSDOp::get_opname(op->get_op());
 
-  // missing?
-  if (waitfor_missing_object(op, pg)) return;
-  
   // are any peers missing this?
   for (unsigned i=1; i<pg->acting.size(); i++) {
        int peer = pg->acting[i];
index 03fdfbede52388e8a1bd099ddf4f9d384a3755f7..ef5fa1a4cf826f9ff8ff288921a9f217454c3a2a 100644 (file)
@@ -272,8 +272,8 @@ public:
   void handle_osd_ping(class MOSDPing *m);
   void handle_op(class MOSDOp *m);
 
-  void op_read(class MOSDOp *m, PG *pg);
-  void op_stat(class MOSDOp *m, PG *pg);
+  void op_read(class MOSDOp *m);//, PG *pg);
+  void op_stat(class MOSDOp *m);//, PG *pg);
   void op_modify(class MOSDOp *m, PG *pg);
   void op_modify_commit(pg_t pgid, tid_t rep_tid, eversion_t pg_complete_thru);
 
index 1fd065ef0d9c5fa171c9bf7e886a687268f5f814..e595b8f173d0de4f35b92241e55e1abeb35f396b 100644 (file)
@@ -816,6 +816,7 @@ bool PG::do_recovery()
   dout(-10) << "do_recovery pulling " << objects_pulling.size() << " in pg, "
                   << osd->num_pulling << "/" << g_conf.osd_max_pull << " total"
                   << endl;
+  dout(10) << "do_recovery " << missing << endl;
 
   // can we slow down on this PG?
   if (osd->num_pulling >= g_conf.osd_max_pull && !objects_pulling.empty()) {
index 6c5c1f9cbf14a79c68aee2f8a2595a13a7046713..9ea3600accb09864b6181d3e92eddcca35912b63 100644 (file)
@@ -654,7 +654,7 @@ inline ostream& operator<<(ostream& out, const PG& pg)
 
 inline ostream& operator<<(ostream& out, PG::RepOpGather& repop)
 {
-  out << "repop(rep_tid=" << repop.rep_tid 
+  out << "repop(" << &repop << " rep_tid=" << repop.rep_tid 
          << " wfack=" << repop.waitfor_ack
          << " wfcommit=" << repop.waitfor_commit;
   out << " pct=" << repop.pg_complete_thru;