}
}
+// Connection-established hook for the fast-dispatch path (logged below as
+// the "outgoing" side).
+//
+// For every non-monitor peer, make sure the connection's priv slot holds a
+// Session.  If none is attached yet, a fresh Session is created, bound to
+// the connection and tagged as an OSD peer.
+//
+// con: the connection that was just established.
+void OSD::ms_handle_fast_connect(Connection *con)
+{
+  if (con->get_peer_type() != CEPH_ENTITY_TYPE_MON) {
+    Session *s = static_cast<Session*>(con->get_priv());
+    if (!s) {
+      s = new Session;
+      // the connection keeps its own reference to the session
+      con->set_priv(s->get());
+      s->con = con;
+      dout(10) << " new session (outgoing)" << s << " con=" << s->con
+          << " addr=" << s->con->get_peer_addr() << dendl;
+      // we don't connect to clients
+      assert(con->get_peer_type() == CEPH_ENTITY_TYPE_OSD);
+      s->entity_name.set_type(CEPH_ENTITY_TYPE_OSD);
+    }
+    // Release the reference held by this function (either the one handed
+    // back by get_priv() or the initial one from new), exactly as
+    // ms_handle_fast_accept() does; without this the Session is leaked.
+    s->put();
+  }
+}
+
+// Connection-accepted hook for the fast-dispatch path (logged below as the
+// "incoming" side).
+//
+// Non-monitor connections are guaranteed to carry a Session once this
+// returns: an existing one is reused, otherwise a new one is created,
+// attached to the connection and marked as belonging to an OSD peer.
+//
+// con: the connection that was just accepted.
+void OSD::ms_handle_fast_accept(Connection *con)
+{
+  // nothing to set up for monitor peers
+  if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON)
+    return;
+
+  Session *session = static_cast<Session*>(con->get_priv());
+  if (!session) {
+    session = new Session();
+    // the connection takes its own reference
+    con->set_priv(session->get());
+    session->con = con;
+    dout(10) << "new session (incoming)" << session << " con=" << con
+        << " addr=" << con->get_peer_addr()
+        << " must have raced with connect" << dendl;
+    assert(con->get_peer_type() == CEPH_ENTITY_TYPE_OSD);
+    session->entity_name.set_type(CEPH_ENTITY_TYPE_OSD);
+  }
+
+  // drop the reference owned by this function
+  session->put();
+}
+
bool OSD::ms_handle_reset(Connection *con)
{
OSD::Session *session = (OSD::Session *)con->get_priv();
return true;
}
+// Drain a session's queue of ops that were waiting for a map.
+//
+// Ops are tried strictly in queue order against osdmap via
+// dispatch_op_fast(); each op that is consumed is removed, and draining
+// stops at the first op that cannot be dispatched yet.  Afterwards the
+// session is registered as still waiting (queue not empty) or cleared from
+// the waiting set (queue empty).
+//
+// Both callers visible in this file hold session->session_dispatch_lock
+// around this call.
+void OSD::dispatch_session_waiting(Session *session, OSDMapRef osdmap)
+{
+  list<OpRequestRef> &pending = session->waiting_on_map;
+
+  list<OpRequestRef>::iterator cur = pending.begin();
+  while (cur != pending.end() && dispatch_op_fast(*cur, osdmap)) {
+    // consumed: unlink it and move on to the next queued op
+    cur = pending.erase(cur);
+  }
+
+  if (!pending.empty()) {
+    register_session_waiting_on_map(session);
+  } else {
+    clear_session_waiting_on_map(session);
+  }
+}
+
+// Fast-dispatch entry point for a message.
+//
+// Wraps the message in a tracked OpRequest, reserves the next map from the
+// service, appends the op to the sending session's waiting_on_map queue and
+// immediately tries to drain that queue against the reserved map.  The
+// session reference and the map reservation are released before returning.
+//
+// m: the incoming message; its connection must already carry a Session.
+void OSD::ms_fast_dispatch(Message *m)
+{
+  OpRequestRef request = op_tracker.create_request<OpRequest>(m);
+  OSDMapRef reserved_map = service.get_nextmap_reserved();
+
+  Session *sess = static_cast<Session*>(m->get_connection()->get_priv());
+  assert(sess);
+  {
+    // queueing and draining happen under the per-session dispatch lock
+    Mutex::Locker guard(sess->session_dispatch_lock);
+    sess->waiting_on_map.push_back(request);
+    dispatch_session_waiting(sess, reserved_map);
+  }
+  sess->put();
+
+  service.release_map(reserved_map);
+}
+
+// Pre-dispatch hook: note the newest map epoch a peer OSD has sent us.
+//
+// Only CEPH_MSG_OSD_MAP messages arriving on OSD-to-OSD connections are of
+// interest.  For those, the last epoch carried by the MOSDMap is stored in
+// the session's received_map_epoch (under received_map_lock), which
+// dispatch_op_fast() consults before subscribing for newer maps.
+//
+// m: the message about to be dispatched; it is only inspected here.
+void OSD::ms_fast_preprocess(Message *m)
+{
+  if (m->get_connection()->get_peer_type() != CEPH_ENTITY_TYPE_OSD)
+    return;
+  if (m->get_type() != CEPH_MSG_OSD_MAP)
+    return;
+
+  MOSDMap *mm = static_cast<MOSDMap*>(m);
+  Session *s = static_cast<Session*>(m->get_connection()->get_priv());
+  // The connection may carry no session (the priv slot can be empty or
+  // reset to NULL elsewhere in this file); there is nothing to record then.
+  if (!s)
+    return;
+
+  s->received_map_lock.Lock();
+  s->received_map_epoch = mm->get_last();
+  s->received_map_lock.Unlock();
+  s->put();
+}
+
bool OSD::ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool force_new)
{
dout(10) << "OSD::ms_get_authorizer type=" << ceph_entity_type_name(dest_type) << dendl;
case MSG_OSD_RECOVERY_RESERVE:
handle_pg_recovery_reserve(op);
break;
- default:
- epoch_t msg_epoch(op_required_epoch(op));
- if (msg_epoch > osdmap->get_epoch())
- return false;
+ }
+}
- switch(op->get_req()->get_type()) {
- // client ops
- case CEPH_MSG_OSD_OP:
- handle_op(op, osdmap);
- break;
+// Try to dispatch one op against the supplied map.
+//
+// Returns true when the op has been consumed: either it was handed to the
+// handler matching its message type, or it was dropped because the OSD is
+// stopping.  Returns false when the op requires a map epoch newer than
+// osdmap; in that case a subscription for that epoch is requested unless
+// the sender's session has already delivered an MOSDMap reaching it.
+//
+// op:     the request to dispatch.
+// osdmap: the map to dispatch against.
+bool OSD::dispatch_op_fast(OpRequestRef op, OSDMapRef osdmap) {
+ if (is_stopping()) {
+ // we're shutting down, so drop the op
+ return true;
+ }
- // for replication etc.
- case MSG_OSD_SUBOP:
- handle_replica_op<MOSDSubOp, MSG_OSD_SUBOP>(op, osdmap);
- break;
- case MSG_OSD_SUBOPREPLY:
- handle_replica_op<MOSDSubOpReply, MSG_OSD_SUBOPREPLY>(op, osdmap);
- break;
- case MSG_OSD_PG_PUSH:
- handle_replica_op<MOSDPGPush, MSG_OSD_PG_PUSH>(op, osdmap);
- break;
- case MSG_OSD_PG_PULL:
- handle_replica_op<MOSDPGPull, MSG_OSD_PG_PULL>(op, osdmap);
- break;
- case MSG_OSD_PG_PUSH_REPLY:
- handle_replica_op<MOSDPGPushReply, MSG_OSD_PG_PUSH_REPLY>(op, osdmap);
- break;
- case MSG_OSD_PG_SCAN:
- handle_replica_op<MOSDPGScan, MSG_OSD_PG_SCAN>(op, osdmap);
- break;
- case MSG_OSD_PG_BACKFILL:
- handle_replica_op<MOSDPGBackfill, MSG_OSD_PG_BACKFILL>(op, osdmap);
- break;
- case MSG_OSD_EC_WRITE:
- handle_replica_op<MOSDECSubOpWrite, MSG_OSD_EC_WRITE>(op, osdmap);
- break;
- case MSG_OSD_EC_WRITE_REPLY:
- handle_replica_op<MOSDECSubOpWriteReply, MSG_OSD_EC_WRITE_REPLY>(op, osdmap);
- break;
- case MSG_OSD_EC_READ:
- handle_replica_op<MOSDECSubOpRead, MSG_OSD_EC_READ>(op, osdmap);
- break;
- case MSG_OSD_EC_READ_REPLY:
- handle_replica_op<MOSDECSubOpReadReply, MSG_OSD_EC_READ_REPLY>(op, osdmap);
- break;
- default:
- assert(0);
+ epoch_t msg_epoch(op_required_epoch(op));
+ if (msg_epoch > osdmap->get_epoch()) {
+ Session *s = static_cast<Session*>(op->get_req()->
+ get_connection()->get_priv());
+ // The connection's session can already have been detached
+ // (set_priv(NULL) elsewhere in this file) while the op was queued;
+ // without a session there is no received epoch to consult, so just
+ // leave the op waiting.
+ if (s) {
+ s->received_map_lock.Lock();
+ epoch_t received_epoch = s->received_map_epoch;
+ s->received_map_lock.Unlock();
+ if (received_epoch < msg_epoch) {
+ osdmap_subscribe(msg_epoch, false);
}
+ s->put();
+ }
+ return false;
+ }
+
+ switch(op->get_req()->get_type()) {
+ // client ops
+ case CEPH_MSG_OSD_OP:
+ handle_op(op, osdmap);
+ break;
+ // for replication etc.
+ case MSG_OSD_SUBOP:
+ handle_replica_op<MOSDSubOp, MSG_OSD_SUBOP>(op, osdmap);
break;
+ case MSG_OSD_SUBOPREPLY:
+ handle_replica_op<MOSDSubOpReply, MSG_OSD_SUBOPREPLY>(op, osdmap);
+ break;
+ case MSG_OSD_PG_PUSH:
+ handle_replica_op<MOSDPGPush, MSG_OSD_PG_PUSH>(op, osdmap);
+ break;
+ case MSG_OSD_PG_PULL:
+ handle_replica_op<MOSDPGPull, MSG_OSD_PG_PULL>(op, osdmap);
+ break;
+ case MSG_OSD_PG_PUSH_REPLY:
+ handle_replica_op<MOSDPGPushReply, MSG_OSD_PG_PUSH_REPLY>(op, osdmap);
+ break;
+ case MSG_OSD_PG_SCAN:
+ handle_replica_op<MOSDPGScan, MSG_OSD_PG_SCAN>(op, osdmap);
+ break;
+ case MSG_OSD_PG_BACKFILL:
+ handle_replica_op<MOSDPGBackfill, MSG_OSD_PG_BACKFILL>(op, osdmap);
+ break;
+ case MSG_OSD_EC_WRITE:
+ handle_replica_op<MOSDECSubOpWrite, MSG_OSD_EC_WRITE>(op, osdmap);
+ break;
+ case MSG_OSD_EC_WRITE_REPLY:
+ handle_replica_op<MOSDECSubOpWriteReply, MSG_OSD_EC_WRITE_REPLY>(op, osdmap);
+ break;
+ case MSG_OSD_EC_READ:
+ handle_replica_op<MOSDECSubOpRead, MSG_OSD_EC_READ>(op, osdmap);
+ break;
+ case MSG_OSD_EC_READ_REPLY:
+ handle_replica_op<MOSDECSubOpReadReply, MSG_OSD_EC_READ_REPLY>(op, osdmap);
+ break;
+ default:
+ assert(0);
}
+ return true;
}
void OSD::_dispatch(Message *m)
service.await_reserved_maps();
service.publish_map(osdmap);
+ set<Session*> sessions_to_check;
+ get_sessions_waiting_for_map(&sessions_to_check);
+ for (set<Session*>::iterator i = sessions_to_check.begin();
+ i != sessions_to_check.end();
+ sessions_to_check.erase(i++)) {
+ (*i)->session_dispatch_lock.Lock();
+ dispatch_session_waiting(*i, osdmap);
+ (*i)->session_dispatch_lock.Unlock();
+ (*i)->put();
+ }
+
// scan pg's
{
RWLock::RLocker l(pg_map_lock);
<< " expected " << (osdmap->have_inst(from) ? osdmap->get_cluster_addr(from) : entity_addr_t())
<< dendl;
ConnectionRef con = m->get_connection();
- con->set_priv(NULL); // break ref <-> session cycle, if any
cluster_messenger->mark_down(con.get());
+ Session *s = static_cast<Session*>(con->get_priv());
+ if (s) {
+ s->session_dispatch_lock.Lock();
+ clear_session_waiting_on_map(s);
+ con->set_priv(NULL); // break ref <-> session cycle, if any
+ s->session_dispatch_lock.Unlock();
+ s->put();
+ }
return false;
}
}
return;
}
}
+
// calc actual pgid
pg_t _pgid = m->get_pg();
int64_t pool = _pgid.pool();
void tick();
void _dispatch(Message *m);
void dispatch_op(OpRequestRef op);
+ bool dispatch_op_fast(OpRequestRef op, OSDMapRef osdmap);
void check_osdmap_features(ObjectStore *store);
Mutex sent_epoch_lock;
epoch_t last_sent_epoch;
+ Mutex received_map_lock;
+ epoch_t received_map_epoch; // largest epoch seen in MOSDMap from here
Session() :
auid(-1), con(0),
session_dispatch_lock("Session::session_dispatch_lock"),
- sent_epoch_lock("Session::sent_epoch_lock"),
- last_sent_epoch(0)
+ sent_epoch_lock("Session::sent_epoch_lock"), last_sent_epoch(0),
+ received_map_lock("Session::received_map_lock"), received_map_epoch(0) // stays 0 until ms_fast_preprocess() records an epoch from an MOSDMap
{}
};
+ void dispatch_session_waiting(Session *session, OSDMapRef osdmap);
Mutex session_waiting_for_map_lock;
set<Session*> session_waiting_for_map;
/// Caller assumes refs for included Sessions
}
private:
+ bool ms_can_fast_dispatch_any() const { return true; } // always true; the per-message decision is made by ms_can_fast_dispatch(Message*)
+  /// Report whether a message may take the fast-dispatch path.
+  ///
+  /// True exactly for the client op, replica sub-op, push/pull, scan,
+  /// backfill and EC read/write message types listed below; every other
+  /// type yields false.
+  bool ms_can_fast_dispatch(Message *m) const {
+    bool fast_path = false;
+    switch (m->get_type()) {
+    case CEPH_MSG_OSD_OP:
+    case MSG_OSD_SUBOP:
+    case MSG_OSD_SUBOPREPLY:
+    case MSG_OSD_PG_PUSH:
+    case MSG_OSD_PG_PULL:
+    case MSG_OSD_PG_PUSH_REPLY:
+    case MSG_OSD_PG_SCAN:
+    case MSG_OSD_PG_BACKFILL:
+    case MSG_OSD_EC_WRITE:
+    case MSG_OSD_EC_WRITE_REPLY:
+    case MSG_OSD_EC_READ:
+    case MSG_OSD_EC_READ_REPLY:
+      fast_path = true;
+      break;
+    default:
+      break;
+    }
+    return fast_path;
+  }
+ void ms_fast_dispatch(Message *m);
+ void ms_fast_preprocess(Message *m);
bool ms_dispatch(Message *m);
bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool force_new);
bool ms_verify_authorizer(Connection *con, int peer_type,
int protocol, bufferlist& authorizer, bufferlist& authorizer_reply,
bool& isvalid, CryptoKey& session_key);
void ms_handle_connect(Connection *con);
+ void ms_handle_fast_connect(Connection *con);
+ void ms_handle_fast_accept(Connection *con);
bool ms_handle_reset(Connection *con);
void ms_handle_remote_reset(Connection *con) {}