]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
objecter: on do ack or safe messages if they are requested
authorSage Weil <sage@newdream.net>
Thu, 20 Mar 2008 18:29:30 +0000 (11:29 -0700)
committerSage Weil <sage@newdream.net>
Mon, 24 Mar 2008 16:53:49 +0000 (09:53 -0700)
src/mds/MDCache.cc
src/messages/MOSDOp.h
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h
src/osdc/Journaler.cc
src/osdc/Objecter.cc
src/osdc/Objecter.h

index 8cc707d0ba02e9c3998154de88c4fd6dd72415f1..71ae54cbf51d36fd5d68a4428265ae626605cc34 100644 (file)
@@ -3523,8 +3523,10 @@ void MDCache::shutdown_check()
   dout(0) << "log len " << mds->mdlog->get_num_events() << dendl;
 
 
-  if (mds->filer->is_active()) 
-    dout(0) << "filer still active" << dendl;
+  if (mds->objecter->is_active()) {
+    dout(0) << "objecter still active" << dendl;
+    mds->objecter->dump_active();
+  }
 }
 
 void MDCache::shutdown_start()
@@ -3640,8 +3642,9 @@ bool MDCache::shutdown_pass()
   }
 
   // filer active?
-  if (mds->filer->is_active()) {
-    dout(7) << "filer still active" << dendl;
+  if (mds->objecter->is_active()) {
+    dout(7) << "objecter still active" << dendl;
+    mds->objecter->dump_active();
     return false;
   }
 
index b8f45bb702d409b5697df5fd5cb4272459ed066b..57828abdd609579a7e8fbbf654c7d17d6bfb2807 100644 (file)
@@ -100,7 +100,8 @@ public:
 
 
   MOSDOp(entity_inst_t asker, int inc, long tid,
-         object_t oid, ceph_object_layout ol, epoch_t mapepoch, int op) :
+         object_t oid, ceph_object_layout ol, epoch_t mapepoch, int op,
+        int flags) :
     Message(CEPH_MSG_OSD_OP) {
     memset(&head, 0, sizeof(head));
     head.client_inst.name = asker.name;
@@ -111,8 +112,7 @@ public:
     head.layout = ol;
     head.osdmap_epoch = cpu_to_le32(mapepoch);
     head.op = op;
-    
-    head.flags = CEPH_OSD_OP_ACK | CEPH_OSD_OP_SAFE;
+    head.flags = flags;
   }
   MOSDOp() {}
 
index 867ce6c1002a6b483f6b316842800e051fcefd0f..81abe4db0d800a38a7ce7005f826b32df4442f0b 100644 (file)
@@ -164,7 +164,7 @@ bool ReplicatedPG::preprocess_op(MOSDOp *op, utime_t now)
                                 oid,
                                 layout,
                                 osd->osdmap->get_epoch(),
-                                CEPH_OSD_OP_BALANCEREADS);
+                                CEPH_OSD_OP_BALANCEREADS, 0);
        do_op(pop);
       }
       if (is_balanced && !should_balance &&
@@ -178,7 +178,7 @@ bool ReplicatedPG::preprocess_op(MOSDOp *op, utime_t now)
                                 oid,
                                 layout,
                                 osd->osdmap->get_epoch(),
-                                CEPH_OSD_OP_UNBALANCEREADS);
+                                CEPH_OSD_OP_UNBALANCEREADS, 0);
        do_op(pop);
       }
     }
@@ -842,27 +842,29 @@ void ReplicatedPG::put_rep_gather(RepGather *repop)
   dout(10) << "put_repop " << *repop << dendl;
   
   // commit?
-  if (repop->can_send_commit() &&
-      repop->op->wants_commit()) {
-    // send commit.
-    MOSDOpReply *reply = new MOSDOpReply(repop->op, 0, osd->osdmap->get_epoch(), true);
-    dout(10) << "put_repop  sending commit on " << *repop << " " << reply << dendl;
-    osd->messenger->send_message(reply, repop->op->get_client_inst());
-    repop->sent_commit = true;
+  if (repop->can_send_commit()) {
+    if (repop->op->wants_commit()) {
+      // send commit.
+      MOSDOpReply *reply = new MOSDOpReply(repop->op, 0, osd->osdmap->get_epoch(), true);
+      dout(10) << "put_repop  sending commit on " << *repop << " " << reply << dendl;
+      osd->messenger->send_message(reply, repop->op->get_client_inst());
+      repop->sent_commit = true;
+    }
   }
 
   // ack?
-  else if (repop->can_send_ack() &&
-           repop->op->wants_ack()) {
+  else if (repop->can_send_ack()) {
     // apply
     if (!repop->applied)
       apply_repop(repop);
 
-    // send ack
-    MOSDOpReply *reply = new MOSDOpReply(repop->op, 0, osd->osdmap->get_epoch(), false);
-    dout(10) << "put_repop  sending ack on " << *repop << " " << reply << dendl;
-    osd->messenger->send_message(reply, repop->op->get_client_inst());
-    repop->sent_ack = true;
+    if (repop->op->wants_ack()) {
+      // send ack
+      MOSDOpReply *reply = new MOSDOpReply(repop->op, 0, osd->osdmap->get_epoch(), false);
+      dout(10) << "put_repop  sending ack on " << *repop << " " << reply << dendl;
+      osd->messenger->send_message(reply, repop->op->get_client_inst());
+      repop->sent_ack = true;
+    }
 
     utime_t now = g_clock.now();
     now -= repop->start;
@@ -1157,7 +1159,7 @@ void ReplicatedPG::op_modify(MOSDOp *op)
                               poid.oid,
                               layout,
                               osd->osdmap->get_epoch(),
-                              CEPH_OSD_OP_UNBALANCEREADS);
+                              CEPH_OSD_OP_UNBALANCEREADS, 0);
       do_op(pop);
     }
 
index 45d5da7f3b6bef64e9d38a7a6bc574fb3faf4ca7..796f4e518c431482b6e7edb0598d66312228d6e9 100644 (file)
@@ -55,12 +55,10 @@ public:
       pg_local_last_complete(lc) { }
 
     bool can_send_ack() { 
-      return !sent_ack && !sent_commit &&
-        waitfor_ack.empty(); 
+      return !sent_ack && !sent_commit && waitfor_ack.empty(); 
     }
     bool can_send_commit() { 
-      return !sent_commit &&
-        waitfor_ack.empty() && waitfor_commit.empty(); 
+      return !sent_commit && waitfor_ack.empty() && waitfor_commit.empty(); 
     }
     bool can_delete() { 
       return waitfor_ack.empty() && waitfor_commit.empty(); 
index 71514dd38808aca8c87bd0c3cac890d652785902..db0267b9c8ee0609a27c0bf56a622fb080745e5b 100644 (file)
@@ -169,7 +169,7 @@ void Journaler::write_head(Context *oncommit)
   bufferlist bl;
   bl.append((char*)&last_written, sizeof(last_written));
   filer.write(inode, 0, bl.length(), bl, 0, 
-             0
+             NULL
              new C_WriteHead(this, last_written, oncommit));
 }
 
index eb4153ddaec6433756e52404518e3c7e7844ddea..12abe419b64196398ac7a3a316b15d6ec072139f 100644 (file)
@@ -208,6 +208,7 @@ void Objecter::kick_requests(set<pg_t>& changed_pgs)
     tids.swap( pg.active_tids );
     close_pg( pgid );  // will pbly reopen, unless it's just commits we're missing
     
+    dout(10) << "kick_requests pg " << pgid << " tids " << tids << dendl;
     for (set<tid_t>::iterator p = tids.begin();
          p != tids.end();
          p++) {
@@ -216,22 +217,33 @@ void Objecter::kick_requests(set<pg_t>& changed_pgs)
       if (op_modify.count(tid)) {
         OSDModify *wr = op_modify[tid];
         op_modify.erase(tid);
-        
+
+       if (wr->onack)
+         num_unacked--;
+       if (wr->oncommit)
+         num_uncommitted--;
+       
         // WRITE
-        if (wr->tid_version.count(tid)) {
-          if (wr->op == CEPH_OSD_OP_WRITE &&
-              !g_conf.objecter_buffer_uncommitted) {
-            dout(0) << "kick_requests missing commit, cannot replay: objecter_buffer_uncommitted == FALSE" << dendl;
-          } else {
-            dout(3) << "kick_requests missing commit, replay write " << tid
-                    << " v " << wr->tid_version[tid] << dendl;
-            modifyx_submit(wr, wr->waitfor_commit[tid], tid);
-          }
-        } 
-        else if (wr->waitfor_ack.count(tid)) {
+       if (wr->waitfor_ack.count(tid)) {
           dout(3) << "kick_requests missing ack, resub write " << tid << dendl;
           modifyx_submit(wr, wr->waitfor_ack[tid], tid);
-        }
+        } else {
+         assert(wr->waitfor_commit.count(tid));
+         
+         if (wr->tid_version.count(tid)) {
+           if (wr->op == CEPH_OSD_OP_WRITE &&
+               !g_conf.objecter_buffer_uncommitted) {
+             dout(0) << "kick_requests missing commit, cannot replay: objecter_buffer_uncommitted == FALSE" << dendl;
+             assert(0);  // crap. fixme.
+           } else {
+             dout(3) << "kick_requests missing commit, replay write " << tid
+                     << " v " << wr->tid_version[tid] << dendl;
+           }
+         } else {
+           dout(3) << "kick_requests missing commit, resub write " << tid << dendl;
+         }
+         modifyx_submit(wr, wr->waitfor_commit[tid], tid);
+        } 
       }
 
       else if (op_read.count(tid)) {
@@ -355,10 +367,13 @@ tid_t Objecter::stat_submit(OSDStat *st)
            << dendl;
 
   if (pg.acker() >= 0) {
-       MOSDOp *m = new MOSDOp(messenger->get_myinst(), client_inc, last_tid,
-                                                  ex.oid, ex.layout, osdmap->get_epoch(), 
-                                                  CEPH_OSD_OP_STAT);
+    int flags = st->flags;
+    if (st->onfinish) flags |= CEPH_OSD_OP_ACK;
 
+    MOSDOp *m = new MOSDOp(messenger->get_myinst(), client_inc, last_tid,
+                          ex.oid, ex.layout, osdmap->get_epoch(), 
+                          CEPH_OSD_OP_STAT, flags);
+    
     messenger->send_message(m, osdmap->get_inst(pg.acker()));
   }
   
@@ -471,9 +486,11 @@ tid_t Objecter::readx_submit(OSDRead *rd, ObjectExtent &ex, bool retry)
            << dendl;
 
   if (pg.acker() >= 0) {
+    int flags = rd->flags;
+    if (rd->onfinish) flags |= CEPH_OSD_OP_ACK;
     MOSDOp *m = new MOSDOp(messenger->get_myinst(), client_inc, last_tid,
                           ex.oid, ex.layout, osdmap->get_epoch(), 
-                          CEPH_OSD_OP_READ);
+                          CEPH_OSD_OP_READ, flags);
     m->set_length(ex.length);
     m->set_offset(ex.start);
     m->set_retry_attempt(retry);
@@ -729,16 +746,26 @@ tid_t Objecter::modifyx_submit(OSDModify *wr, ObjectExtent &ex, tid_t usetid)
     tid = ++last_tid;
   assert(client_inc >= 0);
 
-  // add to gather set
-  wr->waitfor_ack[tid] = ex;
-  wr->waitfor_commit[tid] = ex;
+  // add to gather set(s)
+  int flags = wr->flags;
+  if (wr->onack) {
+    flags |= CEPH_OSD_OP_ACK;
+    wr->waitfor_ack[tid] = ex;
+    ++num_unacked;
+  } else {
+    dout(20) << " note: not requesting ack" << dendl;
+  }
+  if (wr->oncommit) {
+    flags |= CEPH_OSD_OP_SAFE;
+    wr->waitfor_commit[tid] = ex;
+    ++num_uncommitted;
+  } else {
+    dout(20) << " note: not requesting commit" << dendl;
+  }
   op_modify[tid] = wr;
   pg.active_tids.insert(tid);
   pg.last = g_clock.now();
 
-  ++num_unacked;
-  ++num_uncommitted;
-
   // send?
   dout(10) << "modifyx_submit " << MOSDOp::get_opname(wr->op) << " tid " << tid
            << "  oid " << ex.oid
@@ -749,7 +776,7 @@ tid_t Objecter::modifyx_submit(OSDModify *wr, ObjectExtent &ex, tid_t usetid)
   if (pg.primary() >= 0) {
     MOSDOp *m = new MOSDOp(messenger->get_myinst(), client_inc, tid,
                           ex.oid, ex.layout, osdmap->get_epoch(),
-                          wr->op);
+                          wr->op, flags);
     m->set_length(ex.length);
     m->set_offset(ex.start);
     if (usetid > 0)
@@ -819,40 +846,16 @@ void Objecter::handle_osd_modify_reply(MOSDOpReply *m)
 
   assert(m->get_result() >= 0);
 
-  // ack or safe?
-  if (m->is_safe()) {
-    assert(wr->tid_version.count(tid) == 0 ||
-           m->get_version() == wr->tid_version[tid]);
-
-    // remove from tid/osd maps
-    assert(pg.active_tids.count(tid));
-    pg.active_tids.erase(tid);
-    dout(15) << "handle_osd_modify_reply pg " << m->get_pg() << " still has " << pg.active_tids << dendl;
-    if (pg.active_tids.empty()) close_pg( m->get_pg() );
-
-    // commit.
-    op_modify.erase( tid );
-    wr->waitfor_ack.erase(tid);
-    wr->waitfor_commit.erase(tid);
-
-    num_uncommitted--;
-
-    if (wr->waitfor_commit.empty()) {
-      onack = wr->onack;
-      oncommit = wr->oncommit;
-      delete wr;
-    }
-  } else {
-    // ack.
-    assert(wr->waitfor_ack.count(tid));
+  // ack|commit -> ack
+  if (wr->waitfor_ack.count(tid)) {
     wr->waitfor_ack.erase(tid);
-    
     num_unacked--;
-
+    dout(15) << "handle_osd_modify_reply ack" << dendl;
+    
     if (wr->tid_version.count(tid) &&
-        wr->tid_version[tid].version != m->get_version().version) {
+       wr->tid_version[tid].version != m->get_version().version) {
       dout(-10) << "handle_osd_modify_reply WARNING: replay of tid " << tid 
-                << " did not achieve previous ordering" << dendl;
+               << " did not achieve previous ordering" << dendl;
     }
     wr->tid_version[tid] = m->get_version();
     
@@ -862,12 +865,39 @@ void Objecter::handle_osd_modify_reply(MOSDOpReply *m)
       
       // buffer uncommitted?
       if (!g_conf.objecter_buffer_uncommitted &&
-          wr->op == CEPH_OSD_OP_WRITE) {
-        // discard buffer!
-        ((OSDWrite*)wr)->bl.clear();
+         wr->op == CEPH_OSD_OP_WRITE) {
+       // discard buffer!
+       ((OSDWrite*)wr)->bl.clear();
       }
     }
   }
+  if (m->is_safe()) {
+    // safe
+    assert(wr->tid_version.count(tid) == 0 ||
+           m->get_version() == wr->tid_version[tid]);
+
+    wr->waitfor_commit.erase(tid);
+    num_uncommitted--;
+    dout(15) << "handle_osd_modify_reply safe" << dendl;
+    
+    if (wr->waitfor_commit.empty()) {
+      oncommit = wr->oncommit;
+      wr->oncommit = 0;
+    }
+  }
+
+  // done?
+  if (wr->onack == 0 && wr->oncommit == 0) {
+    // remove from tid/osd maps
+    assert(pg.active_tids.count(tid));
+    pg.active_tids.erase(tid);
+    dout(15) << "handle_osd_modify_reply completed.  pg " << m->get_pg()
+            << " still has " << pg.active_tids << dendl;
+    if (pg.active_tids.empty()) 
+      close_pg( m->get_pg() );
+    op_modify.erase( tid );
+    delete wr;
+  }
   
   dout(5) << num_unacked << " unacked, " << num_uncommitted << " uncommitted" << dendl;
 
@@ -916,3 +946,17 @@ void Objecter::ms_handle_failure(Message *m, entity_name_t dest, const entity_in
     delete m;
   }
 }
+
+
+void Objecter::dump_active()
+{
+  dout(10) << "dump_active" << dendl;
+  
+  for (hash_map<tid_t,OSDStat*>::iterator p = op_stat.begin(); p != op_stat.end(); p++)
+    dout(10) << " stat " << p->first << dendl;
+  for (hash_map<tid_t,OSDRead*>::iterator p = op_read.begin(); p != op_read.end(); p++)
+    dout(10) << " read " << p->first << dendl;
+  for (hash_map<tid_t,OSDModify*>::iterator p = op_modify.begin(); p != op_modify.end(); p++)
+    dout(10) << " modify " << p->first << dendl;
+
+}
index 93c1a2398c16ec2683498af33bd5cedfe3519819..8aefd0559be05bba6368b3249b6006dded4b79a9 100644 (file)
@@ -202,6 +202,7 @@ class Objecter {
   bool is_active() {
     return !(op_read.empty() && op_modify.empty());
   }
+  void dump_active();
 
   int get_client_incarnation() { return client_inc; }
   void set_client_incarnation(int inc) {