]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
PG,ReplicatedPG: handle do_request in ReplicatedPG,PGBackend
authorSamuel Just <sam.just@inktank.com>
Fri, 30 Aug 2013 01:16:55 +0000 (18:16 -0700)
committerSamuel Just <sam.just@inktank.com>
Thu, 26 Sep 2013 18:24:25 +0000 (11:24 -0700)
Signed-off-by: Samuel Just <sam.just@inktank.com>
src/osd/PG.cc
src/osd/PG.h
src/osd/ReplicatedBackend.cc
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h

index f319d160a39f51e2447d7038a720f01cffbd19c4..919d3e3913a085a1570c17a6f3b0a923eaf5b404 100644 (file)
@@ -1399,76 +1399,6 @@ void PG::queue_op(OpRequestRef op)
   osd->op_wq.queue(make_pair(PGRef(this), op));
 }
 
-void PG::do_request(
-  OpRequestRef op,
-  ThreadPool::TPHandle &handle)
-{
-  // do any pending flush
-  do_pending_flush();
-
-  if (!op_has_sufficient_caps(op)) {
-    osd->reply_op_error(op, -EPERM);
-    return;
-  }
-  assert(!op_must_wait_for_map(get_osdmap(), op));
-  if (can_discard_request(op)) {
-    return;
-  }
-  if (!flushed) {
-    dout(20) << " !flushed, waiting for active on " << op << dendl;
-    waiting_for_active.push_back(op);
-    return;
-  }
-
-  switch (op->request->get_type()) {
-  case CEPH_MSG_OSD_OP:
-    if (is_replay() || !is_active()) {
-      dout(20) << " replay, waiting for active on " << op << dendl;
-      waiting_for_active.push_back(op);
-      return;
-    }
-    do_op(op); // do it now
-    break;
-
-  case MSG_OSD_SUBOP:
-    do_sub_op(op);
-    break;
-
-  case MSG_OSD_SUBOPREPLY:
-    do_sub_op_reply(op);
-    break;
-
-  case MSG_OSD_PG_SCAN:
-    do_scan(op, handle);
-    break;
-
-  case MSG_OSD_PG_BACKFILL:
-    do_backfill(op);
-    break;
-
-  case MSG_OSD_PG_PUSH:
-    if (!is_active()) {
-      waiting_for_active.push_back(op);
-      op->mark_delayed("waiting for active");
-      return;
-    }
-    do_push(op);
-    break;
-
-  case MSG_OSD_PG_PULL:
-    do_pull(op);
-    break;
-
-  case MSG_OSD_PG_PUSH_REPLY:
-    do_push_reply(op);
-    break;
-
-  default:
-    assert(0 == "bad message type in do_request");
-  }
-}
-
-
 void PG::replay_queued_ops()
 {
   assert(is_replay() && is_active());
index b869a0e5e23033b36b431790f5129964b643fe8f..74809eea2684b5faab32d0f5c3efc529e9eaa00f 100644 (file)
@@ -1793,10 +1793,10 @@ public:
 
 
   // abstract bits
-  void do_request(
+  virtual void do_request(
     OpRequestRef op,
     ThreadPool::TPHandle &handle
-  );
+  ) = 0;
 
   virtual void do_op(OpRequestRef op) = 0;
   virtual void do_sub_op(OpRequestRef op) = 0;
@@ -1806,9 +1806,6 @@ public:
     ThreadPool::TPHandle &handle
   ) = 0;
   virtual void do_backfill(OpRequestRef op) = 0;
-  virtual void do_push(OpRequestRef op) = 0;
-  virtual void do_pull(OpRequestRef op) = 0;
-  virtual void do_push_reply(OpRequestRef op) = 0;
   virtual void snap_trimmer() = 0;
 
   virtual int do_command(cmdmap_t cmdmap, ostream& ss,
index d020b18d90128bc4bc5b4b1dd8b4128051d3bb8c..da57630e78ba3dfb6b5c638ca9bb557b84072208 100644 (file)
@@ -92,12 +92,14 @@ bool ReplicatedBackend::handle_message(
         // TODOXXX: needs to be active possibly
        sub_op_push(op);
        return true;
+      default:
+       break;
       }
     }
     break;
   }
 
-  case MSG_OSD_SUBOPREPLY:
+  case MSG_OSD_SUBOPREPLY: {
     MOSDSubOpReply *r = static_cast<MOSDSubOpReply*>(op->request);
     if (r->ops.size() >= 1) {
       OSDOp &first = r->ops[0];
@@ -110,6 +112,10 @@ bool ReplicatedBackend::handle_message(
     }
     break;
   }
+
+  default:
+    break;
+  }
   return false;
 }
 
index 787a6082b455737cef820e4cbd39ba8e7fa0a92a..f7bcdd2949b113fadcb7e323d878e2a2b7748654 100644 (file)
@@ -643,6 +643,62 @@ void ReplicatedPG::get_src_oloc(const object_t& oid, const object_locator_t& olo
     src_oloc.key = oid.name;
 }
 
+void ReplicatedPG::do_request(
+  OpRequestRef op,
+  ThreadPool::TPHandle &handle)
+{
+  // do any pending flush
+  do_pending_flush();
+
+  if (!op_has_sufficient_caps(op)) {
+    osd->reply_op_error(op, -EPERM);
+    return;
+  }
+  assert(!op_must_wait_for_map(get_osdmap(), op));
+  if (can_discard_request(op)) {
+    return;
+  }
+  if (!flushed) {
+    dout(20) << " !flushed, waiting for active on " << op << dendl;
+    waiting_for_active.push_back(op);
+    return;
+  }
+
+  if (pgbackend->handle_message(op))
+    return;
+
+  switch (op->request->get_type()) {
+  case CEPH_MSG_OSD_OP:
+    if (is_replay() || !is_active()) {
+      dout(20) << " replay, waiting for active on " << op << dendl;
+      waiting_for_active.push_back(op);
+      return;
+    }
+    do_op(op); // do it now
+    break;
+
+  case MSG_OSD_SUBOP:
+    do_sub_op(op);
+    break;
+
+  case MSG_OSD_SUBOPREPLY:
+    do_sub_op_reply(op);
+    break;
+
+  case MSG_OSD_PG_SCAN:
+    do_scan(op, handle);
+    break;
+
+  case MSG_OSD_PG_BACKFILL:
+    do_backfill(op);
+    break;
+
+  default:
+    assert(0 == "bad message type in do_request");
+  }
+}
+
+
 /** do_op - do an op
  * pg lock will be held (if multithreaded)
  * osd_lock NOT held.
@@ -1258,11 +1314,6 @@ void ReplicatedPG::do_sub_op(OpRequestRef op)
   OSDOp *first = NULL;
   if (m->ops.size() >= 1) {
     first = &m->ops[0];
-    switch (first->op.op) {
-    case CEPH_OSD_OP_PULL:
-      sub_op_pull(op);
-      return;
-    }
   }
 
   if (!is_active()) {
@@ -1273,9 +1324,6 @@ void ReplicatedPG::do_sub_op(OpRequestRef op)
 
   if (first) {
     switch (first->op.op) {
-    case CEPH_OSD_OP_PUSH:
-      sub_op_push(op);
-      return;
     case CEPH_OSD_OP_DELETE:
       sub_op_remove(op);
       return;
@@ -1304,11 +1352,6 @@ void ReplicatedPG::do_sub_op_reply(OpRequestRef op)
   if (r->ops.size() >= 1) {
     OSDOp& first = r->ops[0];
     switch (first.op.op) {
-    case CEPH_OSD_OP_PUSH:
-      // continue peer recovery
-      sub_op_push_reply(op);
-      return;
-
     case CEPH_OSD_OP_SCRUB_RESERVE:
       sub_op_scrub_reserve_reply(op);
       return;
index 32300222b05d4b2a7063d03053e9bfeefa19cd40..24f001b0fba6b21816e4a2c7e374f59f47bcc928 100644 (file)
@@ -903,6 +903,9 @@ public:
   int do_command(cmdmap_t cmdmap, ostream& ss, bufferlist& idata,
                 bufferlist& odata);
 
+  void do_request(
+    OpRequestRef op,
+    ThreadPool::TPHandle &handle);
   void do_op(OpRequestRef op);
   bool pg_op_must_wait(MOSDOp *op);
   void do_pg_op(OpRequestRef op);