From: Samuel Just Date: Mon, 17 Jun 2013 23:26:31 +0000 (-0700) Subject: ReplicatedPG: add handlers for MOSDPG(Push|Pull|PushReply) X-Git-Tag: v0.67-rc1~138^2~1^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=ae1b2e97f52e3c4fa8683962369426c4609e8acf;p=ceph.git ReplicatedPG: add handlers for MOSDPG(Push|Pull|PushReply) Signed-off-by: Samuel Just --- diff --git a/src/osd/PG.cc b/src/osd/PG.cc index d925ff247207..cdea6b28166f 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -30,6 +30,9 @@ #include "messages/MOSDPGBackfill.h" #include "messages/MBackfillReserve.h" #include "messages/MRecoveryReserve.h" +#include "messages/MOSDPGPush.h" +#include "messages/MOSDPGPushReply.h" +#include "messages/MOSDPGPull.h" #include "messages/MOSDSubOp.h" #include "messages/MOSDSubOpReply.h" @@ -1429,6 +1432,23 @@ void PG::do_request(OpRequestRef op) do_backfill(op); break; + case MSG_OSD_PG_PUSH: + if (!is_active()) { + waiting_for_active.push_back(op); + op->mark_delayed("waiting for active"); + return; + } + do_push(op); + break; + + case MSG_OSD_PG_PULL: + do_pull(op); + break; + + case MSG_OSD_PG_PUSH_REPLY: + do_push_reply(op); + break; + default: assert(0 == "bad message type in do_request"); } @@ -4788,15 +4808,16 @@ bool PG::can_discard_op(OpRequestRef op) return false; } -bool PG::can_discard_subop(OpRequestRef op) +template +bool PG::can_discard_replica_op(OpRequestRef op) { - MOSDSubOp *m = static_cast(op->request); - assert(m->get_header().type == MSG_OSD_SUBOP); + T *m = static_cast(op->request); + assert(m->get_header().type == MSGTYPE); // same pg? // if pg changes _at all_, we reset and repeer! if (old_peering_msg(m->map_epoch, m->map_epoch)) { - dout(10) << "handle_sub_op pg changed " << info.history + dout(10) << "can_discard_replica_op pg changed " << info.history << " after " << m->map_epoch << ", dropping" << dendl; return true; @@ -4836,7 +4857,13 @@ bool PG::can_discard_request(OpRequestRef op) case CEPH_MSG_OSD_OP: return can_discard_op(op); case MSG_OSD_SUBOP: - return can_discard_subop(op); + return can_discard_replica_op(op); + case MSG_OSD_PG_PUSH: + return can_discard_replica_op(op); + case MSG_OSD_PG_PULL: + return can_discard_replica_op(op); + case MSG_OSD_PG_PUSH_REPLY: + return can_discard_replica_op(op); case MSG_OSD_SUBOPREPLY: return false; case MSG_OSD_PG_SCAN: @@ -4854,14 +4881,6 @@ bool PG::split_request(OpRequestRef op, unsigned match, unsigned bits) switch (op->request->get_type()) { case CEPH_MSG_OSD_OP: return (static_cast(op->request)->get_pg().m_seed & mask) == match; - case MSG_OSD_SUBOP: - return false; - case MSG_OSD_SUBOPREPLY: - return false; - case MSG_OSD_PG_SCAN: - return false; - case MSG_OSD_PG_BACKFILL: - return false; } return false; } @@ -4893,6 +4912,21 @@ bool PG::op_must_wait_for_map(OSDMapRef curmap, OpRequestRef op) return !have_same_or_newer_map( curmap, static_cast(op->request)->map_epoch); + + case MSG_OSD_PG_PUSH: + return !have_same_or_newer_map( + curmap, + static_cast(op->request)->map_epoch); + + case MSG_OSD_PG_PULL: + return !have_same_or_newer_map( + curmap, + static_cast(op->request)->map_epoch); + + case MSG_OSD_PG_PUSH_REPLY: + return !have_same_or_newer_map( + curmap, + static_cast(op->request)->map_epoch); } assert(0); return false; diff --git a/src/osd/PG.h b/src/osd/PG.h index c4e113bb9e8d..2239e6b12758 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -1723,10 +1723,12 @@ public: // OpRequest queueing bool can_discard_op(OpRequestRef op); bool can_discard_scan(OpRequestRef op); - bool can_discard_subop(OpRequestRef op); bool can_discard_backfill(OpRequestRef op); bool can_discard_request(OpRequestRef op); + template + bool can_discard_replica_op(OpRequestRef op); + static bool op_must_wait_for_map(OSDMapRef curmap, OpRequestRef op); static bool split_request(OpRequestRef op, unsigned match, unsigned bits); @@ -1778,6 +1780,9 @@ public: virtual void do_sub_op_reply(OpRequestRef op) = 0; virtual void do_scan(OpRequestRef op) = 0; virtual void do_backfill(OpRequestRef op) = 0; + virtual void do_push(OpRequestRef op) = 0; + virtual void do_pull(OpRequestRef op) = 0; + virtual void do_push_reply(OpRequestRef op) = 0; virtual void snap_trimmer() = 0; virtual int do_command(vector& cmd, ostream& ss, diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index d127ae09e0e8..a297f5df1711 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -35,6 +35,10 @@ #include "messages/MOSDPing.h" #include "messages/MWatchNotify.h" +#include "messages/MOSDPGPush.h" +#include "messages/MOSDPGPull.h" +#include "messages/MOSDPGPushReply.h" + #include "Watch.h" #include "mds/inode_backtrace.h" // Ugh @@ -1333,6 +1337,103 @@ void ReplicatedPG::do_scan(OpRequestRef op) } } +void ReplicatedPG::_do_push(OpRequestRef op) +{ + MOSDPGPush *m = static_cast(op->request); + assert(m->get_header().type == MSG_OSD_PG_PUSH); + int from = m->get_source().num(); + + vector replies; + ObjectStore::Transaction *t = new ObjectStore::Transaction; + for (vector::iterator i = m->pushes.begin(); + i != m->pushes.end(); + ++i) { + replies.push_back(PushReplyOp()); + handle_push(from, *i, &(replies.back()), t); + } + + MOSDPGPushReply *reply = new MOSDPGPushReply; + reply->set_priority(m->get_priority()); + reply->pgid = info.pgid; + reply->map_epoch = m->map_epoch; + reply->replies.swap(replies); + reply->compute_cost(g_ceph_context); + + t->register_on_complete(new C_OSD_SendMessageOnConn( + osd, reply, m->get_connection())); + + osd->store->queue_transaction(osr.get(), t); +} + +void ReplicatedPG::_do_pull_response(OpRequestRef op) +{ + MOSDPGPush *m = static_cast(op->request); + assert(m->get_header().type == MSG_OSD_PG_PUSH); + int from = m->get_source().num(); + + vector replies(1); + ObjectStore::Transaction *t = new ObjectStore::Transaction; + for (vector::iterator i = m->pushes.begin(); + i != m->pushes.end(); + ++i) { + bool more = handle_pull_response(from, *i, &(replies.back()), t); + if (more) + replies.push_back(PullOp()); + } + replies.erase(replies.end() - 1); + + if (replies.size()) { + MOSDPGPull *reply = new MOSDPGPull; + reply->set_priority(m->get_priority()); + reply->pgid = info.pgid; + reply->map_epoch = m->map_epoch; + reply->pulls.swap(replies); + reply->compute_cost(g_ceph_context); + + t->register_on_complete(new C_OSD_SendMessageOnConn( + osd, reply, m->get_connection())); + } + + osd->store->queue_transaction(osr.get(), t); +} + +void ReplicatedPG::do_pull(OpRequestRef op) +{ + MOSDPGPull *m = static_cast(op->request); + assert(m->get_header().type == MSG_OSD_PG_PULL); + int from = m->get_source().num(); + + map > replies; + for (vector::iterator i = m->pulls.begin(); + i != m->pulls.end(); + ++i) { + replies[from].push_back(PushOp()); + handle_pull(from, *i, &(replies[from].back())); + } + send_pushes(m->get_priority(), replies); +} + +void ReplicatedPG::do_push_reply(OpRequestRef op) +{ + MOSDPGPushReply *m = static_cast(op->request); + assert(m->get_header().type == MSG_OSD_PG_PUSH_REPLY); + int from = m->get_source().num(); + + vector replies(1); + for (vector::iterator i = m->replies.begin(); + i != m->replies.end(); + ++i) { + bool more = handle_push_reply(from, *i, &(replies.back())); + if (more) + replies.push_back(PushOp()); + } + replies.erase(replies.end() - 1); + + map > _replies; + _replies[from].swap(replies); + send_pushes(m->get_priority(), _replies); +} + void ReplicatedPG::do_backfill(OpRequestRef op) { MOSDPGBackfill *m = static_cast(op->request); diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index 0c7ba902328f..2ec2ebc40f8c 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -935,6 +935,17 @@ public: void do_sub_op_reply(OpRequestRef op); void do_scan(OpRequestRef op); void do_backfill(OpRequestRef op); + void _do_push(OpRequestRef op); + void _do_pull_response(OpRequestRef op); + void do_push(OpRequestRef op) { + if (is_primary()) { + _do_pull_response(op); + } else { + _do_push(op); + } + } + void do_pull(OpRequestRef op); + void do_push_reply(OpRequestRef op); RepGather *trim_object(const hobject_t &coid); void snap_trimmer(); int do_osd_ops(OpContext *ctx, vector& ops); diff --git a/src/osd/osd_types.cc b/src/osd/osd_types.cc index 7d1ed3108a00..bfa3cdddaae6 100644 --- a/src/osd/osd_types.cc +++ b/src/osd/osd_types.cc @@ -3048,6 +3048,7 @@ ostream &PushOp::print(ostream &out) const << "PushOp(" << soid << ", version: " << version << ", data_included: " << data_included + << ", data_size: " << data.length() << ", omap_header_size: " << omap_header.length() << ", omap_entries_size: " << omap_entries.size() << ", attrset_size: " << attrset.size()