]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
OSD,PG: Move Op,SubOp queueing into PG
authorSamuel Just <samuel.just@dreamhost.com>
Tue, 24 Apr 2012 00:40:58 +0000 (17:40 -0700)
committerSamuel Just <sam.just@inktank.com>
Thu, 5 Jul 2012 17:14:58 +0000 (10:14 -0700)
PG now handles delaying/discarding messages since pg map epoch may not
be the same as the OSD map.

Signed-off-by: Samuel Just <sam.just@inktank.com>
src/osd/OSD.cc
src/osd/OSD.h
src/osd/PG.cc
src/osd/PG.h

index 48c2053728e2e77652c5d69949a1e6e58ba64082..5d2cdb2f600060590e6827c15f6c66dc910bced2 100644 (file)
@@ -4567,20 +4567,6 @@ void OSD::handle_pg_scan(OpRequestRef op)
   pg->put();
 }
 
-bool OSD::scan_is_queueable(PG *pg, OpRequestRef op)
-{
-  MOSDPGScan *m = (MOSDPGScan *)op->request;
-  assert(m->get_header().type == MSG_OSD_PG_SCAN);
-  assert(pg->is_locked());
-
-  if (m->query_epoch < pg->info.history.same_interval_since) {
-    dout(10) << *pg << " got old scan, ignoring" << dendl;
-    return false;
-  }
-
-  return true;
-}
-
 void OSD::handle_pg_backfill(OpRequestRef op)
 {
   MOSDPGBackfill *m = (MOSDPGBackfill*)op->request;
@@ -4612,19 +4598,6 @@ void OSD::handle_pg_backfill(OpRequestRef op)
   pg->put();
 }
 
-bool OSD::backfill_is_queueable(PG *pg, OpRequestRef op)
-{
-  MOSDPGBackfill *m = (MOSDPGBackfill *)op->request;
-  assert(m->get_header().type == MSG_OSD_PG_BACKFILL);
-  assert(pg->is_locked());
-
-  if (m->query_epoch < pg->info.history.same_interval_since) {
-    dout(10) << *pg << " got old backfill, ignoring" << dendl;
-    return false;
-  }
-
-  return true;
-}
 
 /** PGQuery
  * from primary to replica | stray
@@ -5247,8 +5220,23 @@ void OSD::handle_op(OpRequestRef op)
       handle_misdirected_op(NULL, op);
     }
     return;
+  } else if (m->may_write() &&
+            (!pg->is_primary() ||
+             !pg->same_for_modify_since(m->get_map_epoch()))) {
+    handle_misdirected_op(pg, op);
+    pg->unlock();
+    return;
+  } else if (m->may_read() &&
+            !pg->same_for_read_since(m->get_map_epoch())) {
+    handle_misdirected_op(pg, op);
+    pg->unlock();
+    return;
+  } else if (!op_has_sufficient_caps(pg, m)) {
+    pg->unlock();
+    return;
   }
 
+
   pg->get();
   enqueue_op(pg, op);
   pg->unlock();
@@ -5363,94 +5351,11 @@ bool OSD::op_is_discardable(MOSDOp *op)
   // want to do what we can to apply it.
   if (!op->get_connection()->is_connected() &&
       op->get_version().version == 0) {
-    dout(10) << " sender " << op->get_connection()->get_peer_addr()
-            << " not connected, dropping " << *op << dendl;
     return true;
   }
   return false;
 }
 
-/*
- * Determine if we can queue the op right now; if not this deals with it.
- * If it's not queueable, we deal with it in one of a few ways:
- * dropping the request, putting it into a wait list for later, or
- * telling the sender that the request was misdirected.
- *
- * @return true if the op is queueable; false otherwise.
- */
-bool OSD::op_is_queueable(PG *pg, OpRequestRef op)
-{
-  assert(pg->is_locked());
-  MOSDOp *m = (MOSDOp*)op->request;
-  assert(m->get_header().type == CEPH_MSG_OSD_OP);
-
-  if (!op_has_sufficient_caps(pg, m)) {
-    reply_op_error(op, -EPERM);
-    return false;
-  }
-
-  if (op_is_discardable(m)) {
-    return false;
-  }
-
-  // misdirected?
-  if (m->may_write()) {
-    if (!pg->is_primary() ||
-       !pg->same_for_modify_since(m->get_map_epoch())) {
-      handle_misdirected_op(pg, op);
-      return false;
-    }
-  } else {
-    if (!pg->same_for_read_since(m->get_map_epoch())) {
-      handle_misdirected_op(pg, op);
-      return false;
-    }
-  }
-
-  if (!pg->is_active()) {
-    dout(7) << *pg << " not active (yet)" << dendl;
-    pg->waiting_for_active.push_back(op);
-    op->mark_delayed();
-    return false;
-  }
-
-  if (pg->is_replay()) {
-    if (m->get_version().version > 0) {
-      dout(7) << *pg << " queueing replay at " << m->get_version()
-             << " for " << *m << dendl;
-      pg->replay_queue[m->get_version()] = op;
-      op->mark_delayed();
-    } else {
-      dout(7) << *pg << " waiting until after replay for " << *m << dendl;
-      pg->waiting_for_active.push_back(op);
-    }
-    return false;
-  }
-
-  return true;
-}
-
-/*
- * discard operation, or return true.  no side-effects.
- */
-bool OSD::subop_is_queueable(PG *pg, OpRequestRef op)
-{
-  MOSDSubOp *m = (MOSDSubOp *)op->request;
-  assert(m->get_header().type == MSG_OSD_SUBOP);
-  assert(pg->is_locked());
-
-  // same pg?
-  //  if pg changes _at all_, we reset and repeer!
-  if (m->map_epoch < pg->info.history.same_interval_since) {
-    dout(10) << "handle_sub_op pg changed " << pg->info.history
-            << " after " << m->map_epoch
-            << ", dropping" << dendl;
-    return false;
-  }
-
-  return true;
-}
-
 /*
  * enqueue called with osd_lock held
  */
@@ -5459,42 +5364,7 @@ void OSD::enqueue_op(PG *pg, OpRequestRef op)
   dout(15) << *pg << " enqueue_op " << op->request << " "
            << *(op->request) << dendl;
   assert(pg->is_locked());
-
-  switch (op->request->get_type()) {
-  case CEPH_MSG_OSD_OP:
-    if (!op_is_queueable(pg, op))
-      return;
-    break;
-
-  case MSG_OSD_SUBOP:
-    if (!subop_is_queueable(pg, op))
-      return;
-    break;
-
-  case MSG_OSD_SUBOPREPLY:
-    // don't care.
-    break;
-
-  case MSG_OSD_PG_SCAN:
-    if (!scan_is_queueable(pg, op))
-      return;
-    break;
-
-  case MSG_OSD_PG_BACKFILL:
-    if (!backfill_is_queueable(pg, op))
-      return;
-    break;
-
-  default:
-    assert(0 == "enqueued an illegal message type");
-  }
-
-  // add to pg's op_queue
-  pg->op_queue.push_back(op);
-  
-  op_wq.queue(pg);
-
-  op->mark_queued_for_pg();
+  pg->queue_op(op);
 }
 
 bool OSD::OpWQ::_enqueue(PG *pg)
index e88c5554f45239bfcf81926e51fc7a9fc2aaec4e..9890dceeccf11d811ad3abd124d0d6ad2c7cb700 100644 (file)
@@ -677,10 +677,8 @@ protected:
   void handle_pg_trim(OpRequestRef op);
 
   void handle_pg_scan(OpRequestRef op);
-  bool scan_is_queueable(PG *pg, OpRequestRef op);
 
   void handle_pg_backfill(OpRequestRef op);
-  bool backfill_is_queueable(PG *pg, OpRequestRef op);
 
   void handle_pg_remove(OpRequestRef op);
   void queue_pg_for_deletion(PG *pg);
@@ -1136,16 +1134,11 @@ public:
   void handle_sub_op(OpRequestRef op);
   void handle_sub_op_reply(OpRequestRef op);
 
-private:
+  static bool op_is_discardable(class MOSDOp *m);
   /// check if we can throw out op from a disconnected client
-  bool op_is_discardable(class MOSDOp *m);
   /// check if op has sufficient caps
   bool op_has_sufficient_caps(PG *pg, class MOSDOp *m);
   /// check if op should be (re)queued for processing
-  bool op_is_queueable(PG *pg, OpRequestRef op);
-  /// check if subop should be (re)queued for processing
-  bool subop_is_queueable(PG *pg, OpRequestRef op);
-
 public:
   void force_remount();
 
index 11a61cdd16f2f7d8575464b62be1b24cc5ba9033..f5b9329a6059a4fe3a5c71f64ce8fd7050b0bb60 100644 (file)
@@ -25,6 +25,8 @@
 #include "messages/MOSDPGRemove.h"
 #include "messages/MOSDPGInfo.h"
 #include "messages/MOSDPGTrim.h"
+#include "messages/MOSDPGScan.h"
+#include "messages/MOSDPGBackfill.h"
 
 #include "messages/MOSDSubOp.h"
 #include "messages/MOSDSubOpReply.h"
@@ -1461,11 +1463,22 @@ void PG::do_request(OpRequestRef op)
 {
   // do any pending flush
   do_pending_flush();
+  if (can_discard_request(op)) {
+    return;
+  } else if (must_delay_request(op)) {
+    op_waiters.push_back(op);
+    return;
+  } else if (!is_active()) {
+    waiting_for_active.push_back(op);
+    return;
+  } else if (is_replay()) {
+    waiting_for_active.push_back(op);
+    return;
+  }
 
   switch (op->request->get_type()) {
   case CEPH_MSG_OSD_OP:
-    if (!osd->op_is_discardable((MOSDOp*)op->request))
-      do_op(op); // do it now
+    do_op(op); // do it now
     break;
 
   case MSG_OSD_SUBOP:
@@ -3882,6 +3895,132 @@ ostream& operator<<(ostream& out, const PG& pg)
   return out;
 }
 
+bool PG::can_discard_op(OpRequestRef op)
+{
+  MOSDOp *m = (MOSDOp*)op->request;
+  if (OSD::op_is_discardable(m)) {
+    return true;
+  } else if (m->may_write() &&
+            (!is_primary() ||
+             !same_for_modify_since(m->get_map_epoch()))) {
+    return true;
+  } else if (m->may_read() &&
+            !same_for_read_since(m->get_map_epoch())) {
+    return true;
+  } else if (is_replay()) {
+    if (m->get_version().version > 0) {
+      dout(7) << " queueing replay at " << m->get_version()
+             << " for " << *m << dendl;
+      replay_queue[m->get_version()] = op;
+      op->mark_delayed();
+      return true;
+    }
+  }
+  return false;
+}
+
+bool PG::can_discard_subop(OpRequestRef op)
+{
+  MOSDSubOp *m = (MOSDSubOp *)op->request;
+  assert(m->get_header().type == MSG_OSD_SUBOP);
+
+  // same pg?
+  //  if pg changes _at all_, we reset and repeer!
+  if (old_peering_msg(m->map_epoch, m->map_epoch)) {
+    dout(10) << "handle_sub_op pg changed " << info.history
+            << " after " << m->map_epoch
+            << ", dropping" << dendl;
+    return true;
+  }
+  return false;
+}
+
+bool PG::can_discard_scan(OpRequestRef op)
+{
+  MOSDPGScan *m = (MOSDPGScan *)op->request;
+  assert(m->get_header().type == MSG_OSD_PG_SCAN);
+
+  if (old_peering_msg(m->map_epoch, m->query_epoch)) {
+    dout(10) << " got old scan, ignoring" << dendl;
+    return true;
+  }
+  return false;
+}
+
+bool PG::can_discard_backfill(OpRequestRef op)
+{
+  MOSDPGBackfill *m = (MOSDPGBackfill *)op->request;
+  assert(m->get_header().type == MSG_OSD_PG_BACKFILL);
+
+  if (old_peering_msg(m->map_epoch, m->query_epoch)) {
+    dout(10) << " got old backfill, ignoring" << dendl;
+    return true;
+  }
+
+  return false;
+
+}
+
+bool PG::can_discard_request(OpRequestRef op)
+{
+  switch (op->request->get_type()) {
+  case CEPH_MSG_OSD_OP:
+    return can_discard_op(op);
+  case MSG_OSD_SUBOP:
+    return can_discard_subop(op);
+  case MSG_OSD_SUBOPREPLY:
+    return false;
+  case MSG_OSD_PG_SCAN:
+    return can_discard_scan(op);
+
+  case MSG_OSD_PG_BACKFILL:
+    return can_discard_backfill(op);
+  }
+  return true;
+}
+
+bool PG::must_delay_request(OpRequestRef op)
+{
+  switch (op->request->get_type()) {
+  case CEPH_MSG_OSD_OP:
+    return !require_same_or_newer_map(
+      static_cast<MOSDOp*>(op->request)->get_map_epoch());
+
+  case MSG_OSD_SUBOP:
+    return !require_same_or_newer_map(
+      static_cast<MOSDSubOp*>(op->request)->map_epoch);
+
+  case MSG_OSD_SUBOPREPLY:
+    return !require_same_or_newer_map(
+      static_cast<MOSDSubOpReply*>(op->request)->map_epoch);
+
+  case MSG_OSD_PG_SCAN:
+    return !require_same_or_newer_map(
+      static_cast<MOSDPGScan*>(op->request)->map_epoch);
+
+  case MSG_OSD_PG_BACKFILL:
+    return !require_same_or_newer_map(
+      static_cast<MOSDPGBackfill*>(op->request)->map_epoch);
+  }
+  assert(0);
+  return false;
+}
+
+void PG::queue_op(OpRequestRef op)
+{
+  if (can_discard_op(op))
+    return;
+  // TODO: deal with osd queueing
+  op_queue.push_back(op);
+}
+
+void PG::take_waiters()
+{
+  list<OpRequestRef> ls;
+  ls.swap(op_waiters);
+  ls.splice(ls.end(), op_queue);
+}
+
 void PG::handle_peering_event(CephPeeringEvtRef evt, RecoveryCtx *rctx)
 {
   if (old_peering_evt(evt))
@@ -4120,6 +4259,7 @@ boost::statechart::result PG::RecoveryState::Reset::react(const ActMap&)
   }
 
   pg->update_heartbeat_peers();
+  pg->take_waiters();
 
   return transit< Started >();
 }
index 14e5495a838e739b6126be2942b23fefbbe58aa0..58ecd5aff3997d1ea21166a5c51a0edaa6eabe80 100644 (file)
@@ -1363,12 +1363,24 @@ public:
                    pair<int, pg_info_t> &notify_info);
   void fulfill_log(int from, const pg_query_t &query, epoch_t query_epoch);
   bool acting_up_affected(const vector<int>& newup, const vector<int>& newacting);
+
+  // OpRequest queueing
+  bool can_discard_op(OpRequestRef op);
+  bool can_discard_scan(OpRequestRef op);
+  bool can_discard_subop(OpRequestRef op);
+  bool can_discard_backfill(OpRequestRef op);
+  bool can_discard_request(OpRequestRef op);
+
+  bool must_delay_request(OpRequestRef op);
+  void queue_op(OpRequestRef op);
+
   bool old_peering_msg(epoch_t reply_epoch, epoch_t query_epoch);
   bool old_peering_evt(CephPeeringEvtRef evt) {
     return old_peering_msg(evt->get_epoch_sent(), evt->get_epoch_requested());
   }
 
   // recovery bits
+  void take_waiters();
   void queue_peering_event(CephPeeringEvtRef evt);
   void handle_peering_event(CephPeeringEvtRef evt, RecoveryCtx *rctx);
   void queue_notify(epoch_t msg_epoch, epoch_t query_epoch,