]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
*** empty log message ***
authorsage <sage@29311d96-e01e-0410-9327-a35deaab8ce9>
Fri, 15 Sep 2006 23:36:00 +0000 (23:36 +0000)
committersage <sage@29311d96-e01e-0410-9327-a35deaab8ce9>
Fri, 15 Sep 2006 23:36:00 +0000 (23:36 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@859 29311d96-e01e-0410-9327-a35deaab8ce9

ceph/config.cc
ceph/config.h
ceph/osdc/Objecter.cc
ceph/osdc/Objecter.h

index 3e5368b79432f36a8f017374b1d9191a65636ea5..7a6017ff0b9bf7eda2667d0723876646f0ba408f 100644 (file)
@@ -137,6 +137,9 @@ md_config_t g_conf = {
   client_trace: 0,
   fuse_direct_io: 0,
   
+  // --- objecter ---
+  objecter_buffer_uncommitted: true,
+
   // --- mds ---
   mds_cache_size: MDS_CACHE_SIZE,
   mds_cache_mid: .7,
@@ -465,6 +468,10 @@ void parse_config_options(vector<char*>& args)
        else if (strcmp(args[i], "--fakemessenger_serialize") == 0) 
          g_conf.fakemessenger_serialize = atoi(args[++i]);
 
+
+       else if (strcmp(args[i], "--objecter_buffer_uncommitted") == 0) 
+         g_conf.objecter_buffer_uncommitted = atoi(args[++i]);
+
        else if (strcmp(args[i], "--mds_cache_size") == 0) 
          g_conf.mds_cache_size = atoi(args[++i]);
 
index ebd0dfdd49b0f3c3a9292c61f702abfc76e12933..160c817e860905848ca487c59eebf824ded4d470 100644 (file)
@@ -98,6 +98,7 @@ struct md_config_t {
   int      client_oc_max_dirty;
   size_t   client_oc_max_sync_write;
 
+  
 
   /*
   bool     client_bcache;
@@ -113,6 +114,9 @@ struct md_config_t {
   int      client_trace;
   int      fuse_direct_io;
 
+  // objecter
+  bool  objecter_buffer_uncommitted;
+
   // mds
   int   mds_cache_size;
   float mds_cache_mid;
index 95a961587e7b6b0bceca953ebeb006c4504a3419..23c17544403b897361038de72e83e1d63482ae47 100644 (file)
@@ -15,6 +15,7 @@
 #include "config.h"
 #undef dout
 #define dout(x)  if (x <= g_conf.debug || x <= g_conf.debug_objecter) cout << messenger->get_myaddr() << ".objecter "
+#define derr(x)  if (x <= g_conf.debug || x <= g_conf.debug_objecter) cerr << messenger->get_myaddr() << ".objecter "
 
 
 // messages ------------------------------
@@ -171,9 +172,14 @@ void Objecter::kick_requests(set<pg_t>& changed_pgs)
                
                // WRITE
                if (wr->tid_version.count(tid)) {
-                 dout(0) << "kick_requests missing commit, replay write " << tid
-                                 << " v " << wr->tid_version[tid] << endl;
-                 modifyx_submit(wr, wr->waitfor_commit[tid], tid);
+                 if (wr->op == OSD_OP_WRITE &&
+                         !g_conf.objecter_buffer_uncommitted) {
+                       derr(0) << "kick_requests missing commit, cannot replay: objecter_buffer_uncommitted == FALSE" << endl;
+                 } else {
+                       dout(0) << "kick_requests missing commit, replay write " << tid
+                                       << " v " << wr->tid_version[tid] << endl;
+                       modifyx_submit(wr, wr->waitfor_commit[tid], tid);
+                 }
                } 
                else if (wr->waitfor_ack.count(tid)) {
                  dout(0) << "kick_requests missing ack, resub write " << tid << endl;
@@ -554,6 +560,9 @@ tid_t Objecter::modifyx_submit(OSDModify *wr, ObjectExtent &ex, tid_t usetid)
   op_modify[tid] = wr;
   pg.active_tids.insert(tid);
   
+  ++num_unacked;
+  ++num_uncommitted;
+
   // send
   dout(10) << "modifyx_submit " << MOSDOp::get_opname(wr->op) << " tid " << tid
                   << "  oid " << hex << ex.oid << dec 
@@ -564,6 +573,8 @@ tid_t Objecter::modifyx_submit(OSDModify *wr, ObjectExtent &ex, tid_t usetid)
   if (pg.primary() >= 0)
        messenger->send_message(m, MSG_ADDR_OSD(pg.primary()), 0);
   
+  dout(5) << num_unacked << " unacked, " << num_uncommitted << " uncommitted" << endl;
+  
   return tid;
 }
 
@@ -618,6 +629,8 @@ void Objecter::handle_osd_modify_reply(MOSDOpReply *m)
        wr->waitfor_ack.erase(tid);
        wr->waitfor_commit.erase(tid);
 
+       num_uncommitted--;
+
        if (wr->waitfor_commit.empty()) {
          onack = wr->onack;
          oncommit = wr->oncommit;
@@ -628,6 +641,8 @@ void Objecter::handle_osd_modify_reply(MOSDOpReply *m)
        //dout(15) << " handle_osd_write_reply ack on " << tid << endl;
        assert(wr->waitfor_ack.count(tid));
        wr->waitfor_ack.erase(tid);
+       
+       num_unacked--;
 
        if (wr->tid_version.count(tid) &&
                wr->tid_version[tid].version != m->get_version().version) {
@@ -639,6 +654,13 @@ void Objecter::handle_osd_modify_reply(MOSDOpReply *m)
        if (wr->waitfor_ack.empty()) {
          onack = wr->onack;
          wr->onack = 0;  // only do callback once
+         
+         // buffer uncommitted?
+         if (!g_conf.objecter_buffer_uncommitted &&
+                 wr->op == OSD_OP_WRITE) {
+               // discard buffer!
+               ((OSDWrite*)wr)->bl.clear();
+         }
        }
   }
   
index 140bb58dc7efb5dc6e5e5c53801ce50a3f2629d8..450f5701159131cc302a9fdc70b307a934f4d4fc 100644 (file)
@@ -25,6 +25,8 @@ class Objecter {
   
  private:
   tid_t last_tid;
+  int num_unacked;
+  int num_uncommitted;
 
   /*** track pending operations ***/
   // read
@@ -121,7 +123,8 @@ class Objecter {
  public:
   Objecter(Messenger *m, OSDMap *om) : 
        messenger(m), osdmap(om),
-       last_tid(0)
+       last_tid(0),
+       num_unacked(0), num_uncommitted(0)
        {}
   ~Objecter() {
        // clean up op_*