return pg;
}
+/**
+ * Look up the PG for @p pgid; if pg_map has no entry, park @p op instead.
+ *
+ * @param pgid  placement group the op is addressed to
+ * @param op    request appended to waiting_for_pg[pgid] when no PG is found
+ * @return the pg_map entry for pgid, or NULL if op was queued
+ */
+PG *OSD::get_pg_or_queue_for_pg(spg_t pgid, OpRequestRef op)
+{
+  // First attempt: look the PG up while holding only an RLocker.
+  {
+    RWLock::RLocker read_guard(pg_map_lock);
+    if (pg_map.count(pgid) != 0) {
+      return pg_map[pgid];
+    }
+  }
+
+  // Missed above: look again under a WLocker, so that the lookup and the
+  // push onto waiting_for_pg happen within a single hold of pg_map_lock.
+  RWLock::WLocker write_guard(pg_map_lock);
+  if (pg_map.count(pgid) == 0) {
+    waiting_for_pg[pgid].push_back(op);
+    return NULL;
+  }
+  return pg_map[pgid];
+}
bool OSD::_have_pg(spg_t pgid)
{
return true;
};
-
void OSD::do_waiters()
{
assert(osd_lock.is_locked());
dout(10) << "do_waiters -- finish" << dendl;
}
+/**
+ * Read the map epoch carried by a replica op.
+ *
+ * @tparam T        concrete message class; must expose get_header() and
+ *                  a map_epoch member
+ * @tparam MSGTYPE  message type code the request header is asserted to carry
+ * @param op        request whose message is cast to T
+ * @return the message's map_epoch field
+ */
+template<typename T, int MSGTYPE>
+epoch_t replica_op_required_epoch(OpRequestRef op)
+{
+  T *msg = static_cast<T *>(op->get_req());
+  // The cast above is unchecked, so verify the header type matches MSGTYPE.
+  assert(msg->get_header().type == MSGTYPE);
+  return msg->map_epoch;
+}
+
+/**
+ * Return the map epoch stamped on @p op, dispatching on its message type.
+ *
+ * CEPH_MSG_OSD_OP is read through MOSDOp::get_map_epoch(); every other
+ * handled type is forwarded to replica_op_required_epoch<>.  A message
+ * type not listed here trips assert(0).
+ *
+ * @param op  request to inspect
+ * @return the epoch recorded in the request's message
+ */
+epoch_t op_required_epoch(OpRequestRef op)
+{
+  switch (op->get_req()->get_type()) {
+  // client op
+  case CEPH_MSG_OSD_OP:
+    return static_cast<MOSDOp*>(op->get_req())->get_map_epoch();
+
+  // everything else goes through the shared template helper
+  case MSG_OSD_SUBOP:
+    return replica_op_required_epoch<MOSDSubOp, MSG_OSD_SUBOP>(op);
+  case MSG_OSD_SUBOPREPLY:
+    return replica_op_required_epoch<MOSDSubOpReply, MSG_OSD_SUBOPREPLY>(op);
+  case MSG_OSD_PG_PUSH:
+    return replica_op_required_epoch<MOSDPGPush, MSG_OSD_PG_PUSH>(op);
+  case MSG_OSD_PG_PULL:
+    return replica_op_required_epoch<MOSDPGPull, MSG_OSD_PG_PULL>(op);
+  case MSG_OSD_PG_PUSH_REPLY:
+    return replica_op_required_epoch<MOSDPGPushReply, MSG_OSD_PG_PUSH_REPLY>(op);
+  case MSG_OSD_PG_SCAN:
+    return replica_op_required_epoch<MOSDPGScan, MSG_OSD_PG_SCAN>(op);
+  case MSG_OSD_PG_BACKFILL:
+    return replica_op_required_epoch<MOSDPGBackfill, MSG_OSD_PG_BACKFILL>(op);
+  case MSG_OSD_EC_WRITE:
+    return replica_op_required_epoch<MOSDECSubOpWrite, MSG_OSD_EC_WRITE>(op);
+  case MSG_OSD_EC_WRITE_REPLY:
+    return replica_op_required_epoch<MOSDECSubOpWriteReply, MSG_OSD_EC_WRITE_REPLY>(op);
+  case MSG_OSD_EC_READ:
+    return replica_op_required_epoch<MOSDECSubOpRead, MSG_OSD_EC_READ>(op);
+  case MSG_OSD_EC_READ_REPLY:
+    return replica_op_required_epoch<MOSDECSubOpReadReply, MSG_OSD_EC_READ_REPLY>(op);
+
+  default:
+    assert(0);
+    return 0;  // keeps a return on every path after the assert
+  }
+}
+
void OSD::dispatch_op(OpRequestRef op)
{
switch (op->get_req()->get_type()) {
case MSG_OSD_RECOVERY_RESERVE:
handle_pg_recovery_reserve(op);
break;
+ default:
+ epoch_t msg_epoch(op_required_epoch(op));
+ if (msg_epoch > osdmap->get_epoch())
+ return false;
+ switch(op->get_req()->get_type()) {
// client ops
- case CEPH_MSG_OSD_OP:
- handle_op(op, osdmap);
- break;
+ case CEPH_MSG_OSD_OP:
+ handle_op(op, osdmap);
+ break;
- // for replication etc.
- case MSG_OSD_SUBOP:
- handle_replica_op<MOSDSubOp, MSG_OSD_SUBOP>(op, osdmap);
- break;
- case MSG_OSD_SUBOPREPLY:
- handle_replica_op<MOSDSubOpReply, MSG_OSD_SUBOPREPLY>(op, osdmap);
- break;
- case MSG_OSD_PG_PUSH:
- handle_replica_op<MOSDPGPush, MSG_OSD_PG_PUSH>(op, osdmap);
- break;
- case MSG_OSD_PG_PULL:
- handle_replica_op<MOSDPGPull, MSG_OSD_PG_PULL>(op, osdmap);
- break;
- case MSG_OSD_PG_PUSH_REPLY:
- handle_replica_op<MOSDPGPushReply, MSG_OSD_PG_PUSH_REPLY>(op, osdmap);
- break;
- case MSG_OSD_PG_SCAN:
- handle_replica_op<MOSDPGScan, MSG_OSD_PG_SCAN>(op, osdmap);
- break;
- case MSG_OSD_PG_BACKFILL:
- handle_replica_op<MOSDPGBackfill, MSG_OSD_PG_BACKFILL>(op, osdmap);
- break;
- case MSG_OSD_EC_WRITE:
- handle_replica_op<MOSDECSubOpWrite, MSG_OSD_EC_WRITE>(op, osdmap);
- break;
- case MSG_OSD_EC_WRITE_REPLY:
- handle_replica_op<MOSDECSubOpWriteReply, MSG_OSD_EC_WRITE_REPLY>(op, osdmap);
- break;
- case MSG_OSD_EC_READ:
- handle_replica_op<MOSDECSubOpRead, MSG_OSD_EC_READ>(op, osdmap);
- break;
- case MSG_OSD_EC_READ_REPLY:
- handle_replica_op<MOSDECSubOpReadReply, MSG_OSD_EC_READ_REPLY>(op, osdmap);
+ // for replication etc.
+ case MSG_OSD_SUBOP:
+ handle_replica_op<MOSDSubOp, MSG_OSD_SUBOP>(op, osdmap);
+ break;
+ case MSG_OSD_SUBOPREPLY:
+ handle_replica_op<MOSDSubOpReply, MSG_OSD_SUBOPREPLY>(op, osdmap);
+ break;
+ case MSG_OSD_PG_PUSH:
+ handle_replica_op<MOSDPGPush, MSG_OSD_PG_PUSH>(op, osdmap);
+ break;
+ case MSG_OSD_PG_PULL:
+ handle_replica_op<MOSDPGPull, MSG_OSD_PG_PULL>(op, osdmap);
+ break;
+ case MSG_OSD_PG_PUSH_REPLY:
+ handle_replica_op<MOSDPGPushReply, MSG_OSD_PG_PUSH_REPLY>(op, osdmap);
+ break;
+ case MSG_OSD_PG_SCAN:
+ handle_replica_op<MOSDPGScan, MSG_OSD_PG_SCAN>(op, osdmap);
+ break;
+ case MSG_OSD_PG_BACKFILL:
+ handle_replica_op<MOSDPGBackfill, MSG_OSD_PG_BACKFILL>(op, osdmap);
+ break;
+ case MSG_OSD_EC_WRITE:
+ handle_replica_op<MOSDECSubOpWrite, MSG_OSD_EC_WRITE>(op, osdmap);
+ break;
+ case MSG_OSD_EC_WRITE_REPLY:
+ handle_replica_op<MOSDECSubOpWriteReply, MSG_OSD_EC_WRITE_REPLY>(op, osdmap);
+ break;
+ case MSG_OSD_EC_READ:
+ handle_replica_op<MOSDECSubOpRead, MSG_OSD_EC_READ>(op, osdmap);
+ break;
+ case MSG_OSD_EC_READ_REPLY:
+ handle_replica_op<MOSDECSubOpReadReply, MSG_OSD_EC_READ_REPLY>(op, osdmap);
+ break;
+ default:
+ assert(0);
+ }
break;
}
}
// we don't need encoded payload anymore
m->clear_payload();
- // require same or newer map
- if (!require_same_or_newer_map(op, m->get_map_epoch()))
- return;
-
// object name too long?
if (m->get_oid().name.size() > MAX_CEPH_OBJECT_NAME_LEN) {
dout(4) << "handle_op '" << m->get_oid().name << "' is longer than "
return;
}
- // get and lock *pg.
- PG *pg = _have_pg(pgid) ? _lookup_pg(pgid) : NULL;
- if (!pg) {
- dout(7) << "hit non-existent pg " << pgid << dendl;
-
- if (osdmap->get_pg_acting_role(pgid.pgid, whoami) >= 0) {
- dout(7) << "we are valid target for op, waiting" << dendl;
- waiting_for_pg[pgid].push_back(op);
- op->mark_delayed("waiting for pg to exist locally");
- return;
- }
+ OSDMapRef send_map = service.try_get_map(m->get_map_epoch());
+ // check send epoch
+ if (!send_map) {
- // okay, we aren't valid now; check send epoch
- if (m->get_map_epoch() < superblock.oldest_map) {
- dout(7) << "don't have sender's osdmap; assuming it was valid and that client will resend" << dendl;
- return;
- }
- OSDMapRef send_map = get_map(m->get_map_epoch());
-
- if (send_map->get_pg_acting_role(pgid.pgid, whoami) >= 0) {
- dout(7) << "dropping request; client will resend when they get new map" << dendl;
- } else if (!send_map->have_pg_pool(pgid.pool())) {
- dout(7) << "dropping request; pool did not exist" << dendl;
- clog.warn() << m->get_source_inst() << " invalid " << m->get_reqid()
- << " pg " << m->get_pg()
- << " to osd." << whoami
- << " in e" << osdmap->get_epoch()
- << ", client e" << m->get_map_epoch()
- << " when pool " << m->get_pg().pool() << " did not exist"
- << "\n";
- } else {
- dout(7) << "we are invalid target" << dendl;
- clog.warn() << m->get_source_inst() << " misdirected " << m->get_reqid()
- << " pg " << m->get_pg()
- << " to osd." << whoami
- << " in e" << osdmap->get_epoch()
- << ", client e" << m->get_map_epoch()
- << " pg " << pgid
- << " features " << m->get_connection()->get_features()
- << "\n";
- service.reply_op_error(op, -ENXIO);
- }
+ dout(7) << "don't have sender's osdmap; assuming it was valid and that client will resend" << dendl;
+ return;
+ }
+ if (!send_map->have_pg_pool(pgid.pool())) {
+ dout(7) << "dropping request; pool did not exist" << dendl;
+ clog.warn() << m->get_source_inst() << " invalid " << m->get_reqid()
+ << " pg " << m->get_pg()
+ << " to osd." << whoami
+ << " in e" << osdmap->get_epoch()
+ << ", client e" << m->get_map_epoch()
+ << " when pool " << m->get_pg().pool() << " did not exist"
+ << "\n";
+ return;
+ } else if (send_map->get_pg_acting_role(pgid.pgid, whoami) < 0) {
+ dout(7) << "we are invalid target" << dendl;
+ clog.warn() << m->get_source_inst() << " misdirected " << m->get_reqid()
+ << " pg " << m->get_pg()
+ << " to osd." << whoami
+ << " in e" << osdmap->get_epoch()
+ << ", client e" << m->get_map_epoch()
+ << " pg " << pgid
+ << " features " << m->get_connection()->get_features()
+ << "\n";
+ service.reply_op_error(op, -ENXIO);
return;
}
- enqueue_op(pg, op);
+ PG *pg = get_pg_or_queue_for_pg(pgid, op);
+ if (pg)
+ enqueue_op(pg, op);
}
template<typename T, int MSGTYPE>
// must be a rep op.
assert(m->get_source().is_osd());
- // require same or newer map
- if (!require_same_or_newer_map(op, m->map_epoch))
- return;
-
// share our map with sender, if they're old
Session *peer_session =
static_cast<Session*>(m->get_connection()->get_priv());
peer_session->put();
}
- // make sure we have the pg
- const spg_t pgid = m->pgid;
- if (service.splitting(pgid)) {
- waiting_for_pg[pgid].push_back(op);
- return;
- }
-
- PG *pg = _have_pg(pgid) ? _lookup_pg(pgid) : NULL;
- if (!pg) {
- return;
- }
- enqueue_op(pg, op);
+ PG *pg = get_pg_or_queue_for_pg(m->pgid, op);
+ if (pg)
+ enqueue_op(pg, op);
}
bool OSD::op_is_discardable(MOSDOp *op)