git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
ReplicatedPG: add handlers for MOSDPG(Push|Pull|PushReply)
author Samuel Just <sam.just@inktank.com>
Mon, 17 Jun 2013 23:26:31 +0000 (16:26 -0700)
committer Samuel Just <sam.just@inktank.com>
Mon, 8 Jul 2013 23:43:32 +0000 (16:43 -0700)
Signed-off-by: Samuel Just <sam.just@inktank.com>
src/osd/PG.cc
src/osd/PG.h
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h
src/osd/osd_types.cc

index d925ff2472075316192495c245a3b36624d98333..cdea6b28166fd5fac5c638f4c38a78bc7297182e 100644 (file)
@@ -30,6 +30,9 @@
 #include "messages/MOSDPGBackfill.h"
 #include "messages/MBackfillReserve.h"
 #include "messages/MRecoveryReserve.h"
+#include "messages/MOSDPGPush.h"
+#include "messages/MOSDPGPushReply.h"
+#include "messages/MOSDPGPull.h"
 
 #include "messages/MOSDSubOp.h"
 #include "messages/MOSDSubOpReply.h"
@@ -1429,6 +1432,23 @@ void PG::do_request(OpRequestRef op)
     do_backfill(op);
     break;
 
+  case MSG_OSD_PG_PUSH:
+    if (!is_active()) {
+      waiting_for_active.push_back(op);
+      op->mark_delayed("waiting for active");
+      return;
+    }
+    do_push(op);
+    break;
+
+  case MSG_OSD_PG_PULL:
+    do_pull(op);
+    break;
+
+  case MSG_OSD_PG_PUSH_REPLY:
+    do_push_reply(op);
+    break;
+
   default:
     assert(0 == "bad message type in do_request");
   }
@@ -4788,15 +4808,16 @@ bool PG::can_discard_op(OpRequestRef op)
   return false;
 }
 
-bool PG::can_discard_subop(OpRequestRef op)
+template<typename T, int MSGTYPE>
+bool PG::can_discard_replica_op(OpRequestRef op)
 {
-  MOSDSubOp *m = static_cast<MOSDSubOp *>(op->request);
-  assert(m->get_header().type == MSG_OSD_SUBOP);
+  T *m = static_cast<T *>(op->request);
+  assert(m->get_header().type == MSGTYPE);
 
   // same pg?
   //  if pg changes _at all_, we reset and repeer!
   if (old_peering_msg(m->map_epoch, m->map_epoch)) {
-    dout(10) << "handle_sub_op pg changed " << info.history
+    dout(10) << "can_discard_replica_op pg changed " << info.history
             << " after " << m->map_epoch
             << ", dropping" << dendl;
     return true;
@@ -4836,7 +4857,13 @@ bool PG::can_discard_request(OpRequestRef op)
   case CEPH_MSG_OSD_OP:
     return can_discard_op(op);
   case MSG_OSD_SUBOP:
-    return can_discard_subop(op);
+    return can_discard_replica_op<MOSDSubOp, MSG_OSD_SUBOP>(op);
+  case MSG_OSD_PG_PUSH:
+    return can_discard_replica_op<MOSDPGPush, MSG_OSD_PG_PUSH>(op);
+  case MSG_OSD_PG_PULL:
+    return can_discard_replica_op<MOSDPGPull, MSG_OSD_PG_PULL>(op);
+  case MSG_OSD_PG_PUSH_REPLY:
+    return can_discard_replica_op<MOSDPGPushReply, MSG_OSD_PG_PUSH_REPLY>(op);
   case MSG_OSD_SUBOPREPLY:
     return false;
   case MSG_OSD_PG_SCAN:
@@ -4854,14 +4881,6 @@ bool PG::split_request(OpRequestRef op, unsigned match, unsigned bits)
   switch (op->request->get_type()) {
   case CEPH_MSG_OSD_OP:
     return (static_cast<MOSDOp*>(op->request)->get_pg().m_seed & mask) == match;
-  case MSG_OSD_SUBOP:
-    return false;
-  case MSG_OSD_SUBOPREPLY:
-    return false;
-  case MSG_OSD_PG_SCAN:
-    return false;
-  case MSG_OSD_PG_BACKFILL:
-    return false;
   }
   return false;
 }
@@ -4893,6 +4912,21 @@ bool PG::op_must_wait_for_map(OSDMapRef curmap, OpRequestRef op)
     return !have_same_or_newer_map(
       curmap,
       static_cast<MOSDPGBackfill*>(op->request)->map_epoch);
+
+  case MSG_OSD_PG_PUSH:
+    return !have_same_or_newer_map(
+      curmap,
+      static_cast<MOSDPGPush*>(op->request)->map_epoch);
+
+  case MSG_OSD_PG_PULL:
+    return !have_same_or_newer_map(
+      curmap,
+      static_cast<MOSDPGPull*>(op->request)->map_epoch);
+
+  case MSG_OSD_PG_PUSH_REPLY:
+    return !have_same_or_newer_map(
+      curmap,
+      static_cast<MOSDPGPushReply*>(op->request)->map_epoch);
   }
   assert(0);
   return false;
index c4e113bb9e8d74814b66e4847e8269656b205fc4..2239e6b1275851ebc41e0299fb707d673d1afb03 100644 (file)
@@ -1723,10 +1723,12 @@ public:
   // OpRequest queueing
   bool can_discard_op(OpRequestRef op);
   bool can_discard_scan(OpRequestRef op);
-  bool can_discard_subop(OpRequestRef op);
   bool can_discard_backfill(OpRequestRef op);
   bool can_discard_request(OpRequestRef op);
 
+  template<typename T, int MSGTYPE>
+  bool can_discard_replica_op(OpRequestRef op);
+
   static bool op_must_wait_for_map(OSDMapRef curmap, OpRequestRef op);
 
   static bool split_request(OpRequestRef op, unsigned match, unsigned bits);
@@ -1778,6 +1780,9 @@ public:
   virtual void do_sub_op_reply(OpRequestRef op) = 0;
   virtual void do_scan(OpRequestRef op) = 0;
   virtual void do_backfill(OpRequestRef op) = 0;
+  virtual void do_push(OpRequestRef op) = 0;
+  virtual void do_pull(OpRequestRef op) = 0;
+  virtual void do_push_reply(OpRequestRef op) = 0;
   virtual void snap_trimmer() = 0;
 
   virtual int do_command(vector<string>& cmd, ostream& ss,
index d127ae09e0e80f7422c0c4c78bd3df50bb547ea9..a297f5df17118b25774a7fe993be30467fbcdabb 100644 (file)
 #include "messages/MOSDPing.h"
 #include "messages/MWatchNotify.h"
 
+#include "messages/MOSDPGPush.h"
+#include "messages/MOSDPGPull.h"
+#include "messages/MOSDPGPushReply.h"
+
 #include "Watch.h"
 
 #include "mds/inode_backtrace.h" // Ugh
@@ -1333,6 +1337,103 @@ void ReplicatedPG::do_scan(OpRequestRef op)
   }
 }
 
+void ReplicatedPG::_do_push(OpRequestRef op)  // Handle an MOSDPGPush: run handle_push on every PushOp, then answer with one MOSDPGPushReply.
+{  // Reached from do_push() when is_primary() is false.
+  MOSDPGPush *m = static_cast<MOSDPGPush *>(op->request);
+  assert(m->get_header().type == MSG_OSD_PG_PUSH);
+  int from = m->get_source().num();  // numeric id of the message's source
+
+  vector<PushReplyOp> replies;
+  ObjectStore::Transaction *t = new ObjectStore::Transaction;  // single transaction passed to every handle_push call
+  for (vector<PushOp>::iterator i = m->pushes.begin();
+       i != m->pushes.end();
+       ++i) {
+    replies.push_back(PushReplyOp());  // one PushReplyOp per incoming PushOp
+    handle_push(from, *i, &(replies.back()), t);  // receives the new reply slot and the shared transaction
+  }
+
+  MOSDPGPushReply *reply = new MOSDPGPushReply;
+  reply->set_priority(m->get_priority());  // reply reuses the request's priority
+  reply->pgid = info.pgid;
+  reply->map_epoch = m->map_epoch;  // and the request's map epoch
+  reply->replies.swap(replies);  // moves the collected reply ops into the message; local vector is left with the message's old contents
+  reply->compute_cost(g_ceph_context);
+
+  t->register_on_complete(new C_OSD_SendMessageOnConn(  // presumably sends `reply` on the request's connection once t completes -- TODO confirm
+                           osd, reply, m->get_connection()));
+
+  osd->store->queue_transaction(osr.get(), t);
+}
+
+void ReplicatedPG::_do_pull_response(OpRequestRef op)  // Handle an MOSDPGPush when is_primary() (see do_push): run handle_pull_response on every PushOp.
+{  // If any call asks for more, the follow-up PullOps are batched into one MOSDPGPull registered on the transaction.
+  MOSDPGPush *m = static_cast<MOSDPGPush *>(op->request);
+  assert(m->get_header().type == MSG_OSD_PG_PUSH);
+  int from = m->get_source().num();  // numeric id of the message's source
+
+  vector<PullOp> replies(1);  // starts with one spare slot; back() is always the slot offered to the next call
+  ObjectStore::Transaction *t = new ObjectStore::Transaction;  // single transaction passed to every handle_pull_response call
+  for (vector<PushOp>::iterator i = m->pushes.begin();
+       i != m->pushes.end();
+       ++i) {
+    bool more = handle_pull_response(from, *i, &(replies.back()), t);
+    if (more)  // true: keep the slot just offered and append a fresh spare; false: the same slot is offered again
+      replies.push_back(PullOp());
+  }
+  replies.erase(replies.end() - 1);  // drop the trailing spare slot, which was never kept
+
+  if (replies.size()) {  // only build a follow-up MOSDPGPull when at least one PullOp was kept
+    MOSDPGPull *reply = new MOSDPGPull;
+    reply->set_priority(m->get_priority());  // follow-up reuses the incoming message's priority
+    reply->pgid = info.pgid;
+    reply->map_epoch = m->map_epoch;  // and its map epoch
+    reply->pulls.swap(replies);
+    reply->compute_cost(g_ceph_context);
+
+    t->register_on_complete(new C_OSD_SendMessageOnConn(  // presumably sends `reply` on the incoming connection once t completes -- TODO confirm
+                             osd, reply, m->get_connection()));
+  }
+
+  osd->store->queue_transaction(osr.get(), t);  // queued whether or not a follow-up message was registered
+}
+
+void ReplicatedPG::do_pull(OpRequestRef op)  // Handle an MOSDPGPull: build one PushOp per PullOp via handle_pull and pass them to send_pushes.
+{
+  MOSDPGPull *m = static_cast<MOSDPGPull *>(op->request);
+  assert(m->get_header().type == MSG_OSD_PG_PULL);
+  int from = m->get_source().num();  // numeric id of the message's source
+
+  map<int, vector<PushOp> > replies;  // only the `from` key is ever populated here
+  for (vector<PullOp>::iterator i = m->pulls.begin();
+       i != m->pulls.end();
+       ++i) {
+    replies[from].push_back(PushOp());  // one PushOp slot per requested PullOp
+    handle_pull(from, *i, &(replies[from].back()));  // receives a pointer to the slot just appended
+  }
+  send_pushes(m->get_priority(), replies);  // pushes go out at the priority of the pull request
+}
+
+void ReplicatedPG::do_push_reply(OpRequestRef op)  // Handle an MOSDPGPushReply: run handle_push_reply on every PushReplyOp and send any follow-up PushOps.
+{
+  MOSDPGPushReply *m = static_cast<MOSDPGPushReply *>(op->request);
+  assert(m->get_header().type == MSG_OSD_PG_PUSH_REPLY);
+  int from = m->get_source().num();  // numeric id of the message's source
+
+  vector<PushOp> replies(1);  // starts with one spare slot; back() is always the slot offered to the next call
+  for (vector<PushReplyOp>::iterator i = m->replies.begin();
+       i != m->replies.end();
+       ++i) {
+    bool more = handle_push_reply(from, *i, &(replies.back()));
+    if (more)  // true: keep the slot just offered and append a fresh spare; false: the same slot is offered again
+      replies.push_back(PushOp());
+  }
+  replies.erase(replies.end() - 1);  // drop the trailing spare slot, which was never kept
+
+  map<int, vector<PushOp> > _replies;
+  _replies[from].swap(replies);  // group the kept PushOps under the sender's id
+  send_pushes(m->get_priority(), _replies);  // NOTE(review): called even when no PushOp was kept (empty vector under `from`) -- confirm send_pushes tolerates that
+}
+
 void ReplicatedPG::do_backfill(OpRequestRef op)
 {
   MOSDPGBackfill *m = static_cast<MOSDPGBackfill*>(op->request);
index 0c7ba902328fee2061418f3b870154babeee9e55..2ec2ebc40f8c32e10f14fd15c5c6df0d1ac3c8db 100644 (file)
@@ -935,6 +935,17 @@ public:
   void do_sub_op_reply(OpRequestRef op);
   void do_scan(OpRequestRef op);
   void do_backfill(OpRequestRef op);
+  void _do_push(OpRequestRef op);
+  void _do_pull_response(OpRequestRef op);
+  void do_push(OpRequestRef op) {  // Dispatch for a push op (PG::do_request calls this for MSG_OSD_PG_PUSH), chosen by is_primary().
+    if (is_primary()) {
+      _do_pull_response(op);  // primary: route to _do_pull_response
+    } else {
+      _do_push(op);  // non-primary: route to _do_push
+    }
+  }
+  void do_pull(OpRequestRef op);
+  void do_push_reply(OpRequestRef op);
   RepGather *trim_object(const hobject_t &coid);
   void snap_trimmer();
   int do_osd_ops(OpContext *ctx, vector<OSDOp>& ops);
index 7d1ed3108a0054511622533fd8d9bb0844353342..bfa3cdddaae63a1c12a01880e83798a90ce03848 100644 (file)
@@ -3048,6 +3048,7 @@ ostream &PushOp::print(ostream &out) const
     << "PushOp(" << soid
     << ", version: " << version
     << ", data_included: " << data_included
+    << ", data_size: " << data.length()
     << ", omap_header_size: " << omap_header.length()
     << ", omap_entries_size: " << omap_entries.size()
     << ", attrset_size: " << attrset.size()