// for replication etc.
case MSG_OSD_SUBOP:
- handle_sub_op(op);
+ handle_replica_op<MOSDSubOp, MSG_OSD_SUBOP>(op);
break;
case MSG_OSD_SUBOPREPLY:
- handle_sub_op_reply(op);
+ handle_replica_op<MOSDSubOpReply, MSG_OSD_SUBOPREPLY>(op);
+ break;
+ case MSG_OSD_PG_PUSH:
+ handle_replica_op<MOSDPGPush, MSG_OSD_PG_PUSH>(op);
+ break;
+ case MSG_OSD_PG_PULL:
+ handle_replica_op<MOSDPGPull, MSG_OSD_PG_PULL>(op);
+ break;
+ case MSG_OSD_PG_PUSH_REPLY:
+ handle_replica_op<MOSDPGPushReply, MSG_OSD_PG_PUSH_REPLY>(op);
break;
}
}
enqueue_op(pg, op);
}
-void OSD::handle_sub_op(OpRequestRef op)
+template<typename T, int MSGTYPE>
+void OSD::handle_replica_op(OpRequestRef op)
{
- MOSDSubOp *m = static_cast<MOSDSubOp*>(op->request);
- assert(m->get_header().type == MSG_OSD_SUBOP);
+ T *m = static_cast<T *>(op->request);
+ assert(m->get_header().type == MSGTYPE);
- dout(10) << "handle_sub_op " << *m << " epoch " << m->map_epoch << dendl;
+ dout(10) << __func__ << *m << " epoch " << m->map_epoch << dendl;
if (m->map_epoch < up_epoch) {
dout(3) << "replica op from before up" << dendl;
return;
// must be a rep op.
assert(m->get_source().is_osd());
- // make sure we have the pg
- const pg_t pgid = m->pgid;
-
// require same or newer map
if (!require_same_or_newer_map(op, m->map_epoch))
return;
_share_map_incoming(m->get_source(), m->get_connection().get(), m->map_epoch,
static_cast<Session*>(m->get_connection()->get_priv()));
+ // make sure we have the pg
+ const pg_t pgid = m->pgid;
if (service.splitting(pgid)) {
waiting_for_pg[pgid].push_back(op);
return;
enqueue_op(pg, op);
}
-void OSD::handle_sub_op_reply(OpRequestRef op)
-{
- MOSDSubOpReply *m = static_cast<MOSDSubOpReply*>(op->request);
- assert(m->get_header().type == MSG_OSD_SUBOPREPLY);
- if (m->get_map_epoch() < up_epoch) {
- dout(3) << "replica op reply from before up" << dendl;
- return;
- }
-
- if (!require_osd_peer(op))
- return;
-
- // must be a rep op.
- assert(m->get_source().is_osd());
-
- // make sure we have the pg
- const pg_t pgid = m->get_pg();
-
- // require same or newer map
- if (!require_same_or_newer_map(op, m->get_map_epoch())) return;
-
- // share our map with sender, if they're old
- _share_map_incoming(m->get_source(), m->get_connection().get(), m->get_map_epoch(),
- static_cast<Session*>(m->get_connection()->get_priv()));
-
- PG *pg = _have_pg(pgid) ? _lookup_pg(pgid) : NULL;
- if (!pg) {
- return;
- }
- enqueue_op(pg, op);
-}
-
bool OSD::op_is_discardable(MOSDOp *op)
{
// drop client request if they are not connected and can't get the