]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: reset peering, in-flight repops on every pg change
authorSage Weil <sage@newdream.net>
Fri, 23 Jan 2009 00:14:52 +0000 (16:14 -0800)
committerSage Weil <sage@newdream.net>
Fri, 23 Jan 2009 00:14:52 +0000 (16:14 -0800)
Instead of complicated (and flawed) logic for letting in-progress rep ops
complete while the PG changes (but primary remains the same), reset the
pg state entirely.  Apply any ops we have in hand, but drop everything
else, including PUSH/PULL ops.

This vastly simplifies the logic in the OSD and makes it easy to reason
about things.

Fix clients (Objecter, osd_client) to resubmit ops when PG membership
changes (not just on primary change).

src/osd/OSD.cc
src/osd/ReplicatedPG.cc
src/osdc/Objecter.cc

index c1aa6b63846a2f3cdaa2b2ce75e54ee69fb74ce5..ed5ab2ad266d1a3a2f1dbd3a20db9f1362d648ea 100644 (file)
@@ -3372,129 +3372,100 @@ void OSD::handle_op(MOSDOp *op)
     }
   }
 
-  if (!op->get_source().is_osd()) {
-    // REGULAR OP (non-replication)
-
-    // note original source
-    op->clear_payload();    // and hose encoded payload (in case we forward)
-
-    // have pg?
-    if (!pg) {
-      dout(7) << "hit non-existent pg " 
-              << pgid 
-              << ", waiting" << dendl;
-      waiting_for_pg[pgid].push_back(op);
+  // we don't need encoded payload anymore
+  op->clear_payload();
+
+  // have pg?
+  if (!pg) {
+    dout(7) << "hit non-existent pg " 
+           << pgid 
+           << ", waiting" << dendl;
+    waiting_for_pg[pgid].push_back(op);
+    return;
+  }
+  
+  // pg must be same-ish...
+  if (!op->is_modify()) {
+    // read
+    if (!pg->same_for_read_since(op->get_map_epoch())) {
+      dout(7) << "handle_rep_op pg changed " << pg->info.history
+             << " after " << op->get_map_epoch() 
+             << ", dropping" << dendl;
+      assert(op->get_map_epoch() < osdmap->get_epoch());
+      pg->unlock();
+      delete op;
       return;
     }
-
-    // pg must be same-ish...
-    if (!op->is_modify()) {
-      // read
-      if (!pg->same_for_read_since(op->get_map_epoch())) {
-       dout(7) << "handle_rep_op pg changed " << pg->info.history
-               << " after " << op->get_map_epoch() 
-               << ", dropping" << dendl;
-       assert(op->get_map_epoch() < osdmap->get_epoch());
-       pg->unlock();
-       delete op;
-       return;
-      }
-      
-      if (op->get_oid().snap > 0) {
-       // snap read.  hrm.
-       // are we missing a revision that we might need?
-       // let's get them all.
-       for (unsigned i=0; i<op->get_snaps().size(); i++) {
-         object_t oid = op->get_oid();
-         oid.snap = op->get_snaps()[i];
-         if (pg->is_missing_object(oid)) {
-           dout(10) << "handle_op _may_ need missing rev " << oid << ", pulling" << dendl;
-           pg->wait_for_missing_object(op->get_oid(), op);
-           pg->unlock();
-           return;
-         }
-       }
-      }
-
-    } else {
-      // modify
-      if ((!pg->is_primary() ||
-          !pg->same_for_modify_since(op->get_map_epoch()))) {
-       dout(7) << "handle_op pg changed " << pg->info.history
-               << " after " << op->get_map_epoch() 
-               << ", dropping" << dendl;
-       assert(op->get_map_epoch() < osdmap->get_epoch());
-       pg->unlock();
-       delete op;
-       return;
-      }
-
-      // scrubbing?
-      if (pg->is_scrubbing()) {
-       dout(10) << *pg << " is scrubbing, deferring op " << *op << dendl;
-       pg->waiting_for_active.push_back(op);
-       pg->unlock();
-       return;
-      }
-    }
     
-    // pg must be active.
-    if (!pg->is_active()) {
-      // replay?
-      if (op->get_version().version > 0) {
-        if (op->get_version() > pg->info.last_update) {
-          dout(7) << *pg << " queueing replay at " << op->get_version()
-                  << " for " << *op << dendl;
-          pg->replay_queue[op->get_version()] = op;
+    if (op->get_oid().snap > 0) {
+      // snap read.  hrm.
+      // are we missing a revision that we might need?
+      // let's get them all.
+      for (unsigned i=0; i<op->get_snaps().size(); i++) {
+       object_t oid = op->get_oid();
+       oid.snap = op->get_snaps()[i];
+       if (pg->is_missing_object(oid)) {
+         dout(10) << "handle_op _may_ need missing rev " << oid << ", pulling" << dendl;
+         pg->wait_for_missing_object(op->get_oid(), op);
          pg->unlock();
-          return;
-        } else {
-          dout(7) << *pg << " replay at " << op->get_version() << " <= " << pg->info.last_update 
-                  << " for " << *op
-                  << ", will queue for WRNOOP" << dendl;
-        }
+         return;
+       }
       }
-      
-      dout(7) << *pg << " not active (yet)" << dendl;
-      pg->waiting_for_active.push_back(op);
-      pg->unlock();
-      return;
     }
     
-    // missing object?
-    if (pg->is_missing_object(op->get_oid())) {
-      pg->wait_for_missing_object(op->get_oid(), op);
-      pg->unlock();
-      return;
-    }
-
-    dout(10) << "handle_op " << *op << " in " << *pg << dendl;
-
   } else {
-    // REPLICATION OP (it's from another OSD)
-
-    // have pg?
-    if (!pg) {
-      derr(-7) << "handle_rep_op " << *op 
-               << " pgid " << pgid << " dne" << dendl;
+    // modify
+    if ((!pg->is_primary() ||
+        !pg->same_for_modify_since(op->get_map_epoch()))) {
+      dout(7) << "handle_op pg changed " << pg->info.history
+             << " after " << op->get_map_epoch() 
+             << ", dropping" << dendl;
+      assert(op->get_map_epoch() < osdmap->get_epoch());
+      pg->unlock();
       delete op;
-      //assert(0); // wtf, shouldn't happen.
       return;
     }
     
-    // check osd map: same set, or primary+acker?
-    if (!pg->same_for_rep_modify_since(op->get_map_epoch())) {
-      dout(10) << "handle_rep_op pg changed " << pg->info.history
-               << " after " << op->get_map_epoch() 
-               << ", dropping" << dendl;
+    // scrubbing?
+    if (pg->is_scrubbing()) {
+      dout(10) << *pg << " is scrubbing, deferring op " << *op << dendl;
+      pg->waiting_for_active.push_back(op);
       pg->unlock();
-      delete op;
       return;
     }
-
-    assert(pg->get_role() >= 0);
-    dout(7) << "handle_rep_op " << op << " in " << *pg << dendl;
   }
+  
+  // pg must be active.
+  if (!pg->is_active()) {
+    // replay?
+    if (op->get_version().version > 0) {
+      if (op->get_version() > pg->info.last_update) {
+       dout(7) << *pg << " queueing replay at " << op->get_version()
+               << " for " << *op << dendl;
+       pg->replay_queue[op->get_version()] = op;
+       pg->unlock();
+       return;
+      } else {
+       dout(7) << *pg << " replay at " << op->get_version() << " <= " << pg->info.last_update 
+               << " for " << *op
+               << ", will queue for WRNOOP" << dendl;
+      }
+    }
+    
+    dout(7) << *pg << " not active (yet)" << dendl;
+    pg->waiting_for_active.push_back(op);
+    pg->unlock();
+    return;
+  }
+  
+  // missing object?
+  if (pg->is_missing_object(op->get_oid())) {
+    pg->wait_for_missing_object(op->get_oid(), op);
+    pg->unlock();
+    return;
+  }
+  
+  dout(10) << "handle_op " << *op << " in " << *pg << dendl;
 
   // proprocess op? 
   if (pg->preprocess_op(op, now)) {
@@ -3554,6 +3525,18 @@ void OSD::handle_sub_op(MOSDSubOp *op)
   } 
 
   PG *pg = _lookup_lock_pg(pgid);
+
+  // same pg?
+  //  if pg changes _at all_, we reset and repeer!
+  if (op->map_epoch < pg->info.history.same_since) {
+    dout(10) << "handle_sub_op pg changed " << pg->info.history
+            << " after " << op->map_epoch 
+            << ", dropping" << dendl;
+    pg->unlock();
+    delete op;
+    return;
+  }
+
   if (g_conf.osd_maxthreads < 1) {
     pg->do_sub_op(op);    // do it now
   } else {
index dbb4777dc3916f070440b132aa52f2fdf027058a..c679cafdf1e3bcc429a006f49515403d58057214 100644 (file)
@@ -1764,17 +1764,8 @@ void ReplicatedPG::sub_op_modify(MOSDSubOp *op)
           << dendl;  
 
   // sanity checks
-  if (op->map_epoch < info.history.same_primary_since) {
-    dout(10) << "sub_op_modify discarding old sub_op from "
-            << op->map_epoch << " < " << info.history.same_primary_since << dendl;
-    delete op;
-    return;
-  }
-  if (!is_active()) {
-    dout(10) << "sub_op_modify not active" << dendl;
-    delete op;
-    return;
-  }
+  assert(op->map_epoch >= info.history.same_primary_since);
+  assert(is_active());
   assert(is_replica());
   
   // note peer's stat
@@ -2246,16 +2237,8 @@ void ReplicatedPG::sub_op_pull(MOSDSubOp *op)
           << " from " << op->get_source()
           << dendl;
 
-  if (op->map_epoch < info.history.same_primary_since) {
-    dout(10) << "sub_op_pull discarding old sub_op from "
-            << op->map_epoch << " < " << info.history.same_primary_since << dendl;
-    delete op;
-    return;
-  }
-
   assert(!is_primary());  // we should be a replica or stray.
 
-  // push it back!
   push(poid, op->get_source().num(), op->data_subset, op->clone_subsets);
   delete op;
 }
@@ -2282,24 +2265,6 @@ void ReplicatedPG::sub_op_push(MOSDSubOp *op)
   interval_set<__u64> data_subset;
   map<pobject_t, interval_set<__u64> > clone_subsets;
 
-  if (!is_primary()) {
-    // non-primary should only accept pushes from the current primary.
-    if (op->map_epoch < info.history.same_primary_since) {
-      dout(10) << "sub_op_push discarding old sub_op from "
-              << op->map_epoch << " < " << info.history.same_primary_since << dendl;
-      delete op;
-      return;
-    }
-    // FIXME: actually, no, what i really want here is a personal "same_role_since"
-    if (!is_active()) {
-      dout(10) << "sub_op_push not active" << dendl;
-      delete op;
-      return;
-    }
-  } else {
-    // primary will accept pushes anytime.
-  }
-
   // are we missing (this specific version)?
   //  (if version is wrong, it is either old (we don't want it) or 
   //   newer (peering is buggy))
@@ -2498,31 +2463,7 @@ void ReplicatedPG::sub_op_push(MOSDSubOp *op)
 
 void ReplicatedPG::on_osd_failure(int o)
 {
-  dout(10) << "on_osd_failure " << o << dendl;
-
-  // artificially ack failed osds
-  xlist<RepGather*>::iterator p = repop_queue.begin();
-  while (!p.end()) {
-    RepGather *repop = *p;
-    ++p;
-    dout(-1) << " artificialling acking repop tid " << repop->rep_tid << dendl;
-    if (repop->waitfor_ack.count(o) ||
-       repop->waitfor_nvram.count(o) ||
-       repop->waitfor_disk.count(o))
-      repop_ack(repop, -EIO, CEPH_OSD_OP_ONDISK, o);
-  }
-  
-  // remove from pushing map
-  {
-    map<object_t, pair<eversion_t,int> >::iterator p = pulling.begin();
-    while (p != pulling.end())
-      if (p->second.second == o) {
-       dout(10) << " forgetting pull of " << p->first << " " << p->second.first
-                << " from osd" << o << dendl;
-       pulling.erase(p++);
-      } else
-       p++;
-  }
+  //dout(10) << "on_osd_failure " << o << dendl;
 }
 
 void ReplicatedPG::on_shutdown()
@@ -2548,47 +2489,20 @@ void ReplicatedPG::on_change()
 {
   dout(10) << "on_change" << dendl;
 
-  // apply all local repops
-  //  (pg is inactive; we will repeer)
-  for (xlist<RepGather*>::iterator p = repop_queue.begin();
-       !p.end(); ++p)
-    if (!(*p)->applied)
-      apply_repop(*p);
-
-  xlist<RepGather*>::iterator p = repop_queue.begin(); 
-  while (!p.end()) {
-    RepGather *repop = *p;
-    ++p;
-
-    if (!is_primary()) {
-      // no longer primary.  hose repops.
-      dout(-1) << " aborting repop tid " << repop->rep_tid << dendl;
-      repop->aborted = true;
-      repop->queue_item.remove_myself();
-      repop_map.erase(repop->rep_tid);
-      repop->put();
-    } else {
-      // still primary. artificially ack+commit any replicas who dropped out of the pg
-      dout(-1) << " checking for dropped osds on repop tid " << repop->rep_tid << dendl;
-      set<int> all;
-      set_union(repop->waitfor_disk.begin(), repop->waitfor_disk.end(),
-               repop->waitfor_ack.begin(), repop->waitfor_ack.end(),
-               inserter(all, all.begin()));
-      for (set<int>::iterator q = all.begin(); q != all.end(); q++) {
-       if (*q == osd->get_nodeid())
-         continue;
-       bool have = false;
-       for (unsigned i=1; i<acting.size(); i++)
-         if (acting[i] == *q) 
-           have = true;
-       if (!have)
-         repop_ack(repop, -EIO, CEPH_OSD_OP_ONDISK, *q);
-      }
-    }
+  // apply all repops
+  while (!repop_queue.empty()) {
+    RepGather *repop = repop_queue.front();
+    repop_queue.pop_front();
+    dout(10) << " applying repop tid " << repop->rep_tid << dendl;
+    if (!repop->applied)
+      apply_repop(repop);
+    repop->aborted = true;
+    repop->put();
   }
   
-  // clear pushing map
+  // clear pushing/pulling maps
   pushing.clear();
+  pulling.clear();
 }
 
 void ReplicatedPG::on_role_change()
index 43ed4d20cae095655e39a33b44a95200f8ff8295..333635e0e6c1677f22a2e1be00d6ec7a5899b940 100644 (file)
@@ -215,25 +215,6 @@ void Objecter::scan_pgs(set<pg_t>& changed_pgs)
     
     other.swap(pg.acting);
 
-    if (g_conf.osd_rep == OSD_REP_PRIMARY) {
-      // same primary?
-      if (!other.empty() &&
-          !pg.acting.empty() &&
-          other[0] == pg.acting[0]) 
-        continue;
-    }
-    else if (g_conf.osd_rep == OSD_REP_SPLAY) {
-      // same primary and acker?
-      if (!other.empty() &&
-          !pg.acting.empty() &&
-          other[0] == pg.acting[0] &&
-          other[other.size() > 1 ? 1:0] == pg.acting[pg.acting.size() > 1 ? 1:0]) 
-        continue;
-    }
-    else if (g_conf.osd_rep == OSD_REP_CHAIN) {
-      // any change is significant.
-    }
-    
     // changed significantly.
     dout(10) << "scan_pgs pg " << pgid 
              << " (" << pg.active_tids << ")"