]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: move queue checks into enqueue_op, kill _handle_ helpers
authorSage Weil <sage.weil@dreamhost.com>
Tue, 25 Oct 2011 05:13:59 +0000 (22:13 -0700)
committerSage Weil <sage.weil@dreamhost.com>
Tue, 25 Oct 2011 05:13:59 +0000 (22:13 -0700)
This simplifies things, and renames the checks to make it clear that we are
doing validation checks only, with no side-effects allowed.

Also move some checks into the parent handle_op() to further simplify the
(re)queue checks.

Signed-off-by: Sage Weil <sage.weil@dreamhost.com>
src/osd/OSD.cc
src/osd/OSD.h

index 85a7358c1f23ad7dbdadc3179cb5bdf7a5845444..829f3e79b2d7a4e2b6fbcf7d20d3d55d115fd8fe 100644 (file)
@@ -5152,6 +5152,9 @@ void OSD::handle_op(MOSDOp *op)
     return;
   }
 
+  // we don't need encoded payload anymore
+  op->clear_payload();
+
   // require same or newer map
   if (!require_same_or_newer_map(op, op->get_map_epoch()))
     return;
@@ -5171,6 +5174,27 @@ void OSD::handle_op(MOSDOp *op)
     return;
   }
 
+  // full?
+  if (osdmap->test_flag(CEPH_OSDMAP_FULL) &&
+      !op->get_source().is_mds()) {  // FIXME: we'll exclude mds writes for now.
+    reply_op_error(op, -ENOSPC);
+    return;
+  }
+
+  // invalid?
+  if (op->get_snapid() != CEPH_NOSNAP) {
+    reply_op_error(op, -EINVAL);
+    return;
+  }
+
+  // too big?
+  if (g_conf->osd_max_write_size &&
+      op->get_data_len() > g_conf->osd_max_write_size << 20) {
+    // journal can't hold commit!
+    reply_op_error(op, -OSD_WRITETOOBIG);
+    return;
+  }
+
   // share our map with sender, if they're old
   _share_map_incoming(op->get_source_inst(), op->get_map_epoch(),
                      (Session *)op->get_connection()->get_priv());
@@ -5198,7 +5222,7 @@ void OSD::handle_op(MOSDOp *op)
   }
 
   pg->get();
-  _handle_op(pg, op);
+  enqueue_op(pg, op);
   pg->unlock();
   pg->put();
 }
@@ -5234,77 +5258,6 @@ bool OSD::op_has_sufficient_caps(PG *pg, MOSDOp *op)
   return true;
 }
 
-/*
- * queue an operation, or discard it.  avoid side-effects or any "real" work.
- */
-void OSD::_handle_op(PG *pg, MOSDOp *op)
-{
-  dout(10) << *pg << " _handle_op " << op << " " << *op << dendl;
-  assert(pg->is_locked());
-
-  if (!op_has_sufficient_caps(pg, op)) {
-    reply_op_error(op, -EPERM);
-    return;
-  }
-
-  // we don't need encoded payload anymore
-  op->clear_payload();
-  if (op->may_write()) {
-    // misdirected?
-    if (!pg->is_primary() ||
-       !pg->same_for_modify_since(op->get_map_epoch())) {
-      handle_misdirected_op(pg, op);
-      return;
-    }
-
-    // full?
-    if (osdmap->test_flag(CEPH_OSDMAP_FULL) &&
-       !op->get_source().is_mds()) {  // FIXME: we'll exclude mds writes for now.
-      reply_op_error(op, -ENOSPC);
-      return;
-    }
-
-    // invalid?
-    if (op->get_snapid() != CEPH_NOSNAP) {
-      reply_op_error(op, -EINVAL);
-      return;
-    }
-
-    // too big?
-    if (g_conf->osd_max_write_size &&
-        op->get_data_len() > g_conf->osd_max_write_size << 20) {
-      // journal can't hold commit!
-      reply_op_error(op, -OSD_WRITETOOBIG);
-      return;
-    }
-  } else {
-    // misdirected?
-    if (!pg->same_for_read_since(op->get_map_epoch())) {
-      handle_misdirected_op(pg, op);
-      return;
-    }
-  }
-
-  // pg must be active.
-  if (!pg->is_active()) {
-    dout(7) << *pg << " not active (yet)" << dendl;
-    pg->waiting_for_active.push_back(op);
-    return;
-  }
-  if (pg->is_replay()) {
-    if (op->get_version().version > 0) {
-      dout(7) << *pg << " queueing replay at " << op->get_version()
-             << " for " << *op << dendl;
-      pg->replay_queue[op->get_version()] = op;
-      return;
-    }
-  }
-
-  enqueue_op(pg, op);         
-}
-
-
 void OSD::handle_sub_op(MOSDSubOp *op)
 {
   dout(10) << "handle_sub_op " << *op << " epoch " << op->map_epoch << dendl;
@@ -5337,32 +5290,11 @@ void OSD::handle_sub_op(MOSDSubOp *op)
     return;
   }
   pg->get();
-  _handle_sub_op(pg, op);
+  enqueue_op(pg, op);
   pg->unlock();
   pg->put();
 }
 
-/*
- * queue an operation, or discard it.  avoid side-effects or any "real" work.
- */
-void OSD::_handle_sub_op(PG *pg, MOSDSubOp *op)
-{
-  dout(10) << *pg << " _handle_sub_op " << op << " " << *op << dendl;
-  assert(pg->is_locked());
-
-  // same pg?
-  //  if pg changes _at all_, we reset and repeer!
-  if (op->map_epoch < pg->info.history.same_interval_since) {
-    dout(10) << "handle_sub_op pg changed " << pg->info.history
-            << " after " << op->map_epoch 
-            << ", dropping" << dendl;
-    op->put();
-    return;
-  }
-
-  enqueue_op(pg, op);     // queue for worker threads
-}
-
 void OSD::handle_sub_op_reply(MOSDSubOpReply *op)
 {
   if (op->get_map_epoch() < up_epoch) {
@@ -5393,21 +5325,74 @@ void OSD::handle_sub_op_reply(MOSDSubOpReply *op)
     return;
   }
   pg->get();
-  _handle_sub_op_reply(pg, op);
+  enqueue_op(pg, op);
   pg->unlock();
   pg->put();
 }
 
 /*
- * queue an operation, or discard it.  avoid side-effects or any "real" work.
+ * discard operation, or return true.  no side-effects.
  */
-void OSD::_handle_sub_op_reply(PG *pg, MOSDSubOpReply *op)
+bool OSD::op_is_queueable(PG *pg, MOSDOp *op)
 {
-  dout(10) << *pg << " _handle_sub_op_reply " << op << " " << *op << dendl;
   assert(pg->is_locked());
-  enqueue_op(pg, op);     // queue for worker threads
+
+  if (!op_has_sufficient_caps(pg, op)) {
+    reply_op_error(op, -EPERM);
+    return false;
+  }
+
+  // misdirected?
+  if (op->may_write()) {
+    if (!pg->is_primary() ||
+       !pg->same_for_modify_since(op->get_map_epoch())) {
+      handle_misdirected_op(pg, op);
+      return false;
+    }
+  } else {
+    if (!pg->same_for_read_since(op->get_map_epoch())) {
+      handle_misdirected_op(pg, op);
+      return false;
+    }
+  }
+
+  if (!pg->is_active()) {
+    dout(7) << *pg << " not active (yet)" << dendl;
+    pg->waiting_for_active.push_back(op);
+    return false;
+  }
+
+  if (pg->is_replay()) {
+    if (op->get_version().version > 0) {
+      dout(7) << *pg << " queueing replay at " << op->get_version()
+             << " for " << *op << dendl;
+      pg->replay_queue[op->get_version()] = op;
+      return false;
+    }
+  }
+
+  return true;
 }
 
+/*
+ * discard operation, or return true.  no side-effects.
+ */
+bool OSD::subop_is_queueable(PG *pg, MOSDSubOp *op)
+{
+  assert(pg->is_locked());
+
+  // same pg?
+  //  if pg changes _at all_, we reset and repeer!
+  if (op->map_epoch < pg->info.history.same_interval_since) {
+    dout(10) << "handle_sub_op pg changed " << pg->info.history
+            << " after " << op->map_epoch 
+            << ", dropping" << dendl;
+    op->put();
+    return false;
+  }
+
+  return true;
+}
 
 /*
  * enqueue called with osd_lock held
@@ -5416,6 +5401,26 @@ void OSD::enqueue_op(PG *pg, Message *op)
 {
   dout(15) << *pg << " enqueue_op " << op << " " << *op << dendl;
   assert(pg->is_locked());
+
+  switch (op->get_type()) {
+  case CEPH_MSG_OSD_OP:
+    if (!op_is_queueable(pg, (MOSDOp*)op))
+      return;
+    break;
+
+  case MSG_OSD_SUBOP:
+    if (!subop_is_queueable(pg, (MOSDSubOp*)op))
+      return;
+    break;
+
+  case MSG_OSD_SUBOPREPLY:
+    // don't care.
+    break;
+
+  default:
+    assert(0 == "enqueued an illegal message type");
+  }
+
   // add to pg's op_queue
   pg->op_queue.push_back(op);
   pending_ops++;
@@ -5448,20 +5453,7 @@ void OSD::requeue_ops(PG *pg, list<Message*>& ls)
   while (!q.empty()) {
     Message *op = q.front();
     q.pop_front();
-
-    switch (op->get_type()) {
-    case CEPH_MSG_OSD_OP:
-      _handle_op(pg, (MOSDOp*)op);
-      break;
-    case MSG_OSD_SUBOP:
-      _handle_sub_op(pg, (MOSDSubOp*)op);
-      break;
-    case MSG_OSD_SUBOPREPLY:
-      _handle_sub_op_reply(pg, (MOSDSubOpReply*)op);
-      break;
-    default:
-      assert(0 == "requeued an illegal type");
-    }
+    enqueue_op(pg, op);
   }
 
   // put orig queue contents back in line, after the stuff we requeued.
index 3d2191cbdc317d181d7710c8649dfcfb0e6fdee5..1cd083bd65052f51cae12484e1f48ede530c3e81 100644 (file)
@@ -1094,14 +1094,8 @@ public:
 private:
   /// check if op has sufficient caps
   bool op_has_sufficient_caps(PG *pg, class MOSDOp *m);
-
-  /*
-   * these locked helpers assume pg is locked, and will do fina
-   * checks before enqueuing the given operation.
-   */
-  void _handle_op(PG *pg, class MOSDOp *m);
-  void _handle_sub_op(PG *pg, class MOSDSubOp *m);
-  void _handle_sub_op_reply(PG *pg, class MOSDSubOpReply *m);
+  bool op_is_queueable(PG *pg, class MOSDOp *m);
+  bool subop_is_queueable(PG *pg, class MOSDSubOp *m);
 
 public:
   void force_remount();