#include "messages/MOSDPGBackfill.h"
#include "messages/MBackfillReserve.h"
#include "messages/MRecoveryReserve.h"
+#include "messages/MOSDPGPush.h"
+#include "messages/MOSDPGPushReply.h"
+#include "messages/MOSDPGPull.h"
#include "messages/MOSDSubOp.h"
#include "messages/MOSDSubOpReply.h"
do_backfill(op);
break;
+ case MSG_OSD_PG_PUSH:
+ if (!is_active()) {
+ waiting_for_active.push_back(op);
+ op->mark_delayed("waiting for active");
+ return;
+ }
+ do_push(op);
+ break;
+
+ case MSG_OSD_PG_PULL:
+ do_pull(op);
+ break;
+
+ case MSG_OSD_PG_PUSH_REPLY:
+ do_push_reply(op);
+ break;
+
default:
assert(0 == "bad message type in do_request");
}
return false;
}
-bool PG::can_discard_subop(OpRequestRef op)
+template<typename T, int MSGTYPE>
+bool PG::can_discard_replica_op(OpRequestRef op)
{
- MOSDSubOp *m = static_cast<MOSDSubOp *>(op->request);
- assert(m->get_header().type == MSG_OSD_SUBOP);
+ T *m = static_cast<T *>(op->request);
+ assert(m->get_header().type == MSGTYPE);
// same pg?
// if pg changes _at all_, we reset and repeer!
if (old_peering_msg(m->map_epoch, m->map_epoch)) {
- dout(10) << "handle_sub_op pg changed " << info.history
+ dout(10) << "can_discard_replica_op pg changed " << info.history
<< " after " << m->map_epoch
<< ", dropping" << dendl;
return true;
case CEPH_MSG_OSD_OP:
return can_discard_op(op);
case MSG_OSD_SUBOP:
- return can_discard_subop(op);
+ return can_discard_replica_op<MOSDSubOp, MSG_OSD_SUBOP>(op);
+ case MSG_OSD_PG_PUSH:
+ return can_discard_replica_op<MOSDPGPush, MSG_OSD_PG_PUSH>(op);
+ case MSG_OSD_PG_PULL:
+ return can_discard_replica_op<MOSDPGPull, MSG_OSD_PG_PULL>(op);
+ case MSG_OSD_PG_PUSH_REPLY:
+ return can_discard_replica_op<MOSDPGPushReply, MSG_OSD_PG_PUSH_REPLY>(op);
case MSG_OSD_SUBOPREPLY:
return false;
case MSG_OSD_PG_SCAN:
switch (op->request->get_type()) {
case CEPH_MSG_OSD_OP:
return (static_cast<MOSDOp*>(op->request)->get_pg().m_seed & mask) == match;
- case MSG_OSD_SUBOP:
- return false;
- case MSG_OSD_SUBOPREPLY:
- return false;
- case MSG_OSD_PG_SCAN:
- return false;
- case MSG_OSD_PG_BACKFILL:
- return false;
}
return false;
}
return !have_same_or_newer_map(
curmap,
static_cast<MOSDPGBackfill*>(op->request)->map_epoch);
+
+ case MSG_OSD_PG_PUSH:
+ return !have_same_or_newer_map(
+ curmap,
+ static_cast<MOSDPGPush*>(op->request)->map_epoch);
+
+ case MSG_OSD_PG_PULL:
+ return !have_same_or_newer_map(
+ curmap,
+ static_cast<MOSDPGPull*>(op->request)->map_epoch);
+
+ case MSG_OSD_PG_PUSH_REPLY:
+ return !have_same_or_newer_map(
+ curmap,
+ static_cast<MOSDPGPushReply*>(op->request)->map_epoch);
}
assert(0);
return false;
// OpRequest queueing
bool can_discard_op(OpRequestRef op);
bool can_discard_scan(OpRequestRef op);
- bool can_discard_subop(OpRequestRef op);
bool can_discard_backfill(OpRequestRef op);
bool can_discard_request(OpRequestRef op);
+ template<typename T, int MSGTYPE>
+ bool can_discard_replica_op(OpRequestRef op);
+
static bool op_must_wait_for_map(OSDMapRef curmap, OpRequestRef op);
static bool split_request(OpRequestRef op, unsigned match, unsigned bits);
virtual void do_sub_op_reply(OpRequestRef op) = 0;
virtual void do_scan(OpRequestRef op) = 0;
virtual void do_backfill(OpRequestRef op) = 0;
+ virtual void do_push(OpRequestRef op) = 0;
+ virtual void do_pull(OpRequestRef op) = 0;
+ virtual void do_push_reply(OpRequestRef op) = 0;
virtual void snap_trimmer() = 0;
virtual int do_command(vector<string>& cmd, ostream& ss,
#include "messages/MOSDPing.h"
#include "messages/MWatchNotify.h"
+#include "messages/MOSDPGPush.h"
+#include "messages/MOSDPGPull.h"
+#include "messages/MOSDPGPushReply.h"
+
#include "Watch.h"
#include "mds/inode_backtrace.h" // Ugh
}
}
+void ReplicatedPG::_do_push(OpRequestRef op)
+{
+ MOSDPGPush *m = static_cast<MOSDPGPush *>(op->request);
+ assert(m->get_header().type == MSG_OSD_PG_PUSH);
+ int from = m->get_source().num();
+
+ vector<PushReplyOp> replies;
+ ObjectStore::Transaction *t = new ObjectStore::Transaction;
+ for (vector<PushOp>::iterator i = m->pushes.begin();
+ i != m->pushes.end();
+ ++i) {
+ replies.push_back(PushReplyOp());
+ handle_push(from, *i, &(replies.back()), t);
+ }
+
+ MOSDPGPushReply *reply = new MOSDPGPushReply;
+ reply->set_priority(m->get_priority());
+ reply->pgid = info.pgid;
+ reply->map_epoch = m->map_epoch;
+ reply->replies.swap(replies);
+ reply->compute_cost(g_ceph_context);
+
+ t->register_on_complete(new C_OSD_SendMessageOnConn(
+ osd, reply, m->get_connection()));
+
+ osd->store->queue_transaction(osr.get(), t);
+}
+
+void ReplicatedPG::_do_pull_response(OpRequestRef op)
+{
+ MOSDPGPush *m = static_cast<MOSDPGPush *>(op->request);
+ assert(m->get_header().type == MSG_OSD_PG_PUSH);
+ int from = m->get_source().num();
+
+ vector<PullOp> replies(1);
+ ObjectStore::Transaction *t = new ObjectStore::Transaction;
+ for (vector<PushOp>::iterator i = m->pushes.begin();
+ i != m->pushes.end();
+ ++i) {
+ bool more = handle_pull_response(from, *i, &(replies.back()), t);
+ if (more)
+ replies.push_back(PullOp());
+ }
+ replies.erase(replies.end() - 1);
+
+ if (replies.size()) {
+ MOSDPGPull *reply = new MOSDPGPull;
+ reply->set_priority(m->get_priority());
+ reply->pgid = info.pgid;
+ reply->map_epoch = m->map_epoch;
+ reply->pulls.swap(replies);
+ reply->compute_cost(g_ceph_context);
+
+ t->register_on_complete(new C_OSD_SendMessageOnConn(
+ osd, reply, m->get_connection()));
+ }
+
+ osd->store->queue_transaction(osr.get(), t);
+}
+
+void ReplicatedPG::do_pull(OpRequestRef op)
+{
+ MOSDPGPull *m = static_cast<MOSDPGPull *>(op->request);
+ assert(m->get_header().type == MSG_OSD_PG_PULL);
+ int from = m->get_source().num();
+
+ map<int, vector<PushOp> > replies;
+ for (vector<PullOp>::iterator i = m->pulls.begin();
+ i != m->pulls.end();
+ ++i) {
+ replies[from].push_back(PushOp());
+ handle_pull(from, *i, &(replies[from].back()));
+ }
+ send_pushes(m->get_priority(), replies);
+}
+
+void ReplicatedPG::do_push_reply(OpRequestRef op)
+{
+ MOSDPGPushReply *m = static_cast<MOSDPGPushReply *>(op->request);
+ assert(m->get_header().type == MSG_OSD_PG_PUSH_REPLY);
+ int from = m->get_source().num();
+
+ vector<PushOp> replies(1);
+ for (vector<PushReplyOp>::iterator i = m->replies.begin();
+ i != m->replies.end();
+ ++i) {
+ bool more = handle_push_reply(from, *i, &(replies.back()));
+ if (more)
+ replies.push_back(PushOp());
+ }
+ replies.erase(replies.end() - 1);
+
+ map<int, vector<PushOp> > _replies;
+ _replies[from].swap(replies);
+ send_pushes(m->get_priority(), _replies);
+}
+
void ReplicatedPG::do_backfill(OpRequestRef op)
{
MOSDPGBackfill *m = static_cast<MOSDPGBackfill*>(op->request);
void do_sub_op_reply(OpRequestRef op);
void do_scan(OpRequestRef op);
void do_backfill(OpRequestRef op);
+ void _do_push(OpRequestRef op);
+ void _do_pull_response(OpRequestRef op);
+ void do_push(OpRequestRef op) {
+ if (is_primary()) {
+ _do_pull_response(op);
+ } else {
+ _do_push(op);
+ }
+ }
+ void do_pull(OpRequestRef op);
+ void do_push_reply(OpRequestRef op);
RepGather *trim_object(const hobject_t &coid);
void snap_trimmer();
int do_osd_ops(OpContext *ctx, vector<OSDOp>& ops);
<< "PushOp(" << soid
<< ", version: " << version
<< ", data_included: " << data_included
+ << ", data_size: " << data.length()
<< ", omap_header_size: " << omap_header.length()
<< ", omap_entries_size: " << omap_entries.size()
<< ", attrset_size: " << attrset.size()