]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: fix up recovery op accounting
authorSage Weil <sage@newdream.net>
Wed, 29 Apr 2009 22:33:31 +0000 (15:33 -0700)
committerSage Weil <sage@newdream.net>
Wed, 29 Apr 2009 22:33:31 +0000 (15:33 -0700)
Fix varous rop accounting bugs.  Add assertions.  Log recovery
ops.

src/osd/OSD.cc
src/osd/OSD.h
src/osd/PG.h
src/osd/ReplicatedPG.cc

index aa7968c2d3abc1b7f0283de61153990f69f57364..93cb437c6dceb4d8ddf3e560dee5690d811388f9 100644 (file)
@@ -358,6 +358,7 @@ int OSD::init()
 
   osd_logtype.add_inc(l_osd_subop, "subop");
 
+  osd_logtype.add_inc(l_osd_r_push, "rop");
   osd_logtype.add_inc(l_osd_r_push, "r_push");
   osd_logtype.add_inc(l_osd_r_pushb, "r_pushb");
   osd_logtype.add_inc(l_osd_r_pull, "r_pull");
@@ -2035,10 +2036,10 @@ void OSD::advance_map(ObjectStore::Transaction& t, interval_set<snapid_t>& remov
     }
     if (oldprimary != pg->get_primary()) {
       pg->info.history.same_primary_since = osdmap->get_epoch();
-      pg->cancel_recovery();
       pg->dirty_info = true;
     }
-    
+    pg->cancel_recovery();
+
     // deactivate.
     pg->state_clear(PG_STATE_ACTIVE);
     pg->state_clear(PG_STATE_DOWN);
@@ -3246,8 +3247,7 @@ void OSD::do_recovery(PG *pg)
   int max = g_conf.osd_recovery_max_active - recovery_ops_active;
  
   dout(10) << "do_recovery starting " << max
-          << " (" << recovery_ops_active
-          << "/" << g_conf.osd_recovery_max_active << " active) on "
+          << " (" << recovery_ops_active << "/" << g_conf.osd_recovery_max_active << " rops) on "
           << *pg << dendl;
 
   int started = pg->start_recovery_ops(max);
@@ -3260,18 +3260,30 @@ void OSD::do_recovery(PG *pg)
   pg->put();
 }
 
-
-
+void OSD::start_recovery_op(PG *pg, int count)
+{
+  recovery_wq.lock();
+  dout(10) << "start_recovery_op " << *pg << " count " << count
+          << " (" << recovery_ops_active << "/" << g_conf.osd_recovery_max_active << " rops)"
+          << dendl;
+  assert(pg->recovery_ops_active >= 0);
+  pg->recovery_ops_active += count;
+  recovery_wq.unlock();
+}
 
 void OSD::finish_recovery_op(PG *pg, int count, bool dequeue)
 {
   dout(10) << "finish_recovery_op " << *pg << " count " << count
-          << " dequeue=" << dequeue << dendl;
+          << " dequeue=" << dequeue
+          << " (" << recovery_ops_active << "/" << g_conf.osd_recovery_max_active << " rops)"
+          << dendl;
   recovery_wq.lock();
 
   // adjust count
   recovery_ops_active -= count;
+  assert(recovery_ops_active >= 0);
   pg->recovery_ops_active -= count;
+  assert(pg->recovery_ops_active >= 0);
 
   if (dequeue)
     pg->recovery_item.remove_myself();
index 8081907de6986e814335d860cf93ee9ba57bbdac..d314a2cbecede70f6ab527a62695b5a9b2644041 100644 (file)
@@ -51,6 +51,7 @@ enum {
   l_osd_r_wr,
   l_osd_r_wrb,
   l_osd_subop,
+  l_osd_rop,
   l_osd_r_push,
   l_osd_r_pushb,
   l_osd_r_pull,
@@ -638,6 +639,7 @@ private:
   } recovery_wq;
 
   bool queue_for_recovery(PG *pg);
+  void start_recovery_op(PG *pg, int count);
   void finish_recovery_op(PG *pg, int count, bool more);
   void defer_recovery(PG *pg);
   void do_recovery(PG *pg);
index 49af4822389515ad7c03dabffaa16a1372ed375c..73d46a5caf465b36e220b03c9717c9ce423eef79 100644 (file)
@@ -931,6 +931,9 @@ inline ostream& operator<<(ostream& out, const PG& pg)
 {
   out << "pg[" << pg.info 
       << " r=" << pg.get_role();
+  
+  if (pg.recovery_ops_active)
+    out << " rops=" << pg.recovery_ops_active;
 
   if (pg.log.bottom != pg.info.log_bottom ||
       pg.log.top != pg.info.last_update)
index a029585f54acaf64730bed66531f4938261d8e38..158d1aa0960c0f474492b9291c031a9b9490ab8e 100644 (file)
@@ -97,6 +97,7 @@ void ReplicatedPG::wait_for_missing_object(object_t oid, Message *m)
            << ", pulling"
            << dendl;
     pull(poid);
+    osd->start_recovery_op(this, 1);
   }
   waiting_for_missing_object[oid].push_back(m);
 }
@@ -1761,6 +1762,7 @@ void ReplicatedPG::op_modify(MOSDOp *op)
       // push it before this update. 
       // FIXME, this is probably extra much work (eg if we're about to overwrite)
       push_to_replica(poid, peer);
+      osd->start_recovery_op(this, 1);
     }
   }
 
@@ -2120,10 +2122,10 @@ bool ReplicatedPG::pull(pobject_t poid)
     if (missing.is_missing(head.oid)) {
       if (pulling.count(head.oid)) {
        dout(10) << " missing but already pulling head " << head << dendl;
+       return false;
       } else {
-       pull(head);
+       return pull(head);
       }
-      return false;
     }
 
     // check snapset
@@ -2524,8 +2526,10 @@ void ReplicatedPG::sub_op_push(MOSDSubOp *op)
     missing_loc.erase(poid.oid);
 
     // close out pull op?
-    if (pulling.count(poid.oid))
+    if (pulling.count(poid.oid)) {
       pulling.erase(poid.oid);
+      finish_recovery_op();
+    }
 
     update_stats();
 
@@ -2538,11 +2542,11 @@ void ReplicatedPG::sub_op_push(MOSDSubOp *op)
       for (unsigned i=1; i<acting.size(); i++) {
        int peer = acting[i];
        assert(peer_missing.count(peer));
-       if (peer_missing[peer].is_missing(poid.oid)) 
+       if (peer_missing[peer].is_missing(poid.oid)) {
          push_to_replica(poid, peer);  // ok, push it, and they (will) have it now.
+         osd->start_recovery_op(this, 1);
+       }
       }
-
-      finish_recovery_op();
     }
 
   } else {
@@ -2658,6 +2662,7 @@ int ReplicatedPG::start_recovery_ops(int max)
     else
       n = recover_primary(max);
     started += n;
+    osd->logger->inc(l_osd_rop, n);
     if (n < max)
       break;
     max -= n;
@@ -2780,7 +2785,6 @@ int ReplicatedPG::recover_primary(int max)
     finish_recovery();
   } else {
     dout(-10) << "recover_primary primary now complete, starting peer recovery" << dendl;
-    finish_recovery_op();
   }
 
   return started;