git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: adjust recovery op accounting; explicitly track set of recovering objects
author Sage Weil <sage@newdream.net>
Wed, 24 Jun 2009 18:17:55 +0000 (11:17 -0700)
committer Sage Weil <sage@newdream.net>
Wed, 24 Jun 2009 18:17:55 +0000 (11:17 -0700)
Use a single {start,finish}_recovery_op() func to start and stop
recovery ops, so that there is a single point for counter adjustments
to occur.  On reset, simply call into OSD multiple times.

Also maintain a set<sobject_t> in each PG and on the OSD to track
the set of objects that are recovering.  This can hopefully be
compiled out once all the bugs are identified.

We are chasing this:

osd/OSD.cc:3465: FAILED assert(recovery_ops_active >= 0)
 1: ./cosd(_Z18__ceph_assert_failPKcS0_iS0_+0x3a) [0x7a769b]
 2: ./cosd(_ZN3OSD18finish_recovery_opEP2PGib+0x148) [0x696bce]
 3: ./cosd(_ZN12ReplicatedPG18finish_recovery_opEv+0x77) [0x6359c5]
 4: ./cosd(_ZN12ReplicatedPG17sub_op_push_replyEP14MOSDSubOpReply+0x540) [0x63628a]
 5: ./cosd(_ZN12ReplicatedPG15do_sub_op_replyEP14MOSDSubOpReply+0x64) [0x6407fe]
 6: ./cosd(_ZN3OSD10dequeue_opEP2PG+0x224) [0x6996ee]
 7: ./cosd(_ZN3OSD4OpWQ8_processEP2PG+0x21) [0x70d175]
 8: ./cosd(_ZN10ThreadPool9WorkQueueI2PGE13_void_processEPv+0x28) [0x6c9f78]
 9: ./cosd(_ZN10ThreadPool6workerEv+0x280) [0x7a825c]
 10: ./cosd(_ZN10ThreadPool10WorkThread5entryEv+0x19) [0x70cb9f]
 11: ./cosd(_ZN6Thread11_entry_funcEPv+0x20) [0x629d48]
 12: /lib/libpthread.so.0 [0x7f2f1e3f33f7]
 13: /lib/libc.so.6(clone+0x6d) [0x7f2f1d9c294d]
 NOTE: a copy of the executable, or `objdump -rdS <executable>` is needed to interpret this.

src/osd/OSD.cc
src/osd/OSD.h
src/osd/PG.cc
src/osd/PG.h
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h

index f02da357fa337a7e82e80317990ae13f471a6711..538b77dc88a407664f93844cf54041740a197b1b 100644 (file)
@@ -3434,10 +3434,16 @@ void OSD::do_recovery(PG *pg)
   dout(10) << "do_recovery starting " << max
           << " (" << recovery_ops_active << "/" << g_conf.osd_recovery_max_active << " rops) on "
           << *pg << dendl;
+#ifdef DEBUG_RECOVERY_OIDS
+  dout(20) << "  active was " << recovery_oids << dendl;
+#endif
 
   int started = pg->start_recovery_ops(max);
-  recovery_ops_active += started;
-  pg->recovery_ops_active += started;
+
+  dout(10) << "do_recovery started " << started
+          << " (" << recovery_ops_active << "/" << g_conf.osd_recovery_max_active << " rops) on "
+          << *pg << dendl;
+
   if (started < max)
     pg->recovery_item.remove_myself();
 
@@ -3445,32 +3451,43 @@ void OSD::do_recovery(PG *pg)
   pg->put();
 }
 
-void OSD::start_recovery_op(PG *pg, int count)
+void OSD::start_recovery_op(PG *pg, const sobject_t& soid)
 {
   recovery_wq.lock();
-  dout(10) << "start_recovery_op " << *pg << " count " << count
+  dout(10) << "start_recovery_op " << *pg << " " << soid
           << " (" << recovery_ops_active << "/" << g_conf.osd_recovery_max_active << " rops)"
           << dendl;
   assert(recovery_ops_active >= 0);
-  assert(pg->recovery_ops_active >= 0);
-  recovery_ops_active += count;
-  pg->recovery_ops_active += count;
+  recovery_ops_active++;
+
+#ifdef DEBUG_RECOVERY_OIDS
+  dout(20) << "  active was " << recovery_oids << dendl;
+  assert(recovery_oids.count(soid) == 0);
+  recovery_oids.insert(soid);
+  assert((int)recovery_oids.size() == recovery_ops_active);
+#endif
+
   recovery_wq.unlock();
 }
 
-void OSD::finish_recovery_op(PG *pg, int count, bool dequeue)
+void OSD::finish_recovery_op(PG *pg, const sobject_t& soid, bool dequeue)
 {
-  dout(10) << "finish_recovery_op " << *pg << " count " << count
+  dout(10) << "finish_recovery_op " << *pg << " " << soid
           << " dequeue=" << dequeue
           << " (" << recovery_ops_active << "/" << g_conf.osd_recovery_max_active << " rops)"
           << dendl;
   recovery_wq.lock();
 
   // adjust count
-  recovery_ops_active -= count;
+  recovery_ops_active--;
   assert(recovery_ops_active >= 0);
-  pg->recovery_ops_active -= count;
-  assert(pg->recovery_ops_active >= 0);
+
+#ifdef DEBUG_RECOVERY_OIDS
+  dout(20) << "  active oids was " << recovery_oids << dendl;
+  assert(recovery_oids.count(soid));
+  recovery_oids.erase(soid);
+  assert((int)recovery_oids.size() == recovery_ops_active);
+#endif
 
   if (dequeue)
     pg->recovery_item.remove_myself();
index 8a0dbbf35d0fef2e3845f09cf540ac77014d2755..5ddca9c75fb7a50d113760f92374ef66a5cc2776 100644 (file)
@@ -625,6 +625,9 @@ protected:
   xlist<PG*> recovery_queue;
   utime_t defer_recovery_until;
   int recovery_ops_active;
+#ifdef DEBUG_RECOVERY_OIDS
+  set<sobject_t> recovery_oids;
+#endif
 
   struct RecoveryWQ : public ThreadPool::WorkQueue<PG> {
     OSD *osd;
@@ -671,8 +674,8 @@ protected:
   } recovery_wq;
 
   bool queue_for_recovery(PG *pg);
-  void start_recovery_op(PG *pg, int count);
-  void finish_recovery_op(PG *pg, int count, bool more);
+  void start_recovery_op(PG *pg, const sobject_t& soid);
+  void finish_recovery_op(PG *pg, const sobject_t& soid, bool dequeue);
   void defer_recovery(PG *pg);
   void do_recovery(PG *pg);
   bool _recover_now();
index 75622023520e08cb56e1b39f088f27834ee01a2e..4fba1ffcd933c24d433d55e020da9728945d45fb 100644 (file)
@@ -1543,6 +1543,39 @@ void PG::_finish_recovery(Context *c)
   put();
 }
 
+void PG::start_recovery_op(const sobject_t& soid)
+{
+  dout(10) << "start_recovery_op " << soid
+#ifdef DEBUG_RECOVERY_OIDS
+          << " (" << recovering_oids << ")"
+#endif
+          << dendl;
+  assert(recovery_ops_active >= 0);
+  recovery_ops_active++;
+#ifdef DEBUG_RECOVERY_OIDS
+  assert(recovering_oids.count(soid) == 0);
+  recovering_oids.insert(soid);
+#endif
+  osd->start_recovery_op(this, soid);
+}
+
+void PG::finish_recovery_op(const sobject_t& soid, bool dequeue)
+{
+  dout(10) << "finish_recovery_op " << soid
+#ifdef DEBUG_RECOVERY_OIDS
+          << " (" << recovering_oids << ")" 
+#endif
+          << dendl;
+  assert(recovery_ops_active > 0);
+  recovery_ops_active--;
+#ifdef DEBUG_RECOVERY_OIDS
+  assert(recovering_oids.count(soid));
+  recovering_oids.erase(soid);
+#endif
+  osd->finish_recovery_op(this, soid, dequeue);
+}
+
+
 void PG::defer_recovery()
 {
   osd->defer_recovery(this);
@@ -1555,7 +1588,13 @@ void PG::clear_recovery_state()
   log.reset_recovery_pointers();
   finish_sync_event = 0;
 
-  osd->finish_recovery_op(this, recovery_ops_active, true);
+  sobject_t soid;
+  while (recovery_ops_active > 0) {
+#ifdef DEBUG_RECOVERY_OIDS
+    soid = *recovering_oids.begin();
+#endif
+    finish_recovery_op(soid, true);
+  }
 
   _clear_recovery_state();  // pg impl specific hook
 }
@@ -1851,15 +1890,23 @@ void PG::read_log(ObjectStore *store)
        osd->get_logclient()->log(LOG_ERROR, ss);
        reorder = true;
       }
-      last = e.version;
 
-      if (e.version > log.bottom || log.backlog) { // ignore items below log.bottom
-        if (opos % 4096 < pos % 4096)
-         ondisklog.block_map[opos] = e.version;
-        log.log.push_back(e);
-      } else {
-       dout(20) << "read_log  ignoring entry at " << pos << dendl;
+      if (e.version <= log.bottom && !log.backlog) {
+       dout(20) << "read_log  ignoring entry at " << pos << " below log.bottom" << dendl;
+       continue;
       }
+      if (last.version == e.version.version) {
+       dout(0) << "read_log  got dup " << e.version << " (last was " << last << ", dropping that one)" << dendl;
+       log.log.pop_back();
+       stringstream ss;
+       ss << info.pgid << " read_log got dup " << e.version << " after " << last;
+       osd->get_logclient()->log(LOG_ERROR, ss);
+      }
+      
+      if (opos % 4096 < pos % 4096)
+       ondisklog.block_map[opos] = e.version;
+      log.log.push_back(e);
+      last = e.version;
 
       // [repair] at end of log?
       if (!p.end() && e.version == info.last_update) {
index 52ce2324a1e7753cd0f9efe7cae1bc22647b624c..2746bfa765b90cc8766fa18d09afb8ceabef0254 100644 (file)
@@ -36,6 +36,9 @@ using namespace std;
 using namespace __gnu_cxx;
 
 
+#define DEBUG_RECOVERY_OIDS   // track set of recovering oids explicitly, to find counting bugs
+
+
 class OSD;
 class MOSDOp;
 class MOSDSubOp;
@@ -651,6 +654,9 @@ public:
 
   xlist<PG*>::item recovery_item, backlog_item, scrub_item, snap_trim_item, stat_queue_item;
   int recovery_ops_active;
+#ifdef DEBUG_RECOVERY_OIDS
+  set<sobject_t> recovering_oids;
+#endif
 
   epoch_t generate_backlog_epoch;  // epoch we decided to build a backlog.
   utime_t replay_until;
@@ -778,6 +784,8 @@ public:
   void clear_recovery_state();
   virtual void _clear_recovery_state() = 0;
   void defer_recovery();
+  void start_recovery_op(const sobject_t& soid);
+  void finish_recovery_op(const sobject_t& soid, bool dequeue=false);
 
   loff_t get_log_write_pos() {
     return 0;
index 3d26a752284938b9f93cba6fe24b278467e7a0d5..0d2a4264440ff9467176567a1d82e22d69511771 100644 (file)
@@ -99,7 +99,7 @@ void ReplicatedPG::wait_for_missing_object(const sobject_t& soid, Message *m)
            << ", pulling"
            << dendl;
     pull(soid);
-    osd->start_recovery_op(this, 1);
+    start_recovery_op(soid);
   }
   waiting_for_missing_object[soid].push_back(m);
 }
@@ -617,7 +617,7 @@ void ReplicatedPG::do_op(MOSDOp *op)
       // push it before this update. 
       // FIXME, this is probably extra much work (eg if we're about to overwrite)
       push_to_replica(soid, peer);
-      osd->start_recovery_op(this, 1);
+      start_recovery_op(soid);
     }
     
     issue_repop(repop, peer, now, old_exists, old_size, old_version);
@@ -2560,7 +2560,7 @@ void ReplicatedPG::sub_op_push_reply(MOSDSubOpReply *reply)
 
     if (pushing[soid].empty()) {
       dout(10) << "pushed " << soid << " to all replicas" << dendl;
-      finish_recovery_op();
+      finish_recovery_op(soid);
     } else {
       dout(10) << "pushed " << soid << ", still waiting for push ack from " 
               << pushing[soid] << dendl;
@@ -2797,7 +2797,7 @@ void ReplicatedPG::sub_op_push(MOSDSubOp *op)
     // close out pull op?
     if (pulling.count(soid)) {
       pulling.erase(soid);
-      finish_recovery_op();
+      finish_recovery_op(soid);
     }
 
     update_stats();
@@ -2813,7 +2813,7 @@ void ReplicatedPG::sub_op_push(MOSDSubOp *op)
        assert(peer_missing.count(peer));
        if (peer_missing[peer].is_missing(soid)) {
          push_to_replica(soid, peer);  // ok, push it, and they (will) have it now.
-         osd->start_recovery_op(this, 1);
+         start_recovery_op(soid);
        }
       }
     }
@@ -2912,6 +2912,9 @@ void ReplicatedPG::on_role_change()
 void ReplicatedPG::_clear_recovery_state()
 {
   missing_loc.clear();
+#ifdef DEBUG_RECOVERY_OIDS
+  recovering_oids.clear();
+#endif
   pulling.clear();
   pushing.clear();
 }
@@ -2937,11 +2940,7 @@ int ReplicatedPG::start_recovery_ops(int max)
   return started;
 }
 
-void ReplicatedPG::finish_recovery_op()
-{
-  dout(10) << "finish_recovery_op" << dendl;
-  osd->finish_recovery_op(this, 1, false);
-}
+
 
 
 /**
@@ -3017,9 +3016,10 @@ int ReplicatedPG::recover_primary(int max)
          }
        }
        
-       if (pull(soid))
+       if (pull(soid)) {
          ++started;
-       else
+         start_recovery_op(soid);
+       } else
          ++skipped;
        if (started >= max)
          return started;
@@ -3082,6 +3082,9 @@ int ReplicatedPG::recover_replicas(int max)
     sobject_t soid = peer_missing[peer].rmissing.begin()->second;
     eversion_t v = peer_missing[peer].rmissing.begin()->first;
 
+    start_recovery_op(soid);
+    started++;
+
     push_to_replica(soid, peer);
 
     // do other peers need it too?
@@ -3092,7 +3095,7 @@ int ReplicatedPG::recover_replicas(int max)
        push_to_replica(soid, peer);
     }
 
-    if (++started >= max)
+    if (started >= max)
       return started;
   }
   
index 439010e79d9ff38660a1ae418ef88c6cd93f2be1..8cb12913c767dd9c1c8c6254d04fa4254b118bdb 100644 (file)
@@ -414,7 +414,6 @@ protected:
 
   void queue_for_recovery();
   int start_recovery_ops(int max);
-  void finish_recovery_op();
   int recover_primary(int max);
   int recover_replicas(int max);