]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
objecter: throttle read or write ops as per osdmap flags
authorSage Weil <sage@newdream.net>
Thu, 12 Mar 2009 17:34:39 +0000 (10:34 -0700)
committerSage Weil <sage@newdream.net>
Thu, 12 Mar 2009 21:20:05 +0000 (14:20 -0700)
src/osdc/Objecter.cc
src/osdc/Objecter.h

index 06127b0a8239899978a25b4633f2387d461a7e12..5510fb3d8da00d52df32efdfe39b4eb182c8017b 100644 (file)
@@ -96,6 +96,10 @@ void Objecter::handle_osd_map(MOSDMap *m)
       for (epoch_t e = osdmap->get_epoch() + 1;
           e <= m->get_last();
           e++) {
+
+       bool was_pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
+       bool was_pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR);
+    
        if (m->incremental_maps.count(e)) {
          dout(3) << "handle_osd_map decoding incremental epoch " << e << dendl;
          OSDMap::Incremental inc(m->incremental_maps[e]);
@@ -122,6 +126,28 @@ void Objecter::handle_osd_map(MOSDMap *m)
        
        // scan pgs for changes
        scan_pgs(changed_pgs);
+
+       // kick paused
+       if (was_pauserd && !osdmap->test_flag(CEPH_OSDMAP_PAUSERD)) {
+         for (hash_map<tid_t,ReadOp*>::iterator p = op_read.begin();
+              p != op_read.end();
+              p++) {
+           if (p->second->paused) {
+             p->second->paused = false;
+             read_submit(p->second);
+           }
+         }
+       }
+       if (was_pausewr && !osdmap->test_flag(CEPH_OSDMAP_PAUSEWR)) {
+         for (hash_map<tid_t,ModifyOp*>::iterator p = op_modify.begin();
+              p != op_modify.end();
+              p++) {
+           if (p->second->paused) {
+             p->second->paused = false;
+             modify_submit(p->second);
+           }
+         }
+       }
         
        assert(e == osdmap->get_epoch());
       }
@@ -348,7 +374,11 @@ tid_t Objecter::read_submit(ReadOp *rd)
            << " osd" << pg.acker() 
            << dendl;
 
-  if (pg.acker() >= 0) {
+  if (osdmap->test_flag(CEPH_OSDMAP_PAUSERD)) {
+    dout(10) << " paused read " << rd << " tid " << last_tid << dendl;
+    rd->paused = true;
+    maybe_request_map();
+  } else if (pg.acker() >= 0) {
     int flags = rd->flags;
     if (rd->onfinish)
       flags |= CEPH_OSD_OP_ACK;
@@ -483,7 +513,12 @@ tid_t Objecter::modify_submit(ModifyOp *wr)
            << " " << wr->layout 
            << " osd" << pg.primary()
            << dendl;
-  if (pg.primary() >= 0) {
+
+  if (osdmap->test_flag(CEPH_OSDMAP_PAUSEWR)) {
+    dout(10) << " paused modify " << wr << " tid " << last_tid << dendl;
+    wr->paused = true;
+    maybe_request_map();
+  } else if (pg.primary() >= 0) {
     MOSDOp *m = new MOSDOp(client_inc, wr->tid,
                           wr->oid, wr->layout, osdmap->get_epoch(),
                           flags | CEPH_OSD_OP_MODIFY);
index 121197fcd3adc484aa9c02efbcac37fd05d83e09..d928305bea850ee73461da315e912de2f1222862 100644 (file)
@@ -83,10 +83,13 @@ class Objecter {
     int attempts;
     int inc_lock;
 
+    bool paused;
+
     ReadOp(object_t o, ceph_object_layout& ol, vector<ceph_osd_op>& op, int f, Context *of) :
       oid(o), layout(ol), 
       pbl(0), psize(0), flags(f), onfinish(of), 
-      tid(0), attempts(0), inc_lock(-1) {
+      tid(0), attempts(0), inc_lock(-1),
+      paused(false) {
       ops.swap(op);
     }
   };
@@ -106,10 +109,13 @@ class Objecter {
     int inc_lock;
     eversion_t version;
 
+    bool paused;
+
     ModifyOp(object_t o, ceph_object_layout& l, vector<ceph_osd_op>& op,
             const SnapContext& sc, int f, Context *ac, Context *co) :
       oid(o), layout(l), snapc(sc), flags(f), onack(ac), oncommit(co), 
-      tid(0), attempts(0), inc_lock(-1) {
+      tid(0), attempts(0), inc_lock(-1),
+      paused(false) {
       ops.swap(op);
     }
   };